Compare commits

..

12 Commits

Author SHA1 Message Date
Luke Tomlinson
78330c84a7 Cleanup 2023-03-24 11:57:47 -07:00
Luke Tomlinson
12584aa551 Merge branch 'luketomlinson/runner-runner-admin' into all-the-runner-changes 2023-03-24 10:30:43 -07:00
Luke Tomlinson
567c0f33bd PR feedback 2023-03-24 10:21:13 -07:00
Luke Tomlinson
0f8ed8f7fc Cleanup 2023-03-23 11:10:35 -07:00
Luke Tomlinson
0989ee93d6 . 2023-03-23 08:31:55 -07:00
Luke Tomlinson
c2307b3c92 . 2023-03-23 08:02:21 -07:00
Luke Tomlinson
08479068fe Cleanup 2023-03-23 07:35:50 -07:00
Luke Tomlinson
a49a5bef65 . 2023-03-23 07:00:27 -07:00
Luke Tomlinson
ee19ca253e WIP 2023-03-22 15:20:31 -07:00
Luke Tomlinson
af657acebc WIP 2023-03-22 12:55:15 -07:00
Luke Tomlinson
df885279a1 WIP 2023-03-21 14:27:52 -07:00
Matisse Hack
0e7ca9aedb Fix JIT configurations on Windows (#2497)
* Fix JIT configurations on Windows

* Update src/Runner.Listener/Runner.cs
2023-03-21 15:04:50 -04:00
8 changed files with 430 additions and 43 deletions

View File

@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(BrokerServer))]
public interface IBrokerServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken token, TaskAgentStatus status, string version);
}
public sealed class BrokerServer : RunnerService, IBrokerServer
{
private bool _hasConnection;
private Uri _brokerUri;
private RawConnection _connection;
private BrokerHttpClient _brokerHttpClient;
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_brokerUri = serverUri;
_connection = VssUtil.CreateRawConnection(serverUri, credentials);
_brokerHttpClient = await _connection.GetClientAsync<BrokerHttpClient>();
_hasConnection = true;
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(CancellationToken cancellationToken, TaskAgentStatus status, string version)
{
CheckConnection();
var jobMessage = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(version, status, cancellationToken), cancellationToken);
return jobMessage;
}
}
}

View File

@@ -53,6 +53,9 @@ namespace GitHub.Runner.Common
[DataMember(EmitDefaultValue = false)] [DataMember(EmitDefaultValue = false)]
public bool UseV2Flow { get; set; } public bool UseV2Flow { get; set; }
[DataMember(EmitDefaultValue = false)]
public string ServerUrlV2 { get; set; }
[IgnoreDataMember] [IgnoreDataMember]
public bool IsHostedServer public bool IsHostedServer
{ {

View File

@@ -17,7 +17,7 @@ namespace GitHub.Runner.Common
{ {
Task<List<TaskAgent>> GetRunnersAsync(int runnerGroupId, string githubUrl, string githubToken, string agentName); Task<List<TaskAgent>> GetRunnersAsync(int runnerGroupId, string githubUrl, string githubToken, string agentName);
Task<TaskAgent> AddRunnerAsync(int runnerGroupId, TaskAgent agent, string githubUrl, string githubToken, string publicKey, string hostId); Task<DistributedTask.WebApi.Runner> AddRunnerAsync(int runnerGroupId, TaskAgent agent, string githubUrl, string githubToken, string publicKey);
Task<List<TaskAgentPool>> GetRunnerGroupsAsync(string githubUrl, string githubToken); Task<List<TaskAgentPool>> GetRunnerGroupsAsync(string githubUrl, string githubToken);
string GetGitHubRequestId(HttpResponseHeaders headers); string GetGitHubRequestId(HttpResponseHeaders headers);
@@ -136,7 +136,7 @@ namespace GitHub.Runner.Common
return agentPools?.ToAgentPoolList(); return agentPools?.ToAgentPoolList();
} }
public async Task<TaskAgent> AddRunnerAsync(int runnerGroupId, TaskAgent agent, string githubUrl, string githubToken, string publicKey, string hostId) public async Task<DistributedTask.WebApi.Runner> AddRunnerAsync(int runnerGroupId, TaskAgent agent, string githubUrl, string githubToken, string publicKey)
{ {
var gitHubUrlBuilder = new UriBuilder(githubUrl); var gitHubUrlBuilder = new UriBuilder(githubUrl);
var path = gitHubUrlBuilder.Path.Split('/', '\\', StringSplitOptions.RemoveEmptyEntries); var path = gitHubUrlBuilder.Path.Split('/', '\\', StringSplitOptions.RemoveEmptyEntries);
@@ -159,20 +159,12 @@ namespace GitHub.Runner.Common
{"updates_disabled", agent.DisableUpdate}, {"updates_disabled", agent.DisableUpdate},
{"ephemeral", agent.Ephemeral}, {"ephemeral", agent.Ephemeral},
{"labels", agent.Labels}, {"labels", agent.Labels},
{"public_key", publicKey}, {"public_key", publicKey}
{"host_id", hostId},
}; };
var body = new StringContent(StringUtil.ConvertToJson(bodyObject), null, "application/json"); var body = new StringContent(StringUtil.ConvertToJson(bodyObject), null, "application/json");
var runner = await RetryRequest<DistributedTask.WebApi.Runner>(githubApiUrl, githubToken, RequestType.Post, 3, "Failed to add agent", body); return await RetryRequest<DistributedTask.WebApi.Runner>(githubApiUrl, githubToken, RequestType.Post, 3, "Failed to add agent", body);
agent.Id = runner.Id;
agent.Authorization = new TaskAgentAuthorization()
{
AuthorizationUrl = runner.RunnerAuthorization.AuthorizationUrl,
ClientId = new Guid(runner.RunnerAuthorization.ClientId),
};
return agent;
} }
private async Task<T> RetryRequest<T>(string githubApiUrl, string githubToken, RequestType requestType, int maxRetryAttemptsCount = 5, string errorMessage = null, StringContent body = null) private async Task<T> RetryRequest<T>(string githubApiUrl, string githubToken, RequestType requestType, int maxRetryAttemptsCount = 5, string errorMessage = null, StringContent body = null)

