From d081289ed53841c9d78c3e796c429965c4fe5d58 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Thu, 17 Mar 2022 21:35:20 -0400 Subject: [PATCH] postlines: refactor per feedback (#1755) * refactor per feedback * feedback * nit * commentify * feedback * feedback --- src/Runner.Common/JobServer.cs | 149 ++++++++++++++++++++++++++-- src/Runner.Common/JobServerQueue.cs | 131 +----------------------- 2 files changed, 144 insertions(+), 136 deletions(-) diff --git a/src/Runner.Common/JobServer.cs b/src/Runner.Common/JobServer.cs index 7d483c7e8..98cf88f92 100644 --- a/src/Runner.Common/JobServer.cs +++ b/src/Runner.Common/JobServer.cs @@ -3,23 +3,27 @@ using System; using System.Collections.Generic; using System.IO; using System.Net.Http; +using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; using GitHub.Runner.Sdk; using GitHub.Services.Common; using GitHub.Services.WebApi; +using Newtonsoft.Json; namespace GitHub.Runner.Common { [ServiceLocator(Default = typeof(JobServer))] - public interface IJobServer : IRunnerService + public interface IJobServer : IRunnerService, IAsyncDisposable { Task ConnectAsync(VssConnection jobConnection); + void InitializeWebsocketClient(ServiceEndpoint serviceEndpoint); + // logging and console Task AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); - Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, CancellationToken cancellationToken); - Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long startLine, CancellationToken cancellationToken); + Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long? startLine, CancellationToken cancellationToken); Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); @@ -34,6 +38,20 @@ namespace GitHub.Runner.Common private bool _hasConnection; private VssConnection _connection; private TaskHttpClient _taskClient; + private ClientWebSocket _websocketClient; + + private ServiceEndpoint _serviceEndpoint; + + private int totalBatchedLinesAttemptedByWebsocket = 0; + private int failedAttemptsToPostBatchedLinesByWebsocket = 0; + + + private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(100); + private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500); + private static readonly int _minWebsocketFailurePercentageAllowed = 50; + private static readonly int _minWebsocketBatchedLinesCountToConsider = 5; + + private Task _websocketConnectTask; public async Task ConnectAsync(VssConnection jobConnection) { @@ -117,6 +135,19 @@ namespace GitHub.Runner.Common } } + public void InitializeWebsocketClient(ServiceEndpoint serviceEndpoint) + { + this._serviceEndpoint = serviceEndpoint; + InitializeWebsocketClient(TimeSpan.Zero); + } + + public ValueTask DisposeAsync() + { + _websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None); + GC.SuppressFinalize(this); + return ValueTask.CompletedTask; + } + private void CheckConnection() { if (!_hasConnection) @@ -125,6 +156,48 @@ namespace GitHub.Runner.Common } } + private void InitializeWebsocketClient(TimeSpan delay) + { + if (_serviceEndpoint.Authorization != null && + _serviceEndpoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) && + !string.IsNullOrEmpty(accessToken)) + { + if (_serviceEndpoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl)) + { + // let's ensure we use the right scheme + feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://"); + Trace.Info($"Creating websocket client ..." + feedStreamUrl); + this._websocketClient = new ClientWebSocket(); + this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}"); + this._websocketConnectTask = ConnectWebSocketClient(feedStreamUrl, delay); + } + else + { + Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data"); + } + } + else + { + Trace.Info($"No access token from the service endpoint"); + } + } + + private async Task ConnectWebSocketClient(string feedStreamUrl, TimeSpan delay) + { + try + { + Trace.Info($"Attempting to start websocket client with delay {delay}."); + await Task.Delay(delay); + await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken)); + Trace.Info($"Successfully started websocket client."); + } + catch(Exception ex) + { + Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket."); + Trace.Error(ex); + } + } + //----------------------------------------------------------------- // Feedback: WebConsole, TimelineRecords and Logs //----------------------------------------------------------------- @@ -135,16 +208,72 @@ namespace GitHub.Runner.Common return _taskClient.AppendLogContentAsync(scopeIdentifier, hubName, planId, logId, uploadStream, cancellationToken: cancellationToken); } - public Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, CancellationToken cancellationToken) + public async Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long? startLine, CancellationToken cancellationToken) { CheckConnection(); - return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, cancellationToken: cancellationToken); - } + var pushedLinesViaWebsocket = false; + if (_websocketConnectTask != null) + { + await _websocketConnectTask; + } - public Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long startLine, CancellationToken cancellationToken) - { - CheckConnection(); - return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, startLine, cancellationToken: cancellationToken); + // "_websocketClient != null" implies either: We have a successful connection OR we have to attempt sending again and then reconnect + // ...in other words, if websocket client is null, we will skip sending to websocket and just use rest api calls to send data + if (_websocketClient != null) + { + var linesWrapper = startLine.HasValue? new TimelineRecordFeedLinesWrapper(stepId, lines, startLine.Value): new TimelineRecordFeedLinesWrapper(stepId, lines); + var jsonData = StringUtil.ConvertToJson(linesWrapper); + try + { + totalBatchedLinesAttemptedByWebsocket++; + var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData); + // break the message into chunks of 1024 bytes + for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024) + { + var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length; + var chunk = new ArraySegment(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i)); + await _websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, cancellationToken); + } + + pushedLinesViaWebsocket = true; + } + catch (Exception ex) + { + failedAttemptsToPostBatchedLinesByWebsocket++; + Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State})."); + Trace.Error(ex); + if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider) + { + // let's consider failure percentage + if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed) + { + Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again."); + _websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", cancellationToken); + // By setting it to null, we will ensure that we never try websocket path again for this job + _websocketClient = null; + } + } + + if (_websocketClient != null) + { + var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect); + Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket})."); + InitializeWebsocketClient(delay); + } + } + } + + if (!pushedLinesViaWebsocket) + { + if (startLine.HasValue) + { + await _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, startLine.Value, cancellationToken: cancellationToken); + } + else + { + await _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, cancellationToken: cancellationToken); + } + } } public Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, string type, string name, Stream uploadStream, CancellationToken cancellationToken) diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 9f237279b..822f8c2c0 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -3,14 +3,10 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; -using System.Net.WebSockets; -using System.Text; using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; -using GitHub.Services.Common; -using Newtonsoft.Json; using Pipelines = GitHub.DistributedTask.Pipelines; namespace GitHub.Runner.Common @@ -34,11 +30,6 @@ namespace GitHub.Runner.Common private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); - private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1); - private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500); - - private static readonly int _minWebsocketFailurePercentageAllowed = 50; - private static readonly int _minWebsocketBatchedLinesCountToConsider = 5; // Job message information private Guid _scopeIdentifier; @@ -67,8 +58,6 @@ namespace GitHub.Runner.Common private Task _fileUploadDequeueTask; private Task _timelineUpdateDequeueTask; - private Task _websocketConnectTask = null; - // common private IJobServer _jobServer; private Task[] _allDequeueTasks; @@ -90,13 +79,6 @@ namespace GitHub.Runner.Common private bool _webConsoleLineAggressiveDequeue = true; private bool _firstConsoleOutputs = true; - private int totalBatchedLinesAttemptedByWebsocket = 0; - private int failedAttemptsToPostBatchedLinesByWebsocket = 0; - - private ClientWebSocket _websocketClient = null; - - private ServiceEndpoint _serviceEndPoint; - public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -107,9 +89,9 @@ namespace GitHub.Runner.Common { Trace.Entering(); - this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); - InitializeWebsocket(); + _jobServer.InitializeWebsocketClient(serviceEndPoint); if (_queueInProcess) { @@ -178,8 +160,8 @@ namespace GitHub.Runner.Common await ProcessTimelinesUpdateQueueAsync(runOnce: true); Trace.Info("Timeline update queue drained."); - Trace.Info($"Disposing websocket client ..."); - this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None); + Trace.Info($"Disposing job server ..."); + await _jobServer.DisposeAsync(); Trace.Info("All queue process tasks have been stopped, and all queues are drained."); } @@ -317,70 +299,7 @@ namespace GitHub.Runner.Common { try { - if (this._websocketConnectTask != null) - { - // lazily await here, we are already in the background task here - await this._websocketConnectTask; - } - - var pushedLinesViaWebsocket = false; - if (this._websocketClient != null) - { - var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value): - new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList()); - var jsonData = StringUtil.ConvertToJson(linesWrapper); - try - { - totalBatchedLinesAttemptedByWebsocket++; - var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData); - // break the message into chunks of 1024 bytes - for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024) - { - var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length; - var chunk = new ArraySegment(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i)); - await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None); - } - - pushedLinesViaWebsocket = true; - } - catch (Exception ex) - { - Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State})."); - Trace.Error(ex); - failedAttemptsToPostBatchedLinesByWebsocket++; - if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider) - { - // let's consider failure percentage - if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed) - { - Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again."); - this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None); - this._websocketClient = null; - } - } - - if (this._websocketClient != null) - { - var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect); - Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket})."); - InitializeWebsocket(delay); - } - } - } - - // if we can't push via websocket, let's fallback to posting via REST API - if (!pushedLinesViaWebsocket) - { - // we will not requeue failed batch, since the web console lines are time sensitive. - if (batch[0].LineNumber.HasValue) - { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); - } - else - { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); - } - } + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, default(CancellationToken)); if (_firstConsoleOutputs) { @@ -471,46 +390,6 @@ namespace GitHub.Runner.Common } } - private void InitializeWebsocket(TimeSpan? delay = null) - { - if (_serviceEndPoint.Authorization != null && - _serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) && - !string.IsNullOrEmpty(accessToken)) - { - if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl)) - { - // let's ensure we use the right scheme - feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://"); - Trace.Info($"Creating websocket client ..." + feedStreamUrl); - this._websocketClient = new ClientWebSocket(); - this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}"); - this._websocketConnectTask = Task.Run(async () => - { - try - { - Trace.Info($"Attempting to start websocket client with delay {delay}."); - await Task.Delay(delay ?? TimeSpan.Zero); - await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken)); - Trace.Info($"Successfully started websocket client."); - } - catch(Exception ex) - { - Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket."); - Trace.Error(ex); - } - }); - } - else - { - Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data"); - } - } - else - { - Trace.Info($"No access token from the service endpoint"); - } - } - private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { while (!_jobCompletionSource.Task.IsCompleted || runOnce)