From ad9a4a45d131c1747da2988d67caacfc59c22195 Mon Sep 17 00:00:00 2001
From: Yang Cao
Date: Wed, 26 Apr 2023 12:51:56 -0400
Subject: [PATCH] Bypass job server when run service is enabled (#2516)

* Only upload to Results with new job message type

* No need to have separate websocketFeedServer

* Linting fix

* Update src/Runner.Common/JobServerQueue.cs

Co-authored-by: Tingluo Huang

* add connection timeout

* Consolidate initializing webclient to result client

* Retry websocket delivery for console logs

* Linter fix

* Do not give up for good, reconnect again in 10 minutes

* Has to reset delivered

* Only first time retry 3 times to connect to websocket

---------

Co-authored-by: Tingluo Huang
---
 src/Runner.Common/JobServerQueue.cs |  78 +++++++++----
 src/Runner.Common/ResultsServer.cs  | 174 +++++++++++++++++++++++++++-
 src/Runner.Worker/JobRunner.cs      |   3 +
 3 files changed, 228 insertions(+), 27 deletions(-)

diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs
index 20ad27a89..57ba3eb23 100644
--- a/src/Runner.Common/JobServerQueue.cs
+++ b/src/Runner.Common/JobServerQueue.cs
@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
         TaskCompletionSource<int> JobRecordUpdated { get; }
         event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
         Task ShutdownAsync();
-        void Start(Pipelines.AgentJobRequestMessage jobRequest);
+        void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false);
         void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
         void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
         void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
@@ -70,6 +70,7 @@ namespace GitHub.Runner.Common
         private readonly TaskCompletionSource<int> _jobCompletionSource = new();
         private readonly TaskCompletionSource<int> _jobRecordUpdated = new();
         private bool _queueInProcess = false;
+        private bool _resultsServiceOnly = false;
 
         public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;
 
@@ -95,13 +96,17 @@ namespace GitHub.Runner.Common
             _resultsServer = hostContext.GetService<IResultsServer>();
         }
 
-        public void Start(Pipelines.AgentJobRequestMessage jobRequest)
+        public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false)
         {
             Trace.Entering();
+            _resultsServiceOnly = resultServiceOnly;
 
             var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
 
-            _jobServer.InitializeWebsocketClient(serviceEndPoint);
+            if (!resultServiceOnly)
+            {
+                _jobServer.InitializeWebsocketClient(serviceEndPoint);
+            }
 
             // This code is usually wrapped by an instance of IExecutionContext which isn't available here.
             jobRequest.Variables.TryGetValue("system.github.results_endpoint", out VariableValue resultsEndpointVariable);
@@ -112,8 +117,16 @@ namespace GitHub.Runner.Common
                 !string.IsNullOrEmpty(accessToken) &&
                 !string.IsNullOrEmpty(resultsReceiverEndpoint))
             {
+                string liveConsoleFeedUrl = null;
                 Trace.Info("Initializing results client");
-                _resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken);
+                if (resultServiceOnly
+                    && serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
+                    && !string.IsNullOrEmpty(feedStreamUrl))
+                {
+                    liveConsoleFeedUrl = feedStreamUrl;
+                }
+
+                _resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), liveConsoleFeedUrl, accessToken);
                 _resultsClientInitiated = true;
             }
 
@@ -194,6 +207,9 @@ namespace GitHub.Runner.Common
             Trace.Info($"Disposing job server ...");
             await _jobServer.DisposeAsync();
 
+            Trace.Info($"Disposing results server ...");
+            await _resultsServer.DisposeAsync();
+
             Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
         }
 
@@ -372,7 +388,14 @@ namespace GitHub.Runner.Common
                             // Give at most 60s for each request.
                             using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)))
                             {
-                                await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
+                                if (_resultsServiceOnly)
+                                {
+                                    await _resultsServer.AppendLiveConsoleFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
+                                }
+                                else
+                                {
+                                    await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
+                                }
                             }
 
                             if (_firstConsoleOutputs)
