Compare commits

...

5 Commits

Author SHA1 Message Date
Patrick Ellis
40f813e0fb enable #nullable in BrokerServer.cs and rename some methods 2024-01-04 21:21:16 +00:00
Patrick Ellis
31436b3c38 WIP add test 2024-01-03 22:48:31 +00:00
Yang Cao
4ba8bcd9ab Merge branch 'main' into continue_results_upload 2023-09-28 15:37:23 -04:00
Yang Cao
9c81a7d682 No need to send telemtry to Actions server in Results only case 2023-09-27 15:08:42 -04:00
Yang Cao
75a11dac1b Do not give us if Results is powering logs 2023-09-27 09:35:28 -04:00
8 changed files with 160 additions and 50 deletions

View File

@@ -1,13 +1,12 @@
using System; #nullable enable
using System.Collections.Generic;
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi; using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient; using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common namespace GitHub.Runner.Common
@@ -15,40 +14,35 @@ namespace GitHub.Runner.Common
[ServiceLocator(Default = typeof(BrokerServer))] [ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService public interface IBrokerServer : IRunnerService
{ {
Task ConnectAsync(Uri serverUrl, VssCredentials credentials); Task<BrokerSession> CreateSessionAsync(Uri serverUrl, VssCredentials credentials, CancellationToken token);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
} }
public sealed class BrokerServer : RunnerService, IBrokerServer public sealed class BrokerServer : RunnerService, IBrokerServer
{ {
private bool _hasConnection; private RawConnection? _connection;
private Uri _brokerUri; private BrokerHttpClient? _brokerHttpClient;
private RawConnection _connection; private BrokerSession? _session;
private BrokerHttpClient _brokerHttpClient;
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials) public async Task<BrokerSession> CreateSessionAsync(Uri serverUri, VssCredentials credentials, CancellationToken cancellationToken)
{ {
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials); _connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(); _brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(cancellationToken);
_hasConnection = true; return await RetryRequest(
} async () => _session = await _brokerHttpClient.CreateSessionAsync(),
cancellationToken
private void CheckConnection() );
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
} }
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version) public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{ {
CheckConnection(); if (_connection is null || _session is null || _brokerHttpClient is null)
{
throw new InvalidOperationException($"SetConnection");
}
var jobMessage = RetryRequest<TaskAgentMessage>( var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken); async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken);
return jobMessage; return jobMessage;
} }

View File

@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
TaskCompletionSource<int> JobRecordUpdated { get; } TaskCompletionSource<int> JobRecordUpdated { get; }
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling; event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync(); Task ShutdownAsync();
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false); void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null); void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines); void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
@@ -96,14 +96,14 @@ namespace GitHub.Runner.Common
_resultsServer = hostContext.GetService<IResultsServer>(); _resultsServer = hostContext.GetService<IResultsServer>();
} }
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false) public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false)
{ {
Trace.Entering(); Trace.Entering();
_resultsServiceOnly = resultServiceOnly; _resultsServiceOnly = resultsServiceOnly;
var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (!resultServiceOnly) if (!resultsServiceOnly)
{ {
_jobServer.InitializeWebsocketClient(serviceEndPoint); _jobServer.InitializeWebsocketClient(serviceEndPoint);
} }
@@ -119,7 +119,7 @@ namespace GitHub.Runner.Common
{ {
string liveConsoleFeedUrl = null; string liveConsoleFeedUrl = null;
Trace.Info("Initializing results client"); Trace.Info("Initializing results client");
if (resultServiceOnly if (resultsServiceOnly
&& serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
&& !string.IsNullOrEmpty(feedStreamUrl)) && !string.IsNullOrEmpty(feedStreamUrl))
{ {
@@ -541,10 +541,12 @@ namespace GitHub.Runner.Common
Trace.Error(ex); Trace.Error(ex);
errorCount++; errorCount++;
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results // If we hit any exceptions uploading to Results, let's skip any additional uploads to Results unless Results is serving logs
_resultsClientInitiated = false; if (!_resultsServiceOnly)
{
SendResultsTelemetry(ex); _resultsClientInitiated = false;
SendResultsTelemetry(ex);
}
} }
} }
@@ -660,9 +662,11 @@ namespace GitHub.Runner.Common
{ {
Trace.Info("Catch exception during update steps, skip update Results."); Trace.Info("Catch exception during update steps, skip update Results.");
Trace.Error(e); Trace.Error(e);
_resultsClientInitiated = false; if (!_resultsServiceOnly)
{
SendResultsTelemetry(e); _resultsClientInitiated = false;
SendResultsTelemetry(e);
}
} }
if (_bufferedRetryRecords.Remove(update.TimelineId)) if (_bufferedRetryRecords.Remove(update.TimelineId))

View File

@@ -1,10 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
@@ -12,8 +7,6 @@ using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener namespace GitHub.Runner.Listener
{ {
@@ -26,6 +19,8 @@ namespace GitHub.Runner.Listener
private CancellationTokenSource _getMessagesTokenSource; private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer; private IBrokerServer _brokerServer;
public string _sessionId;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
@@ -36,7 +31,7 @@ namespace GitHub.Runner.Listener
public async Task<Boolean> CreateSessionAsync(CancellationToken token) public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{ {
await RefreshBrokerConnection(); await RefreshBrokerSession(token);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@@ -137,8 +132,8 @@ namespace GitHub.Runner.Listener
encounteringError = true; encounteringError = true;
} }
// re-create VssConnection before next retry // re-create session before next retry
await RefreshBrokerConnection(); await RefreshBrokerSession(token);
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token); await HostContext.Delay(_getNextMessageRetryInterval, token);
@@ -191,7 +186,7 @@ namespace GitHub.Runner.Listener
} }
} }
private async Task RefreshBrokerConnection() private async Task RefreshBrokerSession(CancellationToken ct)
{ {
var configManager = HostContext.GetService<IConfigurationManager>(); var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings(); _settings = configManager.LoadSettings();
@@ -203,7 +198,8 @@ namespace GitHub.Runner.Listener
var credMgr = HostContext.GetService<ICredentialManager>(); var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials(); VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds); var session = await _brokerServer.CreateSessionAsync(new Uri(_settings.ServerUrlV2), creds, ct);
_sessionId = session.id;
} }
} }
} }

