Listen directly to Broker for messages (#2500)

This commit is contained in:
Luke Tomlinson
2023-03-24 14:00:34 -04:00
committed by GitHub
parent 74eeb82684
commit 92258f9ea1
5 changed files with 360 additions and 1 deletions

View File

@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
}
public sealed class BrokerServer : RunnerService, IBrokerServer
{
private bool _hasConnection;
private Uri _brokerUri;
private RawConnection _connection;
private BrokerHttpClient _brokerHttpClient;
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
_hasConnection = true;
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
return jobMessage;
}
}
}

View File

@@ -53,6 +53,9 @@ namespace GitHub.Runner.Common
[DataMember(EmitDefaultValue = false)]
public bool UseV2Flow { get; set; }
[DataMember(EmitDefaultValue = false)]
public string ServerUrlV2 { get; set; }
[IgnoreDataMember]
public bool IsHostedServer
{

View File

@@ -0,0 +1,204 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener
{
public sealed class BrokerMessageListener : RunnerService, IMessageListener
{
private RunnerSettings _settings;
private ITerminal _term;
private TimeSpan _getNextMessageRetryInterval;
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer;
public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_term = HostContext.GetService<ITerminal>();
_brokerServer = HostContext.GetService<IBrokerServer>();
}
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{
await RefreshBrokerConnection();
return await Task.FromResult(true);
}
public async Task DeleteSessionAsync()
{
await Task.CompletedTask;
}
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
bool encounteringError = false;
int continuousError = 0;
Stopwatch heartbeat = new();
heartbeat.Restart();
var maxRetryCount = 10;
while (true)
{
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);
if (message == null)
{
continue;
}
return message;
}
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
throw;
}
catch (TaskAgentAccessTokenExpiredException)
{
Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
throw;
}
catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
{
throw;
}
catch (Exception ex)
{
Trace.Error("Catch exception during get next message.");
Trace.Error(ex);
if (!IsGetNextMessageExceptionRetriable(ex))
{
throw;
}
else
{
continuousError++;
//retry after a random backoff to avoid service throttling
//in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
if (continuousError <= 5)
{
// random backoff [15, 30]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
}
else if (continuousError >= maxRetryCount)
{
throw;
}
else
{
// more aggressive backoff [30, 60]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
}
if (!encounteringError)
{
//print error only on the first consecutive error
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
encounteringError = true;
}
// re-create VssConnection before next retry
await RefreshBrokerConnection();
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}
if (message == null)
{
if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
{
Trace.Info($"No message retrieved within last 30 minutes.");
heartbeat.Restart();
}
else
{
Trace.Verbose($"No message retrieved.");
}
continue;
}
Trace.Info($"Message '{message.MessageId}' received.");
}
}
public async Task DeleteMessageAsync(TaskAgentMessage message)
{
await Task.CompletedTask;
}
private bool IsGetNextMessageExceptionRetriable(Exception ex)
{
if (ex is TaskAgentNotFoundException ||
ex is TaskAgentPoolNotFoundException ||
ex is TaskAgentSessionExpiredException ||
ex is AccessDeniedException ||
ex is VssUnauthorizedException)
{
Trace.Info($"Non-retriable exception: {ex.Message}");
return false;
}
else
{
Trace.Info($"Retriable exception: {ex.Message}");
return true;
}
}
private async Task RefreshBrokerConnection()
{
var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds);
}
}
}

View File

@@ -339,13 +339,25 @@ namespace GitHub.Runner.Listener
}
}
private IMessageListener GetMesageListener(RunnerSettings settings)
{
if (settings.UseV2Flow)
{
var brokerListener = new BrokerMessageListener();
brokerListener.Initialize(HostContext);
return brokerListener;
}
return HostContext.GetService<IMessageListener>();
}
//create worker manager, create message listener and start listening to the queue
private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{
try
{
Trace.Info(nameof(RunAsync));
_listener = HostContext.GetService<IMessageListener>();
_listener = GetMesageListener(settings);
if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken))
{
return Constants.Runner.ReturnCode.TerminatedError;

View File

@@ -0,0 +1,84 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi;
namespace GitHub.Actions.RunService.WebApi
{
public class BrokerHttpClient : RawHttpClientBase
{
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials)
: base(baseUrl, credentials)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: base(baseUrl, credentials, settings)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, settings, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
HttpMessageHandler pipeline,
Boolean disposeHandler)
: base(baseUrl, pipeline, disposeHandler)
{
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(
string runnerVersion,
TaskAgentStatus? status,
CancellationToken cancellationToken = default
)
{
var requestUri = new Uri(Client.BaseAddress, "message");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}
if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
}
return SendAsync<TaskAgentMessage>(
new HttpMethod("GET"),
requestUri: requestUri,
queryParameters: queryParams,
cancellationToken: cancellationToken);
}
}
}