mirror of
https://github.com/actions/runner.git
synced 2025-12-12 23:46:12 +00:00
GitHub Actions Runner
This commit is contained in:
292
src/Runner.Worker/JobRunner.cs
Normal file
292
src/Runner.Worker/JobRunner.cs
Normal file
@@ -0,0 +1,292 @@
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using Pipelines = GitHub.DistributedTask.Pipelines;
|
||||
using GitHub.Runner.Common.Util;
|
||||
using GitHub.Services.Common;
|
||||
using GitHub.Services.WebApi;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.IO.Compression;
|
||||
using System.Diagnostics;
|
||||
using Newtonsoft.Json.Linq;
|
||||
using GitHub.DistributedTask.ObjectTemplating.Tokens;
|
||||
using GitHub.Runner.Common;
|
||||
using GitHub.Runner.Sdk;
|
||||
using GitHub.DistributedTask.Pipelines.ContextData;
|
||||
using GitHub.DistributedTask.ObjectTemplating;
|
||||
|
||||
namespace GitHub.Runner.Worker
|
||||
{
|
||||
[ServiceLocator(Default = typeof(JobRunner))]
|
||||
public interface IJobRunner : IRunnerService
|
||||
{
|
||||
Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken);
|
||||
}
|
||||
|
||||
public sealed class JobRunner : RunnerService, IJobRunner
|
||||
{
|
||||
private IJobServerQueue _jobServerQueue;
|
||||
private ITempDirectoryManager _tempDirectoryManager;
|
||||
|
||||
public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken)
|
||||
{
|
||||
// Validate parameters.
|
||||
Trace.Entering();
|
||||
ArgUtil.NotNull(message, nameof(message));
|
||||
ArgUtil.NotNull(message.Resources, nameof(message.Resources));
|
||||
ArgUtil.NotNull(message.Variables, nameof(message.Variables));
|
||||
ArgUtil.NotNull(message.Steps, nameof(message.Steps));
|
||||
Trace.Info("Job ID {0}", message.JobId);
|
||||
|
||||
DateTime jobStartTimeUtc = DateTime.UtcNow;
|
||||
|
||||
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
// Setup the job server and job server queue.
|
||||
var jobServer = HostContext.GetService<IJobServer>();
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
Uri jobServerUrl = systemConnection.Url;
|
||||
|
||||
Trace.Info($"Creating job server with URL: {jobServerUrl}");
|
||||
// jobServerQueue is the throttling reporter.
|
||||
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
||||
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
|
||||
await jobServer.ConnectAsync(jobConnection);
|
||||
|
||||
_jobServerQueue.Start(message);
|
||||
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");
|
||||
|
||||
IExecutionContext jobContext = null;
|
||||
CancellationTokenRegistration? runnerShutdownRegistration = null;
|
||||
try
|
||||
{
|
||||
// Create the job execution context.
|
||||
jobContext = HostContext.CreateService<IExecutionContext>();
|
||||
jobContext.InitializeJob(message, jobRequestCancellationToken);
|
||||
Trace.Info("Starting the job execution context.");
|
||||
jobContext.Start();
|
||||
jobContext.Debug($"Starting: {message.JobDisplayName}");
|
||||
|
||||
runnerShutdownRegistration = HostContext.RunnerShutdownToken.Register(() =>
|
||||
{
|
||||
// log an issue, then runner get shutdown by Ctrl-C or Ctrl-Break.
|
||||
// the server will use Ctrl-Break to tells the runner that operating system is shutting down.
|
||||
string errorMessage;
|
||||
switch (HostContext.RunnerShutdownReason)
|
||||
{
|
||||
case ShutdownReason.UserCancelled:
|
||||
errorMessage = "The runner has received a shutdown signal. This can happen when the runner service is stopped, or a manually started runner is canceled.";
|
||||
break;
|
||||
case ShutdownReason.OperatingSystemShutdown:
|
||||
errorMessage = $"Operating system is shutting down for computer '{Environment.MachineName}'";
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentException(HostContext.RunnerShutdownReason.ToString(), nameof(HostContext.RunnerShutdownReason));
|
||||
}
|
||||
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
|
||||
});
|
||||
|
||||
// Validate directory permissions.
|
||||
string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
|
||||
Trace.Info($"Validating directory permissions for: '{workDirectory}'");
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(workDirectory);
|
||||
IOUtil.ValidateExecutePermission(workDirectory);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error(ex);
|
||||
jobContext.Error(ex);
|
||||
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
|
||||
}
|
||||
|
||||
jobContext.SetRunnerContext("os", VarUtil.OS);
|
||||
|
||||
string toolsDirectory = HostContext.GetDirectory(WellKnownDirectory.Tools);
|
||||
Directory.CreateDirectory(toolsDirectory);
|
||||
jobContext.SetRunnerContext("tool_cache", toolsDirectory);
|
||||
|
||||
// remove variable from env
|
||||
Environment.SetEnvironmentVariable("AGENT_TOOLSDIRECTORY", null);
|
||||
Environment.SetEnvironmentVariable(Constants.Variables.Agent.ToolsDirectory, null);
|
||||
|
||||
// Setup TEMP directories
|
||||
_tempDirectoryManager = HostContext.GetService<ITempDirectoryManager>();
|
||||
_tempDirectoryManager.InitializeTempDirectory(jobContext);
|
||||
|
||||
// // Expand container properties
|
||||
// jobContext.Container?.ExpandProperties(jobContext.Variables);
|
||||
// foreach (var sidecar in jobContext.SidecarContainers)
|
||||
// {
|
||||
// sidecar.ExpandProperties(jobContext.Variables);
|
||||
// }
|
||||
|
||||
// Get the job extension.
|
||||
Trace.Info("Getting job extension.");
|
||||
IJobExtension jobExtension = HostContext.CreateService<IJobExtension>();
|
||||
List<IStep> jobSteps = null;
|
||||
try
|
||||
{
|
||||
Trace.Info("Initialize job. Getting all job steps.");
|
||||
jobSteps = await jobExtension.InitializeJob(jobContext, message);
|
||||
}
|
||||
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// set the job to canceled
|
||||
// don't log error issue to job ExecutionContext, since server owns the job level issue
|
||||
Trace.Error($"Job is canceled during initialize.");
|
||||
Trace.Error($"Caught exception: {ex}");
|
||||
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// set the job to failed.
|
||||
// don't log error issue to job ExecutionContext, since server owns the job level issue
|
||||
Trace.Error($"Job initialize failed.");
|
||||
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
|
||||
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
|
||||
}
|
||||
|
||||
// trace out all steps
|
||||
Trace.Info($"Total job steps: {jobSteps.Count}.");
|
||||
Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'");
|
||||
HostContext.WritePerfCounter($"WorkerJobInitialized_{message.RequestId.ToString()}");
|
||||
|
||||
// Run all job steps
|
||||
Trace.Info("Run all job steps.");
|
||||
var stepsRunner = HostContext.GetService<IStepsRunner>();
|
||||
try
|
||||
{
|
||||
foreach (var step in jobSteps)
|
||||
{
|
||||
jobContext.JobSteps.Enqueue(step);
|
||||
}
|
||||
|
||||
await stepsRunner.RunAsync(jobContext);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// StepRunner should never throw exception out.
|
||||
// End up here mean there is a bug in StepRunner
|
||||
// Log the error and fail the job.
|
||||
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
|
||||
jobContext.Error(ex);
|
||||
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Trace.Info("Finalize job.");
|
||||
await jobExtension.FinalizeJob(jobContext, message, jobStartTimeUtc);
|
||||
}
|
||||
|
||||
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");
|
||||
|
||||
Trace.Info("Completing the job execution context.");
|
||||
return await CompleteJobAsync(jobServer, jobContext, message);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (runnerShutdownRegistration != null)
|
||||
{
|
||||
runnerShutdownRegistration.Value.Dispose();
|
||||
runnerShutdownRegistration = null;
|
||||
}
|
||||
|
||||
await ShutdownQueue(throwOnFailure: false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
|
||||
{
|
||||
jobContext.Debug($"Finishing: {message.JobDisplayName}");
|
||||
TaskResult result = jobContext.Complete(taskResult);
|
||||
|
||||
try
|
||||
{
|
||||
await ShutdownQueue(throwOnFailure: true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
|
||||
Trace.Error("This indicate a failure during publish output variables. Fail the job to prevent unexpected job outputs.");
|
||||
Trace.Error(ex);
|
||||
result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
|
||||
}
|
||||
|
||||
// Clean TEMP after finish process jobserverqueue, since there might be a pending fileupload still use the TEMP dir.
|
||||
_tempDirectoryManager?.CleanupTempDirectory();
|
||||
|
||||
if (!jobContext.Features.HasFlag(PlanFeatures.JobCompletedPlanEvent))
|
||||
{
|
||||
Trace.Info($"Skip raise job completed event call from worker because Plan version is {message.Plan.Version}");
|
||||
return result;
|
||||
}
|
||||
|
||||
Trace.Info("Raising job completed event.");
|
||||
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result);
|
||||
|
||||
var completeJobRetryLimit = 5;
|
||||
var exceptions = new List<Exception>();
|
||||
while (completeJobRetryLimit-- > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, default(CancellationToken));
|
||||
return result;
|
||||
}
|
||||
catch (TaskOrchestrationPlanNotFoundException ex)
|
||||
{
|
||||
Trace.Error($"TaskOrchestrationPlanNotFoundException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
|
||||
Trace.Error(ex);
|
||||
return TaskResult.Failed;
|
||||
}
|
||||
catch (TaskOrchestrationPlanSecurityException ex)
|
||||
{
|
||||
Trace.Error($"TaskOrchestrationPlanSecurityException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
|
||||
Trace.Error(ex);
|
||||
return TaskResult.Failed;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error($"Catch exception while attempting to raise JobCompletedEvent for job {message.JobId}, job request {message.RequestId}.");
|
||||
Trace.Error(ex);
|
||||
exceptions.Add(ex);
|
||||
}
|
||||
|
||||
// delay 5 seconds before next retry.
|
||||
await Task.Delay(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
// rethrow exceptions from all attempts.
|
||||
throw new AggregateException(exceptions);
|
||||
}
|
||||
|
||||
private async Task ShutdownQueue(bool throwOnFailure)
|
||||
{
|
||||
if (_jobServerQueue != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
Trace.Info("Shutting down the job server queue.");
|
||||
await _jobServerQueue.ShutdownAsync();
|
||||
}
|
||||
catch (Exception ex) when (!throwOnFailure)
|
||||
{
|
||||
Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
|
||||
Trace.Error(ex);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_jobServerQueue = null; // Prevent multiple attempts.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user