diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index eb8fa9e67..9f237279b 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -3,10 +3,14 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; +using GitHub.Services.Common; +using Newtonsoft.Json; using Pipelines = GitHub.DistributedTask.Pipelines; namespace GitHub.Runner.Common @@ -30,6 +34,11 @@ namespace GitHub.Runner.Common private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); + private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1); + private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500); + + private static readonly int _minWebsocketFailurePercentageAllowed = 50; + private static readonly int _minWebsocketBatchedLinesCountToConsider = 5; // Job message information private Guid _scopeIdentifier; @@ -58,6 +67,8 @@ namespace GitHub.Runner.Common private Task _fileUploadDequeueTask; private Task _timelineUpdateDequeueTask; + private Task _websocketConnectTask = null; + // common private IJobServer _jobServer; private Task[] _allDequeueTasks; @@ -71,7 +82,7 @@ namespace GitHub.Runner.Common // Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds). // Then the dequeue will happen every 500ms. - // In this way, customer still can get instance live console output on job start, + // In this way, customer still can get instance live console output on job start, // at the same time we can cut the load to server after the build run for more than 60s private int _webConsoleLineAggressiveDequeueCount = 0; private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60; @@ -79,6 +90,13 @@ namespace GitHub.Runner.Common private bool _webConsoleLineAggressiveDequeue = true; private bool _firstConsoleOutputs = true; + private int totalBatchedLinesAttemptedByWebsocket = 0; + private int failedAttemptsToPostBatchedLinesByWebsocket = 0; + + private ClientWebSocket _websocketClient = null; + + private ServiceEndpoint _serviceEndPoint; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -89,6 +107,10 @@ namespace GitHub.Runner.Common { Trace.Entering(); + this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + + InitializeWebsocket(); + if (_queueInProcess) { Trace.Info("No-opt, all queue process tasks are running."); @@ -156,6 +178,9 @@ namespace GitHub.Runner.Common await ProcessTimelinesUpdateQueueAsync(runOnce: true); Trace.Info("Timeline update queue drained."); + Trace.Info($"Disposing websocket client ..."); + this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None); + Trace.Info("All queue process tasks have been stopped, and all queues are drained."); } @@ -292,14 +317,69 @@ namespace GitHub.Runner.Common { try { - // we will not requeue failed batch, since the web console lines are time sensitive. - if (batch[0].LineNumber.HasValue) + if (this._websocketConnectTask != null) { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); + // lazily await here, we are already in the background task here + await this._websocketConnectTask; } - else + + var pushedLinesViaWebsocket = false; + if (this._websocketClient != null) { - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); + var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value): + new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList()); + var jsonData = StringUtil.ConvertToJson(linesWrapper); + try + { + totalBatchedLinesAttemptedByWebsocket++; + var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData); + // break the message into chunks of 1024 bytes + for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024) + { + var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length; + var chunk = new ArraySegment(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i)); + await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None); + } + + pushedLinesViaWebsocket = true; + } + catch (Exception ex) + { + Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State})."); + Trace.Error(ex); + failedAttemptsToPostBatchedLinesByWebsocket++; + if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider) + { + // let's consider failure percentage + if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed) + { + Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again."); + this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None); + this._websocketClient = null; + } + } + + if (this._websocketClient != null) + { + var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect); + Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket})."); + InitializeWebsocket(delay); + } + } + } + + // if we can't push via websocket, let's fallback to posting via REST API + if (!pushedLinesViaWebsocket) + { + // we will not requeue failed batch, since the web console lines are time sensitive. + if (batch[0].LineNumber.HasValue) + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); + } + else + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); + } } if (_firstConsoleOutputs) @@ -391,6 +471,46 @@ namespace GitHub.Runner.Common } } + private void InitializeWebsocket(TimeSpan? delay = null) + { + if (_serviceEndPoint.Authorization != null && + _serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) && + !string.IsNullOrEmpty(accessToken)) + { + if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl)) + { + // let's ensure we use the right scheme + feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://"); + Trace.Info($"Creating websocket client ..." + feedStreamUrl); + this._websocketClient = new ClientWebSocket(); + this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}"); + this._websocketConnectTask = Task.Run(async () => + { + try + { + Trace.Info($"Attempting to start websocket client with delay {delay}."); + await Task.Delay(delay ?? TimeSpan.Zero); + await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken)); + Trace.Info($"Successfully started websocket client."); + } + catch(Exception ex) + { + Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket."); + Trace.Error(ex); + } + }); + } + else + { + Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data"); + } + } + else + { + Trace.Info($"No access token from the service endpoint"); + } + } + private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false) { while (!_jobCompletionSource.Task.IsCompleted || runOnce) @@ -489,8 +609,8 @@ namespace GitHub.Runner.Common if (runOnce) { - // continue process timeline records update, - // we might have more records need update, + // continue process timeline records update, + // we might have more records need update, // since we just create a new sub-timeline if (pendingSubtimelineUpdate) {