View File

@@ -0,0 +1,209 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common;
using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Runner.Common.Util;
using GitHub.Services.OAuth;
namespace GitHub.Runner.Listener
{
public sealed class BrokerMessageListener : RunnerService, IMessageListener
{
private RunnerSettings _settings;
private ITerminal _term;
private TimeSpan _getNextMessageRetryInterval;
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private IBrokerServer _brokerServer;
public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_term = HostContext.GetService<ITerminal>();
_brokerServer = HostContext.GetService<IBrokerServer>();
}
public async Task<Boolean> CreateSessionAsync(CancellationToken token)
{
await RefreshBrokerConnection();
return await Task.FromResult(true);
}
public async Task DeleteSessionAsync()
{
await Task.CompletedTask;
}
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
bool encounteringError = false;
int continuousError = 0;
Stopwatch heartbeat = new();
heartbeat.Restart();
var maxRetryCount = 10;
while (true)
{
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _brokerServer.GetRunnerMessageAsync(_getMessagesTokenSource.Token, runnerStatus, BuildConstants.RunnerPackage.Version);
if (message == null)
{
continue;
}
return message;
}
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
throw;
}
catch (TaskAgentAccessTokenExpiredException)
{
Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
throw;
}
catch (AccessDeniedException e) when (e.InnerException is InvalidTaskAgentVersionException)
{
throw;
}
catch (Exception ex)
{
Trace.Error("Catch exception during get next message.");
Trace.Error(ex);
if (!IsGetNextMessageExceptionRetriable(ex))
{
throw;
}
else
{
continuousError++;
//retry after a random backoff to avoid service throttling
//in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
if (continuousError <= 5)
{
// random backoff [15, 30]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
}
else if (continuousError >= maxRetryCount)
{
throw;
}
else
{
// more aggressive backoff [30, 60]
_getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
}
if (!encounteringError)
{
//print error only on the first consecutive error
_term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
encounteringError = true;
}
// re-create VssConnection before next retry
await RefreshBrokerConnection();
Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}
if (message == null)
{
if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
{
Trace.Info($"No message retrieved within last 30 minutes.");
heartbeat.Restart();
}
else
{
Trace.Verbose($"No message retrieved.");
}
continue;
}
Trace.Info($"Message '{message.MessageId}' received.");
}
}
public async Task DeleteMessageAsync(TaskAgentMessage message)
{
await Task.CompletedTask;
}
private bool IsGetNextMessageExceptionRetriable(Exception ex)
{
if (ex is TaskAgentNotFoundException ||
ex is TaskAgentPoolNotFoundException ||
ex is TaskAgentSessionExpiredException ||
ex is AccessDeniedException ||
ex is VssUnauthorizedException)
{
Trace.Info($"Non-retriable exception: {ex.Message}");
return false;
}
else
{
Trace.Info($"Retriable exception: {ex.Message}");
return true;
}
}
private async Task RefreshBrokerConnection()
{
var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
if (_settings.ServerUrlV2 == null)
{
throw new InvalidOperationException("ServerUrlV2 is not set");
}
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials creds = credMgr.LoadCredentials();
await _brokerServer.ConnectAsync(new Uri(_settings.ServerUrlV2), creds);
}
}
}

