Allow runner to check service connection in background. (#3542)

* Allow runner to check service connection in background.

* .

* .
This commit is contained in:
Tingluo Huang
2024-11-12 13:30:30 -05:00
committed by GitHub
parent d37a7ae14d
commit d16fb2c593
5 changed files with 208 additions and 11 deletions

View File

@@ -60,5 +60,15 @@ namespace GitHub.Runner.Sdk
}
return string.Empty;
}
public static string GetVssRequestId(HttpResponseHeaders headers)
{
if (headers != null &&
headers.TryGetValues("x-vss-e2eid", out var headerValues))
{
return headerValues.FirstOrDefault();
}
return string.Empty;
}
}
}

View File

@@ -17,6 +17,7 @@ using GitHub.Runner.Common;
using GitHub.Runner.Common.Util;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using Newtonsoft.Json;
using Pipelines = GitHub.DistributedTask.Pipelines;
namespace GitHub.Runner.Worker
@@ -42,11 +43,13 @@ namespace GitHub.Runner.Worker
public sealed class JobExtension : RunnerService, IJobExtension
{
private readonly HashSet<string> _existingProcesses = new(StringComparer.OrdinalIgnoreCase);
private readonly List<Task<string>> _connectivityCheckTasks = new();
private readonly List<Task<CheckResult>> _connectivityCheckTasks = new();
private bool _processCleanup;
private string _processLookupId = $"github_{Guid.NewGuid()}";
private CancellationTokenSource _diskSpaceCheckToken = new();
private Task _diskSpaceCheckTask = null;
private CancellationTokenSource _serviceConnectivityCheckToken = new();
private Task _serviceConnectivityCheckTask = null;
// Download all required actions.
// Make sure all condition inputs are valid.
@@ -454,11 +457,14 @@ namespace GitHub.Runner.Worker
{
foreach (var checkUrl in checkUrls)
{
_connectivityCheckTasks.Add(CheckConnectivity(checkUrl));
_connectivityCheckTasks.Add(CheckConnectivity(checkUrl, accessToken: string.Empty, timeoutInSeconds: 5, token: CancellationToken.None));
}
}
}
Trace.Info($"Start checking service connectivity in background.");
_serviceConnectivityCheckTask = CheckServiceConnectivityAsync(context, _serviceConnectivityCheckToken.Token);
return steps;
}
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
@@ -692,7 +698,7 @@ namespace GitHub.Runner.Worker
{
var result = await check;
Trace.Info($"Connectivity check result: {result}");
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = result });
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"{result.EndpointUrl}: {result.StatusCode}" });
}
}
catch (Exception ex)
@@ -702,6 +708,22 @@ namespace GitHub.Runner.Worker
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check server connectivity. {ex.Message}" });
}
}
// Collect service connectivity check result
if (_serviceConnectivityCheckTask != null)
{
_serviceConnectivityCheckToken.Cancel();
try
{
await _serviceConnectivityCheckTask;
}
catch (Exception ex)
{
Trace.Error($"Fail to check service connectivity.");
Trace.Error(ex);
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check service connectivity. {ex.Message}" });
}
}
}
catch (Exception ex)
{
@@ -717,11 +739,13 @@ namespace GitHub.Runner.Worker
}
}
private async Task<string> CheckConnectivity(string endpointUrl)
private async Task<CheckResult> CheckConnectivity(string endpointUrl, string accessToken, int timeoutInSeconds, CancellationToken token)
{
Trace.Info($"Check server connectivity for {endpointUrl}.");
string result = string.Empty;
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
CheckResult result = new CheckResult() { EndpointUrl = endpointUrl };
var stopwatch = Stopwatch.StartNew();
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutInSeconds)))
using (var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token))
{
try
{
@@ -729,21 +753,44 @@ namespace GitHub.Runner.Worker
using (var httpClient = new HttpClient(httpClientHandler))
{
httpClient.DefaultRequestHeaders.UserAgent.AddRange(HostContext.UserAgents);
var response = await httpClient.GetAsync(endpointUrl, timeoutTokenSource.Token);
result = $"{endpointUrl}: {response.StatusCode}";
if (!string.IsNullOrEmpty(accessToken))
{
httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {accessToken}");
}
var response = await httpClient.GetAsync(endpointUrl, linkedTokenSource.Token);
result.StatusCode = $"{response.StatusCode}";
var githubRequestId = UrlUtil.GetGitHubRequestId(response.Headers);
var vssRequestId = UrlUtil.GetVssRequestId(response.Headers);
if (!string.IsNullOrEmpty(githubRequestId))
{
result.RequestId = githubRequestId;
}
else if (!string.IsNullOrEmpty(vssRequestId))
{
result.RequestId = vssRequestId;
}
}
}
catch (Exception ex) when (ex is OperationCanceledException && token.IsCancellationRequested)
{
Trace.Error($"Request canceled during connectivity check: {ex}");
result.StatusCode = "canceled";
}
catch (Exception ex) when (ex is OperationCanceledException && timeoutTokenSource.IsCancellationRequested)
{
Trace.Error($"Request timeout during connectivity check: {ex}");
result = $"{endpointUrl}: timeout";
result.StatusCode = "timeout";
}
catch (Exception ex)
{
Trace.Error($"Catch exception during connectivity check: {ex}");
result = $"{endpointUrl}: {ex.Message}";
result.StatusCode = $"{ex.Message}";
}
}
stopwatch.Stop();
result.DurationInMs = (int)stopwatch.ElapsedMilliseconds;
return result;
}
@@ -781,6 +828,84 @@ namespace GitHub.Runner.Worker
}
}
private async Task CheckServiceConnectivityAsync(IExecutionContext context, CancellationToken token)
{
var connectionTest = context.Global.Variables.Get(WellKnownDistributedTaskVariables.RunnerServiceConnectivityTest);
if (string.IsNullOrEmpty(connectionTest))
{
return;
}
ServiceConnectivityCheckInput checkConnectivityInfo;
try
{
checkConnectivityInfo = StringUtil.ConvertFromJson<ServiceConnectivityCheckInput>(connectionTest);
}
catch (Exception ex)
{
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = $"Fail to parse JSON. {ex.Message}" });
return;
}
if (checkConnectivityInfo == null)
{
return;
}
// make sure interval is at least 10 seconds
checkConnectivityInfo.IntervalInSecond = Math.Max(10, checkConnectivityInfo.IntervalInSecond);
var systemConnection = context.Global.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
var accessToken = systemConnection.Authorization.Parameters[EndpointAuthorizationParameters.AccessToken];
var testResult = new ServiceConnectivityCheckResult();
while (!token.IsCancellationRequested)
{
foreach (var endpoint in checkConnectivityInfo.Endpoints)
{
if (string.IsNullOrEmpty(endpoint.Key) || string.IsNullOrEmpty(endpoint.Value))
{
continue;
}
if (!testResult.EndpointsResult.ContainsKey(endpoint.Key))
{
testResult.EndpointsResult[endpoint.Key] = new List<string>();
}
try
{
var result = await CheckConnectivity(endpoint.Value, accessToken: accessToken, timeoutInSeconds: checkConnectivityInfo.RequestTimeoutInSecond, token);
testResult.EndpointsResult[endpoint.Key].Add($"{result.StartTime:s}: {result.StatusCode} - {result.RequestId} - {result.DurationInMs}ms");
if (!testResult.HasFailure &&
result.StatusCode != "OK" &&
result.StatusCode != "canceled")
{
// track if any endpoint is not reachable
testResult.HasFailure = true;
}
}
catch (Exception ex)
{
testResult.EndpointsResult[endpoint.Key].Add($"{DateTime.UtcNow:s}: {ex.Message}");
}
}
try
{
await Task.Delay(TimeSpan.FromSeconds(checkConnectivityInfo.IntervalInSecond), token);
}
catch (TaskCanceledException)
{
// ignore
}
}
var telemetryData = StringUtil.ConvertToJson(testResult, Formatting.None);
Trace.Verbose($"Connectivity check result: {telemetryData}");
context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = telemetryData });
}
private Dictionary<int, Process> SnapshotProcesses()
{
Dictionary<int, Process> snapshot = new();
@@ -812,5 +937,23 @@ namespace GitHub.Runner.Worker
throw new ArgumentException("Jobs without a job container are forbidden on this runner, please add a 'container:' to your job or contact your self-hosted runner administrator.");
}
}
private class CheckResult
{
public CheckResult()
{
StartTime = DateTime.UtcNow;
}
public string EndpointUrl { get; set; }
public DateTime StartTime { get; set; }
public string StatusCode { get; set; }
public string RequestId { get; set; }
public int DurationInMs { get; set; }
}
}
}

