mirror of
https://github.com/actions/runner.git
synced 2025-12-28 12:27:48 +08:00
support messages from broker
This commit is contained in:
59
src/Runner.Common/BrokerServer.cs
Normal file
59
src/Runner.Common/BrokerServer.cs
Normal file
@@ -0,0 +1,59 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.Runner.Common.Util;
|
||||
using GitHub.Services.WebApi;
|
||||
using GitHub.Services.Common;
|
||||
using GitHub.Runner.Sdk;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
|
||||
namespace GitHub.Runner.Common
|
||||
{
|
||||
[ServiceLocator(Default = typeof(BrokerServer))]
|
||||
public interface IBrokerServer : IRunnerService
|
||||
{
|
||||
Task ConnectAsync(Uri serverUrl, CancellationToken cancellationToken);
|
||||
Task<string> GetMessageAsync(TaskAgentSession session, RunnerSettings settings, long? lastMessageId, CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
public sealed class BrokerServer : RunnerService, IBrokerServer
|
||||
{
|
||||
private HttpClient _httpClient;
|
||||
|
||||
public async Task ConnectAsync(Uri serverUrl, CancellationToken cancellationToken)
|
||||
{
|
||||
_httpClient = new HttpClient();
|
||||
_httpClient.BaseAddress = serverUrl;
|
||||
_httpClient.Timeout = TimeSpan.FromSeconds(100);
|
||||
await _httpClient.GetAsync("health", cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<string> GetMessageAsync(TaskAgentSession session, RunnerSettings settings, long? lastMessageId, CancellationToken cancellationToken)
|
||||
{
|
||||
var response = await _httpClient.GetAsync($"message?tenant=org:github&root_tenant=org:github&group_id={settings.PoolId}&group_name={settings.PoolName}&runner_id={settings.AgentId}&runner_name={settings.AgentName}&labels=self-hosted,linux", cancellationToken);
|
||||
if (!response.IsSuccessStatusCode)
|
||||
{
|
||||
var content = default(string);
|
||||
try
|
||||
{
|
||||
content = await response.Content.ReadAsStringAsync();
|
||||
}
|
||||
catch
|
||||
{
|
||||
}
|
||||
|
||||
var error = $"HTTP {(int)response.StatusCode} {Enum.GetName(typeof(HttpStatusCode), response.StatusCode)}";
|
||||
if (!string.IsNullOrEmpty(content))
|
||||
{
|
||||
error = $"{error}: {content}";
|
||||
}
|
||||
throw new Exception(error);
|
||||
}
|
||||
|
||||
return await response.Content.ReadAsStringAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
120
src/Runner.Common/RunServer.cs
Normal file
120
src/Runner.Common/RunServer.cs
Normal file
@@ -0,0 +1,120 @@
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.Serialization;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.Runner.Common.Util;
|
||||
using GitHub.Services.WebApi;
|
||||
using GitHub.Services.Common;
|
||||
using GitHub.Runner.Sdk;
|
||||
using GitHub.DistributedTask.Pipelines;
|
||||
|
||||
namespace GitHub.Runner.Common
|
||||
{
|
||||
[ServiceLocator(Default = typeof(RunServer))]
|
||||
public interface IRunServer : IRunnerService
|
||||
{
|
||||
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
|
||||
|
||||
Task<AgentJobRequestMessage> GetJobMessageAsync(Guid scopeId, Guid hostId, string planType, string planGroup, Guid planId, IList<InstanceRef> instanceRefsJson);
|
||||
}
|
||||
|
||||
public sealed class RunServer : RunnerService, IRunServer
|
||||
{
|
||||
private bool _hasConnection;
|
||||
private VssConnection _connection;
|
||||
private TaskAgentHttpClient _taskAgentClient;
|
||||
|
||||
public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
|
||||
{
|
||||
// System.Console.WriteLine("RunServer.ConnectAsync");
|
||||
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
|
||||
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
|
||||
_hasConnection = true;
|
||||
}
|
||||
|
||||
private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
|
||||
{
|
||||
// System.Console.WriteLine("EstablishVssConnection");
|
||||
Trace.Info($"EstablishVssConnection");
|
||||
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
|
||||
int attemptCount = 5;
|
||||
while (attemptCount-- > 0)
|
||||
{
|
||||
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
|
||||
try
|
||||
{
|
||||
await connection.ConnectAsync();
|
||||
return connection;
|
||||
}
|
||||
catch (Exception ex) when (attemptCount > 0)
|
||||
{
|
||||
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
|
||||
Trace.Error(ex);
|
||||
|
||||
await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
|
||||
}
|
||||
}
|
||||
|
||||
// should never reach here.
|
||||
throw new InvalidOperationException(nameof(EstablishVssConnection));
|
||||
}
|
||||
|
||||
private void CheckConnection()
|
||||
{
|
||||
if (!_hasConnection)
|
||||
{
|
||||
throw new InvalidOperationException($"SetConnection");
|
||||
}
|
||||
}
|
||||
|
||||
public Task<AgentJobRequestMessage> GetJobMessageAsync(
|
||||
Guid scopeId,
|
||||
Guid hostId,
|
||||
string planType,
|
||||
string planGroup,
|
||||
Guid planId,
|
||||
IList<InstanceRef> instanceRefsJson)
|
||||
{
|
||||
// System.Console.WriteLine("RunServer.GetMessageAsync");
|
||||
CheckConnection();
|
||||
return _taskAgentClient.GetJobMessageAsync(scopeId, hostId, planType, planGroup, planId, StringUtil.ConvertToJson(instanceRefsJson, Newtonsoft.Json.Formatting.None));
|
||||
}
|
||||
}
|
||||
|
||||
// todo: move to SDK?
|
||||
[DataContract]
|
||||
public sealed class MessageRef
|
||||
{
|
||||
[DataMember(Name = "url")]
|
||||
public string Url { get; set; }
|
||||
[DataMember(Name = "token")]
|
||||
public string Token { get; set; }
|
||||
[DataMember(Name = "scopeId")]
|
||||
public Guid ScopeId { get; set; }
|
||||
[DataMember(Name = "hostId")]
|
||||
public Guid HostId { get; set; }
|
||||
[DataMember(Name = "planType")]
|
||||
public string PlanType { get; set; }
|
||||
[DataMember(Name = "planGroup")]
|
||||
public string PlanGroup { get; set; }
|
||||
[DataMember(Name = "planId")]
|
||||
public Guid PlanId { get; set; }
|
||||
[DataMember(Name = "instanceRefs")]
|
||||
public InstanceRef[] InstanceRefs { get; set; }
|
||||
[DataMember(Name = "labels")]
|
||||
public string[] Labels { get; set; }
|
||||
}
|
||||
|
||||
[DataContract]
|
||||
public sealed class InstanceRef
|
||||
{
|
||||
[DataMember(Name = "name")]
|
||||
public string Name { get; set; }
|
||||
[DataMember(Name = "instanceType")]
|
||||
public string InstanceType { get; set; }
|
||||
[DataMember(Name = "attempt")]
|
||||
public int Attempt { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ namespace GitHub.Runner.Common
|
||||
// job request
|
||||
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
|
||||
Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, CancellationToken cancellationToken);
|
||||
Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, CancellationToken cancellationToken);
|
||||
Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, Guid targetHostId, CancellationToken cancellationToken);
|
||||
|
||||
// agent package
|
||||
Task<List<PackageMetadata>> GetPackagesAsync(string packageType, string platform, int top, bool includeToken, CancellationToken cancellationToken);
|
||||
@@ -68,11 +68,23 @@ namespace GitHub.Runner.Common
|
||||
|
||||
public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
|
||||
{
|
||||
var createGenericConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
|
||||
// System.Console.WriteLine("RunnerServer.ConnectAsync: Create message connection");
|
||||
var createMessageConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(60));
|
||||
var createRequestConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(60));
|
||||
await Task.WhenAll(createMessageConnection);
|
||||
|
||||
await Task.WhenAll(createGenericConnection, createMessageConnection, createRequestConnection);
|
||||
// System.Console.WriteLine("RunnerServer.ConnectAsync: Create generic connection");
|
||||
var createGenericConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
|
||||
await Task.WhenAll(createGenericConnection);
|
||||
|
||||
// System.Console.WriteLine("RunnerServer.ConnectAsync: Create request connection");
|
||||
var createRequestConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(60));
|
||||
await Task.WhenAll(createRequestConnection);
|
||||
|
||||
// var createGenericConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
|
||||
// var createMessageConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(60));
|
||||
// var createRequestConnection = EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(60));
|
||||
|
||||
// await Task.WhenAll(createGenericConnection, createMessageConnection, createRequestConnection);
|
||||
|
||||
_genericConnection = await createGenericConnection;
|
||||
_messageConnection = await createMessageConnection;
|
||||
@@ -182,6 +194,8 @@ namespace GitHub.Runner.Common
|
||||
|
||||
private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
|
||||
{
|
||||
// System.Console.WriteLine("EstablishVssConnection");
|
||||
Trace.Info($"EstablishVssConnection");
|
||||
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
|
||||
int attemptCount = 5;
|
||||
while (attemptCount-- > 0)
|
||||
@@ -238,41 +252,48 @@ namespace GitHub.Runner.Common
|
||||
|
||||
public Task<List<TaskAgentPool>> GetAgentPoolsAsync(string agentPoolName = null, TaskAgentPoolType poolType = TaskAgentPoolType.Automation)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetAgentPoolsAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.GetAgentPoolsAsync(agentPoolName, poolType: poolType);
|
||||
}
|
||||
|
||||
public Task<TaskAgent> AddAgentAsync(Int32 agentPoolId, TaskAgent agent)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.AddAgentAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.AddAgentAsync(agentPoolId, agent);
|
||||
}
|
||||
|
||||
public Task<List<TaskAgent>> GetAgentsAsync(int agentPoolId, string agentName = null)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetAgentsAsync 1");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.GetAgentsAsync(agentPoolId, agentName, false);
|
||||
}
|
||||
|
||||
public Task<List<TaskAgent>> GetAgentsAsync(string agentName)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetAgentsAsync 2");
|
||||
return GetAgentsAsync(0, agentName); // search in all all agentPools
|
||||
}
|
||||
|
||||
public Task<TaskAgent> ReplaceAgentAsync(int agentPoolId, TaskAgent agent)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.ReplaceAgentAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.ReplaceAgentAsync(agentPoolId, agent);
|
||||
}
|
||||
|
||||
public Task DeleteAgentAsync(int agentPoolId, int agentId)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.DeleteAgentAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.DeleteAgentAsync(agentPoolId, agentId);
|
||||
}
|
||||
|
||||
public Task DeleteAgentAsync(int agentId)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.DeleteAgentAsync");
|
||||
return DeleteAgentAsync(0, agentId); // agentPool is ignored server side
|
||||
}
|
||||
|
||||
@@ -282,24 +303,28 @@ namespace GitHub.Runner.Common
|
||||
|
||||
public Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.CreateAgentSessionAsync");
|
||||
CheckConnection(RunnerConnectionType.MessageQueue);
|
||||
return _messageTaskAgentClient.CreateAgentSessionAsync(poolId, session, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.DeleteAgentMessageAsync");
|
||||
CheckConnection(RunnerConnectionType.MessageQueue);
|
||||
return _messageTaskAgentClient.DeleteMessageAsync(poolId, messageId, sessionId, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.DeleteAgentSessionAsync");
|
||||
CheckConnection(RunnerConnectionType.MessageQueue);
|
||||
return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetAgentMessageAsync");
|
||||
CheckConnection(RunnerConnectionType.MessageQueue);
|
||||
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, cancellationToken: cancellationToken);
|
||||
}
|
||||
@@ -310,18 +335,21 @@ namespace GitHub.Runner.Common
|
||||
|
||||
public Task<TaskAgentJobRequest> RenewAgentRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId = null, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.RenewAgentRequestAsync");
|
||||
CheckConnection(RunnerConnectionType.JobRequest);
|
||||
return _requestTaskAgentClient.RenewAgentRequestAsync(poolId, requestId, lockToken, orchestrationId: orchestrationId, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, CancellationToken cancellationToken = default(CancellationToken))
|
||||
public Task<TaskAgentJobRequest> FinishAgentRequestAsync(int poolId, long requestId, Guid lockToken, DateTime finishTime, TaskResult result, Guid targetHostId, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.FinishAgentRequestAsync");
|
||||
CheckConnection(RunnerConnectionType.JobRequest);
|
||||
return _requestTaskAgentClient.FinishAgentRequestAsync(poolId, requestId, lockToken, finishTime, result, cancellationToken: cancellationToken);
|
||||
return _requestTaskAgentClient.FinishAgentRequestAsync(poolId, requestId, lockToken, finishTime, result, targetHostId, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetAgentRequestAsync");
|
||||
CheckConnection(RunnerConnectionType.JobRequest);
|
||||
return _requestTaskAgentClient.GetAgentRequestAsync(poolId, requestId, cancellationToken: cancellationToken);
|
||||
}
|
||||
@@ -331,18 +359,21 @@ namespace GitHub.Runner.Common
|
||||
//-----------------------------------------------------------------
|
||||
public Task<List<PackageMetadata>> GetPackagesAsync(string packageType, string platform, int top, bool includeToken, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetPackagesAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.GetPackagesAsync(packageType, platform, top, includeToken, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task<PackageMetadata> GetPackageAsync(string packageType, string platform, string version, bool includeToken, CancellationToken cancellationToken)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.GetPackageAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.GetPackageAsync(packageType, platform, version, includeToken, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public Task<TaskAgent> UpdateAgentUpdateStateAsync(int agentPoolId, int agentId, string currentState, string trace)
|
||||
{
|
||||
// System.Console.WriteLine("RunnerServer.UpdateAgentUpdateStateAsync");
|
||||
CheckConnection(RunnerConnectionType.Generic);
|
||||
return _genericTaskAgentClient.UpdateAgentUpdateStateAsync(agentPoolId, agentId, currentState, trace);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user