mirror of
https://github.com/actions/runner.git
synced 2025-12-11 04:46:58 +00:00
Switch to use token service instead of SPS for exchanging oauth token. (#325)
* Gracefully switch the runner to use Token Service instead of SPS. * PR feedback. * feedback2 * report error.
This commit is contained in:
@@ -18,6 +18,7 @@ namespace GitHub.Runner.Listener
|
||||
[ServiceLocator(Default = typeof(JobDispatcher))]
|
||||
public interface IJobDispatcher : IRunnerService
|
||||
{
|
||||
bool Busy { get; }
|
||||
TaskCompletionSource<bool> RunOnceJobCompleted { get; }
|
||||
void Run(Pipelines.AgentJobRequestMessage message, bool runOnce = false);
|
||||
bool Cancel(JobCancelMessage message);
|
||||
@@ -69,6 +70,8 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
public TaskCompletionSource<bool> RunOnceJobCompleted => _runOnceJobCompleted;
|
||||
|
||||
public bool Busy { get; private set; }
|
||||
|
||||
public void Run(Pipelines.AgentJobRequestMessage jobRequestMessage, bool runOnce = false)
|
||||
{
|
||||
Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received.");
|
||||
@@ -247,7 +250,7 @@ namespace GitHub.Runner.Listener
|
||||
Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45)));
|
||||
if (completedTask != jobDispatch.WorkerDispatch)
|
||||
{
|
||||
// at this point, the job exectuion might encounter some dead lock and even not able to be canclled.
|
||||
// at this point, the job execution might encounter some dead lock and even not able to be cancelled.
|
||||
// no need to localize the exception string should never happen.
|
||||
throw new InvalidOperationException($"Job dispatch process for {jobDispatch.JobId} has encountered unexpected error, the dispatch task is not able to be canceled within 45 seconds.");
|
||||
}
|
||||
@@ -296,190 +299,290 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
private async Task RunAsync(Pipelines.AgentJobRequestMessage message, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
|
||||
{
|
||||
if (previousJobDispatch != null)
|
||||
Busy = true;
|
||||
try
|
||||
{
|
||||
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
|
||||
await EnsureDispatchFinished(previousJobDispatch);
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace.Verbose($"This is the first job request.");
|
||||
}
|
||||
|
||||
var term = HostContext.GetService<ITerminal>();
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Running job: {message.JobDisplayName}");
|
||||
|
||||
// first job request renew succeed.
|
||||
TaskCompletionSource<int> firstJobRequestRenewed = new TaskCompletionSource<int>();
|
||||
var notification = HostContext.GetService<IJobNotification>();
|
||||
|
||||
// lock renew cancellation token.
|
||||
using (var lockRenewalTokenSource = new CancellationTokenSource())
|
||||
using (var workerProcessCancelTokenSource = new CancellationTokenSource())
|
||||
{
|
||||
long requestId = message.RequestId;
|
||||
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat
|
||||
|
||||
// start renew job request
|
||||
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
|
||||
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token);
|
||||
|
||||
// wait till first renew succeed or job request is canceled
|
||||
// not even start worker if the first renew fail
|
||||
await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken));
|
||||
|
||||
if (renewJobRequest.IsCompleted)
|
||||
if (previousJobDispatch != null)
|
||||
{
|
||||
// renew job request task complete means we run out of retry for the first job request renew.
|
||||
Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker.");
|
||||
return;
|
||||
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
|
||||
await EnsureDispatchFinished(previousJobDispatch);
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace.Verbose($"This is the first job request.");
|
||||
}
|
||||
|
||||
if (jobRequestCancellationToken.IsCancellationRequested)
|
||||
var term = HostContext.GetService<ITerminal>();
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Running job: {message.JobDisplayName}");
|
||||
|
||||
// first job request renew succeed.
|
||||
TaskCompletionSource<int> firstJobRequestRenewed = new TaskCompletionSource<int>();
|
||||
var notification = HostContext.GetService<IJobNotification>();
|
||||
|
||||
// lock renew cancellation token.
|
||||
using (var lockRenewalTokenSource = new CancellationTokenSource())
|
||||
using (var workerProcessCancelTokenSource = new CancellationTokenSource())
|
||||
{
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
lockRenewalTokenSource.Cancel();
|
||||
// renew job request should never blows up.
|
||||
await renewJobRequest;
|
||||
long requestId = message.RequestId;
|
||||
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat
|
||||
|
||||
// complete job request with result Cancelled
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled);
|
||||
return;
|
||||
}
|
||||
// start renew job request
|
||||
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
|
||||
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, firstJobRequestRenewed, lockRenewalTokenSource.Token);
|
||||
|
||||
HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}");
|
||||
// wait till first renew succeed or job request is canceled
|
||||
// not even start worker if the first renew fail
|
||||
await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken));
|
||||
|
||||
Task<int> workerProcessTask = null;
|
||||
object _outputLock = new object();
|
||||
List<string> workerOutput = new List<string>();
|
||||
using (var processChannel = HostContext.CreateService<IProcessChannel>())
|
||||
using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
|
||||
{
|
||||
// Start the process channel.
|
||||
// It's OK if StartServer bubbles an execption after the worker process has already started.
|
||||
// The worker will shutdown after 30 seconds if it hasn't received the job message.
|
||||
processChannel.StartServer(
|
||||
// Delegate to start the child process.
|
||||
startProcess: (string pipeHandleOut, string pipeHandleIn) =>
|
||||
{
|
||||
// Validate args.
|
||||
ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
|
||||
ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));
|
||||
|
||||
// Save STDOUT from worker, worker will use STDOUT report unhandle exception.
|
||||
processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(stdout.Data))
|
||||
{
|
||||
lock (_outputLock)
|
||||
{
|
||||
workerOutput.Add(stdout.Data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Save STDERR from worker, worker will use STDERR on crash.
|
||||
processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(stderr.Data))
|
||||
{
|
||||
lock (_outputLock)
|
||||
{
|
||||
workerOutput.Add(stderr.Data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Start the child process.
|
||||
HostContext.WritePerfCounter("StartingWorkerProcess");
|
||||
var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin);
|
||||
string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
|
||||
workerProcessTask = processInvoker.ExecuteAsync(
|
||||
workingDirectory: assemblyDirectory,
|
||||
fileName: workerFileName,
|
||||
arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
|
||||
environment: null,
|
||||
requireExitCodeZero: false,
|
||||
outputEncoding: null,
|
||||
killProcessOnCancel: true,
|
||||
redirectStandardIn: null,
|
||||
inheritConsoleHandler: false,
|
||||
keepStandardInOpen: false,
|
||||
highPriorityProcess: true,
|
||||
cancellationToken: workerProcessCancelTokenSource.Token);
|
||||
});
|
||||
|
||||
// Send the job request message.
|
||||
// Kill the worker process if sending the job message times out. The worker
|
||||
// process may have successfully received the job message.
|
||||
try
|
||||
if (renewJobRequest.IsCompleted)
|
||||
{
|
||||
Trace.Info($"Send job request message to worker for job {message.JobId}.");
|
||||
HostContext.WritePerfCounter($"RunnerSendingJobToWorker_{message.JobId}");
|
||||
using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout))
|
||||
{
|
||||
await processChannel.SendAsync(
|
||||
messageType: MessageType.NewJobRequest,
|
||||
body: JsonUtility.ToString(message),
|
||||
cancellationToken: csSendJobRequest.Token);
|
||||
}
|
||||
// renew job request task complete means we run out of retry for the first job request renew.
|
||||
Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker.");
|
||||
return;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// message send been cancelled.
|
||||
// timeout 30 sec. kill worker.
|
||||
Trace.Info($"Job request message sending for job {message.JobId} been cancelled, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
|
||||
if (jobRequestCancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
lockRenewalTokenSource.Cancel();
|
||||
// renew job request should never blows up.
|
||||
await renewJobRequest;
|
||||
|
||||
// not finish the job request since the job haven't run on worker at all, we will not going to set a result to server.
|
||||
// complete job request with result Cancelled
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled);
|
||||
return;
|
||||
}
|
||||
|
||||
// we get first jobrequest renew succeed and start the worker process with the job message.
|
||||
// send notification to machine provisioner.
|
||||
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
|
||||
notification.JobStarted(message.JobId, accessToken, systemConnection.Url);
|
||||
HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}");
|
||||
|
||||
HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}");
|
||||
|
||||
try
|
||||
Task<int> workerProcessTask = null;
|
||||
object _outputLock = new object();
|
||||
List<string> workerOutput = new List<string>();
|
||||
using (var processChannel = HostContext.CreateService<IProcessChannel>())
|
||||
using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
|
||||
{
|
||||
TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded;
|
||||
// wait for renewlock, worker process or cancellation token been fired.
|
||||
var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken));
|
||||
if (completedTask == workerProcessTask)
|
||||
{
|
||||
// worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished.
|
||||
int returnCode = await workerProcessTask;
|
||||
Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);
|
||||
|
||||
string detailInfo = null;
|
||||
if (!TaskResultUtil.IsValidReturnCode(returnCode))
|
||||
// Start the process channel.
|
||||
// It's OK if StartServer bubbles an execption after the worker process has already started.
|
||||
// The worker will shutdown after 30 seconds if it hasn't received the job message.
|
||||
processChannel.StartServer(
|
||||
// Delegate to start the child process.
|
||||
startProcess: (string pipeHandleOut, string pipeHandleIn) =>
|
||||
{
|
||||
detailInfo = string.Join(Environment.NewLine, workerOutput);
|
||||
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
|
||||
await LogWorkerProcessUnhandledException(message, detailInfo);
|
||||
// Validate args.
|
||||
ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
|
||||
ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));
|
||||
|
||||
// Save STDOUT from worker, worker will use STDOUT report unhandle exception.
|
||||
processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(stdout.Data))
|
||||
{
|
||||
lock (_outputLock)
|
||||
{
|
||||
workerOutput.Add(stdout.Data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Save STDERR from worker, worker will use STDERR on crash.
|
||||
processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(stderr.Data))
|
||||
{
|
||||
lock (_outputLock)
|
||||
{
|
||||
workerOutput.Add(stderr.Data);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Start the child process.
|
||||
HostContext.WritePerfCounter("StartingWorkerProcess");
|
||||
var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin);
|
||||
string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
|
||||
workerProcessTask = processInvoker.ExecuteAsync(
|
||||
workingDirectory: assemblyDirectory,
|
||||
fileName: workerFileName,
|
||||
arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
|
||||
environment: null,
|
||||
requireExitCodeZero: false,
|
||||
outputEncoding: null,
|
||||
killProcessOnCancel: true,
|
||||
redirectStandardIn: null,
|
||||
inheritConsoleHandler: false,
|
||||
keepStandardInOpen: false,
|
||||
highPriorityProcess: true,
|
||||
cancellationToken: workerProcessCancelTokenSource.Token);
|
||||
});
|
||||
|
||||
// Send the job request message.
|
||||
// Kill the worker process if sending the job message times out. The worker
|
||||
// process may have successfully received the job message.
|
||||
try
|
||||
{
|
||||
Trace.Info($"Send job request message to worker for job {message.JobId}.");
|
||||
HostContext.WritePerfCounter($"RunnerSendingJobToWorker_{message.JobId}");
|
||||
using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout))
|
||||
{
|
||||
await processChannel.SendAsync(
|
||||
messageType: MessageType.NewJobRequest,
|
||||
body: JsonUtility.ToString(message),
|
||||
cancellationToken: csSendJobRequest.Token);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// message send been cancelled.
|
||||
// timeout 30 sec. kill worker.
|
||||
Trace.Info($"Job request message sending for job {message.JobId} been cancelled, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
|
||||
TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
|
||||
Trace.Info($"finish job request for job {message.JobId} with result: {result}");
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {result}");
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
lockRenewalTokenSource.Cancel();
|
||||
// renew job request should never blows up.
|
||||
await renewJobRequest;
|
||||
|
||||
// not finish the job request since the job haven't run on worker at all, we will not going to set a result to server.
|
||||
return;
|
||||
}
|
||||
|
||||
// we get first jobrequest renew succeed and start the worker process with the job message.
|
||||
// send notification to machine provisioner.
|
||||
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
|
||||
notification.JobStarted(message.JobId, accessToken, systemConnection.Url);
|
||||
|
||||
HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}");
|
||||
|
||||
try
|
||||
{
|
||||
TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded;
|
||||
// wait for renewlock, worker process or cancellation token been fired.
|
||||
var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken));
|
||||
if (completedTask == workerProcessTask)
|
||||
{
|
||||
// worker finished successfully, complete job request with result, attach unhandled exception reported by worker, stop renew lock, job has finished.
|
||||
int returnCode = await workerProcessTask;
|
||||
Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);
|
||||
|
||||
string detailInfo = null;
|
||||
if (!TaskResultUtil.IsValidReturnCode(returnCode))
|
||||
{
|
||||
detailInfo = string.Join(Environment.NewLine, workerOutput);
|
||||
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
|
||||
await LogWorkerProcessUnhandledException(message, detailInfo);
|
||||
}
|
||||
|
||||
TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
|
||||
Trace.Info($"finish job request for job {message.JobId} with result: {result}");
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {result}");
|
||||
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
lockRenewalTokenSource.Cancel();
|
||||
// renew job request should never blows up.
|
||||
await renewJobRequest;
|
||||
|
||||
// complete job request
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
|
||||
|
||||
// print out unhandled exception happened in worker after we complete job request.
|
||||
// when we run out of disk space, report back to server has higher priority.
|
||||
if (!string.IsNullOrEmpty(detailInfo))
|
||||
{
|
||||
Trace.Error("Unhandled exception happened in worker:");
|
||||
Trace.Error(detailInfo);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
else if (completedTask == renewJobRequest)
|
||||
{
|
||||
resultOnAbandonOrCancel = TaskResult.Abandoned;
|
||||
}
|
||||
else
|
||||
{
|
||||
resultOnAbandonOrCancel = TaskResult.Canceled;
|
||||
}
|
||||
|
||||
// renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage)
|
||||
// cancel worker gracefully first, then kill it after worker cancel timeout
|
||||
try
|
||||
{
|
||||
Trace.Info($"Send job cancellation message to worker for job {message.JobId}.");
|
||||
using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
|
||||
{
|
||||
var messageType = MessageType.CancelRequest;
|
||||
if (HostContext.RunnerShutdownToken.IsCancellationRequested)
|
||||
{
|
||||
switch (HostContext.RunnerShutdownReason)
|
||||
{
|
||||
case ShutdownReason.UserCancelled:
|
||||
messageType = MessageType.RunnerShutdown;
|
||||
break;
|
||||
case ShutdownReason.OperatingSystemShutdown:
|
||||
messageType = MessageType.OperatingSystemShutdown;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await processChannel.SendAsync(
|
||||
messageType: messageType,
|
||||
body: string.Empty,
|
||||
cancellationToken: csSendCancel.Token);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// message send been cancelled.
|
||||
Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
}
|
||||
|
||||
// wait worker to exit
|
||||
// if worker doesn't exit within timeout, then kill worker.
|
||||
completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));
|
||||
|
||||
// worker haven't exit within cancellation timeout.
|
||||
if (completedTask != workerProcessTask)
|
||||
{
|
||||
Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
|
||||
// When worker doesn't exit within cancel timeout, the runner will kill the worker process and worker won't finish upload job logs.
|
||||
// The runner will try to upload these logs at this time.
|
||||
await TryUploadUnfinishedLogs(message);
|
||||
}
|
||||
|
||||
Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {resultOnAbandonOrCancel}");
|
||||
// complete job request with cancel result, stop renew lock, job has finished.
|
||||
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
@@ -488,112 +591,20 @@ namespace GitHub.Runner.Listener
|
||||
await renewJobRequest;
|
||||
|
||||
// complete job request
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
|
||||
|
||||
// print out unhandled exception happened in worker after we complete job request.
|
||||
// when we run out of disk space, report back to server has higher priority.
|
||||
if (!string.IsNullOrEmpty(detailInfo))
|
||||
{
|
||||
Trace.Error("Unhandled exception happened in worker:");
|
||||
Trace.Error(detailInfo);
|
||||
}
|
||||
|
||||
return;
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel);
|
||||
}
|
||||
else if (completedTask == renewJobRequest)
|
||||
finally
|
||||
{
|
||||
resultOnAbandonOrCancel = TaskResult.Abandoned;
|
||||
// This should be the last thing to run so we don't notify external parties until actually finished
|
||||
await notification.JobCompleted(message.JobId);
|
||||
}
|
||||
else
|
||||
{
|
||||
resultOnAbandonOrCancel = TaskResult.Canceled;
|
||||
}
|
||||
|
||||
// renew job request completed or job request cancellation token been fired for RunAsync(jobrequestmessage)
|
||||
// cancel worker gracefully first, then kill it after worker cancel timeout
|
||||
try
|
||||
{
|
||||
Trace.Info($"Send job cancellation message to worker for job {message.JobId}.");
|
||||
using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
|
||||
{
|
||||
var messageType = MessageType.CancelRequest;
|
||||
if (HostContext.RunnerShutdownToken.IsCancellationRequested)
|
||||
{
|
||||
switch (HostContext.RunnerShutdownReason)
|
||||
{
|
||||
case ShutdownReason.UserCancelled:
|
||||
messageType = MessageType.RunnerShutdown;
|
||||
break;
|
||||
case ShutdownReason.OperatingSystemShutdown:
|
||||
messageType = MessageType.OperatingSystemShutdown;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await processChannel.SendAsync(
|
||||
messageType: messageType,
|
||||
body: string.Empty,
|
||||
cancellationToken: csSendCancel.Token);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// message send been cancelled.
|
||||
Trace.Info($"Job cancel message sending for job {message.JobId} been cancelled, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
}
|
||||
|
||||
// wait worker to exit
|
||||
// if worker doesn't exit within timeout, then kill worker.
|
||||
completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));
|
||||
|
||||
// worker haven't exit within cancellation timeout.
|
||||
if (completedTask != workerProcessTask)
|
||||
{
|
||||
Trace.Info($"worker process for job {message.JobId} haven't exit within cancellation timout, kill running worker.");
|
||||
workerProcessCancelTokenSource.Cancel();
|
||||
try
|
||||
{
|
||||
await workerProcessTask;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Trace.Info("worker process has been killed.");
|
||||
}
|
||||
|
||||
// When worker doesn't exit within cancel timeout, the runner will kill the worker process and worker won't finish upload job logs.
|
||||
// The runner will try to upload these logs at this time.
|
||||
await TryUploadUnfinishedLogs(message);
|
||||
}
|
||||
|
||||
Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
|
||||
term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {resultOnAbandonOrCancel}");
|
||||
// complete job request with cancel result, stop renew lock, job has finished.
|
||||
|
||||
Trace.Info($"Stop renew job request for job {message.JobId}.");
|
||||
// stop renew lock
|
||||
lockRenewalTokenSource.Cancel();
|
||||
// renew job request should never blows up.
|
||||
await renewJobRequest;
|
||||
|
||||
// complete job request
|
||||
await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// This should be the last thing to run so we don't notify external parties until actually finished
|
||||
await notification.JobCompleted(message.JobId);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
Busy = false;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
|
||||
|
||||
Reference in New Issue
Block a user