Update Runner to send step updates to Results (#2510)

* Also send Steps update to Results service

* Refactor to separate results server from current job server

* If hit any error while uploading to Results, skip Results upload

* Add proxy authentication and buffer request for WinHttpHandler

* Remove unnecessary null guard

* Also send Results telemetry when step update fails

* IResultsServer is not disposable
This commit is contained in:
Yang Cao
2023-04-03 16:31:13 -04:00
committed by GitHub
parent 1ceb1a67f2
commit 0484afeec7
9 changed files with 314 additions and 68 deletions

View File

@@ -24,15 +24,11 @@ namespace GitHub.Runner.Common
Task ConnectAsync(VssConnection jobConnection); Task ConnectAsync(VssConnection jobConnection);
void InitializeWebsocketClient(ServiceEndpoint serviceEndpoint); void InitializeWebsocketClient(ServiceEndpoint serviceEndpoint);
void InitializeResultsClient(Uri uri, string token);
// logging and console // logging and console
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken); Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
@@ -46,7 +42,6 @@ namespace GitHub.Runner.Common
private bool _hasConnection; private bool _hasConnection;
private VssConnection _connection; private VssConnection _connection;
private TaskHttpClient _taskClient; private TaskHttpClient _taskClient;
private ResultsHttpClient _resultsClient;
private ClientWebSocket _websocketClient; private ClientWebSocket _websocketClient;
private ServiceEndpoint _serviceEndpoint; private ServiceEndpoint _serviceEndpoint;
@@ -150,12 +145,6 @@ namespace GitHub.Runner.Common
InitializeWebsocketClient(TimeSpan.Zero); InitializeWebsocketClient(TimeSpan.Zero);
} }
public void InitializeResultsClient(Uri uri, string token)
{
var httpMessageHandler = HostContext.CreateHttpClientHandler();
this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true);
}
public ValueTask DisposeAsync() public ValueTask DisposeAsync()
{ {
CloseWebSocket(WebSocketCloseStatus.NormalClosure, CancellationToken.None); CloseWebSocket(WebSocketCloseStatus.NormalClosure, CancellationToken.None);
@@ -318,32 +307,6 @@ namespace GitHub.Runner.Common
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken); return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
} }
public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadStepSummaryAsync(planId, jobId, stepId, file, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsJobLogAsync(planId, jobId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken) public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
{ {

View File

@@ -65,6 +65,7 @@ namespace GitHub.Runner.Common
// common // common
private IJobServer _jobServer; private IJobServer _jobServer;
private IResultsServer _resultsServer;
private Task[] _allDequeueTasks; private Task[] _allDequeueTasks;
private readonly TaskCompletionSource<int> _jobCompletionSource = new(); private readonly TaskCompletionSource<int> _jobCompletionSource = new();
private readonly TaskCompletionSource<int> _jobRecordUpdated = new(); private readonly TaskCompletionSource<int> _jobRecordUpdated = new();
@@ -91,6 +92,7 @@ namespace GitHub.Runner.Common
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
_jobServer = hostContext.GetService<IJobServer>(); _jobServer = hostContext.GetService<IJobServer>();
_resultsServer = hostContext.GetService<IResultsServer>();
} }
public void Start(Pipelines.AgentJobRequestMessage jobRequest) public void Start(Pipelines.AgentJobRequestMessage jobRequest)
@@ -111,7 +113,7 @@ namespace GitHub.Runner.Common
!string.IsNullOrEmpty(resultsReceiverEndpoint)) !string.IsNullOrEmpty(resultsReceiverEndpoint))
{ {
Trace.Info("Initializing results client"); Trace.Info("Initializing results client");
_jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken); _resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken);
_resultsClientInitiated = true; _resultsClientInitiated = true;
} }
@@ -512,19 +514,14 @@ namespace GitHub.Runner.Common
} }
catch (Exception ex) catch (Exception ex)
{ {
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;
var telemetryRecord = new TimelineRecord()
{
Id = Constants.Runner.TelemetryRecordId,
};
telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);
Trace.Info("Catch exception during file upload to results, keep going since the process is best effort."); Trace.Info("Catch exception during file upload to results, keep going since the process is best effort.");
Trace.Error(ex); Trace.Error(ex);
errorCount++; errorCount++;
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results
_resultsClientInitiated = false;
SendResultsTelemetry(ex);
} }
} }
@@ -542,6 +539,19 @@ namespace GitHub.Runner.Common
} }
} }
private void SendResultsTelemetry(Exception ex)
{
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception with results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;
var telemetryRecord = new TimelineRecord()
{
Id = Constants.Runner.TelemetryRecordId,
};
telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);
}
private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{ {
while (!_jobCompletionSource.Task.IsCompleted || runOnce) while (!_jobCompletionSource.Task.IsCompleted || runOnce)
@@ -612,6 +622,22 @@ namespace GitHub.Runner.Common
try try
{ {
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken)); await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
try
{
if (_resultsClientInitiated)
{
await _resultsServer.UpdateResultsWorkflowStepsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
}
}
catch (Exception e)
{
Trace.Info("Catch exception during update steps, skip update Results.");
Trace.Error(e);
_resultsClientInitiated = false;
SendResultsTelemetry(e);
}
if (_bufferedRetryRecords.Remove(update.TimelineId)) if (_bufferedRetryRecords.Remove(update.TimelineId))
{ {
Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId); Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId);
@@ -819,7 +845,7 @@ namespace GitHub.Runner.Common
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}"); Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler summaryHandler = async (file) => ResultsFileUploadHandler summaryHandler = async (file) =>
{ {
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None); await _resultsServer.CreateResultsStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None);
}; };
await UploadResultsFile(file, summaryHandler); await UploadResultsFile(file, summaryHandler);
@@ -830,7 +856,7 @@ namespace GitHub.Runner.Common
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}"); Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler stepLogHandler = async (file) => ResultsFileUploadHandler stepLogHandler = async (file) =>
{ {
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); await _resultsServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None);
}; };
await UploadResultsFile(file, stepLogHandler); await UploadResultsFile(file, stepLogHandler);
@@ -841,7 +867,7 @@ namespace GitHub.Runner.Common
Trace.Info($"Starting upload of job log file to results service {file.Name}, {file.Path}"); Trace.Info($"Starting upload of job log file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler jobLogHandler = async (file) => ResultsFileUploadHandler jobLogHandler = async (file) =>
{ {
await _jobServer.CreateResultsJobLogAsync(file.PlanId, file.JobId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); await _resultsServer.CreateResultsJobLogAsync(file.PlanId, file.JobId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None);
}; };
await UploadResultsFile(file, jobLogHandler); await UploadResultsFile(file, jobLogHandler);
@@ -849,6 +875,11 @@ namespace GitHub.Runner.Common
private async Task UploadResultsFile(ResultsUploadFileInfo file, ResultsFileUploadHandler uploadHandler) private async Task UploadResultsFile(ResultsUploadFileInfo file, ResultsFileUploadHandler uploadHandler)
{ {
if (!_resultsClientInitiated)
{
return;
}
bool uploadSucceed = false; bool uploadSucceed = false;
try try
{ {
@@ -903,8 +934,6 @@ namespace GitHub.Runner.Common
public long TotalLines { get; set; } public long TotalLines { get; set; }
} }
internal class ConsoleLineInfo internal class ConsoleLineInfo
{ {
public ConsoleLineInfo(Guid recordId, string line, long? lineNumber) public ConsoleLineInfo(Guid recordId, string line, long? lineNumber)

View File

@@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Results.Client;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(ResultServer))]
public interface IResultsServer : IRunnerService
{
void InitializeResultsClient(Uri uri, string token);
// logging and console
Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize,
bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock,
long lineCount, CancellationToken cancellationToken);
Task UpdateResultsWorkflowStepsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId,
IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
}
public sealed class ResultServer : RunnerService, IResultsServer
{
private ResultsHttpClient _resultsClient;
public void InitializeResultsClient(Uri uri, string token)
{
var httpMessageHandler = HostContext.CreateHttpClientHandler();
this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true);
}
public Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadStepSummaryAsync(planId, jobId, stepId, file,
cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize,
bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock,
lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock,
long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsJobLogAsync(planId, jobId, file, finalize, firstBlock, lineCount,
cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task UpdateResultsWorkflowStepsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId,
IEnumerable<TimelineRecord> records, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
try
{
var timelineRecords = records.ToList();
return _resultsClient.UpdateWorkflowStepsAsync(planId, new List<TimelineRecord>(timelineRecords),
cancellationToken: cancellationToken);
}
catch (Exception ex)
{
// Log error, but continue as this call is best-effort
Trace.Info($"Failed to update steps status due to {ex.GetType().Name}");
Trace.Error(ex);
}
}
throw new InvalidOperationException("Results client is not initialized.");
}
}
}