View File

@@ -72,7 +72,7 @@ namespace GitHub.Runner.Worker
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken); launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
} }
_jobServerQueue = HostContext.GetService<IJobServerQueue>(); _jobServerQueue = HostContext.GetService<IJobServerQueue>();
_jobServerQueue.Start(message, resultServiceOnly: true); _jobServerQueue.Start(message, resultsServiceOnly: true);
} }
else else
{ {

View File

@@ -56,7 +56,37 @@ namespace GitHub.Actions.RunService.WebApi
{ {
} }
public async Task<BrokerSession> CreateSessionAsync(
CancellationToken cancellationToken = default
)
{
var requestUri = new Uri(Client.BaseAddress, "session");
var result = await SendAsync<BrokerSession>(
new HttpMethod("POST"),
requestUri: requestUri,
cancellationToken: cancellationToken
);
if (result.IsSuccess)
{
return result.Value;
}
if (result.StatusCode == HttpStatusCode.Forbidden)
{
throw new AccessDeniedException(result.Error);
}
if (result.StatusCode == HttpStatusCode.Conflict)
{
throw new TaskAgentSessionConflictException(result.Error);
}
throw new Exception($"Failed to get job message: {result.Error}");
}
public async Task<TaskAgentMessage> GetRunnerMessageAsync( public async Task<TaskAgentMessage> GetRunnerMessageAsync(
string sessionID,
string runnerVersion, string runnerVersion,
TaskAgentStatus? status, TaskAgentStatus? status,
CancellationToken cancellationToken = default CancellationToken cancellationToken = default
@@ -66,6 +96,10 @@ namespace GitHub.Actions.RunService.WebApi
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>(); List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (sessionID != null)
{
queryParams.Add("sessionID", runnerVersion);
}
if (status != null) if (status != null)
{ {
queryParams.Add("status", status.Value.ToString()); queryParams.Add("status", status.Value.ToString());

View File

@@ -0,0 +1,9 @@
using System;
namespace GitHub.Actions.RunService.WebApi
{
public sealed class BrokerSession
{
public string id;
}
}

View File

@@ -0,0 +1,73 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.Runner.Listener;
using GitHub.Runner.Listener.Configuration;
using GitHub.Services.Common;
using Moq;
using Xunit;
namespace GitHub.Runner.Common.Tests.Listener
{
public sealed class BrokerMessageListenerL0
{
private readonly RunnerSettings _settings;
private readonly Mock<IConfigurationManager> _config;
private readonly Mock<IBrokerServer> _brokerServer;
private readonly Mock<ICredentialManager> _credMgr;
public BrokerMessageListenerL0()
{
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrlV2 = "http://myserver", WorkFolder = "_work" };
_config = new Mock<IConfigurationManager>();
_config.Setup(x => x.LoadSettings()).Returns(_settings);
_brokerServer = new Mock<IBrokerServer>();
_credMgr = new Mock<ICredentialManager>();
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void CreatesSession()
{
using TestHostContext tc = CreateTestContext();
using var tokenSource = new CancellationTokenSource();
// Arrange
_brokerServer
.Setup(
x => x.CreateSessionAsync(
new Uri(_settings.ServerUrlV2),
It.Is<VssCredentials>(y => y != null),
tokenSource.Token
)
)
.Returns(
Task.FromResult(
new BrokerSession { id = "my-phony-session-id" }
)
);
BrokerMessageListener listener = new();
listener.Initialize(tc);
// Act
bool result = await listener.CreateSessionAsync(tokenSource.Token);
// Assert
Assert.True(result);
Assert.Equal("my-phony-session-id", listener._sessionId);
}
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
{
TestHostContext tc = new(this, testName);
tc.SetSingleton<IConfigurationManager>(_config.Object);
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
return tc;
}
}
}