View File

@@ -117,7 +117,6 @@ namespace GitHub.Runner.Listener.Configuration
VssCredentials creds = null; VssCredentials creds = null;
_term.WriteSection("Authentication"); _term.WriteSection("Authentication");
string registerToken = string.Empty; string registerToken = string.Empty;
string hostId = string.Empty;
while (true) while (true)
{ {
// When testing against a dev deployment of Actions Service, set this environment variable // When testing against a dev deployment of Actions Service, set this environment variable
@@ -142,7 +141,6 @@ namespace GitHub.Runner.Listener.Configuration
_term.WriteLine($"Using V2 flow: {runnerSettings.UseV2Flow}"); _term.WriteLine($"Using V2 flow: {runnerSettings.UseV2Flow}");
creds = authResult.ToVssCredentials(); creds = authResult.ToVssCredentials();
Trace.Info("cred retrieved via GitHub auth"); Trace.Info("cred retrieved via GitHub auth");
hostId = GetHostId(authResult.Token);
} }
try try
@@ -302,7 +300,9 @@ namespace GitHub.Runner.Listener.Configuration
{ {
if (runnerSettings.UseV2Flow) if (runnerSettings.UseV2Flow)
{ {
agent = await _dotcomServer.AddRunnerAsync(runnerSettings.PoolId, agent, runnerSettings.GitHubUrl, registerToken, publicKeyXML, hostId); var runner = await _dotcomServer.AddRunnerAsync(runnerSettings.PoolId, agent, runnerSettings.GitHubUrl, registerToken, publicKeyXML);
runner.ApplyToTaskAgent(agent);
runnerSettings.ServerUrlV2 = runner.RunnerAuthorization.ServerUrl;
} }
else else
{ {
@@ -359,24 +359,28 @@ namespace GitHub.Runner.Listener.Configuration
} }
// Testing agent connection, detect any potential connection issue, like local clock skew that cause OAuth token expired. // Testing agent connection, detect any potential connection issue, like local clock skew that cause OAuth token expired.
var credMgr = HostContext.GetService<ICredentialManager>();
VssCredentials credential = credMgr.LoadCredentials(); if (!runnerSettings.UseV2Flow)
try
{ {
await _runnerServer.ConnectAsync(new Uri(runnerSettings.ServerUrl), credential); var credMgr = HostContext.GetService<ICredentialManager>();
// ConnectAsync() hits _apis/connectionData which is an anonymous endpoint VssCredentials credential = credMgr.LoadCredentials();
// Need to hit an authenticate endpoint to trigger OAuth token exchange. try
await _runnerServer.GetAgentPoolsAsync(); {
_term.WriteSuccessMessage("Runner connection is good"); await _runnerServer.ConnectAsync(new Uri(runnerSettings.ServerUrl), credential);
} // ConnectAsync() hits _apis/connectionData which is an anonymous endpoint
catch (VssOAuthTokenRequestException ex) when (ex.Message.Contains("Current server time is")) // Need to hit an authenticate endpoint to trigger OAuth token exchange.
{ await _runnerServer.GetAgentPoolsAsync();
// there are two exception messages server send that indicate clock skew. _term.WriteSuccessMessage("Runner connection is good");
// 1. The bearer token expired on {jwt.ValidTo}. Current server time is {DateTime.UtcNow}. }
// 2. The bearer token is not valid until {jwt.ValidFrom}. Current server time is {DateTime.UtcNow}. catch (VssOAuthTokenRequestException ex) when (ex.Message.Contains("Current server time is"))
Trace.Error("Catch exception during test agent connection."); {
Trace.Error(ex); // there are two exception messages server send that indicate clock skew.
throw new Exception("The local machine's clock may be out of sync with the server time by more than five minutes. Please sync your clock with your domain or internet time and try again."); // 1. The bearer token expired on {jwt.ValidTo}. Current server time is {DateTime.UtcNow}.
// 2. The bearer token is not valid until {jwt.ValidFrom}. Current server time is {DateTime.UtcNow}.
Trace.Error("Catch exception during test agent connection.");
Trace.Error(ex);
throw new Exception("The local machine's clock may be out of sync with the server time by more than five minutes. Please sync your clock with your domain or internet time and try again.");
}
} }
_term.WriteSection("Runner settings"); _term.WriteSection("Runner settings");
@@ -778,12 +782,5 @@ namespace GitHub.Runner.Listener.Configuration
} }
return null; return null;
} }
// Temporary hack for sending legacy host id using v2 flow
private string GetHostId(string accessToken)
{
var claims = JsonWebToken.Create(accessToken).ExtractClaims();
return claims.FirstOrDefault(x => x.Type == "aud").Value.Split(':').LastOrDefault();
}
} }
} }

