Include current runner status while getting messages (#2026)

* get messages with runner status

* fixed l0 tests

* PR feedback
This commit is contained in:
Lokesh Gopu
2022-07-28 16:42:02 -04:00
committed by GitHub
parent 72e2107b5e
commit 813af29886
9 changed files with 79 additions and 10 deletions

View File

@@ -0,0 +1,14 @@
using System;
using GitHub.DistributedTask.WebApi;
namespace GitHub.Runner.Common
{
public class JobStatusEventArgs : EventArgs
{
public JobStatusEventArgs(TaskAgentStatus status)
{
this.Status = status;
}
public TaskAgentStatus Status { get; private set; }
}
}

View File

@@ -39,7 +39,7 @@ namespace GitHub.Runner.Common
Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken); Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken);
Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken); Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken);
Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken); Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken); Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken);
// job request // job request
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken); Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
@@ -298,10 +298,10 @@ namespace GitHub.Runner.Common
return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken); return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken);
} }
public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken)
{ {
CheckConnection(RunnerConnectionType.MessageQueue); CheckConnection(RunnerConnectionType.MessageQueue);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, cancellationToken: cancellationToken); return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, cancellationToken: cancellationToken);
} }
//----------------------------------------------------------------- //-----------------------------------------------------------------

View File

@@ -27,6 +27,7 @@ namespace GitHub.Runner.Listener
bool Cancel(JobCancelMessage message); bool Cancel(JobCancelMessage message);
Task WaitAsync(CancellationToken token); Task WaitAsync(CancellationToken token);
Task ShutdownAsync(); Task ShutdownAsync();
event EventHandler<JobStatusEventArgs> JobStatus;
} }
// This implementation of IJobDispatcher is not thread safe. // This implementation of IJobDispatcher is not thread safe.
@@ -55,6 +56,8 @@ namespace GitHub.Runner.Listener
private TaskCompletionSource<bool> _runOnceJobCompleted = new TaskCompletionSource<bool>(); private TaskCompletionSource<bool> _runOnceJobCompleted = new TaskCompletionSource<bool>();
public event EventHandler<JobStatusEventArgs> JobStatus;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
@@ -335,6 +338,11 @@ namespace GitHub.Runner.Listener
Busy = true; Busy = true;
try try
{ {
if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Busy));
}
if (previousJobDispatch != null) if (previousJobDispatch != null)
{ {
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker."); Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
@@ -650,6 +658,11 @@ namespace GitHub.Runner.Listener
finally finally
{ {
Busy = false; Busy = false;
if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Online));
}
} }
} }

View File

