Compare commits

...

1 Commits

Author SHA1 Message Date
Tingluo Huang
566ff8485a . 2024-10-15 17:28:32 -04:00
5 changed files with 160 additions and 145 deletions

View File

@@ -0,0 +1 @@
{ "2.319.0": {"targetVersion":"2.320.0"}}

View File

@@ -248,15 +248,16 @@ namespace GitHub.Runner.Listener
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
_settings.DisableUpdate,
_getMessagesTokenSource.Token);
await Task.Delay(5000);
// message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
// _session.SessionId,
// _lastMessageId,
// runnerStatus,
// BuildConstants.RunnerPackage.Version,
// VarUtil.OS,
// VarUtil.OSArchitecture,
// _settings.DisableUpdate,
// _getMessagesTokenSource.Token);
// Decrypt the message body if the session is using encryption
message = DecryptMessage(message);

View File

@@ -479,14 +479,14 @@ namespace GitHub.Runner.Listener
}
}
message = await getNextMessage; //get next message
HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}");
if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
// message = await getNextMessage; //get next message
// HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}");
// if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
if (autoUpdateInProgress == false)
{
autoUpdateInProgress = true;
AgentRefreshMessage runnerUpdateMessage = JsonUtility.FromString<AgentRefreshMessage>(message.Body);
AgentRefreshMessage runnerUpdateMessage = null;
#if DEBUG
// Can mock the update for testing
@@ -502,9 +502,9 @@ namespace GitHub.Runner.Listener
if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version))
{
var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion;
_term.WriteLine($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}");
Trace.Info($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}");
runnerUpdateMessage = new AgentRefreshMessage(runnerUpdateMessage.AgentId, mockTargetVersion, runnerUpdateMessage.Timeout);
_term.WriteLine($"Mocking update, using version {mockTargetVersion}");
Trace.Info($"Mocking update, using version {mockTargetVersion}");
runnerUpdateMessage = new AgentRefreshMessage(settings.AgentId, mockTargetVersion, TimeSpan.FromSeconds(100));
}
}
}
@@ -518,125 +518,125 @@ namespace GitHub.Runner.Listener
Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
}
}
else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
if (autoUpdateInProgress == false)
{
autoUpdateInProgress = true;
RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString<RunnerRefreshMessage>(message.Body);
// else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
// {
// if (autoUpdateInProgress == false)
// {
// autoUpdateInProgress = true;
// RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString<RunnerRefreshMessage>(message.Body);
var selfUpdater = HostContext.GetService<ISelfUpdaterV2>();
selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken);
Trace.Info("Refresh message received, kick-off selfupdate background process.");
}
else
{
Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
}
}
else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase))
{
if (autoUpdateInProgress || runOnceJobReceived)
{
skipMessageDeletion = true;
Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
}
else
{
Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'");
var jobMessage = StringUtil.ConvertFromJson<Pipelines.AgentJobRequestMessage>(message.Body);
jobDispatcher.Run(jobMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");
runOnceJobReceived = true;
}
}
}
// Broker flow
else if (MessageUtil.IsRunServiceJob(message.MessageType))
{
if (autoUpdateInProgress || runOnceJobReceived)
{
skipMessageDeletion = true;
Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
}
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;
// var selfUpdater = HostContext.GetService<ISelfUpdaterV2>();
// selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken);
// Trace.Info("Refresh message received, kick-off selfupdate background process.");
// }
// else
// {
// Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
// }
// }
// else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase))
// {
// if (autoUpdateInProgress || runOnceJobReceived)
// {
// skipMessageDeletion = true;
// Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
// }
// else
// {
// Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'");
// var jobMessage = StringUtil.ConvertFromJson<Pipelines.AgentJobRequestMessage>(message.Body);
// jobDispatcher.Run(jobMessage, runOnce);
// if (runOnce)
// {
// Trace.Info("One time used runner received job message.");
// runOnceJobReceived = true;
// }
// }
// }
// // Broker flow
// else if (MessageUtil.IsRunServiceJob(message.MessageType))
// {
// if (autoUpdateInProgress || runOnceJobReceived)
// {
// skipMessageDeletion = true;
// Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
// }
// else
// {
// var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
// Pipelines.AgentJobRequestMessage jobRequestMessage = null;
// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials();
// // Create connection
// var credMgr = HostContext.GetService<ICredentialManager>();
// var creds = credMgr.LoadCredentials();
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
try
{
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
_acquireJobThrottler.Reset();
}
catch (Exception ex) when (
ex is TaskOrchestrationJobNotFoundException || // HTTP status 404
ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409
ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422
{
Trace.Info($"Skipping message Job. {ex.Message}");
await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token);
continue;
}
catch (Exception ex)
{
Trace.Error($"Caught exception from acquiring job message: {ex}");
continue;
}
}
// if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
// {
// var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
// await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
// jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
// }
// else
// {
// var runServer = HostContext.CreateService<IRunServer>();
// await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
// try
// {
// jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
// _acquireJobThrottler.Reset();
// }
// catch (Exception ex) when (
// ex is TaskOrchestrationJobNotFoundException || // HTTP status 404
// ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409
// ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422
// {
// Trace.Info($"Skipping message Job. {ex.Message}");
// await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token);
// continue;
// }
// catch (Exception ex)
// {
// Trace.Error($"Caught exception from acquiring job message: {ex}");
// continue;
// }
// }
jobDispatcher.Run(jobRequestMessage, runOnce);
if (runOnce)
{
Trace.Info("One time used runner received job message.");
runOnceJobReceived = true;
}
}
}
else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage);
skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled;
// jobDispatcher.Run(jobRequestMessage, runOnce);
// if (runOnce)
// {
// Trace.Info("One time used runner received job message.");
// runOnceJobReceived = true;
// }
// }
// }
// else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
// {
// var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
// bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage);
// skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled;
if (skipMessageDeletion)
{
Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'.");
}
}
else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{
var HostedRunnerShutdownMessage = JsonUtility.FromString<Pipelines.HostedRunnerShutdownMessage>(message.Body);
skipMessageDeletion = true;
skipSessionDeletion = true;
Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'.");
return Constants.Runner.ReturnCode.Success;
}
else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh))
{
Trace.Info("Received ForceTokenRefreshMessage");
await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token);
}
else
{
Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}.");
}
// if (skipMessageDeletion)
// {
// Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'.");
// }
// }
// else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase))
// {
// var HostedRunnerShutdownMessage = JsonUtility.FromString<Pipelines.HostedRunnerShutdownMessage>(message.Body);
// skipMessageDeletion = true;
// skipSessionDeletion = true;
// Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'.");
// return Constants.Runner.ReturnCode.Success;
// }
// else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh))
// {
// Trace.Info("Received ForceTokenRefreshMessage");
// await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token);
// }
// else
// {
// Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}.");
// }
}
finally
{

View File

@@ -132,6 +132,18 @@ namespace GitHub.Runner.Listener
private async Task<bool> UpdateNeeded(string targetVersion, CancellationToken token)
{
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE")))
{
_targetPackage = new PackageMetadata()
{
Type = "agent",
Platform = BuildConstants.RunnerPackage.PackageName,
Version = new PackageVersion(targetVersion),
};
}
else
{
// when talk to old version server, always prefer latest package.
// old server won't send target version as part of update message.
if (string.IsNullOrEmpty(targetVersion))
@@ -154,6 +166,7 @@ namespace GitHub.Runner.Listener
return false;
}
}
}
Trace.Info($"Version '{_targetPackage.Version}' of '{_targetPackage.Type}' package available in server.");
PackageVersion serverVersion = new(_targetPackage.Version);

View File

@@ -1 +1 @@
2.320.0
2.319.0