From 92258f9ea199435e172c04a7010473f5d994ff80 Mon Sep 17 00:00:00 2001
From: Luke Tomlinson
Date: Fri, 24 Mar 2023 14:00:34 -0400
Subject: [PATCH] Listen directly to Broker for messages (#2500)

---
Review notes (placed below the "---" marker, so they are not part of the
commit message and do not affect any hunk):

What the change does
 * IBrokerServer / BrokerServer wrap a BrokerHttpClient. ConnectAsync
   creates the raw connection and client; GetRunnerMessageAsync throws
   InvalidOperationException if ConnectAsync has not completed first.
 * RunnerSettings gains a ServerUrlV2 data member.
 * Runner.RunAsync now obtains its listener from GetMesageListener, which
   returns a BrokerMessageListener only when settings.UseV2Flow is set and
   otherwise falls back to the IMessageListener registered on HostContext.
 * BrokerMessageListener.CreateSessionAsync only (re)connects to
   settings.ServerUrlV2 and returns true; DeleteSessionAsync and
   DeleteMessageAsync are no-ops.
 * BrokerMessageListener.OnJobStatus records the new status and cancels
   the in-flight poll so the next poll reports the new status.
 * BrokerMessageListener.GetNextMessageAsync polls until a non-null
   message arrives. Retriable failures reconnect and wait for a random
   backoff (bounds 15-30 s while there are at most five consecutive
   errors, 30-60 s afterwards); the tenth consecutive error is rethrown.
 * BrokerHttpClient.GetRunnerMessageAsync sends a GET to the relative URI
   "message" (resolved against Client.BaseAddress) with optional "status"
   and "runnerVersion" query parameters.

Follow-ups noticed while reviewing (left untouched here so the hunks keep
matching the blob ids on the "index" lines):
 * Runner.cs: "GetMesageListener" is a typo for "GetMessageListener".
 * BrokerMessageListener.GetNextMessageAsync: every path through the try
   block returns, continues or throws, so after the finally block
   "message" is always null; the trailing
   Trace.Info($"Message '{message.MessageId}' received.") is unreachable
   and the 30-minute heartbeat log is only reached after a retried error.
 * BrokerMessageListener: the "runnerStatus" field lacks the "_" prefix
   used by every other private field in the class.
 * BrokerServer.CheckConnection: $"SetConnection" is an interpolated
   string with no placeholders and an unhelpful message.

 src/Runner.Common/BrokerServer.cs            |  56 +++++
 src/Runner.Common/ConfigurationStore.cs      |   3 +
 src/Runner.Listener/BrokerMessageListener.cs | 204 +++++++++++++++++++
 src/Runner.Listener/Runner.cs                |  14 +-
 src/Sdk/WebApi/WebApi/BrokerHttpClient.cs    |  84 ++++++++
 5 files changed, 360 insertions(+), 1 deletion(-)
 create mode 100644 src/Runner.Common/BrokerServer.cs
 create mode 100644 src/Runner.Listener/BrokerMessageListener.cs
 create mode 100644 src/Sdk/WebApi/WebApi/BrokerHttpClient.cs

diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs
new file mode 100644
index 000000000..bb0691771
--- /dev/null
+++ b/src/Runner.Common/BrokerServer.cs
@@ -0,0 +1,56 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using GitHub.Actions.RunService.WebApi;
+using GitHub.DistributedTask.Pipelines;
+using GitHub.DistributedTask.WebApi;
+using GitHub.Runner.Sdk;
+using GitHub.Services.Common;
+using Sdk.RSWebApi.Contracts;
+using Sdk.WebApi.WebApi.RawClient;
+
+namespace GitHub.Runner.Common
+{
+    [ServiceLocator(Default = typeof(BrokerServer))]
+    public interface IBrokerServer : IRunnerService
+    {
+        Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
+
+        Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
+    }
+
+    public sealed class BrokerServer : RunnerService, IBrokerServer
+    {
+        private bool _hasConnection;
+        private Uri _brokerUri;
+        private RawConnection _connection;
+        private BrokerHttpClient _brokerHttpClient;
+
+        public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
+        {
+            _brokerUri = serverUri;
+
+            _connection = VssUtil.CreateRawConnection(serverUri, credentials);
+            _brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
+            _hasConnection = true;
+        }
+
+        private void CheckConnection()
+        {
+            if (!_hasConnection)
+            {
+                throw new InvalidOperationException($"SetConnection");
+            }
+        }
+
+        public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
+        {
+            CheckConnection();
+            var jobMessage = RetryRequest<TaskAgentMessage>(
+                async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
+
+            return jobMessage;
+        }
+    }
+}
diff --git a/src/Runner.Common/ConfigurationStore.cs b/src/Runner.Common/ConfigurationStore.cs
index 49c4229da..3dd5a8032 100644
--- a/src/Runner.Common/ConfigurationStore.cs
+++ b/src/Runner.Common/ConfigurationStore.cs
@@ -53,6 +53,9 @@ namespace GitHub.Runner.Common
         [DataMember(EmitDefaultValue = false)]
         public bool UseV2Flow { get; set; }
 
+        [DataMember(EmitDefaultValue = false)]
+        public string ServerUrlV2 { get; set; }
+
         [IgnoreDataMember]
         public bool IsHostedServer
         {
diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs
new file mode 100644
index 000000000..93259c8e5
--- /dev/null
+++ b/src/Runner.Listener/BrokerMessageListener.cs
@@ -0,0 +1,204 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Runtime.InteropServices;
+using System.Security.Cryptography;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using GitHub.DistributedTask.WebApi;
+using GitHub.Runner.Common;
+using GitHub.Runner.Listener.Configuration;
+using GitHub.Runner.Sdk;
+using GitHub.Services.Common;
+using GitHub.Runner.Common.Util;
+using GitHub.Services.OAuth;
+
+namespace GitHub.Runner.Listener
+{
+    public sealed class BrokerMessageListener : RunnerService, IMessageListener
+    {
+        private RunnerSettings _settings;
+        private ITerminal _term;
+        private TimeSpan _getNextMessageRetryInterval;
+        private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
+        private CancellationTokenSource _getMessagesTokenSource;
+        private IBrokerServer _brokerServer;
+
+        public override void Initialize(IHostContext hostContext)
+        {
+            base.Initialize(hostContext);
+
+            _term = HostContext.GetService<ITerminal>();
+            _brokerServer = HostContext.GetService<IBrokerServer>();
+        }
+
+        public async Task<Boolean> CreateSessionAsync(CancellationToken token)
+        {
+            await RefreshBrokerConnection();
+            return await Task.FromResult(true);
+        }
+
+        public async Task DeleteSessionAsync()
+        {
+            await Task.CompletedTask;
+        }
+
+        public void OnJobStatus(object sender, JobStatusEventArgs e)
+        {
+            Trace.Info("Received job status event. JobState: {0}", e.Status);
+            runnerStatus = e.Status;
+            try
+            {
+                _getMessagesTokenSource?.Cancel();
+            }
+            catch (ObjectDisposedException)
+            {
+                Trace.Info("_getMessagesTokenSource is already disposed.");
+            }
+        }
+
+        public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
+        {
+            bool encounteringError = false;
+            int continuousError = 0;
+            Stopwatch heartbeat = new();
+            heartbeat.Restart();
+            var maxRetryCount = 10;
+
+            while (true)
+            {
+                TaskAgentMessage message = null;
+                _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
+                try
+                {
+                    message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);
+
+                    if (message == null)
+                    {
+                        continue;
+                    }
+
+                    return message;
+                }
+                catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
+                {
+                    Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
+                    continue;
+                }
+                catch (OperationCanceledException) when (token.IsCancellationRequested)
+                {
+                    Trace.Info("Get next message has been cancelled.");
+                    throw;
+                }
+                catch (TaskAgentAccessTokenExpiredException)
+                {
+                    Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
+                    throw;
+                }
+                catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
+                {
+                    throw;
+                }
+                catch (Exception ex)
+                {
+                    Trace.Error("Catch exception during get next message.");
+                    Trace.Error(ex);
+
+                    if (!IsGetNextMessageExceptionRetriable(ex))
+                    {
+                        throw;
+                    }
+                    else
+                    {
+                        continuousError++;
+                        //retry after a random backoff to avoid service throttling
+                        //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
+                        if (continuousError <= 5)
+                        {
+                            // random backoff [15, 30]
+                            _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
+                        }
+                        else if (continuousError >= maxRetryCount)
+                        {
+                            throw;
+                        }
+                        else
+                        {
+                            // more aggressive backoff [30, 60]
+                            _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
+                        }
+
+                        if (!encounteringError)
+                        {
+                            //print error only on the first consecutive error
+                            _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
+                            encounteringError = true;
+                        }
+
+                        // re-create VssConnection before next retry
+                        await RefreshBrokerConnection();
+
+                        Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
+                        await HostContext.Delay(_getNextMessageRetryInterval, token);
+                    }
+                }
+                finally
+                {
+                    _getMessagesTokenSource.Dispose();
+                }
+
+                if (message == null)
+                {
+                    if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
+                    {
+                        Trace.Info($"No message retrieved within last 30 minutes.");
+                        heartbeat.Restart();
+                    }
+                    else
+                    {
+                        Trace.Verbose($"No message retrieved.");
+                    }
+
+                    continue;
+                }
+
+                Trace.Info($"Message '{message.MessageId}' received.");
+            }
+        }
+
+        public async Task DeleteMessageAsync(TaskAgentMessage message)
+        {
+            await Task.CompletedTask;
+        }
+
+        private bool IsGetNextMessageExceptionRetriable(Exception ex)
+        {
+            if (ex is TaskAgentNotFoundException ||
+                ex is TaskAgentPoolNotFoundException ||
+                ex is TaskAgentSessionExpiredException ||
+                ex is AccessDeniedException ||
+                ex is VssUnauthorizedException)
+            {
+                Trace.Info($"Non-retriable exception: {ex.Message}");
+                return false;
+            }
+            else
+            {
+                Trace.Info($"Retriable exception: {ex.Message}");
+                return true;
+            }
+        }
+
+        private async Task RefreshBrokerConnection()
+        {
+            var configManager = HostContext.GetService<IConfigurationManager>();
+            _settings = configManager.LoadSettings();
+
+            var credMgr = HostContext.GetService<ICredentialManager>();
+            VssCredentials creds = credMgr.LoadCredentials();
+            await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds);
+        }
+    }
+}
diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs
index c727f1b38..e95379ad3 100644
--- a/src/Runner.Listener/Runner.cs
+++ b/src/Runner.Listener/Runner.cs
@@ -339,13 +339,25 @@ namespace GitHub.Runner.Listener
             }
         }
 
