diff --git a/src/Runner.Common/RunServer.cs b/src/Runner.Common/RunServer.cs index 879045055..0f9fa1a01 100644 --- a/src/Runner.Common/RunServer.cs +++ b/src/Runner.Common/RunServer.cs @@ -5,7 +5,6 @@ using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; using GitHub.Services.Common; -using GitHub.Services.WebApi; using Sdk.WebApi.WebApi.RawClient; namespace GitHub.Runner.Common @@ -16,6 +15,8 @@ namespace GitHub.Runner.Common Task ConnectAsync(Uri serverUrl, VssCredentials credentials); Task GetJobMessageAsync(string id, CancellationToken token); + + Task CompleteJobAsync(Guid planId, Guid jobId, CancellationToken token); } public sealed class RunServer : RunnerService, IRunServer @@ -55,5 +56,11 @@ namespace GitHub.Runner.Common return jobMessage; } + public Task CompleteJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken) + { + CheckConnection(); + return RetryRequest( + async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, cancellationToken), cancellationToken); + } } } diff --git a/src/Runner.Common/RunnerService.cs b/src/Runner.Common/RunnerService.cs index 403764b03..a0cb96a0e 100644 --- a/src/Runner.Common/RunnerService.cs +++ b/src/Runner.Common/RunnerService.cs @@ -68,6 +68,19 @@ namespace GitHub.Runner.Common throw new InvalidOperationException(nameof(EstablishVssConnection)); } + protected async Task RetryRequest(Func func, + CancellationToken cancellationToken, + int maxRetryAttemptsCount = 5 + ) + { + async Task wrappedFunc() + { + await func(); + return Unit.Value; + } + await RetryRequest(wrappedFunc, cancellationToken, maxRetryAttemptsCount); + } + protected async Task RetryRequest(Func> func, CancellationToken cancellationToken, int maxRetryAttemptsCount = 5 @@ -85,7 +98,7 @@ namespace GitHub.Runner.Common // TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122 catch (Exception ex) when (retryCount < maxRetryAttemptsCount) { - Trace.Error("Catch exception during get full job message"); + Trace.Error("Catch exception during request"); Trace.Error(ex); var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15)); Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left."); diff --git a/src/Runner.Common/Unit.cs b/src/Runner.Common/Unit.cs new file mode 100644 index 000000000..31f4ad55f --- /dev/null +++ b/src/Runner.Common/Unit.cs @@ -0,0 +1,8 @@ +// Represents absence of value. +namespace GitHub.Runner.Common +{ + public readonly struct Unit + { + public static readonly Unit Value = default; + } +} \ No newline at end of file diff --git a/src/Runner.Sdk/Util/VssUtil.cs b/src/Runner.Sdk/Util/VssUtil.cs index 6bfc7f131..12cc5c118 100644 --- a/src/Runner.Sdk/Util/VssUtil.cs +++ b/src/Runner.Sdk/Util/VssUtil.cs @@ -116,7 +116,7 @@ namespace GitHub.Runner.Sdk // settings are applied to an HttpRequestMessage. settings.AcceptLanguages.Remove(CultureInfo.InvariantCulture); - RawConnection connection = new(serverUri, new RawHttpMessageHandler(credentials.ToOAuthCredentials(), settings), additionalDelegatingHandler); + RawConnection connection = new(serverUri, new RawHttpMessageHandler(credentials.Federated, settings), additionalDelegatingHandler); return connection; } diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index 562ef9068..49150c5ca 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -6,7 +6,6 @@ using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; -using GitHub.DistributedTask.Pipelines.ContextData; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Common; using GitHub.Runner.Common.Util; @@ -40,21 +39,34 @@ namespace GitHub.Runner.Worker Trace.Info("Job ID {0}", message.JobId); DateTime jobStartTimeUtc = DateTime.UtcNow; + IRunnerService server = null; ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase)) + { + var runServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + await runServer.ConnectAsync(systemConnection.Url, jobServerCredential); + server = runServer; + } + else + { + // Setup the job server and job server queue. + var jobServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + Uri jobServerUrl = systemConnection.Url; - // Setup the job server and job server queue. - var jobServer = HostContext.GetService(); - VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); - Uri jobServerUrl = systemConnection.Url; + Trace.Info($"Creating job server with URL: {jobServerUrl}"); + // jobServerQueue is the throttling reporter. + _jobServerQueue = HostContext.GetService(); + VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) }); + await jobServer.ConnectAsync(jobConnection); - Trace.Info($"Creating job server with URL: {jobServerUrl}"); - // jobServerQueue is the throttling reporter. - _jobServerQueue = HostContext.GetService(); - VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) }); - await jobServer.ConnectAsync(jobConnection); + _jobServerQueue.Start(message); + server = jobServer; + } + - _jobServerQueue.Start(message); HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}"); IExecutionContext jobContext = null; @@ -99,7 +111,7 @@ namespace GitHub.Runner.Worker { Trace.Error(ex); jobContext.Error(ex); - return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); + return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed); } if (jobContext.Global.WriteDebug) @@ -136,7 +148,7 @@ namespace GitHub.Runner.Worker // don't log error issue to job ExecutionContext, since server owns the job level issue Trace.Error($"Job is cancelled during initialize."); Trace.Error($"Caught exception: {ex}"); - return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled); + return await CompleteJobAsync(server, jobContext, message, TaskResult.Canceled); } catch (Exception ex) { @@ -144,7 +156,7 @@ namespace GitHub.Runner.Worker // don't log error issue to job ExecutionContext, since server owns the job level issue Trace.Error($"Job initialize failed."); Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}"); - return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); + return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed); } // trace out all steps @@ -181,7 +193,7 @@ namespace GitHub.Runner.Worker // Log the error and fail the job. Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}"); jobContext.Error(ex); - return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); + return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed); } finally { @@ -192,7 +204,7 @@ namespace GitHub.Runner.Worker Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}"); Trace.Info("Completing the job execution context."); - return await CompleteJobAsync(jobServer, jobContext, message); + return await CompleteJobAsync(server, jobContext, message); } finally { @@ -206,6 +218,66 @@ namespace GitHub.Runner.Worker } } + private async Task CompleteJobAsync(IRunnerService server, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null) + { + if (server is IRunServer runServer) + { + return await CompleteJobAsync(runServer, jobContext, message, taskResult); + } + else if (server is IJobServer jobServer) + { + return await CompleteJobAsync(jobServer, jobContext, message, taskResult); + } + else + { + throw new NotSupportedException(); + } + } + + private async Task CompleteJobAsync(IRunServer runServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null) + { + jobContext.Debug($"Finishing: {message.JobDisplayName}"); + TaskResult result = jobContext.Complete(taskResult); + if (jobContext.Global.Variables.TryGetValue("Node12ActionsWarnings", out var node12Warnings)) + { + var actions = string.Join(", ", StringUtil.ConvertFromJson>(node12Warnings)); + jobContext.Warning(string.Format(Constants.Runner.Node12DetectedAfterEndOfLife, actions)); + } + + // Make sure to clean temp after file upload since they may be pending fileupload still use the TEMP dir. + _tempDirectoryManager?.CleanupTempDirectory(); + + // Load any upgrade telemetry + LoadFromTelemetryFile(jobContext.Global.JobTelemetry); + + // Make sure we don't submit secrets as telemetry + MaskTelemetrySecrets(jobContext.Global.JobTelemetry); + + Trace.Info($"Raising job completed against run service"); + var completeJobRetryLimit = 5; + var exceptions = new List(); + while (completeJobRetryLimit-- > 0) + { + try + { + await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, default); + return result; + } + catch (Exception ex) + { + Trace.Error($"Catch exception while attempting to complete job {message.JobId}, job request {message.RequestId}."); + Trace.Error(ex); + exceptions.Add(ex); + } + + // delay 5 seconds before next retry. + await Task.Delay(TimeSpan.FromSeconds(5)); + } + + // rethrow exceptions from all attempts. + throw new AggregateException(exceptions); + } + private async Task CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null) { jobContext.Debug($"Finishing: {message.JobDisplayName}"); diff --git a/src/Sdk/Common/Common/Authentication/VssCredentialsExtension.cs b/src/Sdk/Common/Common/Authentication/VssCredentialsExtension.cs deleted file mode 100644 index c91fdb70a..000000000 --- a/src/Sdk/Common/Common/Authentication/VssCredentialsExtension.cs +++ /dev/null @@ -1,20 +0,0 @@ -using GitHub.Services.OAuth; - -namespace GitHub.Services.Common -{ - public static class VssCredentialsExtension - { - public static VssOAuthCredential ToOAuthCredentials( - this VssCredentials credentials) - { - if (credentials.Federated.CredentialType == VssCredentialsType.OAuth) - { - return credentials.Federated as VssOAuthCredential; - } - else - { - return null; - } - } - } -} diff --git a/src/Sdk/Common/Common/RawHttpMessageHandler.cs b/src/Sdk/Common/Common/RawHttpMessageHandler.cs index 286fac840..8b3e75e18 100644 --- a/src/Sdk/Common/Common/RawHttpMessageHandler.cs +++ b/src/Sdk/Common/Common/RawHttpMessageHandler.cs @@ -12,20 +12,20 @@ namespace GitHub.Services.Common public class RawHttpMessageHandler: HttpMessageHandler { public RawHttpMessageHandler( - VssOAuthCredential credentials) + FederatedCredential credentials) : this(credentials, new RawClientHttpRequestSettings()) { } public RawHttpMessageHandler( - VssOAuthCredential credentials, + FederatedCredential credentials, RawClientHttpRequestSettings settings) : this(credentials, settings, new HttpClientHandler()) { } public RawHttpMessageHandler( - VssOAuthCredential credentials, + FederatedCredential credentials, RawClientHttpRequestSettings settings, HttpMessageHandler innerHandler) { @@ -56,7 +56,7 @@ namespace GitHub.Services.Common /// /// Gets the credentials associated with this handler. /// - public VssOAuthCredential Credentials + public FederatedCredential Credentials { get; private set; @@ -111,7 +111,7 @@ namespace GitHub.Services.Common // Ensure that we attempt to use the most appropriate authentication mechanism by default. if (m_tokenProvider == null) { - m_tokenProvider = this.Credentials.GetTokenProvider(request.RequestUri); + m_tokenProvider = this.Credentials.CreateTokenProvider(request.RequestUri, null, null); } } @@ -254,7 +254,7 @@ namespace GitHub.Services.Common private CredentialWrapper m_credentialWrapper; private object m_thisLock; private const Int32 m_maxAuthRetries = 3; - private VssOAuthTokenProvider m_tokenProvider; + private IssuedTokenProvider m_tokenProvider; //.Net Core does not attempt NTLM schema on Linux, unless ICredentials is a CredentialCache instance //This workaround may not be needed after this corefx fix is consumed: https://github.com/dotnet/corefx/pull/7923 diff --git a/src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs b/src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs index 74650a12e..3f93e75e5 100644 --- a/src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs +++ b/src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs @@ -42,9 +42,10 @@ namespace GitHub.DistributedTask.Pipelines IList fileTable, TemplateToken jobOutputs, IList defaults, - ActionsEnvironmentReference actionsEnvironment) + ActionsEnvironmentReference actionsEnvironment, + String messageType = JobRequestMessageTypes.PipelineAgentJobRequest) { - this.MessageType = JobRequestMessageTypes.PipelineAgentJobRequest; + this.MessageType = messageType; this.Plan = plan; this.JobId = jobId; this.JobDisplayName = jobDisplayName; diff --git a/src/Sdk/DTWebApi/WebApi/RunServiceHttpClient.cs b/src/Sdk/DTWebApi/WebApi/RunServiceHttpClient.cs index 64c1784e6..7db46d7be 100644 --- a/src/Sdk/DTWebApi/WebApi/RunServiceHttpClient.cs +++ b/src/Sdk/DTWebApi/WebApi/RunServiceHttpClient.cs @@ -62,14 +62,38 @@ namespace GitHub.DistributedTask.WebApi StreamID = messageId }; + requestUri = new Uri(requestUri, "acquirejob"); + var payloadJson = JsonUtility.ToString(payload); var requestContent = new StringContent(payloadJson, System.Text.Encoding.UTF8, "application/json"); return SendAsync( httpMethod, - additionalHeaders: null, requestUri: requestUri, content: requestContent, cancellationToken: cancellationToken); } + + public Task CompleteJobAsync( + Uri requestUri, + Guid planId, + Guid jobId, + CancellationToken cancellationToken = default) + { + HttpMethod httpMethod = new HttpMethod("POST"); + var payload = new { + PlanId = planId, + JobId = jobId + }; + + requestUri = new Uri(requestUri, "completejob"); + + var payloadJson = JsonUtility.ToString(payload); + var requestContent = new StringContent(payloadJson, System.Text.Encoding.UTF8, "application/json"); + return SendAsync( + httpMethod, + requestUri, + content: requestContent, + cancellationToken: cancellationToken); + } } } diff --git a/src/Sdk/WebApi/WebApi/RawHttpClientBase.cs b/src/Sdk/WebApi/WebApi/RawHttpClientBase.cs index ffeecc808..8054a8114 100644 --- a/src/Sdk/WebApi/WebApi/RawHttpClientBase.cs +++ b/src/Sdk/WebApi/WebApi/RawHttpClientBase.cs @@ -101,6 +101,17 @@ namespace Sdk.WebApi.WebApi } } + protected Task SendAsync( + HttpMethod method, + Uri requestUri, + HttpContent content = null, + IEnumerable> queryParameters = null, + Object userState = null, + CancellationToken cancellationToken = default(CancellationToken)) + { + return SendAsync(method, null, requestUri, content, queryParameters, userState, cancellationToken); + } + protected async Task SendAsync( HttpMethod method, IEnumerable> additionalHeaders, diff --git a/src/Test/L0/Worker/JobRunnerL0.cs b/src/Test/L0/Worker/JobRunnerL0.cs index 152c816aa..42616e3d5 100644 --- a/src/Test/L0/Worker/JobRunnerL0.cs +++ b/src/Test/L0/Worker/JobRunnerL0.cs @@ -16,9 +16,10 @@ namespace GitHub.Runner.Common.Tests.Worker private IExecutionContext _jobEc; private JobRunner _jobRunner; private List _initResult = new(); - private Pipelines.AgentJobRequestMessage _message; private CancellationTokenSource _tokenSource; private Mock _jobServer; + + private Mock _runServer; private Mock _jobServerQueue; private Mock _config; private Mock _extensions; @@ -38,6 +39,7 @@ namespace GitHub.Runner.Common.Tests.Worker _extensions = new Mock(); _jobExtension = new Mock(); _jobServer = new Mock(); + _runServer = new Mock(); _jobServerQueue = new Mock(); _stepRunner = new Mock(); _logger = new Mock(); @@ -55,33 +57,6 @@ namespace GitHub.Runner.Common.Tests.Worker _jobRunner = new JobRunner(); _jobRunner.Initialize(hc); - TaskOrchestrationPlanReference plan = new(); - TimelineReference timeline = new Timeline(Guid.NewGuid()); - Guid jobId = Guid.NewGuid(); - _message = new Pipelines.AgentJobRequestMessage(plan, timeline, jobId, testName, testName, null, null, null, new Dictionary(), new List(), new Pipelines.JobResources(), new Pipelines.ContextData.DictionaryContextData(), new Pipelines.WorkspaceOptions(), new List(), null, null, null, null); - _message.Variables[Constants.Variables.System.Culture] = "en-US"; - _message.Resources.Endpoints.Add(new ServiceEndpoint() - { - Name = WellKnownServiceEndpointNames.SystemVssConnection, - Url = new Uri("https://pipelines.actions.githubusercontent.com"), - Authorization = new EndpointAuthorization() - { - Scheme = "Test", - Parameters = { - {"AccessToken", "token"} - } - }, - - }); - - _message.Resources.Repositories.Add(new Pipelines.RepositoryResource() - { - Alias = Pipelines.PipelineConstants.SelfAlias, - Id = "github", - Version = "sha1" - }); - _message.ContextData.Add("github", new Pipelines.ContextData.DictionaryContextData()); - _initResult.Clear(); _jobExtension.Setup(x => x.InitializeJob(It.IsAny(), It.IsAny())). @@ -102,6 +77,7 @@ namespace GitHub.Runner.Common.Tests.Worker hc.SetSingleton(_config.Object); hc.SetSingleton(_jobServer.Object); + hc.SetSingleton(_runServer.Object); hc.SetSingleton(_jobServerQueue.Object); hc.SetSingleton(_stepRunner.Object); hc.SetSingleton(_extensions.Object); @@ -113,6 +89,43 @@ namespace GitHub.Runner.Common.Tests.Worker return hc; } + private Pipelines.AgentJobRequestMessage GetMessage(String messageType = JobRequestMessageTypes.PipelineAgentJobRequest, [CallerMemberName] String testName = "") + { + TaskOrchestrationPlanReference plan = new(); + TimelineReference timeline = new Timeline(Guid.NewGuid()); + Guid jobId = Guid.NewGuid(); + var message = new Pipelines.AgentJobRequestMessage( + plan, + timeline, + jobId, + testName, + testName, null, null, null, new Dictionary(), new List(), new Pipelines.JobResources(), new Pipelines.ContextData.DictionaryContextData(), new Pipelines.WorkspaceOptions(), new List(), null, null, null, null, + messageType: messageType); + message.Variables[Constants.Variables.System.Culture] = "en-US"; + message.Resources.Endpoints.Add(new ServiceEndpoint() + { + Name = WellKnownServiceEndpointNames.SystemVssConnection, + Url = new Uri("https://pipelines.actions.githubusercontent.com"), + Authorization = new EndpointAuthorization() + { + Scheme = "Test", + Parameters = { + {"AccessToken", "token"} + } + }, + + }); + + message.Resources.Repositories.Add(new Pipelines.RepositoryResource() + { + Alias = Pipelines.PipelineConstants.SelfAlias, + Id = "github", + Version = "sha1" + }); + message.ContextData.Add("github", new Pipelines.ContextData.DictionaryContextData()); + return message; + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] @@ -123,7 +136,7 @@ namespace GitHub.Runner.Common.Tests.Worker _jobExtension.Setup(x => x.InitializeJob(It.IsAny(), It.IsAny())) .Throws(new Exception()); - await _jobRunner.RunAsync(_message, _tokenSource.Token); + await _jobRunner.RunAsync(GetMessage(), _tokenSource.Token); Assert.Equal(TaskResult.Failed, _jobEc.Result); _stepRunner.Verify(x => x.RunAsync(It.IsAny()), Times.Never); @@ -141,11 +154,24 @@ namespace GitHub.Runner.Common.Tests.Worker .Throws(new OperationCanceledException()); _tokenSource.Cancel(); - await _jobRunner.RunAsync(_message, _tokenSource.Token); + await _jobRunner.RunAsync(GetMessage(), _tokenSource.Token); Assert.Equal(TaskResult.Canceled, _jobEc.Result); _stepRunner.Verify(x => x.RunAsync(It.IsAny()), Times.Never); } } + + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task WorksWithRunnerJobRequestMessageType() + { + using (TestHostContext hc = CreateTestContext()) + { + var message = GetMessage(JobRequestMessageTypes.RunnerJobRequest); + await _jobRunner.RunAsync(message, _tokenSource.Token); + Assert.Equal(TaskResult.Succeeded, _jobEc.Result); + } + } } }