This commit is contained in:
Luke Tomlinson
2023-03-23 07:00:27 -07:00
parent ee19ca253e
commit a49a5bef65

View File

@@ -19,16 +19,9 @@ namespace GitHub.Runner.Listener
{ {
public sealed class BrokerMessageListener : RunnerService, IMessageListener public sealed class BrokerMessageListener : RunnerService, IMessageListener
{ {
// private long? _lastMessageId; private RunnerSettings _settings;
// private RunnerSettings _settings; private ITerminal _term;
// private ITerminal _term; private TimeSpan _getNextMessageRetryInterval;
// private IRunnerServer _runnerServer;
// private TaskAgentSession _session;
// private TimeSpan _getNextMessageRetryInterval;
// private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30);
// private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
// private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
// private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource; private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer; private IBrokerServer _brokerServer;
@@ -38,15 +31,13 @@ namespace GitHub.Runner.Listener
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
// _term = HostContext.GetService<ITerminal>(); _term = HostContext.GetService<ITerminal>();
_brokerServer = HostContext.GetService<IBrokerServer>(); _brokerServer = HostContext.GetService<IBrokerServer>();
} }
public async Task<Boolean> CreateSessionAsync(CancellationToken token) public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{ {
var credMgr = HostContext.GetService<ICredentialManager>(); await RefreshBrokerConnection();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@@ -73,10 +64,18 @@ namespace GitHub.Runner.Listener
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token) public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{ {
bool encounteringError = false;
int continuousError = 0;
Stopwatch heartbeat = new();
heartbeat.Restart();
while (true) while (true)
{ {
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);
if (message == null) if (message == null)
{ {
@@ -98,124 +97,86 @@ namespace GitHub.Runner.Listener
return message; return message;
} }
} }
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
throw;
}
catch (TaskAgentAccessTokenExpiredException)
{
Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
throw;
}
catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
{
throw;
}
catch (Exception ex)
{
Trace.Error("Catch exception during get next message.");
Trace.Error(ex);
// return message; if (!IsGetNextMessageExceptionRetriable(ex))
// Trace.Entering(); {
// ArgUtil.NotNull(_settings, nameof(_settings)); throw;
// bool encounteringError = false; }
// int continuousError = 0; else
// string errorMessage = string.Empty; {
// Stopwatch heartbeat = new(); continuousError++;
// heartbeat.Restart(); //retry after a random backoff to avoid service throttling
// while (true) //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
// { if (continuousError <= 5)
// token.ThrowIfCancellationRequested(); {
// TaskAgentMessage message = null; // random backoff [15, 30]
// _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
// try }
// { else
// message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, {
// _session.SessionId, // more aggressive backoff [30, 60]
// _lastMessageId, _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
// runnerStatus, }
// BuildConstants.RunnerPackage.Version,
// _getMessagesTokenSource.Token);
if (!encounteringError)
{
//print error only on the first consecutive error
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
encounteringError = true;
}
// if (message != null) // re-create VssConnection before next retry
// { await RefreshBrokerConnection();
// _lastMessageId = message.MessageId;
// }
// if (encounteringError) //print the message once only if there was an error Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
// { await HostContext.Delay(_getNextMessageRetryInterval, token);
// _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); }
// encounteringError = false; }
// continuousError = 0; finally
// } {
// } _getMessagesTokenSource.Dispose();
// catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) }
// {
// Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
// continue;
// }
// catch (OperationCanceledException) when (token.IsCancellationRequested)
// {
// Trace.Info("Get next message has been cancelled.");
// throw;
// }
// catch (TaskAgentAccessTokenExpiredException)
// {
// Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
// throw;
// }
// catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
// {
// throw;
// }
// catch (Exception ex)
// {
// Trace.Error("Catch exception during get next message.");
// Trace.Error(ex);
// // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs. if (message == null)
// if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token)) {
// { if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
// Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session."); {
// } Trace.Info($"No message retrieved within last 30 minutes.");
// else if (!IsGetNextMessageExceptionRetriable(ex)) heartbeat.Restart();
// { }
// throw; else
// } {
// else Trace.Verbose($"No message retrieved.");
// { }
// continuousError++;
// //retry after a random backoff to avoid service throttling
// //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
// if (continuousError <= 5)
// {
// // random backoff [15, 30]
// _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
// }
// else
// {
// // more aggressive backoff [30, 60]
// _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
// }
// if (!encounteringError) continue;
// { }
// //print error only on the first consecutive error
// _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
// encounteringError = true;
// }
// Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); Trace.Info($"Message '{message.MessageId}' received.");
// await HostContext.Delay(_getNextMessageRetryInterval, token); }
// }
// }
// finally
// {
// _getMessagesTokenSource.Dispose();
// }
// if (message == null)
// {
// if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
// {
// Trace.Info($"No message retrieved within last 30 minutes.");
// heartbeat.Restart();
// }
// else
// {
// Trace.Verbose($"No message retrieved");
// }
// continue;
// }
// Trace.Info($"Message '{message.MessageId}' received");
// return message;
} }
public async Task DeleteMessageAsync(TaskAgentMessage message) public async Task DeleteMessageAsync(TaskAgentMessage message)
@@ -241,9 +202,15 @@ namespace GitHub.Runner.Listener
} }
} }
// private <TaskAgentMessage> GetMessageAsync(string status, string version) private async Task RefreshBrokerConnection()
// { {
// } var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds);
}
} }
} }