This commit is contained in:
Luke Tomlinson
2023-03-22 12:55:15 -07:00
parent df885279a1
commit af657acebc
5 changed files with 261 additions and 180 deletions

6
.vscode/launch.json vendored
View File

@@ -24,7 +24,10 @@
],
"cwd": "${workspaceFolder}/src",
"console": "integratedTerminal",
"requireExactSource": false
"requireExactSource": false,
"env": {
"USE_BROKER_FLOW": "1"
}
},
{
"name": "Configure",
@@ -55,4 +58,3 @@
},
],
}

View File

@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token);
}
public sealed class BrokerServer : RunnerService, IBrokerServer
{
private bool _hasConnection;
private Uri _brokerUri;
private RawConnection _connection;
private BrokerHttpClient _brokerHttpClient;
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
_hasConnection = true;
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken);
return jobMessage;
}
}
}

View File

@@ -18,28 +18,33 @@ namespace GitHub.Runner.Listener
{
public sealed class BrokerMessageListener : RunnerService, IMessageListener
{
private long? _lastMessageId;
private RunnerSettings _settings;
private ITerminal _term;
private IRunnerServer _runnerServer;
private TaskAgentSession _session;
private TimeSpan _getNextMessageRetryInterval;
private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30);
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
// private long? _lastMessageId;
// private RunnerSettings _settings;
// private ITerminal _term;
// private IRunnerServer _runnerServer;
// private TaskAgentSession _session;
// private TimeSpan _getNextMessageRetryInterval;
// private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30);
// private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
// private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
// private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer;
public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_term = HostContext.GetService<ITerminal>();
// _term = HostContext.GetService<ITerminal>();
_brokerServer = HostContext.GetService<IBrokerServer>();
}
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri("http://broker.actions.localhost"), creds);
return await Task.FromResult(true);
}
@@ -50,8 +55,7 @@ namespace GitHub.Runner.Listener
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
@@ -62,128 +66,139 @@ namespace GitHub.Runner.Listener
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
}
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
Trace.Entering();
ArgUtil.NotNull(_settings, nameof(_settings));
bool encounteringError = false;
int continuousError = 0;
string errorMessage = string.Empty;
Stopwatch heartbeat = new();
heartbeat.Restart();
while (true)
{
token.ThrowIfCancellationRequested();
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
runnerStatus,
BuildConstants.RunnerPackage.Version,
_getMessagesTokenSource.Token);
var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token);
if (message != null)
{
_lastMessageId = message.MessageId;
}
if (encounteringError) //print the message once only if there was an error
{
_term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected.");
encounteringError = false;
continuousError = 0;
}
}
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
throw;
}
catch (TaskAgentAccessTokenExpiredException)
{
Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
throw;
}
catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
{
throw;
}
catch (Exception ex)
{
Trace.Error("Catch exception during get next message.");
Trace.Error(ex);
// don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs.
if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token))
{
Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session.");
}
else if (!IsGetNextMessageExceptionRetriable(ex))
{
throw;
}
else
{
continuousError++;
//retry after a random backoff to avoid service throttling
//in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
if (continuousError <= 5)
{
// random backoff [15, 30]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
}
else
{
// more aggressive backoff [30, 60]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
}
if (!encounteringError)
{
//print error only on the first consecutive error
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
encounteringError = true;
}
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}
if (message == null)
{
if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
{
Trace.Info($"No message retrieved within last 30 minutes.");
heartbeat.Restart();
}
else
{
Trace.Verbose($"No message retrieved");
}
continue;
}
Trace.Info($"Message '{message.MessageId}' received");
return message;
}
}
// return message;
// Trace.Entering();
// ArgUtil.NotNull(_settings, nameof(_settings));
// bool encounteringError = false;
// int continuousError = 0;
// string errorMessage = string.Empty;
// Stopwatch heartbeat = new();
// heartbeat.Restart();
// while (true)
// {
// token.ThrowIfCancellationRequested();
// TaskAgentMessage message = null;
// _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
// try
// {
// message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
// _session.SessionId,
// _lastMessageId,
// runnerStatus,
// BuildConstants.RunnerPackage.Version,
// _getMessagesTokenSource.Token);
// if (message != null)
// {
// _lastMessageId = message.MessageId;
// }
// if (encounteringError) //print the message once only if there was an error
// {
// _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected.");
// encounteringError = false;
// continuousError = 0;
// }
// }
// catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
// {
// Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
// continue;
// }
// catch (OperationCanceledException) when (token.IsCancellationRequested)
// {
// Trace.Info("Get next message has been cancelled.");
// throw;
// }
// catch (TaskAgentAccessTokenExpiredException)
// {
// Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
// throw;
// }
// catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
// {
// throw;
// }
// catch (Exception ex)
// {
// Trace.Error("Catch exception during get next message.");
// Trace.Error(ex);
// // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs.
// if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token))
// {
// Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session.");
// }
// else if (!IsGetNextMessageExceptionRetriable(ex))
// {
// throw;
// }
// else
// {
// continuousError++;
// //retry after a random backoff to avoid service throttling
// //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
// if (continuousError <= 5)
// {
// // random backoff [15, 30]
// _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
// }
// else
// {
// // more aggressive backoff [30, 60]
// _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
// }
// if (!encounteringError)
// {
// //print error only on the first consecutive error
// _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
// encounteringError = true;
// }
// Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
// await HostContext.Delay(_getNextMessageRetryInterval, token);
// }
// }
// finally
// {
// _getMessagesTokenSource.Dispose();
// }
// if (message == null)
// {
// if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
// {
// Trace.Info($"No message retrieved within last 30 minutes.");
// heartbeat.Restart();
// }
// else
// {
// Trace.Verbose($"No message retrieved");
// }
// continue;
// }
// Trace.Info($"Message '{message.MessageId}' received");
// return message;
}
public async Task DeleteMessageAsync(TaskAgentMessage message)
@@ -209,9 +224,9 @@ namespace GitHub.Runner.Listener
}
}
private <TaskAgentMessage> GetMessageAsync(string status, string version)
{
// private <TaskAgentMessage> GetMessageAsync(string status, string version)
// {
}
// }
}
}

