Compare commits

...

6 Commits

Author SHA1 Message Date
Patrick Ellis
a850dac958 Add a simple BrokerMessageListener L0 test (just to get that ball rolling) 2024-01-24 16:33:14 -05:00
Luke Tomlinson
8954d10248 encoding 2024-01-22 06:13:54 -08:00
Luke Tomlinson
bd6be323bf lint + tests 2024-01-19 13:40:30 -08:00
Luke Tomlinson
5022e2d94b reenable session migration 2024-01-19 12:22:22 -08:00
Luke Tomlinson
d22e527655 cleanup 2024-01-19 07:51:11 -08:00
Luke Tomlinson
32282ea36b Implement Broker Redirects for Session and Messages 2024-01-18 15:06:11 -08:00
8 changed files with 202 additions and 6 deletions

View File

@@ -17,7 +17,8 @@ namespace GitHub.Runner.Common
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate);
}
public sealed class BrokerServer : RunnerService, IBrokerServer
@@ -44,11 +45,20 @@ namespace GitHub.Runner.Common
}
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
public Task<TaskAgentSession> CreateSessionAsync(CancellationToken cancellationToken, TaskAgentSession session)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentSession>(
async () => await _brokerHttpClient.CreateSessionAsync(session, cancellationToken), cancellationToken);
return jobMessage;
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken);
return jobMessage;
}

View File

@@ -74,6 +74,7 @@ namespace GitHub.Runner.Listener
try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token,
null,
runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,

View File

@@ -14,6 +14,7 @@ using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
namespace GitHub.Runner.Listener
{
@@ -33,6 +34,7 @@ namespace GitHub.Runner.Listener
private RunnerSettings _settings;
private ITerminal _term;
private IRunnerServer _runnerServer;
private IBrokerServer _brokerServer;
private TaskAgentSession _session;
private TimeSpan _getNextMessageRetryInterval;
private bool _accessTokenRevoked = false;
@@ -42,6 +44,7 @@ namespace GitHub.Runner.Listener
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;
public override void Initialize(IHostContext hostContext)
{
@@ -49,6 +52,7 @@ namespace GitHub.Runner.Listener
_term = HostContext.GetService<ITerminal>();
_runnerServer = HostContext.GetService<IRunnerServer>();
_brokerServer = hostContext.GetService<IBrokerServer>();
}
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
@@ -64,7 +68,7 @@ namespace GitHub.Runner.Listener
// Create connection.
Trace.Info("Loading Credentials");
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
_creds = credMgr.LoadCredentials();
var agent = new TaskAgentReference
{
@@ -86,7 +90,7 @@ namespace GitHub.Runner.Listener
try
{
Trace.Info("Connecting to the Runner Server...");
await _runnerServer.ConnectAsync(new Uri(serverUrl), creds);
await _runnerServer.ConnectAsync(new Uri(serverUrl), _creds);
Trace.Info("VssConnection created");
_term.WriteLine();
@@ -98,6 +102,14 @@ namespace GitHub.Runner.Listener
taskAgentSession,
token);
if (_session.BrokerMigrationMessage != null)
{
Trace.Info($"Runner session is in migration mode: Creating Broker session with BrokerBaseUrl: {0}", _session.BrokerMigrationMessage.BrokerBaseUrl);
var brokerServer = HostContext.GetService<IBrokerServer>();
await brokerServer.ConnectAsync(_session.BrokerMigrationMessage.BrokerBaseUrl, _creds);
_session = await brokerServer.CreateSessionAsync(token, taskAgentSession);
}
Trace.Info($"Session created.");
if (encounteringError)
{
@@ -124,7 +136,7 @@ namespace GitHub.Runner.Listener
Trace.Error("Catch exception during create session.");
Trace.Error(ex);
if (ex is VssOAuthTokenRequestException vssOAuthEx && creds.Federated is VssOAuthCredential vssOAuthCred)
if (ex is VssOAuthTokenRequestException vssOAuthEx && _creds.Federated is VssOAuthCredential vssOAuthCred)
{
// "invalid_client" means the runner registration has been deleted from the server.
if (string.Equals(vssOAuthEx.Error, "invalid_client", StringComparison.OrdinalIgnoreCase))
@@ -228,6 +240,24 @@ namespace GitHub.Runner.Listener
// Decrypt the message body if the session is using encryption
message = DecryptMessage(message);
if (message != null && message.MessageType == BrokerMigrationMessage.MessageType)
{
Trace.Info("BrokerMigration message received. Polling Broker for messages...");
var migrationMessage = JsonUtility.FromString<BrokerMigrationMessage>(message.Body);
var brokerServer = HostContext.GetService<IBrokerServer>();
await brokerServer.ConnectAsync(migrationMessage.BrokerBaseUrl, _creds);
message = await brokerServer.GetRunnerMessageAsync(token,
_session.SessionId,
runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
_settings.DisableUpdate);
}
if (message != null)
{
_lastMessageId = message.MessageId;

View File

@@ -0,0 +1,38 @@
using System;
using System.Runtime.Serialization;
namespace GitHub.DistributedTask.WebApi
{
/// <summary>
/// Message that tells the runner to redirect itself to BrokerListener for messages.
/// (Note that we use a special Message instead of a simple 302. This is because
/// the runner will need to apply the runner's token to the request, and it is
/// a security best practice to *not* blindly add sensitive data to redirects
/// 302s.)
/// </summary>
[DataContract]
public class BrokerMigrationMessage
{
public static readonly string MessageType = "BrokerMigration";
public BrokerMigrationMessage()
{
}
public BrokerMigrationMessage(
Uri brokerUrl)
{
this.BrokerBaseUrl = brokerUrl;
}
/// <summary>
/// The base url for the broker listener
/// </summary>
[DataMember]
public Uri BrokerBaseUrl
{
get;
internal set;
}
}
}

View File

@@ -75,5 +75,12 @@ namespace GitHub.DistributedTask.WebApi
get;
set;
}
[DataMember(EmitDefaultValue = false, IsRequired = false)]
public BrokerMigrationMessage BrokerMigrationMessage
{
get;
set;
}
}
}

View File

@@ -57,6 +57,7 @@ namespace GitHub.Actions.RunService.WebApi
}
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
Guid? sessionId,
string runnerVersion,
TaskAgentStatus? status,
string os = null,
@@ -69,6 +70,11 @@ namespace GitHub.Actions.RunService.WebApi
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (sessionId != null)
{
queryParams.Add("sessionId", sessionId.Value.ToString());
}
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
@@ -111,5 +117,33 @@ namespace GitHub.Actions.RunService.WebApi
throw new Exception($"Failed to get job message: {result.Error}");
}
public async Task<TaskAgentSession> CreateSessionAsync(
TaskAgentSession session,
CancellationToken cancellationToken = default)
{
var requestUri = new Uri(Client.BaseAddress, "session");
var requestContent = new ObjectContent<TaskAgentSession>(session, new VssJsonMediaTypeFormatter(true));
var result = await SendAsync<TaskAgentSession>(
new HttpMethod("POST"),
requestUri: requestUri,
content: requestContent,
cancellationToken: cancellationToken);
if (result.IsSuccess)
{
return result.Value;
}
if (result.StatusCode == HttpStatusCode.Forbidden)
{
throw new AccessDeniedException(result.Error);
}
throw new Exception($"Failed to create broker session: {result.Error}");
}
}
}

