changes to support specific run service URL (#2158)

* changes to support run service url

* feedback
This commit is contained in:
Yashwanth Anantharaju
2022-10-06 10:28:32 -04:00
committed by GitHub
parent b6a46f2114
commit 252f4de577
15 changed files with 1408 additions and 96 deletions

View File

@@ -0,0 +1,51 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
namespace GitHub.Runner.Common
{
[ServiceLocator(Default = typeof(ActionsRunServer))]
public interface IActionsRunServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);
Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);
}
public sealed class ActionsRunServer : RunnerService, IActionsRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;
public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
_hasConnection = true;
}
private void CheckConnection()
{
if (!_hasConnection)
{
throw new InvalidOperationException($"SetConnection");
}
}
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);
return jobMessage;
}
}
}

View File

@@ -1,4 +1,4 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
@@ -6,6 +6,7 @@ using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
{
@@ -20,42 +21,19 @@ namespace GitHub.Runner.Common
public sealed class RunServer : RunnerService, IRunServer
{
private bool _hasConnection;
private VssConnection _connection;
private TaskAgentHttpClient _taskAgentClient;
private Uri requestUri;
private RawConnection _connection;
private RunServiceHttpClient _runServiceHttpClient;
public async Task ConnectAsync(Uri serverUrl, VssCredentials credentials)
public async Task ConnectAsync(Uri serverUri, VssCredentials credentials)
{
_connection = await EstablishVssConnection(serverUrl, credentials, TimeSpan.FromSeconds(100));
_taskAgentClient = _connection.GetClient<TaskAgentHttpClient>();
requestUri = serverUri;
_connection = VssUtil.CreateRawConnection(new Uri(serverUri.Authority), credentials);
_runServiceHttpClient = await _connection.GetClientAsync<RunServiceHttpClient>();
_hasConnection = true;
}
private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);
await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}
private void CheckConnection()
{
if (!_hasConnection)
@@ -67,37 +45,15 @@ namespace GitHub.Runner.Common
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
{
CheckConnection();
var jobMessage = RetryRequest<AgentJobRequestMessage>(async () =>
{
return await _taskAgentClient.GetJobMessageAsync(id, cancellationToken);
}, cancellationToken);
var jobMessage = RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, cancellationToken), cancellationToken);
if (jobMessage == null)
{
throw new TaskOrchestrationJobNotFoundException(id);
}
return jobMessage;
}
private async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}
}
}
}

View File

@@ -179,31 +179,6 @@ namespace GitHub.Runner.Common
}
}
private async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);
await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}
private void CheckConnection(RunnerConnectionType connectionType)
{
switch (connectionType)

View File

@@ -1,4 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common
{
@@ -21,9 +27,9 @@ namespace GitHub.Runner.Common
protected IHostContext HostContext { get; private set; }
protected Tracing Trace { get; private set; }
public string TraceName
public string TraceName
{
get
get
{
return GetType().Name;
}
@@ -35,5 +41,57 @@ namespace GitHub.Runner.Common
Trace = HostContext.GetTrace(TraceName);
Trace.Entering();
}
protected async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCredentials credentials, TimeSpan timeout)
{
Trace.Info($"EstablishVssConnection");
Trace.Info($"Establish connection with {timeout.TotalSeconds} seconds timeout.");
int attemptCount = 5;
while (attemptCount-- > 0)
{
var connection = VssUtil.CreateConnection(serverUrl, credentials, timeout: timeout);
try
{
await connection.ConnectAsync();
return connection;
}
catch (Exception ex) when (attemptCount > 0)
{
Trace.Info($"Catch exception during connect. {attemptCount} attempt left.");
Trace.Error(ex);
await HostContext.Delay(TimeSpan.FromMilliseconds(100), CancellationToken.None);
}
}
// should never reach here.
throw new InvalidOperationException(nameof(EstablishVssConnection));
}
protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5
)
{
var retryCount = 0;
while (true)
{
retryCount++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount)
{
Trace.Error("Catch exception during get full job message");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}
}
}
}

View File

@@ -496,16 +496,26 @@ namespace GitHub.Runner.Listener
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;
// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials();
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
var jobMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
jobDispatcher.Run(jobMessage, runOnce);
jobDispatcher.Run(jobRequestMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");

View File

@@ -9,5 +9,7 @@ namespace GitHub.Runner.Listener
public string Id { get; set; }
[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }
[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }
}
}
}

View File

