From af657acebc20e4448e4c524be6abcf205a390f5d Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Wed, 22 Mar 2023 12:55:15 -0700 Subject: [PATCH] WIP --- .vscode/launch.json | 8 +- src/Runner.Common/BrokerServer.cs | 56 ++++ src/Runner.Listener/BrokerMessageListener.cs | 285 ++++++++++--------- src/Runner.Listener/MessageListener.cs | 2 +- src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 90 +++--- 5 files changed, 261 insertions(+), 180 deletions(-) create mode 100644 src/Runner.Common/BrokerServer.cs diff --git a/.vscode/launch.json b/.vscode/launch.json index 3c5f5c694..270f1d6f5 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -24,7 +24,10 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false + "requireExactSource": false, + "env": { + "USE_BROKER_FLOW": "1" + } }, { "name": "Configure", @@ -54,5 +57,4 @@ "requireExactSource": false }, ], -} - +} \ No newline at end of file diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs new file mode 100644 index 000000000..284616210 --- /dev/null +++ b/src/Runner.Common/BrokerServer.cs @@ -0,0 +1,56 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Actions.RunService.WebApi; +using GitHub.DistributedTask.Pipelines; +using GitHub.DistributedTask.WebApi; +using GitHub.Runner.Sdk; +using GitHub.Services.Common; +using Sdk.RSWebApi.Contracts; +using Sdk.WebApi.WebApi.RawClient; + +namespace GitHub.Runner.Common +{ + [ServiceLocator(Default = typeof(BrokerServer))] + public interface IBrokerServer : IRunnerService + { + Task ConnectAsync(Uri serverUrl, VssCredentials credentials); + + Task GetRunnerMessageAsync(CancellationToken token); + } + + public sealed class BrokerServer : RunnerService, IBrokerServer + { + private bool _hasConnection; + private Uri _brokerUri; + private RawConnection _connection; + private BrokerHttpClient _brokerHttpClient; + + public async Task ConnectAsync(Uri serverUri, VssCredentials credentials) + { + _brokerUri = serverUri; + + _connection = VssUtil.CreateRawConnection(serverUri, credentials); + _brokerHttpClient = await _connection.GetClientAsync(); + _hasConnection = true; + } + + private void CheckConnection() + { + if (!_hasConnection) + { + throw new InvalidOperationException($"SetConnection"); + } + } + + public Task GetRunnerMessageAsync(CancellationToken cancellationToken) + { + CheckConnection(); + var jobMessage = RetryRequest( + async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken); + + return jobMessage; + } + } +} diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 3dc474d93..35a37969e 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -18,28 +18,33 @@ namespace GitHub.Runner.Listener { public sealed class BrokerMessageListener : RunnerService, IMessageListener { - private long? _lastMessageId; - private RunnerSettings _settings; - private ITerminal _term; - private IRunnerServer _runnerServer; - private TaskAgentSession _session; - private TimeSpan _getNextMessageRetryInterval; - private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); - private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); - private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); - private readonly Dictionary _sessionCreationExceptionTracker = new(); + // private long? _lastMessageId; + // private RunnerSettings _settings; + // private ITerminal _term; + // private IRunnerServer _runnerServer; + // private TaskAgentSession _session; + // private TimeSpan _getNextMessageRetryInterval; + // private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); + // private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); + // private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); + // private readonly Dictionary _sessionCreationExceptionTracker = new(); private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; + private IBrokerServer _brokerServer; public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); - _term = HostContext.GetService(); + // _term = HostContext.GetService(); + _brokerServer = HostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) { + var credMgr = HostContext.GetService(); + VssCredentials creds = credMgr.LoadCredentials(); + await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); return await Task.FromResult(true); } @@ -50,140 +55,150 @@ namespace GitHub.Runner.Listener public void OnJobStatus(object sender, JobStatusEventArgs e) { - if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW"))) + + Trace.Info("Received job status event. JobState: {0}", e.Status); + runnerStatus = e.Status; + try { - Trace.Info("Received job status event. JobState: {0}", e.Status); - runnerStatus = e.Status; - try - { - _getMessagesTokenSource?.Cancel(); - } - catch (ObjectDisposedException) - { - Trace.Info("_getMessagesTokenSource is already disposed."); - } + _getMessagesTokenSource?.Cancel(); } + catch (ObjectDisposedException) + { + Trace.Info("_getMessagesTokenSource is already disposed."); + } + } public async Task GetNextMessageAsync(CancellationToken token) { - Trace.Entering(); - ArgUtil.NotNull(_settings, nameof(_settings)); - bool encounteringError = false; - int continuousError = 0; - string errorMessage = string.Empty; - Stopwatch heartbeat = new(); - heartbeat.Restart(); while (true) { - token.ThrowIfCancellationRequested(); - TaskAgentMessage message = null; _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - try + var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token); + if (message != null) { - message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, - _session.SessionId, - _lastMessageId, - runnerStatus, - BuildConstants.RunnerPackage.Version, - _getMessagesTokenSource.Token); - - - if (message != null) - { - _lastMessageId = message.MessageId; - } - - if (encounteringError) //print the message once only if there was an error - { - _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); - encounteringError = false; - continuousError = 0; - } - } - catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) - { - Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); - continue; - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - Trace.Info("Get next message has been cancelled."); - throw; - } - catch (TaskAgentAccessTokenExpiredException) - { - Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); - throw; - } - catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) - { - throw; - } - catch (Exception ex) - { - Trace.Error("Catch exception during get next message."); - Trace.Error(ex); - - // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. - if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) - { - Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); - } - else if (!IsGetNextMessageExceptionRetriable(ex)) - { - throw; - } - else - { - continuousError++; - //retry after a random backoff to avoid service throttling - //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. - if (continuousError <= 5) - { - // random backoff [15, 30] - _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); - } - else - { - // more aggressive backoff [30, 60] - _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); - } - - if (!encounteringError) - { - //print error only on the first consecutive error - _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); - encounteringError = true; - } - - Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); - await HostContext.Delay(_getNextMessageRetryInterval, token); - } - } - finally - { - _getMessagesTokenSource.Dispose(); + return message; } - if (message == null) - { - if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) - { - Trace.Info($"No message retrieved within last 30 minutes."); - heartbeat.Restart(); - } - else - { - Trace.Verbose($"No message retrieved"); - } - - continue; - } - - Trace.Info($"Message '{message.MessageId}' received"); - return message; } + + // return message; + // Trace.Entering(); + // ArgUtil.NotNull(_settings, nameof(_settings)); + // bool encounteringError = false; + // int continuousError = 0; + // string errorMessage = string.Empty; + // Stopwatch heartbeat = new(); + // heartbeat.Restart(); + // while (true) + // { + // token.ThrowIfCancellationRequested(); + // TaskAgentMessage message = null; + // _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); + // try + // { + // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, + // _session.SessionId, + // _lastMessageId, + // runnerStatus, + // BuildConstants.RunnerPackage.Version, + // _getMessagesTokenSource.Token); + + + // if (message != null) + // { + // _lastMessageId = message.MessageId; + // } + + // if (encounteringError) //print the message once only if there was an error + // { + // _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); + // encounteringError = false; + // continuousError = 0; + // } + // } + // catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) + // { + // Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); + // continue; + // } + // catch (OperationCanceledException) when (token.IsCancellationRequested) + // { + // Trace.Info("Get next message has been cancelled."); + // throw; + // } + // catch (TaskAgentAccessTokenExpiredException) + // { + // Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); + // throw; + // } + // catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) + // { + // throw; + // } + // catch (Exception ex) + // { + // Trace.Error("Catch exception during get next message."); + // Trace.Error(ex); + + // // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. + // if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) + // { + // Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); + // } + // else if (!IsGetNextMessageExceptionRetriable(ex)) + // { + // throw; + // } + // else + // { + // continuousError++; + // //retry after a random backoff to avoid service throttling + // //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. + // if (continuousError <= 5) + // { + // // random backoff [15, 30] + // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); + // } + // else + // { + // // more aggressive backoff [30, 60] + // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); + // } + + // if (!encounteringError) + // { + // //print error only on the first consecutive error + // _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + // encounteringError = true; + // } + + // Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); + // await HostContext.Delay(_getNextMessageRetryInterval, token); + // } + // } + // finally + // { + // _getMessagesTokenSource.Dispose(); + // } + + // if (message == null) + // { + // if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) + // { + // Trace.Info($"No message retrieved within last 30 minutes."); + // heartbeat.Restart(); + // } + // else + // { + // Trace.Verbose($"No message retrieved"); + // } + + // continue; + // } + + // Trace.Info($"Message '{message.MessageId}' received"); + // return message; } public async Task DeleteMessageAsync(TaskAgentMessage message) @@ -209,9 +224,9 @@ namespace GitHub.Runner.Listener } } - private GetMessageAsync(string status, string version) - { + // private GetMessageAsync(string status, string version) + // { - } + // } } } diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index cb8edb607..80c0227ea 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -16,7 +16,7 @@ using GitHub.Services.OAuth; namespace GitHub.Runner.Listener { - [ServiceLocator(Default = typeof(MessageListener))] + [ServiceLocator(Default = typeof(BrokerMessageListener))] public interface IMessageListener : IRunnerService { Task CreateSessionAsync(CancellationToken token); diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index 6a7a5e9db..40563b351 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -1,61 +1,69 @@ using System; -using System.IO; +using System.Collections.Generic; using System.Net.Http; -using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; -using GitHub.Services.Results.Contracts; -using System.Net.Http.Formatting; -using Sdk.WebApi.WebApi; +using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; +using GitHub.Services.Common; +using GitHub.Services.OAuth; +using GitHub.Services.WebApi; +using Sdk.RSWebApi.Contracts; +using Sdk.WebApi.WebApi; -namespace GitHub.Services.Results.Client +namespace GitHub.Actions.RunService.WebApi { public class BrokerHttpClient : RawHttpClientBase { + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials) + : base(baseUrl, credentials) + { + } + + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + RawClientHttpRequestSettings settings) + : base(baseUrl, credentials, settings) + { + } + + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + params DelegatingHandler[] handlers) + : base(baseUrl, credentials, handlers) + { + } + + public BrokerHttpClient( + Uri baseUrl, + VssOAuthCredential credentials, + RawClientHttpRequestSettings settings, + params DelegatingHandler[] handlers) + : base(baseUrl, credentials, settings, handlers) + { + } + public BrokerHttpClient( Uri baseUrl, HttpMessageHandler pipeline, - string token, - bool disposeHandler) + Boolean disposeHandler) : base(baseUrl, pipeline, disposeHandler) { - m_token = token; - m_brokerUrl = baseUrl; - m_formatter = new JsonMediaTypeFormatter(); } - public async Task GetMessagesAsync(CancellationToken cancellationToken) + public Task GetRunnerMessageAsync( + CancellationToken cancellationToken = default) { - var uri = new Uri(m_brokerUrl, Constants.Messages); - return await GetSignedURLResponse(uri, cancellationToken); + var requestUri = new Uri(Client.BaseAddress, "message"); + + return SendAsync( + new HttpMethod("GET"), + requestUri: requestUri, + cancellationToken: cancellationToken); } - - // Get Sas URL calls - private async Task GetSignedURLResponse(Uri uri, CancellationToken cancellationToken) - { - using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, uri)) - { - requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); - requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); - - using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) - { - return await ReadJsonContentAsync(response, cancellationToken); - } - } - } - - private MediaTypeFormatter m_formatter; - private Uri m_brokerUrl; - private string m_token; } - - // Constants specific to results - public static class Constants - { - - public static readonly string Messages = "messages"; - } - }