View File

@@ -50,7 +50,8 @@ namespace GitHub.Runner.Worker
if (message.Variables.TryGetValue(Constants.Variables.System.OrchestrationId, out VariableValue orchestrationId) &&
!string.IsNullOrEmpty(orchestrationId.Value))
{
HostContext.UserAgents.Add(new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));
// make the orchestration id the first item in the user-agent header to avoid get truncated in server log.
HostContext.UserAgents.Insert(0, new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value));
// make sure orchestration id is in the user-agent header.
VssUtil.InitializeVssClientSettings(HostContext.UserAgents, HostContext.WebProxy);

View File

@@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using Newtonsoft.Json;
namespace GitHub.DistributedTask.WebApi
{
[DataContract]
public class ServiceConnectivityCheckInput
{
[JsonConstructor]
public ServiceConnectivityCheckInput()
{
Endpoints = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
}
[DataMember(EmitDefaultValue = false)]
public Dictionary<string, string> Endpoints { get; set; }
[DataMember(EmitDefaultValue = false)]
public int IntervalInSecond { get; set; }
[DataMember(EmitDefaultValue = false)]
public int RequestTimeoutInSecond { get; set; }
}
[DataContract]
public class ServiceConnectivityCheckResult
{
[JsonConstructor]
public ServiceConnectivityCheckResult()
{
EndpointsResult = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
}
[DataMember(Order = 1, EmitDefaultValue = true)]
public bool HasFailure { get; set; }
[DataMember(Order = 2, EmitDefaultValue = false)]
public Dictionary<string, List<string>> EndpointsResult { get; set; }
}
}

View File

@@ -7,5 +7,6 @@ namespace GitHub.DistributedTask.WebApi
public static readonly String JobId = "system.jobId";
public static readonly String RunnerLowDiskspaceThreshold = "system.runner.lowdiskspacethreshold";
public static readonly String RunnerEnvironment = "system.runnerEnvironment";
public static readonly String RunnerServiceConnectivityTest = "system.runner.serviceconnectivitycheckinput";
}
}