From 894c50073a587f83695a82fa9846e6da7d5877ad Mon Sep 17 00:00:00 2001 From: Luke Tomlinson Date: Tue, 30 Jan 2024 15:57:49 -0500 Subject: [PATCH] Implement Broker Redirects for Session and Messages (#3103) --- src/Runner.Common/BrokerServer.cs | 25 +- src/Runner.Listener/BrokerMessageListener.cs | 214 ++++++++++++++- src/Runner.Listener/MessageListener.cs | 42 ++- src/Runner.Listener/Runner.cs | 1 + .../DTWebApi/WebApi/BrokerMigrationMessage.cs | 38 +++ src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs | 7 + src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 56 ++++ .../L0/Listener/BrokerMessageListenerL0.cs | 81 ++++++ src/Test/L0/Listener/MessageListenerL0.cs | 253 ++++++++++++++++++ 9 files changed, 704 insertions(+), 13 deletions(-) create mode 100644 src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs create mode 100644 src/Test/L0/Listener/BrokerMessageListenerL0.cs diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 9d5287a2f..77bf5d882 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -17,7 +17,10 @@ namespace GitHub.Runner.Common { Task ConnectAsync(Uri serverUrl, VssCredentials credentials); - Task GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate); + Task CreateSessionAsync(TaskAgentSession session, CancellationToken cancellationToken); + Task DeleteSessionAsync(CancellationToken cancellationToken); + + Task GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token); } public sealed class BrokerServer : RunnerService, IBrokerServer @@ -44,13 +47,27 @@ namespace GitHub.Runner.Common } } - public Task GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate) + public async Task CreateSessionAsync(TaskAgentSession session, CancellationToken cancellationToken) { CheckConnection(); - var jobMessage = RetryRequest( - async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken); + var jobMessage = await _brokerHttpClient.CreateSessionAsync(session, cancellationToken); return jobMessage; } + + public Task GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) + { + CheckConnection(); + var brokerSession = RetryRequest( + async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken); + + return brokerSession; + } + + public async Task DeleteSessionAsync(CancellationToken cancellationToken) + { + CheckConnection(); + await _brokerHttpClient.DeleteSessionAsync(cancellationToken); + } } } diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index f3fd33fd6..3781855b7 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -24,7 +24,15 @@ namespace GitHub.Runner.Listener private TimeSpan _getNextMessageRetryInterval; private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; + private VssCredentials _creds; + private TaskAgentSession _session; private IBrokerServer _brokerServer; + private readonly Dictionary _sessionCreationExceptionTracker = new(); + private bool _accessTokenRevoked = false; + private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); + private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); + private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); + public override void Initialize(IHostContext hostContext) { @@ -36,13 +44,134 @@ namespace GitHub.Runner.Listener public async Task CreateSessionAsync(CancellationToken token) { - await RefreshBrokerConnection(); - return await Task.FromResult(true); + Trace.Entering(); + + // Settings + var configManager = HostContext.GetService(); + _settings = configManager.LoadSettings(); + var serverUrl = _settings.ServerUrlV2; + Trace.Info(_settings); + + if (string.IsNullOrEmpty(_settings.ServerUrlV2)) + { + throw new InvalidOperationException("ServerUrlV2 is not set"); + } + + // Create connection. + Trace.Info("Loading Credentials"); + var credMgr = HostContext.GetService(); + _creds = credMgr.LoadCredentials(); + + var agent = new TaskAgentReference + { + Id = _settings.AgentId, + Name = _settings.AgentName, + Version = BuildConstants.RunnerPackage.Version, + OSDescription = RuntimeInformation.OSDescription, + }; + string sessionName = $"{Environment.MachineName ?? "RUNNER"}"; + var taskAgentSession = new TaskAgentSession(sessionName, agent); + + string errorMessage = string.Empty; + bool encounteringError = false; + + while (true) + { + token.ThrowIfCancellationRequested(); + Trace.Info($"Attempt to create session."); + try + { + Trace.Info("Connecting to the Broker Server..."); + await _brokerServer.ConnectAsync(new Uri(serverUrl), _creds); + Trace.Info("VssConnection created"); + + _term.WriteLine(); + _term.WriteSuccessMessage("Connected to GitHub"); + _term.WriteLine(); + + _session = await _brokerServer.CreateSessionAsync(taskAgentSession, token); + + Trace.Info($"Session created."); + if (encounteringError) + { + _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected."); + _sessionCreationExceptionTracker.Clear(); + encounteringError = false; + } + + return true; + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + Trace.Info("Session creation has been cancelled."); + throw; + } + catch (TaskAgentAccessTokenExpiredException) + { + Trace.Info("Runner OAuth token has been revoked. Session creation failed."); + _accessTokenRevoked = true; + throw; + } + catch (Exception ex) + { + Trace.Error("Catch exception during create session."); + Trace.Error(ex); + + if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred) + { + // "invalid_client" means the runner registration has been deleted from the server. + if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase)) + { + _term.WriteError("Failed to create a session. The runner registration has been deleted from the server, please re-configure. Runner registrations are automatically deleted for runners that have not connected to the service recently."); + return false; + } + + // Check whether we get 401 because the runner registration already removed by the service. + // If the runner registration get deleted, we can't exchange oauth token. + Trace.Error("Test oauth app registration."); + var oauthTokenProvider = new VssOAuthTokenProvider(vssOAuthCred, new Uri(serverUrl)); + var authError = await oauthTokenProvider.ValidateCredentialAsync(token); + if (string.Equals(authError, "invalid_client", StringComparison.OrdinalIgnoreCase)) + { + _term.WriteError("Failed to create a session. The runner registration has been deleted from the server, please re-configure. Runner registrations are automatically deleted for runners that have not connected to the service recently."); + return false; + } + } + + if (!IsSessionCreationExceptionRetriable(ex)) + { + _term.WriteError($"Failed to create session. {ex.Message}"); + return false; + } + + if (!encounteringError) //print the message only on the first error + { + _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); + encounteringError = true; + } + + Trace.Info("Sleeping for {0} seconds before retrying.", _sessionCreationRetryInterval.TotalSeconds); + await HostContext.Delay(_sessionCreationRetryInterval, token); + } + } } public async Task DeleteSessionAsync() { - await Task.CompletedTask; + if (_session != null && _session.SessionId != Guid.Empty) + { + if (!_accessTokenRevoked) + { + using (var ts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) + { + await _brokerServer.DeleteSessionAsync(ts.Token); + } + } + else + { + Trace.Warning("Runner OAuth token has been revoked. Skip deleting session."); + } + } } public void OnJobStatus(object sender, JobStatusEventArgs e) @@ -73,12 +202,13 @@ namespace GitHub.Runner.Listener _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); try { - message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, + message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, runnerStatus, BuildConstants.RunnerPackage.Version, VarUtil.OS, VarUtil.OSArchitecture, - _settings.DisableUpdate); + _settings.DisableUpdate, + _getMessagesTokenSource.Token); if (message == null) { @@ -196,12 +326,84 @@ namespace GitHub.Runner.Listener } } + private bool IsSessionCreationExceptionRetriable(Exception ex) + { + if (ex is TaskAgentNotFoundException) + { + Trace.Info("The runner no longer exists on the server. Stopping the runner."); + _term.WriteError("The runner no longer exists on the server. Please reconfigure the runner."); + return false; + } + else if (ex is TaskAgentSessionConflictException) + { + Trace.Info("The session for this runner already exists."); + _term.WriteError("A session for this runner already exists."); + if (_sessionCreationExceptionTracker.ContainsKey(nameof(TaskAgentSessionConflictException))) + { + _sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)]++; + if (_sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)] * _sessionCreationRetryInterval.TotalSeconds >= _sessionConflictRetryLimit.TotalSeconds) + { + Trace.Info("The session conflict exception have reached retry limit."); + _term.WriteError($"Stop retry on SessionConflictException after retried for {_sessionConflictRetryLimit.TotalSeconds} seconds."); + return false; + } + } + else + { + _sessionCreationExceptionTracker[nameof(TaskAgentSessionConflictException)] = 1; + } + + Trace.Info("The session conflict exception haven't reached retry limit."); + return true; + } + else if (ex is VssOAuthTokenRequestException && ex.Message.Contains("Current server time is")) + { + Trace.Info("Local clock might be skewed."); + _term.WriteError("The local machine's clock may be out of sync with the server time by more than five minutes. Please sync your clock with your domain or internet time and try again."); + if (_sessionCreationExceptionTracker.ContainsKey(nameof(VssOAuthTokenRequestException))) + { + _sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)]++; + if (_sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)] * _sessionCreationRetryInterval.TotalSeconds >= _clockSkewRetryLimit.TotalSeconds) + { + Trace.Info("The OAuth token request exception have reached retry limit."); + _term.WriteError($"Stopped retrying OAuth token request exception after {_clockSkewRetryLimit.TotalSeconds} seconds."); + return false; + } + } + else + { + _sessionCreationExceptionTracker[nameof(VssOAuthTokenRequestException)] = 1; + } + + Trace.Info("The OAuth token request exception haven't reached retry limit."); + return true; + } + else if (ex is TaskAgentPoolNotFoundException || + ex is AccessDeniedException || + ex is VssUnauthorizedException) + { + Trace.Info($"Non-retriable exception: {ex.Message}"); + return false; + } + + else if (ex is InvalidOperationException) + { + Trace.Info($"Non-retriable exception: {ex.Message}"); + return false; + } + else + { + Trace.Info($"Retriable exception: {ex.Message}"); + return true; + } + } + private async Task RefreshBrokerConnection() { var configManager = HostContext.GetService(); _settings = configManager.LoadSettings(); - if (_settings.ServerUrlV2 == null) + if (string.IsNullOrEmpty(_settings.ServerUrlV2)) { throw new InvalidOperationException("ServerUrlV2 is not set"); } diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 113573b6a..04e0e5727 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -14,6 +14,7 @@ using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; using GitHub.Services.Common; using GitHub.Services.OAuth; +using GitHub.Services.WebApi; namespace GitHub.Runner.Listener { @@ -33,6 +34,7 @@ namespace GitHub.Runner.Listener private RunnerSettings _settings; private ITerminal _term; private IRunnerServer _runnerServer; + private IBrokerServer _brokerServer; private TaskAgentSession _session; private TimeSpan _getNextMessageRetryInterval; private bool _accessTokenRevoked = false; @@ -42,6 +44,9 @@ namespace GitHub.Runner.Listener private readonly Dictionary _sessionCreationExceptionTracker = new(); private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; + private VssCredentials _creds; + + private bool _isBrokerSession = false; public override void Initialize(IHostContext hostContext) { @@ -49,6 +54,7 @@ namespace GitHub.Runner.Listener _term = HostContext.GetService(); _runnerServer = HostContext.GetService(); + _brokerServer = hostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) @@ -64,7 +70,7 @@ namespace GitHub.Runner.Listener // Create connection. Trace.Info("Loading Credentials"); var credMgr = HostContext.GetService(); - VssCredentials creds = credMgr.LoadCredentials(); + _creds = credMgr.LoadCredentials(); var agent = new TaskAgentReference { @@ -86,7 +92,7 @@ namespace GitHub.Runner.Listener try { Trace.Info("Connecting to the Runner Server..."); - await _runnerServer.ConnectAsync(new Uri(serverUrl), creds); + await _runnerServer.ConnectAsync(new Uri(serverUrl), _creds); Trace.Info("VssConnection created"); _term.WriteLine(); @@ -98,6 +104,14 @@ namespace GitHub.Runner.Listener taskAgentSession, token); + if (_session.BrokerMigrationMessage != null) + { + Trace.Info("Runner session is in migration mode: Creating Broker session with BrokerBaseUrl: {0}", _session.BrokerMigrationMessage.BrokerBaseUrl); + await _brokerServer.ConnectAsync(_session.BrokerMigrationMessage.BrokerBaseUrl, _creds); + _session = await _brokerServer.CreateSessionAsync(taskAgentSession, token); + _isBrokerSession = true; + } + Trace.Info($"Session created."); if (encounteringError) { @@ -124,7 +138,7 @@ namespace GitHub.Runner.Listener Trace.Error("Catch exception during create session."); Trace.Error(ex); - if (ex is VssOAuthTokenRequestException vssOAuthEx && creds.Federated is VssOAuthCredential vssOAuthCred) + if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred) { // "invalid_client" means the runner registration has been deleted from the server. if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase)) @@ -171,6 +185,11 @@ namespace GitHub.Runner.Listener { using (var ts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) { + if (_isBrokerSession) + { + await _brokerServer.DeleteSessionAsync(ts.Token); + return; + } await _runnerServer.DeleteAgentSessionAsync(_settings.PoolId, _session.SessionId, ts.Token); } } @@ -228,6 +247,23 @@ namespace GitHub.Runner.Listener // Decrypt the message body if the session is using encryption message = DecryptMessage(message); + + if (message != null && message.MessageType == BrokerMigrationMessage.MessageType) + { + Trace.Info("BrokerMigration message received. Polling Broker for messages..."); + + var migrationMessage = JsonUtility.FromString(message.Body); + + await _brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds); + message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, + runnerStatus, + BuildConstants.RunnerPackage.Version, + VarUtil.OS, + VarUtil.OSArchitecture, + _settings.DisableUpdate, + token); + } + if (message != null) { _lastMessageId = message.MessageId; diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 0e05e8a15..263da5fef 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -634,6 +634,7 @@ namespace GitHub.Runner.Listener { try { + Trace.Info("Deleting Runner Session..."); await _listener.DeleteSessionAsync(); } catch (Exception ex) when (runOnce) diff --git a/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs b/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs new file mode 100644 index 000000000..eebfa4c7d --- /dev/null +++ b/src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs @@ -0,0 +1,38 @@ +using System; +using System.Runtime.Serialization; + +namespace GitHub.DistributedTask.WebApi +{ + /// + /// Message that tells the runner to redirect itself to BrokerListener for messages. + /// (Note that we use a special Message instead of a simple 302. This is because + /// the runner will need to apply the runner's token to the request, and it is + /// a security best practice to *not* blindly add sensitive data to redirects + /// 302s.) + /// + [DataContract] + public class BrokerMigrationMessage + { + public static readonly string MessageType = "BrokerMigration"; + + public BrokerMigrationMessage() + { + } + + public BrokerMigrationMessage( + Uri brokerUrl) + { + this.BrokerBaseUrl = brokerUrl; + } + + /// + /// The base url for the broker listener + /// + [DataMember] + public Uri BrokerBaseUrl + { + get; + internal set; + } + } +} diff --git a/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs b/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs index 8135de83b..7b9fdfd0a 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskAgentSession.cs @@ -75,5 +75,12 @@ namespace GitHub.DistributedTask.WebApi get; set; } + + [DataMember(EmitDefaultValue = false, IsRequired = false)] + public BrokerMigrationMessage BrokerMigrationMessage + { + get; + set; + } } } diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index fa914561c..37380e0c3 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -57,6 +57,7 @@ namespace GitHub.Actions.RunService.WebApi } public async Task GetRunnerMessageAsync( + Guid? sessionId, string runnerVersion, TaskAgentStatus? status, string os = null, @@ -69,6 +70,11 @@ namespace GitHub.Actions.RunService.WebApi List> queryParams = new List>(); + if (sessionId != null) + { + queryParams.Add("sessionId", sessionId.Value.ToString()); + } + if (status != null) { queryParams.Add("status", status.Value.ToString()); @@ -111,5 +117,55 @@ namespace GitHub.Actions.RunService.WebApi throw new Exception($"Failed to get job message: {result.Error}"); } + + public async Task CreateSessionAsync( + + TaskAgentSession session, + CancellationToken cancellationToken = default) + { + var requestUri = new Uri(Client.BaseAddress, "session"); + var requestContent = new ObjectContent(session, new VssJsonMediaTypeFormatter(true)); + + var result = await SendAsync( + new HttpMethod("POST"), + requestUri: requestUri, + content: requestContent, + cancellationToken: cancellationToken); + + if (result.IsSuccess) + { + return result.Value; + } + + if (result.StatusCode == HttpStatusCode.Forbidden) + { + throw new AccessDeniedException(result.Error); + } + + if (result.StatusCode == HttpStatusCode.Conflict) + { + throw new TaskAgentSessionConflictException(result.Error); + } + + throw new Exception($"Failed to create broker session: {result.Error}"); + } + + public async Task DeleteSessionAsync( + CancellationToken cancellationToken = default) + { + var requestUri = new Uri(Client.BaseAddress, $"session"); + + var result = await SendAsync( + new HttpMethod("DELETE"), + requestUri: requestUri, + cancellationToken: cancellationToken); + + if (result.IsSuccess) + { + return; + } + + throw new Exception($"Failed to delete broker session: {result.Error}"); + } } } diff --git a/src/Test/L0/Listener/BrokerMessageListenerL0.cs b/src/Test/L0/Listener/BrokerMessageListenerL0.cs new file mode 100644 index 000000000..7dface3b2 --- /dev/null +++ b/src/Test/L0/Listener/BrokerMessageListenerL0.cs @@ -0,0 +1,81 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using GitHub.DistributedTask.WebApi; +using GitHub.Runner.Listener; +using GitHub.Runner.Listener.Configuration; +using GitHub.Services.Common; +using Moq; +using Xunit; + +namespace GitHub.Runner.Common.Tests.Listener +{ + public sealed class BrokerMessageListenerL0 + { + private readonly RunnerSettings _settings; + private readonly Mock _config; + private readonly Mock _brokerServer; + private readonly Mock _credMgr; + private Mock _store; + + + public BrokerMessageListenerL0() + { + _settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrl = "http://myserver", WorkFolder = "_work", ServerUrlV2 = "http://myserverv2" }; + _config = new Mock(); + _config.Setup(x => x.LoadSettings()).Returns(_settings); + _credMgr = new Mock(); + _store = new Mock(); + _brokerServer = new Mock(); + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreatesSession() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + } + } + + private TestHostContext CreateTestContext([CallerMemberName] String testName = "") + { + TestHostContext tc = new(this, testName); + tc.SetSingleton(_config.Object); + tc.SetSingleton(_credMgr.Object); + tc.SetSingleton(_store.Object); + tc.SetSingleton(_brokerServer.Object); + return tc; + } + } +} diff --git a/src/Test/L0/Listener/MessageListenerL0.cs b/src/Test/L0/Listener/MessageListenerL0.cs index 7cd6035e1..57a1f60d8 100644 --- a/src/Test/L0/Listener/MessageListenerL0.cs +++ b/src/Test/L0/Listener/MessageListenerL0.cs @@ -24,6 +24,8 @@ namespace GitHub.Runner.Common.Tests.Listener private Mock _credMgr; private Mock _store; + private Mock _brokerServer; + public MessageListenerL0() { _settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrl = "http://myserver", WorkFolder = "_work" }; @@ -32,6 +34,7 @@ namespace GitHub.Runner.Common.Tests.Listener _runnerServer = new Mock(); _credMgr = new Mock(); _store = new Mock(); + _brokerServer = new Mock(); } private TestHostContext CreateTestContext([CallerMemberName] String testName = "") @@ -41,6 +44,7 @@ namespace GitHub.Runner.Common.Tests.Listener tc.SetSingleton(_runnerServer.Object); tc.SetSingleton(_credMgr.Object); tc.SetSingleton(_store.Object); + tc.SetSingleton(_brokerServer.Object); return tc; } @@ -81,6 +85,72 @@ namespace GitHub.Runner.Common.Tests.Listener _settings.PoolId, It.Is(y => y != null), tokenSource.Token), Times.Once()); + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Never()); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreatesSessionWithBrokerMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession() + { + OwnerName = "legacy", + BrokerMigrationMessage = new BrokerMigrationMessage(new Uri("https://broker.actions.github.com")) + }; + + var expectedBrokerSession = new TaskAgentSession() + { + OwnerName = "broker" + }; + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedBrokerSession)); + + _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Once()); } } @@ -131,6 +201,83 @@ namespace GitHub.Runner.Common.Tests.Listener } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void DeleteSessionWithBrokerMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession() + { + OwnerName = "legacy", + BrokerMigrationMessage = new BrokerMigrationMessage(new Uri("https://broker.actions.github.com")) + }; + + var expectedBrokerSession = new TaskAgentSession() + { + SessionId = Guid.NewGuid(), + OwnerName = "broker" + }; + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedBrokerSession)); + + _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + Assert.True(result); + + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + _brokerServer + .Setup(x => x.DeleteSessionAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // Act. + await listener.DeleteSessionAsync(); + + + //Assert + _runnerServer + .Verify(x => x.DeleteAgentSessionAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny()), Times.Never()); + _brokerServer + .Verify(x => x.DeleteSessionAsync(It.IsAny()), Times.Once()); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Runner")] @@ -212,6 +359,112 @@ namespace GitHub.Runner.Common.Tests.Listener } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void GetNextMessageWithBrokerMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.NotNull(sessionIdProperty); + sessionIdProperty.SetValue(expectedSession, Guid.NewGuid()); + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + Assert.True(result); + + var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com")); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = JsonUtility.ToString(brokerMigrationMesage), + MessageType = BrokerMigrationMessage.MessageType + }, + }; + + var brokerMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var brokerMessageQueue = new Queue(brokerMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + await Task.Yield(); + return arMessages[0]; // always send migration message + }); + + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + await Task.Yield(); + return brokerMessageQueue.Dequeue(); + }); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(brokerMessages[0], message1); + Assert.Equal(brokerMessages[1], message2); + Assert.Equal(brokerMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length)); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Runner")]