Compare commits

...

5 Commits

Author SHA1 Message Date
Patrick Ellis
40f813e0fb enable #nullable in BrokerServer.cs and rename some methods 2024-01-04 21:21:16 +00:00
Patrick Ellis
31436b3c38 WIP add test 2024-01-03 22:48:31 +00:00
Yang Cao
4ba8bcd9ab Merge branch 'main' into continue_results_upload 2023-09-28 15:37:23 -04:00
Yang Cao
9c81a7d682 No need to send telemtry to Actions server in Results only case 2023-09-27 15:08:42 -04:00
Yang Cao
75a11dac1b Do not give us if Results is powering logs 2023-09-27 09:35:28 -04:00
8 changed files with 160 additions and 50 deletions

View File

@@ -21,4 +21,4 @@
},
"postCreateCommand": "dotnet restore src/Test && dotnet restore src/Runner.PluginHost",
"remoteUser": "vscode"
}
}

View File

@@ -1,13 +1,12 @@
using System;
using System.Collections.Generic;
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
@@ -15,40 +14,35 @@ namespace GitHub.Runner.Common
[ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<BrokerSession> CreateSessionAsync(Uri serverUrl, VssCredentials credentials, CancellationToken token);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
}
public sealed class BrokerServer : RunnerService, IBrokerServer
{
private bool _hasConnection;
private Uri _brokerUri;
private RawConnection _connection;
private BrokerHttpClient _brokerHttpClient;
private RawConnection? _connection;
private BrokerHttpClient? _brokerHttpClient;
private BrokerSession? _session;
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
public async Task<BrokerSession> CreateSessionAsync(Uri serverUri, VssCredentials credentials, CancellationToken cancellationToken)
{
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
_hasConnection = true;
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(cancellationToken);
return await RetryRequest(
async () => _session = await _brokerHttpClient.CreateSessionAsync(),
cancellationToken
);
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{
CheckConnection();
if (_connection is null || _session is null || _brokerHttpClient is null)
{
throw new InvalidOperationException($"SetConnection");
}
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken);
return jobMessage;
}

View File

@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
TaskCompletionSource<int> JobRecordUpdated { get; }
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync();
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false);
void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
@@ -96,14 +96,14 @@ namespace GitHub.Runner.Common
_resultsServer = hostContext.GetService<IResultsServer>();
}
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultServiceOnly = false)
public void Start(Pipelines.AgentJobRequestMessage jobRequest, bool resultsServiceOnly = false)
{
Trace.Entering();
_resultsServiceOnly = resultServiceOnly;
_resultsServiceOnly = resultsServiceOnly;
var serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (!resultServiceOnly)
if (!resultsServiceOnly)
{
_jobServer.InitializeWebsocketClient(serviceEndPoint);
}
@@ -119,7 +119,7 @@ namespace GitHub.Runner.Common
{
string liveConsoleFeedUrl = null;
Trace.Info("Initializing results client");
if (resultServiceOnly
if (resultsServiceOnly
&& serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl)
&& !string.IsNullOrEmpty(feedStreamUrl))
{
@@ -541,10 +541,12 @@ namespace GitHub.Runner.Common
Trace.Error(ex);
errorCount++;
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results
_resultsClientInitiated = false;
SendResultsTelemetry(ex);
// If we hit any exceptions uploading to Results, let's skip any additional uploads to Results unless Results is serving logs
if (!_resultsServiceOnly)
{
_resultsClientInitiated = false;
SendResultsTelemetry(ex);
}
}
}
@@ -660,9 +662,11 @@ namespace GitHub.Runner.Common
{
Trace.Info("Catch exception during update steps, skip update Results.");
Trace.Error(e);
_resultsClientInitiated = false;
SendResultsTelemetry(e);
if (!_resultsServiceOnly)
{
_resultsClientInitiated = false;
SendResultsTelemetry(e);
}
}
if (_bufferedRetryRecords.Remove(update.TimelineId))

View File

@@ -1,10 +1,5 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
@@ -12,8 +7,6 @@ using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener
{
@@ -26,6 +19,8 @@ namespace GitHub.Runner.Listener
private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer;
public string _sessionId;
public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
@@ -36,7 +31,7 @@ namespace GitHub.Runner.Listener
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{
await RefreshBrokerConnection();
await RefreshBrokerSession(token);
return await Task.FromResult(true);
}
@@ -137,8 +132,8 @@ namespace GitHub.Runner.Listener
encounteringError = true;
}
// re-create VssConnection before next retry
await RefreshBrokerConnection();
// re-create session before next retry
await RefreshBrokerSession(token);
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token);
@@ -191,7 +186,7 @@ namespace GitHub.Runner.Listener
}
}
private async Task RefreshBrokerConnection()
private async Task RefreshBrokerSession(CancellationToken ct)
{
var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
@@ -203,7 +198,8 @@ namespace GitHub.Runner.Listener
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds);
var session = await _brokerServer.CreateSessionAsync(new Uri(_settings.ServerUrlV2), creds, ct);
_sessionId = session.id;
}
}
}

