diff --git a/src/Runner.Common/RunServer.cs b/src/Runner.Common/RunServer.cs index c1e25d8b8..32c9f063f 100644 --- a/src/Runner.Common/RunServer.cs +++ b/src/Runner.Common/RunServer.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -7,6 +7,7 @@ using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; using GitHub.Services.Common; +using Sdk.RSWebApi.Contracts; using Sdk.WebApi.WebApi.RawClient; namespace GitHub.Runner.Common @@ -19,6 +20,8 @@ namespace GitHub.Runner.Common Task GetJobMessageAsync(string id, CancellationToken token); Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary outputs, IList stepResults, CancellationToken token); + + Task RenewJobAsync(Guid planId, Guid jobId, CancellationToken token); } public sealed class RunServer : RunnerService, IRunServer @@ -64,5 +67,18 @@ namespace GitHub.Runner.Common return RetryRequest( async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, cancellationToken), cancellationToken); } + + public Task RenewJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken) + { + CheckConnection(); + var renewJobResponse = RetryRequest( + async () => await _runServiceHttpClient.RenewJobAsync(requestUri, planId, jobId, cancellationToken), cancellationToken); + if (renewJobResponse == null) + { + throw new TaskOrchestrationJobNotFoundException(jobId.ToString()); + } + + return renewJobResponse; + } } } diff --git a/src/Runner.Common/Util/MessageUtil.cs b/src/Runner.Common/Util/MessageUtil.cs new file mode 100644 index 000000000..4489b447c --- /dev/null +++ b/src/Runner.Common/Util/MessageUtil.cs @@ -0,0 +1,14 @@ +namespace GitHub.Runner.Common.Util +{ + using System; + using GitHub.DistributedTask.WebApi; + + public static class MessageUtil + { + public static bool IsRunServiceJob(string messageType) + 
{ + return string.Equals(messageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase); + } + } +} + diff --git a/src/Runner.Listener/InternalsVisibleTo.cs b/src/Runner.Listener/InternalsVisibleTo.cs new file mode 100644 index 000000000..0012a2949 --- /dev/null +++ b/src/Runner.Listener/InternalsVisibleTo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Test")] diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index aaa83b80c..53380b9b2 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -7,6 +7,7 @@ using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Common; using GitHub.Runner.Common.Util; @@ -58,6 +59,8 @@ namespace GitHub.Runner.Listener public event EventHandler JobStatus; + private bool _isRunServiceJob; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -86,6 +89,8 @@ namespace GitHub.Runner.Listener { Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received."); + _isRunServiceJob = MessageUtil.IsRunServiceJob(jobRequestMessage.MessageType); + WorkerDispatcher currentDispatch = null; if (_jobDispatchedQueue.Count > 0) { @@ -239,6 +244,13 @@ namespace GitHub.Runner.Listener return; } + if (this._isRunServiceJob) + { + Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); + jobDispatch.WorkerCancellationTokenSource.Cancel(); + return; + } + // based on the current design, server will only send one job for a given runner at a time. 
// if the runner received a new job request while a previous job request is still running, this typically indicates two situations // 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest @@ -367,9 +379,11 @@ namespace GitHub.Runner.Listener long requestId = message.RequestId; Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat + var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + // start renew job request Trace.Info($"Start renew job request {requestId} for job {message.JobId}."); - Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token); + Task renewJobRequest = RenewJobRequestAsync(message, systemConnection, _poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token); // wait till first renew succeed or job request is cancelled // not even start worker if the first renew fail @@ -426,7 +440,7 @@ namespace GitHub.Runner.Listener { workerOutput.Add(stdout.Data); } - + if (printToStdout) { term.WriteLine(stdout.Data, skipTracing: true); @@ -508,7 +522,6 @@ namespace GitHub.Runner.Listener // we get first jobrequest renew succeed and start the worker process with the job message. // send notification to machine provisioner. 
- var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"]; notification.JobStarted(message.JobId, accessToken, systemConnection.Url); @@ -531,11 +544,8 @@ namespace GitHub.Runner.Listener detailInfo = string.Join(Environment.NewLine, workerOutput); Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); - var jobServer = HostContext.GetService(); - VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); - VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential); - await jobServer.ConnectAsync(jobConnection); + var jobServer = await InitializeJobServerAsync(systemConnection); await LogWorkerProcessUnhandledException(jobServer, message, detailInfo); // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space. 
@@ -675,9 +685,133 @@ namespace GitHub.Runner.Listener } } - public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource firstJobRequestRenewed, CancellationToken token) + internal async Task RenewJobRequestAsync(Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource firstJobRequestRenewed, CancellationToken token) + { + if (this._isRunServiceJob) + { + var runServer = await GetRunServerAsync(systemConnection); + await RenewJobRequestAsync(runServer, message.Plan.PlanId, message.JobId, firstJobRequestRenewed, token); + } + else + { + var runnerServer = HostContext.GetService(); + await RenewJobRequestAsync(runnerServer, poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, token); + } + } + + /// <summary> + /// Renews the run-service job lock in a loop until <paramref name="token"/> is cancelled, the job is reported as not found, or the retry budget is exhausted. + /// </summary> + private async Task RenewJobRequestAsync(IRunServer runServer, Guid planId, Guid jobId, TaskCompletionSource firstJobRequestRenewed, CancellationToken token) + { + // expiry reported by the most recent successful renewal; stays null until a renewal succeeds. + DateTime? lockedUntil = null; + int firstRenewRetryLimit = 5; + int encounteringError = 0; + + // renew lock during job running. + // stop renew only if cancellation token for lock renew task been signal or exception still happen after retry. + while (!token.IsCancellationRequested) + { + try + { + var renewResponse = await runServer.RenewJobAsync(planId, jobId, token); + lockedUntil = renewResponse.LockedUntil; + Trace.Info($"Successfully renew job {jobId}, job is valid till {renewResponse.LockedUntil}"); + + if (!firstJobRequestRenewed.Task.IsCompleted) + { + // fire first renew succeed event. + firstJobRequestRenewed.TrySetResult(0); + } + + if (encounteringError > 0) + { + encounteringError = 0; + HostContext.WritePerfCounter("JobRenewRecovered"); + } + + // renew again after 60 sec delay + await HostContext.Delay(TimeSpan.FromSeconds(60), token); + } + catch (TaskOrchestrationJobNotFoundException) + { + // no need for retry. the job is not valid anymore. 
+ Trace.Info($"TaskOrchestrationJobNotFoundException received when renew job {jobId}, job is no longer valid, stop renew job request."); + return; + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + // OperationCanceledException may caused by http timeout or _lockRenewalTokenSource.Cancel(); + // Stop renew only on cancellation token fired. + Trace.Info($"job renew has been cancelled, stop renew job {jobId}."); + return; + } + catch (Exception ex) + { + Trace.Error($"Catch exception during renew runner job {jobId}."); + Trace.Error(ex); + encounteringError++; + + // retry + TimeSpan remainingTime = TimeSpan.Zero; + if (!firstJobRequestRenewed.Task.IsCompleted) + { + // retry 5 times every 10 sec for the first renew + if (firstRenewRetryLimit-- > 0) + { + remainingTime = TimeSpan.FromSeconds(10); + } + } + else if (lockedUntil.HasValue) + { + // retry till reach lockeduntil + 5 mins extra buffer. + remainingTime = lockedUntil.Value + TimeSpan.FromMinutes(5) - DateTime.UtcNow; + } + + if (remainingTime > TimeSpan.Zero) + { + TimeSpan delayTime; + if (!firstJobRequestRenewed.Task.IsCompleted) + { + Trace.Info($"Retrying lock renewal for job {jobId}. The first job renew request has failed."); + delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10)); + } + else + { + Trace.Info($"Retrying lock renewal for job {jobId}. Job is valid until {lockedUntil}."); + if (encounteringError > 5) + { + delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30)); + } + else + { + delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15)); + } + } + + try + { + // back-off before next retry. 
+ await HostContext.Delay(delayTime, token); + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + Trace.Info($"job renew has been cancelled, stop renew job {jobId}."); + } + } + else + { + Trace.Info($"Lock renewal has run out of retry, stop renew lock for job {jobId}."); + HostContext.WritePerfCounter("JobRenewReachLimit"); + return; + } + } + } + } + + private async Task RenewJobRequestAsync(IRunnerServer runnerServer, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource firstJobRequestRenewed, CancellationToken token) { - var runnerServer = HostContext.GetService(); TaskAgentJobRequest request = null; int firstRenewRetryLimit = 5; int encounteringError = 0; @@ -840,90 +969,93 @@ namespace GitHub.Runner.Listener var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection)); ArgUtil.NotNull(systemConnection, nameof(systemConnection)); - var jobServer = HostContext.GetService(); - VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); - VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential); + var server = await InitializeJobServerAsync(systemConnection); - await jobServer.ConnectAsync(jobConnection); - - var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); - - var updatedRecords = new List(); - var logPages = new Dictionary>(); - var logRecords = new Dictionary(); - foreach (var log in logs) + if (server is IJobServer jobServer) { - var logName = Path.GetFileNameWithoutExtension(log); - var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries); - if (logNameParts.Length != 3) - { - Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); - continue; - } - var logPageSeperator = 
logName.IndexOf('_'); - var logRecordId = Guid.Empty; - var pageNumber = 0; + var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); - if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id) + var updatedRecords = new List(); + var logPages = new Dictionary>(); + var logRecords = new Dictionary(); + foreach (var log in logs) { - Trace.Warning($"log file '{log}' is not belongs to current job"); - continue; - } - - if (!Guid.TryParse(logNameParts[1], out logRecordId)) - { - Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); - continue; - } - - if (!int.TryParse(logNameParts[2], out pageNumber)) - { - Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); - continue; - } - - var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId); - if (record != null) - { - if (!logPages.ContainsKey(record.Id)) + var logName = Path.GetFileNameWithoutExtension(log); + var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries); + if (logNameParts.Length != 3) { - logPages[record.Id] = new Dictionary(); - logRecords[record.Id] = record; + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); + continue; + } + var logPageSeperator = logName.IndexOf('_'); + var logRecordId = Guid.Empty; + var pageNumber = 0; + + if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id) + { + Trace.Warning($"log file '{log}' is not belongs to current job"); + continue; } - logPages[record.Id][pageNumber] = log; - } - } - - foreach (var pages in logPages) - { - var record = logRecords[pages.Key]; - if (record.Log == null) - { - // Create the log - record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", 
record.Id)), default(CancellationToken)); - - // Need to post timeline record updates to reflect the log creation - updatedRecords.Add(record.Clone()); - } - - for (var i = 1; i <= pages.Value.Count; i++) - { - var logFile = pages.Value[i]; - // Upload the contents - using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + if (!Guid.TryParse(logNameParts[1], out logRecordId)) { - var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken)); + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); + continue; } - Trace.Info($"Uploaded unfinished log '{logFile}' for current job."); - IOUtil.DeleteFile(logFile); + if (!int.TryParse(logNameParts[2], out pageNumber)) + { + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); + continue; + } + + var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId); + if (record != null) + { + if (!logPages.ContainsKey(record.Id)) + { + logPages[record.Id] = new Dictionary(); + logRecords[record.Id] = record; + } + + logPages[record.Id][pageNumber] = log; + } + } + + foreach (var pages in logPages) + { + var record = logRecords[pages.Key]; + if (record.Log == null) + { + // Create the log + record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken)); + + // Need to post timeline record updates to reflect the log creation + updatedRecords.Add(record.Clone()); + } + + for (var i = 1; i <= pages.Value.Count; i++) + { + var logFile = pages.Value[i]; + // Upload the contents + using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, 
message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken)); + } + + Trace.Info($"Uploaded unfinished log '{logFile}' for current job."); + IOUtil.DeleteFile(logFile); + } + } + + if (updatedRecords.Count > 0) + { + await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None); } } - - if (updatedRecords.Count > 0) + else { - await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None); + Trace.Info("Job server does not support log upload yet."); } } catch (Exception ex) @@ -943,6 +1075,12 @@ namespace GitHub.Runner.Listener return; } + if (this._isRunServiceJob) + { + Trace.Verbose($"Skip FinishAgentRequest call from Listener because MessageType is {message.MessageType}"); + return; + } + var runnerServer = HostContext.GetService(); int completeJobRequestRetryLimit = 5; List exceptions = new(); @@ -979,66 +1117,117 @@ namespace GitHub.Runner.Listener } // log an error issue to job level timeline record - private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, string errorMessage) + private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string errorMessage) { - try + if (server is IJobServer jobServer) { - var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); - ArgUtil.NotNull(timeline, nameof(timeline)); - - TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); - ArgUtil.NotNull(jobRecord, nameof(jobRecord)); - try { - if (!string.IsNullOrEmpty(errorMessage) && - 
message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) && - StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value)) + var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); + ArgUtil.NotNull(timeline, nameof(timeline)); + + TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); + ArgUtil.NotNull(jobRecord, nameof(jobRecord)); + + try { - // the trace should be best effort and not affect any job result - var match = _invalidJsonRegex.Match(errorMessage); - if (match.Success && - match.Groups.Count == 2) + if (!string.IsNullOrEmpty(errorMessage) && + message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) && + StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value)) { - var jsonPosition = int.Parse(match.Groups[1].Value); - var serializedJobMessage = JsonUtility.ToString(message); - var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20); - errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}"; + // the trace should be best effort and not affect any job result + var match = _invalidJsonRegex.Match(errorMessage); + if (match.Success && + match.Groups.Count == 2) + { + var jsonPosition = int.Parse(match.Groups[1].Value); + var serializedJobMessage = JsonUtility.ToString(message); + var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20); + errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}"; + } } } + catch (Exception ex) + { + Trace.Error(ex); + errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}"; + } + + var unhandledExceptionIssue = new 
Issue() { Type = IssueType.Error, Message = errorMessage }; + unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash; + jobRecord.ErrorCount++; + jobRecord.Issues.Add(unhandledExceptionIssue); + await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); } catch (Exception ex) { + Trace.Error("Fail to report unhandled exception from Runner.Worker process"); Trace.Error(ex); - errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}"; } - - var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage }; - unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash; - jobRecord.ErrorCount++; - jobRecord.Issues.Add(unhandledExceptionIssue); - await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); } - catch (Exception ex) + else { - Trace.Error("Fail to report unhandled exception from Runner.Worker process"); - Trace.Error(ex); + Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", errorMessage); + return; } } // raise job completed event to fail the job. 
- private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message) + private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message) { - try + if (server is IJobServer jobServer) { - var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed); - await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); + try + { + var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed); + await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); + } + catch (Exception ex) + { + Trace.Error("Fail to raise JobCompletedEvent back to service."); + Trace.Error(ex); + } } - catch (Exception ex) + else if (server is IRunServer runServer) { - Trace.Error("Fail to raise JobCompletedEvent back to service."); - Trace.Error(ex); + try + { + await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, CancellationToken.None); + } + catch (Exception ex) + { + Trace.Error("Fail to raise job completion back to service."); + Trace.Error(ex); + } } + else + { + throw new NotSupportedException($"Server type {server.GetType().FullName} is not supported."); + } + } + + private async Task InitializeJobServerAsync(ServiceEndpoint systemConnection) + { + if (this._isRunServiceJob) + { + return await GetRunServerAsync(systemConnection); + } + else + { + var jobServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential); + await jobServer.ConnectAsync(jobConnection); + return jobServer; + } + } + + private async Task GetRunServerAsync(ServiceEndpoint 
systemConnection) + { + var runServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + await runServer.ConnectAsync(systemConnection.Url, jobServerCredential); + return runServer; } private class WorkerDispatcher : IDisposable diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index c28e5a9f7..30d60b2fc 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Common; +using GitHub.Runner.Common.Util; using GitHub.Runner.Listener.Check; using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; @@ -136,7 +137,7 @@ namespace GitHub.Runner.Listener if (command.Remove) { // only remove local config files and exit - if(command.RemoveLocalConfig) + if (command.RemoveLocalConfig) { configManager.DeleteLocalRunnerConfig(); return Constants.Runner.ReturnCode.Success; @@ -502,7 +503,7 @@ namespace GitHub.Runner.Listener } } // Broker flow - else if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase)) + else if (MessageUtil.IsRunServiceJob(message.MessageType)) { if (autoUpdateInProgress || runOnceJobReceived) { diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index 41fc062d3..f63fbf717 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -6,6 +6,7 @@ using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; +using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Common; using GitHub.Runner.Common.Util; @@ -19,7 +20,7 @@ namespace GitHub.Runner.Worker [ServiceLocator(Default = typeof(JobRunner))] public interface IJobRunner : IRunnerService { - Task RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken 
jobRequestCancellationToken); + Task RunAsync(AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken); } public sealed class JobRunner : RunnerService, IJobRunner @@ -28,7 +29,7 @@ namespace GitHub.Runner.Worker private RunnerSettings _runnerSettings; private ITempDirectoryManager _tempDirectoryManager; - public async Task RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken) + public async Task RunAsync(AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken) { // Validate parameters. Trace.Entering(); @@ -42,14 +43,14 @@ namespace GitHub.Runner.Worker IRunnerService server = null; ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); - if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase)) + if (MessageUtil.IsRunServiceJob(message.MessageType)) { var runServer = HostContext.GetService(); VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); await runServer.ConnectAsync(systemConnection.Url, jobServerCredential); server = runServer; } - else + else { // Setup the job server and job server queue. 
var jobServer = HostContext.GetService(); @@ -65,7 +66,7 @@ namespace GitHub.Runner.Worker _jobServerQueue.Start(message); server = jobServer; } - + HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}"); diff --git a/src/Sdk/RSWebApi/Contracts/RenewJobRequest.cs b/src/Sdk/RSWebApi/Contracts/RenewJobRequest.cs new file mode 100644 index 000000000..7afb0e3b7 --- /dev/null +++ b/src/Sdk/RSWebApi/Contracts/RenewJobRequest.cs @@ -0,0 +1,15 @@ +using System; +using System.Runtime.Serialization; + +namespace GitHub.Actions.RunService.WebApi +{ + [DataContract] + public class RenewJobRequest + { + [DataMember(Name = "planId", EmitDefaultValue = false)] + public Guid PlanID { get; set; } + + [DataMember(Name = "jobId", EmitDefaultValue = false)] + public Guid JobID { get; set; } + } +} diff --git a/src/Sdk/RSWebApi/Contracts/RenewJobResponse.cs b/src/Sdk/RSWebApi/Contracts/RenewJobResponse.cs new file mode 100644 index 000000000..387a3ac01 --- /dev/null +++ b/src/Sdk/RSWebApi/Contracts/RenewJobResponse.cs @@ -0,0 +1,16 @@ +using System; +using System.Runtime.Serialization; + +namespace Sdk.RSWebApi.Contracts +{ + [DataContract] + public class RenewJobResponse + { + [DataMember] + public DateTime LockedUntil + { + get; + internal set; + } + } +} diff --git a/src/Sdk/RSWebApi/RunServiceHttpClient.cs b/src/Sdk/RSWebApi/RunServiceHttpClient.cs index 0bf46146e..2730e6665 100644 --- a/src/Sdk/RSWebApi/RunServiceHttpClient.cs +++ b/src/Sdk/RSWebApi/RunServiceHttpClient.cs @@ -8,6 +8,7 @@ using GitHub.DistributedTask.WebApi; using GitHub.Services.Common; using GitHub.Services.OAuth; using GitHub.Services.WebApi; +using Sdk.RSWebApi.Contracts; using Sdk.WebApi.WebApi; namespace GitHub.Actions.RunService.WebApi @@ -98,6 +99,29 @@ namespace GitHub.Actions.RunService.WebApi var requestContent = new ObjectContent(payload, new VssJsonMediaTypeFormatter(true)); return SendAsync( + httpMethod, + requestUri, + content: requestContent, + 
cancellationToken: cancellationToken); + } + + public Task RenewJobAsync( + Uri requestUri, + Guid planId, + Guid jobId, + CancellationToken cancellationToken = default) + { + HttpMethod httpMethod = new HttpMethod("POST"); + var payload = new RenewJobRequest() + { + PlanID = planId, + JobID = jobId + }; + + requestUri = new Uri(requestUri, "renewjob"); + + var requestContent = new ObjectContent(payload, new VssJsonMediaTypeFormatter(true)); + return SendAsync( httpMethod, requestUri, content: requestContent, diff --git a/src/Test/L0/Listener/JobDispatcherL0.cs b/src/Test/L0/Listener/JobDispatcherL0.cs index f1a74d5fe..7788df3d5 100644 --- a/src/Test/L0/Listener/JobDispatcherL0.cs +++ b/src/Test/L0/Listener/JobDispatcherL0.cs @@ -1,12 +1,17 @@ -using System; +using System; using System.Collections.Generic; using System.Reflection; using System.Threading; using System.Threading.Tasks; +using GitHub.DistributedTask.ObjectTemplating.Tokens; +using GitHub.DistributedTask.Pipelines; +using GitHub.DistributedTask.Pipelines.ContextData; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Listener; +using GitHub.Runner.Listener.Configuration; using GitHub.Services.WebApi; using Moq; +using Sdk.RSWebApi.Contracts; using Xunit; using Pipelines = GitHub.DistributedTask.Pipelines; @@ -18,6 +23,8 @@ namespace GitHub.Runner.Common.Tests.Listener private Mock _processChannel; private Mock _processInvoker; private Mock _runnerServer; + + private Mock _runServer; private Mock _configurationStore; public JobDispatcherL0() @@ -25,6 +32,7 @@ namespace GitHub.Runner.Common.Tests.Listener _processChannel = new Mock(); _processInvoker = new Mock(); _runnerServer = new Mock(); + _runServer = new Mock(); _configurationStore = new Mock(); } @@ -139,7 +147,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); - await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), 
firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully); _runnerServer.Verify(x => x.RenewAgentRequestAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(5)); @@ -197,7 +205,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); - await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.False(cancellationTokenSource.IsCancellationRequested); @@ -205,6 +213,75 @@ namespace GitHub.Runner.Common.Tests.Listener } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void DispatcherRenewJobOnRunServiceStopOnJobNotFoundExceptions() + { + //Arrange + using (var hc = new TestHostContext(this)) + { + int poolId = 1; + Int64 requestId = 1000; + int count = 0; + + var trace = hc.GetTrace(nameof(DispatcherRenewJobOnRunServiceStopOnJobNotFoundExceptions)); + TaskCompletionSource firstJobRequestRenewed = new(); + CancellationTokenSource cancellationTokenSource = new(); + + TaskAgentJobRequest request = new(); + PropertyInfo lockUntilProperty = request.GetType().GetProperty("LockedUntil", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public); + Assert.NotNull(lockUntilProperty); + lockUntilProperty.SetValue(request, DateTime.UtcNow.AddMinutes(5)); + + hc.SetSingleton(_runServer.Object); + 
hc.SetSingleton(_configurationStore.Object); + _configurationStore.Setup(x => x.GetSettings()).Returns(new RunnerSettings() { PoolId = 1 }); + _ = _runServer.Setup(x => x.RenewJobAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(() => + { + count++; + if (!firstJobRequestRenewed.Task.IsCompletedSuccessfully) + { + trace.Info("First renew happens."); + } + + if (count < 5) + { + var response = new RenewJobResponse() + { + LockedUntil = request.LockedUntil.Value + }; + return Task.FromResult(response); + } + else if (count == 5) + { + cancellationTokenSource.CancelAfter(10000); + throw new TaskOrchestrationJobNotFoundException(""); + } + else + { + throw new InvalidOperationException("Should not reach here."); + } + }); + + + var jobDispatcher = new JobDispatcher(); + jobDispatcher.Initialize(hc); + EnableRunServiceJobForJobDispatcher(jobDispatcher); + + // Set the value of the _isRunServiceJob field to true + var isRunServiceJobField = typeof(JobDispatcher).GetField("_isRunServiceJob", BindingFlags.NonPublic | BindingFlags.Instance); + isRunServiceJobField.SetValue(jobDispatcher, true); + + await jobDispatcher.RenewJobRequestAsync(GetAgentJobRequestMessage(), GetServiceEndpoint(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + + Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); + Assert.False(cancellationTokenSource.IsCancellationRequested); + _runServer.Verify(x => x.RenewJobAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(5)); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Runner")] @@ -256,7 +333,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); - await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), 
It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.False(cancellationTokenSource.IsCancellationRequested); @@ -312,8 +389,9 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); + // Act - await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); // Assert _configurationStore.Verify(x => x.SaveSettings(It.Is(settings => settings.AgentName == newName)), Times.Once); @@ -368,7 +446,7 @@ namespace GitHub.Runner.Common.Tests.Listener jobDispatcher.Initialize(hc); // Act - await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); // Assert _configurationStore.Verify(x => x.SaveSettings(It.IsAny()), Times.Never); @@ -421,7 +499,7 @@ namespace GitHub.Runner.Common.Tests.Listener jobDispatcher.Initialize(hc); // Act - await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); // Assert _configurationStore.Verify(x => x.SaveSettings(It.IsAny()), Times.Never); @@ -479,7 +557,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); 
- await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.True(cancellationTokenSource.IsCancellationRequested); @@ -536,7 +614,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); - await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.False(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should failed."); Assert.False(cancellationTokenSource.IsCancellationRequested); @@ -600,7 +678,7 @@ namespace GitHub.Runner.Common.Tests.Listener var jobDispatcher = new JobDispatcher(); jobDispatcher.Initialize(hc); - await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); + await jobDispatcher.RenewJobRequestAsync(It.IsAny(), It.IsAny(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.False(cancellationTokenSource.IsCancellationRequested); @@ -659,5 +737,78 @@ namespace GitHub.Runner.Common.Tests.Listener Assert.True(jobDispatcher.RunOnceJobCompleted.Task.Result, "JobDispatcher should set task complete token to 'TRUE' for one time agent."); } } + + private static void 
EnableRunServiceJobForJobDispatcher(JobDispatcher jobDispatcher) + { + // Set the value of the _isRunServiceJob field to true + var isRunServiceJobField = typeof(JobDispatcher).GetField("_isRunServiceJob", BindingFlags.NonPublic | BindingFlags.Instance); + isRunServiceJobField.SetValue(jobDispatcher, true); + } + + private static ServiceEndpoint GetServiceEndpoint() + { + var serviceEndpoint = new ServiceEndpoint + { + Authorization = new EndpointAuthorization + { + Scheme = EndpointAuthorizationSchemes.OAuth + } + }; + serviceEndpoint.Authorization.Parameters.Add("AccessToken", "token"); + return serviceEndpoint; + } + + private static AgentJobRequestMessage GetAgentJobRequestMessage() + { + var message = new AgentJobRequestMessage( + new TaskOrchestrationPlanReference() + { + PlanType = "Build", + PlanId = Guid.NewGuid(), + Version = 1 + }, + new TimelineReference() + { + Id = Guid.NewGuid() + }, + Guid.NewGuid(), + "jobDisplayName", + "jobName", + null, + null, + new List(), + new Dictionary() + { + { + "variables", + new VariableValue() + { + IsSecret = false, + Value = "variables" + } + } + }, + new List() + { + new MaskHint() + { + Type = MaskType.Variable, + Value = "maskHints" + } + }, + new JobResources(), + new DictionaryContextData(), + new WorkspaceOptions(), + new List(), + new List() + { + "fileTable" + }, + null, + new List(), + new ActionsEnvironmentReference("env") + ); + return message; + } } }