From 72fa2a8a0d019aa48a5f0cf97128707953c0c724 Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Thu, 9 Sep 2021 21:55:15 -0400 Subject: [PATCH] Wait for job record updated before running steps. (#1320) * Wait for job record updated before running steps. * only oidc --- src/Runner.Common/JobServerQueue.cs | 16 ++++++++++++++-- src/Runner.Worker/JobRunner.cs | 10 ++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 785637e57..9ea0bf41f 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -15,6 +15,7 @@ namespace GitHub.Runner.Common [ServiceLocator(Default = typeof(JobServerQueue))] public interface IJobServerQueue : IRunnerService, IThrottlingReporter { + TaskCompletionSource JobRecordUpdated { get; } event EventHandler JobServerQueueThrottling; Task ShutdownAsync(); void Start(Pipelines.AgentJobRequestMessage jobRequest); @@ -62,8 +63,11 @@ namespace GitHub.Runner.Common private IJobServer _jobServer; private Task[] _allDequeueTasks; private readonly TaskCompletionSource _jobCompletionSource = new TaskCompletionSource(); + private readonly TaskCompletionSource _jobRecordUpdated = new TaskCompletionSource(); private bool _queueInProcess = false; + public TaskCompletionSource JobRecordUpdated => _jobRecordUpdated; + public event EventHandler JobServerQueueThrottling; // Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds). @@ -287,11 +291,11 @@ namespace GitHub.Runner.Common { await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); } - else + else { await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); } - + if (_firstConsoleOutputs) { HostContext.WritePerfCounter($"WorkerJobServerQueueAppendFirstConsoleOutput_{_planId.ToString()}"); @@ -455,6 +459,14 @@ namespace GitHub.Runner.Common { Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId); } + + if (!_jobRecordUpdated.Task.IsCompleted && + update.PendingRecords.Any(x => x.Id == _jobTimelineRecordId && x.State != null)) + { + // We have changed the state of the job + Trace.Info("Job timeline record has been updated for the first time."); + _jobRecordUpdated.TrySetResult(0); + } } catch (Exception ex) { diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index 4ab077277..3257d1c8d 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -145,6 +145,16 @@ namespace GitHub.Runner.Worker Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'"); HostContext.WritePerfCounter($"WorkerJobInitialized_{message.RequestId.ToString()}"); + if (systemConnection.Data.TryGetValue("GenerateIdTokenUrl", out var generateIdTokenUrl) && + !string.IsNullOrEmpty(generateIdTokenUrl)) + { + // Server won't issue ID_TOKEN for non-inprogress job. + // If the job is trying to use OIDC feature, we want the job to be marked as in-progress before running any customer's steps as much as we can. + // Timeline record update background process runs every 500ms, so delay 1000ms is enough for most of the cases + Trace.Info($"Waiting for job to be marked as started."); + await Task.WhenAny(_jobServerQueue.JobRecordUpdated.Task, Task.Delay(1000)); + } + // Run all job steps Trace.Info("Run all job steps."); var stepsRunner = HostContext.GetService();