@@ -599,7 +622,7 @@ namespace GitHub.Runner.Common
 
                     foreach (var detailTimeline in update.PendingRecords.Where(r => r.Details != null))
                     {
-                        if (!_allTimelines.Contains(detailTimeline.Details.Id))
+                        if (!_resultsServiceOnly && !_allTimelines.Contains(detailTimeline.Details.Id))
                         {
                             try
                             {
@@ -621,7 +644,11 @@ namespace GitHub.Runner.Common
 
                     try
                     {
-                        await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
+                        if (!_resultsServiceOnly)
+                        {
+                            await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
+                        }
+
                         try
                         {
                             if (_resultsClientInitiated)
@@ -797,27 +824,30 @@ namespace GitHub.Runner.Common
             bool uploadSucceed = false;
             try
             {
-                if (String.Equals(file.Type, CoreAttachmentType.Log, StringComparison.OrdinalIgnoreCase))
+                if (!_resultsServiceOnly)
                 {
-                    // Create the log
-                    var taskLog = await _jobServer.CreateLogAsync(_scopeIdentifier, _hubName, _planId, new TaskLog(String.Format(@"logs\{0:D}", file.TimelineRecordId)), default(CancellationToken));
-
-                    // Upload the contents
-                    using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+                    if (String.Equals(file.Type, CoreAttachmentType.Log, StringComparison.OrdinalIgnoreCase))
                     {
-                        var logUploaded = await _jobServer.AppendLogContentAsync(_scopeIdentifier, _hubName, _planId, taskLog.Id, fs, default(CancellationToken));
+                        // Create the log
+                        var taskLog = await _jobServer.CreateLogAsync(_scopeIdentifier, _hubName, _planId, new TaskLog(String.Format(@"logs\{0:D}", file.TimelineRecordId)), default(CancellationToken));
+
+                        // Upload the contents
+                        using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+                        {
+                            var logUploaded = await _jobServer.AppendLogContentAsync(_scopeIdentifier, _hubName, _planId, taskLog.Id, fs, default(CancellationToken));
+                        }
+
+                        // Create a new record and only set the Log field
+                        var attachmentUpdataRecord = new TimelineRecord() { Id = file.TimelineRecordId, Log = taskLog };
+                        QueueTimelineRecordUpdate(file.TimelineId, attachmentUpdataRecord);
                     }
-
-                    // Create a new record and only set the Log field
-                    var attachmentUpdataRecord = new TimelineRecord() { Id = file.TimelineRecordId, Log = taskLog };
-                    QueueTimelineRecordUpdate(file.TimelineId, attachmentUpdataRecord);
-                }
-                else
-                {
-                    // Create attachment
-                    using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+                    else
                     {
-                        var result = await _jobServer.CreateAttachmentAsync(_scopeIdentifier, _hubName, _planId, file.TimelineId, file.TimelineRecordId, file.Type, file.Name, fs, default(CancellationToken));
+                        // Create attachment
+                        using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
+                        {
+                            var result = await _jobServer.CreateAttachmentAsync(_scopeIdentifier, _hubName, _planId, file.TimelineId, file.TimelineRecordId, file.Type, file.Name, fs, default(CancellationToken));
+                        }
                     }
                 }
 
diff --git a/src/Runner.Common/ResultsServer.cs b/src/Runner.Common/ResultsServer.cs
index 4d24d55c8..b36a5efbd 100644
--- a/src/Runner.Common/ResultsServer.cs
+++ b/src/Runner.Common/ResultsServer.cs
@@ -1,17 +1,26 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Net.Http.Headers;
+using System.Net.WebSockets;
+using System.Security;
+using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using GitHub.DistributedTask.WebApi;
+using GitHub.Runner.Sdk;
+using GitHub.Services.Common;
 using GitHub.Services.Results.Client;
+using GitHub.Services.WebApi.Utilities.Internal;
 
 namespace GitHub.Runner.Common
 {
     [ServiceLocator(Default = typeof(ResultServer))]
-    public interface IResultsServer : IRunnerService
+    public interface IResultsServer : IRunnerService, IAsyncDisposable
     {
-        void InitializeResultsClient(Uri uri, string token);
+        void InitializeResultsClient(Uri uri, string liveConsoleFeedUrl, string token);
+
+        Task<bool> AppendLiveConsoleFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
 
         // logging and console
         Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
@@ -31,10 +40,26 @@ namespace GitHub.Runner.Common
     {
         private ResultsHttpClient _resultsClient;
 
-        public void InitializeResultsClient(Uri uri, string token)
+        private ClientWebSocket _websocketClient;
+        private DateTime? _lastConnectionFailure;
+
+        private static readonly TimeSpan MinDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(100);
+        private static readonly TimeSpan MaxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500);
+
+        private Task _websocketConnectTask;
+        private String _liveConsoleFeedUrl;
+        private string _token;
+
+        public void InitializeResultsClient(Uri uri, string liveConsoleFeedUrl, string token)
         {
             var httpMessageHandler = HostContext.CreateHttpClientHandler();
             this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true);
+            _token = token;
+            if (!string.IsNullOrEmpty(liveConsoleFeedUrl))
+            {
+                _liveConsoleFeedUrl = liveConsoleFeedUrl;
+                InitializeWebsocketClient(liveConsoleFeedUrl, token, TimeSpan.Zero, retryConnection: true);
+            }
         }
 
         public Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
@@ -94,5 +119,148 @@ namespace GitHub.Runner.Common
 
             throw new InvalidOperationException("Results client is not initialized.");
         }
+
+        public ValueTask DisposeAsync()
+        {
+            CloseWebSocket(WebSocketCloseStatus.NormalClosure, CancellationToken.None);
+
+            GC.SuppressFinalize(this);
+
+            return ValueTask.CompletedTask;
+        }
+
+        // Prepares the live console feed websocket (auth + user-agent headers) and starts
+        // connecting in the background; does nothing when the token or feed URL is missing.
+        private void InitializeWebsocketClient(string liveConsoleFeedUrl, string accessToken, TimeSpan delay, bool retryConnection = false)
+        {
+            if (string.IsNullOrEmpty(accessToken))
+            {
+                Trace.Info($"No access token from server");
+                return;
+            }
+
+            if (string.IsNullOrEmpty(liveConsoleFeedUrl))
+            {
+                Trace.Info($"No live console feed url from server");
+                return;
+            }
+
+            Trace.Info($"Creating websocket client ..." + liveConsoleFeedUrl);
+            this._websocketClient = new ClientWebSocket();
+            this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
+            var userAgentValues = new List<ProductInfoHeaderValue>();
+            userAgentValues.AddRange(UserAgentUtility.GetDefaultRestUserAgent());
+            userAgentValues.AddRange(HostContext.UserAgents);
+            this._websocketClient.Options.SetRequestHeader("User-Agent", string.Join(" ", userAgentValues.Select(x => x.ToString())));
+
+            // during initialization, retry upto 3 times to setup connection
+            this._websocketConnectTask = ConnectWebSocketClient(liveConsoleFeedUrl, delay, retryConnection);
+        }
+
+        private async Task ConnectWebSocketClient(string feedStreamUrl, TimeSpan delay, bool retryConnection = false)
+        {
+            bool connected = false;
+            int retries = 0;
+
+            do
+            {
+                try
+                {
+                    Trace.Info($"Attempting to start websocket client with delay {delay}.");
+                    await Task.Delay(delay);
+                    using var connectTimeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+                    await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), connectTimeoutTokenSource.Token);
+                    Trace.Info($"Successfully started websocket client.");
+                    connected = true;
+                }
+                catch (Exception ex)
+                {
+                    Trace.Info("Exception caught during websocket client connect, retry connection.");
+                    Trace.Error(ex);
+                    retries++;
+                    // NOTE(review): the client is nulled here, so a retry iteration dereferences null in
+                    // ConnectAsync and fails again; confirm whether the client should be recreated per attempt.
+                    this._websocketClient = null;
+                    _lastConnectionFailure = DateTime.Now;
+                }
+            } while (retryConnection && !connected && retries < 3);
+        }
+
+        public async Task<bool> AppendLiveConsoleFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken)
+        {
+            if (_websocketConnectTask != null)
+            {
+                await _websocketConnectTask;
+            }
+
+            bool delivered = false;
+            int retries = 0;
+
+            // "_websocketClient != null" implies either: We have a successful connection OR we have to attempt sending again and then reconnect
+            // ...in other words, if websocket client is null, we will skip sending to websocket
+            if (_websocketClient != null)
+            {
+                var linesWrapper = startLine.HasValue
+                    ? new TimelineRecordFeedLinesWrapper(stepId, lines, startLine.Value)
+                    : new TimelineRecordFeedLinesWrapper(stepId, lines);
+                var jsonData = StringUtil.ConvertToJson(linesWrapper);
+                var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData);
+                // break the message into chunks of 1024 bytes
+                for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024)
+                {
+                    var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length;
+                    var chunk = new ArraySegment<byte>(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i));
+
+                    delivered = false;
+                    while (!delivered && retries < 3)
+                    {
+                        try
+                        {
+                            if (_websocketClient != null)
+                            {
+                                await _websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage: lastChunk, cancellationToken);
+                                delivered = true;
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            var delay = BackoffTimerHelper.GetRandomBackoff(MinDelayForWebsocketReconnect, MaxDelayForWebsocketReconnect);
+                            Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms.");
+                            Trace.Error(ex);
+                            retries++;
+                            InitializeWebsocketClient(_liveConsoleFeedUrl, _token, delay);
+                        }
+                    }
+                }
+            }
+
+            if (!delivered)
+            {
+                // Giving up for now, so next invocation of this method won't attempt to reconnect
+                _websocketClient = null;
+
+                // however if 10 minutes have already passed, let's try reestablish connection again
+                if (_lastConnectionFailure.HasValue && DateTime.Now > _lastConnectionFailure.Value.AddMinutes(10))
+                {
+                    // Some minutes passed since we retried last time, try connection again
+                    InitializeWebsocketClient(_liveConsoleFeedUrl, _token, TimeSpan.Zero);
+                }
+            }
+
+            return delivered;
+        }
+
+        private void CloseWebSocket(WebSocketCloseStatus closeStatus, CancellationToken cancellationToken)
+        {
+            try
+            {
+                _websocketClient?.CloseOutputAsync(closeStatus, "Closing websocket", cancellationToken);
+            }
+            catch (Exception websocketEx)
+            {
+                // In some cases this might be okay since the websocket might not be open yet, so just close and don't trace exceptions
+                Trace.Info($"Failed to close websocket gracefully {websocketEx.GetType().Name}");
+            }
+        }
     }
 }
diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs
index f63fbf717..3911bd340 100644
--- a/src/Runner.Worker/JobRunner.cs
+++ b/src/Runner.Worker/JobRunner.cs
@@ -49,6 +49,9 @@ namespace GitHub.Runner.Worker
                 VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
                 await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
                 server = runServer;
+
+                _jobServerQueue = HostContext.GetService<IJobServerQueue>();
+                _jobServerQueue.Start(message, resultServiceOnly: true);
             }
             else
             {