mirror of
https://github.com/actions/runner.git
synced 2025-12-25 19:08:20 +08:00
Uploading step logs to Results as well (#2422)
* Rename queue to results queue * Add results contracts * Add Results logging handling * Adding calls to create and finalize append blob * Modifications for azurite upload * Only call upload complete on final section and remove size * Make method specific to step log so we can support job log later * Change contract for results * Add totalline count to the result log upload file * Actually pass lineCount to Results Service * Fix typos * Code cleanup * Fixing typos * Apply suggestions from code review Co-authored-by: Konrad Pabjan <konradpabjan@github.com> --------- Co-authored-by: Brittany Ellich <brittanyellich@github.com> Co-authored-by: Konrad Pabjan <konradpabjan@github.com>
This commit is contained in:
@@ -30,7 +30,8 @@ namespace GitHub.Runner.Common
|
||||
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
|
||||
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
|
||||
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
|
||||
Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken);
|
||||
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken);
|
||||
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
|
||||
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
|
||||
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
|
||||
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
|
||||
@@ -316,7 +317,7 @@ namespace GitHub.Runner.Common
|
||||
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
|
||||
public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_resultsClient != null)
|
||||
{
|
||||
@@ -325,6 +326,15 @@ namespace GitHub.Runner.Common
|
||||
throw new InvalidOperationException("Results client is not initialized.");
|
||||
}
|
||||
|
||||
public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_resultsClient != null)
|
||||
{
|
||||
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
|
||||
}
|
||||
throw new InvalidOperationException("Results client is not initialized.");
|
||||
}
|
||||
|
||||
|
||||
public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
@@ -20,7 +20,7 @@ namespace GitHub.Runner.Common
|
||||
void Start(Pipelines.AgentJobRequestMessage jobRequest);
|
||||
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
|
||||
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
|
||||
void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource);
|
||||
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines = 0);
|
||||
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ namespace GitHub.Runner.Common
|
||||
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
|
||||
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
|
||||
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
|
||||
private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000);
|
||||
private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000);
|
||||
|
||||
// Job message information
|
||||
private Guid _scopeIdentifier;
|
||||
@@ -46,7 +46,7 @@ namespace GitHub.Runner.Common
|
||||
// queue for file upload (log file or attachment)
|
||||
private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new();
|
||||
|
||||
private readonly ConcurrentQueue<SummaryUploadFileInfo> _summaryFileUploadQueue = new();
|
||||
private readonly ConcurrentQueue<ResultsUploadFileInfo> _resultsFileUploadQueue = new();
|
||||
|
||||
// queue for timeline or timeline record update (one queue per timeline)
|
||||
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new();
|
||||
@@ -60,7 +60,7 @@ namespace GitHub.Runner.Common
|
||||
// Task for each queue's dequeue process
|
||||
private Task _webConsoleLineDequeueTask;
|
||||
private Task _fileUploadDequeueTask;
|
||||
private Task _summaryUploadDequeueTask;
|
||||
private Task _resultsUploadDequeueTask;
|
||||
private Task _timelineUpdateDequeueTask;
|
||||
|
||||
// common
|
||||
@@ -140,12 +140,12 @@ namespace GitHub.Runner.Common
|
||||
_fileUploadDequeueTask = ProcessFilesUploadQueueAsync();
|
||||
|
||||
Trace.Info("Start results file upload queue.");
|
||||
_summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync();
|
||||
_resultsUploadDequeueTask = ProcessResultsUploadQueueAsync();
|
||||
|
||||
Trace.Info("Start process timeline update queue.");
|
||||
_timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync();
|
||||
|
||||
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask };
|
||||
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
|
||||
_queueInProcess = true;
|
||||
}
|
||||
|
||||
@@ -176,9 +176,9 @@ namespace GitHub.Runner.Common
|
||||
await ProcessFilesUploadQueueAsync(runOnce: true);
|
||||
Trace.Info("File upload queue drained.");
|
||||
|
||||
Trace.Verbose("Draining results summary upload queue.");
|
||||
await ProcessSummaryUploadQueueAsync(runOnce: true);
|
||||
Trace.Info("Results summary upload queue drained.");
|
||||
Trace.Verbose("Draining results upload queue.");
|
||||
await ProcessResultsUploadQueueAsync(runOnce: true);
|
||||
Trace.Info("Results upload queue drained.");
|
||||
|
||||
// ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
|
||||
// if there is any timeline records that failed to update contains output variabls.
|
||||
@@ -230,21 +230,31 @@ namespace GitHub.Runner.Common
|
||||
_fileUploadQueue.Enqueue(newFile);
|
||||
}
|
||||
|
||||
public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource)
|
||||
public void QueueResultsUpload(Guid recordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines)
|
||||
{
|
||||
if (recordId == _jobTimelineRecordId)
|
||||
{
|
||||
Trace.Verbose("Skipping job log {0} for record {1}", path, recordId);
|
||||
return;
|
||||
}
|
||||
|
||||
// all parameter not null, file path exist.
|
||||
var newFile = new SummaryUploadFileInfo()
|
||||
var newFile = new ResultsUploadFileInfo()
|
||||
{
|
||||
Name = name,
|
||||
Path = path,
|
||||
Type = type,
|
||||
PlanId = _planId.ToString(),
|
||||
JobId = _jobTimelineRecordId.ToString(),
|
||||
StepId = stepRecordId.ToString(),
|
||||
DeleteSource = deleteSource
|
||||
RecordId = recordId,
|
||||
DeleteSource = deleteSource,
|
||||
Finalize = finalize,
|
||||
FirstBlock = firstBlock,
|
||||
TotalLines = totalLines,
|
||||
};
|
||||
|
||||
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId);
|
||||
_summaryFileUploadQueue.Enqueue(newFile);
|
||||
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, recordId);
|
||||
_resultsFileUploadQueue.Enqueue(newFile);
|
||||
}
|
||||
|
||||
public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord)
|
||||
@@ -437,18 +447,18 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
|
||||
private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
|
||||
{
|
||||
Trace.Info("Starting results-based upload queue...");
|
||||
|
||||
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
|
||||
{
|
||||
List<SummaryUploadFileInfo> filesToUpload = new();
|
||||
SummaryUploadFileInfo dequeueFile;
|
||||
while (_summaryFileUploadQueue.TryDequeue(out dequeueFile))
|
||||
List<ResultsUploadFileInfo> filesToUpload = new();
|
||||
ResultsUploadFileInfo dequeueFile;
|
||||
while (_resultsFileUploadQueue.TryDequeue(out dequeueFile))
|
||||
{
|
||||
filesToUpload.Add(dequeueFile);
|
||||
// process at most 10 file upload.
|
||||
// process at most 10 file uploads.
|
||||
if (!runOnce && filesToUpload.Count > 10)
|
||||
{
|
||||
break;
|
||||
@@ -459,7 +469,7 @@ namespace GitHub.Runner.Common
|
||||
{
|
||||
if (runOnce)
|
||||
{
|
||||
Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service.");
|
||||
Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service.");
|
||||
}
|
||||
|
||||
int errorCount = 0;
|
||||
@@ -467,11 +477,22 @@ namespace GitHub.Runner.Common
|
||||
{
|
||||
try
|
||||
{
|
||||
await UploadSummaryFile(file);
|
||||
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
await UploadSummaryFile(file);
|
||||
}
|
||||
else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
if (file.RecordId != _jobTimelineRecordId)
|
||||
{
|
||||
Trace.Info($"Got a step log file to send to results service.");
|
||||
await UploadResultsStepLogFile(file);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" };
|
||||
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" };
|
||||
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;
|
||||
|
||||
var telemetryRecord = new TimelineRecord()
|
||||
@@ -481,16 +502,13 @@ namespace GitHub.Runner.Common
|
||||
telemetryRecord.Issues.Add(issue);
|
||||
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);
|
||||
|
||||
Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort.");
|
||||
Trace.Info("Catch exception during file upload to results, keep going since the process is best effort.");
|
||||
Trace.Error(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
errorCount++;
|
||||
}
|
||||
}
|
||||
|
||||
Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
|
||||
Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
|
||||
}
|
||||
|
||||
if (runOnce)
|
||||
@@ -499,7 +517,7 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
else
|
||||
{
|
||||
await Task.Delay(_delayForSummaryUploadDequeue);
|
||||
await Task.Delay(_delayForResultsUploadDequeue);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -776,7 +794,7 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UploadSummaryFile(SummaryUploadFileInfo file)
|
||||
private async Task UploadSummaryFile(ResultsUploadFileInfo file)
|
||||
{
|
||||
bool uploadSucceed = false;
|
||||
try
|
||||
@@ -784,7 +802,7 @@ namespace GitHub.Runner.Common
|
||||
// Upload the step summary
|
||||
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
|
||||
var cancellationTokenSource = new CancellationTokenSource();
|
||||
await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token);
|
||||
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, cancellationTokenSource.Token);
|
||||
|
||||
uploadSucceed = true;
|
||||
}
|
||||
@@ -804,6 +822,34 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file)
|
||||
{
|
||||
bool uploadSucceed = false;
|
||||
try
|
||||
{
|
||||
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
|
||||
var cancellationTokenSource = new CancellationTokenSource();
|
||||
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, cancellationTokenSource.Token);
|
||||
|
||||
uploadSucceed = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (uploadSucceed && file.DeleteSource)
|
||||
{
|
||||
try
|
||||
{
|
||||
File.Delete(file.Path);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results.");
|
||||
Trace.Error(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal class PendingTimelineRecord
|
||||
@@ -822,14 +868,18 @@ namespace GitHub.Runner.Common
|
||||
public bool DeleteSource { get; set; }
|
||||
}
|
||||
|
||||
internal class SummaryUploadFileInfo
|
||||
internal class ResultsUploadFileInfo
|
||||
{
|
||||
public string Name { get; set; }
|
||||
public string Type { get; set; }
|
||||
public string Path { get; set; }
|
||||
public string PlanId { get; set; }
|
||||
public string JobId { get; set; }
|
||||
public string StepId { get; set; }
|
||||
public Guid RecordId { get; set; }
|
||||
public bool DeleteSource { get; set; }
|
||||
public bool Finalize { get; set; }
|
||||
public bool FirstBlock { get; set; }
|
||||
public long TotalLines { get; set; }
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -32,6 +32,19 @@ namespace GitHub.Runner.Common
|
||||
private string _pagesFolder;
|
||||
private IJobServerQueue _jobServerQueue;
|
||||
|
||||
// For Results
|
||||
public static string BlocksFolder = "blocks";
|
||||
|
||||
// 2 MB
|
||||
public const int BlockSize = 2 * 1024 * 1024;
|
||||
|
||||
private string _resultsDataFileName;
|
||||
private FileStream _resultsBlockData;
|
||||
private StreamWriter _resultsBlockWriter;
|
||||
private string _resultsBlockFolder;
|
||||
private int _blockByteCount;
|
||||
private int _blockCount;
|
||||
|
||||
public long TotalLines => _totalLines;
|
||||
|
||||
public override void Initialize(IHostContext hostContext)
|
||||
@@ -39,8 +52,10 @@ namespace GitHub.Runner.Common
|
||||
base.Initialize(hostContext);
|
||||
_totalLines = 0;
|
||||
_pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder);
|
||||
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
||||
Directory.CreateDirectory(_pagesFolder);
|
||||
_resultsBlockFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), BlocksFolder);
|
||||
Directory.CreateDirectory(_resultsBlockFolder);
|
||||
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
||||
}
|
||||
|
||||
public void Setup(Guid timelineId, Guid timelineRecordId)
|
||||
@@ -60,11 +75,17 @@ namespace GitHub.Runner.Common
|
||||
// lazy creation on write
|
||||
if (_pageWriter == null)
|
||||
{
|
||||
Create();
|
||||
NewPage();
|
||||
}
|
||||
|
||||
if (_resultsBlockWriter == null)
|
||||
{
|
||||
NewBlock();
|
||||
}
|
||||
|
||||
string line = $"{DateTime.UtcNow.ToString("O")} {message}";
|
||||
_pageWriter.WriteLine(line);
|
||||
_resultsBlockWriter.WriteLine(line);
|
||||
|
||||
_totalLines++;
|
||||
if (line.IndexOf('\n') != -1)
|
||||
@@ -78,21 +99,25 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
}
|
||||
|
||||
_byteCount += System.Text.Encoding.UTF8.GetByteCount(line);
|
||||
var bytes = System.Text.Encoding.UTF8.GetByteCount(line);
|
||||
_byteCount += bytes;
|
||||
_blockByteCount += bytes;
|
||||
if (_byteCount >= PageSize)
|
||||
{
|
||||
NewPage();
|
||||
}
|
||||
|
||||
if (_blockByteCount >= BlockSize)
|
||||
{
|
||||
NewBlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void End()
|
||||
{
|
||||
EndPage();
|
||||
}
|
||||
|
||||
private void Create()
|
||||
{
|
||||
NewPage();
|
||||
EndBlock(true);
|
||||
}
|
||||
|
||||
private void NewPage()
|
||||
@@ -117,5 +142,27 @@ namespace GitHub.Runner.Common
|
||||
_jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true);
|
||||
}
|
||||
}
|
||||
|
||||
private void NewBlock()
|
||||
{
|
||||
EndBlock(false);
|
||||
_blockByteCount = 0;
|
||||
_resultsDataFileName = Path.Combine(_resultsBlockFolder, $"{_timelineId}_{_timelineRecordId}.{++_blockCount}");
|
||||
_resultsBlockData = new FileStream(_resultsDataFileName, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.ReadWrite);
|
||||
_resultsBlockWriter = new StreamWriter(_resultsBlockData, System.Text.Encoding.UTF8);
|
||||
}
|
||||
|
||||
private void EndBlock(bool finalize)
|
||||
{
|
||||
if (_resultsBlockWriter != null)
|
||||
{
|
||||
_resultsBlockWriter.Flush();
|
||||
_resultsBlockData.Flush();
|
||||
_resultsBlockWriter.Dispose();
|
||||
_resultsBlockWriter = null;
|
||||
_resultsBlockData = null;
|
||||
_jobServerQueue.QueueResultsUpload(_timelineRecordId, "ResultsLog", _resultsDataFileName, "Results.Core.Log", deleteSource: true, finalize, firstBlock: _resultsDataFileName.EndsWith(".1"), totalLines: _totalLines);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user