This commit is contained in:
Luke Tomlinson
2023-03-22 15:20:31 -07:00
parent af657acebc
commit ee19ca253e
4 changed files with 43 additions and 8 deletions

5
.vscode/launch.json vendored
View File

@@ -12,7 +12,10 @@
], ],
"cwd": "${workspaceFolder}/src", "cwd": "${workspaceFolder}/src",
"console": "integratedTerminal", "console": "integratedTerminal",
"requireExactSource": false "requireExactSource": false,
"env": {
"USE_BROKER_FLOW": "1"
}
}, },
{ {
"name": "Run", "name": "Run",

View File

@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
{ {
Task ConnectAsync(Uri serverUrl, VssCredentials credentials); Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token); Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
} }
public sealed class BrokerServer : RunnerService, IBrokerServer public sealed class BrokerServer : RunnerService, IBrokerServer
@@ -44,11 +44,11 @@ namespace GitHub.Runner.Common
} }
} }
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken) public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{ {
CheckConnection(); CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>( var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken); async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
return jobMessage; return jobMessage;
} }

View File

@@ -12,6 +12,7 @@ using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth; using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener namespace GitHub.Runner.Listener
@@ -31,6 +32,7 @@ namespace GitHub.Runner.Listener
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource; private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer; private IBrokerServer _brokerServer;
private string lastRunnerRequestId;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
@@ -74,12 +76,27 @@ namespace GitHub.Runner.Listener
while (true) while (true)
{ {
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token); var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);
if (message != null)
if (message == null)
{
continue;
}
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
if (messageRef.RunnerRequestId != lastRunnerRequestId)
{
lastRunnerRequestId = messageRef.RunnerRequestId;
return message;
}
}
else
{ {
return message; return message;
} }
} }
// return message; // return message;

View File

@@ -56,13 +56,28 @@ namespace GitHub.Actions.RunService.WebApi
} }
public Task<TaskAgentMessage> GetRunnerMessageAsync( public Task<TaskAgentMessage> GetRunnerMessageAsync(
CancellationToken cancellationToken = default) string runnerVersion,
TaskAgentStatus? status,
CancellationToken cancellationToken = default
)
{ {
var requestUri = new Uri(Client.BaseAddress, "message"); var requestUri = new Uri(Client.BaseAddress, "message");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}
if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
}
return SendAsync<TaskAgentMessage>( return SendAsync<TaskAgentMessage>(
new HttpMethod("GET"), new HttpMethod("GET"),
requestUri: requestUri, requestUri: requestUri,
queryParameters: queryParams,
cancellationToken: cancellationToken); cancellationToken: cancellationToken);
} }
} }