This commit is contained in:
Patrick Ellis
2023-05-18 13:21:32 -07:00
committed by GitHub
parent 3b149bd8db
commit 1558d47a31
4 changed files with 29 additions and 16 deletions

View File

@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi; using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common.Util;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts; using Sdk.RSWebApi.Contracts;
@@ -18,6 +19,7 @@ namespace GitHub.Runner.Common
Task ConnectAsync(Uri serverUrl, VssCredentials credentials); Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
Task<bool> DeleteRunnerMessageAsync(string messageID, CancellationToken token);
} }
public sealed class BrokerServer : RunnerService, IBrokerServer public sealed class BrokerServer : RunnerService, IBrokerServer
@@ -48,9 +50,20 @@ namespace GitHub.Runner.Common
{ {
CheckConnection(); CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>( var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken); async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken
);
return jobMessage; return jobMessage;
} }
public Task<bool> DeleteRunnerMessageAsync(string messageID, CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest(
async () => await _brokerHttpClient.DeleteRunnerMessageAsync(messageID, cancellationToken),
cancellationToken
);
}
} }
} }

View File

@@ -168,10 +168,19 @@ namespace GitHub.Runner.Listener
} }
} }
public async Task DeleteMessageAsync(string messageID, CancellationToken token) public async Task DeleteMessageAsync(TaskAgentMessage message)
{ {
Trace.Entering(); Trace.Entering();
await _brokerServer.DeleteRunnerMessageAsync(messageID, token);
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
await _brokerServer.DeleteRunnerMessageAsync(messageRef.RunnerRequestId, cs.Token);
}
}
} }
private bool IsGetNextMessageExceptionRetriable(Exception ex) private bool IsGetNextMessageExceptionRetriable(Exception ex)

View File

@@ -393,8 +393,6 @@ namespace GitHub.Runner.Listener
TaskAgentMessage message = null; TaskAgentMessage message = null;
bool skipMessageDeletion = false; bool skipMessageDeletion = false;
bool useBrokerDeletion = false; bool useBrokerDeletion = false;
string brokerDeletionParamsMessageID = null;
CancellationToken brokerDeletionParamsToken = null;
try try
{ {
Task<TaskAgentMessage> getNextMessage = _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token); Task<TaskAgentMessage> getNextMessage = _listener.GetNextMessageAsync(messageQueueLoopTokenSource.Token);
@@ -557,9 +555,6 @@ namespace GitHub.Runner.Listener
jobRequestMessage = jobRequestMessage =
await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, await runServer.GetJobMessageAsync(messageRef.RunnerRequestId,
messageQueueLoopTokenSource.Token); messageQueueLoopTokenSource.Token);
useBrokerDeletion = true;
brokerDeletionParamsMessageID = messageRef.RunnerRequestId;
brokerDeletionParamsToken = messageQueueLoopTokenSource.Token;
} }
catch (TaskOrchestrationJobAlreadyAcquiredException) catch (TaskOrchestrationJobAlreadyAcquiredException)
{ {
@@ -608,7 +603,8 @@ namespace GitHub.Runner.Listener
{ {
if (useBrokerDeletion) if (useBrokerDeletion)
{ {
await _listener.DeleteMessageAsync(brokerDeletionParamsMessageID, brokerDeletionParamsToken); // await _listener.DeleteMessageAsync(brokerDeletionParamsMessageID, brokerDeletionParamsToken);
await _listener.DeleteMessageAsync(message);
} }
else else
{ {

View File

@@ -89,7 +89,7 @@ namespace GitHub.Actions.RunService.WebApi
throw new Exception($"Failed to get job message: {result.Error}"); throw new Exception($"Failed to get job message: {result.Error}");
} }
public async Task<TaskAgentMessage> DeleteRunnerMessageAsync( public async Task<bool> DeleteRunnerMessageAsync(
string messageID, string messageID,
CancellationToken cancellationToken = default CancellationToken cancellationToken = default
) )
@@ -109,12 +109,7 @@ namespace GitHub.Actions.RunService.WebApi
queryParameters: queryParams, queryParameters: queryParams,
cancellationToken: cancellationToken); cancellationToken: cancellationToken);
if (result.IsSuccess) return (result.IsSuccess);
{
return result.Value;
}
throw new Exception($"Failed to delete job message: {result.Error}");
} }
} }
} }