diff --git a/src/Runner.Common/RunServer.cs b/src/Runner.Common/RunServer.cs new file mode 100644 index 000000000..afb861e69 --- /dev/null +++ b/src/Runner.Common/RunServer.cs @@ -0,0 +1,83 @@ +using GitHub.DistributedTask.WebApi; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GitHub.Runner.Common.Util; +using GitHub.Services.WebApi; +using GitHub.Services.Common; +using GitHub.Runner.Sdk; +using GitHub.DistributedTask.Pipelines; + +namespace GitHub.Runner.Common +{ + [ServiceLocator(Default = typeof(RunnerServer))] + public interface IRunServer : IRunnerService + { + Task ConnectAsync(Uri serverUrl, VssCredentials credentials); + + Task GetJobMessageAsync(Guid scopeId, string planType, string planGroup, Guid planId, string instanceRefsJson); + } + + public sealed class RunServer : RunnerService, IRunServer + { + private bool _hasConnection; + private VssConnection _connection; + private TaskAgentHttpClient _taskAgentClient; + + public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials) + { + System.Console.WriteLine("RunServer.ConnectAsync"); + _connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100)); + _taskAgentClient = _connection.GetClient(); + _hasConnection = true; + } + + private async Task EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout) + { + System.Console.WriteLine("EstablishVssConnection"); + Trace.Info($"EstablishVssConnection"); + Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout."); + int attemptCount = 5; + while (attemptCount-- > 0) + { + var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout); + try + { + await connection.ConnectAsync(); + return connection; + } + catch (Exception ex) when (attemptCount > 0) + { + Trace.Info($"Catch exception during connect. {attemptCount} attempt left."); + Trace.Error(ex); + + await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None); + } + } + + // should never reach here. + throw new InvalidOperationException(nameof(EstablishVssConnection)); + } + + private void CheckConnection() + { + if (!_hasConnection) + { + throw new InvalidOperationException($"SetConnection"); + } + } + + public Task GetJobMessageAsync( + Guid scopeId, + string planType, + string planGroup, + Guid planId, + string instanceRefsJson) + { + System.Console.WriteLine("RunServer.GetMessageAsync"); + CheckConnection(); + return _taskAgentClient.GetJobMessageAsync(scopeId, planType, planGroup, planId, instanceRefsJson); + } + } +} diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 5ef0a3d64..bc27afd06 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -13,6 +13,10 @@ using GitHub.Runner.Sdk; using System.Linq; using GitHub.Runner.Listener.Check; using System.Collections.Generic; +using System.Runtime.Serialization; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; namespace GitHub.Runner.Listener { @@ -457,6 +461,52 @@ namespace GitHub.Runner.Listener } } } + else if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase)) + { + + // var messageRef = StringUtil.ConvertFromJson(message.Body); + // var client = new HttpClient(); + // client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", messageRef.Token); + // var response = await client.GetAsync(messageRef.Url, token); + // if (!response.IsSuccessStatusCode) + // { + // var content = default(string); + // try + // { + // content = await response.Content.ReadAsStringAsync(); + // } + // catch + // { + // } + + // var error = $"HTTP {(int)response.StatusCode} {Enum.GetName(typeof(HttpStatusCode), response.StatusCode)}"; + // if (!string.IsNullOrEmpty(content)) + // { + // error = $"{error}: {content}"; + // } + // throw new Exception(error); + // } + + // var fullMessage = await response.Content.ReadAsStringAsync(); + // return StringUtil.ConvertFromJson(fullMessage); + + if (autoUpdateInProgress || runOnceJobReceived) + { + skipMessageDeletion = true; + Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); + } + else + { + Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); + var jobMessage = StringUtil.ConvertFromJson(message.Body); + jobDispatcher.Run(jobMessage, runOnce); + if (runOnce) + { + Trace.Info("One time used runner received job message."); + runOnceJobReceived = true; + } + } + } else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { var cancelJobMessage = JsonUtility.FromString(message.Body); @@ -585,5 +635,35 @@ Examples: _term.WriteLine($@" .{separator}config.{ext} --url --token --runasservice"); #endif } + + [DataContract] + public sealed class MessageRef + { + [DataMember(Name = "url")] + public string Url { get; set; } + [DataMember(Name = "token")] + public string Token { get; set; } + [DataMember(Name = "scopeId")] + public string ScopeId { get; set; } + [DataMember(Name = "planType")] + public string PlanType { get; set; } + [DataMember(Name = "planGroup")] + public string PlanGroup { get; set; } + [DataMember(Name = "instanceRefs")] + public InstanceRef[] InstanceRefs { get; set; } + [DataMember(Name = "labels")] + public string[] Labels { get; set; } + } + + [DataContract] + public sealed class InstanceRef + { + [DataMember(Name = "name")] + public string Name { get; set; } + [DataMember(Name = "instanceType")] + public string InstanceType { get; set; } + [DataMember(Name = "attempt")] + public int Attempt { get; set; } + } } } diff --git a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs index 9f33d6a50..69820ea08 100644 --- a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs +++ b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs @@ -27,6 +27,7 @@ using System.Net.Http.Headers; using System.Net.Http.Formatting; using System.Threading; using System.Threading.Tasks; +using GitHub.DistributedTask.Pipelines; using GitHub.Services.Common; using GitHub.Services.WebApi; @@ -703,6 +704,45 @@ namespace GitHub.DistributedTask.WebApi cancellationToken: cancellationToken); } + /// + /// [Preview API] + /// + /// + /// + /// + /// + /// + /// + /// The cancellation token to cancel operation. + [EditorBrowsable(EditorBrowsableState.Never)] + public virtual Task GetJobMessageAsync( + Guid scopeId, + string planType, + string planGroup, + Guid planId, + string instanceRefsJson, + object userState = null, + CancellationToken cancellationToken = default) + { + HttpMethod httpMethod = new HttpMethod("GET"); + Guid locationId = new Guid("25adab70-1379-4186-be8e-b643061ebe3a"); + + List> queryParams = new List>(); + queryParams.Add("scopeId", scopeId.ToString()); + queryParams.Add("planType", planType); + queryParams.Add("planGroup", planGroup); + queryParams.Add("planId", planId.ToString()); + queryParams.Add("instanceRefsJson", instanceRefsJson); + + return SendAsync( + httpMethod, + locationId, + version: new ApiResourceVersion(6.0, 1), + queryParameters: queryParams, + userState: userState, + cancellationToken: cancellationToken); + } + /// /// [Preview API] /// diff --git a/src/Sdk/DTWebApi/WebApi/JobRequestMessageTypes.cs b/src/Sdk/DTWebApi/WebApi/JobRequestMessageTypes.cs index 326473750..c2f3a87ee 100644 --- a/src/Sdk/DTWebApi/WebApi/JobRequestMessageTypes.cs +++ b/src/Sdk/DTWebApi/WebApi/JobRequestMessageTypes.cs @@ -5,5 +5,6 @@ namespace GitHub.DistributedTask.WebApi public static class JobRequestMessageTypes { public const String PipelineAgentJobRequest = "PipelineAgentJobRequest"; + public const String RunnerJobRequest = "RunnerJobRequest"; } } diff --git a/src/Sdk/WebApi/WebApi/VssHttpClientBase.cs b/src/Sdk/WebApi/WebApi/VssHttpClientBase.cs index 6a4be0676..10d50fa8a 100644 --- a/src/Sdk/WebApi/WebApi/VssHttpClientBase.cs +++ b/src/Sdk/WebApi/WebApi/VssHttpClientBase.cs @@ -765,6 +765,7 @@ namespace GitHub.Services.WebApi protected async Task ReadContentAsAsync(HttpResponseMessage response, CancellationToken cancellationToken = default(CancellationToken)) { + System.Console.WriteLine($"VssHttpClientBase.ReadContentAsAsync {response.Headers}"); CheckForDisposed(); Boolean isJson = IsJsonResponse(response); bool mismatchContentType = false; @@ -776,17 +777,20 @@ namespace GitHub.Services.WebApi !typeof(Byte[]).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()) && !typeof(JObject).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo())) { + System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: isJson 1"); // expect it to come back wrapped, if it isn't it is a bug! var wrapper = await ReadJsonContentAsync>(response, cancellationToken).ConfigureAwait(false); return wrapper.Value; } else if (isJson) { + System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: isJson 2"); return await ReadJsonContentAsync(response, cancellationToken).ConfigureAwait(false); } } catch (JsonReaderException) { + System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: mismatchContentType"); // We thought the content was JSON but failed to parse. // In this case, do nothing and utilize the HandleUnknownContentType call below mismatchContentType = true;