using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Common.Util;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using GitHub.Services.WebApi.Jwt;
using Sdk.RSWebApi.Contracts;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Listener
{
    [ServiceLocator(Default = typeof(JobDispatcher))]
    public interface IJobDispatcher : IRunnerService
    {
        bool Busy { get; }
        TaskCompletionSource<bool> RunOnceJobCompleted { get; }
        void Run(Pipelines.AgentJobRequestMessage message, bool runOnce = false);
        bool Cancel(JobCancelMessage message);
        Task WaitAsync(CancellationToken token);
        Task ShutdownAsync();
        event EventHandler<JobStatusEventArgs> JobStatus;
    }

    // This implementation of IJobDispatcher is not thread safe.
    // It relies on the fact that the current design of the runner dequeues
    // and processes one message from the message queue at a time.
    // In addition, it only executes one job at a time,
    // and the server will not send another job while this one is still running.
    public sealed class JobDispatcher : RunnerService, IJobDispatcher
    {
        private static Regex _invalidJsonRegex = new(@"invalid\ Json\ at\ position\ '(\d+)':", RegexOptions.Compiled | RegexOptions.IgnoreCase);
        private readonly Lazy<Dictionary<long, TaskResult>> _localRunJobResult = new();
        private int _poolId;

        IConfigurationStore _configurationStore;

        RunnerSettings _runnerSettings;
        private static readonly string _workerProcessName = $"Runner.Worker{IOUtil.ExeExtension}";

        // this is not thread-safe
        private readonly Queue<Guid> _jobDispatchedQueue = new();
        private readonly ConcurrentDictionary<Guid, WorkerDispatcher> _jobInfos = new();

        // allow up to 30 sec for any data to be transmitted over the process channel
        // the timeout limit can be overwritten by the environment variable GITHUB_ACTIONS_RUNNER_CHANNEL_TIMEOUT
        private TimeSpan _channelTimeout;

        private TaskCompletionSource<bool> _runOnceJobCompleted = new();

        public event EventHandler<JobStatusEventArgs> JobStatus;

        private bool _isRunServiceJob;

        public override void Initialize(IHostContext hostContext)
        {
            base.Initialize(hostContext);

            // get pool id from config
            _configurationStore = hostContext.GetService<IConfigurationStore>();
            _runnerSettings = _configurationStore.GetSettings();
            _poolId = _runnerSettings.PoolId;

            int channelTimeoutSeconds;
            if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_CHANNEL_TIMEOUT") ?? string.Empty, out channelTimeoutSeconds))
            {
                channelTimeoutSeconds = 30;
            }
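            // For illustration: GITHUB_ACTIONS_RUNNER_CHANNEL_TIMEOUT=120 yields a 120 second timeout,
            // 600 is clamped down to 300, and 5 (or an unset/unparseable value) falls back to 30.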
            // _channelTimeout should be in range [30, 300] seconds
            _channelTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(channelTimeoutSeconds, 30), 300));
            Trace.Info($"Set runner/worker IPC timeout to {_channelTimeout.TotalSeconds} seconds.");
        }

        public TaskCompletionSource<bool> RunOnceJobCompleted => _runOnceJobCompleted;

        public bool Busy { get; private set; }

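        // Dispatch the job to a background worker task. The new dispatch awaits the previous
        // dispatch (if any) inside RunAsync/EnsureDispatchFinished, so jobs never overlap.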
        public void Run(Pipelines.AgentJobRequestMessage jobRequestMessage, bool runOnce = false)
        {
            Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received.");

            _isRunServiceJob = MessageUtil.IsRunServiceJob(jobRequestMessage.MessageType);

            WorkerDispatcher currentDispatch = null;
            if (_jobDispatchedQueue.Count > 0)
            {
                Guid dispatchedJobId = _jobDispatchedQueue.Dequeue();
                if (_jobInfos.TryGetValue(dispatchedJobId, out currentDispatch))
                {
                    Trace.Verbose($"Retrieve previous WorkerDispatcher for job {currentDispatch.JobId}.");
                }
            }

            var orchestrationId = string.Empty;
            var systemConnection = jobRequestMessage.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
            if (systemConnection?.Authorization != null &&
                systemConnection.Authorization.Parameters.TryGetValue("AccessToken", out var accessToken) &&
                !string.IsNullOrEmpty(accessToken))
            {
                var jwt = JsonWebToken.Create(accessToken);
                var claims = jwt.ExtractClaims();
                orchestrationId = claims.FirstOrDefault(x => string.Equals(x.Type, "orchid", StringComparison.OrdinalIgnoreCase))?.Value;
                if (!string.IsNullOrEmpty(orchestrationId))
                {
                    Trace.Info($"Pull OrchestrationId {orchestrationId} from JWT claims");
                }
            }

            WorkerDispatcher newDispatch = new(jobRequestMessage.JobId, jobRequestMessage.RequestId);
            if (runOnce)
            {
                Trace.Info("Start dispatcher for one time used runner.");
                newDispatch.WorkerDispatch = RunOnceAsync(jobRequestMessage, orchestrationId, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
            }
            else
            {
                newDispatch.WorkerDispatch = RunAsync(jobRequestMessage, orchestrationId, currentDispatch, newDispatch.WorkerCancellationTokenSource.Token, newDispatch.WorkerCancelTimeoutKillTokenSource.Token);
            }

            _jobInfos.TryAdd(newDispatch.JobId, newDispatch);
            _jobDispatchedQueue.Enqueue(newDispatch.JobId);
        }

        public bool Cancel(JobCancelMessage jobCancelMessage)
        {
            Trace.Info($"Job cancellation request {jobCancelMessage.JobId} received, cancellation timeout {jobCancelMessage.Timeout.TotalMinutes} minutes.");

            WorkerDispatcher workerDispatcher;
            if (!_jobInfos.TryGetValue(jobCancelMessage.JobId, out workerDispatcher))
            {
                Trace.Verbose($"Job request {jobCancelMessage.JobId} is not a currently running job, ignore cancellation request.");
                return false;
            }
            else
            {
                if (workerDispatcher.Cancel(jobCancelMessage.Timeout))
                {
                    Trace.Verbose($"Fired cancellation token for job request {workerDispatcher.JobId}.");
                }

                return true;
            }
        }

        public async Task WaitAsync(CancellationToken token)
        {
            WorkerDispatcher currentDispatch = null;
            Guid dispatchedJobId;
            if (_jobDispatchedQueue.Count > 0)
            {
                dispatchedJobId = _jobDispatchedQueue.Dequeue();
                if (_jobInfos.TryGetValue(dispatchedJobId, out currentDispatch))
                {
                    Trace.Verbose($"Retrieve previous WorkerDispatcher for job {currentDispatch.JobId}.");
                }
            }
            else
            {
                Trace.Verbose($"There is no running WorkerDispatcher that needs to be awaited.");
            }

            if (currentDispatch != null)
            {
                using (var registration = token.Register(() => { if (currentDispatch.Cancel(TimeSpan.FromSeconds(60))) { Trace.Verbose($"Fired cancellation token for job request {currentDispatch.JobId}."); } }))
                {
                    try
                    {
                        Trace.Info($"Waiting for WorkerDispatcher for job {currentDispatch.JobId} to finish.");
                        await currentDispatch.WorkerDispatch;
                        Trace.Info($"Job request {currentDispatch.JobId} processed successfully.");
                    }
                    catch (Exception ex)
                    {
                        Trace.Error($"Worker Dispatch failed with an exception for job request {currentDispatch.JobId}.");
                        Trace.Error(ex);
                    }
                    finally
                    {
                        WorkerDispatcher workerDispatcher;
                        if (_jobInfos.TryRemove(currentDispatch.JobId, out workerDispatcher))
                        {
                            Trace.Verbose($"Remove WorkerDispatcher from {nameof(_jobInfos)} dictionary for job {currentDispatch.JobId}.");
                            workerDispatcher.Dispose();
                        }
                    }
                }
            }
        }

        public async Task ShutdownAsync()
        {
            Trace.Info($"Shutting down JobDispatcher. Make sure all WorkerDispatchers have finished.");
            WorkerDispatcher currentDispatch = null;
            if (_jobDispatchedQueue.Count > 0)
            {
                Guid dispatchedJobId = _jobDispatchedQueue.Dequeue();
                if (_jobInfos.TryGetValue(dispatchedJobId, out currentDispatch))
                {
                    try
                    {
                        Trace.Info($"Ensure WorkerDispatcher for job {currentDispatch.JobId} runs to finish, cancel any running job.");
                        await EnsureDispatchFinished(currentDispatch, cancelRunningJob: true);
                    }
                    catch (Exception ex)
                    {
                        Trace.Error($"Caught worker dispatch exception for job request {currentDispatch.JobId} during job dispatcher shut down.");
                        Trace.Error(ex);
                    }
                    finally
                    {
                        WorkerDispatcher workerDispatcher;
                        if (_jobInfos.TryRemove(currentDispatch.JobId, out workerDispatcher))
                        {
                            Trace.Verbose($"Remove WorkerDispatcher from {nameof(_jobInfos)} dictionary for job {currentDispatch.JobId}.");
                            workerDispatcher.Dispose();
                        }
                    }
                }
            }
        }

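        // Wait for a previously dispatched job to finish. If it is still running, either cancel it
        // outright (shutdown or run-service jobs) or ask the server for the request's state to decide
        // whether the worker is a zombie that should be cancelled before the next job starts.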
        private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool cancelRunningJob = false)
        {
            if (!jobDispatch.WorkerDispatch.IsCompleted)
            {
                if (cancelRunningJob)
                {
                    // cancel the running job when shutting down the runner.
                    // this will happen when the runner gets Ctrl+C or the message queue loop crashes.
                    jobDispatch.WorkerCancellationTokenSource.Cancel();
                    // wait for the worker process to exit, then return.
                    await jobDispatch.WorkerDispatch;

                    return;
                }

                if (this._isRunServiceJob)
                {
                    Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId}. Cancel the running worker right away.");
                    jobDispatch.WorkerCancellationTokenSource.Cancel();
                    return;
                }

                // based on the current design, the server will only send one job to a given runner at a time.
                // if the runner receives a new job request while a previous job request is still running, this typically indicates one of two situations:
                // 1. a runner bug caused a server/runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest
                //    properly but thinks it still owns the job request, while the server has already abandoned the jobrequest.
                // 2. a server bug or design change allowed the server to send more than one job request to a given runner that hasn't finished
                //    a previous job request.
                var runnerServer = HostContext.GetService<IRunnerServer>();
                TaskAgentJobRequest request = null;
                try
                {
                    request = await runnerServer.GetAgentRequestAsync(_poolId, jobDispatch.RequestId, CancellationToken.None);
                }
                catch (TaskAgentJobNotFoundException ex)
                {
                    Trace.Error($"Caught job-not-found exception while checking jobrequest {jobDispatch.JobId} status. Cancel the running worker right away.");
                    Trace.Error(ex);
                    jobDispatch.WorkerCancellationTokenSource.Cancel();
                    // make sure the worker process exits before we return, otherwise we might leave an orphan worker process behind.
                    await jobDispatch.WorkerDispatch;
                    return;
                }
                catch (Exception ex)
                {
                    // we can't even query for the jobrequest from the server, something is totally busted, stop runner/worker.
                    Trace.Error($"Caught exception while checking jobrequest {jobDispatch.JobId} status. Cancel the running worker right away.");
                    Trace.Error(ex);

                    jobDispatch.WorkerCancellationTokenSource.Cancel();
                    // make sure the worker process exits before we rethrow, otherwise we might leave an orphan worker process behind.
                    await jobDispatch.WorkerDispatch;

                    // rethrow original exception
                    throw;
                }

                if (request.Result != null)
                {
                    // the job request has been finished, the server already has the result.
                    // this means the runner is busted since it is still running that request.
                    // cancel the zombie worker, run the next job request.
                    Trace.Error($"Received job request while previous job {jobDispatch.JobId} still running on worker. Cancel the previous job since the job request has been finished on the server side with result: {request.Result.Value}.");
                    jobDispatch.WorkerCancellationTokenSource.Cancel();

                    // wait 45 sec for the worker to finish.
                    Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45)));
                    if (completedTask != jobDispatch.WorkerDispatch)
                    {
                        // at this point, the job execution might have hit a deadlock and may not even be cancellable.
                        // no need to localize the exception string, this should never happen.
                        throw new InvalidOperationException($"Job dispatch process for {jobDispatch.JobId} has encountered an unexpected error, the dispatch task is not able to be cancelled within 45 seconds.");
                    }
                }
                else
                {
                    // something is seriously wrong on the server side. stop the runner from continuing.
                    // no need to localize the exception string, this should never happen.
                    throw new InvalidOperationException($"Server sent a new job request while the previous job request {jobDispatch.JobId} hasn't finished.");
                }
            }

            try
            {
                await jobDispatch.WorkerDispatch;
                Trace.Info($"Job request {jobDispatch.JobId} processed successfully.");
            }
            catch (Exception ex)
            {
                Trace.Error($"Worker Dispatch failed with an exception for job request {jobDispatch.JobId}.");
                Trace.Error(ex);
            }
            finally
            {
                WorkerDispatcher workerDispatcher;
                if (_jobInfos.TryRemove(jobDispatch.JobId, out workerDispatcher))
                {
                    Trace.Verbose($"Remove WorkerDispatcher from {nameof(_jobInfos)} dictionary for job {jobDispatch.JobId}.");
                    workerDispatcher.Dispose();
                }
            }
        }

        private async Task RunOnceAsync(Pipelines.AgentJobRequestMessage message, string orchestrationId, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
        {
            try
            {
                await RunAsync(message, orchestrationId, previousJobDispatch, jobRequestCancellationToken, workerCancelTimeoutKillToken);
            }
            finally
            {
                Trace.Info("Fire signal for one time used runner.");
                _runOnceJobCompleted.TrySetResult(true);
            }
        }

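        // RunAsync drives the whole job lifecycle: wait for the previous dispatch, renew the job
        // request lock in the background, spawn Runner.Worker, send it the job message over the
        // process channel, wait for completion or cancellation, then report the result back to the server.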
        private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orchestrationId, WorkerDispatcher previousJobDispatch, CancellationToken jobRequestCancellationToken, CancellationToken workerCancelTimeoutKillToken)
        {
            Busy = true;
            try
            {
                if (JobStatus != null)
                {
                    JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Busy));
                }

                if (previousJobDispatch != null)
                {
                    Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
                    await EnsureDispatchFinished(previousJobDispatch);
                }
                else
                {
                    Trace.Verbose($"This is the first job request.");
                }

                var term = HostContext.GetService<ITerminal>();
                term.WriteLine($"{DateTime.UtcNow:u}: Running job: {message.JobDisplayName}");

                // first job request renew succeeded.
                TaskCompletionSource<int> firstJobRequestRenewed = new();
                var notification = HostContext.GetService<IJobNotification>();

                var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

                // lock renew cancellation token.
                using (var lockRenewalTokenSource = new CancellationTokenSource())
                using (var workerProcessCancelTokenSource = new CancellationTokenSource())
                {
                    long requestId = message.RequestId;
                    Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here for compat

                    // start renewing the job request
                    Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
                    Task renewJobRequest = RenewJobRequestAsync(message, systemConnection, _poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);

                    // wait till the first renew succeeds or the job request is cancelled
                    // don't even start the worker if the first renew fails
                    await Task.WhenAny(firstJobRequestRenewed.Task, renewJobRequest, Task.Delay(-1, jobRequestCancellationToken));

                    if (renewJobRequest.IsCompleted)
                    {
                        // the renew job request task completing means we ran out of retries for the first job request renew.
                        Trace.Info($"Unable to renew job request for job {message.JobId} for the first time, stop dispatching job to worker.");
                        return;
                    }

                    if (jobRequestCancellationToken.IsCancellationRequested)
                    {
                        Trace.Info($"Stop renew job request for job {message.JobId}.");
                        // stop renewing the lock
                        lockRenewalTokenSource.Cancel();
                        // renew job request should never blow up.
                        await renewJobRequest;

                        // complete job request with result Cancelled
                        await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, TaskResult.Canceled);
                        return;
                    }

                    HostContext.WritePerfCounter($"JobRequestRenewed_{requestId.ToString()}");

                    Task<int> workerProcessTask = null;
                    object _outputLock = new();
                    List<string> workerOutput = new();
                    bool printToStdout = StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable(Constants.Variables.Agent.PrintLogToStdout));
                    using (var processChannel = HostContext.CreateService<IProcessChannel>())
                    using (var processInvoker = HostContext.CreateService<IProcessInvoker>())
                    {
                        // Start the process channel.
                        // It's OK if StartServer bubbles an exception after the worker process has already started.
                        // The worker will shut down after 30 seconds if it hasn't received the job message.
                        processChannel.StartServer(
                            // Delegate to start the child process.
                            startProcess: (string pipeHandleOut, string pipeHandleIn) =>
                            {
                                // Validate args.
                                ArgUtil.NotNullOrEmpty(pipeHandleOut, nameof(pipeHandleOut));
                                ArgUtil.NotNullOrEmpty(pipeHandleIn, nameof(pipeHandleIn));

                                // Save STDOUT from the worker, the worker will use STDOUT to report unhandled exceptions.
                                processInvoker.OutputDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stdout)
                                {
                                    if (!string.IsNullOrEmpty(stdout.Data))
                                    {
                                        lock (_outputLock)
                                        {
                                            if (!stdout.Data.StartsWith("[WORKER"))
                                            {
                                                workerOutput.Add(stdout.Data);
                                            }

                                            if (printToStdout)
                                            {
                                                term.WriteLine(stdout.Data, skipTracing: true);
                                            }
                                        }
                                    }
                                };

                                // Save STDERR from the worker, the worker will use STDERR on crash.
                                processInvoker.ErrorDataReceived += delegate (object sender, ProcessDataReceivedEventArgs stderr)
                                {
                                    if (!string.IsNullOrEmpty(stderr.Data))
                                    {
                                        lock (_outputLock)
                                        {
                                            workerOutput.Add(stderr.Data);
                                        }
                                    }
                                };

                                // Start the child process.
                                HostContext.WritePerfCounter("StartingWorkerProcess");
                                var assemblyDirectory = HostContext.GetDirectory(WellKnownDirectory.Bin);
                                string workerFileName = Path.Combine(assemblyDirectory, _workerProcessName);
                                workerProcessTask = processInvoker.ExecuteAsync(
                                    workingDirectory: assemblyDirectory,
                                    fileName: workerFileName,
                                    arguments: "spawnclient " + pipeHandleOut + " " + pipeHandleIn,
                                    environment: null,
                                    requireExitCodeZero: false,
                                    outputEncoding: null,
                                    killProcessOnCancel: true,
                                    redirectStandardIn: null,
                                    inheritConsoleHandler: false,
                                    keepStandardInOpen: false,
                                    highPriorityProcess: true,
                                    cancellationToken: workerProcessCancelTokenSource.Token);
                            });

                        // Send the job request message.
                        // Kill the worker process if sending the job message times out. The worker
                        // process may have successfully received the job message.
                        try
                        {
                            Trace.Info($"Send job request message to worker for job {message.JobId}.");
                            HostContext.WritePerfCounter($"RunnerSendingJobToWorker_{message.JobId}");
                            using (var csSendJobRequest = new CancellationTokenSource(_channelTimeout))
                            {
                                await processChannel.SendAsync(
                                    messageType: MessageType.NewJobRequest,
                                    body: JsonUtility.ToString(message),
                                    cancellationToken: csSendJobRequest.Token);
                            }
                        }
                        catch (OperationCanceledException)
                        {
                            // the message send has been cancelled.
                            // timeout 30 sec. kill the worker.
                            Trace.Info($"Job request message sending for job {message.JobId} has been cancelled, kill running worker.");
                            workerProcessCancelTokenSource.Cancel();
                            try
                            {
                                await workerProcessTask;
                            }
                            catch (OperationCanceledException)
                            {
                                Trace.Info("worker process has been killed.");
                            }

                            Trace.Info($"Stop renew job request for job {message.JobId}.");
                            // stop renewing the lock
                            lockRenewalTokenSource.Cancel();
                            // renew job request should never blow up.
                            await renewJobRequest;

                            // don't finish the job request since the job hasn't run on the worker at all; we are not going to set a result on the server.
                            return;
                        }

                        // the first jobrequest renew succeeded and we started the worker process with the job message.
                        // send notification to the machine provisioner.
                        var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
                        notification.JobStarted(message.JobId, accessToken, systemConnection.Url);

                        HostContext.WritePerfCounter($"SentJobToWorker_{requestId.ToString()}");

                        try
                        {
                            TaskResult resultOnAbandonOrCancel = TaskResult.Succeeded;
                            // wait for the renew lock, the worker process, or the cancellation token to fire.
                            var completedTask = await Task.WhenAny(renewJobRequest, workerProcessTask, Task.Delay(-1, jobRequestCancellationToken));
                            if (completedTask == workerProcessTask)
                            {
                                // the worker finished, complete the job request with its result, attach any unhandled exception reported by the worker, stop renewing the lock, the job has finished.
                                int returnCode = await workerProcessTask;
                                Trace.Info($"Worker finished for job {message.JobId}. Code: " + returnCode);

                                string detailInfo = null;
                                if (!TaskResultUtil.IsValidReturnCode(returnCode))
                                {
                                    detailInfo = string.Join(Environment.NewLine, workerOutput);
                                    Trace.Info($"Return code {returnCode} indicates the worker encountered an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");

                                    var jobServer = await InitializeJobServerAsync(systemConnection);
                                    var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
                                    unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
                                    switch (jobServer)
                                    {
                                        case IJobServer js:
                                            {
                                                await LogWorkerProcessUnhandledException(js, message, unhandledExceptionIssue);
                                                // Go ahead and finish the job with result 'Failed' if the STDERR from the worker is System.IO.IOException, since it typically means we are running out of disk space.
                                                if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase))
                                                {
                                                    Trace.Info($"Finish job with result 'Failed' due to IOException.");
                                                    await ForceFailJob(js, message);
                                                }

                                                break;
                                            }
                                        case IRunServer rs:
                                            await ForceFailJob(rs, message, unhandledExceptionIssue);
                                            break;
                                        default:
                                            throw new NotSupportedException($"JobServer type '{jobServer.GetType().Name}' is not supported.");
                                    }
                                }

                                TaskResult result = TaskResultUtil.TranslateFromReturnCode(returnCode);
                                Trace.Info($"finish job request for job {message.JobId} with result: {result}");
                                term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {result}");

                                Trace.Info($"Stop renew job request for job {message.JobId}.");
                                // stop renewing the lock
                                lockRenewalTokenSource.Cancel();
                                // renew job request should never blow up.
                                await renewJobRequest;

                                // complete the job request
                                await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, result, detailInfo);

                                // print out the unhandled exception that happened in the worker after we complete the job request.
                                // when we run out of disk space, reporting back to the server has higher priority.
                                if (!string.IsNullOrEmpty(detailInfo))
                                {
                                    Trace.Error("Unhandled exception happened in worker:");
                                    Trace.Error(detailInfo);
                                }

                                return;
                            }
                            else if (completedTask == renewJobRequest)
                            {
                                resultOnAbandonOrCancel = TaskResult.Abandoned;
                            }
                            else
                            {
                                resultOnAbandonOrCancel = TaskResult.Canceled;
                            }

                            // the renew job request completed or the job request cancellation token fired for RunAsync(jobRequestMessage)
                            // cancel the worker gracefully first, then kill it after the worker cancel timeout
                            try
                            {
                                Trace.Info($"Send job cancellation message to worker for job {message.JobId}.");
                                using (var csSendCancel = new CancellationTokenSource(_channelTimeout))
                                {
                                    var messageType = MessageType.CancelRequest;
                                    if (HostContext.RunnerShutdownToken.IsCancellationRequested)
                                    {
                                        switch (HostContext.RunnerShutdownReason)
                                        {
                                            case ShutdownReason.UserCancelled:
                                                messageType = MessageType.RunnerShutdown;
                                                break;
                                            case ShutdownReason.OperatingSystemShutdown:
                                                messageType = MessageType.OperatingSystemShutdown;
                                                break;
                                        }
                                    }

                                    await processChannel.SendAsync(
                                        messageType: messageType,
                                        body: string.Empty,
                                        cancellationToken: csSendCancel.Token);
                                }
                            }
                            catch (OperationCanceledException)
                            {
                                // the message send has been cancelled.
                                Trace.Info($"Job cancel message sending for job {message.JobId} has been cancelled, kill running worker.");
                                workerProcessCancelTokenSource.Cancel();
                                try
                                {
                                    await workerProcessTask;
                                }
                                catch (OperationCanceledException)
                                {
                                    Trace.Info("worker process has been killed.");
                                }
                            }
                            catch (Exception ex)
                            {
                                // the message send failed, this might indicate the worker process has already exited or is stuck.
                                Trace.Info($"Job cancel message sending for job {message.JobId} failed, kill running worker. {ex}");
                                workerProcessCancelTokenSource.Cancel();
                                try
                                {
                                    await workerProcessTask;
                                }
                                catch (OperationCanceledException)
                                {
                                    Trace.Info("worker process has been killed.");
                                }
                            }

                            // wait for the worker to exit
                            // if the worker doesn't exit within the timeout, then kill it.
                            completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken));

                            // the worker hasn't exited within the cancellation timeout.
                            if (completedTask != workerProcessTask)
                            {
                                Trace.Info($"worker process for job {message.JobId} hasn't exited within cancellation timeout, kill running worker.");
                                workerProcessCancelTokenSource.Cancel();
                                try
                                {
                                    await workerProcessTask;
                                }
                                catch (OperationCanceledException)
                                {
                                    Trace.Info("worker process has been killed.");
                                }

                                // When the worker doesn't exit within the cancel timeout, the runner will kill the worker process and the worker won't finish uploading job logs.
                                // The runner will try to upload these logs at this time.
                                await TryUploadUnfinishedLogs(message);
                            }

                            Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
                            term.WriteLine($"{DateTime.UtcNow:u}: Job {message.JobDisplayName} completed with result: {resultOnAbandonOrCancel}");
                            // complete the job request with the cancel result, stop renewing the lock, the job has finished.

                            Trace.Info($"Stop renew job request for job {message.JobId}.");
                            // stop renewing the lock
                            lockRenewalTokenSource.Cancel();
                            // renew job request should never blow up.
                            await renewJobRequest;

                            // complete the job request
                            await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, resultOnAbandonOrCancel);
                        }
                        finally
                        {
                            // This should be the last thing to run so we don't notify external parties until actually finished
                            await notification.JobCompleted(message.JobId);
                        }
                    }
                }
            }
            finally
            {
                Busy = false;

                if (JobStatus != null)
                {
                    JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Online));
                }
            }
        }

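        // Routes job-request renewal either to the run service (by plan/job id) or to the runner server
        // (by pool/request id), depending on the message type recorded when the job was dispatched.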
        internal async Task RenewJobRequestAsync(Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
        {
            if (this._isRunServiceJob)
            {
                var runServer = await GetRunServerAsync(systemConnection);
                await RenewJobRequestAsync(runServer, message.Plan.PlanId, message.JobId, firstJobRequestRenewed, token);
            }
            else
            {
                var runnerServer = HostContext.GetService<IRunnerServer>();
                await RenewJobRequestAsync(runnerServer, poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, token);
            }
        }

        private async Task RenewJobRequestAsync(IRunServer runServer, Guid planId, Guid jobId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
        {
            TaskAgentJobRequest request = null;
            int firstRenewRetryLimit = 5;
            int encounteringError = 0;

            // renew the lock while the job is running.
            // stop renewing only if the cancellation token for the lock renew task has been signaled or exceptions still happen after retries.
            while (!token.IsCancellationRequested)
            {
                try
                {
                    var renewResponse = await runServer.RenewJobAsync(planId, jobId, token);
                    Trace.Info($"Successfully renewed job {jobId}, job is valid till {renewResponse.LockedUntil}");

                    if (!firstJobRequestRenewed.Task.IsCompleted)
                    {
                        // fire the first-renew-succeeded event.
                        firstJobRequestRenewed.TrySetResult(0);
                    }

                    if (encounteringError > 0)
                    {
                        encounteringError = 0;
                        HostContext.WritePerfCounter("JobRenewRecovered");
                    }

                    // renew again after a 60 sec delay
                    await HostContext.Delay(TimeSpan.FromSeconds(60), token);
                }
                catch (TaskOrchestrationJobNotFoundException)
                {
                    // no need for retry. the job is not valid anymore.
                    Trace.Info($"TaskAgentJobNotFoundException received when renewing job {jobId}, job is no longer valid, stop renewing job request.");
                    return;
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // OperationCanceledException may be caused by an http timeout or _lockRenewalTokenSource.Cancel();
                    // Stop renewing only when the cancellation token has fired.
                    Trace.Info($"job renew has been cancelled, stop renewing job {jobId}.");
                    return;
                }
                catch (Exception ex)
                {
                    Trace.Error($"Caught exception while renewing runner job {jobId}.");
                    Trace.Error(ex);
                    encounteringError++;

                    // retry
                    TimeSpan remainingTime = TimeSpan.Zero;
                    if (!firstJobRequestRenewed.Task.IsCompleted)
                    {
                        // retry 5 times every 10 sec for the first renew
                        if (firstRenewRetryLimit-- > 0)
                        {
                            remainingTime = TimeSpan.FromSeconds(10);
                        }
                    }
                    else
                    {
                        // retry until we reach lockedUntil + 5 min extra buffer.
                        remainingTime = request.LockedUntil.Value + TimeSpan.FromMinutes(5) - DateTime.UtcNow;
                    }

                    if (remainingTime > TimeSpan.Zero)
                    {
                        TimeSpan delayTime;
                        if (!firstJobRequestRenewed.Task.IsCompleted)
                        {
                            Trace.Info($"Retrying lock renewal for job {jobId}. The first job renew request has failed.");
                            delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
                        }
                        else
                        {
                            Trace.Info($"Retrying lock renewal for job {jobId}. Job is valid until {request.LockedUntil.Value}.");
                            if (encounteringError > 5)
                            {
                                delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30));
                            }
                            else
                            {
                                delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
                            }
                        }

                        try
                        {
                            // back off before the next retry.
                            await HostContext.Delay(delayTime, token);
                        }
                        catch (OperationCanceledException) when (token.IsCancellationRequested)
                        {
                            Trace.Info($"job renew has been cancelled, stop renewing job {jobId}.");
                        }
                    }
                    else
                    {
                        Trace.Info($"Lock renewal has run out of retries, stop renewing the lock for job {jobId}.");
                        HostContext.WritePerfCounter("JobRenewReachLimit");
                        return;
                    }
                }
            }
        }

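        // Runner-server variant of the renew loop above: same retry and backoff strategy, but it also
        // refreshes the server connection between retries to avoid server affinity, and it updates the
        // saved runner name after the first successful renew.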
        private async Task RenewJobRequestAsync(IRunnerServer runnerServer, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
        {
            TaskAgentJobRequest request = null;
            int firstRenewRetryLimit = 5;
            int encounteringError = 0;

            // renew the lock while the job is running.
            // stop renewing only if the cancellation token for the lock renew task has been signaled or exceptions still happen after retries.
            while (!token.IsCancellationRequested)
            {
                try
                {
                    request = await runnerServer.RenewAgentRequestAsync(poolId, requestId, lockToken, orchestrationId, token);
                    Trace.Info($"Successfully renewed job request {requestId}, job is valid till {request.LockedUntil.Value}");

                    if (!firstJobRequestRenewed.Task.IsCompleted)
                    {
                        // fire the first-renew-succeeded event.
                        firstJobRequestRenewed.TrySetResult(0);

                        // Update settings if the runner name has been changed server-side
                        UpdateAgentNameIfNeeded(request.ReservedAgent?.Name);
                    }

                    if (encounteringError > 0)
                    {
                        encounteringError = 0;
                        runnerServer.SetConnectionTimeout(RunnerConnectionType.JobRequest, TimeSpan.FromSeconds(60));
                        HostContext.WritePerfCounter("JobRenewRecovered");
                    }

                    // renew again after a 60 sec delay
                    await HostContext.Delay(TimeSpan.FromSeconds(60), token);
                }
                catch (TaskAgentJobNotFoundException)
                {
                    // no need for retry. the job is not valid anymore.
                    Trace.Info($"TaskAgentJobNotFoundException received when renewing job request {requestId}, job is no longer valid, stop renewing job request.");
                    return;
                }
                catch (TaskAgentJobTokenExpiredException)
                {
                    // no need for retry. the job is not valid anymore.
                    Trace.Info($"TaskAgentJobTokenExpiredException received when renewing job request {requestId}, job is no longer valid, stop renewing job request.");
                    return;
                }
                catch (OperationCanceledException) when (token.IsCancellationRequested)
                {
                    // OperationCanceledException may be caused by an http timeout or _lockRenewalTokenSource.Cancel();
                    // Stop renewing only when the cancellation token has fired.
                    Trace.Info($"job renew has been cancelled, stop renewing job request {requestId}.");
                    return;
                }
                catch (Exception ex)
                {
                    Trace.Error($"Caught exception while renewing runner jobrequest {requestId}.");
                    Trace.Error(ex);
                    encounteringError++;

                    // retry
                    TimeSpan remainingTime = TimeSpan.Zero;
                    if (!firstJobRequestRenewed.Task.IsCompleted)
                    {
                        // retry 5 times every 10 sec for the first renew
                        if (firstRenewRetryLimit-- > 0)
                        {
                            remainingTime = TimeSpan.FromSeconds(10);
                        }
                    }
                    else
                    {
                        // retry until we reach lockedUntil + 5 min extra buffer.
                        remainingTime = request.LockedUntil.Value + TimeSpan.FromMinutes(5) - DateTime.UtcNow;
                    }

                    if (remainingTime > TimeSpan.Zero)
                    {
                        TimeSpan delayTime;
                        if (!firstJobRequestRenewed.Task.IsCompleted)
                        {
                            Trace.Info($"Retrying lock renewal for jobrequest {requestId}. The first job renew request has failed.");
                            delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
                        }
                        else
                        {
                            Trace.Info($"Retrying lock renewal for jobrequest {requestId}. Job is valid until {request.LockedUntil.Value}.");
                            if (encounteringError > 5)
                            {
                                delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30));
                            }
                            else
                            {
                                delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
                            }
                        }

                        // Re-establish the connection to the server in order to avoid affinity with the server.
                        // Reduce the connection timeout to 30 seconds (from 60s).
                        HostContext.WritePerfCounter("ResetJobRenewConnection");
                        await runnerServer.RefreshConnectionAsync(RunnerConnectionType.JobRequest, TimeSpan.FromSeconds(30));

                        try
                        {
                            // back off before the next retry.
                            await HostContext.Delay(delayTime, token);
                        }
                        catch (OperationCanceledException) when (token.IsCancellationRequested)
                        {
                            Trace.Info($"job renew has been cancelled, stop renewing job request {requestId}.");
                        }
                    }
                    else
                    {
                        Trace.Info($"Lock renewal has run out of retries, stop renewing the lock for jobrequest {requestId}.");
                        HostContext.WritePerfCounter("JobRenewReachLimit");
                        return;
                    }
                }
            }
        }

        private void UpdateAgentNameIfNeeded(string agentName)
        {
            var isNewAgentName = !string.Equals(_runnerSettings.AgentName, agentName, StringComparison.Ordinal);
            if (!isNewAgentName || string.IsNullOrEmpty(agentName))
            {
                return;
            }

            _runnerSettings.AgentName = agentName;
            try
            {
                _configurationStore.SaveSettings(_runnerSettings);
            }
            catch (Exception ex)
            {
                Trace.Error("Cannot update the settings file:");
                Trace.Error(ex);
            }
        }

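        // Unfinished paging logs are left on disk by the worker as '<timelineId>_<recordId>_<pageNumber>' files;
        // files whose timeline id doesn't match the current job's timeline are skipped.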
        // Best effort upload of any logs for this job.
        private async Task TryUploadUnfinishedLogs(Pipelines.AgentJobRequestMessage message)
        {
            Trace.Entering();

            var logFolder = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Diag), PagingLogger.PagingFolder);
            if (!Directory.Exists(logFolder))
            {
                return;
            }

            var logs = Directory.GetFiles(logFolder);
            if (logs.Length == 0)
            {
                return;
            }

            try
            {
                var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
                ArgUtil.NotNull(systemConnection, nameof(systemConnection));

                var server = await InitializeJobServerAsync(systemConnection);

                if (server is IJobServer jobServer)
                {
                    var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);

                    var updatedRecords = new List<TimelineRecord>();
                    var logPages = new Dictionary<Guid, Dictionary<int, string>>();
                    var logRecords = new Dictionary<Guid, TimelineRecord>();
                    foreach (var log in logs)
                    {
                        var logName = Path.GetFileNameWithoutExtension(log);
                        var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries);
                        if (logNameParts.Length != 3)
                        {
                            Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_GUID_INT'.");
                            continue;
                        }
                        var logPageSeperator = logName.IndexOf('_');
                        var logRecordId = Guid.Empty;
                        var pageNumber = 0;

                        if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id)
                        {
                            Trace.Warning($"log file '{log}' does not belong to the current job");
                            continue;
                        }

                        if (!Guid.TryParse(logNameParts[1], out logRecordId))
                        {
                            Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_GUID_INT'.");
                            continue;
                        }

                        if (!int.TryParse(logNameParts[2], out pageNumber))
                        {
                            Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_GUID_INT'.");
                            continue;
                        }

                        var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
                        if (record != null)
                        {
                            if (!logPages.ContainsKey(record.Id))
                            {
                                logPages[record.Id] = new Dictionary<int, string>();
                                logRecords[record.Id] = record;
                            }

                            logPages[record.Id][pageNumber] = log;
                        }
                    }

                    foreach (var pages in logPages)
                    {
                        var record = logRecords[pages.Key];
                        if (record.Log == null)
                        {
                            // Create the log
                            record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));

                            // Need to post timeline record updates to reflect the log creation
                            updatedRecords.Add(record.Clone());
                        }

                        for (var i = 1; i <= pages.Value.Count; i++)
                        {
                            var logFile = pages.Value[i];
                            // Upload the contents
                            using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
                            {
                                var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
                            }

                            Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
                            IOUtil.DeleteFile(logFile);
                        }
                    }

                    if (updatedRecords.Count > 0)
                    {
                        await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
                    }
                }
                else
                {
                    Trace.Info("Job server does not support log upload yet.");
                }
            }
            catch (Exception ex)
            {
                // Ignore any error during log upload since it's best effort
                Trace.Error(ex);
            }
        }

        private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, Guid lockToken, TaskResult result, string detailInfo = null)
        {
            Trace.Entering();

            if (PlanUtil.GetFeatures(message.Plan).HasFlag(PlanFeatures.JobCompletedPlanEvent))
            {
                Trace.Verbose($"Skip FinishAgentRequest call from Listener because Plan version is {message.Plan.Version}");
                return;
            }

            if (this._isRunServiceJob)
            {
                Trace.Verbose($"Skip CompleteJobRequestAsync call from Listener because it's a RunService job");
                return;
            }

            var runnerServer = HostContext.GetService<IRunnerServer>();
            int completeJobRequestRetryLimit = 5;
            List<Exception> exceptions = new();
            while (completeJobRequestRetryLimit-- > 0)
            {
                try
                {
                    await runnerServer.FinishAgentRequestAsync(poolId, message.RequestId, lockToken, DateTime.UtcNow, result, CancellationToken.None);
                    return;
                }
                catch (TaskAgentJobNotFoundException)
                {
                    Trace.Info($"TaskAgentJobNotFoundException received, job {message.JobId} is no longer valid.");
                    return;
                }
                catch (TaskAgentJobTokenExpiredException)
                {
                    Trace.Info($"TaskAgentJobTokenExpiredException received, job {message.JobId} is no longer valid.");
                    return;
                }
                catch (Exception ex)
                {
                    Trace.Error($"Caught exception while completing runner jobrequest {message.RequestId}.");
                    Trace.Error(ex);
                    exceptions.Add(ex);
                }

                // delay 5 seconds before the next retry.
                await Task.Delay(TimeSpan.FromSeconds(5));
            }

            // rethrow all caught exceptions during retry.
            throw new AggregateException(exceptions);
        }

        // log an error issue to the job-level timeline record
        private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, Issue issue)
        {
            try
            {
                var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
                ArgUtil.NotNull(timeline, nameof(timeline));

                TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
                ArgUtil.NotNull(jobRecord, nameof(jobRecord));

                jobRecord.ErrorCount++;
                jobRecord.Issues.Add(issue);

                Trace.Info("Mark the job as failed since the worker crashed");
                jobRecord.Result = TaskResult.Failed;
                // mark the job as completed so the service will pick up the result
                jobRecord.State = TimelineRecordState.Completed;

                await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
            }
            catch (Exception ex)
            {
                Trace.Error("Failed to report unhandled exception from Runner.Worker process");
                Trace.Error(ex);
            }
        }

        // raise the job completed event to fail the job.
        private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message)
        {
            try
            {
                var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
                await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
            }
            catch (Exception ex)
            {
                Trace.Error("Failed to raise JobCompletedEvent back to service.");
                Trace.Error(ex);
            }
        }

        private async Task ForceFailJob(IRunServer runServer, Pipelines.AgentJobRequestMessage message, Issue issue)
        {
            try
            {
                var annotation = issue.ToAnnotation();
                var jobAnnotations = new List<Annotation>();
                if (annotation.HasValue)
                {
                    jobAnnotations.Add(annotation.Value);
                }

                await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None);
            }
            catch (Exception ex)
            {
                Trace.Error("Failed to raise job completion back to service.");
                Trace.Error(ex);
            }
        }

        private async Task<IRunnerService> InitializeJobServerAsync(ServiceEndpoint systemConnection)
        {
            if (this._isRunServiceJob)
            {
                return await GetRunServerAsync(systemConnection);
            }
            else
            {
                var jobServer = HostContext.GetService<IJobServer>();
                VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
                VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
                await jobServer.ConnectAsync(jobConnection);
                return jobServer;
            }
        }

        private async Task<IRunServer> GetRunServerAsync(ServiceEndpoint systemConnection)
        {
            var runServer = HostContext.GetService<IRunServer>();
            VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
            await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
            return runServer;
        }

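        // Tracks a single dispatched job. WorkerCancellationTokenSource requests graceful cancellation of
        // the worker, while WorkerCancelTimeoutKillTokenSource fires later (cancel timeout minus 15 seconds,
        // with a 60 second floor) to force-kill the worker if it doesn't exit in time.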
        private class WorkerDispatcher : IDisposable
        {
            public long RequestId { get; }
            public Guid JobId { get; }
            public Task WorkerDispatch { get; set; }
            public CancellationTokenSource WorkerCancellationTokenSource { get; private set; }
            public CancellationTokenSource WorkerCancelTimeoutKillTokenSource { get; private set; }
            private readonly object _lock = new();

            public WorkerDispatcher(Guid jobId, long requestId)
            {
                JobId = jobId;
                RequestId = requestId;
                WorkerCancelTimeoutKillTokenSource = new CancellationTokenSource();
                WorkerCancellationTokenSource = new CancellationTokenSource();
            }

            public bool Cancel(TimeSpan timeout)
            {
                if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
                {
                    lock (_lock)
                    {
                        if (WorkerCancellationTokenSource != null && WorkerCancelTimeoutKillTokenSource != null)
                        {
                            WorkerCancellationTokenSource.Cancel();

                            // make sure we have at least 60 seconds for cancellation.
                            if (timeout.TotalSeconds < 60)
                            {
                                timeout = TimeSpan.FromSeconds(60);
                            }

                            WorkerCancelTimeoutKillTokenSource.CancelAfter(timeout.Subtract(TimeSpan.FromSeconds(15)));
                            return true;
                        }
                    }
                }

                return false;
            }

            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }

            private void Dispose(bool disposing)
            {
                if (disposing)
                {
                    if (WorkerCancellationTokenSource != null || WorkerCancelTimeoutKillTokenSource != null)
                    {
                        lock (_lock)
                        {
                            if (WorkerCancellationTokenSource != null)
                            {
                                WorkerCancellationTokenSource.Dispose();
                                WorkerCancellationTokenSource = null;
                            }

                            if (WorkerCancelTimeoutKillTokenSource != null)
                            {
                                WorkerCancelTimeoutKillTokenSource.Dispose();
                                WorkerCancelTimeoutKillTokenSource = null;
                            }
                        }
                    }
                }
            }
        }
    }
}