Compare commits

...

3 Commits

Author SHA1 Message Date
eric sciple
a4c63ec012 . 2024-06-11 12:48:51 -07:00
eric sciple
d13cf5d9a8 . 2024-06-10 21:38:17 -07:00
eric sciple
d124b0104e . 2024-06-07 13:54:33 -07:00
11 changed files with 200 additions and 21 deletions

View File

@@ -22,6 +22,8 @@ namespace GitHub.Runner.Common
Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token); Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token);
Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken);
Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials); Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials);
Task ForceRefreshConnection(VssCredentials credentials); Task ForceRefreshConnection(VssCredentials credentials);
@@ -75,6 +77,12 @@ namespace GitHub.Runner.Common
await _brokerHttpClient.DeleteSessionAsync(cancellationToken); await _brokerHttpClient.DeleteSessionAsync(cancellationToken);
} }
public async Task DeleteRunnerMessageAsync(Guid sessionId, string jobMessageKey, CancellationToken cancellationToken)
{
CheckConnection();
await _brokerHttpClient.DeleteRunnerMessageAsync(sessionId, jobMessageKey, cancellationToken);
}
public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials) public Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials)
{ {
if (_brokerUri != serverUri || !_hasConnection) if (_brokerUri != serverUri || !_hasConnection)

View File

@@ -62,7 +62,9 @@ namespace GitHub.Runner.Common
CheckConnection(); CheckConnection();
return RetryRequest<AgentJobRequestMessage>( return RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, cancellationToken), cancellationToken, async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, cancellationToken), cancellationToken,
shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException); shouldRetry: ex => ex is not TaskOrchestrationJobNotFoundException &&
ex is not TaskOrchestrationJobAlreadyAcquiredException &&
ex is not TaskOrchestrationJobUnprocessableException);
} }
public Task CompleteJobAsync( public Task CompleteJobAsync(

View File

@@ -314,7 +314,30 @@ namespace GitHub.Runner.Listener
public async Task DeleteMessageAsync(TaskAgentMessage message) public async Task DeleteMessageAsync(TaskAgentMessage message)
{ {
await Task.CompletedTask; Trace.Entering();
ArgUtil.NotNull(_session, nameof(_session));
if (message == null || _session.SessionId == Guid.Empty)
{
return;
}
var jobMessageKey = "";
if (MessageUtil.IsRunServiceJob(message.MessageType))
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
jobMessageKey = messageRef.RunnerRequestId;
}
else
{
// Broker currently doesn't support delete for other message types
return;
}
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, jobMessageKey, cs.Token);
}
} }
private bool IsGetNextMessageExceptionRetriable(Exception ex) private bool IsGetNextMessageExceptionRetriable(Exception ex)

View File

@@ -398,7 +398,15 @@ namespace GitHub.Runner.Listener
{ {
using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30))) using (var cs = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{ {
await _runnerServer.DeleteAgentMessageAsync(_settings.PoolId, message.MessageId, _session.SessionId, cs.Token); if (MessageUtil.IsRunServiceJob(message.MessageType))
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
await _brokerServer.DeleteRunnerMessageAsync(_session.SessionId, messageRef.RunnerRequestId, cs.Token);
}
else
{
await _runnerServer.DeleteAgentMessageAsync(_settings.PoolId, message.MessageId, _session.SessionId, cs.Token);
}
} }
} }
} }

View File

