Add in Log line numbers for streaming logs (#663)

* Add in Log line

Co-authored-by: yaananth (Yash) <yaananth@github.com>
This commit is contained in:
Thomas Brumley
2020-08-25 12:02:29 -04:00
committed by TingluoHuang
parent 2b0a2aeba2
commit 593673ba9e
7 changed files with 94 additions and 17 deletions

View File

@@ -16,6 +16,7 @@ namespace GitHub.Runner.Common
// logging and console // logging and console
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
@@ -79,6 +80,12 @@ namespace GitHub.Runner.Common
return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, cancellationToken: cancellationToken); return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, cancellationToken: cancellationToken);
} }
public Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long startLine, CancellationToken cancellationToken)
{
CheckConnection();
return _taskClient.AppendTimelineRecordFeedAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, stepId, lines, startLine, cancellationToken: cancellationToken);
}
public Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, string type, string name, Stream uploadStream, CancellationToken cancellationToken) public Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, string type, string name, Stream uploadStream, CancellationToken cancellationToken)
{ {
CheckConnection(); CheckConnection();

View File

@@ -18,7 +18,7 @@ namespace GitHub.Runner.Common
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling; event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync(); Task ShutdownAsync();
void Start(Pipelines.AgentJobRequestMessage jobRequest); void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line); void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord); void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
} }
@@ -155,10 +155,10 @@ namespace GitHub.Runner.Common
Trace.Info("All queue process tasks have been stopped, and all queues are drained."); Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
} }
public void QueueWebConsoleLine(Guid stepRecordId, string line) public void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber)
{ {
Trace.Verbose("Enqueue web console line queue: {0}", line); Trace.Verbose("Enqueue web console line queue: {0}", line);
_webConsoleLineQueue.Enqueue(new ConsoleLineInfo(stepRecordId, line)); _webConsoleLineQueue.Enqueue(new ConsoleLineInfo(stepRecordId, line, lineNumber));
} }
public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource) public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource)
@@ -214,7 +214,7 @@ namespace GitHub.Runner.Common
} }
// Group consolelines by timeline record of each step // Group consolelines by timeline record of each step
Dictionary<Guid, List<string>> stepsConsoleLines = new Dictionary<Guid, List<string>>(); Dictionary<Guid, List<TimelineRecordLogLine>> stepsConsoleLines = new Dictionary<Guid, List<TimelineRecordLogLine>>();
List<Guid> stepRecordIds = new List<Guid>(); // We need to keep lines in order List<Guid> stepRecordIds = new List<Guid>(); // We need to keep lines in order
int linesCounter = 0; int linesCounter = 0;
ConsoleLineInfo lineInfo; ConsoleLineInfo lineInfo;
@@ -222,7 +222,7 @@ namespace GitHub.Runner.Common
{ {
if (!stepsConsoleLines.ContainsKey(lineInfo.StepRecordId)) if (!stepsConsoleLines.ContainsKey(lineInfo.StepRecordId))
{ {
stepsConsoleLines[lineInfo.StepRecordId] = new List<string>(); stepsConsoleLines[lineInfo.StepRecordId] = new List<TimelineRecordLogLine>();
stepRecordIds.Add(lineInfo.StepRecordId); stepRecordIds.Add(lineInfo.StepRecordId);
} }
@@ -232,7 +232,7 @@ namespace GitHub.Runner.Common
lineInfo.Line = $"{lineInfo.Line.Substring(0, 1024)}..."; lineInfo.Line = $"{lineInfo.Line.Substring(0, 1024)}...";
} }
stepsConsoleLines[lineInfo.StepRecordId].Add(lineInfo.Line); stepsConsoleLines[lineInfo.StepRecordId].Add(new TimelineRecordLogLine(lineInfo.Line, lineInfo.LineNumber));
linesCounter++; linesCounter++;
// process at most about 500 lines of web console line during regular timer dequeue task. // process at most about 500 lines of web console line during regular timer dequeue task.
@@ -247,13 +247,13 @@ namespace GitHub.Runner.Common
{ {
// Split consolelines into batch, each batch will container at most 100 lines. // Split consolelines into batch, each batch will container at most 100 lines.
int batchCounter = 0; int batchCounter = 0;
List<List<string>> batchedLines = new List<List<string>>(); List<List<TimelineRecordLogLine>> batchedLines = new List<List<TimelineRecordLogLine>>();
foreach (var line in stepsConsoleLines[stepRecordId]) foreach (var line in stepsConsoleLines[stepRecordId])
{ {
var currentBatch = batchedLines.ElementAtOrDefault(batchCounter); var currentBatch = batchedLines.ElementAtOrDefault(batchCounter);
if (currentBatch == null) if (currentBatch == null)
{ {
batchedLines.Add(new List<string>()); batchedLines.Add(new List<TimelineRecordLogLine>());
currentBatch = batchedLines.ElementAt(batchCounter); currentBatch = batchedLines.ElementAt(batchCounter);
} }
@@ -275,7 +275,6 @@ namespace GitHub.Runner.Common
{ {
Trace.Info($"Skip {batchedLines.Count - 2} batches web console lines for last run"); Trace.Info($"Skip {batchedLines.Count - 2} batches web console lines for last run");
batchedLines = batchedLines.TakeLast(2).ToList(); batchedLines = batchedLines.TakeLast(2).ToList();
batchedLines[0].Insert(0, "...");
} }
int errorCount = 0; int errorCount = 0;
@@ -284,7 +283,15 @@ namespace GitHub.Runner.Common
try try
{ {
// we will not requeue failed batch, since the web console lines are time sensitive. // we will not requeue failed batch, since the web console lines are time sensitive.
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch, default(CancellationToken)); if (batch[0].LineNumber.HasValue)
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
}
else
{
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
}
if (_firstConsoleOutputs) if (_firstConsoleOutputs)
{ {
HostContext.WritePerfCounter($"WorkerJobServerQueueAppendFirstConsoleOutput_{_planId.ToString()}"); HostContext.WritePerfCounter($"WorkerJobServerQueueAppendFirstConsoleOutput_{_planId.ToString()}");
@@ -653,13 +660,15 @@ namespace GitHub.Runner.Common
internal class ConsoleLineInfo internal class ConsoleLineInfo
{ {
public ConsoleLineInfo(Guid recordId, string line) public ConsoleLineInfo(Guid recordId, string line, long? lineNumber)
{ {
this.StepRecordId = recordId; this.StepRecordId = recordId;
this.Line = line; this.Line = line;
this.LineNumber = lineNumber;
} }
public Guid StepRecordId { get; set; } public Guid StepRecordId { get; set; }
public string Line { get; set; } public string Line { get; set; }
public long? LineNumber { get; set; }
} }
} }

View File

@@ -717,7 +717,8 @@ namespace GitHub.Runner.Worker
} }
} }
_jobServerQueue.QueueWebConsoleLine(_record.Id, msg); _jobServerQueue.QueueWebConsoleLine(_record.Id, msg, totalLines);
return totalLines; return totalLines;
} }