View File

@@ -184,9 +184,33 @@ namespace GitHub.Services.Common
return settings; return settings;
} }
/// <summary>
/// Gets or sets the maximum size allowed for response content buffering.
/// </summary>
[DefaultValue(c_defaultContentBufferSize)]
public Int32 MaxContentBufferSize
{
get
{
return m_maxContentBufferSize;
}
set
{
ArgumentUtility.CheckForOutOfRange(value, nameof(value), 0, c_maxAllowedContentBufferSize);
m_maxContentBufferSize = value;
}
}
private static Lazy<RawClientHttpRequestSettings> s_defaultSettings private static Lazy<RawClientHttpRequestSettings> s_defaultSettings
= new Lazy<RawClientHttpRequestSettings>(ConstructDefaultSettings); = new Lazy<RawClientHttpRequestSettings>(ConstructDefaultSettings);
private Int32 m_maxContentBufferSize;
// We will buffer a maximum of 1024MB in the message handler
private const Int32 c_maxAllowedContentBufferSize = 1024 * 1024 * 1024;
// We will buffer, by default, up to 512MB in the message handler
private const Int32 c_defaultContentBufferSize = 1024 * 1024 * 512;
private const Int32 c_defaultMaxRetry = 3; private const Int32 c_defaultMaxRetry = 3;
private static readonly TimeSpan s_defaultTimeout = TimeSpan.FromSeconds(100); //default WebAPI timeout private static readonly TimeSpan s_defaultTimeout = TimeSpan.FromSeconds(100); //default WebAPI timeout
private ICollection<CultureInfo> m_acceptLanguages = new List<CultureInfo>(); private ICollection<CultureInfo> m_acceptLanguages = new List<CultureInfo>();

