Support auth migration using authUrlV2 in Runner/MessageListener. (#3787)

This commit is contained in:
Tingluo Huang
2025-04-10 12:58:33 -04:00
committed by GitHub
parent f1b5b5bd5c
commit d5ccbd10d1
6 changed files with 1204 additions and 12 deletions

View File

@@ -1,4 +1,5 @@
using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -47,7 +48,7 @@ namespace GitHub.Runner.Common.Tests.Listener
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
_credMgr.Setup(x => x.LoadCredentials(It.IsAny<bool>())).Returns(new VssCredentials());
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
// Act.
BrokerMessageListener listener = new();
@@ -65,6 +66,303 @@ namespace GitHub.Runner.Common.Tests.Listener
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task HandleAuthMigrationChanged()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
// Act.
BrokerMessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
// Assert.
Assert.Equal(CreateSessionResult.Success, result);
_brokerServer
.Verify(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token), Times.Once());
tc.EnableAuthMigration("L0Test");
var traceFile = Path.GetTempFileName();
File.Copy(tc.TraceFileName, traceFile, true);
Assert.Contains("Auth migration changed", File.ReadAllText(traceFile));
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task CreatesSession_DeferAuthMigration()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
var throwException = true;
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(async (TaskAgentSession session, CancellationToken token) =>
{
await Task.Yield();
if (throwException)
{
throwException = false;
throw new NotSupportedException("Error during create session");
}
return expectedSession;
});
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
// Act.
BrokerMessageListener listener = new();
listener.Initialize(tc);
tc.EnableAuthMigration("L0Test");
Assert.True(tc.AllowAuthMigration);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
// Assert.
Assert.Equal(CreateSessionResult.Success, result);
_brokerServer
.Verify(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token), Times.Exactly(2));
_credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(2));
Assert.False(tc.AllowAuthMigration);
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task GetNextMessage()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
var expectedMessage = new TaskAgentMessage();
_brokerServer
.Setup(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(expectedMessage));
// Act.
BrokerMessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
Assert.Equal(CreateSessionResult.Success, result);
TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token);
trace.Info("message: {0}", message);
// Assert.
Assert.Equal(expectedMessage, message);
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()), Times.Once());
_brokerServer.Verify(x => x.ConnectAsync(It.IsAny<Uri>(), It.IsAny<VssCredentials>()), Times.Once());
_credMgr.Verify(x => x.LoadCredentials(true), Times.Once());
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task GetNextMessage_EnableAuthMigration()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
var expectedMessage = new TaskAgentMessage();
_brokerServer
.Setup(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(expectedMessage));
// Act.
BrokerMessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
Assert.Equal(CreateSessionResult.Success, result);
tc.EnableAuthMigration("L0Test");
TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token);
trace.Info("message: {0}", message);
// Assert.
Assert.Equal(expectedMessage, message);
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()), Times.Once());
_brokerServer.Verify(x => x.ConnectAsync(It.IsAny<Uri>(), It.IsAny<VssCredentials>()), Times.Exactly(2));
_credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(2));
Assert.True(tc.AllowAuthMigration);
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task GetNextMessage_AuthMigrationFallback()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
tc.EnableAuthMigration("L0Test");
// Arrange.
_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
var expectedMessage = new TaskAgentMessage();
_brokerServer
.Setup(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()))
.Returns(async (Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token) =>
{
await Task.Yield();
if (tc.AllowAuthMigration)
{
throw new NotSupportedException("Error during get message");
}
return expectedMessage;
});
// Act.
BrokerMessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
Assert.Equal(CreateSessionResult.Success, result);
Assert.True(tc.AllowAuthMigration);
TaskAgentMessage message = await listener.GetNextMessageAsync(tokenSource.Token);
trace.Info("message: {0}", message);
// Assert.
Assert.Equal(expectedMessage, message);
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
It.IsAny<Guid?>(),
It.IsAny<TaskAgentStatus>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<CancellationToken>()), Times.Exactly(2));
_brokerServer.Verify(x => x.ConnectAsync(It.IsAny<Uri>(), It.IsAny<VssCredentials>()), Times.Exactly(3));
_credMgr.Verify(x => x.LoadCredentials(true), Times.Exactly(3));
Assert.False(tc.AllowAuthMigration);
}
}
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
{
TestHostContext tc = new(this, testName);

View File

@@ -323,6 +323,15 @@ namespace GitHub.Runner.Common.Tests.Listener
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length));
_credMgr
.Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length));
_brokerServer
.Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny<VssCredentials>()), Times.Exactly(brokerMessages.Length));
_brokerServer
.Verify(x => x.ForceRefreshConnection(It.IsAny<VssCredentials>()), Times.Never);
}
}
@@ -432,5 +441,301 @@ namespace GitHub.Runner.Common.Tests.Listener
_settings.PoolId, expectedSession.SessionId, It.IsAny<CancellationToken>()), Times.Never);
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task HandleAuthMigrationChanged()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
var expectedSession = new TaskAgentSession();
_runnerServer
.Setup(x => x.CreateAgentSessionAsync(
_settings.PoolId,
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
_credMgr.Setup(x => x.LoadCredentials(It.IsAny<bool>())).Returns(new VssCredentials());
// Act.
MessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);
// Assert.
Assert.Equal(CreateSessionResult.Success, result);
_runnerServer
.Verify(x => x.CreateAgentSessionAsync(
_settings.PoolId,
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token), Times.Once());
_brokerServer
.Verify(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token), Times.Never());
tc.EnableAuthMigration("L0Test");
var traceFile = Path.GetTempFileName();
File.Copy(tc.TraceFileName, traceFile, true);
Assert.Contains("Auth migration changed", File.ReadAllText(traceFile));
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task GetNextMessageWithBrokerMigration_AuthMigrationFallback()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
var expectedSession = new TaskAgentSession();
PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
Assert.NotNull(sessionIdProperty);
sessionIdProperty.SetValue(expectedSession, Guid.NewGuid());
_runnerServer
.Setup(x => x.CreateAgentSessionAsync(
_settings.PoolId,
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
_credMgr.Setup(x => x.LoadCredentials(It.IsAny<bool>())).Returns(new VssCredentials());
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
// Act.
MessageListener listener = new();
listener.Initialize(tc);
tc.EnableAuthMigration("L0Test");
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
Assert.Equal(CreateSessionResult.Success, result);
var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com"));
var arMessages = new TaskAgentMessage[]
{
new TaskAgentMessage
{
Body = JsonUtility.ToString(brokerMigrationMesage),
MessageType = BrokerMigrationMessage.MessageType
},
};
var brokerMessages = new TaskAgentMessage[]
{
new TaskAgentMessage
{
Body = "somebody1",
MessageId = 4234,
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
},
new TaskAgentMessage
{
Body = "somebody2",
MessageId = 4235,
MessageType = JobCancelMessage.MessageType
},
null, //should be skipped by GetNextMessageAsync implementation
null,
new TaskAgentMessage
{
Body = "somebody3",
MessageId = 4236,
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
}
};
var brokerMessageQueue = new Queue<TaskAgentMessage>(brokerMessages);
_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
{
await Task.Yield();
return arMessages[0]; // always send migration message
});
var counter = 0;
_brokerServer
.Setup(x => x.GetRunnerMessageAsync(
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
{
counter++;
await Task.Yield();
if (counter == 2)
{
throw new NotSupportedException("Something wrong.");
}
return brokerMessageQueue.Dequeue();
});
TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token);
TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token);
TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token);
Assert.Equal(brokerMessages[0], message1);
Assert.Equal(brokerMessages[1], message2);
Assert.Equal(brokerMessages[4], message3);
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length + 1));
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length + 1));
_credMgr
.Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length + 1));
_brokerServer
.Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny<VssCredentials>()), Times.Exactly(brokerMessages.Length + 1));
_brokerServer
.Verify(x => x.ForceRefreshConnection(It.IsAny<VssCredentials>()), Times.Once());
Assert.False(tc.AllowAuthMigration);
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task GetNextMessageWithBrokerMigration_EnableAuthMigration()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();
// Arrange.
var expectedSession = new TaskAgentSession();
PropertyInfo sessionIdProperty = expectedSession.GetType().GetProperty("SessionId", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
Assert.NotNull(sessionIdProperty);
sessionIdProperty.SetValue(expectedSession, Guid.NewGuid());
_runnerServer
.Setup(x => x.CreateAgentSessionAsync(
_settings.PoolId,
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));
_credMgr.Setup(x => x.LoadCredentials(It.IsAny<bool>())).Returns(new VssCredentials());
_store.Setup(x => x.GetCredentials()).Returns(new CredentialData() { Scheme = Constants.Configuration.OAuthAccessToken });
_store.Setup(x => x.GetMigratedCredentials()).Returns(default(CredentialData));
// Act.
MessageListener listener = new();
listener.Initialize(tc);
CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
Assert.Equal(CreateSessionResult.Success, result);
var brokerMigrationMesage = new BrokerMigrationMessage(new Uri("https://actions.broker.com"));
var arMessages = new TaskAgentMessage[]
{
new TaskAgentMessage
{
Body = JsonUtility.ToString(brokerMigrationMesage),
MessageType = BrokerMigrationMessage.MessageType
},
};
var brokerMessages = new TaskAgentMessage[]
{
new TaskAgentMessage
{
Body = "somebody1",
MessageId = 4234,
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
},
new TaskAgentMessage
{
Body = "somebody2",
MessageId = 4235,
MessageType = JobCancelMessage.MessageType
},
null, //should be skipped by GetNextMessageAsync implementation
null,
new TaskAgentMessage
{
Body = "somebody3",
MessageId = 4236,
MessageType = JobRequestMessageTypes.PipelineAgentJobRequest
}
};
var brokerMessageQueue = new Queue<TaskAgentMessage>(brokerMessages);
_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
{
await Task.Yield();
return arMessages[0]; // always send migration message
});
_brokerServer
.Setup(x => x.GetRunnerMessageAsync(
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(async (Guid sessionId, TaskAgentStatus status, string runnerVersion, string os, string architecture, bool disableUpdate, CancellationToken cancellationToken) =>
{
await Task.Yield();
if (!tc.AllowAuthMigration)
{
tc.EnableAuthMigration("L0Test");
}
return brokerMessageQueue.Dequeue();
});
TaskAgentMessage message1 = await listener.GetNextMessageAsync(tokenSource.Token);
TaskAgentMessage message2 = await listener.GetNextMessageAsync(tokenSource.Token);
TaskAgentMessage message3 = await listener.GetNextMessageAsync(tokenSource.Token);
Assert.Equal(brokerMessages[0], message1);
Assert.Equal(brokerMessages[1], message2);
Assert.Equal(brokerMessages[4], message3);
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length));
_brokerServer
.Verify(x => x.GetRunnerMessageAsync(
expectedSession.SessionId, TaskAgentStatus.Online, It.IsAny<string>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Exactly(brokerMessages.Length));
_credMgr
.Verify(x => x.LoadCredentials(true), Times.Exactly(brokerMessages.Length));
_brokerServer
.Verify(x => x.UpdateConnectionIfNeeded(brokerMigrationMesage.BrokerBaseUrl, It.IsAny<VssCredentials>()), Times.Exactly(brokerMessages.Length));
_brokerServer
.Verify(x => x.ForceRefreshConnection(It.IsAny<VssCredentials>()), Times.Once());
Assert.True(tc.AllowAuthMigration);
}
}
}
}