View File

@@ -4,6 +4,7 @@ using System.IO;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -210,10 +211,16 @@ namespace GitHub.Runner.Listener
foreach (var config in jitConfig) foreach (var config in jitConfig)
{ {
var configFile = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), config.Key); var configFile = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Root), config.Key);
var configContent = Encoding.UTF8.GetString(Convert.FromBase64String(config.Value)); var configContent = Convert.FromBase64String(config.Value);
File.WriteAllText(configFile, configContent, Encoding.UTF8); #if OS_WINDOWS
if (configFile == HostContext.GetConfigFile(WellKnownConfigFile.RSACredentials))
{
configContent = ProtectedData.Protect(configContent, null, DataProtectionScope.LocalMachine);
}
#endif
File.WriteAllBytes(configFile, configContent);
File.SetAttributes(configFile, File.GetAttributes(configFile) | FileAttributes.Hidden); File.SetAttributes(configFile, File.GetAttributes(configFile) | FileAttributes.Hidden);
Trace.Info($"Save {configContent.Length} chars to '{configFile}'."); Trace.Info($"Saved {configContent.Length} bytes to '{configFile}'.");
} }
} }
catch (Exception ex) catch (Exception ex)
@@ -332,13 +339,25 @@ namespace GitHub.Runner.Listener
} }
} }
private IMessageListener GetMesageListener(RunnerSettings settings)
{
if (settings.UseV2Flow)
{
var brokerListener = new BrokerMessageListener();
brokerListener.Initialize(HostContext);
return brokerListener;
}
return HostContext.GetService<IMessageListener>();
}
//create worker manager, create message listener and start listening to the queue //create worker manager, create message listener and start listening to the queue
private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false) private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{ {
try try
{ {
Trace.Info(nameof(RunAsync)); Trace.Info(nameof(RunAsync));
_listener = HostContext.GetService<IMessageListener>(); _listener = GetMesageListener(settings);
if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken)) if (!await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken))
{ {
return Constants.Runner.ReturnCode.TerminatedError; return Constants.Runner.ReturnCode.TerminatedError;

View File

@@ -8,6 +8,9 @@ namespace GitHub.DistributedTask.WebApi
public class Authorization public class Authorization
{ {
/// <summary>
/// The url to refresh tokens
/// </summary>
[JsonProperty("authorization_url")] [JsonProperty("authorization_url")]
public Uri AuthorizationUrl public Uri AuthorizationUrl
{ {
@@ -15,6 +18,19 @@ namespace GitHub.DistributedTask.WebApi
internal set; internal set;
} }
/// <summary>
/// The url to connect to to poll for messages
/// </summary>
[JsonProperty("server_url")]
public string ServerUrl
{
get;
internal set;
}
/// <summary>
/// The client id to use when connecting to the authorization_url
/// </summary>
[JsonProperty("client_id")] [JsonProperty("client_id")]
public string ClientId public string ClientId
{ {
@@ -43,5 +59,16 @@ namespace GitHub.DistributedTask.WebApi
get; get;
internal set; internal set;
} }
public TaskAgent ApplyToTaskAgent(TaskAgent agent)
{
agent.Id = this.Id;
agent.Authorization = new TaskAgentAuthorization()
{
AuthorizationUrl = this.RunnerAuthorization.AuthorizationUrl,
ClientId = new Guid(this.RunnerAuthorization.ClientId)
};
return agent;
}
} }
} }

View File

@@ -0,0 +1,84 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi;
namespace GitHub.Actions.RunService.WebApi
{
public class BrokerHttpClient : RawHttpClientBase
{
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials)
: base(baseUrl, credentials)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: base(baseUrl, credentials, settings)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, settings, handlers)
{
}
public BrokerHttpClient(
Uri baseUrl,
HttpMessageHandler pipeline,
Boolean disposeHandler)
: base(baseUrl, pipeline, disposeHandler)
{
}
public Task<TaskAgentMessage> GetRunnerMessageAsync(
string runnerVersion,
TaskAgentStatus? status,
CancellationToken cancellationToken = default
)
{
var requestUri = new Uri(Client.BaseAddress, "message");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}
if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
}
return SendAsync<TaskAgentMessage>(
new HttpMethod("GET"),
requestUri: requestUri,
queryParameters: queryParams,
cancellationToken: cancellationToken);
}
}
}