View File

@@ -16,7 +16,7 @@ using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener
{
[ServiceLocator(Default = typeof(MessageListener))]
[ServiceLocator(Default = typeof(BrokerMessageListener))]
public interface IMessageListener : IRunnerService
{
Task<Boolean> CreateSessionAsync(CancellationToken token);

View File

@@ -1,61 +1,69 @@
using System;
using System.IO;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Services.Results.Contracts;
using System.Net.Http.Formatting;
using Sdk.WebApi.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi;
namespace GitHub.Services.Results.Client
namespace GitHub.Actions.RunService.WebApi
{
public class BrokerHttpClient : RawHttpClientBase
{
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials)
: base(baseUrl, credentials)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: base(baseUrl, credentials, settings)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, settings, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
HttpMessageHandler pipeline,
string token,
bool disposeHandler)
Boolean disposeHandler)
: base(baseUrl, pipeline, disposeHandler)
{
m_token = token;
m_brokerUrl = baseUrl;
m_formatter = new JsonMediaTypeFormatter();
}
public async Task<TaskAgentMessage> GetMessagesAsync(CancellationToken cancellationToken)
public Task<TaskAgentMessage> GetRunnerMessageAsync(
CancellationToken cancellationToken = default)
{
var uri = new Uri(m_brokerUrl, Constants.Messages);
return await GetSignedURLResponse<TaskAgentMessage>(uri, cancellationToken);
}
var requestUri = new Uri(Client.BaseAddress, "message");
// Get Sas URL calls
private async Task<T> GetSignedURLResponse<T>(Uri uri, CancellationToken cancellationToken)
{
using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Get, uri))
{
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token);
requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json"));
using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken))
{
return await ReadJsonContentAsync<T>(response, cancellationToken);
return SendAsync<TaskAgentMessage>(
new HttpMethod("GET"),
requestUri: requestUri,
cancellationToken: cancellationToken);
}
}
}
private MediaTypeFormatter m_formatter;
private Uri m_brokerUrl;
private string m_token;
}
// Constants specific to results
public static class Constants
{
public static readonly string Messages = "messages";
}
}