@@ -544,6 +544,8 @@ namespace GitHub.Runner.Listener
} }
else else
{ {
skipMessageDeletion = true;
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body); var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null; Pipelines.AgentJobRequestMessage jobRequestMessage = null;
@@ -563,13 +565,15 @@ namespace GitHub.Runner.Listener
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
try try
{ {
jobRequestMessage = jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
await runServer.GetJobMessageAsync(messageRef.RunnerRequestId,
messageQueueLoopTokenSource.Token);
} }
catch (TaskOrchestrationJobAlreadyAcquiredException) catch (Exception ex) when (
ex is TaskOrchestrationJobNotFoundException ||
ex is TaskOrchestrationJobAlreadyAcquiredException ||
ex is TaskOrchestrationJobUnprocessableException)
{ {
Trace.Info("Job is already acquired, skip this message."); Trace.Error($"Skipping job: {ex.Message}");
skipMessageDeletion = false;
continue; continue;
} }
catch (Exception ex) catch (Exception ex)

View File

@@ -1539,6 +1539,26 @@ namespace GitHub.DistributedTask.WebApi
} }
} }
[Serializable]
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationJobUnprocessableException", "GitHub.DistributedTask.WebApi.TaskOrchestrationJobUnprocessableException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
public sealed class TaskOrchestrationJobUnprocessableException : DistributedTaskException
{
public TaskOrchestrationJobUnprocessableException(String message)
: base(message)
{
}
public TaskOrchestrationJobUnprocessableException(String message, Exception innerException)
: base(message, innerException)
{
}
private TaskOrchestrationJobUnprocessableException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
[Serializable] [Serializable]
[ExceptionMapping("0.0", "3.0", "TaskOrchestrationPlanSecurityException", "GitHub.DistributedTask.WebApi.TaskOrchestrationPlanSecurityException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")] [ExceptionMapping("0.0", "3.0", "TaskOrchestrationPlanSecurityException", "GitHub.DistributedTask.WebApi.TaskOrchestrationPlanSecurityException, GitHub.DistributedTask.WebApi, Version=14.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a")]
public sealed class TaskOrchestrationPlanSecurityException : DistributedTaskException public sealed class TaskOrchestrationPlanSecurityException : DistributedTaskException

View File

@@ -0,0 +1,17 @@
using System.Runtime.Serialization;
namespace GitHub.Actions.RunService.WebApi
{
[DataContract]
public class RunServiceError
{
[DataMember(Name = "source", EmitDefaultValue = false)]
public string Source { get; set; }
[DataMember(Name = "statusCode", EmitDefaultValue = false)]
public int StatusCode { get; set; }
[DataMember(Name = "errorMessage", EmitDefaultValue = false)]
public string ErrorMessage { get; set; }
}
}

View File

@@ -86,6 +86,7 @@ namespace GitHub.Actions.RunService.WebApi
httpMethod, httpMethod,
requestUri: requestUri, requestUri: requestUri,
content: requestContent, content: requestContent,
readErrorContent: true,
cancellationToken: cancellationToken); cancellationToken: cancellationToken);
if (result.IsSuccess) if (result.IsSuccess)
@@ -93,14 +94,35 @@ namespace GitHub.Actions.RunService.WebApi
return result.Value; return result.Value;
} }
if (TryParseErrorContent(result.ErrorContent, out RunServiceError error))
{
switch ((HttpStatusCode)error.StatusCode)
{
case HttpStatusCode.NotFound:
throw new TaskOrchestrationJobNotFoundException($"Job message not found '{messageId}'. {error.ErrorMessage}");
case HttpStatusCode.Conflict:
throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired '{messageId}'. {error.ErrorMessage}");
case HttpStatusCode.UnprocessableEntity:
throw new TaskOrchestrationJobUnprocessableException($"Unprocessable job '{messageId}'. {error.ErrorMessage}");
}
}
// Temporary back compat
switch (result.StatusCode) switch (result.StatusCode)
{ {
case HttpStatusCode.NotFound: case HttpStatusCode.NotFound:
throw new TaskOrchestrationJobNotFoundException($"Job message not found: {messageId}"); throw new TaskOrchestrationJobNotFoundException($"Job message not found: {messageId}");
case HttpStatusCode.Conflict: case HttpStatusCode.Conflict:
throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired: {messageId}"); throw new TaskOrchestrationJobAlreadyAcquiredException($"Job message already acquired: {messageId}");
default: }
throw new Exception($"Failed to get job message: {result.Error}");
if (!string.IsNullOrEmpty(result.ErrorContent))
{
throw new Exception($"Failed to get job message: {result.Error}. {result.ErrorContent}");
}
else
{
throw new Exception($"Failed to get job message: {result.Error}");
} }
} }
@@ -190,5 +212,26 @@ namespace GitHub.Actions.RunService.WebApi
var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
return JsonConvert.DeserializeObject<T>(json, s_serializerSettings); return JsonConvert.DeserializeObject<T>(json, s_serializerSettings);
} }
private static bool TryParseErrorContent(string errorContent, out RunServiceError error)
{
if (!string.IsNullOrEmpty(errorContent))
{
try
{
error = JsonUtility.FromString<RunServiceError>(errorContent);
if (error?.Source == "actions-run-service")
{
return true;
}
}
catch (Exception)
{
}
}
error = null;
return false;
}
} }
} }

