From 566ff8485a6234ecb7e52ebb89e7cabc4947fae4 Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Tue, 15 Oct 2024 17:28:32 -0400 Subject: [PATCH] . --- src/Misc/layoutroot/mock_update_messages.json | 1 + src/Runner.Listener/MessageListener.cs | 19 +- src/Runner.Listener/Runner.cs | 242 +++++++++--------- src/Runner.Listener/SelfUpdater.cs | 41 ++- src/runnerversion | 2 +- 5 files changed, 160 insertions(+), 145 deletions(-) create mode 100644 src/Misc/layoutroot/mock_update_messages.json diff --git a/src/Misc/layoutroot/mock_update_messages.json b/src/Misc/layoutroot/mock_update_messages.json new file mode 100644 index 000000000..702cc107f --- /dev/null +++ b/src/Misc/layoutroot/mock_update_messages.json @@ -0,0 +1 @@ +{ "2.319.0": {"targetVersion":"2.320.0"}} diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 6a5c9368e..bf4ab5ec1 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -248,15 +248,16 @@ namespace GitHub.Runner.Listener _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); try { - message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, - _session.SessionId, - _lastMessageId, - runnerStatus, - BuildConstants.RunnerPackage.Version, - VarUtil.OS, - VarUtil.OSArchitecture, - _settings.DisableUpdate, - _getMessagesTokenSource.Token); + await Task.Delay(5000); + // message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, + // _session.SessionId, + // _lastMessageId, + // runnerStatus, + // BuildConstants.RunnerPackage.Version, + // VarUtil.OS, + // VarUtil.OSArchitecture, + // _settings.DisableUpdate, + // _getMessagesTokenSource.Token); // Decrypt the message body if the session is using encryption message = DecryptMessage(message); diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 8649af735..fdc293f9d 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -479,14 +479,14 @@ namespace GitHub.Runner.Listener } } - message = await getNextMessage; //get next message - HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); - if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + // message = await getNextMessage; //get next message + // HostContext.WritePerfCounter($"MessageReceived_{message.MessageType}"); + // if (string.Equals(message.MessageType, AgentRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) { if (autoUpdateInProgress == false) { autoUpdateInProgress = true; - AgentRefreshMessage runnerUpdateMessage = JsonUtility.FromString(message.Body); + AgentRefreshMessage runnerUpdateMessage = null; #if DEBUG // Can mock the update for testing @@ -502,9 +502,9 @@ namespace GitHub.Runner.Listener if (mockUpdateMessages.ContainsKey(BuildConstants.RunnerPackage.Version)) { var mockTargetVersion = mockUpdateMessages[BuildConstants.RunnerPackage.Version].TargetVersion; - _term.WriteLine($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - Trace.Info($"Mocking update, using version {mockTargetVersion} instead of {runnerUpdateMessage.TargetVersion}"); - runnerUpdateMessage = new AgentRefreshMessage(runnerUpdateMessage.AgentId, mockTargetVersion, runnerUpdateMessage.Timeout); + _term.WriteLine($"Mocking update, using version {mockTargetVersion}"); + Trace.Info($"Mocking update, using version {mockTargetVersion}"); + runnerUpdateMessage = new AgentRefreshMessage(settings.AgentId, mockTargetVersion, TimeSpan.FromSeconds(100)); } } } @@ -518,125 +518,125 @@ namespace GitHub.Runner.Listener Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); } } - else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) - { - if (autoUpdateInProgress == false) - { - autoUpdateInProgress = true; - RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString(message.Body); + // else if (string.Equals(message.MessageType, RunnerRefreshMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + // { + // if (autoUpdateInProgress == false) + // { + // autoUpdateInProgress = true; + // RunnerRefreshMessage brokerRunnerUpdateMessage = JsonUtility.FromString(message.Body); - var selfUpdater = HostContext.GetService(); - selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken); - Trace.Info("Refresh message received, kick-off selfupdate background process."); - } - else - { - Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); - } - } - else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase)) - { - if (autoUpdateInProgress || runOnceJobReceived) - { - skipMessageDeletion = true; - Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); - } - else - { - Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); - var jobMessage = StringUtil.ConvertFromJson(message.Body); - jobDispatcher.Run(jobMessage, runOnce); - if (runOnce) - { - Trace.Info("One time used runner received job message."); - runOnceJobReceived = true; - } - } - } - // Broker flow - else if (MessageUtil.IsRunServiceJob(message.MessageType)) - { - if (autoUpdateInProgress || runOnceJobReceived) - { - skipMessageDeletion = true; - Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); - } - else - { - var messageRef = StringUtil.ConvertFromJson(message.Body); - Pipelines.AgentJobRequestMessage jobRequestMessage = null; + // var selfUpdater = HostContext.GetService(); + // selfUpdateTask = selfUpdater.SelfUpdate(brokerRunnerUpdateMessage, jobDispatcher, false, HostContext.RunnerShutdownToken); + // Trace.Info("Refresh message received, kick-off selfupdate background process."); + // } + // else + // { + // Trace.Info("Refresh message received, skip autoupdate since a previous autoupdate is already running."); + // } + // } + // else if (string.Equals(message.MessageType, JobRequestMessageTypes.PipelineAgentJobRequest, StringComparison.OrdinalIgnoreCase)) + // { + // if (autoUpdateInProgress || runOnceJobReceived) + // { + // skipMessageDeletion = true; + // Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); + // } + // else + // { + // Trace.Info($"Received job message of length {message.Body.Length} from service, with hash '{IOUtil.GetSha256Hash(message.Body)}'"); + // var jobMessage = StringUtil.ConvertFromJson(message.Body); + // jobDispatcher.Run(jobMessage, runOnce); + // if (runOnce) + // { + // Trace.Info("One time used runner received job message."); + // runOnceJobReceived = true; + // } + // } + // } + // // Broker flow + // else if (MessageUtil.IsRunServiceJob(message.MessageType)) + // { + // if (autoUpdateInProgress || runOnceJobReceived) + // { + // skipMessageDeletion = true; + // Trace.Info($"Skip message deletion for job request message '{message.MessageId}'."); + // } + // else + // { + // var messageRef = StringUtil.ConvertFromJson(message.Body); + // Pipelines.AgentJobRequestMessage jobRequestMessage = null; - // Create connection - var credMgr = HostContext.GetService(); - var creds = credMgr.LoadCredentials(); + // // Create connection + // var credMgr = HostContext.GetService(); + // var creds = credMgr.LoadCredentials(); - if (string.IsNullOrEmpty(messageRef.RunServiceUrl)) - { - var actionsRunServer = HostContext.CreateService(); - await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); - jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); - } - else - { - var runServer = HostContext.CreateService(); - await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); - try - { - jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); - _acquireJobThrottler.Reset(); - } - catch (Exception ex) when ( - ex is TaskOrchestrationJobNotFoundException || // HTTP status 404 - ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409 - ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422 - { - Trace.Info($"Skipping message Job. {ex.Message}"); - await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token); - continue; - } - catch (Exception ex) - { - Trace.Error($"Caught exception from acquiring job message: {ex}"); - continue; - } - } + // if (string.IsNullOrEmpty(messageRef.RunServiceUrl)) + // { + // var actionsRunServer = HostContext.CreateService(); + // await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); + // jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + // } + // else + // { + // var runServer = HostContext.CreateService(); + // await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds); + // try + // { + // jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + // _acquireJobThrottler.Reset(); + // } + // catch (Exception ex) when ( + // ex is TaskOrchestrationJobNotFoundException || // HTTP status 404 + // ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409 + // ex is TaskOrchestrationJobUnprocessableException) // HTTP status 422 + // { + // Trace.Info($"Skipping message Job. {ex.Message}"); + // await _acquireJobThrottler.IncrementAndWaitAsync(messageQueueLoopTokenSource.Token); + // continue; + // } + // catch (Exception ex) + // { + // Trace.Error($"Caught exception from acquiring job message: {ex}"); + // continue; + // } + // } - jobDispatcher.Run(jobRequestMessage, runOnce); - if (runOnce) - { - Trace.Info("One time used runner received job message."); - runOnceJobReceived = true; - } - } - } - else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) - { - var cancelJobMessage = JsonUtility.FromString(message.Body); - bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage); - skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled; + // jobDispatcher.Run(jobRequestMessage, runOnce); + // if (runOnce) + // { + // Trace.Info("One time used runner received job message."); + // runOnceJobReceived = true; + // } + // } + // } + // else if (string.Equals(message.MessageType, JobCancelMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + // { + // var cancelJobMessage = JsonUtility.FromString(message.Body); + // bool jobCancelled = jobDispatcher.Cancel(cancelJobMessage); + // skipMessageDeletion = (autoUpdateInProgress || runOnceJobReceived) && !jobCancelled; - if (skipMessageDeletion) - { - Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'."); - } - } - else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase)) - { - var HostedRunnerShutdownMessage = JsonUtility.FromString(message.Body); - skipMessageDeletion = true; - skipSessionDeletion = true; - Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); - return Constants.Runner.ReturnCode.Success; - } - else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh)) - { - Trace.Info("Received ForceTokenRefreshMessage"); - await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token); - } - else - { - Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}."); - } + // if (skipMessageDeletion) + // { + // Trace.Info($"Skip message deletion for cancellation message '{message.MessageId}'."); + // } + // } + // else if (string.Equals(message.MessageType, Pipelines.HostedRunnerShutdownMessage.MessageType, StringComparison.OrdinalIgnoreCase)) + // { + // var HostedRunnerShutdownMessage = JsonUtility.FromString(message.Body); + // skipMessageDeletion = true; + // skipSessionDeletion = true; + // Trace.Info($"Service requests the hosted runner to shutdown. Reason: '{HostedRunnerShutdownMessage.Reason}'."); + // return Constants.Runner.ReturnCode.Success; + // } + // else if (string.Equals(message.MessageType, TaskAgentMessageTypes.ForceTokenRefresh)) + // { + // Trace.Info("Received ForceTokenRefreshMessage"); + // await _listener.RefreshListenerTokenAsync(messageQueueLoopTokenSource.Token); + // } + // else + // { + // Trace.Error($"Received message {message.MessageId} with unsupported message type {message.MessageType}."); + // } } finally { diff --git a/src/Runner.Listener/SelfUpdater.cs b/src/Runner.Listener/SelfUpdater.cs index 6ebeebd82..29d2475f1 100644 --- a/src/Runner.Listener/SelfUpdater.cs +++ b/src/Runner.Listener/SelfUpdater.cs @@ -132,26 +132,39 @@ namespace GitHub.Runner.Listener private async Task UpdateNeeded(string targetVersion, CancellationToken token) { - // when talk to old version server, always prefer latest package. - // old server won't send target version as part of update message. - if (string.IsNullOrEmpty(targetVersion)) + if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_IS_MOCK_UPDATE"))) { - var packages = await _runnerServer.GetPackagesAsync(_packageType, _platform, 1, true, token); - if (packages == null || packages.Count == 0) + _targetPackage = new PackageMetadata() { - Trace.Info($"There is no package for {_packageType} and {_platform}."); - return false; - } - - _targetPackage = packages.FirstOrDefault(); + Type = "agent", + Platform = BuildConstants.RunnerPackage.PackageName, + Version = new PackageVersion(targetVersion), + }; } else { - _targetPackage = await _runnerServer.GetPackageAsync(_packageType, _platform, targetVersion, true, token); - if (_targetPackage == null) + + // when talk to old version server, always prefer latest package. + // old server won't send target version as part of update message. + if (string.IsNullOrEmpty(targetVersion)) { - Trace.Info($"There is no package for {_packageType} and {_platform} with version {targetVersion}."); - return false; + var packages = await _runnerServer.GetPackagesAsync(_packageType, _platform, 1, true, token); + if (packages == null || packages.Count == 0) + { + Trace.Info($"There is no package for {_packageType} and {_platform}."); + return false; + } + + _targetPackage = packages.FirstOrDefault(); + } + else + { + _targetPackage = await _runnerServer.GetPackageAsync(_packageType, _platform, targetVersion, true, token); + if (_targetPackage == null) + { + Trace.Info($"There is no package for {_packageType} and {_platform} with version {targetVersion}."); + return false; + } } } diff --git a/src/runnerversion b/src/runnerversion index 8084ad311..8330b5bf1 100644 --- a/src/runnerversion +++ b/src/runnerversion @@ -1 +1 @@ -2.320.0 +2.319.0