mirror of
https://github.com/actions/runner.git
synced 2025-12-10 04:06:57 +00:00
Compare commits
3 Commits
b39c237989
...
users/eric
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4c63ec012 | ||
|
|
d13cf5d9a8 | ||
|
|
d124b0104e |
@@ -22,6 +22,8 @@ namespace GitHub.Runner.Common
|
||||
|
||||
Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token);
|
||||
|
||||
Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken);
|
||||
|
||||
Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials);
|
||||
|
||||
Task ForceRefreshConnection(VssCredentials credentials);
|
||||
@@ -75,6 +77,12 @@ namespace GitHub.Runner.Common
|
||||
await _brokerHttpClient.DeleteSessionAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public async Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken)
|
||||
{
|
||||
CheckConnection();
|
||||
await _brokerHttpClient.DeleteRunnerMessageAsync(sessionId, jobMessageKey, cancellationToken);
|
||||
}
|
||||
|
||||
public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials)
|
||||
{
|
||||
if (_brokerUri != serverUri || !_hasConnection)
|
||||
|
||||
@@ -62,7 +62,9 @@ namespace GitHub.Runner.Common
|
||||
CheckConnection();
|
||||
return RetryRequest<AgentJobRequestMessage>(
|
||||
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, cancellationToken), cancellationToken,
|
||||
shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException);
|
||||
shouldRetry: ex => ex is not TaskOrchestrationJobNotFoundException &&
|
||||
ex is not TaskOrchestrationJobAlreadyAcquiredException &&
|
||||
ex is not TaskOrchestrationJobUnprocessableException);
|
||||
}
|
||||
|
||||
public Task CompleteJobAsync(
|
||||
|
||||
@@ -314,7 +314,30 @@ namespace GitHub.Runner.Listener
|
||||
|
||||
public async Task DeleteMessageAsync(TaskAgentMessage message)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
Trace.Entering();
|
||||
ArgUtil.NotNull(_session, nameof(_session));
|
||||
|
||||
if (message == null || _session.SessionId == Guid.Empty)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var jobMessageKey = "";
|
||||
if (MessageUtil.IsRunServiceJob(message.MessageType))
|
||||
{
|
||||
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
|
||||
jobMessageKey = messageRef.RunnerRequestId;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Broker currently doesn't support delete for other message types
|
||||
return;
|
||||
}
|
||||
|
||||
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
|
||||
{
|
||||
await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, jobMessageKey, cs.Token);
|
||||
}
|
||||
}
|
||||
|
||||
private bool IsGetNextMessageExceptionRetriable(Exception ex)
|
||||
|
||||
@@ -397,11 +397,19 @@ namespace GitHub.Runner.Listener
|
||||
if (message != null && _session.SessionId != Guid.Empty)
|
||||
{
|
||||
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
|
||||
{
|
||||
if (MessageUtil.IsRunServiceJob(message.MessageType))
|
||||
{
|
||||
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
|
||||
await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, messageRef.RunnerRequestId, cs.Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
await _runnerServer.DeleteAgentMessageAsync(_settings.PoolId, message.MessageId, _session.SessionId, cs.Token);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task RefreshListenerTokenAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
|
||||
@@ -544,6 +544,8 @@ namespace GitHub.Runner.Listener
|
||||
}
|
||||
else
|
||||
{
|
||||
skipMessageDeletion = true;
|
||||
|
||||
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
|
||||
Pipelines.AgentJobRequestMessage jobRequestMessage = null;
|
||||
|
||||
@@ -563,13 +565,15 @@ namespace GitHub.Runner.Listener
|
||||
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
|
||||
try
|
||||
{
|
||||
jobRequestMessage =
|
||||
await runServer.GetJobMessageAsync(messageRef.RunnerRequestId,
|
||||
messageQueueLoopTokenSource.Token);
|
||||
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
|
||||
}
|
||||
catch (TaskOrchestrationJobAlreadyAcquiredException)
|
||||
catch (Exception ex) when (
|
||||
ex is TaskOrchestrationJobNotFoundException ||
|
||||
ex is TaskOrchestrationJobAlreadyAcquiredException ||
|
||||
ex is TaskOrchestrationJobUnprocessableException)
|
||||
{
|
||||
Trace.Info("Job is already acquired, skip this message.");
|
||||
Trace.Error($"Skipping job: {ex.Message}");
|
||||
skipMessageDeletion = false;
|
||||
continue;
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
||||
@@ -1539,6 +1539,26 @@ namespace GitHub.DistributedTask.WebApi
|
||||
}
|
||||
}
|
||||
|
||||
[Serializable]
|
||||
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationJobUnprocessableException", "GitHub.DistributedTask.WebApi.TaskOrchestrationJobUnprocessableException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
|
||||
public sealed class TaskOrchestrationJobUnprocessableException : DistributedTaskException
|
||||
{
|
||||
public TaskOrchestrationJobUnprocessableException(String message)
|
||||
: base(message)
|
||||
{
|
||||
}
|
||||
|
||||
public TaskOrchestrationJobUnprocessableException(String message, Exception innerException)
|
||||
: base(message, innerException)
|
||||
{
|
||||
}
|
||||
|
||||
private TaskOrchestrationJobUnprocessableException(SerializationInfo info, StreamingContext context)
|
||||
: base(info, context)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
[Serializable]
|
||||
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationPlanSecurityException", "GitHub.DistributedTask.WebApi.TaskOrchestrationPlanSecurityException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
|
||||
public sealed class TaskOrchestrationPlanSecurityException : DistributedTaskException
|
||||
|
||||
17
src/Sdk/RSWebApi/Contracts/RunServiceError.cs
Normal file
17
src/Sdk/RSWebApi/Contracts/RunServiceError.cs
Normal file
@@ -0,0 +1,17 @@
|
||||
using System.Runtime.Serialization;
|
||||
|
||||
namespace GitHub.Actions.RunService.WebApi
|
||||
{
|
||||
[DataContract]
|
||||
public class RunServiceError
|
||||
{
|
||||
[DataMember(Name = "source", EmitDefaultValue = false)]
|
||||
public string Source { get; set; }
|
||||
|
||||
[DataMember(Name = "statusCode", EmitDefaultValue = false)]
|
||||
public int StatusCode { get; set; }
|
||||
|
||||
[DataMember(Name = "errorMessage", EmitDefaultValue = false)]
|
||||
public string ErrorMessage { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -86,6 +86,7 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
httpMethod,
|
||||
requestUri: requestUri,
|
||||
content: requestContent,
|
||||
readErrorContent: true,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
if (result.IsSuccess)
|
||||
@@ -93,13 +94,34 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
return result.Value;
|
||||
}
|
||||
|
||||
if (TryParseErrorContent(result.ErrorContent, out RunServiceError error))
|
||||
{
|
||||
switch ((HttpStatusCode)error.StatusCode)
|
||||
{
|
||||
case HttpStatusCode.NotFound:
|
||||
throw new TaskOrchestrationJobNotFoundException($"Job message not found '{messageId}'. {error.ErrorMessage}");
|
||||
case HttpStatusCode.Conflict:
|
||||
throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired '{messageId}'. {error.ErrorMessage}");
|
||||
case HttpStatusCode.UnprocessableEntity:
|
||||
throw new TaskOrchestrationJobUnprocessableException($"Unprocessable job '{messageId}'. {error.ErrorMessage}");
|
||||
}
|
||||
}
|
||||
|
||||
// Temporary back compat
|
||||
switch (result.StatusCode)
|
||||
{
|
||||
case HttpStatusCode.NotFound:
|
||||
throw new TaskOrchestrationJobNotFoundException($"Job message not found: {messageId}");
|
||||
case HttpStatusCode.Conflict:
|
||||
throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired: {messageId}");
|
||||
default:
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(result.ErrorContent))
|
||||
{
|
||||
throw new Exception($"Failed to get job message: {result.Error}. {result.ErrorContent}");
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new Exception($"Failed to get job message: {result.Error}");
|
||||
}
|
||||
}
|
||||
@@ -190,5 +212,26 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
|
||||
return JsonConvert.DeserializeObject<T>(json, s_serializerSettings);
|
||||
}
|
||||
|
||||
private static bool TryParseErrorContent(string errorContent, out RunServiceError error)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(errorContent))
|
||||
{
|
||||
try
|
||||
{
|
||||
error = JsonUtility.FromString<RunServiceError>(errorContent);
|
||||
if (error?.Source == "actions-run-service")
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
error = null;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,8 +63,7 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
string os = null,
|
||||
string architecture = null,
|
||||
bool? disableUpdate = null,
|
||||
CancellationToken cancellationToken = default
|
||||
)
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var requestUri = new Uri(Client.BaseAddress, "message");
|
||||
|
||||
@@ -123,8 +122,42 @@ namespace GitHub.Actions.RunService.WebApi
|
||||
throw new Exception($"Failed to get job message: {result.Error}");
|
||||
}
|
||||
|
||||
public async Task<TaskAgentSession> CreateSessionAsync(
|
||||
public async Task DeleteRunnerMessageAsync(
|
||||
Guid? sessionId,
|
||||
string jobMessageKey,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var requestUri = new Uri(Client.BaseAddress, "message");
|
||||
|
||||
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
|
||||
|
||||
if (sessionId != null)
|
||||
{
|
||||
queryParams.Add("sessionId", sessionId.Value.ToString());
|
||||
}
|
||||
|
||||
if (!string.IsNullOrEmpty(jobMessageKey))
|
||||
{
|
||||
queryParams.Add("jobMessageKey", jobMessageKey);
|
||||
}
|
||||
|
||||
queryParams.Add("status", TaskAgentStatus.Online.ToString());
|
||||
|
||||
var result = await SendAsync<object>(
|
||||
new HttpMethod("DELETE"),
|
||||
requestUri: requestUri,
|
||||
queryParameters: queryParams,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
if (result.IsSuccess)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
throw new Exception($"Failed to get job message: StatusCode={result.StatusCode} Error={result.Error}");
|
||||
}
|
||||
|
||||
public async Task<TaskAgentSession> CreateSessionAsync(
|
||||
TaskAgentSession session,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
|
||||
@@ -106,10 +106,11 @@ namespace Sdk.WebApi.WebApi
|
||||
Uri requestUri,
|
||||
HttpContent content = null,
|
||||
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
|
||||
Boolean readErrorContent = false,
|
||||
Object userState = null,
|
||||
CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
return SendAsync<T>(method, null, requestUri, content, queryParameters, userState, cancellationToken);
|
||||
return SendAsync<T>(method, null, requestUri, content, queryParameters, readErrorContent, userState, cancellationToken);
|
||||
}
|
||||
|
||||
protected async Task<RawHttpClientResult<T>> SendAsync<T>(
|
||||
@@ -118,18 +119,20 @@ namespace Sdk.WebApi.WebApi
|
||||
Uri requestUri,
|
||||
HttpContent content = null,
|
||||
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
|
||||
Boolean readErrorContent = false,
|
||||
Object userState = null,
|
||||
CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
using (VssTraceActivity.GetOrCreate().EnterCorrelationScope())
|
||||
using (HttpRequestMessage requestMessage = CreateRequestMessage(method, additionalHeaders, requestUri, content, queryParameters))
|
||||
{
|
||||
return await SendAsync<T>(requestMessage, userState, cancellationToken).ConfigureAwait(false);
|
||||
return await SendAsync<T>(requestMessage, readErrorContent, userState, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
protected async Task<RawHttpClientResult<T>> SendAsync<T>(
|
||||
HttpRequestMessage message,
|
||||
Boolean readErrorContent = false,
|
||||
Object userState = null,
|
||||
CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
@@ -145,8 +148,14 @@ namespace Sdk.WebApi.WebApi
|
||||
}
|
||||
else
|
||||
{
|
||||
var errorContent = default(string);
|
||||
if (readErrorContent)
|
||||
{
|
||||
errorContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
string errorMessage = $"Error: {response.ReasonPhrase}";
|
||||
return RawHttpClientResult<T>.Fail(errorMessage, response.StatusCode);
|
||||
return RawHttpClientResult<T>.Fail(errorMessage, response.StatusCode, errorContent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,15 +5,27 @@ namespace Sdk.WebApi.WebApi
|
||||
public class RawHttpClientResult
|
||||
{
|
||||
public bool IsSuccess { get; protected set; }
|
||||
|
||||
/// <summary>
|
||||
/// A description of the HTTP status code, like "Error: Unprocessable Entity"
|
||||
/// </summary>
|
||||
public string Error { get; protected set; }
|
||||
|
||||
/// <summary>
|
||||
/// The raw of the HTTP response, for unsuccessful HTTP status codes
|
||||
/// </summary>
|
||||
public string ErrorContent { get; protected set; }
|
||||
|
||||
public HttpStatusCode StatusCode { get; protected set; }
|
||||
|
||||
public bool IsFailure => !IsSuccess;
|
||||
|
||||
protected RawHttpClientResult(bool isSuccess, string error, HttpStatusCode statusCode)
|
||||
protected RawHttpClientResult(bool isSuccess, string error, HttpStatusCode statusCode, string errorContent = null)
|
||||
{
|
||||
IsSuccess = isSuccess;
|
||||
Error = error;
|
||||
StatusCode = statusCode;
|
||||
ErrorContent = errorContent;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,13 +33,13 @@ namespace Sdk.WebApi.WebApi
|
||||
{
|
||||
public T Value { get; private set; }
|
||||
|
||||
protected internal RawHttpClientResult(T value, bool isSuccess, string error, HttpStatusCode statusCode)
|
||||
: base(isSuccess, error, statusCode)
|
||||
protected internal RawHttpClientResult(T value, bool isSuccess, string error, HttpStatusCode statusCode, string errorContent)
|
||||
: base(isSuccess, error, statusCode, errorContent)
|
||||
{
|
||||
Value = value;
|
||||
}
|
||||
|
||||
public static RawHttpClientResult<T> Fail(string message, HttpStatusCode statusCode) => new RawHttpClientResult<T>(default(T), false, message, statusCode);
|
||||
public static RawHttpClientResult<T> Ok(T value) => new RawHttpClientResult<T>(value, true, string.Empty, HttpStatusCode.OK);
|
||||
public static RawHttpClientResult<T> Fail(string message, HttpStatusCode statusCode, string errorContent) => new RawHttpClientResult<T>(default(T), false, message, statusCode, errorContent);
|
||||
public static RawHttpClientResult<T> Ok(T value) => new RawHttpClientResult<T>(value, true, string.Empty, HttpStatusCode.OK, null);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user