View File

@@ -63,8 +63,7 @@ namespace GitHub.Actions.RunService.WebApi
string os = null, string os = null,
string architecture = null, string architecture = null,
bool? disableUpdate = null, bool? disableUpdate = null,
CancellationToken cancellationToken = default CancellationToken cancellationToken = default)
)
{ {
var requestUri = new Uri(Client.BaseAddress, "message"); var requestUri = new Uri(Client.BaseAddress, "message");
@@ -123,8 +122,42 @@ namespace GitHub.Actions.RunService.WebApi
throw new Exception($"Failed to get job message: {result.Error}"); throw new Exception($"Failed to get job message: {result.Error}");
} }
public async Task<TaskAgentSession> CreateSessionAsync( public async Task DeleteRunnerMessageAsync(
Guid? sessionId,
string jobMessageKey,
CancellationToken cancellationToken = default)
{
var requestUri = new Uri(Client.BaseAddress, "message");
List<KeyValuePair<string, string>> queryParams = new List<KeyValuePair<string, string>>();
if (sessionId != null)
{
queryParams.Add("sessionId", sessionId.Value.ToString());
}
if (!string.IsNullOrEmpty(jobMessageKey))
{
queryParams.Add("jobMessageKey", jobMessageKey);
}
queryParams.Add("status", TaskAgentStatus.Online.ToString());
var result = await SendAsync<object>(
new HttpMethod("DELETE"),
requestUri: requestUri,
queryParameters: queryParams,
cancellationToken: cancellationToken);
if (result.IsSuccess)
{
return;
}
throw new Exception($"Failed to get job message: StatusCode={result.StatusCode} Error={result.Error}");
}
public async Task<TaskAgentSession> CreateSessionAsync(
TaskAgentSession session, TaskAgentSession session,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {

View File

@@ -106,10 +106,11 @@ namespace Sdk.WebApi.WebApi
Uri requestUri, Uri requestUri,
HttpContent content = null, HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null, IEnumerable<KeyValuePair<String, String>> queryParameters = null,
Boolean readErrorContent = false,
Object userState = null, Object userState = null,
CancellationToken cancellationToken = default(CancellationToken)) CancellationToken cancellationToken = default(CancellationToken))
{ {
return SendAsync<T>(method, null, requestUri, content, queryParameters, userState, cancellationToken); return SendAsync<T>(method, null, requestUri, content, queryParameters, readErrorContent, userState, cancellationToken);
} }
protected async Task<RawHttpClientResult<T>> SendAsync<T>( protected async Task<RawHttpClientResult<T>> SendAsync<T>(
@@ -118,18 +119,20 @@ namespace Sdk.WebApi.WebApi
Uri requestUri, Uri requestUri,
HttpContent content = null, HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null, IEnumerable<KeyValuePair<String, String>> queryParameters = null,
Boolean readErrorContent = false,
Object userState = null, Object userState = null,
CancellationToken cancellationToken = default(CancellationToken)) CancellationToken cancellationToken = default(CancellationToken))
{ {
using (VssTraceActivity.GetOrCreate().EnterCorrelationScope()) using (VssTraceActivity.GetOrCreate().EnterCorrelationScope())
using (HttpRequestMessage requestMessage = CreateRequestMessage(method, additionalHeaders, requestUri, content, queryParameters)) using (HttpRequestMessage requestMessage = CreateRequestMessage(method, additionalHeaders, requestUri, content, queryParameters))
{ {
return await SendAsync<T>(requestMessage, userState, cancellationToken).ConfigureAwait(false); return await SendAsync<T>(requestMessage, readErrorContent, userState, cancellationToken).ConfigureAwait(false);
} }
} }
protected async Task<RawHttpClientResult<T>> SendAsync<T>( protected async Task<RawHttpClientResult<T>> SendAsync<T>(
HttpRequestMessage message, HttpRequestMessage message,
Boolean readErrorContent = false,
Object userState = null, Object userState = null,
CancellationToken cancellationToken = default(CancellationToken)) CancellationToken cancellationToken = default(CancellationToken))
{ {
@@ -145,8 +148,14 @@ namespace Sdk.WebApi.WebApi
} }
else else
{ {
var errorContent = default(string);
if (readErrorContent)
{
errorContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
}
string errorMessage = $"Error: {response.ReasonPhrase}"; string errorMessage = $"Error: {response.ReasonPhrase}";
return RawHttpClientResult<T>.Fail(errorMessage, response.StatusCode); return RawHttpClientResult<T>.Fail(errorMessage, response.StatusCode, errorContent);
} }
} }
} }

