diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index b82719e39..595db8ace 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -26,6 +26,7 @@ namespace GitHub.Runner.Listener private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private VssCredentials _creds; + private VssCredentials _credsV2; private TaskAgentSession _session; private IRunnerServer _runnerServer; private IBrokerServer _brokerServer; @@ -35,7 +36,8 @@ namespace GitHub.Runner.Listener private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); - + private bool _needRefreshCredsV2 = false; + private bool _handlerInitialized = false; public override void Initialize(IHostContext hostContext) { @@ -88,7 +90,8 @@ namespace GitHub.Runner.Listener try { Trace.Info("Connecting to the Broker Server..."); - await _brokerServer.ConnectAsync(new Uri(serverUrlV2), _creds); + _credsV2 = _credMgr.LoadCredentials(allowAuthUrlV2: true); + await _brokerServer.ConnectAsync(new Uri(serverUrlV2), _credsV2); Trace.Info("VssConnection created"); if (!string.IsNullOrEmpty(serverUrl) && @@ -113,6 +116,13 @@ namespace GitHub.Runner.Listener encounteringError = false; } + if (!_handlerInitialized) + { + // Register event handler for auth migration state change + HostContext.AuthMigrationChanged += HandleAuthMigrationChanged; + _handlerInitialized = true; + } + return CreateSessionResult.Success; } catch (OperationCanceledException) when (token.IsCancellationRequested) @@ -131,7 +141,7 @@ namespace GitHub.Runner.Listener Trace.Error("Catch exception during create session."); Trace.Error(ex); - if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred) + if (ex is VssOAuthTokenRequestException vssOAuthEx && _credsV2.Federated is VssOAuthCredential vssOAuthCred) { // "invalid_client" means the runner registration has been deleted from the server. if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase)) @@ -162,6 +172,12 @@ namespace GitHub.Runner.Listener return CreateSessionResult.Failure; } + if (HostContext.AllowAuthMigration) + { + Trace.Info("Disable migration mode for 60 minutes."); + HostContext.DeferAuthMigration(TimeSpan.FromMinutes(60), $"Session creation failed with exception: {ex}"); + } + if (!encounteringError) //print the message only on the first error { _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected."); @@ -178,6 +194,11 @@ namespace GitHub.Runner.Listener { if (_session != null && _session.SessionId != Guid.Empty) { + if (_handlerInitialized) + { + HostContext.AuthMigrationChanged -= HandleAuthMigrationChanged; + } + if (!_accessTokenRevoked) { using (var ts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) @@ -220,6 +241,13 @@ namespace GitHub.Runner.Listener _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); try { + if (_needRefreshCredsV2) + { + Trace.Info("Refreshing broker connection."); + await RefreshBrokerConnectionAsync(); + _needRefreshCredsV2 = false; + } + message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, runnerStatus, BuildConstants.RunnerPackage.Version, @@ -299,6 +327,12 @@ namespace GitHub.Runner.Listener encounteringError = true; } + if (HostContext.AllowAuthMigration) + { + Trace.Info("Disable migration mode for 60 minutes."); + HostContext.DeferAuthMigration(TimeSpan.FromMinutes(60), $"Get next message failed with exception: {ex}"); + } + // re-create VssConnection before next retry await RefreshBrokerConnectionAsync(); @@ -434,9 +468,15 @@ namespace GitHub.Runner.Listener private async Task RefreshBrokerConnectionAsync() { Trace.Info("Reload credentials."); - _creds = _credMgr.LoadCredentials(allowAuthUrlV2: false); // TODO: change to `true` in the next PR. - await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), _creds); + _credsV2 = _credMgr.LoadCredentials(allowAuthUrlV2: true); + await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), _credsV2); Trace.Info("Connection to Broker Server recreated."); } + + private void HandleAuthMigrationChanged(object sender, EventArgs e) + { + Trace.Info($"Auth migration changed. Current allow auth migration state: {HostContext.AllowAuthMigration}"); + _needRefreshCredsV2 = true; + } } } diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 4f3875363..3c38ea884 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -55,6 +55,9 @@ namespace GitHub.Runner.Listener private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private VssCredentials _creds; + private VssCredentials _credsV2; + private bool _needRefreshCredsV2 = false; + private bool _handlerInitialized = false; public override void Initialize(IHostContext hostContext) { @@ -120,6 +123,13 @@ namespace GitHub.Runner.Listener encounteringError = false; } + if (!_handlerInitialized) + { + Trace.Info("Registering AuthMigrationChanged event handler."); + HostContext.AuthMigrationChanged += HandleAuthMigrationChanged; + _handlerInitialized = true; + } + return CreateSessionResult.Success; } catch (OperationCanceledException) when (token.IsCancellationRequested) @@ -185,6 +195,11 @@ namespace GitHub.Runner.Listener { if (_session != null && _session.SessionId != Guid.Empty) { + if (_handlerInitialized) + { + HostContext.AuthMigrationChanged -= HandleAuthMigrationChanged; + } + if (!_accessTokenRevoked) { using (var ts = new CancellationTokenSource(TimeSpan.FromSeconds(30))) @@ -249,7 +264,15 @@ namespace GitHub.Runner.Listener { var migrationMessage = JsonUtility.FromString(message.Body); - await _brokerServer.UpdateConnectionIfNeeded(migrationMessage.BrokerBaseUrl, _creds); + _credsV2 = _credMgr.LoadCredentials(allowAuthUrlV2: true); + await _brokerServer.UpdateConnectionIfNeeded(migrationMessage.BrokerBaseUrl, _credsV2); + if (_needRefreshCredsV2) + { + Trace.Info("Refreshing credentials for V2."); + await _brokerServer.ForceRefreshConnection(_credsV2); + _needRefreshCredsV2 = false; + } + message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, runnerStatus, BuildConstants.RunnerPackage.Version, @@ -341,6 +364,12 @@ namespace GitHub.Runner.Listener encounteringError = true; } + if (HostContext.AllowAuthMigration) + { + Trace.Info("Disable migration mode for 60 minutes."); + HostContext.DeferAuthMigration(TimeSpan.FromMinutes(60), $"Get next message failed with exception: {ex}"); + } + // re-create VssConnection before next retry await _runnerServer.RefreshConnectionAsync(RunnerConnectionType.MessageQueue, TimeSpan.FromSeconds(60)); @@ -401,8 +430,8 @@ namespace GitHub.Runner.Listener public async Task RefreshListenerTokenAsync() { await _runnerServer.RefreshConnectionAsync(RunnerConnectionType.MessageQueue, TimeSpan.FromSeconds(60)); - _creds = _credMgr.LoadCredentials(allowAuthUrlV2: false); // TODO: change to `true` in next PR - await _brokerServer.ForceRefreshConnection(_creds); + _credsV2 = _credMgr.LoadCredentials(allowAuthUrlV2: true); + await _brokerServer.ForceRefreshConnection(_credsV2); } private TaskAgentMessage DecryptMessage(TaskAgentMessage message) @@ -533,5 +562,11 @@ namespace GitHub.Runner.Listener return true; } } + + private void HandleAuthMigrationChanged(object sender, EventArgs e) + { + Trace.Info($"Auth migration changed. Current allow auth migration state: {HostContext.AllowAuthMigration}"); + _needRefreshCredsV2 = true; + } } } diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index a7ef60d1c..d01c871c3 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -31,7 +32,11 @@ namespace GitHub.Runner.Listener private ITerminal _term; private bool _inConfigStage; private ManualResetEvent _completedCommand = new(false); + private readonly ConcurrentQueue _authMigrationTelemetries = new(); + private Task _authMigrationTelemetryTask; + private readonly object _authMigrationTelemetryLock = new(); private IRunnerServer _runnerServer; + private CancellationTokenSource _authMigrationTelemetryTokenSource = new(); // // Helps avoid excessive calls to Run Service when encountering non-retriable errors from /acquirejob. @@ -68,6 +73,8 @@ namespace GitHub.Runner.Listener //register a SIGTERM handler HostContext.Unloading += Runner_Unloading; + HostContext.AuthMigrationChanged += HandleAuthMigrationChanged; + // TODO Unit test to cover this logic Trace.Info(nameof(ExecuteCommand)); var configManager = HostContext.GetService(); @@ -313,6 +320,8 @@ namespace GitHub.Runner.Listener } finally { + _authMigrationTelemetryTokenSource?.Cancel(); + HostContext.AuthMigrationChanged -= HandleAuthMigrationChanged; _term.CancelKeyPress -= CtrlCHandler; HostContext.Unloading -= Runner_Unloading; _completedCommand.Set(); @@ -572,18 +581,18 @@ namespace GitHub.Runner.Listener // Create connection var credMgr = HostContext.GetService(); - var creds = credMgr.LoadCredentials(allowAuthUrlV2: false); - if (string.IsNullOrEmpty(messageRef.RunServiceUrl)) { + var creds = credMgr.LoadCredentials(allowAuthUrlV2: false); var actionsRunServer = HostContext.CreateService(); await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); } else { + var credsV2 = credMgr.LoadCredentials(allowAuthUrlV2: true); var runServer = HostContext.CreateService(); - await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); + await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), credsV2); try { jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageRef.BillingOwnerId, messageQueueLoopTokenSource.Token); @@ -601,6 +610,13 @@ namespace GitHub.Runner.Listener catch (Exception ex) { Trace.Error($"Caught exception from acquiring job message: {ex}"); + + if (HostContext.AllowAuthMigration) + { + Trace.Info("Disable migration mode for 60 minutes."); + HostContext.DeferAuthMigration(TimeSpan.FromMinutes(60), $"Acquire job failed with exception: {ex}"); + } + continue; } } @@ -718,6 +734,73 @@ namespace GitHub.Runner.Listener return Constants.Runner.ReturnCode.Success; } + private void HandleAuthMigrationChanged(object sender, AuthMigrationEventArgs e) + { + Trace.Verbose("Handle AuthMigrationChanged in Runner"); + _authMigrationTelemetries.Enqueue($"{DateTime.UtcNow.ToString("O")}: {e.Trace}"); + + // only start the telemetry reporting task once auth migration is changed (enabled or disabled) + lock (_authMigrationTelemetryLock) + { + if (_authMigrationTelemetryTask == null) + { + _authMigrationTelemetryTask = ReportAuthMigrationTelemetryAsync(_authMigrationTelemetryTokenSource.Token); + } + } + } + + private async Task ReportAuthMigrationTelemetryAsync(CancellationToken token) + { + var configManager = HostContext.GetService(); + var runnerSettings = configManager.LoadSettings(); + + while (!token.IsCancellationRequested) + { + try + { + await HostContext.Delay(TimeSpan.FromSeconds(60), token); + } + catch (TaskCanceledException) + { + // Ignore cancellation + } + + Trace.Verbose("Checking for auth migration telemetry to report"); + while (_authMigrationTelemetries.TryDequeue(out var telemetry)) + { + Trace.Verbose($"Reporting auth migration telemetry: {telemetry}"); + if (runnerSettings != null) + { + try + { + using (var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30))) + { + await _runnerServer.UpdateAgentUpdateStateAsync(runnerSettings.PoolId, runnerSettings.AgentId, "RefreshConfig", telemetry, tokenSource.Token); + } + } + catch (Exception ex) + { + Trace.Error("Failed to report auth migration telemetry."); + Trace.Error(ex); + _authMigrationTelemetries.Enqueue(telemetry); + } + } + + if (!token.IsCancellationRequested) + { + try + { + await HostContext.Delay(TimeSpan.FromSeconds(10), token); + } + catch (TaskCanceledException) + { + // Ignore cancellation + } + } + } + } + } + private void PrintUsage(CommandSettings command) { string separator; diff --git a/src/Test/L0/Listener/BrokerMessageListenerL0.cs b/src/Test/L0/Listener/BrokerMessageListenerL0.cs index 3328d1852..f1d2d65d4 100644 --- a/src/Test/L0/Listener/BrokerMessageListenerL0.cs +++ b/src/Test/L0/Listener/BrokerMessageListenerL0.cs @@ -1,4 +1,5 @@ using System; +using System.IO; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; @@ -47,7 +48,7 @@ namespace GitHub.Runner.Common.Tests.Listener tokenSource.Token)) .Returns(Task.FromResult(expectedSession)); - _credMgr.Setup(x => x.LoadCredentials(It.IsAny())).Returns(new VssCredentials()); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); // Act. BrokerMessageListener listener = new(); @@ -65,6 +66,303 @@ namespace GitHub.Runner.Common.Tests.Listener } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task HandleAuthMigrationChanged() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.Equal(CreateSessionResult.Success, result); + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + tc.EnableAuthMigration("L0Test"); + + var traceFile = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, traceFile, true); + Assert.Contains("Auth migration changed", File.ReadAllText(traceFile)); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task CreatesSession_DeferAuthMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var throwException = true; + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(async (TaskAgentSession session, CancellationToken token) => + { + await Task.Yield(); + if (throwException) + { + throwException = false; + throw new NotSupportedException("Error during create session"); + } + + return expectedSession; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + tc.EnableAuthMigration("L0Test"); + Assert.True(tc.AllowAuthMigration); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.Equal(CreateSessionResult.Success, result); + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Exactly(2)); + _credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(2)); + + Assert.False(tc.AllowAuthMigration); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task GetNextMessage() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var expectedMessage = new TaskAgentMessage(); + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(expectedMessage)); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + Assert.Equal(CreateSessionResult.Success, result); + + TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token); + trace.Info("message: {0}", message); + + // Assert. + Assert.Equal(expectedMessage, message); + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once()); + + _brokerServer.Verify(x => x.ConnectAsync(It.IsAny(), It.IsAny()), Times.Once()); + + _credMgr.Verify(x => x.LoadCredentials(true), Times.Once()); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task GetNextMessage_EnableAuthMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var expectedMessage = new TaskAgentMessage(); + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(expectedMessage)); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + Assert.Equal(CreateSessionResult.Success, result); + + tc.EnableAuthMigration("L0Test"); + + TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token); + trace.Info("message: {0}", message); + + // Assert. + Assert.Equal(expectedMessage, message); + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once()); + + _brokerServer.Verify(x => x.ConnectAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); + + _credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(2)); + + Assert.True(tc.AllowAuthMigration); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task GetNextMessage_AuthMigrationFallback() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + tc.EnableAuthMigration("L0Test"); + + // Arrange. + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var expectedSession = new TaskAgentSession(); + _brokerServer + .Setup(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var expectedMessage = new TaskAgentMessage(); + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(async (Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token) => + { + await Task.Yield(); + if (tc.AllowAuthMigration) + { + throw new NotSupportedException("Error during get message"); + } + + return expectedMessage; + }); + + // Act. + BrokerMessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + Assert.Equal(CreateSessionResult.Success, result); + + Assert.True(tc.AllowAuthMigration); + + TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token); + trace.Info("message: {0}", message); + + // Assert. + Assert.Equal(expectedMessage, message); + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Exactly(2)); + + _brokerServer.Verify(x => x.ConnectAsync(It.IsAny(), It.IsAny()), Times.Exactly(3)); + + _credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(3)); + + Assert.False(tc.AllowAuthMigration); + } + } + private TestHostContext CreateTestContext([CallerMemberName] String testName = "") { TestHostContext tc = new(this, testName); diff --git a/src/Test/L0/Listener/MessageListenerL0.cs b/src/Test/L0/Listener/MessageListenerL0.cs index 2da28d710..80792539b 100644 --- a/src/Test/L0/Listener/MessageListenerL0.cs +++ b/src/Test/L0/Listener/MessageListenerL0.cs @@ -323,6 +323,15 @@ namespace GitHub.Runner.Common.Tests.Listener _brokerServer .Verify(x => x.GetRunnerMessageAsync( expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _credMgr + .Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.ForceRefreshConnection(It.IsAny()), Times.Never); } } @@ -432,5 +441,301 @@ namespace GitHub.Runner.Common.Tests.Listener _settings.PoolId, expectedSession.SessionId, It.IsAny()), Times.Never); } } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task HandleAuthMigrationChanged() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(It.IsAny())).Returns(new VssCredentials()); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.Equal(CreateSessionResult.Success, result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + _brokerServer + .Verify(x => x.CreateSessionAsync( + It.Is(y => y != null), + tokenSource.Token), Times.Never()); + + tc.EnableAuthMigration("L0Test"); + + var traceFile = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, traceFile, true); + Assert.Contains("Auth migration changed", File.ReadAllText(traceFile)); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task GetNextMessageWithBrokerMigration_AuthMigrationFallback() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.NotNull(sessionIdProperty); + sessionIdProperty.SetValue(expectedSession, Guid.NewGuid()); + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(It.IsAny())).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + tc.EnableAuthMigration("L0Test"); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + Assert.Equal(CreateSessionResult.Success, result); + + var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com")); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = JsonUtility.ToString(brokerMigrationMesage), + MessageType = BrokerMigrationMessage.MessageType + }, + }; + + var brokerMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var brokerMessageQueue = new Queue(brokerMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + await Task.Yield(); + return arMessages[0]; // always send migration message + }); + + var counter = 0; + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + counter++; + await Task.Yield(); + if (counter == 2) + { + throw new NotSupportedException("Something wrong."); + } + + return brokerMessageQueue.Dequeue(); + }); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(brokerMessages[0], message1); + Assert.Equal(brokerMessages[1], message2); + Assert.Equal(brokerMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length + 1)); + + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length + 1)); + + _credMgr + .Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length + 1)); + + _brokerServer + .Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny()), Times.Exactly(brokerMessages.Length + 1)); + + _brokerServer + .Verify(x => x.ForceRefreshConnection(It.IsAny()), Times.Once()); + + Assert.False(tc.AllowAuthMigration); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task GetNextMessageWithBrokerMigration_EnableAuthMigration() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.NotNull(sessionIdProperty); + sessionIdProperty.SetValue(expectedSession, Guid.NewGuid()); + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(It.IsAny())).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new(); + listener.Initialize(tc); + + CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token); + Assert.Equal(CreateSessionResult.Success, result); + + var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com")); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = JsonUtility.ToString(brokerMigrationMesage), + MessageType = BrokerMigrationMessage.MessageType + }, + }; + + var brokerMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var brokerMessageQueue = new Queue(brokerMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + await Task.Yield(); + return arMessages[0]; // always send migration message + }); + + _brokerServer + .Setup(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) => + { + await Task.Yield(); + if (!tc.AllowAuthMigration) + { + tc.EnableAuthMigration("L0Test"); + } + + return brokerMessageQueue.Dequeue(); + }); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(brokerMessages[0], message1); + Assert.Equal(brokerMessages[1], message2); + Assert.Equal(brokerMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.GetRunnerMessageAsync( + expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _credMgr + .Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny()), Times.Exactly(brokerMessages.Length)); + + _brokerServer + .Verify(x => x.ForceRefreshConnection(It.IsAny()), Times.Once()); + + Assert.True(tc.AllowAuthMigration); + } + } } } diff --git a/src/Test/L0/Listener/RunnerL0.cs b/src/Test/L0/Listener/RunnerL0.cs index 1dd24fe4d..98f7395b6 100644 --- a/src/Test/L0/Listener/RunnerL0.cs +++ b/src/Test/L0/Listener/RunnerL0.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.IO; using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Listener; using GitHub.Runner.Listener.Configuration; +using GitHub.Services.Common; using GitHub.Services.WebApi; using Moq; using Xunit; @@ -24,6 +26,9 @@ namespace GitHub.Runner.Common.Tests.Listener private Mock _configStore; private Mock _updater; private Mock _acquireJobThrottler; + private Mock _credentialManager; + private Mock _actionsRunServer; + private Mock _runServer; public RunnerL0() { @@ -37,6 +42,9 @@ namespace GitHub.Runner.Common.Tests.Listener _configStore = new Mock(); _updater = new Mock(); _acquireJobThrottler = new Mock(); + _credentialManager = new Mock(); + _actionsRunServer = new Mock(); + _runServer = new Mock(); } private Pipelines.AgentJobRequestMessage CreateJobRequestMessage(string jobName) @@ -552,5 +560,428 @@ namespace GitHub.Runner.Common.Tests.Listener _configurationManager.Verify(x => x.DeleteLocalRunnerConfig(), Times.Once()); } } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task TestReportAuthMigrationTelemetry() + { + using (var hc = new TestHostContext(this)) + { + //Arrange + var runner = new Runner.Listener.Runner(); + hc.SetSingleton(_configurationManager.Object); + hc.SetSingleton(_jobNotification.Object); + hc.SetSingleton(_messageListener.Object); + hc.SetSingleton(_promptManager.Object); + hc.SetSingleton(_runnerServer.Object); + hc.SetSingleton(_configStore.Object); + hc.SetSingleton(_credentialManager.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); + hc.EnqueueInstance(_jobDispatcher.Object); + + runner.Initialize(hc); + var settings = new RunnerSettings + { + PoolId = 43242, + AgentId = 5678, + Ephemeral = true + }; + + var message1 = new TaskAgentMessage() + { + MessageId = 4234, + MessageType = "unknown" + }; + + var messages = new Queue(); + messages.Enqueue(message1); + _updater.Setup(x => x.SelfUpdate(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(true)); + _configurationManager.Setup(x => x.LoadSettings()) + .Returns(settings); + _configurationManager.Setup(x => x.IsConfigured()) + .Returns(true); + _messageListener.Setup(x => x.CreateSessionAsync(It.IsAny())) + .Returns(Task.FromResult(CreateSessionResult.Success)); + _messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny())) + .Returns(async (CancellationToken token) => + { + hc.GetTrace().Info("Waiting for message"); + Assert.False(hc.AllowAuthMigration); + await Task.Delay(100, token); + + var traceFile = Path.GetTempFileName(); + File.Copy(hc.TraceFileName, traceFile, true); + Assert.DoesNotContain("Checking for auth migration telemetry to report", File.ReadAllText(traceFile)); + + hc.EnableAuthMigration("L0Test"); + hc.DeferAuthMigration(TimeSpan.FromSeconds(1), "L0Test"); + hc.EnableAuthMigration("L0Test"); + hc.DeferAuthMigration(TimeSpan.FromSeconds(1), "L0Test"); + + await Task.Delay(1000, token); + + hc.ShutdownRunner(ShutdownReason.UserCancelled); + + File.Copy(hc.TraceFileName, traceFile, true); + Assert.Contains("Checking for auth migration telemetry to report", File.ReadAllText(traceFile)); + + return messages.Dequeue(); + }); + _messageListener.Setup(x => x.DeleteSessionAsync()) + .Returns(Task.CompletedTask); + _messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny())) + .Returns(Task.CompletedTask); + _jobNotification.Setup(x => x.StartClient(It.IsAny())) + .Callback(() => + { + + }); + + _configStore.Setup(x => x.IsServiceConfigured()).Returns(false); + + _runnerServer.Setup(x => x.UpdateAgentUpdateStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new TaskAgent())); + + //Act + var command = new CommandSettings(hc, new string[] { "run" }); + var returnCode = await runner.ExecuteCommand(command); + + //Assert + Assert.Equal(Constants.Runner.ReturnCode.Success, returnCode); + + _messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny()), Times.AtLeastOnce()); + _messageListener.Verify(x => x.CreateSessionAsync(It.IsAny()), Times.Once()); + _messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once()); + _messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny()), Times.Once()); + + _runnerServer.Verify(x => x.UpdateAgentUpdateStateAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.Is(s => s.Contains("L0Test")), It.IsAny()), Times.Exactly(4)); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task TestRunnerJobRequestMessageFromPipeline() + { + using (var hc = new TestHostContext(this)) + { + //Arrange + var runner = new Runner.Listener.Runner(); + hc.SetSingleton(_configurationManager.Object); + hc.SetSingleton(_jobNotification.Object); + hc.SetSingleton(_messageListener.Object); + hc.SetSingleton(_promptManager.Object); + hc.SetSingleton(_runnerServer.Object); + hc.SetSingleton(_configStore.Object); + hc.SetSingleton(_updater.Object); + hc.SetSingleton(_credentialManager.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); + hc.EnqueueInstance(_actionsRunServer.Object); + hc.EnqueueInstance(_jobDispatcher.Object); + + runner.Initialize(hc); + var settings = new RunnerSettings + { + PoolId = 43242, + AgentId = 5678, + Ephemeral = true, + ServerUrl = "https://github.com", + }; + + var message1 = new TaskAgentMessage() + { + Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999" }), + MessageId = 4234, + MessageType = JobRequestMessageTypes.RunnerJobRequest + }; + + var messages = new Queue(); + messages.Enqueue(message1); + _updater.Setup(x => x.SelfUpdate(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(true)); + _configurationManager.Setup(x => x.LoadSettings()) + .Returns(settings); + _configurationManager.Setup(x => x.IsConfigured()) + .Returns(true); + _messageListener.Setup(x => x.CreateSessionAsync(It.IsAny())) + .Returns(Task.FromResult(CreateSessionResult.Success)); + _messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny())) + .Returns(async (CancellationToken token) => + { + if (0 == messages.Count) + { + await Task.Delay(2000, token); + } + + return messages.Dequeue(); + }); + _messageListener.Setup(x => x.DeleteSessionAsync()) + .Returns(Task.CompletedTask); + _messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny())) + .Returns(Task.CompletedTask); + _jobNotification.Setup(x => x.StartClient(It.IsAny())) + .Callback(() => + { + + }); + _actionsRunServer.Setup(x => x.GetJobMessageAsync("999", It.IsAny())) + .Returns(Task.FromResult(CreateJobRequestMessage("test"))); + + _credentialManager.Setup(x => x.LoadCredentials(false)).Returns(new VssCredentials()); + + _configStore.Setup(x => x.IsServiceConfigured()).Returns(false); + + var completedTask = new TaskCompletionSource(); + completedTask.SetResult(true); + _jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask); + + //Act + var command = new CommandSettings(hc, new string[] { "run" }); + Task runnerTask = runner.ExecuteCommand(command); + + //Assert + //wait for the runner to exit with right return code + await Task.WhenAny(runnerTask, Task.Delay(30000)); + + Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out."); + Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString()); + if (runnerTask.IsCompleted) + { + Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask); + } + + _jobDispatcher.Verify(x => x.Run(It.IsAny(), true), Times.Once()); + _messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny()), Times.AtLeastOnce()); + _messageListener.Verify(x => x.CreateSessionAsync(It.IsAny()), Times.Once()); + _messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once()); + _messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny()), Times.Once()); + _credentialManager.Verify(x => x.LoadCredentials(false), Times.Once()); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task TestRunnerJobRequestMessageFromRunService() + { + using (var hc = new TestHostContext(this)) + { + //Arrange + var runner = new Runner.Listener.Runner(); + hc.SetSingleton(_configurationManager.Object); + hc.SetSingleton(_jobNotification.Object); + hc.SetSingleton(_messageListener.Object); + hc.SetSingleton(_promptManager.Object); + hc.SetSingleton(_runnerServer.Object); + hc.SetSingleton(_configStore.Object); + hc.SetSingleton(_updater.Object); + hc.SetSingleton(_credentialManager.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); + hc.EnqueueInstance(_runServer.Object); + hc.EnqueueInstance(_jobDispatcher.Object); + + runner.Initialize(hc); + var settings = new RunnerSettings + { + PoolId = 43242, + AgentId = 5678, + Ephemeral = true, + ServerUrl = "https://github.com", + }; + + var message1 = new TaskAgentMessage() + { + Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999", RunServiceUrl = "https://run-service.com" }), + MessageId = 4234, + MessageType = JobRequestMessageTypes.RunnerJobRequest + }; + + var messages = new Queue(); + messages.Enqueue(message1); + _updater.Setup(x => x.SelfUpdate(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(true)); + _configurationManager.Setup(x => x.LoadSettings()) + .Returns(settings); + _configurationManager.Setup(x => x.IsConfigured()) + .Returns(true); + _messageListener.Setup(x => x.CreateSessionAsync(It.IsAny())) + .Returns(Task.FromResult(CreateSessionResult.Success)); + _messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny())) + .Returns(async (CancellationToken token) => + { + if (0 == messages.Count) + { + await Task.Delay(2000, token); + } + + return messages.Dequeue(); + }); + _messageListener.Setup(x => x.DeleteSessionAsync()) + .Returns(Task.CompletedTask); + _messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny())) + .Returns(Task.CompletedTask); + _jobNotification.Setup(x => x.StartClient(It.IsAny())) + .Callback(() => + { + + }); + _runServer.Setup(x => x.GetJobMessageAsync("999", "github", It.IsAny())) + .Returns(Task.FromResult(CreateJobRequestMessage("test"))); + + _credentialManager.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + _configStore.Setup(x => x.IsServiceConfigured()).Returns(false); + + var completedTask = new TaskCompletionSource(); + completedTask.SetResult(true); + _jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask); + + //Act + var command = new CommandSettings(hc, new string[] { "run" }); + Task runnerTask = runner.ExecuteCommand(command); + + //Assert + //wait for the runner to exit with right return code + await Task.WhenAny(runnerTask, Task.Delay(30000)); + + Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out."); + Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString()); + if (runnerTask.IsCompleted) + { + Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask); + } + + _jobDispatcher.Verify(x => x.Run(It.IsAny(), true), Times.Once()); + _messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny()), Times.AtLeastOnce()); + _messageListener.Verify(x => x.CreateSessionAsync(It.IsAny()), Times.Once()); + _messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once()); + _messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny()), Times.Once()); + _credentialManager.Verify(x => x.LoadCredentials(true), Times.Once()); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async Task TestRunnerJobRequestMessageFromRunService_AuthMigrationFallback() + { + using (var hc = new TestHostContext(this)) + { + //Arrange + var runner = new Runner.Listener.Runner(); + hc.SetSingleton(_configurationManager.Object); + hc.SetSingleton(_jobNotification.Object); + hc.SetSingleton(_messageListener.Object); + hc.SetSingleton(_promptManager.Object); + hc.SetSingleton(_runnerServer.Object); + hc.SetSingleton(_configStore.Object); + hc.SetSingleton(_updater.Object); + hc.SetSingleton(_credentialManager.Object); + hc.EnqueueInstance(_acquireJobThrottler.Object); + hc.EnqueueInstance(_jobDispatcher.Object); + hc.EnqueueInstance(_runServer.Object); + hc.EnqueueInstance(_runServer.Object); + + runner.Initialize(hc); + var settings = new RunnerSettings + { + PoolId = 43242, + AgentId = 5678, + Ephemeral = true, + ServerUrl = "https://github.com", + }; + + var message1 = new TaskAgentMessage() + { + Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999", RunServiceUrl = "https://run-service.com" }), + MessageId = 4234, + MessageType = JobRequestMessageTypes.RunnerJobRequest + }; + + var messages = new Queue(); + messages.Enqueue(message1); + messages.Enqueue(message1); + _updater.Setup(x => x.SelfUpdate(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(true)); + _configurationManager.Setup(x => x.LoadSettings()) + .Returns(settings); + _configurationManager.Setup(x => x.IsConfigured()) + .Returns(true); + _messageListener.Setup(x => x.CreateSessionAsync(It.IsAny())) + .Returns(Task.FromResult(CreateSessionResult.Success)); + _messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny())) + .Returns(async (CancellationToken token) => + { + if (2 == messages.Count) + { + hc.EnableAuthMigration("L0Test"); + } + + if (0 == messages.Count) + { + await Task.Delay(2000, token); + } + + return messages.Dequeue(); + }); + _messageListener.Setup(x => x.DeleteSessionAsync()) + .Returns(Task.CompletedTask); + _messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny())) + .Returns(Task.CompletedTask); + _jobNotification.Setup(x => x.StartClient(It.IsAny())) + .Callback(() => + { + + }); + + var throwError = true; + _runServer.Setup(x => x.GetJobMessageAsync("999", "github", It.IsAny())) + .Returns(() => + { + if (throwError) + { + Assert.True(hc.AllowAuthMigration); + throwError = false; + throw new NotSupportedException("some error"); + } + + return Task.FromResult(CreateJobRequestMessage("test")); + }); + + _credentialManager.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + _configStore.Setup(x => x.IsServiceConfigured()).Returns(false); + + var completedTask = new TaskCompletionSource(); + completedTask.SetResult(true); + _jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask); + + //Act + var command = new CommandSettings(hc, new string[] { "run" }); + Task runnerTask = runner.ExecuteCommand(command); + + //Assert + //wait for the runner to exit with right return code + await Task.WhenAny(runnerTask, Task.Delay(30000)); + + Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out."); + Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString()); + if (runnerTask.IsCompleted) + { + Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask); + } + + _jobDispatcher.Verify(x => x.Run(It.IsAny(), true), Times.Once()); + _messageListener.Verify(x => x.CreateSessionAsync(It.IsAny()), Times.Once()); + _messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny()), Times.AtLeast(2)); + _messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny()), Times.AtLeast(2)); + _messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once()); + _credentialManager.Verify(x => x.LoadCredentials(true), Times.Exactly(2)); + + Assert.False(hc.AllowAuthMigration); + } + } } }