using GitHub.DistributedTask.WebApi; using Pipelines = GitHub.DistributedTask.Pipelines; using GitHub.Runner.Common.Util; using GitHub.Services.Common; using GitHub.Services.WebApi; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; using System.Net.Http; using GitHub.Runner.Common; using GitHub.Runner.Sdk; namespace GitHub.Runner.Worker { [ServiceLocator(Default = typeof(JobRunner))] public interface IJobRunner : IRunnerService { Task RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken); } public sealed class JobRunner : RunnerService, IJobRunner { private IJobServerQueue _jobServerQueue; private ITempDirectoryManager _tempDirectoryManager; public async Task RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken) { // Validate parameters. Trace.Entering(); ArgUtil.NotNull(message, nameof(message)); ArgUtil.NotNull(message.Resources, nameof(message.Resources)); ArgUtil.NotNull(message.Variables, nameof(message.Variables)); ArgUtil.NotNull(message.Steps, nameof(message.Steps)); Trace.Info("Job ID {0}", message.JobId); DateTime jobStartTimeUtc = DateTime.UtcNow; ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); // Setup the job server and job server queue. var jobServer = HostContext.GetService(); VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); Uri jobServerUrl = systemConnection.Url; Trace.Info($"Creating job server with URL: {jobServerUrl}"); // jobServerQueue is the throttling reporter. _jobServerQueue = HostContext.GetService(); VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) }); await jobServer.ConnectAsync(jobConnection); _jobServerQueue.Start(message); HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}"); IExecutionContext jobContext = null; CancellationTokenRegistration? runnerShutdownRegistration = null; try { // Create the job execution context. jobContext = HostContext.CreateService(); jobContext.InitializeJob(message, jobRequestCancellationToken); Trace.Info("Starting the job execution context."); jobContext.Start(); jobContext.Debug($"Starting: {message.JobDisplayName}"); runnerShutdownRegistration = HostContext.RunnerShutdownToken.Register(() => { // log an issue, then runner get shutdown by Ctrl-C or Ctrl-Break. // the server will use Ctrl-Break to tells the runner that operating system is shutting down. string errorMessage; switch (HostContext.RunnerShutdownReason) { case ShutdownReason.UserCancelled: errorMessage = "The runner has received a shutdown signal. This can happen when the runner service is stopped, or a manually started runner is canceled."; break; case ShutdownReason.OperatingSystemShutdown: errorMessage = $"Operating system is shutting down for computer '{Environment.MachineName}'"; break; default: throw new ArgumentException(HostContext.RunnerShutdownReason.ToString(), nameof(HostContext.RunnerShutdownReason)); } jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage }); }); // Validate directory permissions. string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work); Trace.Info($"Validating directory permissions for: '{workDirectory}'"); try { Directory.CreateDirectory(workDirectory); IOUtil.ValidateExecutePermission(workDirectory); } catch (Exception ex) { Trace.Error(ex); jobContext.Error(ex); return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); } if (jobContext.Global.WriteDebug) { jobContext.SetRunnerContext("debug", "1"); } jobContext.SetRunnerContext("os", VarUtil.OS); string toolsDirectory = HostContext.GetDirectory(WellKnownDirectory.Tools); Directory.CreateDirectory(toolsDirectory); jobContext.SetRunnerContext("tool_cache", toolsDirectory); // Setup TEMP directories _tempDirectoryManager = HostContext.GetService(); _tempDirectoryManager.InitializeTempDirectory(jobContext); // Get the job extension. Trace.Info("Getting job extension."); IJobExtension jobExtension = HostContext.CreateService(); List jobSteps = null; try { Trace.Info("Initialize job. Getting all job steps."); jobSteps = await jobExtension.InitializeJob(jobContext, message); } catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested) { // set the job to canceled // don't log error issue to job ExecutionContext, since server owns the job level issue Trace.Error($"Job is canceled during initialize."); Trace.Error($"Caught exception: {ex}"); return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled); } catch (Exception ex) { // set the job to failed. // don't log error issue to job ExecutionContext, since server owns the job level issue Trace.Error($"Job initialize failed."); Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}"); return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); } // trace out all steps Trace.Info($"Total job steps: {jobSteps.Count}."); Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'"); HostContext.WritePerfCounter($"WorkerJobInitialized_{message.RequestId.ToString()}"); // Run all job steps Trace.Info("Run all job steps."); var stepsRunner = HostContext.GetService(); try { foreach (var step in jobSteps) { jobContext.JobSteps.Enqueue(step); } await stepsRunner.RunAsync(jobContext); } catch (Exception ex) { // StepRunner should never throw exception out. // End up here mean there is a bug in StepRunner // Log the error and fail the job. Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}"); jobContext.Error(ex); return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed); } finally { Trace.Info("Finalize job."); jobExtension.FinalizeJob(jobContext, message, jobStartTimeUtc); } Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}"); Trace.Info("Completing the job execution context."); return await CompleteJobAsync(jobServer, jobContext, message); } finally { if (runnerShutdownRegistration != null) { runnerShutdownRegistration.Value.Dispose(); runnerShutdownRegistration = null; } await ShutdownQueue(throwOnFailure: false); } } private async Task CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null) { jobContext.Debug($"Finishing: {message.JobDisplayName}"); TaskResult result = jobContext.Complete(taskResult); try { await ShutdownQueue(throwOnFailure: true); } catch (Exception ex) { Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}"); Trace.Error("This indicate a failure during publish output variables. Fail the job to prevent unexpected job outputs."); Trace.Error(ex); result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed); } // Clean TEMP after finish process jobserverqueue, since there might be a pending fileupload still use the TEMP dir. _tempDirectoryManager?.CleanupTempDirectory(); if (!jobContext.Global.Features.HasFlag(PlanFeatures.JobCompletedPlanEvent)) { Trace.Info($"Skip raise job completed event call from worker because Plan version is {message.Plan.Version}"); return result; } Trace.Info("Raising job completed event."); var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, jobContext.JobOutputs); var completeJobRetryLimit = 5; var exceptions = new List(); while (completeJobRetryLimit-- > 0) { try { await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, default(CancellationToken)); return result; } catch (TaskOrchestrationPlanNotFoundException ex) { Trace.Error($"TaskOrchestrationPlanNotFoundException received, while attempting to raise JobCompletedEvent for job {message.JobId}."); Trace.Error(ex); return TaskResult.Failed; } catch (TaskOrchestrationPlanSecurityException ex) { Trace.Error($"TaskOrchestrationPlanSecurityException received, while attempting to raise JobCompletedEvent for job {message.JobId}."); Trace.Error(ex); return TaskResult.Failed; } catch (TaskOrchestrationPlanTerminatedException ex) { Trace.Error($"TaskOrchestrationPlanTerminatedException received, while attempting to raise JobCompletedEvent for job {message.JobId}."); Trace.Error(ex); return TaskResult.Failed; } catch (Exception ex) { Trace.Error($"Catch exception while attempting to raise JobCompletedEvent for job {message.JobId}, job request {message.RequestId}."); Trace.Error(ex); exceptions.Add(ex); } // delay 5 seconds before next retry. await Task.Delay(TimeSpan.FromSeconds(5)); } // rethrow exceptions from all attempts. throw new AggregateException(exceptions); } private async Task ShutdownQueue(bool throwOnFailure) { if (_jobServerQueue != null) { try { Trace.Info("Shutting down the job server queue."); await _jobServerQueue.ShutdownAsync(); } catch (Exception ex) when (!throwOnFailure) { Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}"); Trace.Error(ex); } finally { _jobServerQueue = null; // Prevent multiple attempts. } } } } }