View File

@@ -50,7 +50,7 @@ namespace GitHub.DistributedTask.WebApi
: base(baseUrl, pipeline, disposeHandler) : base(baseUrl, pipeline, disposeHandler)
{ {
} }
public Task AppendTimelineRecordFeedAsync( public Task AppendTimelineRecordFeedAsync(
Guid scopeIdentifier, Guid scopeIdentifier,
String planType, String planType,
@@ -91,6 +91,28 @@ namespace GitHub.DistributedTask.WebApi
userState, userState,
cancellationToken); cancellationToken);
} }
public Task AppendTimelineRecordFeedAsync(
Guid scopeIdentifier,
String planType,
Guid planId,
Guid timelineId,
Guid recordId,
Guid stepId,
IList<String> lines,
long startLine,
CancellationToken cancellationToken = default(CancellationToken),
Object userState = null)
{
return AppendTimelineRecordFeedAsync(scopeIdentifier,
planType,
planId,
timelineId,
recordId,
new TimelineRecordFeedLinesWrapper(stepId, lines, startLine),
userState,
cancellationToken);
}
public async Task RaisePlanEventAsync<T>( public async Task RaisePlanEventAsync<T>(
Guid scopeIdentifier, Guid scopeIdentifier,

View File

@@ -20,6 +20,12 @@ namespace GitHub.DistributedTask.WebApi
this.Count = lines.Count; this.Count = lines.Count;
} }
public TimelineRecordFeedLinesWrapper(Guid stepId, IList<string> lines, Int64 startLine)
: this(stepId, lines)
{
this.StartLine = startLine;
}
[DataMember(Order = 0)] [DataMember(Order = 0)]
public Int32 Count { get; private set; } public Int32 Count { get; private set; }
@@ -31,5 +37,8 @@ namespace GitHub.DistributedTask.WebApi
[DataMember(EmitDefaultValue = false)] [DataMember(EmitDefaultValue = false)]
public Guid StepId { get; set; } public Guid StepId { get; set; }
[DataMember (EmitDefaultValue = false)]
public Int64? StartLine { get; private set; }
} }
} }

