mirror of
https://github.com/actions/runner.git
synced 2025-12-14 13:43:33 +00:00
Implement Broker Redirects for Session and Messages
This commit is contained in:
@@ -17,7 +17,8 @@ namespace GitHub.Runner.Common
|
||||
{
|
||||
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
|
||||
|
||||
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
|
||||
Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session);
|
||||
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
|
||||
}
|
||||
|
||||
public sealed class BrokerServer : RunnerService, IBrokerServer
|
||||
@@ -44,11 +45,20 @@ namespace GitHub.Runner.Common
|
||||
}
|
||||
}
|
||||
|
||||
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
|
||||
public Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session)
|
||||
{
|
||||
CheckConnection();
|
||||
var jobMessage = RetryRequest<TaskAgentSession>(
|
||||
async () => await _brokerHttpClient.CreateSessionAsync(session, cancellationToken), cancellationToken);
|
||||
|
||||
return jobMessage;
|
||||
}
|
||||
|
||||
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
|
||||
{
|
||||
CheckConnection();
|
||||
var jobMessage = RetryRequest<TaskAgentMessage>(
|
||||
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
|
||||
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
|
||||
|
||||
return jobMessage;
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ namespace GitHub.Runner.Listener
|
||||
try
|
||||
{
|
||||
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token,
|
||||
null,
|
||||
runnerStatus,
|
||||
BuildConstants.RunnerPackage.Version,
|
||||
VarUtil.OS,
|
||||
|
||||
@@ -14,6 +14,7 @@ using GitHub.Runner.Listener.Configuration;
|
||||
using GitHub.Runner.Sdk;
|
||||
using GitHub.Services.Common;
|
||||
using GitHub.Services.OAuth;
|
||||
using GitHub.Services.WebApi;
|
||||
|
||||
namespace GitHub.Runner.Listener
|
||||
{
|
||||
@@ -33,6 +34,7 @@ namespace GitHub.Runner.Listener
|
||||
private RunnerSettings _settings;
|
||||
private ITerminal _term;
|
||||
private IRunnerServer _runnerServer;
|
||||
private IBrokerServer _brokerServer;
|
||||
private TaskAgentSession _session;
|
||||
private TimeSpan _getNextMessageRetryInterval;
|
||||
private bool _accessTokenRevoked = false;
|
||||
@@ -42,6 +44,7 @@ namespace GitHub.Runner.Listener
|
||||
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
|
||||
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
|
||||
private CancellationTokenSource _getMessagesTokenSource;
|
||||
private VssCredentials _creds;
|
||||
|
||||
public override void Initialize(IHostContext hostContext)
|
||||
{
|
||||
@@ -49,6 +52,7 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
_term = HostContext.GetService<ITerminal>();
|
||||
_runnerServer = HostContext.GetService<IRunnerServer>();
|
||||
_brokerServer = hostContext.GetService<IBrokerServer>();
|
||||
}
|
||||
|
||||
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
|
||||
@@ -64,7 +68,7 @@ namespace GitHub.Runner.Listener
|
||||
// Create connection.
|
||||
Trace.Info("Loading Credentials");
|
||||
var credMgr = HostContext.GetService<ICredentialManager>();
|
||||
VssCredentials creds = credMgr.LoadCredentials();
|
||||
_creds = credMgr.LoadCredentials();
|
||||
|
||||
var agent = new TaskAgentReference
|
||||
{
|
||||
@@ -86,7 +90,7 @@ namespace GitHub.Runner.Listener
|
||||
try
|
||||
{
|
||||
Trace.Info("Connecting to the Runner Server...");
|
||||
await _runnerServer.ConnectAsync(new Uri(serverUrl), creds);
|
||||
await _runnerServer.ConnectAsync(new Uri(serverUrl), _creds);
|
||||
Trace.Info("VssConnection created");
|
||||
|
||||
_term.WriteLine();
|
||||
@@ -98,6 +102,14 @@ namespace GitHub.Runner.Listener
|
||||
taskAgentSession,
|
||||
token);
|
||||
|
||||
// if (_session.SessionMigrationURI != null)
|
||||
// {
|
||||
// Trace.Info($"Runner session is in migration mode: Creating Broker session with SessionMigrationURI: {0}", _session.SessionMigrationURI);
|
||||
// var brokerServer = HostContext.GetService<IBrokerServer>();
|
||||
// await brokerServer.ConnectAsync(_session.SessionMigrationURI, _creds);
|
||||
// _session = await brokerServer.CreateSessionAsync(token, taskAgentSession);
|
||||
// }
|
||||
|
||||
Trace.Info($"Session created.");
|
||||
if (encounteringError)
|
||||
{
|
||||
@@ -124,7 +136,7 @@ namespace GitHub.Runner.Listener
|
||||
Trace.Error("Catch exception during create session.");
|
||||
Trace.Error(ex);
|
||||
|
||||
if (ex is VssOAuthTokenRequestException vssOAuthEx && creds.Federated is VssOAuthCredential vssOAuthCred)
|
||||
if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred)
|
||||
{
|
||||
// "invalid_client" means the runner registration has been deleted from the server.
|
||||
if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase))
|
||||
@@ -228,6 +240,24 @@ namespace GitHub.Runner.Listener
|
||||
// Decrypt the message body if the session is using encryption
|
||||
message = DecryptMessage(message);
|
||||
|
||||
|
||||
if (message != null && message.MessageType == BrokerMigrationMessage.MessageType)
|
||||
{
|
||||
Trace.Info("BrokerMigration message received. Polling Broker for messages...");
|
||||
|
||||
var migrationMessage = JsonUtility.FromString<BrokerMigrationMessage>(message.Body);
|
||||
var brokerServer = HostContext.GetService<IBrokerServer>();
|
||||
|
||||
await brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds);
|
||||
message = await brokerServer.GetRunnerMessageAsync(token,
|
||||
_session.SessionId,
|
||||
runnerStatus,
|
||||
BuildConstants.RunnerPackage.Version,
|
||||
VarUtil.OS,
|
||||
VarUtil.OSArchitecture,
|
||||
_settings.DisableUpdate);
|
||||
}
|
||||
|
||||
if (message != null)
|
||||
{
|
||||
_lastMessageId = message.MessageId;
|
||||
|
||||
35
src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs
Normal file
35
src/Sdk/DTWebApi/WebApi/BrokerMigrationMessage.cs
Normal file
@@ -0,0 +1,35 @@
|
||||
using System;
|
||||
using System.Runtime.Serialization;
|
||||
|
||||
namespace GitHub.DistributedTask.WebApi
|
||||
{
|
||||
/// <summary>
|
||||
/// Represents a session for performing message exchanges from an agent.
|
||||
/// </summary>
|
||||
[DataContract]
|
||||
public class BrokerMigrationMessage
|
||||
{
|
||||
|
||||
public static readonly string MessageType = "BrokerMigration";
|
||||
|
||||
public BrokerMigrationMessage()
|
||||
{
|
||||
}
|
||||
|
||||
public BrokerMigrationMessage(
|
||||
Uri brokerUrl)
|
||||
{
|
||||
this.BrokerBaseUrl = brokerUrl;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the unique identifier for this session.
|
||||
/// </summary>
|
||||
[DataMember]
|
||||
public Uri BrokerBaseUrl
|
||||
{
|
||||
get;
|
||||
internal set;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -75,5 +75,12 @@ namespace GitHub.DistributedTask.WebApi
|
||||
get;
|
||||
set;
|
||||
}
|
||||
|
||||
[DataMember(EmitDefaultValue = false, IsRequired = false)]
|
||||
public BrokerMigrationMessage BrokerMigrationMessage
|
||||
{
|
||||
get;
|
||||
set;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,7 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
}
|
||||
|
||||
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
|
||||
Guid? sessionId,
|
||||
string runnerVersion,
|
||||
TaskAgentStatus? status,
|
||||
string os = null,
|
||||
@@ -69,6 +70,11 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
|
||||
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
|
||||
|
||||
if (sessionId != null)
|
||||
{
|
||||
queryParams.Add("sessionId", sessionId.Value.ToString());
|
||||
}
|
||||
|
||||
if (status != null)
|
||||
{
|
||||
queryParams.Add("status", status.Value.ToString());
|
||||
@@ -111,5 +117,32 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
|
||||
throw new Exception($"Failed to get job message: {result.Error}");
|
||||
}
|
||||
|
||||
public async Task<TaskAgentSession> CreateSessionAsync(
|
||||
TaskAgentSession session,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
|
||||
var requestUri = new Uri(Client.BaseAddress, "session");
|
||||
var requestContent = new ObjectContent<TaskAgentSession>(session, new VssJsonMediaTypeFormatter(true));
|
||||
|
||||
var result = await SendAsync<TaskAgentSession>(
|
||||
new HttpMethod("POST"),
|
||||
requestUri: requestUri,
|
||||
content: requestContent,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
if (result.IsSuccess)
|
||||
{
|
||||
return result.Value;
|
||||
}
|
||||
|
||||
if (result.StatusCode == HttpStatusCode.Forbidden)
|
||||
{
|
||||
throw new AccessDeniedException(result.Error);
|
||||
}
|
||||
|
||||
throw new Exception($"Failed to create broker session: {result.Error}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user