From 6bec1e3bb832aad26f4ad5b64759a8e4d468df24 Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Wed, 4 Mar 2020 21:40:58 -0500 Subject: [PATCH] Switch to use token service instead of SPS for exchanging oauth token. (#325) * Gracefully switch the runner to use Token Service instead of SPS. * PR feedback. * feedback2 * report error. --- src/Runner.Common/ConfigurationStore.cs | 35 +- src/Runner.Common/Constants.cs | 1 + src/Runner.Common/HostContext.cs | 6 + src/Runner.Common/RunnerServer.cs | 19 + .../Configuration/CredentialManager.cs | 14 +- .../Configuration/OAuthCredential.cs | 5 +- src/Runner.Listener/JobDispatcher.cs | 531 +++---- src/Runner.Listener/MessageListener.cs | 191 ++- src/Runner.Listener/SelfUpdater.cs | 69 +- .../Generated/TaskAgentHttpClientBase.cs | 60 + src/Sdk/DTWebApi/WebApi/TaskResourceIds.cs | 3 + src/Test/L0/Listener/MessageListenerL0.cs | 1307 ++++++++++++++++- src/Test/L0/TestHostContext.cs | 2 +- 13 files changed, 1938 insertions(+), 305 deletions(-) diff --git a/src/Runner.Common/ConfigurationStore.cs b/src/Runner.Common/ConfigurationStore.cs index ab83132a5..da66d7f8d 100644 --- a/src/Runner.Common/ConfigurationStore.cs +++ b/src/Runner.Common/ConfigurationStore.cs @@ -78,8 +78,10 @@ namespace GitHub.Runner.Common bool IsServiceConfigured(); bool HasCredentials(); CredentialData GetCredentials(); + CredentialData GetMigratedCredentials(); RunnerSettings GetSettings(); void SaveCredential(CredentialData credential); + void SaveMigratedCredential(CredentialData credential); void SaveSettings(RunnerSettings settings); void DeleteCredential(); void DeleteSettings(); @@ -90,9 +92,11 @@ namespace GitHub.Runner.Common private string _binPath; private string _configFilePath; private string _credFilePath; + private string _migratedCredFilePath; private string _serviceConfigFilePath; private CredentialData _creds; + private CredentialData _migratedCreds; private RunnerSettings _settings; public override void Initialize(IHostContext hostContext) @@ -114,6 +118,9 @@ namespace GitHub.Runner.Common _credFilePath = hostContext.GetConfigFile(WellKnownConfigFile.Credentials); Trace.Info("CredFilePath: {0}", _credFilePath); + _migratedCredFilePath = hostContext.GetConfigFile(WellKnownConfigFile.MigratedCredentials); + Trace.Info("MigratedCredFilePath: {0}", _migratedCredFilePath); + _serviceConfigFilePath = hostContext.GetConfigFile(WellKnownConfigFile.Service); Trace.Info("ServiceConfigFilePath: {0}", _serviceConfigFilePath); } @@ -123,7 +130,7 @@ namespace GitHub.Runner.Common public bool HasCredentials() { Trace.Info("HasCredentials()"); - bool credsStored = (new FileInfo(_credFilePath)).Exists; + bool credsStored = (new FileInfo(_credFilePath)).Exists || (new FileInfo(_migratedCredFilePath)).Exists; Trace.Info("stored {0}", credsStored); return credsStored; } @@ -154,6 +161,16 @@ namespace GitHub.Runner.Common return _creds; } + public CredentialData GetMigratedCredentials() + { + if (_migratedCreds == null && File.Exists(_migratedCredFilePath)) + { + _migratedCreds = IOUtil.LoadObject(_migratedCredFilePath); + } + + return _migratedCreds; + } + public RunnerSettings GetSettings() { if (_settings == null) @@ -188,6 +205,21 @@ namespace GitHub.Runner.Common File.SetAttributes(_credFilePath, File.GetAttributes(_credFilePath) | FileAttributes.Hidden); } + public void SaveMigratedCredential(CredentialData credential) + { + Trace.Info("Saving {0} migrated credential @ {1}", credential.Scheme, _migratedCredFilePath); + if (File.Exists(_migratedCredFilePath)) + { + // Delete existing credential file first, since the file is hidden and not able to overwrite. + Trace.Info("Delete exist runner migrated credential file."); + IOUtil.DeleteFile(_migratedCredFilePath); + } + + IOUtil.SaveObject(credential, _migratedCredFilePath); + Trace.Info("Migrated Credentials Saved."); + File.SetAttributes(_migratedCredFilePath, File.GetAttributes(_migratedCredFilePath) | FileAttributes.Hidden); + } + public void SaveSettings(RunnerSettings settings) { Trace.Info("Saving runner settings."); @@ -206,6 +238,7 @@ namespace GitHub.Runner.Common public void DeleteCredential() { IOUtil.Delete(_credFilePath, default(CancellationToken)); + IOUtil.Delete(_migratedCredFilePath, default(CancellationToken)); } public void DeleteSettings() diff --git a/src/Runner.Common/Constants.cs b/src/Runner.Common/Constants.cs index 633a0baa8..0d333464b 100644 --- a/src/Runner.Common/Constants.cs +++ b/src/Runner.Common/Constants.cs @@ -19,6 +19,7 @@ namespace GitHub.Runner.Common { Runner, Credentials, + MigratedCredentials, RSACredentials, Service, CredentialStore, diff --git a/src/Runner.Common/HostContext.cs b/src/Runner.Common/HostContext.cs index f110bbd40..1a44e8588 100644 --- a/src/Runner.Common/HostContext.cs +++ b/src/Runner.Common/HostContext.cs @@ -281,6 +281,12 @@ namespace GitHub.Runner.Common ".credentials"); break; + case WellKnownConfigFile.MigratedCredentials: + path = Path.Combine( + GetDirectory(WellKnownDirectory.Root), + ".credentials_migrated"); + break; + case WellKnownConfigFile.RSACredentials: path = Path.Combine( GetDirectory(WellKnownDirectory.Root), diff --git a/src/Runner.Common/RunnerServer.cs b/src/Runner.Common/RunnerServer.cs index f06635aad..7b244db0e 100644 --- a/src/Runner.Common/RunnerServer.cs +++ b/src/Runner.Common/RunnerServer.cs @@ -50,6 +50,10 @@ namespace GitHub.Runner.Common // agent update Task UpdateAgentUpdateStateAsync(int agentPoolId, int agentId, string currentState); + + // runner authorization url + Task GetRunnerAuthUrlAsync(int runnerPoolId, int runnerId); + Task ReportRunnerAuthUrlErrorAsync(int runnerPoolId, int runnerId, string error); } public sealed class RunnerServer : RunnerService, IRunnerServer @@ -334,5 +338,20 @@ namespace GitHub.Runner.Common CheckConnection(RunnerConnectionType.Generic); return _genericTaskAgentClient.UpdateAgentUpdateStateAsync(agentPoolId, agentId, currentState); } + + //----------------------------------------------------------------- + // Runner Auth Url + //----------------------------------------------------------------- + public Task GetRunnerAuthUrlAsync(int runnerPoolId, int runnerId) + { + CheckConnection(RunnerConnectionType.MessageQueue); + return _messageTaskAgentClient.GetAgentAuthUrlAsync(runnerPoolId, runnerId); + } + + public Task ReportRunnerAuthUrlErrorAsync(int runnerPoolId, int runnerId, string error) + { + CheckConnection(RunnerConnectionType.MessageQueue); + return _messageTaskAgentClient.ReportAgentAuthUrlMigrationErrorAsync(runnerPoolId, runnerId, error); + } } } diff --git a/src/Runner.Listener/Configuration/CredentialManager.cs b/src/Runner.Listener/Configuration/CredentialManager.cs index 9c804b736..871aae4de 100644 --- a/src/Runner.Listener/Configuration/CredentialManager.cs +++ b/src/Runner.Listener/Configuration/CredentialManager.cs @@ -13,7 +13,7 @@ namespace GitHub.Runner.Listener.Configuration public interface ICredentialManager : IRunnerService { ICredentialProvider GetCredentialProvider(string credType); - VssCredentials LoadCredentials(); + VssCredentials LoadCredentials(bool preferMigrated = true); } public class CredentialManager : RunnerService, ICredentialManager @@ -40,7 +40,7 @@ namespace GitHub.Runner.Listener.Configuration return creds; } - public VssCredentials LoadCredentials() + public VssCredentials LoadCredentials(bool preferMigrated = true) { IConfigurationStore store = HostContext.GetService(); @@ -50,6 +50,16 @@ namespace GitHub.Runner.Listener.Configuration } CredentialData credData = store.GetCredentials(); + + if (preferMigrated) + { + var migratedCred = store.GetMigratedCredentials(); + if (migratedCred != null) + { + credData = migratedCred; + } + } + ICredentialProvider credProv = GetCredentialProvider(credData.Scheme); credProv.CredentialData = credData; diff --git a/src/Runner.Listener/Configuration/OAuthCredential.cs b/src/Runner.Listener/Configuration/OAuthCredential.cs index 72d4899c9..a7162aafe 100644 --- a/src/Runner.Listener/Configuration/OAuthCredential.cs +++ b/src/Runner.Listener/Configuration/OAuthCredential.cs @@ -1,6 +1,5 @@ using System; using GitHub.Runner.Common; -using GitHub.Runner.Common.Util; using GitHub.Runner.Sdk; using GitHub.Services.Common; using GitHub.Services.OAuth; @@ -29,7 +28,7 @@ namespace GitHub.Runner.Listener.Configuration var authorizationUrl = this.CredentialData.Data.GetValueOrDefault("authorizationUrl", null); // For back compat with .credential file that doesn't has 'oauthEndpointUrl' section - var oathEndpointUrl = this.CredentialData.Data.GetValueOrDefault("oauthEndpointUrl", authorizationUrl); + var oauthEndpointUrl = this.CredentialData.Data.GetValueOrDefault("oauthEndpointUrl", authorizationUrl); ArgUtil.NotNullOrEmpty(clientId, nameof(clientId)); ArgUtil.NotNullOrEmpty(authorizationUrl, nameof(authorizationUrl)); @@ -39,7 +38,7 @@ namespace GitHub.Runner.Listener.Configuration var keyManager = context.GetService(); var signingCredentials = VssSigningCredentials.Create(() => keyManager.GetKey()); var clientCredential = new VssOAuthJwtBearerClientCredential(clientId, authorizationUrl, signingCredentials); - var agentCredential = new VssOAuthCredential(new Uri(oathEndpointUrl, UriKind.Absolute), VssOAuthGrant.ClientCredentials, clientCredential); + var agentCredential = new VssOAuthCredential(new Uri(oauthEndpointUrl, UriKind.Absolute), VssOAuthGrant.ClientCredentials, clientCredential); // Construct a credentials cache with a single OAuth credential for communication. The windows credential // is explicitly set to null to ensure we never do that negotiation. diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index 6e809a91f..00d31b116 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -18,6 +18,7 @@ namespace GitHub.Runner.Listener [ServiceLocator(Default = typeof(JobDispatcher))] public interface IJobDispatcher : IRunnerService { + bool Busy { get; } TaskCompletionSource RunOnceJobCompleted { get; } void Run(Pipelines.AgentJobRequestMessage message, bool runOnce = false); bool Cancel(JobCancelMessage message); @@ -69,6 +70,8 @@ namespace GitHub.Runner.Listener public TaskCompletionSource RunOnceJobCompleted => _runOnceJobCompleted; + public bool Busy { get; private set; } + public void Run(Pipelines.AgentJobRequestMessage jobRequestMessage, bool runOnce = false) { Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received."); @@ -247,7 +250,7 @@ namespace GitHub.Runner.Listener Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45))); if (completedTask != jobDispatch.WorkerDispatch) { - // at this point, the job exectuion might encounter some dead lock and even not able to be canclled. + // at this point, the job execution might encounter some dead lock and even not able to be cancelled. // no need to localize the exception string should never happen. throw new InvalidOperationException($"Job dispatch process for {jobDispatch.JobId} has encountered unexpected error, the dispatch task is not able to be canceled within 45 seconds."); } @@ -296,190 +299,290 @@ namespace GitHub.Runner.Listener private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken) { - if (previousJobDispatch != null) + Busy = true; + try { - Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker."); - await EnsureDispatchFinished(previousJobDispatch); - } - else - { - Trace.Verbose($"This is the first job request."); - } - - var term = HostContext.GetService(); - term.WriteLine($"{DateTime.UtcNow:u}: Running job: {message.JobDisplayName}"); - - // first job request renew succeed. - TaskCompletionSource firstJobRequestRenewed = new TaskCompletionSource(); - var notification = HostContext.GetService(); - - // lock renew cancellation token. - using (var lockRenewalTokenSource = new CancellationTokenSource()) - using (var workerProcessCancelTokenSource = new CancellationTokenSource()) - { - long requestId = message.RequestId; - Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat - - // start renew job request - Trace.Info($"Start renew job request {requestId} for job {message.JobId}."); - Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token); - - // wait till first renew succeed or job request is canceled - // not even start worker if the first renew fail - await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken)); - - if (renewJobRequest.IsCompleted) + if (previousJobDispatch != null) { - // renew job request task complete means we run out of retry for the first job request renew. - Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker."); - return; + Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker."); + await EnsureDispatchFinished(previousJobDispatch); + } + else + { + Trace.Verbose($"This is the first job request."); } - if (jobRequestCancellationToken.IsCancellationRequested) + var term = HostContext.GetService(); + term.WriteLine($"{DateTime.UtcNow:u}: Running job: {message.JobDisplayName}"); + + // first job request renew succeed. + TaskCompletionSource firstJobRequestRenewed = new TaskCompletionSource(); + var notification = HostContext.GetService(); + + // lock renew cancellation token. + using (var lockRenewalTokenSource = new CancellationTokenSource()) + using (var workerProcessCancelTokenSource = new CancellationTokenSource()) { - Trace.Info($"Stop renew job request for job {message.JobId}."); - // stop renew lock - lockRenewalTokenSource.Cancel(); - // renew job request should never blows up. - await renewJobRequest; + long requestId = message.RequestId; + Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat - // complete job request with result Cancelled - await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled); - return; - } + // start renew job request + Trace.Info($"Start renew job request {requestId} for job {message.JobId}."); + Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token); - HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}"); + // wait till first renew succeed or job request is canceled + // not even start worker if the first renew fail + await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken)); - Task workerProcessTask = null; - object _outputLock = new object(); - List workerOutput = new List(); - using (var processChannel = HostContext.CreateService()) - using (var processInvoker = HostContext.CreateService()) - { - // Start the process channel. - // It's OK if StartServer bubbles an execption after the worker process has already started. - // The worker will shutdown after 30 seconds if it hasn't received the job message. - processChannel.StartServer( - // Delegate to start the child process. - startProcess: (string pipeHandleOut, string pipeHandleIn) => - { - // Validate args. - ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut)); - ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn)); - - // Save STDOUT from worker, worker will use STDOUT report unhandle exception. - processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout) - { - if (!string.IsNullOrEmpty(stdout.Data)) - { - lock (_outputLock) - { - workerOutput.Add(stdout.Data); - } - } - }; - - // Save STDERR from worker, worker will use STDERR on crash. - processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr) - { - if (!string.IsNullOrEmpty(stderr.Data)) - { - lock (_outputLock) - { - workerOutput.Add(stderr.Data); - } - } - }; - - // Start the child process. - HostContext.WritePerfCounter("StartingWorkerProcess"); - var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin); - string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName); - workerProcessTask = processInvoker.ExecuteAsync( - workingDirectory: assemblyDirectory, - fileName: workerFileName, - arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn, - environment: null, - requireExitCodeZero: false, - outputEncoding: null, - killProcessOnCancel: true, - redirectStandardIn: null, - inheritConsoleHandler: false, - keepStandardInOpen: false, - highPriorityProcess: true, - cancellationToken: workerProcessCancelTokenSource.Token); - }); - - // Send the job request message. - // Kill the worker process if sending the job message times out. The worker - // process may have successfully received the job message. - try + if (renewJobRequest.IsCompleted) { - Trace.Info($"Send job request message to worker for job {message.JobId}."); - HostContext.WritePerfCounter($"RunnerSendingJobToWorker_{message.JobId}"); - using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout)) - { - await processChannel.SendAsync( - messageType: MessageType.NewJobRequest, - body: JsonUtility.ToString(message), - cancellationToken: csSendJobRequest.Token); - } + // renew job request task complete means we run out of retry for the first job request renew. + Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker."); + return; } - catch (OperationCanceledException) - { - // message send been cancelled. - // timeout 30 sec. kill worker. - Trace.Info($"Job request message sending for job {message.JobId} been cancelled, kill running worker."); - workerProcessCancelTokenSource.Cancel(); - try - { - await workerProcessTask; - } - catch (OperationCanceledException) - { - Trace.Info("worker process has been killed."); - } + if (jobRequestCancellationToken.IsCancellationRequested) + { Trace.Info($"Stop renew job request for job {message.JobId}."); // stop renew lock lockRenewalTokenSource.Cancel(); // renew job request should never blows up. await renewJobRequest; - // not finish the job request since the job haven't run on worker at all, we will not going to set a result to server. + // complete job request with result Cancelled + await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled); return; } - // we get first jobrequest renew succeed and start the worker process with the job message. - // send notification to machine provisioner. - var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); - var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"]; - notification.JobStarted(message.JobId, accessToken, systemConnection.Url); + HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}"); - HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}"); - - try + Task workerProcessTask = null; + object _outputLock = new object(); + List workerOutput = new List(); + using (var processChannel = HostContext.CreateService()) + using (var processInvoker = HostContext.CreateService()) { - TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded; - // wait for renewlock, worker process or cancellation token been fired. - var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken)); - if (completedTask == workerProcessTask) - { - // worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished. - int returnCode = await workerProcessTask; - Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode); - - string detailInfo = null; - if (!TaskResultUtil.IsValidReturnCode(returnCode)) + // Start the process channel. + // It's OK if StartServer bubbles an execption after the worker process has already started. + // The worker will shutdown after 30 seconds if it hasn't received the job message. + processChannel.StartServer( + // Delegate to start the child process. + startProcess: (string pipeHandleOut, string pipeHandleIn) => { - detailInfo = string.Join(Environment.NewLine, workerOutput); - Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); - await LogWorkerProcessUnhandledException(message, detailInfo); + // Validate args. + ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut)); + ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn)); + + // Save STDOUT from worker, worker will use STDOUT report unhandle exception. + processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout) + { + if (!string.IsNullOrEmpty(stdout.Data)) + { + lock (_outputLock) + { + workerOutput.Add(stdout.Data); + } + } + }; + + // Save STDERR from worker, worker will use STDERR on crash. + processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr) + { + if (!string.IsNullOrEmpty(stderr.Data)) + { + lock (_outputLock) + { + workerOutput.Add(stderr.Data); + } + } + }; + + // Start the child process. + HostContext.WritePerfCounter("StartingWorkerProcess"); + var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin); + string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName); + workerProcessTask = processInvoker.ExecuteAsync( + workingDirectory: assemblyDirectory, + fileName: workerFileName, + arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn, + environment: null, + requireExitCodeZero: false, + outputEncoding: null, + killProcessOnCancel: true, + redirectStandardIn: null, + inheritConsoleHandler: false, + keepStandardInOpen: false, + highPriorityProcess: true, + cancellationToken: workerProcessCancelTokenSource.Token); + }); + + // Send the job request message. + // Kill the worker process if sending the job message times out. The worker + // process may have successfully received the job message. + try + { + Trace.Info($"Send job request message to worker for job {message.JobId}."); + HostContext.WritePerfCounter($"RunnerSendingJobToWorker_{message.JobId}"); + using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout)) + { + await processChannel.SendAsync( + messageType: MessageType.NewJobRequest, + body: JsonUtility.ToString(message), + cancellationToken: csSendJobRequest.Token); + } + } + catch (OperationCanceledException) + { + // message send been cancelled. + // timeout 30 sec. kill worker. + Trace.Info($"Job request message sending for job {message.JobId} been cancelled, kill running worker."); + workerProcessCancelTokenSource.Cancel(); + try + { + await workerProcessTask; + } + catch (OperationCanceledException) + { + Trace.Info("worker process has been killed."); } - TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode); - Trace.Info($"finish job request for job {message.JobId} with result: {result}"); - term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {result}"); + Trace.Info($"Stop renew job request for job {message.JobId}."); + // stop renew lock + lockRenewalTokenSource.Cancel(); + // renew job request should never blows up. + await renewJobRequest; + + // not finish the job request since the job haven't run on worker at all, we will not going to set a result to server. + return; + } + + // we get first jobrequest renew succeed and start the worker process with the job message. + // send notification to machine provisioner. + var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"]; + notification.JobStarted(message.JobId, accessToken, systemConnection.Url); + + HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}"); + + try + { + TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded; + // wait for renewlock, worker process or cancellation token been fired. + var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken)); + if (completedTask == workerProcessTask) + { + // worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished. + int returnCode = await workerProcessTask; + Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode); + + string detailInfo = null; + if (!TaskResultUtil.IsValidReturnCode(returnCode)) + { + detailInfo = string.Join(Environment.NewLine, workerOutput); + Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); + await LogWorkerProcessUnhandledException(message, detailInfo); + } + + TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode); + Trace.Info($"finish job request for job {message.JobId} with result: {result}"); + term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {result}"); + + Trace.Info($"Stop renew job request for job {message.JobId}."); + // stop renew lock + lockRenewalTokenSource.Cancel(); + // renew job request should never blows up. + await renewJobRequest; + + // complete job request + await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); + + // print out unhandled exception happened in worker after we complete job request. + // when we run out of disk space, report back to server has higher priority. + if (!string.IsNullOrEmpty(detailInfo)) + { + Trace.Error("Unhandled exception happened in worker:"); + Trace.Error(detailInfo); + } + + return; + } + else if (completedTask == renewJobRequest) + { + resultOnAbandonOrCancel = TaskResult.Abandoned; + } + else + { + resultOnAbandonOrCancel = TaskResult.Canceled; + } + + // renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage) + // cancel worker gracefully first, then kill it after worker cancel timeout + try + { + Trace.Info($"Send job cancellation message to worker for job {message.JobId}."); + using (var csSendCancel = new CancellationTokenSource(_channelTimeout)) + { + var messageType = MessageType.CancelRequest; + if (HostContext.RunnerShutdownToken.IsCancellationRequested) + { + switch (HostContext.RunnerShutdownReason) + { + case ShutdownReason.UserCancelled: + messageType = MessageType.RunnerShutdown; + break; + case ShutdownReason.OperatingSystemShutdown: + messageType = MessageType.OperatingSystemShutdown; + break; + } + } + + await processChannel.SendAsync( + messageType: messageType, + body: string.Empty, + cancellationToken: csSendCancel.Token); + } + } + catch (OperationCanceledException) + { + // message send been cancelled. + Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker."); + workerProcessCancelTokenSource.Cancel(); + try + { + await workerProcessTask; + } + catch (OperationCanceledException) + { + Trace.Info("worker process has been killed."); + } + } + + // wait worker to exit + // if worker doesn't exit within timeout, then kill worker. + completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken)); + + // worker haven't exit within cancellation timeout. + if (completedTask != workerProcessTask) + { + Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker."); + workerProcessCancelTokenSource.Cancel(); + try + { + await workerProcessTask; + } + catch (OperationCanceledException) + { + Trace.Info("worker process has been killed."); + } + + // When worker doesn't exit within cancel timeout, the runner will kill the worker process and worker won't finish upload job logs. + // The runner will try to upload these logs at this time. + await TryUploadUnfinishedLogs(message); + } + + Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}"); + term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {resultOnAbandonOrCancel}"); + // complete job request with cancel result, stop renew lock, job has finished. Trace.Info($"Stop renew job request for job {message.JobId}."); // stop renew lock @@ -488,112 +591,20 @@ namespace GitHub.Runner.Listener await renewJobRequest; // complete job request - await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo); - - // print out unhandled exception happened in worker after we complete job request. - // when we run out of disk space, report back to server has higher priority. - if (!string.IsNullOrEmpty(detailInfo)) - { - Trace.Error("Unhandled exception happened in worker:"); - Trace.Error(detailInfo); - } - - return; + await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel); } - else if (completedTask == renewJobRequest) + finally { - resultOnAbandonOrCancel = TaskResult.Abandoned; + // This should be the last thing to run so we don't notify external parties until actually finished + await notification.JobCompleted(message.JobId); } - else - { - resultOnAbandonOrCancel = TaskResult.Canceled; - } - - // renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage) - // cancel worker gracefully first, then kill it after worker cancel timeout - try - { - Trace.Info($"Send job cancellation message to worker for job {message.JobId}."); - using (var csSendCancel = new CancellationTokenSource(_channelTimeout)) - { - var messageType = MessageType.CancelRequest; - if (HostContext.RunnerShutdownToken.IsCancellationRequested) - { - switch (HostContext.RunnerShutdownReason) - { - case ShutdownReason.UserCancelled: - messageType = MessageType.RunnerShutdown; - break; - case ShutdownReason.OperatingSystemShutdown: - messageType = MessageType.OperatingSystemShutdown; - break; - } - } - - await processChannel.SendAsync( - messageType: messageType, - body: string.Empty, - cancellationToken: csSendCancel.Token); - } - } - catch (OperationCanceledException) - { - // message send been cancelled. - Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker."); - workerProcessCancelTokenSource.Cancel(); - try - { - await workerProcessTask; - } - catch (OperationCanceledException) - { - Trace.Info("worker process has been killed."); - } - } - - // wait worker to exit - // if worker doesn't exit within timeout, then kill worker. - completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken)); - - // worker haven't exit within cancellation timeout. - if (completedTask != workerProcessTask) - { - Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker."); - workerProcessCancelTokenSource.Cancel(); - try - { - await workerProcessTask; - } - catch (OperationCanceledException) - { - Trace.Info("worker process has been killed."); - } - - // When worker doesn't exit within cancel timeout, the runner will kill the worker process and worker won't finish upload job logs. - // The runner will try to upload these logs at this time. - await TryUploadUnfinishedLogs(message); - } - - Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}"); - term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {resultOnAbandonOrCancel}"); - // complete job request with cancel result, stop renew lock, job has finished. - - Trace.Info($"Stop renew job request for job {message.JobId}."); - // stop renew lock - lockRenewalTokenSource.Cancel(); - // renew job request should never blows up. - await renewJobRequest; - - // complete job request - await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel); - } - finally - { - // This should be the last thing to run so we don't notify external parties until actually finished - await notification.JobCompleted(message.JobId); } } } + finally + { + Busy = false; + } } public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, TaskCompletionSource firstJobRequestRenewed, CancellationToken token) diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 53e8b8d4b..d1e43e283 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -13,7 +13,10 @@ using System.Diagnostics; using System.Runtime.InteropServices; using GitHub.Runner.Common; using GitHub.Runner.Sdk; +using GitHub.Services.WebApi; +using System.Runtime.CompilerServices; +[assembly: InternalsVisibleTo("Test")] namespace GitHub.Runner.Listener { [ServiceLocator(Default = typeof(MessageListener))] @@ -32,18 +35,30 @@ namespace GitHub.Runner.Listener private ITerminal _term; private IRunnerServer _runnerServer; private TaskAgentSession _session; + private ICredentialManager _credMgr; + private IConfigurationStore _configStore; private TimeSpan _getNextMessageRetryInterval; private readonly TimeSpan _sessionCreationRetryInterval = TimeSpan.FromSeconds(30); private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); private readonly Dictionary _sessionCreationExceptionTracker = new Dictionary(); + // Whether load credentials from .credentials_migrated file + internal bool _useMigratedCredentials; + + // need to check auth url if there is only .credentials and auth schema is OAuth + internal bool _needToCheckAuthorizationUrlUpdate; + internal Task _authorizationUrlMigrationBackgroundTask; + internal Task _authorizationUrlRollbackReattemptDelayBackgroundTask; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); _term = HostContext.GetService(); _runnerServer = HostContext.GetService(); + _credMgr = HostContext.GetService(); + _configStore = HostContext.GetService(); } public async Task CreateSessionAsync(CancellationToken token) @@ -58,8 +73,8 @@ namespace GitHub.Runner.Listener // Create connection. Trace.Info("Loading Credentials"); - var credMgr = HostContext.GetService(); - VssCredentials creds = credMgr.LoadCredentials(); + _useMigratedCredentials = !StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_SPSAUTHURL")); + VssCredentials creds = _credMgr.LoadCredentials(_useMigratedCredentials); var agent = new TaskAgentReference { @@ -74,6 +89,17 @@ namespace GitHub.Runner.Listener string errorMessage = string.Empty; bool encounteringError = false; + var originalCreds = _configStore.GetCredentials(); + var migratedCreds = _configStore.GetMigratedCredentials(); + if (migratedCreds == null) + { + _useMigratedCredentials = false; + if (originalCreds.Scheme == Constants.Configuration.OAuth) + { + _needToCheckAuthorizationUrlUpdate = true; + } + } + while (true) { token.ThrowIfCancellationRequested(); @@ -83,7 +109,7 @@ namespace GitHub.Runner.Listener Trace.Info("Connecting to the Runner Server..."); await _runnerServer.ConnectAsync(new Uri(serverUrl), creds); Trace.Info("VssConnection created"); - + _term.WriteLine(); _term.WriteSuccessMessage("Connected to GitHub"); _term.WriteLine(); @@ -101,6 +127,12 @@ namespace GitHub.Runner.Listener encounteringError = false; } + if (_needToCheckAuthorizationUrlUpdate) + { + // start background task try to get new authorization url + _authorizationUrlMigrationBackgroundTask = GetNewOAuthAuthorizationSetting(token); + } + return true; } catch (OperationCanceledException) when (token.IsCancellationRequested) @@ -120,8 +152,21 @@ namespace GitHub.Runner.Listener if (!IsSessionCreationExceptionRetriable(ex)) { - _term.WriteError($"Failed to create session. {ex.Message}"); - return false; + if (_useMigratedCredentials) + { + // migrated credentials might cause lose permission during permission check, + // we will force to use original credential and try again + _useMigratedCredentials = false; + var reattemptBackoff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromHours(24), TimeSpan.FromHours(36)); + _authorizationUrlRollbackReattemptDelayBackgroundTask = HostContext.Delay(reattemptBackoff, token); // retry migrated creds in 24-36 hours. + creds = _credMgr.LoadCredentials(false); + Trace.Error("Fallback to original credentials and try again."); + } + else + { + _term.WriteError($"Failed to create session. {ex.Message}"); + return false; + } } if (!encounteringError) //print the message only on the first error @@ -182,6 +227,51 @@ namespace GitHub.Runner.Listener encounteringError = false; continuousError = 0; } + + if (_needToCheckAuthorizationUrlUpdate && + _authorizationUrlMigrationBackgroundTask?.IsCompleted == true) + { + if (HostContext.GetService().Busy || + HostContext.GetService().Busy) + { + Trace.Info("Job or runner updates in progress, update credentials next time."); + } + else + { + try + { + var newCred = await _authorizationUrlMigrationBackgroundTask; + await _runnerServer.ConnectAsync(new Uri(_settings.ServerUrl), newCred); + Trace.Info("Updated connection to use migrated credential for next GetMessage call."); + _useMigratedCredentials = true; + _authorizationUrlMigrationBackgroundTask = null; + _needToCheckAuthorizationUrlUpdate = false; + } + catch (Exception ex) + { + Trace.Error("Fail to refresh connection with new authorization url."); + Trace.Error(ex); + } + } + } + + if (_authorizationUrlRollbackReattemptDelayBackgroundTask?.IsCompleted == true) + { + try + { + // we rolled back to use original creds about 2 days before, now it's a good time to try migrated creds again. + Trace.Info("Re-attempt to use migrated credential"); + var migratedCreds = _credMgr.LoadCredentials(); + await _runnerServer.ConnectAsync(new Uri(_settings.ServerUrl), migratedCreds); + _useMigratedCredentials = true; + _authorizationUrlRollbackReattemptDelayBackgroundTask = null; + } + catch (Exception ex) + { + Trace.Error("Fail to refresh connection with new authorization url on rollback reattempt."); + Trace.Error(ex); + } + } } catch (OperationCanceledException) when (token.IsCancellationRequested) { @@ -205,7 +295,21 @@ namespace GitHub.Runner.Listener } else if (!IsGetNextMessageExceptionRetriable(ex)) { - throw; + if (_useMigratedCredentials) + { + // migrated credentials might cause lose permission during permission check, + // we will force to use original credential and try again + _useMigratedCredentials = false; + var reattemptBackoff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromHours(24), TimeSpan.FromHours(36)); + _authorizationUrlRollbackReattemptDelayBackgroundTask = HostContext.Delay(reattemptBackoff, token); // retry migrated creds in 24-36 hours. + var originalCreds = _credMgr.LoadCredentials(false); + await _runnerServer.ConnectAsync(new Uri(_settings.ServerUrl), originalCreds); + Trace.Error("Fallback to original credentials and try again."); + } + else + { + throw; + } } else { @@ -397,5 +501,80 @@ namespace GitHub.Runner.Listener return true; } } + + private async Task GetNewOAuthAuthorizationSetting(CancellationToken token) + { + Trace.Info("Start checking oauth authorization url update."); + while (true) + { + var backoff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(45)); + await HostContext.Delay(backoff, token); + + try + { + var migratedAuthorizationUrl = await _runnerServer.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId); + if (!string.IsNullOrEmpty(migratedAuthorizationUrl)) + { + var credData = _configStore.GetCredentials(); + var clientId = credData.Data.GetValueOrDefault("clientId", null); + var currentAuthorizationUrl = credData.Data.GetValueOrDefault("authorizationUrl", null); + Trace.Info($"Current authorization url: {currentAuthorizationUrl}, new authorization url: {migratedAuthorizationUrl}"); + + if (string.Equals(currentAuthorizationUrl, migratedAuthorizationUrl, StringComparison.OrdinalIgnoreCase)) + { + // We don't need to update credentials. + Trace.Info("No needs to update authorization url"); + await Task.Delay(TimeSpan.FromMilliseconds(-1), token); + } + + var keyManager = HostContext.GetService(); + var signingCredentials = VssSigningCredentials.Create(() => keyManager.GetKey()); + + var migratedClientCredential = new VssOAuthJwtBearerClientCredential(clientId, migratedAuthorizationUrl, signingCredentials); + var migratedRunnerCredential = new VssOAuthCredential(new Uri(migratedAuthorizationUrl, UriKind.Absolute), VssOAuthGrant.ClientCredentials, migratedClientCredential); + + Trace.Info("Try connect service with Token Service OAuth endpoint."); + var runnerServer = HostContext.CreateService(); + await runnerServer.ConnectAsync(new Uri(_settings.ServerUrl), migratedRunnerCredential); + await runnerServer.GetAgentPoolsAsync(); + Trace.Info($"Successfully connected service with new authorization url."); + + var migratedCredData = new CredentialData + { + Scheme = Constants.Configuration.OAuth, + Data = + { + { "clientId", clientId }, + { "authorizationUrl", migratedAuthorizationUrl }, + { "oauthEndpointUrl", migratedAuthorizationUrl }, + }, + }; + + _configStore.SaveMigratedCredential(migratedCredData); + return migratedRunnerCredential; + } + else + { + Trace.Verbose("No authorization url updates"); + } + } + catch (Exception ex) + { + Trace.Error("Fail to get/test new authorization url."); + Trace.Error(ex); + + try + { + await _runnerServer.ReportRunnerAuthUrlErrorAsync(_settings.PoolId, _settings.AgentId, ex.ToString()); + } + catch (Exception e) + { + // best effort + Trace.Error("Fail to report the migration error"); + Trace.Error(e); + } + } + } + } } } diff --git a/src/Runner.Listener/SelfUpdater.cs b/src/Runner.Listener/SelfUpdater.cs index 6290a7023..05f856ca1 100644 --- a/src/Runner.Listener/SelfUpdater.cs +++ b/src/Runner.Listener/SelfUpdater.cs @@ -17,6 +17,7 @@ namespace GitHub.Runner.Listener [ServiceLocator(Default = typeof(SelfUpdater))] public interface ISelfUpdater : IRunnerService { + bool Busy { get; } Task SelfUpdate(AgentRefreshMessage updateMessage, IJobDispatcher jobDispatcher, bool restartInteractiveRunner, CancellationToken token); } @@ -31,6 +32,8 @@ namespace GitHub.Runner.Listener private int _poolId; private int _agentId; + public bool Busy { get; private set; } + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -45,52 +48,60 @@ namespace GitHub.Runner.Listener public async Task SelfUpdate(AgentRefreshMessage updateMessage, IJobDispatcher jobDispatcher, bool restartInteractiveRunner, CancellationToken token) { - if (!await UpdateNeeded(updateMessage.TargetVersion, token)) + Busy = true; + try { - Trace.Info($"Can't find available update package."); - return false; - } + if (!await UpdateNeeded(updateMessage.TargetVersion, token)) + { + Trace.Info($"Can't find available update package."); + return false; + } - Trace.Info($"An update is available."); + Trace.Info($"An update is available."); - // Print console line that warn user not shutdown runner. - await UpdateRunnerUpdateStateAsync("Runner update in progress, do not shutdown runner."); - await UpdateRunnerUpdateStateAsync($"Downloading {_targetPackage.Version} runner"); + // Print console line that warn user not shutdown runner. + await UpdateRunnerUpdateStateAsync("Runner update in progress, do not shutdown runner."); + await UpdateRunnerUpdateStateAsync($"Downloading {_targetPackage.Version} runner"); - await DownloadLatestRunner(token); - Trace.Info($"Download latest runner and unzip into runner root."); + await DownloadLatestRunner(token); + Trace.Info($"Download latest runner and unzip into runner root."); - // wait till all running job finish - await UpdateRunnerUpdateStateAsync("Waiting for current job finish running."); + // wait till all running job finish + await UpdateRunnerUpdateStateAsync("Waiting for current job finish running."); - await jobDispatcher.WaitAsync(token); - Trace.Info($"All running job has exited."); + await jobDispatcher.WaitAsync(token); + Trace.Info($"All running job has exited."); - // delete runner backup - DeletePreviousVersionRunnerBackup(token); - Trace.Info($"Delete old version runner backup."); + // delete runner backup + DeletePreviousVersionRunnerBackup(token); + Trace.Info($"Delete old version runner backup."); - // generate update script from template - await UpdateRunnerUpdateStateAsync("Generate and execute update script."); + // generate update script from template + await UpdateRunnerUpdateStateAsync("Generate and execute update script."); - string updateScript = GenerateUpdateScript(restartInteractiveRunner); - Trace.Info($"Generate update script into: {updateScript}"); + string updateScript = GenerateUpdateScript(restartInteractiveRunner); + Trace.Info($"Generate update script into: {updateScript}"); - // kick off update script - Process invokeScript = new Process(); + // kick off update script + Process invokeScript = new Process(); #if OS_WINDOWS invokeScript.StartInfo.FileName = WhichUtil.Which("cmd.exe", trace: Trace); invokeScript.StartInfo.Arguments = $"/c \"{updateScript}\""; #elif (OS_OSX || OS_LINUX) - invokeScript.StartInfo.FileName = WhichUtil.Which("bash", trace: Trace); - invokeScript.StartInfo.Arguments = $"\"{updateScript}\""; + invokeScript.StartInfo.FileName = WhichUtil.Which("bash", trace: Trace); + invokeScript.StartInfo.Arguments = $"\"{updateScript}\""; #endif - invokeScript.Start(); - Trace.Info($"Update script start running"); + invokeScript.Start(); + Trace.Info($"Update script start running"); - await UpdateRunnerUpdateStateAsync("Runner will exit shortly for update, should back online within 10 seconds."); + await UpdateRunnerUpdateStateAsync("Runner will exit shortly for update, should back online within 10 seconds."); - return true; + return true; + } + finally + { + Busy = false; + } } private async Task UpdateNeeded(string targetVersion, CancellationToken token) diff --git a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs index cc92e4ff2..14327a6f8 100644 --- a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs +++ b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs @@ -779,5 +779,65 @@ namespace GitHub.DistributedTask.WebApi userState: userState, cancellationToken: cancellationToken); } + + /// + /// [Preview API] + /// + /// + /// + /// + /// The cancellation token to cancel operation. + public Task GetAgentAuthUrlAsync( + int poolId, + int agentId, + object userState = null, + CancellationToken cancellationToken = default) + { + HttpMethod httpMethod = new HttpMethod("GET"); + Guid locationId = new Guid("a82a119c-1e46-44b6-8d75-c82a79cf975b"); + object routeValues = new { poolId = poolId, agentId = agentId }; + + return SendAsync( + httpMethod, + locationId, + routeValues: routeValues, + version: new ApiResourceVersion(6.0, 1), + userState: userState, + cancellationToken: cancellationToken); + } + + /// + /// [Preview API] + /// + /// + /// + /// + /// + /// The cancellation token to cancel operation. + [EditorBrowsable(EditorBrowsableState.Never)] + public virtual async Task ReportAgentAuthUrlMigrationErrorAsync( + int poolId, + int agentId, + string error, + object userState = null, + CancellationToken cancellationToken = default) + { + HttpMethod httpMethod = new HttpMethod("POST"); + Guid locationId = new Guid("a82a119c-1e46-44b6-8d75-c82a79cf975b"); + object routeValues = new { poolId = poolId, agentId = agentId }; + HttpContent content = new ObjectContent(error, new VssJsonMediaTypeFormatter(true)); + + using (HttpResponseMessage response = await SendAsync( + httpMethod, + locationId, + routeValues: routeValues, + version: new ApiResourceVersion(6.0, 1), + userState: userState, + cancellationToken: cancellationToken, + content: content).ConfigureAwait(false)) + { + return; + } + } } } diff --git a/src/Sdk/DTWebApi/WebApi/TaskResourceIds.cs b/src/Sdk/DTWebApi/WebApi/TaskResourceIds.cs index 24435a807..8c0ae1e3b 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskResourceIds.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskResourceIds.cs @@ -260,5 +260,8 @@ namespace GitHub.DistributedTask.WebApi public static readonly Guid CheckpointResourcesLocationId = new Guid(CheckpointResourcesLocationIdString); public const String CheckpointResourcesResource = "references"; + public static readonly Guid RunnerAuthUrl = new Guid("{A82A119C-1E46-44B6-8D75-C82A79CF975B}"); + public const string RunnerAuthUrlResource = "authurl"; + } } diff --git a/src/Test/L0/Listener/MessageListenerL0.cs b/src/Test/L0/Listener/MessageListenerL0.cs index e6da526cd..ea358e73e 100644 --- a/src/Test/L0/Listener/MessageListenerL0.cs +++ b/src/Test/L0/Listener/MessageListenerL0.cs @@ -11,6 +11,8 @@ using Xunit; using System.Threading; using System.Reflection; using System.Collections.Generic; +using System.IO; +using System.Security.Cryptography; namespace GitHub.Runner.Common.Tests.Listener { @@ -20,6 +22,7 @@ namespace GitHub.Runner.Common.Tests.Listener private Mock _config; private Mock _runnerServer; private Mock _credMgr; + private Mock _store; public MessageListenerL0() { @@ -28,6 +31,7 @@ namespace GitHub.Runner.Common.Tests.Listener _config.Setup(x => x.LoadSettings()).Returns(_settings); _runnerServer = new Mock(); _credMgr = new Mock(); + _store = new Mock(); } private TestHostContext CreateTestContext([CallerMemberName] String testName = "") @@ -36,6 +40,7 @@ namespace GitHub.Runner.Common.Tests.Listener tc.SetSingleton(_config.Object); tc.SetSingleton(_runnerServer.Object); tc.SetSingleton(_credMgr.Object); + tc.SetSingleton(_store.Object); return tc; } @@ -58,7 +63,9 @@ namespace GitHub.Runner.Common.Tests.Listener tokenSource.Token)) .Returns(Task.FromResult(expectedSession)); - _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); // Act. MessageListener listener = new MessageListener(); @@ -100,7 +107,9 @@ namespace GitHub.Runner.Common.Tests.Listener tokenSource.Token)) .Returns(Task.FromResult(expectedSession)); - _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); // Act. MessageListener listener = new MessageListener(); @@ -145,7 +154,9 @@ namespace GitHub.Runner.Common.Tests.Listener tokenSource.Token)) .Returns(Task.FromResult(expectedSession)); - _credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials()); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); // Act. MessageListener listener = new MessageListener(); @@ -200,5 +211,1295 @@ namespace GitHub.Runner.Common.Tests.Listener _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); } } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalCredential() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _runnerServer + .Setup(x => x.GetRunnerAuthUrlAsync( + _settings.PoolId, + _settings.AgentId)) + .Returns(async () => + { + await Task.Delay(10); + return ""; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithMigratedCredential() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithHostedCredential() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + _store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken }); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithMigratedCredentialFallBackOriginalSucceed() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + 123, + It.Is(y => y != null), + tokenSource.Token)) + .Callback(() => { _settings.PoolId = 1234; }) + .Throws(new TaskAgentPoolNotFoundException("L0 Pool not found")); + + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + 1234, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var originalVssCred = new VssCredentials(); + var migratedVssCred = new VssCredentials(); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(migratedVssCred); + _credMgr.Setup(x => x.LoadCredentials(false)).Returns(originalVssCred); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + It.IsAny(), + It.Is(y => y != null), + tokenSource.Token), Times.Exactly(2)); + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + originalVssCred), Times.Once); + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + migratedVssCred), Times.Once); + + Assert.False(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.NotNull(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithMigratedCredentialFallBackOriginalStillFailed() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Throws(new TaskAgentPoolNotFoundException("L0 Pool not found")); + + var originalVssCred = new VssCredentials(); + var migratedVssCred = new VssCredentials(); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(migratedVssCred); + _credMgr.Setup(x => x.LoadCredentials(false)).Returns(originalVssCred); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.False(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Exactly(2)); + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + originalVssCred), Times.Once); + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + migratedVssCred), Times.Once); + + Assert.False(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.NotNull(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageWaitForMigtateToMigrated() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _runnerServer + .Setup(x => x.GetRunnerAuthUrlAsync( + _settings.PoolId, + _settings.AgentId)) + .Returns(async () => + { + await Task.Delay(10); + return ""; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + return messages.Dequeue(); + }); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.AtLeast(2)); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Once); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.DoesNotContain(traceContent, x => x.Contains("Try connect service with migrated OAuth endpoint.")); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageMigtateToMigrated() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _runnerServer + .Setup(x => x.GetRunnerAuthUrlAsync( + _settings.PoolId, + _settings.AgentId)) + .Returns(async () => + { + await Task.Delay(10); + return "https://t.server"; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + return messages.Dequeue(); + }); + + var newRunnerServer = new Mock(); + tc.EnqueueInstance(newRunnerServer.Object); + + var keyManager = new Mock(); + keyManager.Setup(x => x.GetKey()).Returns(new RSACryptoServiceProvider(2048)); + tc.SetSingleton(keyManager.Object); + + tc.SetSingleton(new Mock().Object); + tc.SetSingleton(new Mock().Object); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Once); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Exactly(2)); + + newRunnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Once); + + newRunnerServer + .Verify(x => x.GetAgentPoolsAsync(null, TaskAgentPoolType.Automation), Times.Once); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.Contains(traceContent, x => x.Contains("Try connect service with Token Service OAuth endpoint.")); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageMigtateToMigratedWaitForIdle() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _runnerServer + .Setup(x => x.GetRunnerAuthUrlAsync( + _settings.PoolId, + _settings.AgentId)) + .Returns(async () => + { + await Task.Delay(10); + return "https://t.server"; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + var busy = true; + var counter = 0; + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + if (++counter == 4) + { + busy = false; + } + return messages.Dequeue(); + }); + + var newRunnerServer = new Mock(); + tc.EnqueueInstance(newRunnerServer.Object); + + var keyManager = new Mock(); + keyManager.Setup(x => x.GetKey()).Returns(new RSACryptoServiceProvider(2048)); + tc.SetSingleton(keyManager.Object); + + var jobDispatcher = new Mock(); + + jobDispatcher.Setup(x => x.Busy).Returns(() => + { + return busy; + }); + tc.SetSingleton(jobDispatcher.Object); + tc.SetSingleton(new Mock().Object); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Once); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Exactly(2)); + + newRunnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Once); + + newRunnerServer + .Verify(x => x.GetAgentPoolsAsync(null, TaskAgentPoolType.Automation), Times.Once); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.Contains(traceContent, x => x.Contains("Job or runner updates in progress, update credentials next time.")); + Assert.Contains(traceContent, x => x.Contains("Try connect service with Token Service OAuth endpoint.")); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithMigratedGetMessageNotMigrateAgain() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + _runnerServer + .Setup(x => x.GetRunnerAuthUrlAsync( + _settings.PoolId, + _settings.AgentId)) + .Returns(async () => + { + await Task.Delay(10); + return "https://t.server"; + }); + + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials()); + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(migratedCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData)); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + return messages.Dequeue(); + }); + + var newRunnerServer = new Mock(); + tc.EnqueueInstance(newRunnerServer.Object); + + var keyManager = new Mock(); + keyManager.Setup(x => x.GetKey()).Returns(new RSACryptoServiceProvider(2048)); + tc.SetSingleton(keyManager.Object); + + tc.SetSingleton(new Mock().Object); + tc.SetSingleton(new Mock().Object); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Once); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Once); + + newRunnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Never); + + newRunnerServer + .Verify(x => x.GetAgentPoolsAsync(null, TaskAgentPoolType.Automation), Times.Never); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.Contains(traceContent, x => x.Contains("No needs to update authorization url")); + + Assert.False(listener._useMigratedCredentials); + Assert.True(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.NotNull(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageMigrateToMigratedFallbackToOriginal() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var originalVssCred = new VssCredentials(); + var migratedVssCred = new VssCredentials(); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(migratedVssCred); + _credMgr.Setup(x => x.LoadCredentials(false)).Returns(originalVssCred); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + var counter = 0; + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + counter++; + + if (counter == 5) + { + throw new TaskAgentNotFoundException("L0 runner not found"); + } + + if (counter == 6) + { + Assert.NotNull(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.False(listener._useMigratedCredentials); + } + + return messages.Dequeue(); + }); + + var newRunnerServer = new Mock(); + tc.EnqueueInstance(newRunnerServer.Object); + + var keyManager = new Mock(); + keyManager.Setup(x => x.GetKey()).Returns(new RSACryptoServiceProvider(2048)); + tc.SetSingleton(keyManager.Object); + + tc.SetSingleton(new Mock().Object); + tc.SetSingleton(new Mock().Object); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length + 1)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Never); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.AtLeast(2)); + + newRunnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Never); + + newRunnerServer + .Verify(x => x.GetAgentPoolsAsync(null, TaskAgentPoolType.Automation), Times.Never); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.Contains(traceContent, x => x.Contains("Fallback to original credentials and try again.")); + + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageMigrateToMigratedFallbackToOriginalReattemptMigrated() + { + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var originalVssCred = new VssCredentials(); + var migratedVssCred = new VssCredentials(); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(migratedVssCred); + _credMgr.Setup(x => x.LoadCredentials(false)).Returns(originalVssCred); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + var counter = 0; + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(200); + counter++; + + if (counter == 2) + { + throw new TaskAgentNotFoundException("L0 runner not found"); + } + + if (counter == 3) + { + Assert.NotNull(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + } + + return messages.Dequeue(); + }); + + var newRunnerServer = new Mock(); + tc.EnqueueInstance(newRunnerServer.Object); + + var keyManager = new Mock(); + keyManager.Setup(x => x.GetKey()).Returns(new RSACryptoServiceProvider(2048)); + tc.SetSingleton(keyManager.Object); + + tc.SetSingleton(new Mock().Object); + tc.SetSingleton(new Mock().Object); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length + 1)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Never); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Exactly(3)); + + newRunnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Never); + + newRunnerServer + .Verify(x => x.GetAgentPoolsAsync(null, TaskAgentPoolType.Automation), Times.Never); + + var tempLog = Path.GetTempFileName(); + File.Copy(tc.TraceFileName, tempLog, true); + var traceContent = File.ReadAllLines(tempLog); + Assert.Contains(traceContent, x => x.Contains("Fallback to original credentials and try again.")); + Assert.Contains(traceContent, x => x.Contains("Re-attempt to use migrated credential")); + + Assert.True(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void CreateSessionWithOriginalGetMessageWithOriginalEnvOverwrite() + { + try + { + Environment.SetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_SPSAUTHURL", "1"); + using (TestHostContext tc = CreateTestContext()) + using (var tokenSource = new CancellationTokenSource()) + { + Tracing trace = tc.GetTrace(); + + // Arrange. + var expectedSession = new TaskAgentSession(); + _runnerServer + .Setup(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token)) + .Returns(Task.FromResult(expectedSession)); + + var originalVssCred = new VssCredentials(); + var migratedVssCred = new VssCredentials(); + _credMgr.Setup(x => x.LoadCredentials(false)).Returns(originalVssCred); + _credMgr.Setup(x => x.LoadCredentials(true)).Returns(migratedVssCred); + + var originalCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + originalCred.Data["authorizationUrl"] = "https://s.server"; + originalCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + var migratedCred = new CredentialData() { Scheme = Constants.Configuration.OAuth }; + migratedCred.Data["authorizationUrl"] = "https://t.server"; + migratedCred.Data["clientId"] = "d842fd7b-61b0-4a80-96b4-f2797c353897"; + + _store.Setup(x => x.GetCredentials()).Returns(originalCred); + _store.Setup(x => x.GetMigratedCredentials()).Returns(migratedCred); + + // Act. + MessageListener listener = new MessageListener(); + listener.Initialize(tc); + + bool result = await listener.CreateSessionAsync(tokenSource.Token); + trace.Info("result: {0}", result); + + // Assert. + Assert.True(result); + _runnerServer + .Verify(x => x.CreateAgentSessionAsync( + _settings.PoolId, + It.Is(y => y != null), + tokenSource.Token), Times.Once()); + + Assert.False(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + + var arMessages = new TaskAgentMessage[] + { + new TaskAgentMessage + { + Body = "somebody1", + MessageId = 4234, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + }, + new TaskAgentMessage + { + Body = "somebody2", + MessageId = 4235, + MessageType = JobCancelMessage.MessageType + }, + null, //should be skipped by GetNextMessageAsync implementation + null, + new TaskAgentMessage + { + Body = "somebody3", + MessageId = 4236, + MessageType = JobRequestMessageTypes.PipelineAgentJobRequest + } + }; + var messages = new Queue(arMessages); + + _runnerServer + .Setup(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + { + await Task.Delay(1); + return messages.Dequeue(); + }); + + TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token); + TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token); + Assert.Equal(arMessages[0], message1); + Assert.Equal(arMessages[1], message2); + Assert.Equal(arMessages[4], message3); + + //Assert + _runnerServer + .Verify(x => x.GetAgentMessageAsync( + _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + + _runnerServer + .Verify(x => x.GetRunnerAuthUrlAsync(_settings.PoolId, _settings.AgentId), Times.Never); + + _runnerServer + .Verify(x => x.ConnectAsync( + It.IsAny(), + It.IsAny()), Times.Once); + + Assert.False(listener._useMigratedCredentials); + Assert.False(listener._needToCheckAuthorizationUrlUpdate); + Assert.Null(listener._authorizationUrlRollbackReattemptDelayBackgroundTask); + Assert.Null(listener._authorizationUrlMigrationBackgroundTask); + } + } + finally + { + Environment.SetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_SPSAUTHURL", null); + } + } } } diff --git a/src/Test/L0/TestHostContext.cs b/src/Test/L0/TestHostContext.cs index 622c19bde..88c38b7c7 100644 --- a/src/Test/L0/TestHostContext.cs +++ b/src/Test/L0/TestHostContext.cs @@ -198,7 +198,7 @@ namespace GitHub.Runner.Common.Tests case WellKnownDirectory.Tools: path = Environment.GetEnvironmentVariable("RUNNER_TOOL_CACHE"); - + if (string.IsNullOrEmpty(path)) { path = Path.Combine(