diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 71e570e08..28668464f 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -47,6 +47,11 @@
$(DefineConstants);DEBUG
+
+
+ $(DefineConstants);USE_BROKER
+
+
true
diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs
new file mode 100644
index 000000000..616cb7804
--- /dev/null
+++ b/src/Runner.Common/BrokerServer.cs
@@ -0,0 +1,38 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using GitHub.Runner.Common.Util;
+using GitHub.Services.WebApi;
+using GitHub.Services.Common;
+using GitHub.Runner.Sdk;
+using System.Net.Http;
+
+namespace GitHub.Runner.Common
+{
+ [ServiceLocator(Default = typeof(BrokerServer))]
+ public interface IBrokerServer : IRunnerService
+ {
+ Task ConnectAsync(Uri serverUrl);
+ Task GetMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken);
+ }
+
+ public sealed class BrokerServer : RunnerService, IBrokerServer
+ {
+ private HttpClient _httpClient;
+
+ public async Task ConnectAsync(Uri serverUrl)
+ {
+ _httpClient = new HttpClient();
+ _httpClient.BaseAddress = serverUrl;
+ _httpClient.Timeout = TimeSpan.FromSeconds(50);
+ await _httpClient.GetAsync("health");
+ }
+
+ public async Task GetMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken)
+ {
+ await _httpClient.GetAsync("message");
+ return null;
+ }
+ }
+}
diff --git a/src/Runner.Listener/Configuration/CredentialManager.cs b/src/Runner.Listener/Configuration/CredentialManager.cs
index e13d29510..adbd53452 100644
--- a/src/Runner.Listener/Configuration/CredentialManager.cs
+++ b/src/Runner.Listener/Configuration/CredentialManager.cs
@@ -40,6 +40,12 @@ namespace GitHub.Runner.Listener.Configuration
return creds;
}
+#if USE_BROKER
+ public VssCredentials LoadCredentials()
+ {
+ return new VssCredentials();
+ }
+#else
public VssCredentials LoadCredentials()
{
IConfigurationStore store = HostContext.GetService();
@@ -69,6 +75,7 @@ namespace GitHub.Runner.Listener.Configuration
return creds;
}
+#endif
}
[DataContract]
diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs
index 81afd737d..2cd1577e0 100644
--- a/src/Runner.Listener/MessageListener.cs
+++ b/src/Runner.Listener/MessageListener.cs
@@ -31,6 +31,7 @@ namespace GitHub.Runner.Listener
private RunnerSettings _settings;
private ITerminal _term;
private IRunnerServer _runnerServer;
+ private IBrokerServer _brokerServer;
private TaskAgentSession _session;
private TimeSpan _getNextMessageRetryInterval;
private bool _accessTokenRevoked = false;
@@ -45,8 +46,44 @@ namespace GitHub.Runner.Listener
_term = HostContext.GetService();
_runnerServer = HostContext.GetService();
+ _brokerServer = HostContext.GetService();
}
+#if USE_BROKER
+ public async Task CreateSessionAsync(CancellationToken token)
+ {
+ Trace.Entering();
+
+ // Settings
+ var configManager = HostContext.GetService();
+ _settings = configManager.LoadSettings();
+ var serverUrl = _settings.ServerUrl;
+ Trace.Info(_settings);
+
+ // Connect
+ token.ThrowIfCancellationRequested();
+ Trace.Info($"Attempt to create session.");
+ Trace.Info("Connecting to the Runner Server...");
+ _term.WriteLine($"Connecting to {new Uri(serverUrl)}");
+ await _brokerServer.ConnectAsync(new Uri(serverUrl));
+ _term.WriteLine();
+ _term.WriteSuccessMessage("Connected to GitHub");
+ _term.WriteLine();
+
+ // Session info
+ var agent = new TaskAgentReference
+ {
+ Id = _settings.AgentId,
+ Name = _settings.AgentName,
+ Version = BuildConstants.RunnerPackage.Version,
+ OSDescription = RuntimeInformation.OSDescription,
+ };
+ string sessionName = $"{Environment.MachineName ?? "RUNNER"}";
+ _session = new TaskAgentSession(sessionName, agent);
+
+ return true;
+ }
+#else
public async Task CreateSessionAsync(CancellationToken token)
{
Trace.Entering();
@@ -151,6 +188,7 @@ namespace GitHub.Runner.Listener
}
}
}
+#endif
public async Task DeleteSessionAsync()
{
@@ -170,6 +208,116 @@ namespace GitHub.Runner.Listener
}
}
+#if USE_BROKER
+ public async Task GetNextMessageAsync(CancellationToken token)
+ {
+ Trace.Entering();
+ ArgUtil.NotNull(_session, nameof(_session));
+ ArgUtil.NotNull(_settings, nameof(_settings));
+ bool encounteringError = false;
+ int continuousError = 0;
+ string errorMessage = string.Empty;
+ Stopwatch heartbeat = new Stopwatch();
+ heartbeat.Restart();
+ while (true)
+ {
+ token.ThrowIfCancellationRequested();
+ TaskAgentMessage message = null;
+ try
+ {
+ message = await _brokerServer.GetMessageAsync(_settings.PoolId, _session.SessionId, _lastMessageId, token);
+
+ // Decrypt the message body if the session is using encryption
+ message = DecryptMessage(message);
+
+ if (message != null)
+ {
+ _lastMessageId = message.MessageId;
+ }
+
+ if (encounteringError) //print the message once only if there was an error
+ {
+ _term.WriteLine($"{DateTime.UtcNow:u}: Runner reconnected.");
+ encounteringError = false;
+ continuousError = 0;
+ }
+ }
+ catch (OperationCanceledException) when (token.IsCancellationRequested)
+ {
+ Trace.Info("Get next message has been cancelled.");
+ throw;
+ }
+ catch (TaskAgentAccessTokenExpiredException)
+ {
+ Trace.Info("Runner OAuth token has been revoked. Unable to pull message.");
+ _accessTokenRevoked = true;
+ throw;
+ }
+ catch (Exception ex)
+ {
+ Trace.Error("Catch exception during get next message.");
+ Trace.Error(ex);
+
+ // don't retry if SkipSessionRecover = true, DT service will delete agent session to stop agent from taking more jobs.
+ if (ex is TaskAgentSessionExpiredException && !_settings.SkipSessionRecover && await CreateSessionAsync(token))
+ {
+ Trace.Info($"{nameof(TaskAgentSessionExpiredException)} received, recovered by recreate session.");
+ }
+ else if (!IsGetNextMessageExceptionRetriable(ex))
+ {
+ throw;
+ }
+ else
+ {
+ continuousError++;
+ //retry after a random backoff to avoid service throttling
+ //in case of there is a service error happened and all agents get kicked off of the long poll and all agent try to reconnect back at the same time.
+ if (continuousError <= 5)
+ {
+ // random backoff [15, 30]
+ _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30), _getNextMessageRetryInterval);
+ }
+ else
+ {
+ // more aggressive backoff [30, 60]
+ _getNextMessageRetryInterval = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), _getNextMessageRetryInterval);
+ }
+
+ if (!encounteringError)
+ {
+ //print error only on the first consecutive error
+ _term.WriteError($"{DateTime.UtcNow:u}: Runner connect error: {ex.Message}. Retrying until reconnected.");
+ encounteringError = true;
+ }
+
+ // re-create VssConnection before next retry
+ await _runnerServer.RefreshConnectionAsync(RunnerConnectionType.MessageQueue, TimeSpan.FromSeconds(60));
+
+ Trace.Info("Sleeping for {0} seconds before retrying.", _getNextMessageRetryInterval.TotalSeconds);
+ await HostContext.Delay(_getNextMessageRetryInterval, token);
+ }
+ }
+
+ if (message == null)
+ {
+ if (heartbeat.Elapsed > TimeSpan.FromMinutes(30))
+ {
+ Trace.Info($"No message retrieved from session '{_session.SessionId}' within last 30 minutes.");
+ heartbeat.Restart();
+ }
+ else
+ {
+ Trace.Verbose($"No message retrieved from session '{_session.SessionId}'.");
+ }
+
+ continue;
+ }
+
+ Trace.Info($"Message '{message.MessageId}' received from session '{_session.SessionId}'.");
+ return message;
+ }
+ }
+#else
public async Task GetNextMessageAsync(CancellationToken token)
{
Trace.Entering();
@@ -281,6 +429,7 @@ namespace GitHub.Runner.Listener
return message;
}
}
+#endif
public async Task DeleteMessageAsync(TaskAgentMessage message)
{
diff --git a/src/dev.sh b/src/dev.sh
index e7b075ca5..13ccbc747 100755
--- a/src/dev.sh
+++ b/src/dev.sh
@@ -2,7 +2,7 @@
###############################################################################
#
-# ./dev.sh build/layout/test/package [Debug/Release]
+# ./dev.sh build/layout/test/package [Debug/Release] [linux-x64|linux-x86|linux-arm64|linux-arm|osx-x64|win-x64|win-x86] [use-broker]
#
###############################################################################
@@ -11,6 +11,7 @@ set -e
DEV_CMD=$1
DEV_CONFIG=$2
DEV_TARGET_RUNTIME=$3
+DEV_USE_BROKER=$4
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LAYOUT_DIR="$SCRIPT_DIR/../_layout"
@@ -81,6 +82,13 @@ elif [[ "$CURRENT_PLATFORM" == 'darwin' ]]; then
fi
fi
+if [ -n "$DEV_USE_BROKER" ]; then
+ USE_BROKER='-p:USE_BROKER="true"'
+else
+ USE_BROKER=''
+fi
+
+
function failed()
{
local error=${1:-Undefined error}
@@ -114,13 +122,13 @@ function heading()
function build ()
{
heading "Building ..."
- dotnet msbuild -t:Build -p:PackageRuntime="${RUNTIME_ID}" -p:BUILDCONFIG="${BUILD_CONFIG}" -p:RunnerVersion="${RUNNER_VERSION}" ./dir.proj || failed build
+ dotnet msbuild -t:Build -p:PackageRuntime="${RUNTIME_ID}" -p:BUILDCONFIG="${BUILD_CONFIG}" -p:RunnerVersion="${RUNNER_VERSION}" $USE_BROKER ./dir.proj || failed build
}
function layout ()
{
heading "Create layout ..."
- dotnet msbuild -t:layout -p:PackageRuntime="${RUNTIME_ID}" -p:BUILDCONFIG="${BUILD_CONFIG}" -p:RunnerVersion="${RUNNER_VERSION}" ./dir.proj || failed build
+ dotnet msbuild -t:layout -p:PackageRuntime="${RUNTIME_ID}" -p:BUILDCONFIG="${BUILD_CONFIG}" -p:RunnerVersion="${RUNNER_VERSION}" $USE_BROKER ./dir.proj || failed build
#change execution flag to allow running with sudo
if [[ ("$CURRENT_PLATFORM" == "linux") || ("$CURRENT_PLATFORM" == "darwin") ]]; then