View File

@@ -1,10 +1,12 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Listener;
using GitHub.Runner.Listener.Configuration;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Moq;
using Xunit;
@@ -24,6 +26,9 @@ namespace GitHub.Runner.Common.Tests.Listener
private Mock<IConfigurationStore> _configStore;
private Mock<ISelfUpdater> _updater;
private Mock<IErrorThrottler> _acquireJobThrottler;
private Mock<ICredentialManager> _credentialManager;
private Mock<IActionsRunServer> _actionsRunServer;
private Mock<IRunServer> _runServer;
public RunnerL0()
{
@@ -37,6 +42,9 @@ namespace GitHub.Runner.Common.Tests.Listener
_configStore = new Mock<IConfigurationStore>();
_updater = new Mock<ISelfUpdater>();
_acquireJobThrottler = new Mock<IErrorThrottler>();
_credentialManager = new Mock<ICredentialManager>();
_actionsRunServer = new Mock<IActionsRunServer>();
_runServer = new Mock<IRunServer>();
}
private Pipelines.AgentJobRequestMessage CreateJobRequestMessage(string jobName)
@@ -552,5 +560,428 @@ namespace GitHub.Runner.Common.Tests.Listener
_configurationManager.Verify(x => x.DeleteLocalRunnerConfig(), Times.Once());
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task TestReportAuthMigrationTelemetry()
{
using (var hc = new TestHostContext(this))
{
//Arrange
var runner = new Runner.Listener.Runner();
hc.SetSingleton<IConfigurationManager>(_configurationManager.Object);
hc.SetSingleton<IJobNotification>(_jobNotification.Object);
hc.SetSingleton<IMessageListener>(_messageListener.Object);
hc.SetSingleton<IPromptManager>(_promptManager.Object);
hc.SetSingleton<IRunnerServer>(_runnerServer.Object);
hc.SetSingleton<IConfigurationStore>(_configStore.Object);
hc.SetSingleton<ICredentialManager>(_credentialManager.Object);
hc.EnqueueInstance<IErrorThrottler>(_acquireJobThrottler.Object);
hc.EnqueueInstance<IJobDispatcher>(_jobDispatcher.Object);
runner.Initialize(hc);
var settings = new RunnerSettings
{
PoolId = 43242,
AgentId = 5678,
Ephemeral = true
};
var message1 = new TaskAgentMessage()
{
MessageId = 4234,
MessageType = "unknown"
};
var messages = new Queue<TaskAgentMessage>();
messages.Enqueue(message1);
_updater.Setup(x => x.SelfUpdate(It.IsAny<AgentRefreshMessage>(), It.IsAny<IJobDispatcher>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
_configurationManager.Setup(x => x.LoadSettings())
.Returns(settings);
_configurationManager.Setup(x => x.IsConfigured())
.Returns(true);
_messageListener.Setup(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<CreateSessionResult>(CreateSessionResult.Success));
_messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()))
.Returns(async (CancellationToken token) =>
{
hc.GetTrace().Info("Waiting for message");
Assert.False(hc.AllowAuthMigration);
await Task.Delay(100, token);
var traceFile = Path.GetTempFileName();
File.Copy(hc.TraceFileName, traceFile, true);
Assert.DoesNotContain("Checking for auth migration telemetry to report", File.ReadAllText(traceFile));
hc.EnableAuthMigration("L0Test");
hc.DeferAuthMigration(TimeSpan.FromSeconds(1), "L0Test");
hc.EnableAuthMigration("L0Test");
hc.DeferAuthMigration(TimeSpan.FromSeconds(1), "L0Test");
await Task.Delay(1000, token);
hc.ShutdownRunner(ShutdownReason.UserCancelled);
File.Copy(hc.TraceFileName, traceFile, true);
Assert.Contains("Checking for auth migration telemetry to report", File.ReadAllText(traceFile));
return messages.Dequeue();
});
_messageListener.Setup(x => x.DeleteSessionAsync())
.Returns(Task.CompletedTask);
_messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()))
.Returns(Task.CompletedTask);
_jobNotification.Setup(x => x.StartClient(It.IsAny<String>()))
.Callback(() =>
{
});
_configStore.Setup(x => x.IsServiceConfigured()).Returns(false);
_runnerServer.Setup(x => x.UpdateAgentUpdateStateAsync(It.IsAny<int>(), It.IsAny<ulong>(), It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(new TaskAgent()));
//Act
var command = new CommandSettings(hc, new string[] { "run" });
var returnCode = await runner.ExecuteCommand(command);
//Assert
Assert.Equal(Constants.Runner.ReturnCode.Success, returnCode);
_messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()), Times.AtLeastOnce());
_messageListener.Verify(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()), Times.Once());
_messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once());
_messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()), Times.Once());
_runnerServer.Verify(x => x.UpdateAgentUpdateStateAsync(It.IsAny<int>(), It.IsAny<ulong>(), It.IsAny<string>(), It.Is<string>(s => s.Contains("L0Test")), It.IsAny<CancellationToken>()), Times.Exactly(4));
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task TestRunnerJobRequestMessageFromPipeline()
{
using (var hc = new TestHostContext(this))
{
//Arrange
var runner = new Runner.Listener.Runner();
hc.SetSingleton<IConfigurationManager>(_configurationManager.Object);
hc.SetSingleton<IJobNotification>(_jobNotification.Object);
hc.SetSingleton<IMessageListener>(_messageListener.Object);
hc.SetSingleton<IPromptManager>(_promptManager.Object);
hc.SetSingleton<IRunnerServer>(_runnerServer.Object);
hc.SetSingleton<IConfigurationStore>(_configStore.Object);
hc.SetSingleton<ISelfUpdater>(_updater.Object);
hc.SetSingleton<ICredentialManager>(_credentialManager.Object);
hc.EnqueueInstance<IErrorThrottler>(_acquireJobThrottler.Object);
hc.EnqueueInstance<IActionsRunServer>(_actionsRunServer.Object);
hc.EnqueueInstance<IJobDispatcher>(_jobDispatcher.Object);
runner.Initialize(hc);
var settings = new RunnerSettings
{
PoolId = 43242,
AgentId = 5678,
Ephemeral = true,
ServerUrl = "https://github.com",
};
var message1 = new TaskAgentMessage()
{
Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999" }),
MessageId = 4234,
MessageType = JobRequestMessageTypes.RunnerJobRequest
};
var messages = new Queue<TaskAgentMessage>();
messages.Enqueue(message1);
_updater.Setup(x => x.SelfUpdate(It.IsAny<AgentRefreshMessage>(), It.IsAny<IJobDispatcher>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
_configurationManager.Setup(x => x.LoadSettings())
.Returns(settings);
_configurationManager.Setup(x => x.IsConfigured())
.Returns(true);
_messageListener.Setup(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<CreateSessionResult>(CreateSessionResult.Success));
_messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()))
.Returns(async (CancellationToken token) =>
{
if (0 == messages.Count)
{
await Task.Delay(2000, token);
}
return messages.Dequeue();
});
_messageListener.Setup(x => x.DeleteSessionAsync())
.Returns(Task.CompletedTask);
_messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()))
.Returns(Task.CompletedTask);
_jobNotification.Setup(x => x.StartClient(It.IsAny<String>()))
.Callback(() =>
{
});
_actionsRunServer.Setup(x => x.GetJobMessageAsync("999", It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(CreateJobRequestMessage("test")));
_credentialManager.Setup(x => x.LoadCredentials(false)).Returns(new VssCredentials());
_configStore.Setup(x => x.IsServiceConfigured()).Returns(false);
var completedTask = new TaskCompletionSource<bool>();
completedTask.SetResult(true);
_jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask);
//Act
var command = new CommandSettings(hc, new string[] { "run" });
Task<int> runnerTask = runner.ExecuteCommand(command);
//Assert
//wait for the runner to exit with right return code
await Task.WhenAny(runnerTask, Task.Delay(30000));
Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out.");
Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString());
if (runnerTask.IsCompleted)
{
Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask);
}
_jobDispatcher.Verify(x => x.Run(It.IsAny<Pipelines.AgentJobRequestMessage>(), true), Times.Once());
_messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()), Times.AtLeastOnce());
_messageListener.Verify(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()), Times.Once());
_messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once());
_messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()), Times.Once());
_credentialManager.Verify(x => x.LoadCredentials(false), Times.Once());
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task TestRunnerJobRequestMessageFromRunService()
{
using (var hc = new TestHostContext(this))
{
//Arrange
var runner = new Runner.Listener.Runner();
hc.SetSingleton<IConfigurationManager>(_configurationManager.Object);
hc.SetSingleton<IJobNotification>(_jobNotification.Object);
hc.SetSingleton<IMessageListener>(_messageListener.Object);
hc.SetSingleton<IPromptManager>(_promptManager.Object);
hc.SetSingleton<IRunnerServer>(_runnerServer.Object);
hc.SetSingleton<IConfigurationStore>(_configStore.Object);
hc.SetSingleton<ISelfUpdater>(_updater.Object);
hc.SetSingleton<ICredentialManager>(_credentialManager.Object);
hc.EnqueueInstance<IErrorThrottler>(_acquireJobThrottler.Object);
hc.EnqueueInstance<IRunServer>(_runServer.Object);
hc.EnqueueInstance<IJobDispatcher>(_jobDispatcher.Object);
runner.Initialize(hc);
var settings = new RunnerSettings
{
PoolId = 43242,
AgentId = 5678,
Ephemeral = true,
ServerUrl = "https://github.com",
};
var message1 = new TaskAgentMessage()
{
Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999", RunServiceUrl = "https://run-service.com" }),
MessageId = 4234,
MessageType = JobRequestMessageTypes.RunnerJobRequest
};
var messages = new Queue<TaskAgentMessage>();
messages.Enqueue(message1);
_updater.Setup(x => x.SelfUpdate(It.IsAny<AgentRefreshMessage>(), It.IsAny<IJobDispatcher>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
_configurationManager.Setup(x => x.LoadSettings())
.Returns(settings);
_configurationManager.Setup(x => x.IsConfigured())
.Returns(true);
_messageListener.Setup(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<CreateSessionResult>(CreateSessionResult.Success));
_messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()))
.Returns(async (CancellationToken token) =>
{
if (0 == messages.Count)
{
await Task.Delay(2000, token);
}
return messages.Dequeue();
});
_messageListener.Setup(x => x.DeleteSessionAsync())
.Returns(Task.CompletedTask);
_messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()))
.Returns(Task.CompletedTask);
_jobNotification.Setup(x => x.StartClient(It.IsAny<String>()))
.Callback(() =>
{
});
_runServer.Setup(x => x.GetJobMessageAsync("999", "github", It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(CreateJobRequestMessage("test")));
_credentialManager.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
_configStore.Setup(x => x.IsServiceConfigured()).Returns(false);
var completedTask = new TaskCompletionSource<bool>();
completedTask.SetResult(true);
_jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask);
//Act
var command = new CommandSettings(hc, new string[] { "run" });
Task<int> runnerTask = runner.ExecuteCommand(command);
//Assert
//wait for the runner to exit with right return code
await Task.WhenAny(runnerTask, Task.Delay(30000));
Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out.");
Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString());
if (runnerTask.IsCompleted)
{
Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask);
}
_jobDispatcher.Verify(x => x.Run(It.IsAny<Pipelines.AgentJobRequestMessage>(), true), Times.Once());
_messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()), Times.AtLeastOnce());
_messageListener.Verify(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()), Times.Once());
_messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once());
_messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()), Times.Once());
_credentialManager.Verify(x => x.LoadCredentials(true), Times.Once());
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task TestRunnerJobRequestMessageFromRunService_AuthMigrationFallback()
{
using (var hc = new TestHostContext(this))
{
//Arrange
var runner = new Runner.Listener.Runner();
hc.SetSingleton<IConfigurationManager>(_configurationManager.Object);
hc.SetSingleton<IJobNotification>(_jobNotification.Object);
hc.SetSingleton<IMessageListener>(_messageListener.Object);
hc.SetSingleton<IPromptManager>(_promptManager.Object);
hc.SetSingleton<IRunnerServer>(_runnerServer.Object);
hc.SetSingleton<IConfigurationStore>(_configStore.Object);
hc.SetSingleton<ISelfUpdater>(_updater.Object);
hc.SetSingleton<ICredentialManager>(_credentialManager.Object);
hc.EnqueueInstance<IErrorThrottler>(_acquireJobThrottler.Object);
hc.EnqueueInstance<IJobDispatcher>(_jobDispatcher.Object);
hc.EnqueueInstance<IRunServer>(_runServer.Object);
hc.EnqueueInstance<IRunServer>(_runServer.Object);
runner.Initialize(hc);
var settings = new RunnerSettings
{
PoolId = 43242,
AgentId = 5678,
Ephemeral = true,
ServerUrl = "https://github.com",
};
var message1 = new TaskAgentMessage()
{
Body = JsonUtility.ToString(new RunnerJobRequestRef() { BillingOwnerId = "github", RunnerRequestId = "999", RunServiceUrl = "https://run-service.com" }),
MessageId = 4234,
MessageType = JobRequestMessageTypes.RunnerJobRequest
};
var messages = new Queue<TaskAgentMessage>();
messages.Enqueue(message1);
messages.Enqueue(message1);
_updater.Setup(x => x.SelfUpdate(It.IsAny<AgentRefreshMessage>(), It.IsAny<IJobDispatcher>(), It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
_configurationManager.Setup(x => x.LoadSettings())
.Returns(settings);
_configurationManager.Setup(x => x.IsConfigured())
.Returns(true);
_messageListener.Setup(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<CreateSessionResult>(CreateSessionResult.Success));
_messageListener.Setup(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()))
.Returns(async (CancellationToken token) =>
{
if (2 == messages.Count)
{
hc.EnableAuthMigration("L0Test");
}
if (0 == messages.Count)
{
await Task.Delay(2000, token);
}
return messages.Dequeue();
});
_messageListener.Setup(x => x.DeleteSessionAsync())
.Returns(Task.CompletedTask);
_messageListener.Setup(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()))
.Returns(Task.CompletedTask);
_jobNotification.Setup(x => x.StartClient(It.IsAny<String>()))
.Callback(() =>
{
});
var throwError = true;
_runServer.Setup(x => x.GetJobMessageAsync("999", "github", It.IsAny<CancellationToken>()))
.Returns(() =>
{
if (throwError)
{
Assert.True(hc.AllowAuthMigration);
throwError = false;
throw new NotSupportedException("some error");
}
return Task.FromResult(CreateJobRequestMessage("test"));
});
_credentialManager.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());
_configStore.Setup(x => x.IsServiceConfigured()).Returns(false);
var completedTask = new TaskCompletionSource<bool>();
completedTask.SetResult(true);
_jobDispatcher.Setup(x => x.RunOnceJobCompleted).Returns(completedTask);
//Act
var command = new CommandSettings(hc, new string[] { "run" });
Task<int> runnerTask = runner.ExecuteCommand(command);
//Assert
//wait for the runner to exit with right return code
await Task.WhenAny(runnerTask, Task.Delay(30000));
Assert.True(runnerTask.IsCompleted, $"{nameof(runner.ExecuteCommand)} timed out.");
Assert.True(!runnerTask.IsFaulted, runnerTask.Exception?.ToString());
if (runnerTask.IsCompleted)
{
Assert.Equal(Constants.Runner.ReturnCode.Success, await runnerTask);
}
_jobDispatcher.Verify(x => x.Run(It.IsAny<Pipelines.AgentJobRequestMessage>(), true), Times.Once());
_messageListener.Verify(x => x.CreateSessionAsync(It.IsAny<CancellationToken>()), Times.Once());
_messageListener.Verify(x => x.GetNextMessageAsync(It.IsAny<CancellationToken>()), Times.AtLeast(2));
_messageListener.Verify(x => x.DeleteMessageAsync(It.IsAny<TaskAgentMessage>()), Times.AtLeast(2));
_messageListener.Verify(x => x.DeleteSessionAsync(), Times.Once());
_credentialManager.Verify(x => x.LoadCredentials(true), Times.Exactly(2));
Assert.False(hc.AllowAuthMigration);
}
}
}
}