+        private IMessageListener GetMesageListener(RunnerSettings settings)
+        {
+            if (settings.UseV2Flow)
+            {
+                var brokerListener = new BrokerMessageListener();
+                brokerListener.Initialize(HostContext);
+                return brokerListener;
+            }
+
+            return HostContext.GetService<IMessageListener>();
+        }
+
         //create worker manager, create message listener and start listening to the queue
         private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
         {
             try
             {
                 Trace.Info(nameof(RunAsync));
-                _listener = HostContext.GetService<IMessageListener>();
+                _listener = GetMesageListener(settings);
                 if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken))
                 {
                     return Constants.Runner.ReturnCode.TerminatedError;
diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
new file mode 100644
index 000000000..8f9b22e75
--- /dev/null
+++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
@@ -0,0 +1,84 @@
+using System;
+using System.Collections.Generic;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using GitHub.DistributedTask.Pipelines;
+using GitHub.DistributedTask.WebApi;
+using GitHub.Services.Common;
+using GitHub.Services.OAuth;
+using GitHub.Services.WebApi;
+using Sdk.RSWebApi.Contracts;
+using Sdk.WebApi.WebApi;
+
+namespace GitHub.Actions.RunService.WebApi
+{
+    public class BrokerHttpClient : RawHttpClientBase
+    {
+        public BrokerHttpClient(
+            Uri baseUrl,
+            VssOAuthCredential credentials)
+            : base(baseUrl, credentials)
+        {
+        }
+
+        public BrokerHttpClient(
+            Uri baseUrl,
+            VssOAuthCredential credentials,
+            RawClientHttpRequestSettings settings)
+            : base(baseUrl, credentials, settings)
+        {
+        }
+
+        public BrokerHttpClient(
+            Uri baseUrl,
+            VssOAuthCredential credentials,
+            params DelegatingHandler[] handlers)
+            : base(baseUrl, credentials, handlers)
+        {
+        }
+
+        public BrokerHttpClient(
+            Uri baseUrl,
+            VssOAuthCredential credentials,
+            RawClientHttpRequestSettings settings,
+            params DelegatingHandler[] handlers)
+            : base(baseUrl, credentials, settings, handlers)
+        {
+        }
+
+        public BrokerHttpClient(
+            Uri baseUrl,
+            HttpMessageHandler pipeline,
+            Boolean disposeHandler)
+            : base(baseUrl, pipeline, disposeHandler)
+        {
+        }
+
+        public Task<TaskAgentMessage> GetRunnerMessageAsync(
+            string runnerVersion,
+            TaskAgentStatus? status,
+            CancellationToken cancellationToken = default
+        )
+        {
+            var requestUri = new Uri(Client.BaseAddress, "message");
+
+            List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
+
+            if (status != null)
+            {
+                queryParams.Add("status", status.Value.ToString());
+            }
+            if (runnerVersion != null)
+            {
+                queryParams.Add("runnerVersion", runnerVersion);
+            }
+
+            return SendAsync<TaskAgentMessage>(
+                new HttpMethod("GET"),
+                requestUri: requestUri,
+                queryParameters: queryParams,
+                cancellationToken: cancellationToken);
+        }
+    }
+}