View File

@@ -0,0 +1,29 @@
using System;
using System.Runtime.Serialization;
namespace GitHub.DistributedTask.WebApi
{
[DataContract]
public sealed class TimelineRecordLogLine
{
public TimelineRecordLogLine(String line, long? lineNumber)
{
this.Line = line;
this.LineNumber = lineNumber;
}
[DataMember]
public String Line
{
get;
set;
}
[DataMember (EmitDefaultValue = false)]
public long? LineNumber
{
get;
set;
}
}
}

View File

@@ -116,7 +116,7 @@ namespace GitHub.Runner.Common.Tests.Worker
var pagingLogger = new Mock<IPagingLogger>(); var pagingLogger = new Mock<IPagingLogger>();
var jobServerQueue = new Mock<IJobServerQueue>(); var jobServerQueue = new Mock<IJobServerQueue>();
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>())); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>(),It.IsAny<long>())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); });
hc.EnqueueInstance(pagingLogger.Object); hc.EnqueueInstance(pagingLogger.Object);
hc.SetSingleton(jobServerQueue.Object); hc.SetSingleton(jobServerQueue.Object);
@@ -137,7 +137,7 @@ namespace GitHub.Runner.Common.Tests.Worker
ec.Complete(); ec.Complete();
jobServerQueue.Verify(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>()), Times.Exactly(10)); jobServerQueue.Verify(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<long?>()), Times.Exactly(10));
} }
} }
@@ -171,7 +171,7 @@ namespace GitHub.Runner.Common.Tests.Worker
var pagingLogger5 = new Mock<IPagingLogger>(); var pagingLogger5 = new Mock<IPagingLogger>();
var jobServerQueue = new Mock<IJobServerQueue>(); var jobServerQueue = new Mock<IJobServerQueue>();
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>())); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<long?>())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); });
var actionRunner1 = new ActionRunner(); var actionRunner1 = new ActionRunner();
actionRunner1.Initialize(hc); actionRunner1.Initialize(hc);
@@ -269,7 +269,7 @@ namespace GitHub.Runner.Common.Tests.Worker
var pagingLogger5 = new Mock<IPagingLogger>(); var pagingLogger5 = new Mock<IPagingLogger>();
var jobServerQueue = new Mock<IJobServerQueue>(); var jobServerQueue = new Mock<IJobServerQueue>();
jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>())); jobServerQueue.Setup(x => x.QueueTimelineRecordUpdate(It.IsAny<Guid>(), It.IsAny<TimelineRecord>()));
jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>())).Callback((Guid id, string msg) => { hc.GetTrace().Info(msg); }); jobServerQueue.Setup(x => x.QueueWebConsoleLine(It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<long?>())).Callback((Guid id, string msg, long? lineNumber) => { hc.GetTrace().Info(msg); });
var actionRunner1 = new ActionRunner(); var actionRunner1 = new ActionRunner();
actionRunner1.Initialize(hc); actionRunner1.Initialize(hc);