Compare commits

...

1 Commits

Author SHA1 Message Date
Tingluo Huang
566ff8485a . 2024-10-15 17:28:32 -04:00
5 changed files with 160 additions and 145 deletions

View File

@@ -0,0 +1 @@
{ "2.319.0": {"targetVersion":"2.320.0"}}

View File

@@ -248,15 +248,16 @@ namespace GitHub.Runner.Listener
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try try
{ {
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, await Task.Delay(5000);
_session.SessionId, // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_lastMessageId, // _session.SessionId,
runnerStatus, // _lastMessageId,
BuildConstants.RunnerPackage.Version, // runnerStatus,
VarUtil.OS, // BuildConstants.RunnerPackage.Version,
VarUtil.OSArchitecture, // VarUtil.OS,
_settings.DisableUpdate, // VarUtil.OSArchitecture,
_getMessagesTokenSource.Token); // _settings.DisableUpdate,
// _getMessagesTokenSource.Token);
// Decrypt the message body if the session is using encryption // Decrypt the message body if the session is using encryption
message = DecryptMessage(message); message = DecryptMessage(message);

View File

@@ -479,14 +479,14 @@ namespace GitHub.Runner.Listener
} }
} }
message = await getNextMessage; //get next message // message = await getNextMessage; //get next message
HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); // HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}");
if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) // if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{ {
if (autoUpdateInProgress == false) if (autoUpdateInProgress == false)
{ {
autoUpdateInProgress = true; autoUpdateInProgress = true;
AgentRefreshMessage runnerUpdateMessage = JsonUtility.FromString<AgentRefreshMessage>(message.Body); AgentRefreshMessage runnerUpdateMessage = null;
#if DEBUG #if DEBUG
// Can mock the update for testing // Can mock the update for testing
@@ -502,9 +502,9 @@ namespace GitHub.Runner.Listener
if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version)) if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version))
{ {
var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion; var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion;
_term.WriteLine($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); _term.WriteLine($"Mocking update, using version {mockTargetVersion}");
Trace.Info($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); Trace.Info($"Mocking update, using version {mockTargetVersion}");
runnerUpdateMessage = new AgentRefreshMessage(runnerUpdateMessage.AgentId, mockTargetVersion, runnerUpdateMessage.Timeout); runnerUpdateMessage = new AgentRefreshMessage(settings.AgentId, mockTargetVersion, TimeSpan.FromSeconds(100));
} }
} }
} }
@@ -518,125 +518,125 @@ namespace GitHub.Runner.Listener
Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
} }
} }
else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) // else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{ // {
if (autoUpdateInProgress == false) // if (autoUpdateInProgress == false)
{ // {
autoUpdateInProgress = true; // autoUpdateInProgress = true;
RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString<RunnerRefreshMessage>(message.Body); // RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString<RunnerRefreshMessage>(message.Body);
var selfUpdater = HostContext.GetService<ISelfUpdaterV2>(); // var selfUpdater = HostContext.GetService<ISelfUpdaterV2>();
selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken); // selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken);
Trace.Info("Refresh message received, kick-off selfupdate background process."); // Trace.Info("Refresh message received, kick-off selfupdate background process.");
} // }
else // else
{ // {
Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); // Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running.");
} // }
} // }
else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase)) // else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase))
{ // {
if (autoUpdateInProgress || runOnceJobReceived) // if (autoUpdateInProgress || runOnceJobReceived)
{ // {
skipMessageDeletion = true; // skipMessageDeletion = true;
Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); // Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
} // }
else // else
{ // {
Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); // Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'");
var jobMessage = StringUtil.ConvertFromJson<Pipelines.AgentJobRequestMessage>(message.Body); // var jobMessage = StringUtil.ConvertFromJson<Pipelines.AgentJobRequestMessage>(message.Body);
jobDispatcher.Run(jobMessage, runOnce); // jobDispatcher.Run(jobMessage, runOnce);
if (runOnce) // if (runOnce)
{ // {
Trace.Info("One time used runner received job message."); // Trace.Info("One time used runner received job message.");
runOnceJobReceived = true; // runOnceJobReceived = true;
} // }
} // }
} // }
// Broker flow // // Broker flow
else if (MessageUtil.IsRunServiceJob(message.MessageType)) // else if (MessageUtil.IsRunServiceJob(message.MessageType))
{ // {
if (autoUpdateInProgress || runOnceJobReceived) // if (autoUpdateInProgress || runOnceJobReceived)
{ // {
skipMessageDeletion = true; // skipMessageDeletion = true;
Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); // Trace.Info($"Skip message deletion for job request message '{message.MessageId}'.");
} // }
else // else
{ // {
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body); // var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null; // Pipelines.AgentJobRequestMessage jobRequestMessage = null;
// Create connection // // Create connection
var credMgr = HostContext.GetService<ICredentialManager>(); // var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials(); // var creds = credMgr.LoadCredentials();
if (string.IsNullOrEmpty(messageRef.RunServiceUrl)) // if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{ // {
var actionsRunServer = HostContext.CreateService<IActionsRunServer>(); // var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); // await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); // jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
} // }
else // else
{ // {
var runServer = HostContext.CreateService<IRunServer>(); // var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); // await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
try // try
{ // {
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); // jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
_acquireJobThrottler.Reset(); // _acquireJobThrottler.Reset();
} // }
catch (Exception ex) when ( // catch (Exception ex) when (
ex is TaskOrchestrationJobNotFoundException || // HTTP status 404 // ex is TaskOrchestrationJobNotFoundException || // HTTP status 404
ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409 // ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409
ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422 // ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422
{ // {
Trace.Info($"Skipping message Job. {ex.Message}"); // Trace.Info($"Skipping message Job. {ex.Message}");
await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token); // await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token);
continue; // continue;
} // }
catch (Exception ex) // catch (Exception ex)
{ // {
Trace.Error($"Caught exception from acquiring job message: {ex}"); // Trace.Error($"Caught exception from acquiring job message: {ex}");
continue; // continue;
} // }
} // }
jobDispatcher.Run(jobRequestMessage, runOnce); // jobDispatcher.Run(jobRequestMessage, runOnce);
if (runOnce) // if (runOnce)
{ // {
Trace.Info("One time used runner received job message."); // Trace.Info("One time used runner received job message.");
runOnceJobReceived = true; // runOnceJobReceived = true;
} // }
} // }
} // }
else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) // else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{ // {
var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body); // var cancelJobMessage = JsonUtility.FromString<JobCancelMessage>(message.Body);
bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage); // bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage);
skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled; // skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled;
if (skipMessageDeletion) // if (skipMessageDeletion)
{ // {
Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'."); // Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'.");
} // }
} // }
else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase)) // else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase))
{ // {
var HostedRunnerShutdownMessage = JsonUtility.FromString<Pipelines.HostedRunnerShutdownMessage>(message.Body); // var HostedRunnerShutdownMessage = JsonUtility.FromString<Pipelines.HostedRunnerShutdownMessage>(message.Body);
skipMessageDeletion = true; // skipMessageDeletion = true;
skipSessionDeletion = true; // skipSessionDeletion = true;
Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); // Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'.");
return Constants.Runner.ReturnCode.Success; // return Constants.Runner.ReturnCode.Success;
} // }
else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh)) // else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh))
{ // {
Trace.Info("Received ForceTokenRefreshMessage"); // Trace.Info("Received ForceTokenRefreshMessage");
await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token); // await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token);
} // }
else // else
{ // {
Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}."); // Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}.");
} // }
} }
finally finally
{ {

View File

@@ -132,26 +132,39 @@ namespace GitHub.Runner.Listener
private async Task<bool> UpdateNeeded(string targetVersion, CancellationToken token) private async Task<bool> UpdateNeeded(string targetVersion, CancellationToken token)
{ {
// when talk to old version server, always prefer latest package. if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE")))
// old server won't send target version as part of update message.
if (string.IsNullOrEmpty(targetVersion))
{ {
var packages = await _runnerServer.GetPackagesAsync(_packageType, _platform, 1, true, token); _targetPackage = new PackageMetadata()
if (packages == null || packages.Count == 0)
{ {
Trace.Info($"There is no package for {_packageType} and {_platform}."); Type = "agent",
return false; Platform = BuildConstants.RunnerPackage.PackageName,
} Version = new PackageVersion(targetVersion),
};
_targetPackage = packages.FirstOrDefault();
} }
else else
{ {
_targetPackage = await _runnerServer.GetPackageAsync(_packageType, _platform, targetVersion, true, token);
if (_targetPackage == null) // when talk to old version server, always prefer latest package.
// old server won't send target version as part of update message.
if (string.IsNullOrEmpty(targetVersion))
{ {
Trace.Info($"There is no package for {_packageType} and {_platform} with version {targetVersion}."); var packages = await _runnerServer.GetPackagesAsync(_packageType, _platform, 1, true, token);
return false; if (packages == null || packages.Count == 0)
{
Trace.Info($"There is no package for {_packageType} and {_platform}.");
return false;
}
_targetPackage = packages.FirstOrDefault();
}
else
{
_targetPackage = await _runnerServer.GetPackageAsync(_packageType, _platform, targetVersion, true, token);
if (_targetPackage == null)
{
Trace.Info($"There is no package for {_packageType} and {_platform} with version {targetVersion}.");
return false;
}
} }
} }

View File

@@ -1 +1 @@
2.320.0 2.319.0