Acknowledge runner request (#3996)

This commit is contained in:
eric sciple
2025-08-22 13:52:32 -05:00
committed by GitHub
parent 20d82ad357
commit 5f1efec208
7 changed files with 155 additions and 19 deletions

View File

@@ -23,6 +23,8 @@ namespace GitHub.Runner.Common
Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token);
Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken token);
Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials);
Task ForceRefreshConnection(VssCredentials credentials);
@@ -67,10 +69,17 @@ namespace GitHub.Runner.Common
var brokerSession = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken, shouldRetry: ShouldRetryException);
return brokerSession;
}
/// <summary>
/// Forwards an acknowledgement for a runner request to the broker HTTP client.
/// The call is made exactly once; it is not wrapped in the retry helper.
/// </summary>
/// <param name="runnerRequestId">Identifier of the runner request being acknowledged.</param>
/// <param name="sessionId">Current session identifier, if any.</param>
/// <param name="status">Runner status to report alongside the acknowledgement.</param>
/// <param name="version">Runner version string.</param>
/// <param name="os">Operating system name.</param>
/// <param name="architecture">OS architecture name.</param>
/// <param name="cancellationToken">Token used to cancel the request.</param>
public async Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken cancellationToken)
{
    CheckConnection();

    // Single attempt only (no retries). Named arguments are used because the
    // HTTP client takes the version before the status, unlike this method.
    await _brokerHttpClient.AcknowledgeRunnerRequestAsync(
        runnerRequestId: runnerRequestId,
        sessionId: sessionId,
        runnerVersion: version,
        status: status,
        os: os,
        architecture: architecture,
        cancellationToken: cancellationToken);
}
public async Task DeleteSessionAsync(CancellationToken cancellationToken)
{
CheckConnection();

View File

@@ -70,7 +70,7 @@ namespace GitHub.Runner.Common
protected async Task RetryRequest(Func<Task> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5,
int maxAttempts = 5,
Func<Exception, bool> shouldRetry = null
)
{
@@ -79,31 +79,31 @@ namespace GitHub.Runner.Common
await func();
return Unit.Value;
}
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxRetryAttemptsCount, shouldRetry);
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxAttempts, shouldRetry);
}
protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5,
int maxAttempts = 5,
Func<Exception, bool> shouldRetry = null
)
{
var retryCount = 0;
var attempt = 0;
while (true)
{
retryCount++;
attempt++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount && (shouldRetry == null || shouldRetry(ex)))
catch (Exception ex) when (attempt < maxAttempts && (shouldRetry == null || shouldRetry(ex)))
{
Trace.Error("Catch exception during request");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxAttempts - attempt} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}

View File