@@ -23,6 +23,7 @@ namespace GitHub.Runner.Listener
Task DeleteSessionAsync(); Task DeleteSessionAsync();
Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token); Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token);
Task DeleteMessageAsync(TaskAgentMessage message); Task DeleteMessageAsync(TaskAgentMessage message);
void OnJobStatus(object sender, JobStatusEventArgs e);
} }
public sealed class MessageListener : RunnerService, IMessageListener public sealed class MessageListener : RunnerService, IMessageListener
@@ -38,6 +39,8 @@ namespace GitHub.Runner.Listener
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new Dictionary<string, int>(); private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new Dictionary<string, int>();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
@@ -170,6 +173,23 @@ namespace GitHub.Runner.Listener
} }
} }
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
}
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token) public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{ {
Trace.Entering(); Trace.Entering();
@@ -184,12 +204,14 @@ namespace GitHub.Runner.Listener
{ {
token.ThrowIfCancellationRequested(); token.ThrowIfCancellationRequested();
TaskAgentMessage message = null; TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try try
{ {
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId, _session.SessionId,
_lastMessageId, _lastMessageId,
token); runnerStatus,
_getMessagesTokenSource.Token);
// Decrypt the message body if the session is using encryption // Decrypt the message body if the session is using encryption
message = DecryptMessage(message); message = DecryptMessage(message);
@@ -206,6 +228,11 @@ namespace GitHub.Runner.Listener
continuousError = 0; continuousError = 0;
} }
} }
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested) catch (OperationCanceledException) when (token.IsCancellationRequested)
{ {
Trace.Info("Get next message has been cancelled."); Trace.Info("Get next message has been cancelled.");
@@ -261,6 +288,10 @@ namespace GitHub.Runner.Listener
await HostContext.Delay(_getNextMessageRetryInterval, token); await HostContext.Delay(_getNextMessageRetryInterval, token);
} }
} }
finally
{
_getMessagesTokenSource.Dispose();
}
if (message == null) if (message == null)
{ {

View File

@@ -360,6 +360,8 @@ namespace GitHub.Runner.Listener
bool runOnceJobReceived = false; bool runOnceJobReceived = false;
jobDispatcher = HostContext.CreateService<IJobDispatcher>(); jobDispatcher = HostContext.CreateService<IJobDispatcher>();
jobDispatcher.JobStatus += _listener.OnJobStatus;
while (!HostContext.RunnerShutdownToken.IsCancellationRequested) while (!HostContext.RunnerShutdownToken.IsCancellationRequested)
{ {
TaskAgentMessage message = null; TaskAgentMessage message = null;
@@ -561,6 +563,7 @@ namespace GitHub.Runner.Listener
{ {
if (jobDispatcher != null) if (jobDispatcher != null)
{ {
jobDispatcher.JobStatus -= _listener.OnJobStatus;
await jobDispatcher.ShutdownAsync(); await jobDispatcher.ShutdownAsync();
} }

View File

@@ -57,7 +57,7 @@ namespace GitHub.Runner.Sdk
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200)); settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
} }
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_ALLOW_REDIRECT"))) if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{ {
settings.AllowAutoRedirect = true; settings.AllowAutoRedirect = true;
} }

View File

@@ -457,6 +457,7 @@ namespace GitHub.DistributedTask.WebApi
int poolId, int poolId,
Guid sessionId, Guid sessionId,
long? lastMessageId = null, long? lastMessageId = null,
TaskAgentStatus? status = null,
object userState = null, object userState = null,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
@@ -470,6 +471,10 @@ namespace GitHub.DistributedTask.WebApi
{ {
queryParams.Add("lastMessageId", lastMessageId.Value.ToString(CultureInfo.InvariantCulture)); queryParams.Add("lastMessageId", lastMessageId.Value.ToString(CultureInfo.InvariantCulture));
} }
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}
return SendAsync<TaskAgentMessage>( return SendAsync<TaskAgentMessage>(
httpMethod, httpMethod,

View File

@@ -10,5 +10,8 @@ namespace GitHub.DistributedTask.WebApi
[EnumMember] [EnumMember]
Online = 2, Online = 2,
[EnumMember]
Busy = 3,
} }
} }

View File

@@ -192,8 +192,8 @@ namespace GitHub.Runner.Common.Tests.Listener
_runnerServer _runnerServer
.Setup(x => x.GetAgentMessageAsync( .Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token)) _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) =>
{ {
await Task.Yield(); await Task.Yield();
return messages.Dequeue(); return messages.Dequeue();
@@ -208,7 +208,7 @@ namespace GitHub.Runner.Common.Tests.Listener
//Assert //Assert
_runnerServer _runnerServer
.Verify(x => x.GetAgentMessageAsync( .Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Exactly(arMessages.Length)); _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Exactly(arMessages.Length));
} }
} }
@@ -293,7 +293,7 @@ namespace GitHub.Runner.Common.Tests.Listener
_runnerServer _runnerServer
.Setup(x => x.GetAgentMessageAsync( .Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token)) _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Throws(new TaskAgentAccessTokenExpiredException("test")); .Throws(new TaskAgentAccessTokenExpiredException("test"));
try try
{ {
@@ -311,7 +311,7 @@ namespace GitHub.Runner.Common.Tests.Listener
//Assert //Assert
_runnerServer _runnerServer
.Verify(x => x.GetAgentMessageAsync( .Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Once); _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Once);
_runnerServer _runnerServer
.Verify(x => x.DeleteAgentSessionAsync( .Verify(x => x.DeleteAgentSessionAsync(