View File

@@ -72,7 +72,7 @@ namespace GitHub.Runner.Worker
launchServer.InitializeLaunchClient(new Uri(launchReceiverEndpoint), accessToken);
}
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
_jobServerQueue.Start(message, resultServiceOnly: true);
_jobServerQueue.Start(message, resultsServiceOnly: true);
}
else
{

View File

@@ -56,7 +56,37 @@ namespace GitHub.Actions.RunService.WebApi
{
}
public async Task<BrokerSession> CreateSessionAsync(
CancellationToken cancellationToken = default
)
{
var requestUri = new Uri(Client.BaseAddress, "session");
var result = await SendAsync<BrokerSession>(
new HttpMethod("POST"),
requestUri: requestUri,
cancellationToken: cancellationToken
);
if (result.IsSuccess)
{
return result.Value;
}
if (result.StatusCode == HttpStatusCode.Forbidden)
{
throw new AccessDeniedException(result.Error);
}
if (result.StatusCode == HttpStatusCode.Conflict)
{
throw new TaskAgentSessionConflictException(result.Error);
}
throw new Exception($"Failed to get job message: {result.Error}");
}
public async Task<TaskAgentMessage> GetRunnerMessageAsync(
string sessionID,
string runnerVersion,
TaskAgentStatus? status,
CancellationToken cancellationToken = default
@@ -66,6 +96,10 @@ namespace GitHub.Actions.RunService.WebApi
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (sessionID != null)
{
queryParams.Add("sessionID", runnerVersion);
}
if (status != null)
{
queryParams.Add("status", status.Value.ToString());

View File

@@ -0,0 +1,9 @@
using System;
namespace GitHub.Actions.RunService.WebApi
{
public sealed class BrokerSession
{
public string id;
}
}

View File

@@ -0,0 +1,73 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.Runner.Listener;
using GitHub.Runner.Listener.Configuration;
using GitHub.Services.Common;
using Moq;
using Xunit;
namespace GitHub.Runner.Common.Tests.Listener
{
public sealed class BrokerMessageListenerL0
{
private readonly RunnerSettings _settings;
private readonly Mock<IConfigurationManager> _config;
private readonly Mock<IBrokerServer> _brokerServer;
private readonly Mock<ICredentialManager> _credMgr;
public BrokerMessageListenerL0()
{
_settings = new RunnerSettings { AgentId = 1, AgentName = "myagent", PoolId = 123, PoolName = "default", ServerUrlV2 = "http://myserver", WorkFolder = "_work" };
_config = new Mock<IConfigurationManager>();
_config.Setup(x => x.LoadSettings()).Returns(_settings);
_brokerServer = new Mock<IBrokerServer>();
_credMgr = new Mock<ICredentialManager>();
_credMgr.Setup(x => x.LoadCredentials()).Returns(new VssCredentials());
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void CreatesSession()
{
using TestHostContext tc = CreateTestContext();
using var tokenSource = new CancellationTokenSource();
// Arrange
_brokerServer
.Setup(
x => x.CreateSessionAsync(
new Uri(_settings.ServerUrlV2),
It.Is<VssCredentials>(y => y != null),
tokenSource.Token
)
)
.Returns(
Task.FromResult(
new BrokerSession { id = "my-phony-session-id" }
)
);
BrokerMessageListener listener = new();
listener.Initialize(tc);
// Act
bool result = await listener.CreateSessionAsync(tokenSource.Token);
// Assert
Assert.True(result);
Assert.Equal("my-phony-session-id", listener._sessionId);
}
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
{
TestHostContext tc = new(this, testName);
tc.SetSingleton<IConfigurationManager>(_config.Object);
tc.SetSingleton<IBrokerServer>(_brokerServer.Object);
tc.SetSingleton<ICredentialManager>(_credMgr.Object);
return tc;
}
}
}