mirror of
https://github.com/actions/runner.git
synced 2025-12-11 04:46:58 +00:00
call run service renewjob (#2461)
* call run service renewjob * format * formatting * make it private and expose internals * lint * fix exception class * lint * fix test as well
This commit is contained in:
committed by
GitHub
parent
0befa62f64
commit
e8975514fd
@@ -7,6 +7,7 @@ using System.Text;
|
||||
using System.Text.RegularExpressions;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.DistributedTask.Pipelines;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using GitHub.Runner.Common;
|
||||
using GitHub.Runner.Common.Util;
|
||||
@@ -58,6 +59,8 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
public event EventHandler<JobStatusEventArgs> JobStatus;
|
||||
|
||||
private bool _isRunServiceJob;
|
||||
|
||||
public override void Initialize(IHostContext hostContext)
|
||||
{
|
||||
base.Initialize(hostContext);
|
||||
@@ -86,6 +89,8 @@ namespace GitHub.Runner.Listener
|
||||
{
|
||||
Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received.");
|
||||
|
||||
_isRunServiceJob = MessageUtil.IsRunServiceJob(jobRequestMessage.MessageType);
|
||||
|
||||
WorkerDispatcher currentDispatch = null;
|
||||
if (_jobDispatchedQueue.Count > 0)
|
||||
{
|
||||
@@ -239,6 +244,13 @@ namespace GitHub.Runner.Listener
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._isRunServiceJob)
|
||||
{
|
||||
Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
|
||||
jobDispatch.WorkerCancellationTokenSource.Cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
// based on the current design, server will only send one job for a given runner at a time.
|
||||
// if the runner received a new job request while a previous job request is still running, this typically indicates two situations
|
||||
// 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest
|
||||
@@ -367,9 +379,11 @@ namespace GitHub.Runner.Listener
|
||||
long requestId = message.RequestId;
|
||||
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat
|
||||
|
||||
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
// start renew job request
|
||||
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
|
||||
Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);
|
||||
Task renewJobRequest = RenewJobRequestAsync(message, systemConnection, _poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);
|
||||
|
||||
// wait till first renew succeed or job request is cancelled
|
||||
// not even start worker if the first renew fail
|
||||
@@ -426,7 +440,7 @@ namespace GitHub.Runner.Listener
|
||||
{
|
||||
workerOutput.Add(stdout.Data);
|
||||
}
|
||||
|
||||
|
||||
if (printToStdout)
|
||||
{
|
||||
term.WriteLine(stdout.Data, skipTracing: true);
|
||||
@@ -508,7 +522,6 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
// we get first jobrequest renew succeed and start the worker process with the job message.
|
||||
// send notification to machine provisioner.
|
||||
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||
var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
|
||||
notification.JobStarted(message.JobId, accessToken, systemConnection.Url);
|
||||
|
||||
@@ -531,11 +544,8 @@ namespace GitHub.Runner.Listener
|
||||
detailInfo = string.Join(Environment.NewLine, workerOutput);
|
||||
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
|
||||
|
||||
var jobServer = HostContext.GetService<IJobServer>();
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
|
||||
await jobServer.ConnectAsync(jobConnection);
|
||||
|
||||
var jobServer = await InitializeJobServerAsync(systemConnection);
|
||||
await LogWorkerProcessUnhandledException(jobServer, message, detailInfo);
|
||||
|
||||
// Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space.
|
||||
@@ -675,9 +685,128 @@ namespace GitHub.Runner.Listener
|
||||
}
|
||||
}
|
||||
|
||||
public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
|
||||
internal async Task RenewJobRequestAsync(Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
|
||||
{
|
||||
if (this._isRunServiceJob)
|
||||
{
|
||||
var runServer = await GetRunServerAsync(systemConnection);
|
||||
await RenewJobRequestAsync(runServer, message.Plan.PlanId, message.JobId, firstJobRequestRenewed, token);
|
||||
}
|
||||
else
|
||||
{
|
||||
var runnerServer = HostContext.GetService<IRunnerServer>();
|
||||
await RenewJobRequestAsync(runnerServer, poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, token);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RenewJobRequestAsync(IRunServer runServer, Guid planId, Guid jobId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
|
||||
{
|
||||
TaskAgentJobRequest request = null;
|
||||
int firstRenewRetryLimit = 5;
|
||||
int encounteringError = 0;
|
||||
|
||||
// renew lock during job running.
|
||||
// stop renew only if cancellation token for lock renew task been signal or exception still happen after retry.
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
var renewResponse = await runServer.RenewJobAsync(planId, jobId, token);
|
||||
Trace.Info($"Successfully renew job {jobId}, job is valid till {renewResponse.LockedUntil}");
|
||||
|
||||
if (!firstJobRequestRenewed.Task.IsCompleted)
|
||||
{
|
||||
// fire first renew succeed event.
|
||||
firstJobRequestRenewed.TrySetResult(0);
|
||||
}
|
||||
|
||||
if (encounteringError > 0)
|
||||
{
|
||||
encounteringError = 0;
|
||||
HostContext.WritePerfCounter("JobRenewRecovered");
|
||||
}
|
||||
|
||||
// renew again after 60 sec delay
|
||||
await HostContext.Delay(TimeSpan.FromSeconds(60), token);
|
||||
}
|
||||
catch (TaskOrchestrationJobNotFoundException)
|
||||
{
|
||||
// no need for retry. the job is not valid anymore.
|
||||
Trace.Info($"TaskAgentJobNotFoundException received when renew job {jobId}, job is no longer valid, stop renew job request.");
|
||||
return;
|
||||
}
|
||||
catch (OperationCanceledException) when (token.IsCancellationRequested)
|
||||
{
|
||||
// OperationCanceledException may caused by http timeout or _lockRenewalTokenSource.Cance();
|
||||
// Stop renew only on cancellation token fired.
|
||||
Trace.Info($"job renew has been cancelled, stop renew job {jobId}.");
|
||||
return;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error($"Catch exception during renew runner job {jobId}.");
|
||||
Trace.Error(ex);
|
||||
encounteringError++;
|
||||
|
||||
// retry
|
||||
TimeSpan remainingTime = TimeSpan.Zero;
|
||||
if (!firstJobRequestRenewed.Task.IsCompleted)
|
||||
{
|
||||
// retry 5 times every 10 sec for the first renew
|
||||
if (firstRenewRetryLimit-- > 0)
|
||||
{
|
||||
remainingTime = TimeSpan.FromSeconds(10);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// retry till reach lockeduntil + 5 mins extra buffer.
|
||||
remainingTime = request.LockedUntil.Value + TimeSpan.FromMinutes(5) - DateTime.UtcNow;
|
||||
}
|
||||
|
||||
if (remainingTime > TimeSpan.Zero)
|
||||
{
|
||||
TimeSpan delayTime;
|
||||
if (!firstJobRequestRenewed.Task.IsCompleted)
|
||||
{
|
||||
Trace.Info($"Retrying lock renewal for job {jobId}. The first job renew request has failed.");
|
||||
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace.Info($"Retrying lock renewal for job {jobId}. Job is valid until {request.LockedUntil.Value}.");
|
||||
if (encounteringError > 5)
|
||||
{
|
||||
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30));
|
||||
}
|
||||
else
|
||||
{
|
||||
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// back-off before next retry.
|
||||
await HostContext.Delay(delayTime, token);
|
||||
}
|
||||
catch (OperationCanceledException) when (token.IsCancellationRequested)
|
||||
{
|
||||
Trace.Info($"job renew has been cancelled, stop renew job {jobId}.");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
Trace.Info($"Lock renewal has run out of retry, stop renew lock for job {jobId}.");
|
||||
HostContext.WritePerfCounter("JobRenewReachLimit");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task RenewJobRequestAsync(IRunnerServer runnerServer, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
|
||||
{
|
||||
var runnerServer = HostContext.GetService<IRunnerServer>();
|
||||
TaskAgentJobRequest request = null;
|
||||
int firstRenewRetryLimit = 5;
|
||||
int encounteringError = 0;
|
||||
@@ -840,90 +969,93 @@ namespace GitHub.Runner.Listener
|
||||
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
|
||||
ArgUtil.NotNull(systemConnection, nameof(systemConnection));
|
||||
|
||||
var jobServer = HostContext.GetService<IJobServer>();
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
|
||||
var server = await InitializeJobServerAsync(systemConnection);
|
||||
|
||||
await jobServer.ConnectAsync(jobConnection);
|
||||
|
||||
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
|
||||
|
||||
var updatedRecords = new List<TimelineRecord>();
|
||||
var logPages = new Dictionary<Guid, Dictionary<int, string>>();
|
||||
var logRecords = new Dictionary<Guid, TimelineRecord>();
|
||||
foreach (var log in logs)
|
||||
if (server is IJobServer jobServer)
|
||||
{
|
||||
var logName = Path.GetFileNameWithoutExtension(log);
|
||||
var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (logNameParts.Length != 3)
|
||||
{
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
var logPageSeperator = logName.IndexOf('_');
|
||||
var logRecordId = Guid.Empty;
|
||||
var pageNumber = 0;
|
||||
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
|
||||
|
||||
if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id)
|
||||
var updatedRecords = new List<TimelineRecord>();
|
||||
var logPages = new Dictionary<Guid, Dictionary<int, string>>();
|
||||
var logRecords = new Dictionary<Guid, TimelineRecord>();
|
||||
foreach (var log in logs)
|
||||
{
|
||||
Trace.Warning($"log file '{log}' is not belongs to current job");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!Guid.TryParse(logNameParts[1], out logRecordId))
|
||||
{
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!int.TryParse(logNameParts[2], out pageNumber))
|
||||
{
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
|
||||
var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
|
||||
if (record != null)
|
||||
{
|
||||
if (!logPages.ContainsKey(record.Id))
|
||||
var logName = Path.GetFileNameWithoutExtension(log);
|
||||
var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (logNameParts.Length != 3)
|
||||
{
|
||||
logPages[record.Id] = new Dictionary<int, string>();
|
||||
logRecords[record.Id] = record;
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
var logPageSeperator = logName.IndexOf('_');
|
||||
var logRecordId = Guid.Empty;
|
||||
var pageNumber = 0;
|
||||
|
||||
if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id)
|
||||
{
|
||||
Trace.Warning($"log file '{log}' is not belongs to current job");
|
||||
continue;
|
||||
}
|
||||
|
||||
logPages[record.Id][pageNumber] = log;
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var pages in logPages)
|
||||
{
|
||||
var record = logRecords[pages.Key];
|
||||
if (record.Log == null)
|
||||
{
|
||||
// Create the log
|
||||
record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));
|
||||
|
||||
// Need to post timeline record updates to reflect the log creation
|
||||
updatedRecords.Add(record.Clone());
|
||||
}
|
||||
|
||||
for (var i = 1; i <= pages.Value.Count; i++)
|
||||
{
|
||||
var logFile = pages.Value[i];
|
||||
// Upload the contents
|
||||
using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
if (!Guid.TryParse(logNameParts[1], out logRecordId))
|
||||
{
|
||||
var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
|
||||
Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
|
||||
IOUtil.DeleteFile(logFile);
|
||||
if (!int.TryParse(logNameParts[2], out pageNumber))
|
||||
{
|
||||
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
|
||||
continue;
|
||||
}
|
||||
|
||||
var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
|
||||
if (record != null)
|
||||
{
|
||||
if (!logPages.ContainsKey(record.Id))
|
||||
{
|
||||
logPages[record.Id] = new Dictionary<int, string>();
|
||||
logRecords[record.Id] = record;
|
||||
}
|
||||
|
||||
logPages[record.Id][pageNumber] = log;
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var pages in logPages)
|
||||
{
|
||||
var record = logRecords[pages.Key];
|
||||
if (record.Log == null)
|
||||
{
|
||||
// Create the log
|
||||
record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));
|
||||
|
||||
// Need to post timeline record updates to reflect the log creation
|
||||
updatedRecords.Add(record.Clone());
|
||||
}
|
||||
|
||||
for (var i = 1; i <= pages.Value.Count; i++)
|
||||
{
|
||||
var logFile = pages.Value[i];
|
||||
// Upload the contents
|
||||
using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
|
||||
{
|
||||
var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
|
||||
}
|
||||
|
||||
Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
|
||||
IOUtil.DeleteFile(logFile);
|
||||
}
|
||||
}
|
||||
|
||||
if (updatedRecords.Count > 0)
|
||||
{
|
||||
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
|
||||
}
|
||||
}
|
||||
|
||||
if (updatedRecords.Count > 0)
|
||||
else
|
||||
{
|
||||
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
|
||||
Trace.Info("Job server does not support log upload yet.");
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -943,6 +1075,12 @@ namespace GitHub.Runner.Listener
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._isRunServiceJob)
|
||||
{
|
||||
Trace.Verbose($"Skip FinishAgentRequest call from Listener because MessageType is {message.MessageType}");
|
||||
return;
|
||||
}
|
||||
|
||||
var runnerServer = HostContext.GetService<IRunnerServer>();
|
||||
int completeJobRequestRetryLimit = 5;
|
||||
List<Exception> exceptions = new();
|
||||
@@ -979,66 +1117,117 @@ namespace GitHub.Runner.Listener
|
||||
}
|
||||
|
||||
// log an error issue to job level timeline record
|
||||
private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, string errorMessage)
|
||||
private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string errorMessage)
|
||||
{
|
||||
try
|
||||
if (server is IJobServer jobServer)
|
||||
{
|
||||
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
|
||||
ArgUtil.NotNull(timeline, nameof(timeline));
|
||||
|
||||
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
|
||||
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
|
||||
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrEmpty(errorMessage) &&
|
||||
message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) &&
|
||||
StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value))
|
||||
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
|
||||
ArgUtil.NotNull(timeline, nameof(timeline));
|
||||
|
||||
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
|
||||
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
|
||||
|
||||
try
|
||||
{
|
||||
// the trace should be best effort and not affect any job result
|
||||
var match = _invalidJsonRegex.Match(errorMessage);
|
||||
if (match.Success &&
|
||||
match.Groups.Count == 2)
|
||||
if (!string.IsNullOrEmpty(errorMessage) &&
|
||||
message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) &&
|
||||
StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value))
|
||||
{
|
||||
var jsonPosition = int.Parse(match.Groups[1].Value);
|
||||
var serializedJobMessage = JsonUtility.ToString(message);
|
||||
var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20);
|
||||
errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}";
|
||||
// the trace should be best effort and not affect any job result
|
||||
var match = _invalidJsonRegex.Match(errorMessage);
|
||||
if (match.Success &&
|
||||
match.Groups.Count == 2)
|
||||
{
|
||||
var jsonPosition = int.Parse(match.Groups[1].Value);
|
||||
var serializedJobMessage = JsonUtility.ToString(message);
|
||||
var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20);
|
||||
errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}";
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error(ex);
|
||||
errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}";
|
||||
}
|
||||
|
||||
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage };
|
||||
unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
|
||||
jobRecord.ErrorCount++;
|
||||
jobRecord.Issues.Add(unhandledExceptionIssue);
|
||||
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error("Fail to report unhandled exception from Runner.Worker process");
|
||||
Trace.Error(ex);
|
||||
errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}";
|
||||
}
|
||||
|
||||
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage };
|
||||
unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
|
||||
jobRecord.ErrorCount++;
|
||||
jobRecord.Issues.Add(unhandledExceptionIssue);
|
||||
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
|
||||
}
|
||||
catch (Exception ex)
|
||||
else
|
||||
{
|
||||
Trace.Error("Fail to report unhandled exception from Runner.Worker process");
|
||||
Trace.Error(ex);
|
||||
Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", errorMessage);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// raise job completed event to fail the job.
|
||||
private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message)
|
||||
private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message)
|
||||
{
|
||||
try
|
||||
if (server is IJobServer jobServer)
|
||||
{
|
||||
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
|
||||
await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
|
||||
try
|
||||
{
|
||||
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
|
||||
await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error("Fail to raise JobCompletedEvent back to service.");
|
||||
Trace.Error(ex);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
else if (server is IRunServer runServer)
|
||||
{
|
||||
Trace.Error("Fail to raise JobCompletedEvent back to service.");
|
||||
Trace.Error(ex);
|
||||
try
|
||||
{
|
||||
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, CancellationToken.None);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Error("Fail to raise job completion back to service.");
|
||||
Trace.Error(ex);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new NotSupportedException($"Server type {server.GetType().FullName} is not supported.");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IRunnerService> InitializeJobServerAsync(ServiceEndpoint systemConnection)
|
||||
{
|
||||
if (this._isRunServiceJob)
|
||||
{
|
||||
return await GetRunServerAsync(systemConnection);
|
||||
}
|
||||
else
|
||||
{
|
||||
var jobServer = HostContext.GetService<IJobServer>();
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
|
||||
await jobServer.ConnectAsync(jobConnection);
|
||||
return jobServer;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IRunServer> GetRunServerAsync(ServiceEndpoint systemConnection)
|
||||
{
|
||||
var runServer = HostContext.GetService<IRunServer>();
|
||||
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
|
||||
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
|
||||
return runServer;
|
||||
}
|
||||
|
||||
private class WorkerDispatcher : IDisposable
|
||||
|
||||
Reference in New Issue
Block a user