From 593673ba9e82ad574d6c3744ddb9e3d1521f7517 Mon Sep 17 00:00:00 2001 From: Thomas Brumley Date: Tue, 25 Aug 2020 12:02:29 -0400 Subject: [PATCH] Add in Log line numbers for streaming logs (#663) * Add in Log line Co-authored-by: yaananth (Yash) --- src/Runner.Common/JobServer.cs | 7 +++++ src/Runner.Common/JobServerQueue.cs | 31 ++++++++++++------- src/Runner.Worker/ExecutionContext.cs | 3 +- src/Sdk/DTWebApi/WebApi/TaskHttpClient.cs | 24 +++++++++++++- .../WebApi/TimelineRecordFeedLinesWrapper.cs | 9 ++++++ .../DTWebApi/WebApi/TimelineRecordLogLine.cs | 29 +++++++++++++++++ src/Test/L0/Worker/ExecutionContextL0.cs | 8 ++--- 7 files changed, 94 insertions(+), 17 deletions(-) create mode 100644 src/Sdk/DTWebApi/WebApi/TimelineRecordLogLine.cs diff --git a/src/Runner.Common/JobServer.cs b/src/Runner.Common/JobServer.cs index e3e0f551b..7d069bf97 100644 --- a/src/Runner.Common/JobServer.cs +++ b/src/Runner.Common/JobServer.cs @@ -16,6 +16,7 @@ namespace GitHub.Runner.Common // logging and console Task AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, CancellationToken cancellationToken); + Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long startLine, CancellationToken cancellationToken); Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); @@ -79,6 +80,12 @@ namespace GitHub.Runner.Common return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, cancellationToken: cancellationToken); } + public Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long startLine, CancellationToken cancellationToken) + { + CheckConnection(); + return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, startLine, cancellationToken: cancellationToken); + } + public Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, string type, string name, Stream uploadStream, CancellationToken cancellationToken) { CheckConnection(); diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 787daeffd..5cabca2a9 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -18,7 +18,7 @@ namespace GitHub.Runner.Common event EventHandler JobServerQueueThrottling; Task ShutdownAsync(); void Start(Pipelines.AgentJobRequestMessage jobRequest); - void QueueWebConsoleLine(Guid stepRecordId, string line); + void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord); } @@ -155,10 +155,10 @@ namespace GitHub.Runner.Common Trace.Info("All queue process tasks have been stopped, and all queues are drained."); } - public void QueueWebConsoleLine(Guid stepRecordId, string line) + public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber) { Trace.Verbose("Enqueue web console line queue: {0}", line); - _webConsoleLineQueue.Enqueue(new ConsoleLineInfo(stepRecordId, line)); + _webConsoleLineQueue.Enqueue(new ConsoleLineInfo(stepRecordId, line, lineNumber)); } public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource) @@ -214,7 +214,7 @@ namespace GitHub.Runner.Common } // Group consolelines by timeline record of each step - Dictionary> stepsConsoleLines = new Dictionary>(); + Dictionary> stepsConsoleLines = new Dictionary>(); List stepRecordIds = new List(); // We need to keep lines in order int linesCounter = 0; ConsoleLineInfo lineInfo; @@ -222,7 +222,7 @@ namespace GitHub.Runner.Common { if (!stepsConsoleLines.ContainsKey(lineInfo.StepRecordId)) { - stepsConsoleLines[lineInfo.StepRecordId] = new List(); + stepsConsoleLines[lineInfo.StepRecordId] = new List(); stepRecordIds.Add(lineInfo.StepRecordId); } @@ -232,7 +232,7 @@ namespace GitHub.Runner.Common lineInfo.Line = $"{lineInfo.Line.Substring(0, 1024)}..."; } - stepsConsoleLines[lineInfo.StepRecordId].Add(lineInfo.Line); + stepsConsoleLines[lineInfo.StepRecordId].Add(new TimelineRecordLogLine(lineInfo.Line, lineInfo.LineNumber)); linesCounter++; // process at most about 500 lines of web console line during regular timer dequeue task. @@ -247,13 +247,13 @@ namespace GitHub.Runner.Common { // Split consolelines into batch, each batch will container at most 100 lines. int batchCounter = 0; - List> batchedLines = new List>(); + List> batchedLines = new List>(); foreach (var line in stepsConsoleLines[stepRecordId]) { var currentBatch = batchedLines.ElementAtOrDefault(batchCounter); if (currentBatch == null) { - batchedLines.Add(new List()); + batchedLines.Add(new List()); currentBatch = batchedLines.ElementAt(batchCounter); } @@ -275,7 +275,6 @@ namespace GitHub.Runner.Common { Trace.Info($"Skip {batchedLines.Count - 2} batches web console lines for last run"); batchedLines = batchedLines.TakeLast(2).ToList(); - batchedLines[0].Insert(0, "..."); } int errorCount = 0; @@ -284,7 +283,15 @@ namespace GitHub.Runner.Common try { // we will not requeue failed batch, since the web console lines are time sensitive. - await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch, default(CancellationToken)); + if (batch[0].LineNumber.HasValue) + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken)); + } + else + { + await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken)); + } + if (_firstConsoleOutputs) { HostContext.WritePerfCounter($"WorkerJobServerQueueAppendFirstConsoleOutput_{_planId.ToString()}"); @@ -653,13 +660,15 @@ namespace GitHub.Runner.Common internal class ConsoleLineInfo { - public ConsoleLineInfo(Guid recordId, string line) + public ConsoleLineInfo(Guid recordId, string line, long? lineNumber) { this.StepRecordId = recordId; this.Line = line; + this.LineNumber = lineNumber; } public Guid StepRecordId { get; set; } public string Line { get; set; } + public long? LineNumber { get; set; } } } diff --git a/src/Runner.Worker/ExecutionContext.cs b/src/Runner.Worker/ExecutionContext.cs index 224b33b0e..46f516999 100644 --- a/src/Runner.Worker/ExecutionContext.cs +++ b/src/Runner.Worker/ExecutionContext.cs @@ -717,7 +717,8 @@ namespace GitHub.Runner.Worker } } - _jobServerQueue.QueueWebConsoleLine(_record.Id, msg); + _jobServerQueue.QueueWebConsoleLine(_record.Id, msg, totalLines); + return totalLines; } diff --git a/src/Sdk/DTWebApi/WebApi/TaskHttpClient.cs b/src/Sdk/DTWebApi/WebApi/TaskHttpClient.cs index fcc1ab6ce..6a5515552 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskHttpClient.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskHttpClient.cs @@ -50,7 +50,7 @@ namespace GitHub.DistributedTask.WebApi : base(baseUrl, pipeline, disposeHandler) { } - + public Task AppendTimelineRecordFeedAsync( Guid scopeIdentifier, String planType, @@ -91,6 +91,28 @@ namespace GitHub.DistributedTask.WebApi userState, cancellationToken); } + + public Task AppendTimelineRecordFeedAsync( + Guid scopeIdentifier, + String planType, + Guid planId, + Guid timelineId, + Guid recordId, + Guid stepId, + IList lines, + long startLine, + CancellationToken cancellationToken = default(CancellationToken), + Object userState = null) + { + return AppendTimelineRecordFeedAsync(scopeIdentifier, + planType, + planId, + timelineId, + recordId, + new TimelineRecordFeedLinesWrapper(stepId, lines, startLine), + userState, + cancellationToken); + } public async Task RaisePlanEventAsync( Guid scopeIdentifier, diff --git a/src/Sdk/DTWebApi/WebApi/TimelineRecordFeedLinesWrapper.cs b/src/Sdk/DTWebApi/WebApi/TimelineRecordFeedLinesWrapper.cs index c628852bd..edaaf314a 100644 --- a/src/Sdk/DTWebApi/WebApi/TimelineRecordFeedLinesWrapper.cs +++ b/src/Sdk/DTWebApi/WebApi/TimelineRecordFeedLinesWrapper.cs @@ -20,6 +20,12 @@ namespace GitHub.DistributedTask.WebApi this.Count = lines.Count; } + public TimelineRecordFeedLinesWrapper(Guid stepId, IList lines, Int64 startLine) + : this(stepId, lines) + { + this.StartLine = startLine; + } + [DataMember(Order = 0)] public Int32 Count { get; private set; } @@ -31,5 +37,8 @@ namespace GitHub.DistributedTask.WebApi [DataMember(EmitDefaultValue = false)] public Guid StepId { get; set; } + + [DataMember (EmitDefaultValue = false)] + public Int64? StartLine { get; private set; } } } diff --git a/src/Sdk/DTWebApi/WebApi/TimelineRecordLogLine.cs b/src/Sdk/DTWebApi/WebApi/TimelineRecordLogLine.cs new file mode 100644 index 000000000..2761390c9 --- /dev/null +++ b/src/Sdk/DTWebApi/WebApi/TimelineRecordLogLine.cs @@ -0,0 +1,29 @@ +using System; +using System.Runtime.Serialization; + +namespace GitHub.DistributedTask.WebApi +{ + [DataContract] + public sealed class TimelineRecordLogLine + { + public TimelineRecordLogLine(String line, long? lineNumber) + { + this.Line = line; + this.LineNumber = lineNumber; + } + + [DataMember] + public String Line + { + get; + set; + } + + [DataMember (EmitDefaultValue = false)] + public long? LineNumber + { + get; + set; + } + } +} diff --git a/src/Test/L0/Worker/ExecutionContextL0.cs b/src/Test/L0/Worker/ExecutionContextL0.cs index 0efd4bfe5..39dbb2dda 100644 --- a/src/Test/L0/Worker/ExecutionContextL0.cs +++ b/src/Test/L0/Worker/ExecutionContextL0.cs @@ -116,7 +116,7 @@ namespace GitHub.Runner.Common.Tests.Worker var pagingLogger = new Mock(); var jobServerQueue = new Mock(); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.IsAny())); - jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); + jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny(),It.IsAny())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); }); hc.EnqueueInstance(pagingLogger.Object); hc.SetSingleton(jobServerQueue.Object); @@ -137,7 +137,7 @@ namespace GitHub.Runner.Common.Tests.Worker ec.Complete(); - jobServerQueue.Verify(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny()), Times.Exactly(10)); + jobServerQueue.Verify(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(10)); } } @@ -171,7 +171,7 @@ namespace GitHub.Runner.Common.Tests.Worker var pagingLogger5 = new Mock(); var jobServerQueue = new Mock(); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.IsAny())); - jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); + jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny(), It.IsAny())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); }); var actionRunner1 = new ActionRunner(); actionRunner1.Initialize(hc); @@ -269,7 +269,7 @@ namespace GitHub.Runner.Common.Tests.Worker var pagingLogger5 = new Mock(); var jobServerQueue = new Mock(); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny(), It.IsAny())); - jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); + jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny(), It.IsAny(), It.IsAny())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); }); var actionRunner1 = new ActionRunner(); actionRunner1.Initialize(hc);