diff --git a/.vscode/launch.json b/.vscode/launch.json index 270f1d6f5..887789e3b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,10 @@ ], "cwd": "${workspaceFolder}/src", "console": "integratedTerminal", - "requireExactSource": false + "requireExactSource": false, + "env": { + "USE_BROKER_FLOW": "1" + } }, { "name": "Run", diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 284616210..bb0691771 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -17,7 +17,7 @@ namespace GitHub.Runner.Common { Task ConnectAsync(Uri serverUrl, VssCredentials credentials); - Task GetRunnerMessageAsync(CancellationToken token); + Task GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); } public sealed class BrokerServer : RunnerService, IBrokerServer @@ -44,11 +44,11 @@ namespace GitHub.Runner.Common } } - public Task GetRunnerMessageAsync(CancellationToken cancellationToken) + public Task GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version) { CheckConnection(); var jobMessage = RetryRequest( - async () => await _brokerHttpClient.GetRunnerMessageAsync(cancellationToken), cancellationToken); + async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken); return jobMessage; } diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index 35a37969e..40a73dae0 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -12,6 +12,7 @@ using GitHub.Runner.Common; using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; using GitHub.Services.Common; +using GitHub.Runner.Common.Util; using GitHub.Services.OAuth; namespace GitHub.Runner.Listener @@ -31,6 +32,7 @@ namespace GitHub.Runner.Listener private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private IBrokerServer _brokerServer; + private string lastRunnerRequestId; public override void Initialize(IHostContext hostContext) { @@ -74,12 +76,27 @@ namespace GitHub.Runner.Listener while (true) { _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); - var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token); - if (message != null) + var message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version); + + if (message == null) + { + continue; + } + + if (MessageUtil.IsRunServiceJob(message.MessageType)) + { + var messageRef = StringUtil.ConvertFromJson(message.Body); + + if (messageRef.RunnerRequestId != lastRunnerRequestId) + { + lastRunnerRequestId = messageRef.RunnerRequestId; + return message; + } + } + else { return message; } - } // return message; diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index 40563b351..8f9b22e75 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -56,13 +56,28 @@ namespace GitHub.Actions.RunService.WebApi } public Task GetRunnerMessageAsync( - CancellationToken cancellationToken = default) + string runnerVersion, + TaskAgentStatus? status, + CancellationToken cancellationToken = default + ) { var requestUri = new Uri(Client.BaseAddress, "message"); + List> queryParams = new List>(); + + if (status != null) + { + queryParams.Add("status", status.Value.ToString()); + } + if (runnerVersion != null) + { + queryParams.Add("runnerVersion", runnerVersion); + } + return SendAsync( new HttpMethod("GET"), requestUri: requestUri, + queryParameters: queryParams, cancellationToken: cancellationToken); } }