This commit is contained in:
eric sciple
2022-05-05 19:24:55 +00:00
committed by GitHub
parent 05c3d892f5
commit 9b82c84d51
5 changed files with 208 additions and 0 deletions

View File

@@ -0,0 +1,83 @@
using GitHub.DistributedTask.WebApi;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Common.Util;
using GitHub.Services.WebApi;
using GitHub.Services.Common;
using GitHub.Runner.Sdk;
using GitHub.DistributedTask.Pipelines;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(RunnerServer))]
public interface IRunServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<AgentJobRequestMessage> GetJobMessageAsync(Guid scopeId, string planType, string planGroup, Guid planId, string instanceRefsJson);
}
public sealed class RunServer : RunnerService, IRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;
public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
{
System.Console.WriteLine("RunServer.ConnectAsync");
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}
private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
System.Console.WriteLine("EstablishVssConnection");
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);
await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}
public Task<AgentJobRequestMessage> GetJobMessageAsync(
Guid scopeId,
string planType,
string planGroup,
Guid planId,
string instanceRefsJson)
{
System.Console.WriteLine("RunServer.GetMessageAsync");
CheckConnection();
return _taskAgentClient.GetJobMessageAsync(scopeId, planType, planGroup, planId, instanceRefsJson);
}
}
}

View File

@@ -13,6 +13,10 @@ using GitHub.Runner.Sdk;
using System.Linq;
using GitHub.Runner.Listener.Check;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
namespace GitHub.Runner.Listener
{
@@ -457,6 +461,52 @@ namespace GitHub.Runner.Listener
}
}
}
else if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase))
{
// var messageRef = StringUtil.ConvertFromJson<MessageRef>(message.Body);
// var client = new HttpClient();
// client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", messageRef.Token);
// var response = await client.GetAsync(messageRef.Url, token);
// if (!response.IsSuccessStatusCode)
// {
// var content = default(string);
// try
// {
// content = await response.Content.ReadAsStringAsync();
// }
// catch
// {
// }
// var error = $"HTTP {(int)response.StatusCode} {Enum.GetName(typeof(HttpStatusCode), response.StatusCode)}";
// if (!string.IsNullOrEmpty(content))
// {
// error = $"{error}: {content}";
// }
// throw new Exception(error);
// }
// var fullMessage = await response.Content.ReadAsStringAsync();
// return StringUtil.ConvertFromJson<TaskAgentMessage>(fullMessage);
if (autoUpdateInProgress || runOnceJobReceived)
{
skipMessageDeletion = true;
Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
}
else
{
Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'");
var jobMessage = StringUtil.ConvertFromJson<Pipelines.AgentJobRequestMessage>(message.Body);
jobDispatcher.Run(jobMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");
runOnceJobReceived = true;
}
}
}
else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
@@ -585,5 +635,35 @@ Examples:
_term.WriteLine($@" .{separator}config.{ext} --url <url> --token <token> --runasservice");
#endif
}
[DataContract]
public sealed class MessageRef
{
[DataMember(Name = "url")]
public string Url { get; set; }
[DataMember(Name = "token")]
public string Token { get; set; }
[DataMember(Name = "scopeId")]
public string ScopeId { get; set; }
[DataMember(Name = "planType")]
public string PlanType { get; set; }
[DataMember(Name = "planGroup")]
public string PlanGroup { get; set; }
[DataMember(Name = "instanceRefs")]
public InstanceRef[] InstanceRefs { get; set; }
[DataMember(Name = "labels")]
public string[] Labels { get; set; }
}
[DataContract]
public sealed class InstanceRef
{
[DataMember(Name = "name")]
public string Name { get; set; }
[DataMember(Name = "instanceType")]
public string InstanceType { get; set; }
[DataMember(Name = "attempt")]
public int Attempt { get; set; }
}
}
}

View File

@@ -27,6 +27,7 @@ using System.Net.Http.Headers;
using System.Net.Http.Formatting;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
@@ -703,6 +704,45 @@ namespace GitHub.DistributedTask.WebApi
cancellationToken: cancellationToken);
}
/// <summary>
/// [Preview API]
/// </summary>
/// <param name="scopeId"></param>
/// <param name="planType"></param>
/// <param name="planGroup"></param>
/// <param name="planId"></param>
/// <param name="instanceRefsJson"></param>
/// <param name="userState"></param>
/// <param name="cancellationToken">The cancellation token to cancel operation.</param>
[EditorBrowsable(EditorBrowsableState.Never)]
public virtual Task<AgentJobRequestMessage> GetJobMessageAsync(
Guid scopeId,
string planType,
string planGroup,
Guid planId,
string instanceRefsJson,
object userState = null,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("GET");
Guid locationId = new Guid("25adab70-1379-4186-be8e-b643061ebe3a");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
queryParams.Add("scopeId", scopeId.ToString());
queryParams.Add("planType", planType);
queryParams.Add("planGroup", planGroup);
queryParams.Add("planId", planId.ToString());
queryParams.Add("instanceRefsJson", instanceRefsJson);
return SendAsync<AgentJobRequestMessage>(
httpMethod,
locationId,
version: new ApiResourceVersion(6.0, 1),
queryParameters: queryParams,
userState: userState,
cancellationToken: cancellationToken);
}
/// <summary>
/// [Preview API]
/// </summary>

View File

@@ -5,5 +5,6 @@ namespace GitHub.DistributedTask.WebApi
public static class JobRequestMessageTypes
{
public const String PipelineAgentJobRequest = "PipelineAgentJobRequest";
public const String RunnerJobRequest = "RunnerJobRequest";
}
}

View File

@@ -765,6 +765,7 @@ namespace GitHub.Services.WebApi
protected async Task<T> ReadContentAsAsync<T>(HttpResponseMessage response, CancellationToken cancellationToken = default(CancellationToken))
{
System.Console.WriteLine($"VssHttpClientBase.ReadContentAsAsync {response.Headers}");
CheckForDisposed();
Boolean isJson = IsJsonResponse(response);
bool mismatchContentType = false;
@@ -776,17 +777,20 @@ namespace GitHub.Services.WebApi
!typeof(Byte[]).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()) &&
!typeof(JObject).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()))
{
System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: isJson 1");
// expect it to come back wrapped, if it isn't it is a bug!
var wrapper = await ReadJsonContentAsync<VssJsonCollectionWrapper<T>>(response, cancellationToken).ConfigureAwait(false);
return wrapper.Value;
}
else if (isJson)
{
System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: isJson 2");
return await ReadJsonContentAsync<T>(response, cancellationToken).ConfigureAwait(false);
}
}
catch (JsonReaderException)
{
System.Console.WriteLine("VssHttpClientBase.ReadContentAsAsync: mismatchContentType");
// We thought the content was JSON but failed to parse.
// In this case, do nothing and utilize the HandleUnknownContentType call below
mismatchContentType = true;