View File

@@ -5,15 +5,27 @@ namespace Sdk.WebApi.WebApi
public class RawHttpClientResult public class RawHttpClientResult
{ {
public bool IsSuccess { get; protected set; } public bool IsSuccess { get; protected set; }
/// <summary>
/// A description of the HTTP status code, like "Error: Unprocessable Entity"
/// </summary>
public string Error { get; protected set; } public string Error { get; protected set; }
/// <summary>
/// The raw of the HTTP response, for unsuccessful HTTP status codes
/// </summary>
public string ErrorContent { get; protected set; }
public HttpStatusCode StatusCode { get; protected set; } public HttpStatusCode StatusCode { get; protected set; }
public bool IsFailure => !IsSuccess; public bool IsFailure => !IsSuccess;
protected RawHttpClientResult(bool isSuccess, string error, HttpStatusCode statusCode) protected RawHttpClientResult(bool isSuccess, string error, HttpStatusCode statusCode, string errorContent = null)
{ {
IsSuccess = isSuccess; IsSuccess = isSuccess;
Error = error; Error = error;
StatusCode = statusCode; StatusCode = statusCode;
ErrorContent = errorContent;
} }
} }
@@ -21,13 +33,13 @@ namespace Sdk.WebApi.WebApi
{ {
public T Value { get; private set; } public T Value { get; private set; }
protected internal RawHttpClientResult(T value, bool isSuccess, string error, HttpStatusCode statusCode) protected internal RawHttpClientResult(T value, bool isSuccess, string error, HttpStatusCode statusCode, string errorContent)
: base(isSuccess, error, statusCode) : base(isSuccess, error, statusCode, errorContent)
{ {
Value = value; Value = value;
} }
public static RawHttpClientResult<T> Fail(string message, HttpStatusCode statusCode) => new RawHttpClientResult<T>(default(T), false, message, statusCode); public static RawHttpClientResult<T> Fail(string message, HttpStatusCode statusCode, string errorContent) => new RawHttpClientResult<T>(default(T), false, message, statusCode, errorContent);
public static RawHttpClientResult<T> Ok(T value) => new RawHttpClientResult<T>(value, true, string.Empty, HttpStatusCode.OK); public static RawHttpClientResult<T> Ok(T value) => new RawHttpClientResult<T>(value, true, string.Empty, HttpStatusCode.OK, null);
} }
} }