@@ -9,6 +9,7 @@ using GitHub.Services.OAuth;
using System.Net.Http.Headers;
using System.Runtime.InteropServices;
using System.Net;
using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Sdk
{
@@ -34,7 +35,11 @@ namespace GitHub.Runner.Sdk
}
}
public static VssConnection CreateConnection(Uri serverUri, VssCredentials credentials, IEnumerable<DelegatingHandler> additionalDelegatingHandler = null, TimeSpan? timeout = null)
public static VssConnection CreateConnection(
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
VssClientHttpRequestSettings settings = VssClientHttpRequestSettings.Default.Clone();
@@ -75,6 +80,46 @@ namespace GitHub.Runner.Sdk
return connection;
}
public static RawConnection CreateRawConnection(
Uri serverUri,
VssCredentials credentials,
IEnumerable<DelegatingHandler> additionalDelegatingHandler = null,
TimeSpan? timeout = null)
{
RawClientHttpRequestSettings settings = RawClientHttpRequestSettings.Default.Clone();
int maxRetryRequest;
if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_RETRY") ?? string.Empty, out maxRetryRequest))
{
maxRetryRequest = 3;
}
// make sure MaxRetryRequest in range [3, 10]
settings.MaxRetryRequest = Math.Min(Math.Max(maxRetryRequest, 3), 10);
if (!int.TryParse(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_HTTP_TIMEOUT") ?? string.Empty, out int httpRequestTimeoutSeconds))
{
settings.SendTimeout = timeout ?? TimeSpan.FromSeconds(100);
}
else
{
// prefer environment variable
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
}
// Remove Invariant from the list of accepted languages.
//
// The constructor of VssHttpRequestSettings (base class of VssClientHttpRequestSettings) adds the current
// UI culture to the list of accepted languages. The UI culture will be Invariant on OSX/Linux when the
// LANG environment variable is not set when the program starts. If Invariant is in the list of accepted
// languages, then "System.ArgumentException: The value cannot be null or empty." will be thrown when the
// settings are applied to an HttpRequestMessage.
settings.AcceptLanguages.Remove(CultureInfo.InvariantCulture);
RawConnection connection = new RawConnection(serverUri, new RawHttpMessageHandler(credentials.ToOAuthCredentials(), settings), additionalDelegatingHandler);
return connection;
}
public static VssCredentials GetVssCredential(ServiceEndpoint serviceEndpoint)
{
ArgUtil.NotNull(serviceEndpoint, nameof(serviceEndpoint));

View File

@@ -0,0 +1,20 @@
using GitHub.Services.OAuth;
namespace GitHub.Services.Common
{
public static class VssCredentialsExtension
{
public static VssOAuthCredential ToOAuthCredentials(
this VssCredentials credentials)
{
if (credentials.Federated.CredentialType == VssCredentialsType.OAuth)
{
return credentials.Federated as VssOAuthCredential;
}
else
{
return null;
}
}
}
}

View File

@@ -0,0 +1,194 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Globalization;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using GitHub.Services.WebApi.Utilities.Internal;
namespace GitHub.Services.Common
{
public class RawClientHttpRequestSettings
{
/// <summary>
/// Timespan to wait before timing out a request. Defaults to 100 seconds
/// </summary>
public TimeSpan SendTimeout
{
get;
set;
}
/// <summary>
/// User-Agent header passed along in the request,
/// For multiple values, the order in the list is the order
/// in which they will appear in the header
/// </summary>
public List<ProductInfoHeaderValue> UserAgent
{
get;
set;
}
/// <summary>
/// The name of the culture is passed in the Accept-Language header
/// </summary>
public ICollection<CultureInfo> AcceptLanguages
{
get
{
return m_acceptLanguages;
}
}
/// <summary>
/// A unique identifier for the user session
/// </summary>
public Guid SessionId
{
get;
set;
}
/// <summary>
/// Optional implementation used to validate server certificate validation
/// </summary>
public Func<HttpRequestMessage, X509Certificate2, X509Chain, SslPolicyErrors, bool> ServerCertificateValidationCallback
{
get;
set;
}
/// <summary>
/// Number of times to retry a request that has an ambient failure
/// </summary>
/// <remarks>
/// This property is only used by RawConnection, so only relevant on the client
/// </remarks>
[DefaultValue(c_defaultMaxRetry)]
public Int32 MaxRetryRequest
{
get;
set;
}
/// <summary>
/// Gets the property name used to reference this object.
/// </summary>
public const String PropertyName = "Actions.RequestSettings";
public static RawClientHttpRequestSettings Default => s_defaultSettings.Value;
protected RawClientHttpRequestSettings(RawClientHttpRequestSettings copy)
{
this.SendTimeout = copy.SendTimeout;
this.m_acceptLanguages = new List<CultureInfo>(copy.AcceptLanguages);
this.SessionId = copy.SessionId;
this.UserAgent = new List<ProductInfoHeaderValue>(copy.UserAgent);
this.ServerCertificateValidationCallback = copy.ServerCertificateValidationCallback;
this.MaxRetryRequest = copy.MaxRetryRequest;
}
public RawClientHttpRequestSettings Clone()
{
return new RawClientHttpRequestSettings(this);
}
public RawClientHttpRequestSettings()
: this(Guid.NewGuid())
{
}
public RawClientHttpRequestSettings(Guid sessionId)
{
this.SendTimeout = s_defaultTimeout;
if (!String.IsNullOrEmpty(CultureInfo.CurrentUICulture.Name)) // InvariantCulture for example has an empty name.
{
this.AcceptLanguages.Add(CultureInfo.CurrentUICulture);
}
this.SessionId = sessionId;
this.ServerCertificateValidationCallback = null;
// If different, we'll also add CurrentCulture to the request headers,
// but UICulture was added first, so it gets first preference
if (!CultureInfo.CurrentCulture.Equals(CultureInfo.CurrentUICulture) && !String.IsNullOrEmpty(CultureInfo.CurrentCulture.Name))
{
this.AcceptLanguages.Add(CultureInfo.CurrentCulture);
}
this.MaxRetryRequest = c_defaultMaxRetry;
#if DEBUG
string customClientRequestTimeout = Environment.GetEnvironmentVariable("VSS_Client_Request_Timeout");
if (!string.IsNullOrEmpty(customClientRequestTimeout) && int.TryParse(customClientRequestTimeout, out int customTimeout))
{
// avoid disrupting a debug session due to the request timing out by setting a custom timeout.
this.SendTimeout = TimeSpan.FromSeconds(customTimeout);
}
#endif
}
protected internal virtual Boolean ApplyTo(HttpRequestMessage request)
{
// Make sure we only apply the settings to the request once
if (request.Options.TryGetValue<object>(PropertyName, out _))
{
return false;
}
request.Options.Set(new HttpRequestOptionsKey<RawClientHttpRequestSettings>(PropertyName), this);
if (this.AcceptLanguages != null && this.AcceptLanguages.Count > 0)
{
// An empty or null CultureInfo name will cause an ArgumentNullException in the
// StringWithQualityHeaderValue constructor. CultureInfo.InvariantCulture is an example of
// a CultureInfo that has an empty name.
foreach (CultureInfo culture in this.AcceptLanguages.Where(a => !String.IsNullOrEmpty(a.Name)))
{
request.Headers.AcceptLanguage.Add(new StringWithQualityHeaderValue(culture.Name));
}
}
if (this.UserAgent != null)
{
foreach (var headerVal in this.UserAgent)
{
if (!request.Headers.UserAgent.Contains(headerVal))
{
request.Headers.UserAgent.Add(headerVal);
}
}
}
if (!request.Headers.Contains(Internal.RawHttpHeaders.SessionHeader))
{
request.Headers.Add(Internal.RawHttpHeaders.SessionHeader, this.SessionId.ToString("D"));
}
return true;
}
/// <summary>
/// Creates an instance of the default request settings.
/// </summary>
/// <returns>The default request settings</returns>
private static RawClientHttpRequestSettings ConstructDefaultSettings()
{
// Set up reasonable defaults in case the registry keys are not present
var settings = new RawClientHttpRequestSettings();
settings.UserAgent = UserAgentUtility.GetDefaultRestUserAgent();
return settings;
}
private static Lazy<RawClientHttpRequestSettings> s_defaultSettings
= new Lazy<RawClientHttpRequestSettings>(ConstructDefaultSettings);
private const Int32 c_defaultMaxRetry = 3;
private static readonly TimeSpan s_defaultTimeout = TimeSpan.FromSeconds(100); //default WebAPI timeout
private ICollection<CultureInfo> m_acceptLanguages = new List<CultureInfo>();
}
}

View File

@@ -0,0 +1,12 @@
using System;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
namespace GitHub.Services.Common.Internal
{
[EditorBrowsable(EditorBrowsableState.Never)]
public static class RawHttpHeaders
{
public const String SessionHeader = "X-Runner-Session";
}
}

View File

@@ -0,0 +1,349 @@
using System;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Services.Common.Diagnostics;
using GitHub.Services.Common.Internal;
using GitHub.Services.OAuth;
namespace GitHub.Services.Common
{
public class RawHttpMessageHandler: HttpMessageHandler
{
public RawHttpMessageHandler(
VssOAuthCredential credentials)
: this(credentials, new RawClientHttpRequestSettings())
{
}
public RawHttpMessageHandler(
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: this(credentials, settings, new HttpClientHandler())
{
}
public RawHttpMessageHandler(
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
HttpMessageHandler innerHandler)
{
this.Credentials = credentials;
this.Settings = settings;
m_messageInvoker = new HttpMessageInvoker(innerHandler);
m_credentialWrapper = new CredentialWrapper();
// If we were given a pipeline make sure we find the inner-most handler to apply our settings as this
// will be the actual outgoing transport.
{
HttpMessageHandler transportHandler = innerHandler;
DelegatingHandler delegatingHandler = transportHandler as DelegatingHandler;
while (delegatingHandler != null)
{
transportHandler = delegatingHandler.InnerHandler;
delegatingHandler = transportHandler as DelegatingHandler;
}
m_transportHandler = transportHandler;
}
ApplySettings(m_transportHandler, m_credentialWrapper, this.Settings);
m_thisLock = new Object();
}
/// <summary>
/// Gets the credentials associated with this handler.
/// </summary>
public VssOAuthCredential Credentials
{
get;
private set;
}
/// <summary>
/// Gets the settings associated with this handler.
/// </summary>
public RawClientHttpRequestSettings Settings
{
get;
private set;
}
// setting this to WebRequest.DefaultWebProxy in NETSTANDARD is causing a System.PlatformNotSupportedException
//.in System.Net.SystemWebProxy.IsBypassed. Comment in IsBypassed method indicates ".NET Core and .NET Native
// code will handle this exception and call into WinInet/WinHttp as appropriate to use the system proxy."
// This needs to be investigated further.
private static IWebProxy s_defaultWebProxy = null;
/// <summary>
/// Allows you to set a proxy to be used by all RawHttpMessageHandler requests without affecting the global WebRequest.DefaultWebProxy. If not set it returns the WebRequest.DefaultWebProxy.
/// </summary>
public static IWebProxy DefaultWebProxy
{
get
{
var toReturn = WebProxyWrapper.Wrap(s_defaultWebProxy);
if (null != toReturn &&
toReturn.Credentials == null)
{
toReturn.Credentials = CredentialCache.DefaultCredentials;
}
return toReturn;
}
set
{
s_defaultWebProxy = value;
}
}
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request,
CancellationToken cancellationToken)
{
VssTraceActivity traceActivity = VssTraceActivity.Current;
lock (m_thisLock)
{
// Ensure that we attempt to use the most appropriate authentication mechanism by default.
if (m_tokenProvider == null)
{
m_tokenProvider = this.Credentials.GetTokenProvider(request.RequestUri);
}
}
CancellationTokenSource tokenSource = null;
HttpResponseMessage response = null;
Boolean succeeded = false;
HttpResponseMessageWrapper responseWrapper;
Int32 retries = m_maxAuthRetries;
try
{
tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
if (this.Settings.SendTimeout > TimeSpan.Zero)
{
tokenSource.CancelAfter(this.Settings.SendTimeout);
}
do
{
if (response != null)
{
response.Dispose();
}
// Let's start with sending a token
IssuedToken token = await m_tokenProvider.GetTokenAsync(null, tokenSource.Token).ConfigureAwait(false);
ApplyToken(request, token);
// ConfigureAwait(false) enables the continuation to be run outside any captured
// SyncronizationContext (such as ASP.NET's) which keeps things from deadlocking...
response = await m_messageInvoker.SendAsync(request, tokenSource.Token).ConfigureAwait(false);
responseWrapper = new HttpResponseMessageWrapper(response);
var isUnAuthorized = responseWrapper.StatusCode == HttpStatusCode.Unauthorized;
if (!isUnAuthorized)
{
// Validate the token after it has been successfully authenticated with the server.
m_tokenProvider?.ValidateToken(token, responseWrapper);
succeeded = true;
break;
}
else
{
m_tokenProvider?.InvalidateToken(token);
if (retries == 0 || retries < m_maxAuthRetries)
{
break;
}
token = await m_tokenProvider.GetTokenAsync(token, tokenSource.Token).ConfigureAwait(false);
retries--;
}
}
while (retries >= 0);
// We're out of retries and the response was an auth challenge -- then the request was unauthorized
// and we will throw a strongly-typed exception with a friendly error message.
if (!succeeded && response != null && responseWrapper.StatusCode == HttpStatusCode.Unauthorized)
{
// Make sure we do not leak the response object when raising an exception
if (response != null)
{
response.Dispose();
}
var message = CommonResources.VssUnauthorized(request.RequestUri.GetLeftPart(UriPartial.Authority));
VssHttpEventSource.Log.HttpRequestUnauthorized(traceActivity, request, message);
VssUnauthorizedException unauthorizedException = new VssUnauthorizedException(message);
throw unauthorizedException;
}
return response;
}
catch (OperationCanceledException ex)
{
if (cancellationToken.IsCancellationRequested)
{
VssHttpEventSource.Log.HttpRequestCancelled(traceActivity, request);
throw;
}
else
{
VssHttpEventSource.Log.HttpRequestTimedOut(traceActivity, request, this.Settings.SendTimeout);
throw new TimeoutException(CommonResources.HttpRequestTimeout(this.Settings.SendTimeout), ex);
}
}
finally
{
// We always dispose of the token source since otherwise we leak resources if there is a timer pending
if (tokenSource != null)
{
tokenSource.Dispose();
}
}
}
private void ApplyToken(
HttpRequestMessage request,
IssuedToken token)
{
switch (token)
{
case null:
return;
case ICredentials credentialsToken:
m_credentialWrapper.InnerCredentials = credentialsToken;
break;
default:
token.ApplyTo(new HttpRequestMessageWrapper(request));
break;
}
}
private static void ApplySettings(
HttpMessageHandler handler,
ICredentials defaultCredentials,
RawClientHttpRequestSettings settings)
{
HttpClientHandler httpClientHandler = handler as HttpClientHandler;
if (httpClientHandler != null)
{
httpClientHandler.ClientCertificateOptions = ClientCertificateOption.Manual;
//Setting httpClientHandler.UseDefaultCredentials to false in .Net Core, clears httpClientHandler.Credentials if
//credentials is already set to defaultcredentials. Therefore httpClientHandler.Credentials must be
//set after httpClientHandler.UseDefaultCredentials.
httpClientHandler.UseDefaultCredentials = false;
httpClientHandler.Credentials = defaultCredentials;
httpClientHandler.PreAuthenticate = false;
httpClientHandler.Proxy = DefaultWebProxy;
httpClientHandler.UseCookies = false;
httpClientHandler.UseProxy = true;
}
}
private readonly HttpMessageHandler m_transportHandler;
private HttpMessageInvoker m_messageInvoker;
private CredentialWrapper m_credentialWrapper;
private object m_thisLock;
private const Int32 m_maxAuthRetries = 3;
private VssOAuthTokenProvider m_tokenProvider;
//.Net Core does not attempt NTLM schema on Linux, unless ICredentials is a CredentialCache instance
//This workaround may not be needed after this corefx fix is consumed: https://github.com/dotnet/corefx/pull/7923
private sealed class CredentialWrapper : CredentialCache, ICredentials
{
public ICredentials InnerCredentials
{
get;
set;
}
NetworkCredential ICredentials.GetCredential(
Uri uri,
String authType)
{
return InnerCredentials != null ? InnerCredentials.GetCredential(uri, authType) : null;
}
}
private sealed class WebProxyWrapper : IWebProxy
{
private WebProxyWrapper(IWebProxy toWrap)
{
m_wrapped = toWrap;
m_credentials = null;
}
public static WebProxyWrapper Wrap(IWebProxy toWrap)
{
if (null == toWrap)
{
return null;
}
return new WebProxyWrapper(toWrap);
}
public ICredentials Credentials
{
get
{
ICredentials credentials = m_credentials;
if (null == credentials)
{
// This means to fall back to the Credentials from the wrapped
// IWebProxy.
credentials = m_wrapped.Credentials;
}
else if (Object.ReferenceEquals(credentials, m_nullCredentials))
{
// This sentinel value means we have explicitly had our credentials
// set to null.
credentials = null;
}
return credentials;
}
set
{
if (null == value)
{
// Use this as a sentinel value to distinguish the case when someone has
// explicitly set our credentials to null. We don't want to fall back to
// m_wrapped.Credentials when we have credentials that are explicitly null.
m_credentials = m_nullCredentials;
}
else
{
m_credentials = value;
}
}
}
public Uri GetProxy(Uri destination)
{
return m_wrapped.GetProxy(destination);
}
public bool IsBypassed(Uri host)
{
return m_wrapped.IsBypassed(host);
}
private readonly IWebProxy m_wrapped;
private ICredentials m_credentials;
private static readonly ICredentials m_nullCredentials = new CredentialWrapper();
}
}
}

View File

@@ -0,0 +1,75 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using Sdk.WebApi.WebApi;
namespace GitHub.DistributedTask.WebApi
{
[ResourceArea(TaskResourceIds.AreaId)]
public class RunServiceHttpClient : RawHttpClientBase
{
public RunServiceHttpClient(
Uri baseUrl,
VssOAuthCredential credentials)
: base(baseUrl, credentials)
{
}
public RunServiceHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: base(baseUrl, credentials, settings)
{
}
public RunServiceHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, handlers)
{
}
public RunServiceHttpClient(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
params DelegatingHandler[] handlers)
: base(baseUrl, credentials, settings, handlers)
{
}
public RunServiceHttpClient(
Uri baseUrl,
HttpMessageHandler pipeline,
Boolean disposeHandler)
: base(baseUrl, pipeline, disposeHandler)
{
}
public Task<Pipelines.AgentJobRequestMessage> GetJobMessageAsync(
Uri requestUri,
string messageId,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
var payload = new {
StreamID = messageId
};
var payloadJson = JsonUtility.ToString(payload);
var requestContent = new StringContent(payloadJson, System.Text.Encoding.UTF8, "application/json");
return SendAsync<Pipelines.AgentJobRequestMessage>(
httpMethod,
additionalHeaders: null,
requestUri: requestUri,
content: requestContent,
cancellationToken: cancellationToken);
}
}
}

View File

@@ -12,7 +12,7 @@ namespace GitHub.Services.OAuth
public class VssOAuthCredential : FederatedCredential
{
/// <summary>
/// Initializes a new <c>VssOAuthCredential</c> instance with the specified authorization grant and client
/// Initializes a new <c>VssOAuthCredential</c> instance with the specified authorization grant and client
/// credentials.
/// </summary>
/// <param name="authorizationUrl">The location of the token endpoint for the target authorization server</param>
@@ -117,8 +117,14 @@ namespace GitHub.Services.OAuth
return false;
}
public VssOAuthTokenProvider GetTokenProvider(
Uri serviceUrl)
{
return new VssOAuthTokenProvider(this, serviceUrl);
}
protected override IssuedTokenProvider OnCreateTokenProvider(
Uri serverUrl,
Uri serverUrl,
IHttpResponse response)
{
return new VssOAuthTokenProvider(this, serverUrl);

View File

@@ -0,0 +1,207 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Services.Common;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using GitHub.Services.WebApi.Utilities;
namespace Sdk.WebApi.WebApi.RawClient
{
public class RawConnection : IDisposable
{
public RawConnection(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: this(baseUrl, new RawHttpMessageHandler(credentials, settings), null)
{
}
public RawConnection(
Uri baseUrl,
RawHttpMessageHandler innerHandler,
IEnumerable<DelegatingHandler> delegatingHandlers)
{
ArgumentUtility.CheckForNull(baseUrl, "baseUrl");
ArgumentUtility.CheckForNull(innerHandler, "innerHandler");
// Permit delegatingHandlers to be null
m_delegatingHandlers = delegatingHandlers = delegatingHandlers ?? Enumerable.Empty<DelegatingHandler>();
m_baseUrl = baseUrl;
m_innerHandler = innerHandler;
if (this.Settings.MaxRetryRequest > 0)
{
delegatingHandlers = delegatingHandlers.Concat(new DelegatingHandler[] { new VssHttpRetryMessageHandler(this.Settings.MaxRetryRequest) });
}
// Create and persist the pipeline.
if (delegatingHandlers.Any())
{
m_pipeline = HttpClientFactory.CreatePipeline(m_innerHandler, delegatingHandlers);
}
else
{
m_pipeline = m_innerHandler;
}
}
/// <summary>
///
/// </summary>
public RawClientHttpRequestSettings Settings
{
get
{
return (RawClientHttpRequestSettings)m_innerHandler.Settings;
}
}
public async Task<T> GetClientAsync<T>(CancellationToken cancellationToken = default(CancellationToken)) where T : RawHttpClientBase
{
CheckForDisposed();
Type clientType = typeof(T);
return (T)await GetClientServiceImplAsync(typeof(T), cancellationToken).ConfigureAwait(false);
}
private async Task<Object> GetClientServiceImplAsync(
Type requestedType,
CancellationToken cancellationToken = default(CancellationToken))
{
CheckForDisposed();
Object requestedObject = null;
// Get the actual type to lookup or instantiate, which will either be requestedType itself
// or an extensible type if one was registered
Type managedType = GetExtensibleType(requestedType);
if (!m_cachedTypes.TryGetValue(managedType, out requestedObject))
{
AsyncLock typeLock = m_loadingTypes.GetOrAdd(managedType, (t) => new AsyncLock());
// This ensures only a single thread at a time will be performing the work to initialize this particular type
// The other threads will go async awaiting the lock task. This is still an improvement over the old synchronous locking,
// as this thread won't be blocked (like a Monitor.Enter), but can return a task to the caller so that the thread
// can continue to be used to do useful work while the result is being worked on.
// We are trusting that getInstanceAsync does not have any code paths that lead back here (for the same type), otherwise we can deadlock on ourselves.
// The old code also extended the same trust which (if violated) would've resulted in a StackOverflowException,
// but with async tasks it will lead to a deadlock.
using (await typeLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (!m_cachedTypes.TryGetValue(managedType, out requestedObject))
{
requestedObject = (RawHttpClientBase)Activator.CreateInstance(managedType, m_baseUrl, m_pipeline, false /* disposeHandler */);
m_cachedTypes[managedType] = requestedObject;
AsyncLock removed;
m_loadingTypes.TryRemove(managedType, out removed);
}
}
}
return requestedObject;
}
/// <summary>
///
/// </summary>
/// <param name="managedType"></param>
/// <returns></returns>
private Type GetExtensibleType(Type managedType)
{
if (managedType.GetTypeInfo().IsAbstract || managedType.GetTypeInfo().IsInterface)
{
Type extensibleType = null;
// We can add extensible type registration for the client later (app.config? windows registry?). For now it is based solely on the attribute
if (!m_extensibleServiceTypes.TryGetValue(managedType.Name, out extensibleType))
{
VssClientServiceImplementationAttribute[] attributes = (VssClientServiceImplementationAttribute[])managedType.GetTypeInfo().GetCustomAttributes<VssClientServiceImplementationAttribute>(true);
if (attributes.Length > 0)
{
if (attributes[0].Type != null)
{
extensibleType = attributes[0].Type;
m_extensibleServiceTypes[managedType.Name] = extensibleType;
}
else if (!String.IsNullOrEmpty(attributes[0].TypeName))
{
extensibleType = Type.GetType(attributes[0].TypeName);
if (extensibleType != null)
{
m_extensibleServiceTypes[managedType.Name] = extensibleType;
}
else
{
Debug.Assert(false, "VssConnection: Could not load type from type name: " + attributes[0].TypeName);
}
}
}
}
if (extensibleType == null)
{
throw new ExtensibleServiceTypeNotRegisteredException(managedType);
}
if (!managedType.GetTypeInfo().IsAssignableFrom(extensibleType.GetTypeInfo()))
{
throw new ExtensibleServiceTypeNotValidException(managedType, extensibleType);
}
return extensibleType;
}
else
{
return managedType;
}
}
public void Dispose()
{
if (!m_isDisposed)
{
lock (m_disposeLock)
{
if (!m_isDisposed)
{
m_isDisposed = true;
foreach (var cachedType in m_cachedTypes.Values.Where(v => v is IDisposable).Select(v => v as IDisposable))
{
cachedType.Dispose();
}
m_cachedTypes.Clear();
}
}
}
}
private void CheckForDisposed()
{
if (m_isDisposed)
{
throw new ObjectDisposedException(this.GetType().Name);
}
}
private bool m_isDisposed = false;
private object m_disposeLock = new object();
private readonly ConcurrentDictionary<String, Type> m_extensibleServiceTypes = new ConcurrentDictionary<String, Type>();
private readonly Uri m_baseUrl;
private readonly HttpMessageHandler m_pipeline;
private readonly IEnumerable<DelegatingHandler> m_delegatingHandlers;
private readonly RawHttpMessageHandler m_innerHandler;
private readonly ConcurrentDictionary<Type, AsyncLock> m_loadingTypes = new ConcurrentDictionary<Type, AsyncLock>();
private readonly ConcurrentDictionary<Type, Object> m_cachedTypes = new ConcurrentDictionary<Type, Object>();
}
}

View File

@@ -0,0 +1,352 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Formatting;
using System.Net.Http.Headers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Services.Common;
using GitHub.Services.Common.Diagnostics;
using GitHub.Services.OAuth;
using GitHub.Services.WebApi;
using GitHub.Services.WebApi.Utilities.Internal;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Sdk.WebApi.WebApi
{
public class RawHttpClientBase: IDisposable
{
protected RawHttpClientBase(
Uri baseUrl,
VssOAuthCredential credentials)
: this(baseUrl, credentials, settings: null)
{
}
protected RawHttpClientBase(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings)
: this(baseUrl, credentials, settings: settings, handlers: null)
{
}
protected RawHttpClientBase(
Uri baseUrl,
VssOAuthCredential credentials,
params DelegatingHandler[] handlers)
: this(baseUrl, credentials, null, handlers)
{
}
protected RawHttpClientBase(
Uri baseUrl,
VssOAuthCredential credentials,
RawClientHttpRequestSettings settings,
params DelegatingHandler[] handlers)
: this(baseUrl, BuildHandler(credentials, settings, handlers), disposeHandler: true)
{
}
protected RawHttpClientBase(
Uri baseUrl,
HttpMessageHandler pipeline,
bool disposeHandler)
{
m_client = new HttpClient(pipeline, disposeHandler);
// Disable their timeout since we handle it ourselves
m_client.Timeout = TimeSpan.FromMilliseconds(-1.0);
m_client.BaseAddress = baseUrl;
m_formatter = new VssJsonMediaTypeFormatter();
}
public void Dispose()
{
if (!m_isDisposed)
{
lock (m_disposeLock)
{
if (!m_isDisposed)
{
m_isDisposed = true;
m_client.Dispose();
}
}
}
}
[EditorBrowsable(EditorBrowsableState.Never)]
public static TimeSpan TestDelay { get; set; }
protected async Task<HttpResponseMessage> SendAsync(
HttpMethod method,
Uri requestUri,
HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
using (VssTraceActivity.GetOrCreate().EnterCorrelationScope())
using (HttpRequestMessage requestMessage = CreateRequestMessage(method, null, requestUri, content, queryParameters))
{
return await SendAsync(requestMessage, userState, cancellationToken).ConfigureAwait(false);
}
}
protected async Task<T> SendAsync<T>(
HttpMethod method,
IEnumerable<KeyValuePair<String, String>> additionalHeaders,
Uri requestUri,
HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
using (VssTraceActivity.GetOrCreate().EnterCorrelationScope())
using (HttpRequestMessage requestMessage = CreateRequestMessage(method, additionalHeaders, requestUri, content, queryParameters))
{
return await SendAsync<T>(requestMessage, userState, cancellationToken).ConfigureAwait(false);
}
}
protected async Task<T> SendAsync<T>(
HttpRequestMessage message,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
//ConfigureAwait(false) enables the continuation to be run outside
//any captured SyncronizationContext (such as ASP.NET's) which keeps things
//from deadlocking...
using (HttpResponseMessage response = await this.SendAsync(message, userState, cancellationToken).ConfigureAwait(false))
{
return await ReadContentAsAsync<T>(response, cancellationToken).ConfigureAwait(false);
}
}
protected Task<HttpResponseMessage> SendAsync(
HttpRequestMessage message,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
// the default in httpClient for HttpCompletionOption is ResponseContentRead so that is what we do here
return this.SendAsync(
message,
/*completionOption:*/ HttpCompletionOption.ResponseContentRead,
userState,
cancellationToken);
}
protected async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage message,
HttpCompletionOption completionOption,
Object userState = null,
CancellationToken cancellationToken = default(CancellationToken))
{
CheckForDisposed();
if (message.Headers.UserAgent != null)
{
foreach (ProductInfoHeaderValue headerValue in UserAgentUtility.GetDefaultRestUserAgent())
{
if (!message.Headers.UserAgent.Contains(headerValue))
{
message.Headers.UserAgent.Add(headerValue);
}
}
}
VssTraceActivity traceActivity = VssTraceActivity.GetOrCreate();
using (traceActivity.EnterCorrelationScope())
{
VssHttpEventSource.Log.HttpRequestStart(traceActivity, message);
message.Trace();
HttpResponseMessage response = await Client.SendAsync(message, completionOption, cancellationToken)
.ConfigureAwait(false);
// Inject delay or failure for testing
if (TestDelay != TimeSpan.Zero)
{
await ProcessDelayAsync().ConfigureAwait(false);
}
response.Trace();
VssHttpEventSource.Log.HttpRequestStop(VssTraceActivity.Current, response);
return response;
}
}
protected async Task<T> ReadContentAsAsync<T>(HttpResponseMessage response, CancellationToken cancellationToken = default(CancellationToken))
{
CheckForDisposed();
Boolean isJson = IsJsonResponse(response);
try
{
//deal with wrapped collections in json
if (isJson &&
typeof(IEnumerable).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()) &&
!typeof(Byte[]).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()) &&
!typeof(JObject).GetTypeInfo().IsAssignableFrom(typeof(T).GetTypeInfo()))
{
var wrapper = await ReadJsonContentAsync<VssJsonCollectionWrapper<T>>(response, cancellationToken).ConfigureAwait(false);
return wrapper.Value;
}
else if (isJson)
{
return await ReadJsonContentAsync<T>(response, cancellationToken).ConfigureAwait(false);
}
}
catch (JsonReaderException)
{
// We thought the content was JSON but failed to parse.
// We ignore for now
}
return default(T);
}
protected virtual async Task<T> ReadJsonContentAsync<T>(HttpResponseMessage response, CancellationToken cancellationToken = default(CancellationToken))
{
return await response.Content.ReadAsAsync<T>(new[] { m_formatter }, cancellationToken).ConfigureAwait(false);
}
protected HttpRequestMessage CreateRequestMessage(
HttpMethod method,
IEnumerable<KeyValuePair<String, String>> additionalHeaders,
Uri requestUri,
HttpContent content = null,
IEnumerable<KeyValuePair<String, String>> queryParameters = null,
String mediaType = c_jsonMediaType)
{
CheckForDisposed();
if (queryParameters != null && queryParameters.Any())
{
requestUri = requestUri.AppendQuery(queryParameters);
}
HttpRequestMessage requestMessage = new HttpRequestMessage(method, requestUri.AbsoluteUri);
MediaTypeWithQualityHeaderValue acceptType = new MediaTypeWithQualityHeaderValue(mediaType);
requestMessage.Headers.Accept.Add(acceptType);
if (additionalHeaders != null)
{
foreach (KeyValuePair<String, String> kvp in additionalHeaders)
{
requestMessage.Headers.Add(kvp.Key, kvp.Value);
}
}
if (content != null)
{
requestMessage.Content = content;
}
return requestMessage;
}
/// <summary>
/// The inner client.
/// </summary>
/// <remarks>
/// Note to implementers: You should not update or expose the inner client
/// unless you instantiate your own instance of this class. Getting
/// an instance of this class from method such as GetClient&lt;T&gt;
/// a cached and shared instance.
/// </remarks>
protected HttpClient Client
{
get
{
return m_client;
}
}
/// <summary>
/// The media type formatter.
/// </summary>
/// <remarks>
/// Note to implementers: You should not update or expose the media type formatter
/// unless you instantiate your own instance of this class. Getting
/// an instance of this class from method such as GetClient&lt;T&gt;
/// a cached and shared instance.
/// </remarks>
protected MediaTypeFormatter Formatter
{
get
{
return m_formatter;
}
}
private static HttpMessageHandler BuildHandler(VssOAuthCredential credentials, RawClientHttpRequestSettings settings, DelegatingHandler[] handlers)
{
RawHttpMessageHandler innerHandler = new RawHttpMessageHandler(credentials, settings ?? new RawClientHttpRequestSettings());
if (null == handlers ||
0 == handlers.Length)
{
return innerHandler;
}
return HttpClientFactory.CreatePipeline(innerHandler, handlers);
}
private void CheckForDisposed()
{
if (m_isDisposed)
{
throw new ObjectDisposedException(this.GetType().Name);
}
}
private async Task ProcessDelayAsync()
{
await Task.Delay(Math.Abs((Int32)TestDelay.TotalMilliseconds)).ConfigureAwait(false);
if (TestDelay < TimeSpan.Zero)
{
throw new Exception("User injected failure.");
}
}
private Boolean IsJsonResponse(
HttpResponseMessage response)
{
if (HasContent(response)
&& response.Content.Headers != null && response.Content.Headers.ContentType != null
&& !String.IsNullOrEmpty(response.Content.Headers.ContentType.MediaType))
{
return (0 == String.Compare("application/json", response.Content.Headers.ContentType.MediaType, StringComparison.OrdinalIgnoreCase));
}
return false;
}
private Boolean HasContent(HttpResponseMessage response)
{
if (response != null &&
response.StatusCode != HttpStatusCode.NoContent &&
response.RequestMessage?.Method != HttpMethod.Head &&
response.Content?.Headers != null &&
(!response.Content.Headers.ContentLength.HasValue ||
(response.Content.Headers.ContentLength.HasValue && response.Content.Headers.ContentLength != 0)))
{
return true;
}
return false;
}
private readonly HttpClient m_client;
private MediaTypeFormatter m_formatter;
private bool m_isDisposed = false;
private object m_disposeLock = new object();
private const String c_jsonMediaType = "application/json";
}
}