From 32282ea36b8338defc3264944361249ea5156a30 Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Thu, 18 Jan 2024 15:04:51 -0800 Subject: [PATCH] Implement Broker Redirects for Session and Messages --- src/Runner.Common/BrokerServer.cs | 16 +++++++-- src/Runner.Listener/BrokerMessageListener.cs | 1 + src/Runner.Listener/MessageListener.cs | 36 +++++++++++++++++-- .../DTWebApi/WebApi/BrokerMigrationMessage.cs | 35 ++++++++++++++++++ src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs | 7 ++++ src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 33 +++++++++++++++++ 6 files changed, 122 insertions(+), 6 deletions(-) create mode 100644 src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 9d5287a2f..21ca070f6 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -17,7 +17,8 @@ namespace GitHub.Runner.Common { Task ConnectAsync(Uri serverUrl, VssCredentials credentials); - Task GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate); + Task CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session); + Task GetRunnerMessageAsync(CancellationToken token, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate); } public sealed class BrokerServer : RunnerService, IBrokerServer @@ -44,11 +45,20 @@ namespace GitHub.Runner.Common } } - public Task GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate) + public Task CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session) + { + CheckConnection(); + var jobMessage = RetryRequest( + async () => await _brokerHttpClient.CreateSessionAsync(session, cancellationToken), cancellationToken); + + return jobMessage; + } + + public Task GetRunnerMessageAsync(CancellationToken cancellationToken, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate) { CheckConnection(); var jobMessage = RetryRequest( - async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken); + async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken); return jobMessage; } diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index f3fd33fd6..e236da1d5 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -74,6 +74,7 @@ namespace GitHub.Runner.Listener try { message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, + null, runnerStatus, BuildConstants.RunnerPackage.Version, VarUtil.OS, diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 113573b6a..4954773c7 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -14,6 +14,7 @@ using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; using GitHub.Services.Common; using GitHub.Services.OAuth; +using GitHub.Services.WebApi; namespace GitHub.Runner.Listener { @@ -33,6 +34,7 @@ namespace GitHub.Runner.Listener private RunnerSettings _settings; private ITerminal _term; private IRunnerServer _runnerServer; + private IBrokerServer _brokerServer; private TaskAgentSession _session; private TimeSpan _getNextMessageRetryInterval; private bool _accessTokenRevoked = false; @@ -42,6 +44,7 @@ namespace GitHub.Runner.Listener private readonly Dictionary _sessionCreationExceptionTracker = new(); private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; + private VssCredentials _creds; public override void Initialize(IHostContext hostContext) { @@ -49,6 +52,7 @@ namespace GitHub.Runner.Listener _term = HostContext.GetService(); _runnerServer = HostContext.GetService(); + _brokerServer = hostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) @@ -64,7 +68,7 @@ namespace GitHub.Runner.Listener // Create connection. Trace.Info("Loading Credentials"); var credMgr = HostContext.GetService(); - VssCredentials creds = credMgr.LoadCredentials(); + _creds = credMgr.LoadCredentials(); var agent = new TaskAgentReference { @@ -86,7 +90,7 @@ namespace GitHub.Runner.Listener try { Trace.Info("Connecting to the Runner Server..."); - await _runnerServer.ConnectAsync(new Uri(serverUrl), creds); + await _runnerServer.ConnectAsync(new Uri(serverUrl), _creds); Trace.Info("VssConnection created"); _term.WriteLine(); @@ -98,6 +102,14 @@ namespace GitHub.Runner.Listener taskAgentSession, token); + // if (_session.SessionMigrationURI != null) + // { + // Trace.Info($"Runner session is in migration mode: Creating Broker session with SessionMigrationURI: {0}", _session.SessionMigrationURI); + // var brokerServer = HostContext.GetService(); + // await brokerServer.ConnectAsync(_session.SessionMigrationURI, _creds); + // _session = await brokerServer.CreateSessionAsync(token, taskAgentSession); + // } + Trace.Info($"Session created."); if (encounteringError) { @@ -124,7 +136,7 @@ namespace GitHub.Runner.Listener Trace.Error("Catch exception during create session."); Trace.Error(ex); - if (ex is VssOAuthTokenRequestException vssOAuthEx && creds.Federated is VssOAuthCredential vssOAuthCred) + if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred) { // "invalid_client" means the runner registration has been deleted from the server. if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase)) @@ -228,6 +240,24 @@ namespace GitHub.Runner.Listener // Decrypt the message body if the session is using encryption message = DecryptMessage(message); + + if (message != null && message.MessageType == BrokerMigrationMessage.MessageType) + { + Trace.Info("BrokerMigration message received. Polling Broker for messages..."); + + var migrationMessage = JsonUtility.FromString(message.Body); + var brokerServer = HostContext.GetService(); + + await brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds); + message = await brokerServer.GetRunnerMessageAsync(token, + _session.SessionId, + runnerStatus, + BuildConstants.RunnerPackage.Version, + VarUtil.OS, + VarUtil.OSArchitecture, + _settings.DisableUpdate); + } + if (message != null) { _lastMessageId = message.MessageId; diff --git a/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs b/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs new file mode 100644 index 000000000..a6f9fffc1 --- /dev/null +++ b/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs @@ -0,0 +1,35 @@ +using System; +using System.Runtime.Serialization; + +namespace GitHub.DistributedTask.WebApi +{ + /// + /// Represents a session for performing message exchanges from an agent. + /// + [DataContract] + public class BrokerMigrationMessage + { + + public static readonly string MessageType = "BrokerMigration"; + + public BrokerMigrationMessage() + { + } + + public BrokerMigrationMessage( + Uri brokerUrl) + { + this.BrokerBaseUrl = brokerUrl; + } + + /// + /// Gets the unique identifier for this session. + /// + [DataMember] + public Uri BrokerBaseUrl + { + get; + internal set; + } + } +} diff --git a/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs b/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs index 8135de83b..7b9fdfd0a 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs @@ -75,5 +75,12 @@ namespace GitHub.DistributedTask.WebApi get; set; } + + [DataMember(EmitDefaultValue = false, IsRequired = false)] + public BrokerMigrationMessage BrokerMigrationMessage + { + get; + set; + } } } diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index fa914561c..0f4492554 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -57,6 +57,7 @@ namespace GitHub.Actions.RunService.WebApi } public async Task GetRunnerMessageAsync( + Guid? sessionId, string runnerVersion, TaskAgentStatus? status, string os = null, @@ -69,6 +70,11 @@ namespace GitHub.Actions.RunService.WebApi List> queryParams = new List>(); + if (sessionId != null) + { + queryParams.Add("sessionId", sessionId.Value.ToString()); + } + if (status != null) { queryParams.Add("status", status.Value.ToString()); @@ -111,5 +117,32 @@ namespace GitHub.Actions.RunService.WebApi throw new Exception($"Failed to get job message: {result.Error}"); } + + public async Task CreateSessionAsync( + TaskAgentSession session, + CancellationToken cancellationToken = default) + { + + var requestUri = new Uri(Client.BaseAddress, "session"); + var requestContent = new ObjectContent(session, new VssJsonMediaTypeFormatter(true)); + + var result = await SendAsync( + new HttpMethod("POST"), + requestUri: requestUri, + content: requestContent, + cancellationToken: cancellationToken); + + if (result.IsSuccess) + { + return result.Value; + } + + if (result.StatusCode == HttpStatusCode.Forbidden) + { + throw new AccessDeniedException(result.Error); + } + + throw new Exception($"Failed to create broker session: {result.Error}"); + } } }