From 0484afeec71b612022e35ba80e5fe98a99cd0be8 Mon Sep 17 00:00:00 2001 From: Yang Cao Date: Mon, 3 Apr 2023 16:31:13 -0400 Subject: [PATCH] Update Runner to send step updates to Results (#2510) * Also send Steps update to Results service * Refactor to separate results server from current job server * If hit any error while uploading to Results, skip Results upload * Add proxy authentication and buffer request for WinHttpHandler * Remove unnecessary null guard * Also send Results telemetry when step update fails * IResultsServer is not disposable --- src/Runner.Common/JobServer.cs | 37 ------- src/Runner.Common/JobServerQueue.cs | 61 +++++++++--- src/Runner.Common/ResultsServer.cs | 98 +++++++++++++++++++ .../Common/RawClientHttpRequestSettings.cs | 24 +++++ .../Common/Common/RawHttpMessageHandler.cs | 43 +++++++- .../Common/Common/VssHttpMessageHandler.cs | 2 +- .../Common/Common/VssHttpRequestSettings.cs | 2 +- src/Sdk/WebApi/WebApi/Contracts.cs | 41 ++++++++ src/Sdk/WebApi/WebApi/ResultsHttpClient.cs | 74 ++++++++++++-- 9 files changed, 314 insertions(+), 68 deletions(-) create mode 100644 src/Runner.Common/ResultsServer.cs diff --git a/src/Runner.Common/JobServer.cs b/src/Runner.Common/JobServer.cs index 9d8450378..04c244ad8 100644 --- a/src/Runner.Common/JobServer.cs +++ b/src/Runner.Common/JobServer.cs @@ -24,15 +24,11 @@ namespace GitHub.Runner.Common Task ConnectAsync(VssConnection jobConnection); void InitializeWebsocketClient(ServiceEndpoint serviceEndpoint); - void InitializeResultsClient(Uri uri, string token); // logging and console Task AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long? startLine, CancellationToken cancellationToken); Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); - Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken); - Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken); - Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken); Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable records, CancellationToken cancellationToken); @@ -46,7 +42,6 @@ namespace GitHub.Runner.Common private bool _hasConnection; private VssConnection _connection; private TaskHttpClient _taskClient; - private ResultsHttpClient _resultsClient; private ClientWebSocket _websocketClient; private ServiceEndpoint _serviceEndpoint; @@ -150,12 +145,6 @@ namespace GitHub.Runner.Common InitializeWebsocketClient(TimeSpan.Zero); } - public void InitializeResultsClient(Uri uri, string token) - { - var httpMessageHandler = HostContext.CreateHttpClientHandler(); - this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true); - } - public ValueTask DisposeAsync() { CloseWebSocket(WebSocketCloseStatus.NormalClosure, CancellationToken.None); @@ -318,32 +307,6 @@ namespace GitHub.Runner.Common return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken); } - public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) - { - if (_resultsClient != null) - { - return _resultsClient.UploadStepSummaryAsync(planId, jobId, stepId, file, cancellationToken: cancellationToken); - } - throw new InvalidOperationException("Results client is not initialized."); - } - - public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) - { - if (_resultsClient != null) - { - return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken); - } - throw new InvalidOperationException("Results client is not initialized."); - } - - public Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) - { - if (_resultsClient != null) - { - return _resultsClient.UploadResultsJobLogAsync(planId, jobId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken); - } - throw new InvalidOperationException("Results client is not initialized."); - } public Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken) { diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 1fadd99a8..20ad27a89 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -65,6 +65,7 @@ namespace GitHub.Runner.Common // common private IJobServer _jobServer; + private IResultsServer _resultsServer; private Task[] _allDequeueTasks; private readonly TaskCompletionSource _jobCompletionSource = new(); private readonly TaskCompletionSource _jobRecordUpdated = new(); @@ -91,6 +92,7 @@ namespace GitHub.Runner.Common { base.Initialize(hostContext); _jobServer = hostContext.GetService(); + _resultsServer = hostContext.GetService(); } public void Start(Pipelines.AgentJobRequestMessage jobRequest) @@ -111,7 +113,7 @@ namespace GitHub.Runner.Common !string.IsNullOrEmpty(resultsReceiverEndpoint)) { Trace.Info("Initializing results client"); - _jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken); + _resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken); _resultsClientInitiated = true; } @@ -512,19 +514,14 @@ namespace GitHub.Runner.Common } catch (Exception ex) { - var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" }; - issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure; - - var telemetryRecord = new TimelineRecord() - { - Id = Constants.Runner.TelemetryRecordId, - }; - telemetryRecord.Issues.Add(issue); - QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord); - Trace.Info("Catch exception during file upload to results, keep going since the process is best effort."); Trace.Error(ex); errorCount++; + + // If we hit any exceptions uploading to Results, let's skip any additional uploads to Results + _resultsClientInitiated = false; + + SendResultsTelemetry(ex); } } @@ -542,6 +539,19 @@ namespace GitHub.Runner.Common } } + private void SendResultsTelemetry(Exception ex) + { + var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception with results. {ex.Message}" }; + issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure; + + var telemetryRecord = new TimelineRecord() + { + Id = Constants.Runner.TelemetryRecordId, + }; + telemetryRecord.Issues.Add(issue); + QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord); + } + private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { while (!_jobCompletionSource.Task.IsCompleted || runOnce) @@ -612,6 +622,22 @@ namespace GitHub.Runner.Common try { await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken)); + try + { + if (_resultsClientInitiated) + { + await _resultsServer.UpdateResultsWorkflowStepsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken)); + } + } + catch (Exception e) + { + Trace.Info("Catch exception during update steps, skip update Results."); + Trace.Error(e); + _resultsClientInitiated = false; + + SendResultsTelemetry(e); + } + if (_bufferedRetryRecords.Remove(update.TimelineId)) { Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId); @@ -819,7 +845,7 @@ namespace GitHub.Runner.Common Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}"); ResultsFileUploadHandler summaryHandler = async (file) => { - await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None); + await _resultsServer.CreateResultsStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None); }; await UploadResultsFile(file, summaryHandler); @@ -830,7 +856,7 @@ namespace GitHub.Runner.Common Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}"); ResultsFileUploadHandler stepLogHandler = async (file) => { - await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); + await _resultsServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); }; await UploadResultsFile(file, stepLogHandler); @@ -841,7 +867,7 @@ namespace GitHub.Runner.Common Trace.Info($"Starting upload of job log file to results service {file.Name}, {file.Path}"); ResultsFileUploadHandler jobLogHandler = async (file) => { - await _jobServer.CreateResultsJobLogAsync(file.PlanId, file.JobId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); + await _resultsServer.CreateResultsJobLogAsync(file.PlanId, file.JobId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); }; await UploadResultsFile(file, jobLogHandler); @@ -849,6 +875,11 @@ namespace GitHub.Runner.Common private async Task UploadResultsFile(ResultsUploadFileInfo file, ResultsFileUploadHandler uploadHandler) { + if (!_resultsClientInitiated) + { + return; + } + bool uploadSucceed = false; try { @@ -903,8 +934,6 @@ namespace GitHub.Runner.Common public long TotalLines { get; set; } } - - internal class ConsoleLineInfo { public ConsoleLineInfo(Guid recordId, string line, long? lineNumber) diff --git a/src/Runner.Common/ResultsServer.cs b/src/Runner.Common/ResultsServer.cs new file mode 100644 index 000000000..4d24d55c8 --- /dev/null +++ b/src/Runner.Common/ResultsServer.cs @@ -0,0 +1,98 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using GitHub.DistributedTask.WebApi; +using GitHub.Services.Results.Client; + +namespace GitHub.Runner.Common +{ + [ServiceLocator(Default = typeof(ResultServer))] + public interface IResultsServer : IRunnerService + { + void InitializeResultsClient(Uri uri, string token); + + // logging and console + Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file, + CancellationToken cancellationToken); + + Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, + bool firstBlock, long lineCount, CancellationToken cancellationToken); + + Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, + long lineCount, CancellationToken cancellationToken); + + Task UpdateResultsWorkflowStepsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, + IEnumerable records, CancellationToken cancellationToken); + } + + public sealed class ResultServer : RunnerService, IResultsServer + { + private ResultsHttpClient _resultsClient; + + public void InitializeResultsClient(Uri uri, string token) + { + var httpMessageHandler = HostContext.CreateHttpClientHandler(); + this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true); + } + + public Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file, + CancellationToken cancellationToken) + { + if (_resultsClient != null) + { + return _resultsClient.UploadStepSummaryAsync(planId, jobId, stepId, file, + cancellationToken: cancellationToken); + } + + throw new InvalidOperationException("Results client is not initialized."); + } + + public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, + bool firstBlock, long lineCount, CancellationToken cancellationToken) + { + if (_resultsClient != null) + { + return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, + lineCount, cancellationToken: cancellationToken); + } + + throw new InvalidOperationException("Results client is not initialized."); + } + + public Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, + long lineCount, CancellationToken cancellationToken) + { + if (_resultsClient != null) + { + return _resultsClient.UploadResultsJobLogAsync(planId, jobId, file, finalize, firstBlock, lineCount, + cancellationToken: cancellationToken); + } + + throw new InvalidOperationException("Results client is not initialized."); + } + + public Task UpdateResultsWorkflowStepsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, + IEnumerable records, CancellationToken cancellationToken) + { + if (_resultsClient != null) + { + try + { + var timelineRecords = records.ToList(); + return _resultsClient.UpdateWorkflowStepsAsync(planId, new List(timelineRecords), + cancellationToken: cancellationToken); + } + catch (Exception ex) + { + // Log error, but continue as this call is best-effort + Trace.Info($"Failed to update steps status due to {ex.GetType().Name}"); + Trace.Error(ex); + } + } + + throw new InvalidOperationException("Results client is not initialized."); + } + } +} diff --git a/src/Sdk/Common/Common/RawClientHttpRequestSettings.cs b/src/Sdk/Common/Common/RawClientHttpRequestSettings.cs index 60fbbe093..c6956f29f 100644 --- a/src/Sdk/Common/Common/RawClientHttpRequestSettings.cs +++ b/src/Sdk/Common/Common/RawClientHttpRequestSettings.cs @@ -184,9 +184,33 @@ namespace GitHub.Services.Common return settings; } + /// + /// Gets or sets the maximum size allowed for response content buffering. + /// + [DefaultValue(c_defaultContentBufferSize)] + public Int32 MaxContentBufferSize + { + get + { + return m_maxContentBufferSize; + } + set + { + ArgumentUtility.CheckForOutOfRange(value, nameof(value), 0, c_maxAllowedContentBufferSize); + m_maxContentBufferSize = value; + } + } + private static Lazy s_defaultSettings = new Lazy(ConstructDefaultSettings); + private Int32 m_maxContentBufferSize; + // We will buffer a maximum of 1024MB in the message handler + private const Int32 c_maxAllowedContentBufferSize = 1024 * 1024 * 1024; + + // We will buffer, by default, up to 512MB in the message handler + private const Int32 c_defaultContentBufferSize = 1024 * 1024 * 512; + private const Int32 c_defaultMaxRetry = 3; private static readonly TimeSpan s_defaultTimeout = TimeSpan.FromSeconds(100); //default WebAPI timeout private ICollection m_acceptLanguages = new List(); diff --git a/src/Sdk/Common/Common/RawHttpMessageHandler.cs b/src/Sdk/Common/Common/RawHttpMessageHandler.cs index 8b3e75e18..6774d3712 100644 --- a/src/Sdk/Common/Common/RawHttpMessageHandler.cs +++ b/src/Sdk/Common/Common/RawHttpMessageHandler.cs @@ -9,7 +9,7 @@ using GitHub.Services.OAuth; namespace GitHub.Services.Common { - public class RawHttpMessageHandler: HttpMessageHandler + public class RawHttpMessageHandler : HttpMessageHandler { public RawHttpMessageHandler( FederatedCredential credentials) @@ -120,6 +120,7 @@ namespace GitHub.Services.Common Boolean succeeded = false; HttpResponseMessageWrapper responseWrapper; + Boolean lastResponseDemandedProxyAuth = false; Int32 retries = m_maxAuthRetries; try { @@ -138,7 +139,13 @@ namespace GitHub.Services.Common // Let's start with sending a token IssuedToken token = await m_tokenProvider.GetTokenAsync(null, tokenSource.Token).ConfigureAwait(false); - ApplyToken(request, token); + ApplyToken(request, token, applyICredentialsToWebProxy: lastResponseDemandedProxyAuth); + + // The WinHttpHandler will chunk any content that does not have a computed length which is + // not what we want. By loading into a buffer up-front we bypass this behavior and there is + // no difference in the normal HttpClientHandler behavior here since this is what they were + // already doing. + await BufferRequestContentAsync(request, tokenSource.Token).ConfigureAwait(false); // ConfigureAwait(false) enables the continuation to be run outside any captured // SyncronizationContext (such as ASP.NET's) which keeps things from deadlocking... @@ -147,7 +154,8 @@ namespace GitHub.Services.Common responseWrapper = new HttpResponseMessageWrapper(response); var isUnAuthorized = responseWrapper.StatusCode == HttpStatusCode.Unauthorized; - if (!isUnAuthorized) + lastResponseDemandedProxyAuth = responseWrapper.StatusCode == HttpStatusCode.ProxyAuthenticationRequired; + if (!isUnAuthorized && !lastResponseDemandedProxyAuth) { // Validate the token after it has been successfully authenticated with the server. m_tokenProvider?.ValidateToken(token, responseWrapper); @@ -211,15 +219,42 @@ namespace GitHub.Services.Common } } + private static async Task BufferRequestContentAsync( + HttpRequestMessage request, + CancellationToken cancellationToken) + { + if (request.Content != null && + request.Headers.TransferEncodingChunked != true) + { + Int64? contentLength = request.Content.Headers.ContentLength; + if (contentLength == null) + { + await request.Content.LoadIntoBufferAsync().EnforceCancellation(cancellationToken).ConfigureAwait(false); + } + + // Explicitly turn off chunked encoding since we have computed the request content size + request.Headers.TransferEncodingChunked = false; + } + } + private void ApplyToken( HttpRequestMessage request, - IssuedToken token) + IssuedToken token, + bool applyICredentialsToWebProxy = false) { switch (token) { case null: return; case ICredentials credentialsToken: + if (applyICredentialsToWebProxy) + { + HttpClientHandler httpClientHandler = m_transportHandler as HttpClientHandler; + if (httpClientHandler != null && httpClientHandler.Proxy != null) + { + httpClientHandler.Proxy.Credentials = credentialsToken; + } + } m_credentialWrapper.InnerCredentials = credentialsToken; break; default: diff --git a/src/Sdk/Common/Common/VssHttpMessageHandler.cs b/src/Sdk/Common/Common/VssHttpMessageHandler.cs index b40f7ea21..39be573b8 100644 --- a/src/Sdk/Common/Common/VssHttpMessageHandler.cs +++ b/src/Sdk/Common/Common/VssHttpMessageHandler.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Net; diff --git a/src/Sdk/Common/Common/VssHttpRequestSettings.cs b/src/Sdk/Common/Common/VssHttpRequestSettings.cs index 06c1554ce..8a86eea08 100644 --- a/src/Sdk/Common/Common/VssHttpRequestSettings.cs +++ b/src/Sdk/Common/Common/VssHttpRequestSettings.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.ComponentModel; using System.Globalization; diff --git a/src/Sdk/WebApi/WebApi/Contracts.cs b/src/Sdk/WebApi/WebApi/Contracts.cs index 9165d832f..8da6bf236 100644 --- a/src/Sdk/WebApi/WebApi/Contracts.cs +++ b/src/Sdk/WebApi/WebApi/Contracts.cs @@ -1,3 +1,4 @@ +using System.Collections.Generic; using System.Runtime.Serialization; using Newtonsoft.Json; using Newtonsoft.Json.Serialization; @@ -126,6 +127,46 @@ namespace GitHub.Services.Results.Contracts public bool Ok; } + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class StepsUpdateRequest + { + [DataMember] + public IEnumerable Steps; + [DataMember] + public long ChangeOrder; + [DataMember] + public string WorkflowJobRunBackendId; + [DataMember] + public string WorkflowRunBackendId; + } + + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class Step + { + [DataMember] + public string ExternalId; + [DataMember] + public int Number; + [DataMember] + public string Name; + [DataMember] + public Status Status; + [DataMember] + public string StartedAt; + [DataMember] + public string CompletedAt; + } + + public enum Status + { + StatusUnknown = 0, + StatusInProgress = 3, + StatusPending = 5, + StatusCompleted = 6 + } + public static class BlobStorageTypes { public static readonly string AzureBlobStorage = "BLOB_STORAGE_TYPE_AZURE"; diff --git a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs index b80ce4db3..b80740f44 100644 --- a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs @@ -1,11 +1,16 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; using System.IO; +using System.Linq; using System.Net.Http; using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; -using GitHub.Services.Results.Contracts; using System.Net.Http.Formatting; +using GitHub.DistributedTask.WebApi; +using GitHub.Services.Common; +using GitHub.Services.Results.Contracts; using Sdk.WebApi.WebApi; namespace GitHub.Services.Results.Client @@ -22,6 +27,7 @@ namespace GitHub.Services.Results.Client m_token = token; m_resultsServiceUrl = baseUrl; m_formatter = new JsonMediaTypeFormatter(); + m_changeIdCounter = 1; } // Get Sas URL calls @@ -86,7 +92,7 @@ namespace GitHub.Services.Results.Client // Create metadata calls - private async Task CreateMetadata(Uri uri, CancellationToken cancellationToken, R request, string timestamp) + private async Task SendRequest(Uri uri, CancellationToken cancellationToken, R request, string timestamp) { using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, uri)) { @@ -121,7 +127,7 @@ namespace GitHub.Services.Results.Client }; var createStepSummaryMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepSummaryMetadata); - await CreateMetadata(createStepSummaryMetadataEndpoint, cancellationToken, request, timestamp); + await SendRequest(createStepSummaryMetadataEndpoint, cancellationToken, request, timestamp); } private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken) @@ -137,7 +143,7 @@ namespace GitHub.Services.Results.Client }; var createStepLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepLogsMetadata); - await CreateMetadata(createStepLogsMetadataEndpoint, cancellationToken, request, timestamp); + await SendRequest(createStepLogsMetadataEndpoint, cancellationToken, request, timestamp); } private async Task JobLogUploadCompleteAsync(string planId, string jobId, long lineCount, CancellationToken cancellationToken) @@ -152,7 +158,7 @@ namespace GitHub.Services.Results.Client }; var createJobLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateJobLogsMetadata); - await CreateMetadata(createJobLogsMetadataEndpoint, cancellationToken, request, timestamp); + await SendRequest(createJobLogsMetadataEndpoint, cancellationToken, request, timestamp); } private async Task UploadBlockFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken) @@ -252,7 +258,7 @@ namespace GitHub.Services.Results.Client await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken); } - // Handle file upload for step log + // Handle file upload for step log public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) { // Get the upload url @@ -262,7 +268,7 @@ namespace GitHub.Services.Results.Client throw new Exception("Failed to get step log upload url"); } - // Create the Append blob + // Create the Append blob if (firstBlock) { await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); @@ -283,7 +289,7 @@ namespace GitHub.Services.Results.Client } } - // Handle file upload for job log + // Handle file upload for job log public async Task UploadResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) { // Get the upload url @@ -293,7 +299,7 @@ namespace GitHub.Services.Results.Client throw new Exception("Failed to get job log upload url"); } - // Create the Append blob + // Create the Append blob if (firstBlock) { await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); @@ -314,9 +320,57 @@ namespace GitHub.Services.Results.Client } } + private Step ConvertTimelineRecordToStep(TimelineRecord r) + { + return new Step() + { + ExternalId = r.Id.ToString(), + Number = r.Order.GetValueOrDefault(), + Name = r.Name, + Status = ConvertStateToStatus(r.State.GetValueOrDefault()), + StartedAt = r.StartTime?.ToString(Constants.TimestampFormat), + CompletedAt = r.FinishTime?.ToString(Constants.TimestampFormat) + }; + } + + private Status ConvertStateToStatus(TimelineRecordState s) + { + switch (s) + { + case TimelineRecordState.Completed: + return Status.StatusCompleted; + case TimelineRecordState.Pending: + return Status.StatusPending; + case TimelineRecordState.InProgress: + return Status.StatusInProgress; + default: + return Status.StatusUnknown; + } + } + + public async Task UpdateWorkflowStepsAsync(Guid planId, IEnumerable records, CancellationToken cancellationToken) + { + var timestamp = DateTime.UtcNow.ToString(Constants.TimestampFormat); + var stepRecords = records.Where(r => String.Equals(r.RecordType, "Task", StringComparison.Ordinal)); + var stepUpdateRequests = stepRecords.GroupBy(r => r.ParentId).Select(sg => new StepsUpdateRequest() + { + WorkflowRunBackendId = planId.ToString(), + WorkflowJobRunBackendId = sg.Key.ToString(), + ChangeOrder = m_changeIdCounter++, + Steps = sg.Select(ConvertTimelineRecordToStep) + }); + + var stepUpdateEndpoint = new Uri(m_resultsServiceUrl, Constants.WorkflowStepsUpdate); + foreach (var request in stepUpdateRequests) + { + await SendRequest(stepUpdateEndpoint, cancellationToken, request, timestamp); + } + } + private MediaTypeFormatter m_formatter; private Uri m_resultsServiceUrl; private string m_token; + private int m_changeIdCounter; } // Constants specific to results @@ -331,6 +385,8 @@ namespace GitHub.Services.Results.Client public static readonly string CreateStepLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateStepLogsMetadata"; public static readonly string GetJobLogsSignedBlobURL = ResultsReceiverTwirpEndpoint + "GetJobLogsSignedBlobURL"; public static readonly string CreateJobLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateJobLogsMetadata"; + public static readonly string ResultsProtoApiV1Endpoint = "twirp/github.actions.results.api.v1.WorkflowStepUpdateService/"; + public static readonly string WorkflowStepsUpdate = ResultsProtoApiV1Endpoint + "WorkflowStepsUpdate"; public static readonly string AzureBlobSealedHeader = "x-ms-blob-sealed"; public static readonly string AzureBlobTypeHeader = "x-ms-blob-type";