From d16fb2c593dc41b8b80680e0c87670b87b7537d9 Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Tue, 12 Nov 2024 13:30:30 -0500 Subject: [PATCH] Allow runner to check service connection in background. (#3542) * Allow runner to check service connection in background. * . * . --- src/Runner.Sdk/Util/UrlUtil.cs | 10 ++ src/Runner.Worker/JobExtension.cs | 163 ++++++++++++++++-- src/Runner.Worker/JobRunner.cs | 3 +- .../WebApi/ServiceConnectivityCheck.cs | 42 +++++ .../WellKnownDistributedTaskVariables.cs | 1 + 5 files changed, 208 insertions(+), 11 deletions(-) create mode 100644 src/Sdk/DTWebApi/WebApi/ServiceConnectivityCheck.cs diff --git a/src/Runner.Sdk/Util/UrlUtil.cs b/src/Runner.Sdk/Util/UrlUtil.cs index 01658da05..52ce3a0cb 100644 --- a/src/Runner.Sdk/Util/UrlUtil.cs +++ b/src/Runner.Sdk/Util/UrlUtil.cs @@ -60,5 +60,15 @@ namespace GitHub.Runner.Sdk } return string.Empty; } + + public static string GetVssRequestId(HttpResponseHeaders headers) + { + if (headers != null && + headers.TryGetValues("x-vss-e2eid", out var headerValues)) + { + return headerValues.FirstOrDefault(); + } + return string.Empty; + } } } diff --git a/src/Runner.Worker/JobExtension.cs b/src/Runner.Worker/JobExtension.cs index 1aa79c925..58e8929b4 100644 --- a/src/Runner.Worker/JobExtension.cs +++ b/src/Runner.Worker/JobExtension.cs @@ -17,6 +17,7 @@ using GitHub.Runner.Common; using GitHub.Runner.Common.Util; using GitHub.Runner.Sdk; using GitHub.Services.Common; +using Newtonsoft.Json; using Pipelines = GitHub.DistributedTask.Pipelines; namespace GitHub.Runner.Worker @@ -42,11 +43,13 @@ namespace GitHub.Runner.Worker public sealed class JobExtension : RunnerService, IJobExtension { private readonly HashSet _existingProcesses = new(StringComparer.OrdinalIgnoreCase); - private readonly List> _connectivityCheckTasks = new(); + private readonly List> _connectivityCheckTasks = new(); private bool _processCleanup; private string _processLookupId = $"github_{Guid.NewGuid()}"; private CancellationTokenSource _diskSpaceCheckToken = new(); private Task _diskSpaceCheckTask = null; + private CancellationTokenSource _serviceConnectivityCheckToken = new(); + private Task _serviceConnectivityCheckTask = null; // Download all required actions. // Make sure all condition inputs are valid. @@ -454,11 +457,14 @@ namespace GitHub.Runner.Worker { foreach (var checkUrl in checkUrls) { - _connectivityCheckTasks.Add(CheckConnectivity(checkUrl)); + _connectivityCheckTasks.Add(CheckConnectivity(checkUrl, accessToken: string.Empty, timeoutInSeconds: 5, token: CancellationToken.None)); } } } + Trace.Info($"Start checking service connectivity in background."); + _serviceConnectivityCheckTask = CheckServiceConnectivityAsync(context, _serviceConnectivityCheckToken.Token); + return steps; } catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested) @@ -692,7 +698,7 @@ namespace GitHub.Runner.Worker { var result = await check; Trace.Info($"Connectivity check result: {result}"); - context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = result }); + context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"{result.EndpointUrl}: {result.StatusCode}" }); } } catch (Exception ex) @@ -702,6 +708,22 @@ namespace GitHub.Runner.Worker context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check server connectivity. {ex.Message}" }); } } + + // Collect service connectivity check result + if (_serviceConnectivityCheckTask != null) + { + _serviceConnectivityCheckToken.Cancel(); + try + { + await _serviceConnectivityCheckTask; + } + catch (Exception ex) + { + Trace.Error($"Fail to check service connectivity."); + Trace.Error(ex); + context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = $"Fail to check service connectivity. {ex.Message}" }); + } + } } catch (Exception ex) { @@ -717,11 +739,13 @@ namespace GitHub.Runner.Worker } } - private async Task CheckConnectivity(string endpointUrl) + private async Task CheckConnectivity(string endpointUrl, string accessToken, int timeoutInSeconds, CancellationToken token) { Trace.Info($"Check server connectivity for {endpointUrl}."); - string result = string.Empty; - using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5))) + CheckResult result = new CheckResult() { EndpointUrl = endpointUrl }; + var stopwatch = Stopwatch.StartNew(); + using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(timeoutInSeconds))) + using (var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token, timeoutTokenSource.Token)) { try { @@ -729,21 +753,44 @@ namespace GitHub.Runner.Worker using (var httpClient = new HttpClient(httpClientHandler)) { httpClient.DefaultRequestHeaders.UserAgent.AddRange(HostContext.UserAgents); - var response = await httpClient.GetAsync(endpointUrl, timeoutTokenSource.Token); - result = $"{endpointUrl}: {response.StatusCode}"; + if (!string.IsNullOrEmpty(accessToken)) + { + httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {accessToken}"); + } + + var response = await httpClient.GetAsync(endpointUrl, linkedTokenSource.Token); + result.StatusCode = $"{response.StatusCode}"; + + var githubRequestId = UrlUtil.GetGitHubRequestId(response.Headers); + var vssRequestId = UrlUtil.GetVssRequestId(response.Headers); + if (!string.IsNullOrEmpty(githubRequestId)) + { + result.RequestId = githubRequestId; + } + else if (!string.IsNullOrEmpty(vssRequestId)) + { + result.RequestId = vssRequestId; + } } } + catch (Exception ex) when (ex is OperationCanceledException && token.IsCancellationRequested) + { + Trace.Error($"Request canceled during connectivity check: {ex}"); + result.StatusCode = "canceled"; + } catch (Exception ex) when (ex is OperationCanceledException && timeoutTokenSource.IsCancellationRequested) { Trace.Error($"Request timeout during connectivity check: {ex}"); - result = $"{endpointUrl}: timeout"; + result.StatusCode = "timeout"; } catch (Exception ex) { Trace.Error($"Catch exception during connectivity check: {ex}"); - result = $"{endpointUrl}: {ex.Message}"; + result.StatusCode = $"{ex.Message}"; } } + stopwatch.Stop(); + result.DurationInMs = (int)stopwatch.ElapsedMilliseconds; return result; } @@ -781,6 +828,84 @@ namespace GitHub.Runner.Worker } } + private async Task CheckServiceConnectivityAsync(IExecutionContext context, CancellationToken token) + { + var connectionTest = context.Global.Variables.Get(WellKnownDistributedTaskVariables.RunnerServiceConnectivityTest); + if (string.IsNullOrEmpty(connectionTest)) + { + return; + } + + ServiceConnectivityCheckInput checkConnectivityInfo; + try + { + checkConnectivityInfo = StringUtil.ConvertFromJson(connectionTest); + } + catch (Exception ex) + { + context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.General, Message = $"Fail to parse JSON. {ex.Message}" }); + return; + } + + if (checkConnectivityInfo == null) + { + return; + } + + // make sure interval is at least 10 seconds + checkConnectivityInfo.IntervalInSecond = Math.Max(10, checkConnectivityInfo.IntervalInSecond); + + var systemConnection = context.Global.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); + var accessToken = systemConnection.Authorization.Parameters[EndpointAuthorizationParameters.AccessToken]; + + var testResult = new ServiceConnectivityCheckResult(); + while (!token.IsCancellationRequested) + { + foreach (var endpoint in checkConnectivityInfo.Endpoints) + { + if (string.IsNullOrEmpty(endpoint.Key) || string.IsNullOrEmpty(endpoint.Value)) + { + continue; + } + + if (!testResult.EndpointsResult.ContainsKey(endpoint.Key)) + { + testResult.EndpointsResult[endpoint.Key] = new List(); + } + + try + { + var result = await CheckConnectivity(endpoint.Value, accessToken: accessToken, timeoutInSeconds: checkConnectivityInfo.RequestTimeoutInSecond, token); + testResult.EndpointsResult[endpoint.Key].Add($"{result.StartTime:s}: {result.StatusCode} - {result.RequestId} - {result.DurationInMs}ms"); + if (!testResult.HasFailure && + result.StatusCode != "OK" && + result.StatusCode != "canceled") + { + // track if any endpoint is not reachable + testResult.HasFailure = true; + } + } + catch (Exception ex) + { + testResult.EndpointsResult[endpoint.Key].Add($"{DateTime.UtcNow:s}: {ex.Message}"); + } + } + + try + { + await Task.Delay(TimeSpan.FromSeconds(checkConnectivityInfo.IntervalInSecond), token); + } + catch (TaskCanceledException) + { + // ignore + } + } + + var telemetryData = StringUtil.ConvertToJson(testResult, Formatting.None); + Trace.Verbose($"Connectivity check result: {telemetryData}"); + context.Global.JobTelemetry.Add(new JobTelemetry() { Type = JobTelemetryType.ConnectivityCheck, Message = telemetryData }); + } + private Dictionary SnapshotProcesses() { Dictionary snapshot = new(); @@ -812,5 +937,23 @@ namespace GitHub.Runner.Worker throw new ArgumentException("Jobs without a job container are forbidden on this runner, please add a 'container:' to your job or contact your self-hosted runner administrator."); } } + + private class CheckResult + { + public CheckResult() + { + StartTime = DateTime.UtcNow; + } + + public string EndpointUrl { get; set; } + + public DateTime StartTime { get; set; } + + public string StatusCode { get; set; } + + public string RequestId { get; set; } + + public int DurationInMs { get; set; } + } } } diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index f1f9a72f1..7ab506a53 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -50,7 +50,8 @@ namespace GitHub.Runner.Worker if (message.Variables.TryGetValue(Constants.Variables.System.OrchestrationId, out VariableValue orchestrationId) && !string.IsNullOrEmpty(orchestrationId.Value)) { - HostContext.UserAgents.Add(new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value)); + // make the orchestration id the first item in the user-agent header to avoid get truncated in server log. + HostContext.UserAgents.Insert(0, new ProductInfoHeaderValue("OrchestrationId", orchestrationId.Value)); // make sure orchestration id is in the user-agent header. VssUtil.InitializeVssClientSettings(HostContext.UserAgents, HostContext.WebProxy); diff --git a/src/Sdk/DTWebApi/WebApi/ServiceConnectivityCheck.cs b/src/Sdk/DTWebApi/WebApi/ServiceConnectivityCheck.cs new file mode 100644 index 000000000..ff4845fec --- /dev/null +++ b/src/Sdk/DTWebApi/WebApi/ServiceConnectivityCheck.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; +using Newtonsoft.Json; + +namespace GitHub.DistributedTask.WebApi +{ + [DataContract] + public class ServiceConnectivityCheckInput + { + [JsonConstructor] + public ServiceConnectivityCheckInput() + { + Endpoints = new Dictionary(StringComparer.OrdinalIgnoreCase); + } + + [DataMember(EmitDefaultValue = false)] + public Dictionary Endpoints { get; set; } + + [DataMember(EmitDefaultValue = false)] + public int IntervalInSecond { get; set; } + + [DataMember(EmitDefaultValue = false)] + public int RequestTimeoutInSecond { get; set; } + } + + [DataContract] + public class ServiceConnectivityCheckResult + { + [JsonConstructor] + public ServiceConnectivityCheckResult() + { + EndpointsResult = new Dictionary>(StringComparer.OrdinalIgnoreCase); + } + + [DataMember(Order = 1, EmitDefaultValue = true)] + public bool HasFailure { get; set; } + + [DataMember(Order = 2, EmitDefaultValue = false)] + public Dictionary> EndpointsResult { get; set; } + } +} diff --git a/src/Sdk/DTWebApi/WebApi/WellKnownDistributedTaskVariables.cs b/src/Sdk/DTWebApi/WebApi/WellKnownDistributedTaskVariables.cs index c6ca7b0b4..a4ea950de 100644 --- a/src/Sdk/DTWebApi/WebApi/WellKnownDistributedTaskVariables.cs +++ b/src/Sdk/DTWebApi/WebApi/WellKnownDistributedTaskVariables.cs @@ -7,5 +7,6 @@ namespace GitHub.DistributedTask.WebApi public static readonly String JobId = "system.jobId"; public static readonly String RunnerLowDiskspaceThreshold = "system.runner.lowdiskspacethreshold"; public static readonly String RunnerEnvironment = "system.runnerEnvironment"; + public static readonly String RunnerServiceConnectivityTest = "system.runner.serviceconnectivitycheckinput"; } }