start calling run service for job completion (#2412)

* start calling run service for job completion

* cleanup

* nit: lines

Co-authored-by: Tingluo Huang <tingluohuang@github.com>

* clean up

* give sanity back to thboop

Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>

* add clean up back

* clean up

* clean up more

* oops

* copied from existing, but :thumb:

Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>

---------

Co-authored-by: Tingluo Huang <tingluohuang@github.com>
Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>
This commit is contained in:
Yashwanth Anantharaju
2023-02-01 16:18:31 -05:00
committed by GitHub
parent 24a27efd4f
commit e6e5f36dd0
11 changed files with 220 additions and 78 deletions

View File

@@ -6,7 +6,6 @@ using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines.ContextData;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Common.Util;
@@ -40,21 +39,34 @@ namespace GitHub.Runner.Worker
Trace.Info("Job ID {0}", message.JobId);
DateTime jobStartTimeUtc = DateTime.UtcNow;
IRunnerService server = null;
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase))
{
var runServer = HostContext.GetService<IRunServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
server = runServer;
}
else
{
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
await jobServer.ConnectAsync(jobConnection);
Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(jobServerUrl, jobServerCredential, new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) });
await jobServer.ConnectAsync(jobConnection);
_jobServerQueue.Start(message);
server = jobServer;
}
_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");
IExecutionContext jobContext = null;
@@ -99,7 +111,7 @@ namespace GitHub.Runner.Worker
{
Trace.Error(ex);
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}
if (jobContext.Global.WriteDebug)
@@ -136,7 +148,7 @@ namespace GitHub.Runner.Worker
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job is cancelled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Canceled);
}
catch (Exception ex)
{
@@ -144,7 +156,7 @@ namespace GitHub.Runner.Worker
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job initialize failed.");
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}
// trace out all steps
@@ -181,7 +193,7 @@ namespace GitHub.Runner.Worker
// Log the error and fail the job.
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
return await CompleteJobAsync(server, jobContext, message, TaskResult.Failed);
}
finally
{
@@ -192,7 +204,7 @@ namespace GitHub.Runner.Worker
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");
Trace.Info("Completing the job execution context.");
return await CompleteJobAsync(jobServer, jobContext, message);
return await CompleteJobAsync(server, jobContext, message);
}
finally
{
@@ -206,6 +218,66 @@ namespace GitHub.Runner.Worker
}
}
private async Task<TaskResult> CompleteJobAsync(IRunnerService server, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
if (server is IRunServer runServer)
{
return await CompleteJobAsync(runServer, jobContext, message, taskResult);
}
else if (server is IJobServer jobServer)
{
return await CompleteJobAsync(jobServer, jobContext, message, taskResult);
}
else
{
throw new NotSupportedException();
}
}
private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
jobContext.Debug($"Finishing: {message.JobDisplayName}");
TaskResult result = jobContext.Complete(taskResult);
if (jobContext.Global.Variables.TryGetValue("Node12ActionsWarnings", out var node12Warnings))
{
var actions = string.Join(", ", StringUtil.ConvertFromJson<HashSet<string>>(node12Warnings));
jobContext.Warning(string.Format(Constants.Runner.Node12DetectedAfterEndOfLife, actions));
}
// Make sure to clean temp after file upload since they may be pending fileupload still use the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();
// Load any upgrade telemetry
LoadFromTelemetryFile(jobContext.Global.JobTelemetry);
// Make sure we don't submit secrets as telemetry
MaskTelemetrySecrets(jobContext.Global.JobTelemetry);
Trace.Info($"Raising job completed against run service");
var completeJobRetryLimit = 5;
var exceptions = new List<Exception>();
while (completeJobRetryLimit-- > 0)
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, default);
return result;
}
catch (Exception ex)
{
Trace.Error($"Catch exception while attempting to complete job {message.JobId}, job request {message.RequestId}.");
Trace.Error(ex);
exceptions.Add(ex);
}
// delay 5 seconds before next retry.
await Task.Delay(TimeSpan.FromSeconds(5));
}
// rethrow exceptions from all attempts.
throw new AggregateException(exceptions);
}
private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
jobContext.Debug($"Finishing: {message.JobDisplayName}");