View File

@@ -0,0 +1,72 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Listener;
using GitHub.Runner.Listener.Configuration;
using GitHub.Services.Common;
using Moq;
using Xunit;
namespace GitHub.Runner.Common.Tests.Listener
{
public sealed class BrokerMessageListenerL0
{
private readonly RunnerSettings _settings;
private readonly Mock<IConfigurationManager> _config;
private readonly Mock<IBrokerServer> _brokerServer;
private readonly Mock<ICredentialManager> _credMgr;
public BrokerMessageListenerL0()
{
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrlV2 = "http://myserver", WorkFolder = "_work" };
_config = new Mock<IConfigurationManager>();
_config.Setup(x => x.LoadSettings()).Returns(_settings);
_brokerServer = new Mock<IBrokerServer>();
_credMgr = new Mock<ICredentialManager>();
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void CreatesSession()
{
using TestHostContext tc = CreateTestContext();
using var tokenSource = new CancellationTokenSource();
var guid = new Guid();
// Arrange
_brokerServer
.Setup(
x => x.ConnectAsync(
new Uri(_settings.ServerUrlV2),
It.Is<VssCredentials>(y => y != null)
)
)
.Returns(
Task.FromResult(
new TaskAgentSession { SessionId = guid }
)
);
BrokerMessageListener listener = new();
listener.Initialize(tc);
// Act
var result = await listener.CreateSessionAsync(tokenSource.Token);
// Assert
Assert.True(result);
}
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
{
TestHostContext tc = new(this, testName);
tc.SetSingleton<IConfigurationManager>(_config.Object);
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
return tc;
}
}
}

View File

@@ -24,6 +24,8 @@ namespace GitHub.Runner.Common.Tests.Listener
private Mock<ICredentialManager> _credMgr;
private Mock<IConfigurationStore> _store;
private Mock<IBrokerServer> _brokerServer;
public MessageListenerL0()
{
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrl = "http://myserver", WorkFolder = "_work" };
@@ -32,6 +34,7 @@ namespace GitHub.Runner.Common.Tests.Listener
_runnerServer = new Mock<IRunnerServer>();
_credMgr = new Mock<ICredentialManager>();
_store = new Mock<IConfigurationStore>();
_brokerServer = new Mock<IBrokerServer>();
}
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
@@ -41,6 +44,7 @@ namespace GitHub.Runner.Common.Tests.Listener
tc.SetSingleton<IRunnerServer>(_runnerServer.Object);
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
tc.SetSingleton<IConfigurationStore>(_store.Object);
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
return tc;
}