View File

@@ -9,7 +9,7 @@ using GitHub.Services.OAuth;
namespace GitHub.Services.Common namespace GitHub.Services.Common
{ {
public class RawHttpMessageHandler: HttpMessageHandler public class RawHttpMessageHandler : HttpMessageHandler
{ {
public RawHttpMessageHandler( public RawHttpMessageHandler(
FederatedCredential credentials) FederatedCredential credentials)
@@ -120,6 +120,7 @@ namespace GitHub.Services.Common
Boolean succeeded = false; Boolean succeeded = false;
HttpResponseMessageWrapper responseWrapper; HttpResponseMessageWrapper responseWrapper;
Boolean lastResponseDemandedProxyAuth = false;
Int32 retries = m_maxAuthRetries; Int32 retries = m_maxAuthRetries;
try try
{ {
@@ -138,7 +139,13 @@ namespace GitHub.Services.Common
// Let's start with sending a token // Let's start with sending a token
IssuedToken token = await m_tokenProvider.GetTokenAsync(null, tokenSource.Token).ConfigureAwait(false); IssuedToken token = await m_tokenProvider.GetTokenAsync(null, tokenSource.Token).ConfigureAwait(false);
ApplyToken(request, token); ApplyToken(request, token, applyICredentialsToWebProxy: lastResponseDemandedProxyAuth);
// The WinHttpHandler will chunk any content that does not have a computed length which is
// not what we want. By loading into a buffer up-front we bypass this behavior and there is
// no difference in the normal HttpClientHandler behavior here since this is what they were
// already doing.
await BufferRequestContentAsync(request, tokenSource.Token).ConfigureAwait(false);
// ConfigureAwait(false) enables the continuation to be run outside any captured // ConfigureAwait(false) enables the continuation to be run outside any captured
// SyncronizationContext (such as ASP.NET's) which keeps things from deadlocking... // SyncronizationContext (such as ASP.NET's) which keeps things from deadlocking...
@@ -147,7 +154,8 @@ namespace GitHub.Services.Common
responseWrapper = new HttpResponseMessageWrapper(response); responseWrapper = new HttpResponseMessageWrapper(response);
var isUnAuthorized = responseWrapper.StatusCode == HttpStatusCode.Unauthorized; var isUnAuthorized = responseWrapper.StatusCode == HttpStatusCode.Unauthorized;
if (!isUnAuthorized) lastResponseDemandedProxyAuth = responseWrapper.StatusCode == HttpStatusCode.ProxyAuthenticationRequired;
if (!isUnAuthorized && !lastResponseDemandedProxyAuth)
{ {
// Validate the token after it has been successfully authenticated with the server. // Validate the token after it has been successfully authenticated with the server.
m_tokenProvider?.ValidateToken(token, responseWrapper); m_tokenProvider?.ValidateToken(token, responseWrapper);
@@ -211,15 +219,42 @@ namespace GitHub.Services.Common
} }
} }
private static async Task BufferRequestContentAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
if (request.Content != null &&
request.Headers.TransferEncodingChunked != true)
{
Int64? contentLength = request.Content.Headers.ContentLength;
if (contentLength == null)
{
await request.Content.LoadIntoBufferAsync().EnforceCancellation(cancellationToken).ConfigureAwait(false);
}
// Explicitly turn off chunked encoding since we have computed the request content size
request.Headers.TransferEncodingChunked = false;
}
}
private void ApplyToken( private void ApplyToken(
HttpRequestMessage request, HttpRequestMessage request,
IssuedToken token) IssuedToken token,
bool applyICredentialsToWebProxy = false)
{ {
switch (token) switch (token)
{ {
case null: case null:
return; return;
case ICredentials credentialsToken: case ICredentials credentialsToken:
if (applyICredentialsToWebProxy)
{
HttpClientHandler httpClientHandler = m_transportHandler as HttpClientHandler;
if (httpClientHandler != null && httpClientHandler.Proxy != null)
{
httpClientHandler.Proxy.Credentials = credentialsToken;
}
}
m_credentialWrapper.InnerCredentials = credentialsToken; m_credentialWrapper.InnerCredentials = credentialsToken;
break; break;
default: default:

View File

@@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net; using System.Net;

View File

@@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel; using System.ComponentModel;
using System.Globalization; using System.Globalization;

View File

@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.Runtime.Serialization; using System.Runtime.Serialization;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Serialization; using Newtonsoft.Json.Serialization;
@@ -126,6 +127,46 @@ namespace GitHub.Services.Results.Contracts
public bool Ok; public bool Ok;
} }
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class StepsUpdateRequest
{
[DataMember]
public IEnumerable<Step> Steps;
[DataMember]
public long ChangeOrder;
[DataMember]
public string WorkflowJobRunBackendId;
[DataMember]
public string WorkflowRunBackendId;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class Step
{
[DataMember]
public string ExternalId;
[DataMember]
public int Number;
[DataMember]
public string Name;
[DataMember]
public Status Status;
[DataMember]
public string StartedAt;
[DataMember]
public string CompletedAt;
}
public enum Status
{
StatusUnknown = 0,
StatusInProgress = 3,
StatusPending = 5,
StatusCompleted = 6
}
public static class BlobStorageTypes public static class BlobStorageTypes
{ {
public static readonly string AzureBlobStorage = "BLOB_STORAGE_TYPE_AZURE"; public static readonly string AzureBlobStorage = "BLOB_STORAGE_TYPE_AZURE";

View File

@@ -1,11 +1,16 @@
using System; using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq;
using System.Net.Http; using System.Net.Http;
using System.Net.Http.Headers; using System.Net.Http.Headers;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.Services.Results.Contracts;
using System.Net.Http.Formatting; using System.Net.Http.Formatting;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.Results.Contracts;
using Sdk.WebApi.WebApi; using Sdk.WebApi.WebApi;
namespace GitHub.Services.Results.Client namespace GitHub.Services.Results.Client
@@ -22,6 +27,7 @@ namespace GitHub.Services.Results.Client
m_token = token; m_token = token;
m_resultsServiceUrl = baseUrl; m_resultsServiceUrl = baseUrl;
m_formatter = new JsonMediaTypeFormatter(); m_formatter = new JsonMediaTypeFormatter();
m_changeIdCounter = 1;
} }
// Get Sas URL calls // Get Sas URL calls
@@ -86,7 +92,7 @@ namespace GitHub.Services.Results.Client
// Create metadata calls // Create metadata calls
private async Task CreateMetadata<R>(Uri uri, CancellationToken cancellationToken, R request, string timestamp) private async Task SendRequest<R>(Uri uri, CancellationToken cancellationToken, R request, string timestamp)
{ {
using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, uri)) using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, uri))
{ {
@@ -121,7 +127,7 @@ namespace GitHub.Services.Results.Client
}; };
var createStepSummaryMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepSummaryMetadata); var createStepSummaryMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepSummaryMetadata);
await CreateMetadata<StepSummaryMetadataCreate>(createStepSummaryMetadataEndpoint, cancellationToken, request, timestamp); await SendRequest<StepSummaryMetadataCreate>(createStepSummaryMetadataEndpoint, cancellationToken, request, timestamp);
} }
private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken) private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken)
@@ -137,7 +143,7 @@ namespace GitHub.Services.Results.Client
}; };
var createStepLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepLogsMetadata); var createStepLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepLogsMetadata);
await CreateMetadata<StepLogsMetadataCreate>(createStepLogsMetadataEndpoint, cancellationToken, request, timestamp); await SendRequest<StepLogsMetadataCreate>(createStepLogsMetadataEndpoint, cancellationToken, request, timestamp);
} }
private async Task JobLogUploadCompleteAsync(string planId, string jobId, long lineCount, CancellationToken cancellationToken) private async Task JobLogUploadCompleteAsync(string planId, string jobId, long lineCount, CancellationToken cancellationToken)
@@ -152,7 +158,7 @@ namespace GitHub.Services.Results.Client
}; };
var createJobLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateJobLogsMetadata); var createJobLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateJobLogsMetadata);
await CreateMetadata<JobLogsMetadataCreate>(createJobLogsMetadataEndpoint, cancellationToken, request, timestamp); await SendRequest<JobLogsMetadataCreate>(createJobLogsMetadataEndpoint, cancellationToken, request, timestamp);
} }
private async Task<HttpResponseMessage> UploadBlockFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken) private async Task<HttpResponseMessage> UploadBlockFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken)
@@ -252,7 +258,7 @@ namespace GitHub.Services.Results.Client
await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken); await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken);
} }
// Handle file upload for step log // Handle file upload for step log
public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{ {
// Get the upload url // Get the upload url
@@ -262,7 +268,7 @@ namespace GitHub.Services.Results.Client
throw new Exception("Failed to get step log upload url"); throw new Exception("Failed to get step log upload url");
} }
// Create the Append blob // Create the Append blob
if (firstBlock) if (firstBlock)
{ {
await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken);
@@ -283,7 +289,7 @@ namespace GitHub.Services.Results.Client
} }
} }
// Handle file upload for job log // Handle file upload for job log
public async Task UploadResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) public async Task UploadResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{ {
// Get the upload url // Get the upload url
@@ -293,7 +299,7 @@ namespace GitHub.Services.Results.Client
throw new Exception("Failed to get job log upload url"); throw new Exception("Failed to get job log upload url");
} }
// Create the Append blob // Create the Append blob
if (firstBlock) if (firstBlock)
{ {
await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken);
@@ -314,9 +320,57 @@ namespace GitHub.Services.Results.Client
} }
} }
private Step ConvertTimelineRecordToStep(TimelineRecord r)
{
return new Step()
{
ExternalId = r.Id.ToString(),
Number = r.Order.GetValueOrDefault(),
Name = r.Name,
Status = ConvertStateToStatus(r.State.GetValueOrDefault()),
StartedAt = r.StartTime?.ToString(Constants.TimestampFormat),
CompletedAt = r.FinishTime?.ToString(Constants.TimestampFormat)
};
}
private Status ConvertStateToStatus(TimelineRecordState s)
{
switch (s)
{
case TimelineRecordState.Completed:
return Status.StatusCompleted;
case TimelineRecordState.Pending:
return Status.StatusPending;
case TimelineRecordState.InProgress:
return Status.StatusInProgress;
default:
return Status.StatusUnknown;
}
}
public async Task UpdateWorkflowStepsAsync(Guid planId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken)
{
var timestamp = DateTime.UtcNow.ToString(Constants.TimestampFormat);
var stepRecords = records.Where(r => String.Equals(r.RecordType, "Task", StringComparison.Ordinal));
var stepUpdateRequests = stepRecords.GroupBy(r => r.ParentId).Select(sg => new StepsUpdateRequest()
{
WorkflowRunBackendId = planId.ToString(),
WorkflowJobRunBackendId = sg.Key.ToString(),
ChangeOrder = m_changeIdCounter++,
Steps = sg.Select(ConvertTimelineRecordToStep)
});
var stepUpdateEndpoint = new Uri(m_resultsServiceUrl, Constants.WorkflowStepsUpdate);
foreach (var request in stepUpdateRequests)
{
await SendRequest<StepsUpdateRequest>(stepUpdateEndpoint, cancellationToken, request, timestamp);
}
}
private MediaTypeFormatter m_formatter; private MediaTypeFormatter m_formatter;
private Uri m_resultsServiceUrl; private Uri m_resultsServiceUrl;
private string m_token; private string m_token;
private int m_changeIdCounter;
} }
// Constants specific to results // Constants specific to results
@@ -331,6 +385,8 @@ namespace GitHub.Services.Results.Client
public static readonly string CreateStepLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateStepLogsMetadata"; public static readonly string CreateStepLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateStepLogsMetadata";
public static readonly string GetJobLogsSignedBlobURL = ResultsReceiverTwirpEndpoint + "GetJobLogsSignedBlobURL"; public static readonly string GetJobLogsSignedBlobURL = ResultsReceiverTwirpEndpoint + "GetJobLogsSignedBlobURL";
public static readonly string CreateJobLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateJobLogsMetadata"; public static readonly string CreateJobLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateJobLogsMetadata";
public static readonly string ResultsProtoApiV1Endpoint = "twirp/github.actions.results.api.v1.WorkflowStepUpdateService/";
public static readonly string WorkflowStepsUpdate = ResultsProtoApiV1Endpoint + "WorkflowStepsUpdate";
public static readonly string AzureBlobSealedHeader = "x-ms-blob-sealed"; public static readonly string AzureBlobSealedHeader = "x-ms-blob-sealed";
public static readonly string AzureBlobTypeHeader = "x-ms-blob-type"; public static readonly string AzureBlobTypeHeader = "x-ms-blob-type";