From a49a5bef65846a60a440576faaec5aa6117a4894 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 23 Mar 2023 07:00:27 -0700 Subject: [PATCH] . --- src/Runner.Listener/BrokerMessageListener.cs | 263 ++++++++----------- 1 file changed, 115 insertions(+), 148 deletions(-) diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 40a73dae0..e4f64ae6a 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -19,16 +19,9 @@ namespace GitHub.Runner.Listener { public sealed class BrokerMessageListener : RunnerService, IMessageListener { - // private long? _lastMessageId; - // private RunnerSettings _settings; - // private ITerminal _term; - // private IRunnerServer _runnerServer; - // private TaskAgentSession _session; - // private TimeSpan _getNextMessageRetryInterval; - // private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); - // private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); - // private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); - // private readonly Dictionary _sessionCreationExceptionTracker = new(); + private RunnerSettings _settings; + private ITerminal _term; + private TimeSpan _getNextMessageRetryInterval; private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private IBrokerServer _brokerServer; @@ -38,15 +31,13 @@ namespace GitHub.Runner.Listener { base.Initialize(hostContext); - // _term = HostContext.GetService(); + _term = HostContext.GetService(); _brokerServer = HostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) { - var credMgr = HostContext.GetService(); - VssCredentials creds = credMgr.LoadCredentials(); - await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); + await RefreshBrokerConnection(); return await Task.FromResult(true); } @@ -73,149 +64,119 @@ namespace GitHub.Runner.Listener public async Task GetNextMessageAsync(CancellationToken token) { + bool encounteringError = false; + int continuousError = 0; + Stopwatch heartbeat = new(); + heartbeat.Restart(); + while (true) { + TaskAgentMessage message = null; _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); - - if (message == null) + try { - continue; - } + message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); - if (MessageUtil.IsRunServiceJob(message.MessageType)) - { - var messageRef = StringUtil.ConvertFromJson(message.Body); - - if (messageRef.RunnerRequestId != lastRunnerRequestId) + if (message == null) + { + continue; + } + + if (MessageUtil.IsRunServiceJob(message.MessageType)) + { + var messageRef = StringUtil.ConvertFromJson(message.Body); + + if (messageRef.RunnerRequestId != lastRunnerRequestId) + { + lastRunnerRequestId = messageRef.RunnerRequestId; + return message; + } + } + else { - lastRunnerRequestId = messageRef.RunnerRequestId; return message; } } - else + catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) { - return message; + Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); + continue; } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + Trace.Info("Get next message has been cancelled."); + throw; + } + catch (TaskAgentAccessTokenExpiredException) + { + Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); + throw; + } + catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) + { + throw; + } + catch (Exception ex) + { + Trace.Error("Catch exception during get next message."); + Trace.Error(ex); + + if (!IsGetNextMessageExceptionRetriable(ex)) + { + throw; + } + else + { + continuousError++; + //retry after a random backoff to avoid service throttling + //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. + if (continuousError <= 5) + { + // random backoff [15, 30] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); + } + else + { + // more aggressive backoff [30, 60] + _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); + } + + if (!encounteringError) + { + //print error only on the first consecutive error + _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + encounteringError = true; + } + + // re-create VssConnection before next retry + await RefreshBrokerConnection(); + + Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); + await HostContext.Delay(_getNextMessageRetryInterval, token); + } + } + finally + { + _getMessagesTokenSource.Dispose(); + } + + if (message == null) + { + if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) + { + Trace.Info($"No message retrieved within last 30 minutes."); + heartbeat.Restart(); + } + else + { + Trace.Verbose($"No message retrieved."); + } + + continue; + } + + Trace.Info($"Message '{message.MessageId}' received."); } - - // return message; - // Trace.Entering(); - // ArgUtil.NotNull(_settings, nameof(_settings)); - // bool encounteringError = false; - // int continuousError = 0; - // string errorMessage = string.Empty; - // Stopwatch heartbeat = new(); - // heartbeat.Restart(); - // while (true) - // { - // token.ThrowIfCancellationRequested(); - // TaskAgentMessage message = null; - // _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - // try - // { - // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, - // _session.SessionId, - // _lastMessageId, - // runnerStatus, - // BuildConstants.RunnerPackage.Version, - // _getMessagesTokenSource.Token); - - - // if (message != null) - // { - // _lastMessageId = message.MessageId; - // } - - // if (encounteringError) //print the message once only if there was an error - // { - // _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); - // encounteringError = false; - // continuousError = 0; - // } - // } - // catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) - // { - // Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); - // continue; - // } - // catch (OperationCanceledException) when (token.IsCancellationRequested) - // { - // Trace.Info("Get next message has been cancelled."); - // throw; - // } - // catch (TaskAgentAccessTokenExpiredException) - // { - // Trace.Info("Runner OAuth token has been revoked. Unable to pull message."); - // throw; - // } - // catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException) - // { - // throw; - // } - // catch (Exception ex) - // { - // Trace.Error("Catch exception during get next message."); - // Trace.Error(ex); - - // // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. - // if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) - // { - // Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); - // } - // else if (!IsGetNextMessageExceptionRetriable(ex)) - // { - // throw; - // } - // else - // { - // continuousError++; - // //retry after a random backoff to avoid service throttling - // //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time. - // if (continuousError <= 5) - // { - // // random backoff [15, 30] - // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval); - // } - // else - // { - // // more aggressive backoff [30, 60] - // _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval); - // } - - // if (!encounteringError) - // { - // //print error only on the first consecutive error - // _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); - // encounteringError = true; - // } - - // Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); - // await HostContext.Delay(_getNextMessageRetryInterval, token); - // } - // } - // finally - // { - // _getMessagesTokenSource.Dispose(); - // } - - // if (message == null) - // { - // if (heartbeat.Elapsed > TimeSpan.FromMinutes(30)) - // { - // Trace.Info($"No message retrieved within last 30 minutes."); - // heartbeat.Restart(); - // } - // else - // { - // Trace.Verbose($"No message retrieved"); - // } - - // continue; - // } - - // Trace.Info($"Message '{message.MessageId}' received"); - // return message; } public async Task DeleteMessageAsync(TaskAgentMessage message) @@ -241,9 +202,15 @@ namespace GitHub.Runner.Listener } } - // private GetMessageAsync(string status, string version) - // { + private async Task RefreshBrokerConnection() + { - // } + var configManager = HostContext.GetService(); + _settings = configManager.LoadSettings(); + + var credMgr = HostContext.GetService(); + VssCredentials creds = credMgr.LoadCredentials(); + await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds); + } } }