mirror of
https://github.com/actions/runner.git
synced 2025-12-10 12:36:23 +00:00
Bypass job server when run service is enabled (#2516)
* Only upload to Results with new job message type * No need to have separate websocketFeedServer * Linting fix * Update src/Runner.Common/JobServerQueue.cs Co-authored-by: Tingluo Huang <tingluohuang@github.com> * add connection timeout * Consolidate initializing webclient to result client * Retry websocket delivery for console logs * Linter fix * Do not give up for good, reconnect again in 10 minutes * Has to reset delivered * Only first time retry 3 times to connect to websocket --------- Co-authored-by: Tingluo Huang <tingluohuang@github.com>
This commit is contained in:
@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
|
||||
TaskCompletionSource<int> JobRecordUpdated { get; }
|
||||
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
|
||||
Task ShutdownAsync();
|
||||
void Start(Pipelines.AgentJobRequestMessage jobRequest);
|
||||
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false);
|
||||
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
|
||||
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
|
||||
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
|
||||
@@ -70,6 +70,7 @@ namespace GitHub.Runner.Common
|
||||
private readonly TaskCompletionSource<int> _jobCompletionSource = new();
|
||||
private readonly TaskCompletionSource<int> _jobRecordUpdated = new();
|
||||
private bool _queueInProcess = false;
|
||||
private bool _resultsServiceOnly = false;
|
||||
|
||||
public TaskCompletionSource<int> JobRecordUpdated => _jobRecordUpdated;
|
||||
|
||||
@@ -95,13 +96,17 @@ namespace GitHub.Runner.Common
|
||||
_resultsServer = hostContext.GetService<IResultsServer>();
|
||||
}
|
||||
|
||||
public void Start(Pipelines.AgentJobRequestMessage jobRequest)
|
||||
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false)
|
||||
{
|
||||
Trace.Entering();
|
||||
_resultsServiceOnly = resultServiceOnly;
|
||||
|
||||
var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
_jobServer.InitializeWebsocketClient(serviceEndPoint);
|
||||
if (!resultServiceOnly)
|
||||
{
|
||||
_jobServer.InitializeWebsocketClient(serviceEndPoint);
|
||||
}
|
||||
|
||||
// This code is usually wrapped by an instance of IExecutionContext which isn't available here.
|
||||
jobRequest.Variables.TryGetValue("system.github.results_endpoint", out VariableValue resultsEndpointVariable);
|
||||
@@ -112,8 +117,16 @@ namespace GitHub.Runner.Common
|
||||
!string.IsNullOrEmpty(accessToken) &&
|
||||
!string.IsNullOrEmpty(resultsReceiverEndpoint))
|
||||
{
|
||||
string liveConsoleFeedUrl = null;
|
||||
Trace.Info("Initializing results client");
|
||||
_resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken);
|
||||
if (resultServiceOnly
|
||||
&& serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
|
||||
&& !string.IsNullOrEmpty(feedStreamUrl))
|
||||
{
|
||||
liveConsoleFeedUrl = feedStreamUrl;
|
||||
}
|
||||
|
||||
_resultsServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), liveConsoleFeedUrl, accessToken);
|
||||
_resultsClientInitiated = true;
|
||||
}
|
||||
|
||||
@@ -194,6 +207,9 @@ namespace GitHub.Runner.Common
|
||||
Trace.Info($"Disposing job server ...");
|
||||
await _jobServer.DisposeAsync();
|
||||
|
||||
Trace.Info($"Disposing results server ...");
|
||||
await _resultsServer.DisposeAsync();
|
||||
|
||||
Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
|
||||
}
|
||||
|
||||
@@ -372,7 +388,14 @@ namespace GitHub.Runner.Common
|
||||
// Give at most 60s for each request.
|
||||
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(60)))
|
||||
{
|
||||
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
|
||||
if (_resultsServiceOnly)
|
||||
{
|
||||
await _resultsServer.AppendLiveConsoleFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber, timeoutTokenSource.Token);
|
||||
}
|
||||
}
|
||||
|
||||
if (_firstConsoleOutputs)
|
||||
@@ -599,7 +622,7 @@ namespace GitHub.Runner.Common
|
||||
|
||||
foreach (var detailTimeline in update.PendingRecords.Where(r => r.Details != null))
|
||||
{
|
||||
if (!_allTimelines.Contains(detailTimeline.Details.Id))
|
||||
if (!_resultsServiceOnly && !_allTimelines.Contains(detailTimeline.Details.Id))
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -621,7 +644,11 @@ namespace GitHub.Runner.Common
|
||||
|
||||
try
|
||||
{
|
||||
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
|
||||
if (!_resultsServiceOnly)
|
||||
{
|
||||
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (_resultsClientInitiated)
|
||||
@@ -797,27 +824,30 @@ namespace GitHub.Runner.Common
|
||||
bool uploadSucceed = false;
|
||||
try
|
||||
{
|
||||
if (String.Equals(file.Type, CoreAttachmentType.Log, StringComparison.OrdinalIgnoreCase))
|
||||
if (!_resultsServiceOnly)
|
||||
{
|
||||
// Create the log
|
||||
var taskLog = await _jobServer.CreateLogAsync(_scopeIdentifier, _hubName, _planId, new TaskLog(String.Format(@"logs\{0:D}", file.TimelineRecordId)), default(CancellationToken));
|
||||
|
||||
// Upload the contents
|
||||
using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
if (String.Equals(file.Type, CoreAttachmentType.Log, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
var logUploaded = await _jobServer.AppendLogContentAsync(_scopeIdentifier, _hubName, _planId, taskLog.Id, fs, default(CancellationToken));
|
||||
// Create the log
|
||||
var taskLog = await _jobServer.CreateLogAsync(_scopeIdentifier, _hubName, _planId, new TaskLog(String.Format(@"logs\{0:D}", file.TimelineRecordId)), default(CancellationToken));
|
||||
|
||||
// Upload the contents
|
||||
using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
{
|
||||
var logUploaded = await _jobServer.AppendLogContentAsync(_scopeIdentifier, _hubName, _planId, taskLog.Id, fs, default(CancellationToken));
|
||||
}
|
||||
|
||||
// Create a new record and only set the Log field
|
||||
var attachmentUpdataRecord = new TimelineRecord() { Id = file.TimelineRecordId, Log = taskLog };
|
||||
QueueTimelineRecordUpdate(file.TimelineId, attachmentUpdataRecord);
|
||||
}
|
||||
|
||||
// Create a new record and only set the Log field
|
||||
var attachmentUpdataRecord = new TimelineRecord() { Id = file.TimelineRecordId, Log = taskLog };
|
||||
QueueTimelineRecordUpdate(file.TimelineId, attachmentUpdataRecord);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Create attachment
|
||||
using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
else
|
||||
{
|
||||
var result = await _jobServer.CreateAttachmentAsync(_scopeIdentifier, _hubName, _planId, file.TimelineId, file.TimelineRecordId, file.Type, file.Name, fs, default(CancellationToken));
|
||||
// Create attachment
|
||||
using (FileStream fs = File.Open(file.Path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
{
|
||||
var result = await _jobServer.CreateAttachmentAsync(_scopeIdentifier, _hubName, _planId, file.TimelineId, file.TimelineRecordId, file.Type, file.Name, fs, default(CancellationToken));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,26 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.WebSockets;
|
||||
using System.Security;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using GitHub.Runner.Sdk;
|
||||
using GitHub.Services.Common;
|
||||
using GitHub.Services.Results.Client;
|
||||
using GitHub.Services.WebApi.Utilities.Internal;
|
||||
|
||||
namespace GitHub.Runner.Common
|
||||
{
|
||||
[ServiceLocator(Default = typeof(ResultServer))]
|
||||
public interface IResultsServer : IRunnerService
|
||||
public interface IResultsServer : IRunnerService, IAsyncDisposable
|
||||
{
|
||||
void InitializeResultsClient(Uri uri, string token);
|
||||
void InitializeResultsClient(Uri uri, string liveConsoleFeedUrl, string token);
|
||||
|
||||
Task<bool> AppendLiveConsoleFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
|
||||
|
||||
// logging and console
|
||||
Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
|
||||
@@ -31,10 +40,26 @@ namespace GitHub.Runner.Common
|
||||
{
|
||||
private ResultsHttpClient _resultsClient;
|
||||
|
||||
public void InitializeResultsClient(Uri uri, string token)
|
||||
private ClientWebSocket _websocketClient;
|
||||
private DateTime? _lastConnectionFailure;
|
||||
|
||||
private static readonly TimeSpan MinDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(100);
|
||||
private static readonly TimeSpan MaxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500);
|
||||
|
||||
private Task _websocketConnectTask;
|
||||
private String _liveConsoleFeedUrl;
|
||||
private string _token;
|
||||
|
||||
public void InitializeResultsClient(Uri uri, string liveConsoleFeedUrl, string token)
|
||||
{
|
||||
var httpMessageHandler = HostContext.CreateHttpClientHandler();
|
||||
this._resultsClient = new ResultsHttpClient(uri, httpMessageHandler, token, disposeHandler: true);
|
||||
_token = token;
|
||||
if (!string.IsNullOrEmpty(liveConsoleFeedUrl))
|
||||
{
|
||||
_liveConsoleFeedUrl = liveConsoleFeedUrl;
|
||||
InitializeWebsocketClient(liveConsoleFeedUrl, token, TimeSpan.Zero, retryConnection: true);
|
||||
}
|
||||
}
|
||||
|
||||
public Task CreateResultsStepSummaryAsync(string planId, string jobId, Guid stepId, string file,
|
||||
@@ -94,5 +119,144 @@ namespace GitHub.Runner.Common
|
||||
|
||||
throw new InvalidOperationException("Results client is not initialized.");
|
||||
}
|
||||
|
||||
public ValueTask DisposeAsync()
|
||||
{
|
||||
CloseWebSocket(WebSocketCloseStatus.NormalClosure, CancellationToken.None);
|
||||
|
||||
GC.SuppressFinalize(this);
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
private void InitializeWebsocketClient(string liveConsoleFeedUrl, string accessToken, TimeSpan delay, bool retryConnection = false)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(accessToken))
|
||||
{
|
||||
Trace.Info($"No access token from server");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(liveConsoleFeedUrl))
|
||||
{
|
||||
Trace.Info($"No live console feed url from server");
|
||||
return;
|
||||
}
|
||||
|
||||
Trace.Info($"Creating websocket client ..." + liveConsoleFeedUrl);
|
||||
this._websocketClient = new ClientWebSocket();
|
||||
this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
|
||||
var userAgentValues = new List<ProductInfoHeaderValue>();
|
||||
userAgentValues.AddRange(UserAgentUtility.GetDefaultRestUserAgent());
|
||||
userAgentValues.AddRange(HostContext.UserAgents);
|
||||
this._websocketClient.Options.SetRequestHeader("User-Agent", string.Join(" ", userAgentValues.Select(x => x.ToString())));
|
||||
|
||||
// during initialization, retry upto 3 times to setup connection
|
||||
this._websocketConnectTask = ConnectWebSocketClient(liveConsoleFeedUrl, delay, retryConnection);
|
||||
}
|
||||
|
||||
private async Task ConnectWebSocketClient(string feedStreamUrl, TimeSpan delay, bool retryConnection = false)
|
||||
{
|
||||
bool connected = false;
|
||||
int retries = 0;
|
||||
|
||||
do
|
||||
{
|
||||
try
|
||||
{
|
||||
Trace.Info($"Attempting to start websocket client with delay {delay}.");
|
||||
await Task.Delay(delay);
|
||||
using var connectTimeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
|
||||
await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), connectTimeoutTokenSource.Token);
|
||||
Trace.Info($"Successfully started websocket client.");
|
||||
connected = true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Info("Exception caught during websocket client connect, retry connection.");
|
||||
Trace.Error(ex);
|
||||
retries++;
|
||||
this._websocketClient = null;
|
||||
_lastConnectionFailure = DateTime.Now;
|
||||
}
|
||||
} while (retryConnection && !connected && retries < 3);
|
||||
}
|
||||
|
||||
public async Task<bool> AppendLiveConsoleFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_websocketConnectTask != null)
|
||||
{
|
||||
await _websocketConnectTask;
|
||||
}
|
||||
|
||||
bool delivered = false;
|
||||
int retries = 0;
|
||||
|
||||
// "_websocketClient != null" implies either: We have a successful connection OR we have to attempt sending again and then reconnect
|
||||
// ...in other words, if websocket client is null, we will skip sending to websocket
|
||||
if (_websocketClient != null)
|
||||
{
|
||||
var linesWrapper = startLine.HasValue
|
||||
? new TimelineRecordFeedLinesWrapper(stepId, lines, startLine.Value)
|
||||
: new TimelineRecordFeedLinesWrapper(stepId, lines);
|
||||
var jsonData = StringUtil.ConvertToJson(linesWrapper);
|
||||
var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData);
|
||||
// break the message into chunks of 1024 bytes
|
||||
for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024)
|
||||
{
|
||||
var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length;
|
||||
var chunk = new ArraySegment<byte>(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i));
|
||||
|
||||
delivered = false;
|
||||
while (!delivered && retries < 3)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (_websocketClient != null)
|
||||
{
|
||||
await _websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage: lastChunk, cancellationToken);
|
||||
delivered = true;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
var delay = BackoffTimerHelper.GetRandomBackoff(MinDelayForWebsocketReconnect, MaxDelayForWebsocketReconnect);
|
||||
Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms.");
|
||||
Trace.Error(ex);
|
||||
retries++;
|
||||
InitializeWebsocketClient(_liveConsoleFeedUrl, _token, delay);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!delivered)
|
||||
{
|
||||
// Giving up for now, so next invocation of this method won't attempt to reconnect
|
||||
_websocketClient = null;
|
||||
|
||||
// however if 10 minutes have already passed, let's try reestablish connection again
|
||||
if (_lastConnectionFailure.HasValue && DateTime.Now > _lastConnectionFailure.Value.AddMinutes(10))
|
||||
{
|
||||
// Some minutes passed since we retried last time, try connection again
|
||||
InitializeWebsocketClient(_liveConsoleFeedUrl, _token, TimeSpan.Zero);
|
||||
}
|
||||
}
|
||||
|
||||
return delivered;
|
||||
}
|
||||
|
||||
private void CloseWebSocket(WebSocketCloseStatus closeStatus, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
_websocketClient?.CloseOutputAsync(closeStatus, "Closing websocket", cancellationToken);
|
||||
}
|
||||
catch (Exception websocketEx)
|
||||
{
|
||||
// In some cases this might be okay since the websocket might be open yet, so just close and don't trace exceptions
|
||||
Trace.Info($"Failed to close websocket gracefully {websocketEx.GetType().Name}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,6 +49,9 @@ namespace GitHub.Runner.Worker
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
|
||||
server = runServer;
|
||||
|
||||
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
||||
_jobServerQueue.Start(message, resultServiceOnly: true);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user