mirror of
https://github.com/actions/runner.git
synced 2025-12-10 12:36:23 +00:00
Compare commits
5 Commits
b39c237989
...
pje/broker
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40f813e0fb | ||
|
|
31436b3c38 | ||
|
|
4ba8bcd9ab | ||
|
|
9c81a7d682 | ||
|
|
75a11dac1b |
@@ -1,13 +1,12 @@
|
|||||||
using System;
|
#nullable enable
|
||||||
using System.Collections.Generic;
|
|
||||||
|
using System;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using GitHub.Actions.RunService.WebApi;
|
using GitHub.Actions.RunService.WebApi;
|
||||||
using GitHub.DistributedTask.Pipelines;
|
|
||||||
using GitHub.DistributedTask.WebApi;
|
using GitHub.DistributedTask.WebApi;
|
||||||
using GitHub.Runner.Sdk;
|
using GitHub.Runner.Sdk;
|
||||||
using GitHub.Services.Common;
|
using GitHub.Services.Common;
|
||||||
using Sdk.RSWebApi.Contracts;
|
|
||||||
using Sdk.WebApi.WebApi.RawClient;
|
using Sdk.WebApi.WebApi.RawClient;
|
||||||
|
|
||||||
namespace GitHub.Runner.Common
|
namespace GitHub.Runner.Common
|
||||||
@@ -15,40 +14,35 @@ namespace GitHub.Runner.Common
|
|||||||
[ServiceLocator(Default = typeof(BrokerServer))]
|
[ServiceLocator(Default = typeof(BrokerServer))]
|
||||||
public interface IBrokerServer : IRunnerService
|
public interface IBrokerServer : IRunnerService
|
||||||
{
|
{
|
||||||
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
|
Task<BrokerSession> CreateSessionAsync(Uri serverUrl, VssCredentials credentials, CancellationToken token);
|
||||||
|
|
||||||
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
|
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class BrokerServer : RunnerService, IBrokerServer
|
public sealed class BrokerServer : RunnerService, IBrokerServer
|
||||||
{
|
{
|
||||||
private bool _hasConnection;
|
private RawConnection? _connection;
|
||||||
private Uri _brokerUri;
|
private BrokerHttpClient? _brokerHttpClient;
|
||||||
private RawConnection _connection;
|
private BrokerSession? _session;
|
||||||
private BrokerHttpClient _brokerHttpClient;
|
|
||||||
|
|
||||||
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
|
public async Task<BrokerSession> CreateSessionAsync(Uri serverUri, VssCredentials credentials, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
_brokerUri = serverUri;
|
|
||||||
|
|
||||||
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
|
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
|
||||||
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
|
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(cancellationToken);
|
||||||
_hasConnection = true;
|
return await RetryRequest(
|
||||||
}
|
async () => _session = await _brokerHttpClient.CreateSessionAsync(),
|
||||||
|
cancellationToken
|
||||||
private void CheckConnection()
|
);
|
||||||
{
|
|
||||||
if (!_hasConnection)
|
|
||||||
{
|
|
||||||
throw new InvalidOperationException($"SetConnection");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
|
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
|
||||||
{
|
{
|
||||||
CheckConnection();
|
if (_connection is null || _session is null || _brokerHttpClient is null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException($"SetConnection");
|
||||||
|
}
|
||||||
var jobMessage = RetryRequest<TaskAgentMessage>(
|
var jobMessage = RetryRequest<TaskAgentMessage>(
|
||||||
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
|
async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken);
|
||||||
|
|
||||||
return jobMessage;
|
return jobMessage;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
|
|||||||
TaskCompletionSource<int> JobRecordUpdated { get; }
|
TaskCompletionSource<int> JobRecordUpdated { get; }
|
||||||
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
|
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
|
||||||
Task ShutdownAsync();
|
Task ShutdownAsync();
|
||||||
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false);
|
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false);
|
||||||
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
|
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
|
||||||
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
|
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
|
||||||
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
|
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
|
||||||
@@ -96,14 +96,14 @@ namespace GitHub.Runner.Common
|
|||||||
_resultsServer = hostContext.GetService<IResultsServer>();
|
_resultsServer = hostContext.GetService<IResultsServer>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false)
|
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false)
|
||||||
{
|
{
|
||||||
Trace.Entering();
|
Trace.Entering();
|
||||||
_resultsServiceOnly = resultServiceOnly;
|
_resultsServiceOnly = resultsServiceOnly;
|
||||||
|
|
||||||
var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
|
||||||
|
|
||||||
if (!resultServiceOnly)
|
if (!resultsServiceOnly)
|
||||||
{
|
{
|
||||||
_jobServer.InitializeWebsocketClient(serviceEndPoint);
|
_jobServer.InitializeWebsocketClient(serviceEndPoint);
|
||||||
}
|
}
|
||||||
@@ -119,7 +119,7 @@ namespace GitHub.Runner.Common
|
|||||||
{
|
{
|
||||||
string liveConsoleFeedUrl = null;
|
string liveConsoleFeedUrl = null;
|
||||||
Trace.Info("Initializing results client");
|
Trace.Info("Initializing results client");
|
||||||
if (resultServiceOnly
|
if (resultsServiceOnly
|
||||||
&& serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
|
&& serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
|
||||||
&& !string.IsNullOrEmpty(feedStreamUrl))
|
&& !string.IsNullOrEmpty(feedStreamUrl))
|
||||||
{
|
{
|
||||||
@@ -541,10 +541,12 @@ namespace GitHub.Runner.Common
|
|||||||
Trace.Error(ex);
|
Trace.Error(ex);
|
||||||
errorCount++;
|
errorCount++;
|
||||||
|
|
||||||
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results
|
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results unless Results is serving logs
|
||||||
_resultsClientInitiated = false;
|
if (!_resultsServiceOnly)
|
||||||
|
{
|
||||||
SendResultsTelemetry(ex);
|
_resultsClientInitiated = false;
|
||||||
|
SendResultsTelemetry(ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -660,9 +662,11 @@ namespace GitHub.Runner.Common
|
|||||||
{
|
{
|
||||||
Trace.Info("Catch exception during update steps, skip update Results.");
|
Trace.Info("Catch exception during update steps, skip update Results.");
|
||||||
Trace.Error(e);
|
Trace.Error(e);
|
||||||
_resultsClientInitiated = false;
|
if (!_resultsServiceOnly)
|
||||||
|
{
|
||||||
SendResultsTelemetry(e);
|
_resultsClientInitiated = false;
|
||||||
|
SendResultsTelemetry(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_bufferedRetryRecords.Remove(update.TimelineId))
|
if (_bufferedRetryRecords.Remove(update.TimelineId))
|
||||||
|
|||||||
@@ -1,10 +1,5 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.IO;
|
|
||||||
using System.Runtime.InteropServices;
|
|
||||||
using System.Security.Cryptography;
|
|
||||||
using System.Text;
|
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using GitHub.DistributedTask.WebApi;
|
using GitHub.DistributedTask.WebApi;
|
||||||
@@ -12,8 +7,6 @@ using GitHub.Runner.Common;
|
|||||||
using GitHub.Runner.Listener.Configuration;
|
using GitHub.Runner.Listener.Configuration;
|
||||||
using GitHub.Runner.Sdk;
|
using GitHub.Runner.Sdk;
|
||||||
using GitHub.Services.Common;
|
using GitHub.Services.Common;
|
||||||
using GitHub.Runner.Common.Util;
|
|
||||||
using GitHub.Services.OAuth;
|
|
||||||
|
|
||||||
namespace GitHub.Runner.Listener
|
namespace GitHub.Runner.Listener
|
||||||
{
|
{
|
||||||
@@ -26,6 +19,8 @@ namespace GitHub.Runner.Listener
|
|||||||
private CancellationTokenSource _getMessagesTokenSource;
|
private CancellationTokenSource _getMessagesTokenSource;
|
||||||
private IBrokerServer _brokerServer;
|
private IBrokerServer _brokerServer;
|
||||||
|
|
||||||
|
public string _sessionId;
|
||||||
|
|
||||||
public override void Initialize(IHostContext hostContext)
|
public override void Initialize(IHostContext hostContext)
|
||||||
{
|
{
|
||||||
base.Initialize(hostContext);
|
base.Initialize(hostContext);
|
||||||
@@ -36,7 +31,7 @@ namespace GitHub.Runner.Listener
|
|||||||
|
|
||||||
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
|
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
|
||||||
{
|
{
|
||||||
await RefreshBrokerConnection();
|
await RefreshBrokerSession(token);
|
||||||
return await Task.FromResult(true);
|
return await Task.FromResult(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,8 +132,8 @@ namespace GitHub.Runner.Listener
|
|||||||
encounteringError = true;
|
encounteringError = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// re-create VssConnection before next retry
|
// re-create session before next retry
|
||||||
await RefreshBrokerConnection();
|
await RefreshBrokerSession(token);
|
||||||
|
|
||||||
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
|
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
|
||||||
await HostContext.Delay(_getNextMessageRetryInterval, token);
|
await HostContext.Delay(_getNextMessageRetryInterval, token);
|
||||||
@@ -191,7 +186,7 @@ namespace GitHub.Runner.Listener
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task RefreshBrokerConnection()
|
private async Task RefreshBrokerSession(CancellationToken ct)
|
||||||
{
|
{
|
||||||
var configManager = HostContext.GetService<IConfigurationManager>();
|
var configManager = HostContext.GetService<IConfigurationManager>();
|
||||||
_settings = configManager.LoadSettings();
|
_settings = configManager.LoadSettings();
|
||||||
@@ -203,7 +198,8 @@ namespace GitHub.Runner.Listener
|
|||||||
|
|
||||||
var credMgr = HostContext.GetService<ICredentialManager>();
|
var credMgr = HostContext.GetService<ICredentialManager>();
|
||||||
VssCredentials creds = credMgr.LoadCredentials();
|
VssCredentials creds = credMgr.LoadCredentials();
|
||||||
await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds);
|
var session = await _brokerServer.CreateSessionAsync(new Uri(_settings.ServerUrlV2), creds, ct);
|
||||||
|
_sessionId = session.id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ namespace GitHub.Runner.Worker
|
|||||||
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
|
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
|
||||||
}
|
}
|
||||||
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
|
||||||
_jobServerQueue.Start(message, resultServiceOnly: true);
|
_jobServerQueue.Start(message, resultsServiceOnly: true);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -56,7 +56,37 @@ namespace GitHub.Actions.RunService.WebApi
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async Task<BrokerSession> CreateSessionAsync(
|
||||||
|
CancellationToken cancellationToken = default
|
||||||
|
)
|
||||||
|
{
|
||||||
|
var requestUri = new Uri(Client.BaseAddress, "session");
|
||||||
|
|
||||||
|
var result = await SendAsync<BrokerSession>(
|
||||||
|
new HttpMethod("POST"),
|
||||||
|
requestUri: requestUri,
|
||||||
|
cancellationToken: cancellationToken
|
||||||
|
);
|
||||||
|
|
||||||
|
if (result.IsSuccess)
|
||||||
|
{
|
||||||
|
return result.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.StatusCode == HttpStatusCode.Forbidden)
|
||||||
|
{
|
||||||
|
throw new AccessDeniedException(result.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.StatusCode == HttpStatusCode.Conflict)
|
||||||
|
{
|
||||||
|
throw new TaskAgentSessionConflictException(result.Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Exception($"Failed to get job message: {result.Error}");
|
||||||
|
}
|
||||||
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
|
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
|
||||||
|
string sessionID,
|
||||||
string runnerVersion,
|
string runnerVersion,
|
||||||
TaskAgentStatus? status,
|
TaskAgentStatus? status,
|
||||||
CancellationToken cancellationToken = default
|
CancellationToken cancellationToken = default
|
||||||
@@ -66,6 +96,10 @@ namespace GitHub.Actions.RunService.WebApi
|
|||||||
|
|
||||||
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
|
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
|
||||||
|
|
||||||
|
if (sessionID != null)
|
||||||
|
{
|
||||||
|
queryParams.Add("sessionID", runnerVersion);
|
||||||
|
}
|
||||||
if (status != null)
|
if (status != null)
|
||||||
{
|
{
|
||||||
queryParams.Add("status", status.Value.ToString());
|
queryParams.Add("status", status.Value.ToString());
|
||||||
|
|||||||
9
src/Sdk/WebApi/WebApi/BrokerSession.cs
Normal file
9
src/Sdk/WebApi/WebApi/BrokerSession.cs
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace GitHub.Actions.RunService.WebApi
|
||||||
|
{
|
||||||
|
public sealed class BrokerSession
|
||||||
|
{
|
||||||
|
public string id;
|
||||||
|
}
|
||||||
|
}
|
||||||
73
src/Test/L0/Listener/BrokerMessageListenerL0.cs
Normal file
73
src/Test/L0/Listener/BrokerMessageListenerL0.cs
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
using System;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using GitHub.Actions.RunService.WebApi;
|
||||||
|
using GitHub.Runner.Listener;
|
||||||
|
using GitHub.Runner.Listener.Configuration;
|
||||||
|
using GitHub.Services.Common;
|
||||||
|
using Moq;
|
||||||
|
using Xunit;
|
||||||
|
|
||||||
|
namespace GitHub.Runner.Common.Tests.Listener
|
||||||
|
{
|
||||||
|
public sealed class BrokerMessageListenerL0
|
||||||
|
{
|
||||||
|
private readonly RunnerSettings _settings;
|
||||||
|
private readonly Mock<IConfigurationManager> _config;
|
||||||
|
private readonly Mock<IBrokerServer> _brokerServer;
|
||||||
|
private readonly Mock<ICredentialManager> _credMgr;
|
||||||
|
|
||||||
|
public BrokerMessageListenerL0()
|
||||||
|
{
|
||||||
|
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrlV2 = "http://myserver", WorkFolder = "_work" };
|
||||||
|
_config = new Mock<IConfigurationManager>();
|
||||||
|
_config.Setup(x => x.LoadSettings()).Returns(_settings);
|
||||||
|
_brokerServer = new Mock<IBrokerServer>();
|
||||||
|
_credMgr = new Mock<ICredentialManager>();
|
||||||
|
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
[Trait("Level", "L0")]
|
||||||
|
[Trait("Category", "Runner")]
|
||||||
|
public async void CreatesSession()
|
||||||
|
{
|
||||||
|
using TestHostContext tc = CreateTestContext();
|
||||||
|
using var tokenSource = new CancellationTokenSource();
|
||||||
|
|
||||||
|
// Arrange
|
||||||
|
_brokerServer
|
||||||
|
.Setup(
|
||||||
|
x => x.CreateSessionAsync(
|
||||||
|
new Uri(_settings.ServerUrlV2),
|
||||||
|
It.Is<VssCredentials>(y => y != null),
|
||||||
|
tokenSource.Token
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.Returns(
|
||||||
|
Task.FromResult(
|
||||||
|
new BrokerSession { id = "my-phony-session-id" }
|
||||||
|
)
|
||||||
|
);
|
||||||
|
BrokerMessageListener listener = new();
|
||||||
|
listener.Initialize(tc);
|
||||||
|
|
||||||
|
// Act
|
||||||
|
bool result = await listener.CreateSessionAsync(tokenSource.Token);
|
||||||
|
|
||||||
|
// Assert
|
||||||
|
Assert.True(result);
|
||||||
|
Assert.Equal("my-phony-session-id", listener._sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
|
||||||
|
{
|
||||||
|
TestHostContext tc = new(this, testName);
|
||||||
|
tc.SetSingleton<IConfigurationManager>(_config.Object);
|
||||||
|
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
|
||||||
|
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
|
||||||
|
return tc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user