@@ -23,7 +23,7 @@ namespace GitHub.Runner.Listener
private RunnerSettings _settings;
private ITerminal _term;
private TimeSpan _getNextMessageRetryInterval;
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;
private VssCredentials _credsV2;
@@ -258,7 +258,7 @@ namespace GitHub.Runner.Listener
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
_runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
@@ -291,7 +291,7 @@ namespace GitHub.Runner.Listener
}
message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
@@ -417,6 +417,21 @@ namespace GitHub.Runner.Listener
await Task.CompletedTask;
}
/// <summary>
/// Acknowledges the given runner request through the broker server, reporting the
/// current session id, runner status, version, OS and architecture.
/// </summary>
/// <param name="runnerRequestId">Identifier of the runner request to acknowledge.</param>
/// <param name="cancellationToken">Caller's token; combined with an internal five-second timeout.</param>
public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken)
{
    // Bound the call to a short window while still honoring the caller's cancellation.
    using (var ackTimeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    using (var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ackTimeoutSource.Token))
    {
        Trace.Info($"Acknowledging runner request '{runnerRequestId}'.");

        await _brokerServer.AcknowledgeRunnerRequestAsync(
            runnerRequestId,
            _session.SessionId,
            _runnerStatus,
            BuildConstants.RunnerPackage.Version,
            VarUtil.OS,
            VarUtil.OSArchitecture,
            combinedSource.Token);
    }
}
private bool IsGetNextMessageExceptionRetriable(Exception ex)
{
if (ex is TaskAgentNotFoundException ||

View File

@@ -32,6 +32,7 @@ namespace GitHub.Runner.Listener
Task DeleteSessionAsync();
Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token);
Task DeleteMessageAsync(TaskAgentMessage message);
Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken);
Task RefreshListenerTokenAsync();
void OnJobStatus(object sender, JobStatusEventArgs e);
@@ -52,7 +53,7 @@ namespace GitHub.Runner.Listener
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;
private VssCredentials _credsV2;
@@ -217,7 +218,7 @@ namespace GitHub.Runner.Listener
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
_runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
@@ -250,7 +251,7 @@ namespace GitHub.Runner.Listener
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
@@ -274,7 +275,7 @@ namespace GitHub.Runner.Listener
}
message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
@@ -437,6 +438,21 @@ namespace GitHub.Runner.Listener
await _brokerServer.ForceRefreshConnection(_credsV2);
}
/// <summary>
/// Acknowledges the given runner request through the broker server, reporting the
/// current session id, runner status, version, OS and architecture.
/// </summary>
/// <param name="runnerRequestId">Identifier of the runner request to acknowledge.</param>
/// <param name="cancellationToken">Caller's token; combined with an internal five-second timeout.</param>
public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken)
{
    // Bound the call to a short window while still honoring the caller's cancellation.
    using (var ackTimeoutSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
    using (var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ackTimeoutSource.Token))
    {
        Trace.Info($"Acknowledging runner request '{runnerRequestId}'.");

        await _brokerServer.AcknowledgeRunnerRequestAsync(
            runnerRequestId,
            _session.SessionId,
            _runnerStatus,
            BuildConstants.RunnerPackage.Version,
            VarUtil.OS,
            VarUtil.OSArchitecture,
            combinedSource.Token);
    }
}
private TaskAgentMessage DecryptMessage(TaskAgentMessage message)
{
if (_session.EncryptionKey == null ||

View File

@@ -654,22 +654,42 @@ namespace GitHub.Runner.Listener
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;
// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
// Acknowledge (best-effort)
if (messageRef.ShouldAcknowledge) // Temporary feature flag
{
try
{
await _listener.AcknowledgeMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
catch (Exception ex)
{
Trace.Error($"Best-effort acknowledge failed for request '{messageRef.RunnerRequestId}'");
Trace.Error(ex);
}
}
Pipelines.AgentJobRequestMessage jobRequestMessage = null;
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{
// Connect
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials(allowAuthUrlV2: false);
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
// Get job message
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
// Connect
var credMgr = HostContext.GetService<ICredentialManager>();
var credsV2 = credMgr.LoadCredentials(allowAuthUrlV2: true);
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), credsV2);
// Get job message
try
{
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageRef.BillingOwnerId, messageQueueLoopTokenSource.Token);
@@ -698,7 +718,10 @@ namespace GitHub.Runner.Listener
}
}
// Dispatch
jobDispatcher.Run(jobRequestMessage, runOnce);
// Run once?
if (runOnce)
{
Trace.Info("One time used runner received job message.");

View File

@@ -10,6 +10,9 @@ namespace GitHub.Runner.Listener
/// <summary>
/// Identifier of the runner request this reference points to.
/// Serialized under the data member name "runner_request_id".
/// </summary>
[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }
/// <summary>
/// Indicates whether the runner request should be acknowledged.
/// Serialized under the data member name "should_acknowledge".
/// </summary>
// NOTE(review): the consumer of this flag describes it as a temporary feature flag - confirm the removal plan.
[DataMember(Name = "should_acknowledge")]
public bool ShouldAcknowledge { get; set; }
/// <summary>
/// URL of the run service associated with this request.
/// Serialized under the data member name "run_service_url".
/// </summary>
// NOTE(review): callers appear to treat a null or empty value as "use the configured server URL instead" - confirm.
[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }

View File

@@ -79,6 +79,7 @@ namespace GitHub.Actions.RunService.WebApi
{
queryParams.Add("status", status.Value.ToString());
}
if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
@@ -142,7 +143,6 @@ namespace GitHub.Actions.RunService.WebApi
}
public async Task<TaskAgentSession> CreateSessionAsync(
TaskAgentSession session,
CancellationToken cancellationToken = default)
{
@@ -191,6 +191,76 @@ namespace GitHub.Actions.RunService.WebApi
throw new Exception($"Failed to delete broker session: {result.Error}");
}
/// <summary>
/// POSTs an acknowledgement for a runner request to the "acknowledge" endpoint
/// relative to the client's base address.
/// </summary>
/// <param name="runnerRequestId">Request identifier; sent in the JSON body as "runnerRequestId".</param>
/// <param name="sessionId">Optional session id; added to the query string when not null.</param>
/// <param name="runnerVersion">Optional runner version; added to the query string when not null.</param>
/// <param name="status">Optional runner status; added to the query string when not null.</param>
/// <param name="os">Optional OS name; added to the query string when not null.</param>
/// <param name="architecture">Optional architecture name; added to the query string when not null.</param>
/// <param name="cancellationToken">Token used to cancel the request.</param>
/// <exception cref="RunnerNotFoundException">
/// The error body parsed as a broker error whose kind is <c>RunnerNotFound</c>.
/// </exception>
/// <exception cref="Exception">Any other unsuccessful response.</exception>
public async Task AcknowledgeRunnerRequestAsync(
    string runnerRequestId,
    Guid? sessionId,
    string runnerVersion,
    TaskAgentStatus? status,
    string os = null,
    string architecture = null,
    CancellationToken cancellationToken = default)
{
    var requestUri = new Uri(Client.BaseAddress, "acknowledge");

    // Query string: each value is included only when supplied, in a fixed order.
    var queryParams = new List<KeyValuePair<string, string>>();
    if (sessionId.HasValue)
    {
        queryParams.Add("sessionId", sessionId.Value.ToString());
    }
    if (status.HasValue)
    {
        queryParams.Add("status", status.Value.ToString());
    }
    if (runnerVersion != null)
    {
        queryParams.Add("runnerVersion", runnerVersion);
    }
    if (os != null)
    {
        queryParams.Add("os", os);
    }
    if (architecture != null)
    {
        queryParams.Add("architecture", architecture);
    }

    // JSON body carrying the request id.
    var body = new Dictionary<string, string>
    {
        { "runnerRequestId", runnerRequestId },
    };
    var requestContent = new ObjectContent<Dictionary<string, string>>(body, new VssJsonMediaTypeFormatter(true));

    var result = await SendAsync<object>(
        new HttpMethod("POST"),
        requestUri: requestUri,
        queryParameters: queryParams,
        content: requestContent,
        readErrorBody: true,
        cancellationToken: cancellationToken);

    if (!result.IsSuccess)
    {
        // Map the one recognized broker error kind to its dedicated exception type.
        if (TryParseErrorBody(result.ErrorBody, out BrokerError brokerError)
            && brokerError.ErrorKind == BrokerErrorKind.RunnerNotFound)
        {
            throw new RunnerNotFoundException(brokerError.Message);
        }

        throw new Exception($"Failed to acknowledge runner request. Request to {requestUri} failed with status: {result.StatusCode}. Error message {result.Error}");
    }
}
private static bool TryParseErrorBody(string errorBody, out BrokerError error)
{
if (!string.IsNullOrEmpty(errorBody))