mirror of
https://github.com/actions/runner.git
synced 2025-12-20 06:29:53 +00:00
Implement Broker Redirects for Session and Messages (#3103)
This commit is contained in:
81
src/Test/L0/Listener/BrokerMessageListenerL0.cs
Normal file
81
src/Test/L0/Listener/BrokerMessageListenerL0.cs
Normal file
@@ -0,0 +1,81 @@
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using GitHub.Runner.Listener;
|
||||
using GitHub.Runner.Listener.Configuration;
|
||||
using GitHub.Services.Common;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
|
||||
namespace GitHub.Runner.Common.Tests.Listener
|
||||
{
|
||||
public sealed class BrokerMessageListenerL0
|
||||
{
|
||||
private readonly RunnerSettings _settings;
|
||||
private readonly Mock<IConfigurationManager> _config;
|
||||
private readonly Mock<IBrokerServer> _brokerServer;
|
||||
private readonly Mock<ICredentialManager> _credMgr;
|
||||
private Mock<IConfigurationStore> _store;
|
||||
|
||||
|
||||
public BrokerMessageListenerL0()
|
||||
{
|
||||
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrl = "http://myserver", WorkFolder = "_work", ServerUrlV2 = "http://myserverv2" };
|
||||
_config = new Mock<IConfigurationManager>();
|
||||
_config.Setup(x => x.LoadSettings()).Returns(_settings);
|
||||
_credMgr = new Mock<ICredentialManager>();
|
||||
_store = new Mock<IConfigurationStore>();
|
||||
_brokerServer = new Mock<IBrokerServer>();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
public async void CreatesSession()
|
||||
{
|
||||
using (TestHostContext tc = CreateTestContext())
|
||||
using (var tokenSource = new CancellationTokenSource())
|
||||
{
|
||||
Tracing trace = tc.GetTrace();
|
||||
|
||||
// Arrange.
|
||||
var expectedSession = new TaskAgentSession();
|
||||
_brokerServer
|
||||
.Setup(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedSession));
|
||||
|
||||
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
|
||||
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
|
||||
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
|
||||
|
||||
// Act.
|
||||
BrokerMessageListener listener = new();
|
||||
listener.Initialize(tc);
|
||||
|
||||
bool result = await listener.CreateSessionAsync(tokenSource.Token);
|
||||
trace.Info("result: {0}", result);
|
||||
|
||||
// Assert.
|
||||
Assert.True(result);
|
||||
_brokerServer
|
||||
.Verify(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
}
|
||||
}
|
||||
|
||||
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
|
||||
{
|
||||
TestHostContext tc = new(this, testName);
|
||||
tc.SetSingleton<IConfigurationManager>(_config.Object);
|
||||
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
|
||||
tc.SetSingleton<IConfigurationStore>(_store.Object);
|
||||
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
|
||||
return tc;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,8 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
private Mock<ICredentialManager> _credMgr;
|
||||
private Mock<IConfigurationStore> _store;
|
||||
|
||||
private Mock<IBrokerServer> _brokerServer;
|
||||
|
||||
public MessageListenerL0()
|
||||
{
|
||||
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrl = "http://myserver", WorkFolder = "_work" };
|
||||
@@ -32,6 +34,7 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
_runnerServer = new Mock<IRunnerServer>();
|
||||
_credMgr = new Mock<ICredentialManager>();
|
||||
_store = new Mock<IConfigurationStore>();
|
||||
_brokerServer = new Mock<IBrokerServer>();
|
||||
}
|
||||
|
||||
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
|
||||
@@ -41,6 +44,7 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
tc.SetSingleton<IRunnerServer>(_runnerServer.Object);
|
||||
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
|
||||
tc.SetSingleton<IConfigurationStore>(_store.Object);
|
||||
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
|
||||
return tc;
|
||||
}
|
||||
|
||||
@@ -81,6 +85,72 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
_brokerServer
|
||||
.Verify(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Never());
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
public async void CreatesSessionWithBrokerMigration()
|
||||
{
|
||||
using (TestHostContext tc = CreateTestContext())
|
||||
using (var tokenSource = new CancellationTokenSource())
|
||||
{
|
||||
Tracing trace = tc.GetTrace();
|
||||
|
||||
// Arrange.
|
||||
var expectedSession = new TaskAgentSession()
|
||||
{
|
||||
OwnerName = "legacy",
|
||||
BrokerMigrationMessage = new BrokerMigrationMessage(new Uri("https://broker.actions.github.com"))
|
||||
};
|
||||
|
||||
var expectedBrokerSession = new TaskAgentSession()
|
||||
{
|
||||
OwnerName = "broker"
|
||||
};
|
||||
|
||||
_runnerServer
|
||||
.Setup(x => x.CreateAgentSessionAsync(
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedSession));
|
||||
|
||||
_brokerServer
|
||||
.Setup(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedBrokerSession));
|
||||
|
||||
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
|
||||
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
|
||||
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
|
||||
|
||||
// Act.
|
||||
MessageListener listener = new();
|
||||
listener.Initialize(tc);
|
||||
|
||||
bool result = await listener.CreateSessionAsync(tokenSource.Token);
|
||||
trace.Info("result: {0}", result);
|
||||
|
||||
// Assert.
|
||||
Assert.True(result);
|
||||
|
||||
_runnerServer
|
||||
.Verify(x => x.CreateAgentSessionAsync(
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
|
||||
_brokerServer
|
||||
.Verify(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,6 +201,83 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
public async void DeleteSessionWithBrokerMigration()
|
||||
{
|
||||
using (TestHostContext tc = CreateTestContext())
|
||||
using (var tokenSource = new CancellationTokenSource())
|
||||
{
|
||||
Tracing trace = tc.GetTrace();
|
||||
|
||||
// Arrange.
|
||||
var expectedSession = new TaskAgentSession()
|
||||
{
|
||||
OwnerName = "legacy",
|
||||
BrokerMigrationMessage = new BrokerMigrationMessage(new Uri("https://broker.actions.github.com"))
|
||||
};
|
||||
|
||||
var expectedBrokerSession = new TaskAgentSession()
|
||||
{
|
||||
SessionId = Guid.NewGuid(),
|
||||
OwnerName = "broker"
|
||||
};
|
||||
|
||||
_runnerServer
|
||||
.Setup(x => x.CreateAgentSessionAsync(
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedSession));
|
||||
|
||||
_brokerServer
|
||||
.Setup(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedBrokerSession));
|
||||
|
||||
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
|
||||
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
|
||||
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
|
||||
|
||||
// Act.
|
||||
MessageListener listener = new();
|
||||
listener.Initialize(tc);
|
||||
|
||||
bool result = await listener.CreateSessionAsync(tokenSource.Token);
|
||||
trace.Info("result: {0}", result);
|
||||
|
||||
Assert.True(result);
|
||||
|
||||
_runnerServer
|
||||
.Verify(x => x.CreateAgentSessionAsync(
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
|
||||
_brokerServer
|
||||
.Verify(x => x.CreateSessionAsync(
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token), Times.Once());
|
||||
|
||||
_brokerServer
|
||||
.Setup(x => x.DeleteSessionAsync(It.IsAny<CancellationToken>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
|
||||
// Act.
|
||||
await listener.DeleteSessionAsync();
|
||||
|
||||
|
||||
//Assert
|
||||
_runnerServer
|
||||
.Verify(x => x.DeleteAgentSessionAsync(
|
||||
_settings.PoolId, expectedSession.SessionId, It.IsAny<CancellationToken>()), Times.Never());
|
||||
_brokerServer
|
||||
.Verify(x => x.DeleteSessionAsync(It.IsAny<CancellationToken>()), Times.Once());
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
@@ -212,6 +359,112 @@ namespace GitHub.Runner.Common.Tests.Listener
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
public async void GetNextMessageWithBrokerMigration()
|
||||
{
|
||||
using (TestHostContext tc = CreateTestContext())
|
||||
using (var tokenSource = new CancellationTokenSource())
|
||||
{
|
||||
Tracing trace = tc.GetTrace();
|
||||
|
||||
// Arrange.
|
||||
var expectedSession = new TaskAgentSession();
|
||||
PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
|
||||
Assert.NotNull(sessionIdProperty);
|
||||
sessionIdProperty.SetValue(expectedSession, Guid.NewGuid());
|
||||
|
||||
_runnerServer
|
||||
.Setup(x => x.CreateAgentSessionAsync(
|
||||
_settings.PoolId,
|
||||
It.Is<TaskAgentSession>(y => y != null),
|
||||
tokenSource.Token))
|
||||
.Returns(Task.FromResult(expectedSession));
|
||||
|
||||
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
|
||||
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
|
||||
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
|
||||
|
||||
// Act.
|
||||
MessageListener listener = new();
|
||||
listener.Initialize(tc);
|
||||
|
||||
bool result = await listener.CreateSessionAsync(tokenSource.Token);
|
||||
Assert.True(result);
|
||||
|
||||
var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com"));
|
||||
|
||||
var arMessages = new TaskAgentMessage[]
|
||||
{
|
||||
new TaskAgentMessage
|
||||
{
|
||||
Body = JsonUtility.ToString(brokerMigrationMesage),
|
||||
MessageType = BrokerMigrationMessage.MessageType
|
||||
},
|
||||
};
|
||||
|
||||
var brokerMessages = new TaskAgentMessage[]
|
||||
{
|
||||
new TaskAgentMessage
|
||||
{
|
||||
Body = "somebody1",
|
||||
MessageId = 4234,
|
||||
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
|
||||
},
|
||||
new TaskAgentMessage
|
||||
{
|
||||
Body = "somebody2",
|
||||
MessageId = 4235,
|
||||
MessageType = JobCancelMessage.MessageType
|
||||
},
|
||||
null, //should be skipped by GetNextMessageAsync implementation
|
||||
null,
|
||||
new TaskAgentMessage
|
||||
{
|
||||
Body = "somebody3",
|
||||
MessageId = 4236,
|
||||
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
|
||||
}
|
||||
};
|
||||
var brokerMessageQueue = new Queue<TaskAgentMessage>(brokerMessages);
|
||||
|
||||
_runnerServer
|
||||
.Setup(x => x.GetAgentMessageAsync(
|
||||
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
|
||||
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
|
||||
{
|
||||
await Task.Yield();
|
||||
return arMessages[0]; // always send migration message
|
||||
});
|
||||
|
||||
_brokerServer
|
||||
.Setup(x => x.GetRunnerMessageAsync(
|
||||
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
|
||||
.Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
|
||||
{
|
||||
await Task.Yield();
|
||||
return brokerMessageQueue.Dequeue();
|
||||
});
|
||||
|
||||
TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token);
|
||||
TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token);
|
||||
TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token);
|
||||
Assert.Equal(brokerMessages[0], message1);
|
||||
Assert.Equal(brokerMessages[1], message2);
|
||||
Assert.Equal(brokerMessages[4], message3);
|
||||
|
||||
//Assert
|
||||
_runnerServer
|
||||
.Verify(x => x.GetAgentMessageAsync(
|
||||
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length));
|
||||
|
||||
_brokerServer
|
||||
.Verify(x => x.GetRunnerMessageAsync(
|
||||
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length));
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Runner")]
|
||||
|
||||
Reference in New Issue
Block a user