enable #nullable in BrokerServer.cs and rename some methods

This commit is contained in:
Patrick Ellis
2024-01-04 21:21:16 +00:00
committed by GitHub
parent 31436b3c38
commit 40f813e0fb
4 changed files with 45 additions and 47 deletions

View File

@@ -1,13 +1,12 @@
using System; #nullable enable
using System.Collections.Generic;
using System;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi; using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient; using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common namespace GitHub.Runner.Common
@@ -15,44 +14,33 @@ namespace GitHub.Runner.Common
[ServiceLocator(Default = typeof(BrokerServer))] [ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService public interface IBrokerServer : IRunnerService
{ {
Task<BrokerSession> ConnectAsync(Uri serverUrl, VssCredentials credentials); Task<BrokerSession> CreateSessionAsync(Uri serverUrl, VssCredentials credentials, CancellationToken token);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
} }
public sealed class BrokerServer : RunnerService, IBrokerServer public sealed class BrokerServer : RunnerService, IBrokerServer
{ {
private bool _hasConnection; private RawConnection? _connection;
private Uri _brokerUri; private BrokerHttpClient? _brokerHttpClient;
private RawConnection _connection; private BrokerSession? _session;
private BrokerHttpClient _brokerHttpClient;
private bool _hasSession;
private BrokerSession _session;
public async Task<BrokerSession> ConnectAsync(Uri serverUri, VssCredentials credentials) public async Task<BrokerSession> CreateSessionAsync(Uri serverUri, VssCredentials credentials, CancellationToken cancellationToken)
{ {
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials); _connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(); _brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>(cancellationToken);
_session = await _brokerHttpClient.CreateSessionAsync(); return await RetryRequest(
_hasConnection = true; async () => _session = await _brokerHttpClient.CreateSessionAsync(),
_hasSession = true; cancellationToken
);
return _session;
}
private void CheckConnection()
{
if (!_hasConnection || !_hasSession)
{
throw new InvalidOperationException($"SetConnection");
}
} }
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version) public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{ {
CheckConnection(); if (_connection is null || _session is null || _brokerHttpClient is null)
{
throw new InvalidOperationException($"SetConnection");
}
var jobMessage = RetryRequest<TaskAgentMessage>( var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken); async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken);

View File

@@ -1,10 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
@@ -12,8 +7,6 @@ using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener namespace GitHub.Runner.Listener
{ {
@@ -38,7 +31,7 @@ namespace GitHub.Runner.Listener
public async Task<Boolean> CreateSessionAsync(CancellationToken token) public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{ {
await RefreshBrokerConnection(); await RefreshBrokerSession(token);
return await Task.FromResult(true); return await Task.FromResult(true);
} }
@@ -139,8 +132,8 @@ namespace GitHub.Runner.Listener
encounteringError = true; encounteringError = true;
} }
// re-create VssConnection before next retry // re-create session before next retry
await RefreshBrokerConnection(); await RefreshBrokerSession(token);
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token); await HostContext.Delay(_getNextMessageRetryInterval, token);
@@ -193,7 +186,7 @@ namespace GitHub.Runner.Listener
} }
} }
private async Task RefreshBrokerConnection() private async Task RefreshBrokerSession(CancellationToken ct)
{ {
var configManager = HostContext.GetService<IConfigurationManager>(); var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings(); _settings = configManager.LoadSettings();
@@ -205,8 +198,8 @@ namespace GitHub.Runner.Listener
var credMgr = HostContext.GetService<ICredentialManager>(); var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials(); VssCredentials creds = credMgr.LoadCredentials();
var sessionResponse = await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds); var session = await _brokerServer.CreateSessionAsync(new Uri(_settings.ServerUrlV2), creds, ct);
_sessionId = sessionResponse.id; _sessionId = session.id;
} }
} }
} }

View File

@@ -78,6 +78,11 @@ namespace GitHub.Actions.RunService.WebApi
throw new AccessDeniedException(result.Error); throw new AccessDeniedException(result.Error);
} }
if (result.StatusCode == HttpStatusCode.Conflict)
{
throw new TaskAgentSessionConflictException(result.Error);
}
throw new Exception($"Failed to get job message: {result.Error}"); throw new Exception($"Failed to get job message: {result.Error}");
} }
public async Task<TaskAgentMessage> GetRunnerMessageAsync( public async Task<TaskAgentMessage> GetRunnerMessageAsync(

View File

@@ -31,20 +31,32 @@ namespace GitHub.Runner.Common.Tests.Listener
[Fact] [Fact]
[Trait("Level", "L0")] [Trait("Level", "L0")]
[Trait("Category", "Runner")] [Trait("Category", "Runner")]
public async void CreatesSessionAsync() public async void CreatesSession()
{ {
using TestHostContext tc = CreateTestContext(); using TestHostContext tc = CreateTestContext();
using var tokenSource = new CancellationTokenSource(); using var tokenSource = new CancellationTokenSource();
// Arrange. // Arrange
_brokerServer.Setup(x => x.ConnectAsync(new Uri(_settings.ServerUrlV2), It.Is<VssCredentials>(y => y != null))).Returns(Task.FromResult(new BrokerSession { id = "my-phony-session-id" })); _brokerServer
.Setup(
x => x.CreateSessionAsync(
new Uri(_settings.ServerUrlV2),
It.Is<VssCredentials>(y => y != null),
tokenSource.Token
)
)
.Returns(
Task.FromResult(
new BrokerSession { id = "my-phony-session-id" }
)
);
BrokerMessageListener listener = new(); BrokerMessageListener listener = new();
listener.Initialize(tc); listener.Initialize(tc);
// Act. // Act
bool result = await listener.CreateSessionAsync(tokenSource.Token); bool result = await listener.CreateSessionAsync(tokenSource.Token);
// Assert. // Assert
Assert.True(result); Assert.True(result);
Assert.Equal("my-phony-session-id", listener._sessionId); Assert.Equal("my-phony-session-id", listener._sessionId);
} }