Support runner upgrade messages (#2231)

Co-authored-by: Thomas Boop <52323235+thboop@users.noreply.github.com>
This commit is contained in:
Tatyana Kostromskaya
2022-11-21 16:17:47 +01:00
committed by GitHub
parent b465102e7f
commit 1632e4a343
6 changed files with 78 additions and 11 deletions

View File

@@ -38,7 +38,7 @@ namespace GitHub.Runner.Common
Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken); Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken);
Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken); Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken);
Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken); Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken); Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, CancellationToken cancellationToken);
// job request // job request
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken); Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
@@ -272,10 +272,10 @@ namespace GitHub.Runner.Common
return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken); return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken);
} }
public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, CancellationToken cancellationToken)
{ {
CheckConnection(RunnerConnectionType.MessageQueue); CheckConnection(RunnerConnectionType.MessageQueue);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, cancellationToken: cancellationToken); return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, runnerVersion, cancellationToken: cancellationToken);
} }
//----------------------------------------------------------------- //-----------------------------------------------------------------

View File

@@ -211,6 +211,7 @@ namespace GitHub.Runner.Listener
_session.SessionId, _session.SessionId,
_lastMessageId, _lastMessageId,
runnerStatus, runnerStatus,
BuildConstants.RunnerPackage.Version,
_getMessagesTokenSource.Token); _getMessagesTokenSource.Token);
// Decrypt the message body if the session is using encryption // Decrypt the message body if the session is using encryption

View File

@@ -430,12 +430,22 @@ namespace GitHub.Runner.Listener
message = await getNextMessage; //get next message message = await getNextMessage; //get next message
HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}");
if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase) ||
string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{ {
if (autoUpdateInProgress == false) if (autoUpdateInProgress == false)
{ {
autoUpdateInProgress = true; autoUpdateInProgress = true;
var runnerUpdateMessage = JsonUtility.FromString<AgentRefreshMessage>(message.Body); AgentRefreshMessage runnerUpdateMessage = null;
if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
runnerUpdateMessage = JsonUtility.FromString<AgentRefreshMessage>(message.Body);
}
else
{
var brokerRunnerUpdateMessage = JsonUtility.FromString<RunnerRefreshMessage>(message.Body);
runnerUpdateMessage = new AgentRefreshMessage(brokerRunnerUpdateMessage.RunnerId, brokerRunnerUpdateMessage.TargetVersion, TimeSpan.FromSeconds(brokerRunnerUpdateMessage.TimeoutInSeconds));
}
#if DEBUG #if DEBUG
// Can mock the update for testing // Can mock the update for testing
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE")))

View File

@@ -450,6 +450,8 @@ namespace GitHub.DistributedTask.WebApi
/// <param name="poolId"></param> /// <param name="poolId"></param>
/// <param name="sessionId"></param> /// <param name="sessionId"></param>
/// <param name="lastMessageId"></param> /// <param name="lastMessageId"></param>
/// <param name="status"></param>
/// <param name="runnerVersion"></param>
/// <param name="userState"></param> /// <param name="userState"></param>
/// <param name="cancellationToken">The cancellation token to cancel operation.</param> /// <param name="cancellationToken">The cancellation token to cancel operation.</param>
[EditorBrowsable(EditorBrowsableState.Never)] [EditorBrowsable(EditorBrowsableState.Never)]
@@ -458,6 +460,7 @@ namespace GitHub.DistributedTask.WebApi
Guid sessionId, Guid sessionId,
long? lastMessageId = null, long? lastMessageId = null,
TaskAgentStatus? status = null, TaskAgentStatus? status = null,
string runnerVersion = null,
object userState = null, object userState = null,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
@@ -475,12 +478,16 @@ namespace GitHub.DistributedTask.WebApi
{ {
queryParams.Add("status", status.Value.ToString()); queryParams.Add("status", status.Value.ToString());
} }
if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
}
return SendAsync<TaskAgentMessage>( return SendAsync<TaskAgentMessage>(
httpMethod, httpMethod,
locationId, locationId,
routeValues: routeValues, routeValues: routeValues,
version: new ApiResourceVersion(5.1, 1), version: new ApiResourceVersion(6.0, 1),
queryParameters: queryParams, queryParameters: queryParams,
userState: userState, userState: userState,
cancellationToken: cancellationToken); cancellationToken: cancellationToken);

View File

@@ -0,0 +1,49 @@
using Newtonsoft.Json;
using System;
using System.Runtime.Serialization;
namespace GitHub.DistributedTask.WebApi
{
[DataContract]
public sealed class RunnerRefreshMessage
{
public static readonly String MessageType = "RunnerRefresh";
[JsonConstructor]
internal RunnerRefreshMessage()
{
}
public RunnerRefreshMessage(
Int32 runnerId,
String targetVersion,
int? timeoutInSeconds = null)
{
this.RunnerId = runnerId;
this.TimeoutInSeconds = timeoutInSeconds ?? TimeSpan.FromMinutes(60).Seconds;
this.TargetVersion = targetVersion;
}
[DataMember]
public Int32 RunnerId
{
get;
private set;
}
[DataMember]
public int TimeoutInSeconds
{
get;
private set;
}
[DataMember]
public String TargetVersion
{
get;
private set;
}
}
}

View File

@@ -192,8 +192,8 @@ namespace GitHub.Runner.Common.Tests.Listener
_runnerServer _runnerServer
.Setup(x => x.GetAgentMessageAsync( .Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>())) _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) => .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, CancellationToken cancellationToken) =>
{ {
await Task.Yield(); await Task.Yield();
return messages.Dequeue(); return messages.Dequeue();
@@ -208,7 +208,7 @@ namespace GitHub.Runner.Common.Tests.Listener
//Assert //Assert
_runnerServer _runnerServer
.Verify(x => x.GetAgentMessageAsync( .Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Exactly(arMessages.Length)); _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Exactly(arMessages.Length));
} }
} }
@@ -293,7 +293,7 @@ namespace GitHub.Runner.Common.Tests.Listener
_runnerServer _runnerServer
.Setup(x => x.GetAgentMessageAsync( .Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>())) _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Throws(new TaskAgentAccessTokenExpiredException("test")); .Throws(new TaskAgentAccessTokenExpiredException("test"));
try try
{ {
@@ -311,7 +311,7 @@ namespace GitHub.Runner.Common.Tests.Listener
//Assert //Assert
_runnerServer _runnerServer
.Verify(x => x.GetAgentMessageAsync( .Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Once); _settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Once);
_runnerServer _runnerServer
.Verify(x => x.DeleteAgentSessionAsync( .Verify(x => x.DeleteAgentSessionAsync(