From 40f813e0fb2a3b500858c907723f3efabc9d979b Mon Sep 17 00:00:00 2001 From: Patrick Ellis <319655+pje@users.noreply.github.com> Date: Thu, 4 Jan 2024 21:21:16 +0000 Subject: [PATCH] enable #nullable in BrokerServer.cs and rename some methods --- src/Runner.Common/BrokerServer.cs | 46 +++++++------------ src/Runner.Listener/BrokerMessageListener.cs | 19 +++----- src/Sdk/WebApi/WebApi/BrokerHttpClient.cs | 5 ++ .../L0/Listener/BrokerMessageListenerL0.cs | 22 +++++++-- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index d6fefdc17..cfcee1d16 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -1,13 +1,12 @@ -using System; -using System.Collections.Generic; +#nullable enable + +using System; using System.Threading; using System.Threading.Tasks; using GitHub.Actions.RunService.WebApi; -using GitHub.DistributedTask.Pipelines; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Sdk; using GitHub.Services.Common; -using Sdk.RSWebApi.Contracts; using Sdk.WebApi.WebApi.RawClient; namespace GitHub.Runner.Common @@ -15,44 +14,33 @@ namespace GitHub.Runner.Common [ServiceLocator(Default = typeof(BrokerServer))] public interface IBrokerServer : IRunnerService { - Task ConnectAsync(Uri serverUrl, VssCredentials credentials); + Task CreateSessionAsync(Uri serverUrl, VssCredentials credentials, CancellationToken token); Task GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version); } public sealed class BrokerServer : RunnerService, IBrokerServer { - private bool _hasConnection; - private Uri _brokerUri; - private RawConnection _connection; - private BrokerHttpClient _brokerHttpClient; - private bool _hasSession; - private BrokerSession _session; + private RawConnection? _connection; + private BrokerHttpClient? _brokerHttpClient; + private BrokerSession? _session; - public async Task ConnectAsync(Uri serverUri, VssCredentials credentials) + public async Task CreateSessionAsync(Uri serverUri, VssCredentials credentials, CancellationToken cancellationToken) { - _brokerUri = serverUri; - _connection = VssUtil.CreateRawConnection(serverUri, credentials); - _brokerHttpClient = await _connection.GetClientAsync(); - _session = await _brokerHttpClient.CreateSessionAsync(); - _hasConnection = true; - _hasSession = true; - - return _session; - } - - private void CheckConnection() - { - if (!_hasConnection || !_hasSession) - { - throw new InvalidOperationException($"SetConnection"); - } + _brokerHttpClient = await _connection.GetClientAsync(cancellationToken); + return await RetryRequest( + async () => _session = await _brokerHttpClient.CreateSessionAsync(), + cancellationToken + ); } public Task GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version) { - CheckConnection(); + if (_connection is null || _session is null || _brokerHttpClient is null) + { + throw new InvalidOperationException($"SetConnection"); + } var jobMessage = RetryRequest( async () => await _brokerHttpClient.GetRunnerMessageAsync(_session.id, version, status, cancellationToken), cancellationToken); diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index d31f1ea10..d1be31f88 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -1,10 +1,5 @@ using System; -using System.Collections.Generic; using System.Diagnostics; -using System.IO; -using System.Runtime.InteropServices; -using System.Security.Cryptography; -using System.Text; using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; @@ -12,8 +7,6 @@ using GitHub.Runner.Common; using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Sdk; using GitHub.Services.Common; -using GitHub.Runner.Common.Util; -using GitHub.Services.OAuth; namespace GitHub.Runner.Listener { @@ -38,7 +31,7 @@ namespace GitHub.Runner.Listener public async Task CreateSessionAsync(CancellationToken token) { - await RefreshBrokerConnection(); + await RefreshBrokerSession(token); return await Task.FromResult(true); } @@ -139,8 +132,8 @@ namespace GitHub.Runner.Listener encounteringError = true; } - // re-create VssConnection before next retry - await RefreshBrokerConnection(); + // re-create session before next retry + await RefreshBrokerSession(token); Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds); await HostContext.Delay(_getNextMessageRetryInterval, token); @@ -193,7 +186,7 @@ namespace GitHub.Runner.Listener } } - private async Task RefreshBrokerConnection() + private async Task RefreshBrokerSession(CancellationToken ct) { var configManager = HostContext.GetService(); _settings = configManager.LoadSettings(); @@ -205,8 +198,8 @@ namespace GitHub.Runner.Listener var credMgr = HostContext.GetService(); VssCredentials creds = credMgr.LoadCredentials(); - var sessionResponse = await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds); - _sessionId = sessionResponse.id; + var session = await _brokerServer.CreateSessionAsync(new Uri(_settings.ServerUrlV2), creds, ct); + _sessionId = session.id; } } } diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index 8d079dd95..8cb74b066 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -78,6 +78,11 @@ namespace GitHub.Actions.RunService.WebApi throw new AccessDeniedException(result.Error); } + if (result.StatusCode == HttpStatusCode.Conflict) + { + throw new TaskAgentSessionConflictException(result.Error); + } + throw new Exception($"Failed to get job message: {result.Error}"); } public async Task GetRunnerMessageAsync( diff --git a/src/Test/L0/Listener/BrokerMessageListenerL0.cs b/src/Test/L0/Listener/BrokerMessageListenerL0.cs index 12e92d2c4..bebbe1338 100644 --- a/src/Test/L0/Listener/BrokerMessageListenerL0.cs +++ b/src/Test/L0/Listener/BrokerMessageListenerL0.cs @@ -31,20 +31,32 @@ namespace GitHub.Runner.Common.Tests.Listener [Fact] [Trait("Level", "L0")] [Trait("Category", "Runner")] - public async void CreatesSessionAsync() + public async void CreatesSession() { using TestHostContext tc = CreateTestContext(); using var tokenSource = new CancellationTokenSource(); - // Arrange. - _brokerServer.Setup(x => x.ConnectAsync(new Uri(_settings.ServerUrlV2), It.Is(y => y != null))).Returns(Task.FromResult(new BrokerSession { id = "my-phony-session-id" })); + // Arrange + _brokerServer + .Setup( + x => x.CreateSessionAsync( + new Uri(_settings.ServerUrlV2), + It.Is(y => y != null), + tokenSource.Token + ) + ) + .Returns( + Task.FromResult( + new BrokerSession { id = "my-phony-session-id" } + ) + ); BrokerMessageListener listener = new(); listener.Initialize(tc); - // Act. + // Act bool result = await listener.CreateSessionAsync(tokenSource.Token); - // Assert. + // Assert Assert.True(result); Assert.Equal("my-phony-session-id", listener._sessionId); }