Files
runner/src/Runner.Worker/JobRunner.cs

285 lines
13 KiB
C#

using GitHub.DistributedTask.WebApi;
using Pipelines = GitHub.DistributedTask.Pipelines;
using GitHub.Runner.Common.Util;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using GitHub.Runner.Common;
using GitHub.Runner.Sdk;
namespace GitHub.Runner.Worker
{
[ServiceLocator(Default = typeof(JobRunner))]
public interface IJobRunner : IRunnerService
{
Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken);
}
public sealed class JobRunner : RunnerService, IJobRunner
{
private IJobServerQueue _jobServerQueue;
private ITempDirectoryManager _tempDirectoryManager;
public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken)
{
// Validate parameters.
Trace.Entering();
ArgUtil.NotNull(message, nameof(message));
ArgUtil.NotNull(message.Resources, nameof(message.Resources));
ArgUtil.NotNull(message.Variables, nameof(message.Variables));
ArgUtil.NotNull(message.Steps, nameof(message.Steps));
Trace.Info("Job ID {0}", message.JobId);
DateTime jobStartTimeUtc = DateTime.UtcNow;
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
await jobServer.ConnectAsync(jobConnection);
_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");
IExecutionContext jobContext = null;
CancellationTokenRegistration? runnerShutdownRegistration = null;
try
{
// Create the job execution context.
jobContext = HostContext.CreateService<IExecutionContext>();
jobContext.InitializeJob(message, jobRequestCancellationToken);
Trace.Info("Starting the job execution context.");
jobContext.Start();
jobContext.Debug($"Starting: {message.JobDisplayName}");
runnerShutdownRegistration = HostContext.RunnerShutdownToken.Register(() =>
{
// log an issue, then runner get shutdown by Ctrl-C or Ctrl-Break.
// the server will use Ctrl-Break to tells the runner that operating system is shutting down.
string errorMessage;
switch (HostContext.RunnerShutdownReason)
{
case ShutdownReason.UserCancelled:
errorMessage = "The runner has received a shutdown signal. This can happen when the runner service is stopped, or a manually started runner is canceled.";
break;
case ShutdownReason.OperatingSystemShutdown:
errorMessage = $"Operating system is shutting down for computer '{Environment.MachineName}'";
break;
default:
throw new ArgumentException(HostContext.RunnerShutdownReason.ToString(), nameof(HostContext.RunnerShutdownReason));
}
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
});
// Validate directory permissions.
string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
Trace.Info($"Validating directory permissions for: '{workDirectory}'");
try
{
Directory.CreateDirectory(workDirectory);
IOUtil.ValidateExecutePermission(workDirectory);
}
catch (Exception ex)
{
Trace.Error(ex);
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
if (jobContext.Global.WriteDebug)
{
jobContext.SetRunnerContext("debug", "1");
}
jobContext.SetRunnerContext("os", VarUtil.OS);
string toolsDirectory = HostContext.GetDirectory(WellKnownDirectory.Tools);
Directory.CreateDirectory(toolsDirectory);
jobContext.SetRunnerContext("tool_cache", toolsDirectory);
// Setup TEMP directories
_tempDirectoryManager = HostContext.GetService<ITempDirectoryManager>();
_tempDirectoryManager.InitializeTempDirectory(jobContext);
// Get the job extension.
Trace.Info("Getting job extension.");
IJobExtension jobExtension = HostContext.CreateService<IJobExtension>();
List<IStep> jobSteps = null;
try
{
Trace.Info("Initialize job. Getting all job steps.");
jobSteps = await jobExtension.InitializeJob(jobContext, message);
}
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
{
// set the job to canceled
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job is canceled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
}
catch (Exception ex)
{
// set the job to failed.
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job initialize failed.");
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
// trace out all steps
Trace.Info($"Total job steps: {jobSteps.Count}.");
Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'");
HostContext.WritePerfCounter($"WorkerJobInitialized_{message.RequestId.ToString()}");
// Run all job steps
Trace.Info("Run all job steps.");
var stepsRunner = HostContext.GetService<IStepsRunner>();
try
{
foreach (var step in jobSteps)
{
jobContext.JobSteps.Add(step);
}
await stepsRunner.RunAsync(jobContext);
}
catch (Exception ex)
{
// StepRunner should never throw exception out.
// End up here mean there is a bug in StepRunner
// Log the error and fail the job.
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
finally
{
Trace.Info("Finalize job.");
jobExtension.FinalizeJob(jobContext, message, jobStartTimeUtc);
}
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");
Trace.Info("Completing the job execution context.");
return await CompleteJobAsync(jobServer, jobContext, message);
}
finally
{
if (runnerShutdownRegistration != null)
{
runnerShutdownRegistration.Value.Dispose();
runnerShutdownRegistration = null;
}
await ShutdownQueue(throwOnFailure: false);
}
}
private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
jobContext.Debug($"Finishing: {message.JobDisplayName}");
TaskResult result = jobContext.Complete(taskResult);
try
{
await ShutdownQueue(throwOnFailure: true);
}
catch (Exception ex)
{
Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
Trace.Error("This indicate a failure during publish output variables. Fail the job to prevent unexpected job outputs.");
Trace.Error(ex);
result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
}
// Clean TEMP after finish process jobserverqueue, since there might be a pending fileupload still use the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();
if (!jobContext.Global.Features.HasFlag(PlanFeatures.JobCompletedPlanEvent))
{
Trace.Info($"Skip raise job completed event call from worker because Plan version is {message.Plan.Version}");
return result;
}
Trace.Info("Raising job completed event.");
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result, jobContext.JobOutputs);
var completeJobRetryLimit = 5;
var exceptions = new List<Exception>();
while (completeJobRetryLimit-- > 0)
{
try
{
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, default(CancellationToken));
return result;
}
catch (TaskOrchestrationPlanNotFoundException ex)
{
Trace.Error($"TaskOrchestrationPlanNotFoundException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
Trace.Error(ex);
return TaskResult.Failed;
}
catch (TaskOrchestrationPlanSecurityException ex)
{
Trace.Error($"TaskOrchestrationPlanSecurityException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
Trace.Error(ex);
return TaskResult.Failed;
}
catch (TaskOrchestrationPlanTerminatedException ex)
{
Trace.Error($"TaskOrchestrationPlanTerminatedException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
Trace.Error(ex);
return TaskResult.Failed;
}
catch (Exception ex)
{
Trace.Error($"Catch exception while attempting to raise JobCompletedEvent for job {message.JobId}, job request {message.RequestId}.");
Trace.Error(ex);
exceptions.Add(ex);
}
// delay 5 seconds before next retry.
await Task.Delay(TimeSpan.FromSeconds(5));
}
// rethrow exceptions from all attempts.
throw new AggregateException(exceptions);
}
private async Task ShutdownQueue(bool throwOnFailure)
{
if (_jobServerQueue != null)
{
try
{
Trace.Info("Shutting down the job server queue.");
await _jobServerQueue.ShutdownAsync();
}
catch (Exception ex) when (!throwOnFailure)
{
Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
Trace.Error(ex);
}
finally
{
_jobServerQueue = null; // Prevent multiple attempts.
}
}
}
}
}