diff --git a/src/Runner.Worker/Dap/DapDebugger.cs b/src/Runner.Worker/Dap/DapDebugger.cs index 9416b16e7..359c6a4d5 100644 --- a/src/Runner.Worker/Dap/DapDebugger.cs +++ b/src/Runner.Worker/Dap/DapDebugger.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using GitHub.DistributedTask.WebApi; using GitHub.Runner.Common; +using GitHub.Runner.Sdk; using Newtonsoft.Json; namespace GitHub.Runner.Worker.Dap @@ -36,6 +37,7 @@ namespace GitHub.Runner.Worker.Dap private const string ContentLengthHeader = "Content-Length: "; private const int MaxMessageSize = 10 * 1024 * 1024; // 10 MB private const int MaxHeaderLineLength = 8192; // 8 KB + private const int ConnectionRetryDelayMilliseconds = 500; // Thread ID for the single job execution thread private const int JobThreadId = 1; @@ -54,7 +56,6 @@ namespace GitHub.Runner.Worker.Dap private readonly SemaphoreSlim _sendLock = new SemaphoreSlim(1, 1); private int _nextSeq = 1; private Task _connectionLoopTask; - private volatile bool _acceptConnections = true; private volatile DapSessionState _state = DapSessionState.WaitingForConnection; private CancellationTokenRegistration? _cancellationRegistration; private volatile bool _started; @@ -65,7 +66,7 @@ namespace GitHub.Runner.Worker.Dap private readonly object _stateLock = new object(); // Handshake completion — signaled when configurationDone is received - private TaskCompletionSource _handshakeTcs = CreateHandshakeCompletionSource(); + private TaskCompletionSource _handshakeTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); // Whether to pause before the next step (set by 'next' command) private bool _pauseOnNextStep = true; @@ -103,13 +104,15 @@ namespace GitHub.Runner.Worker.Dap Trace.Info("DapDebugger initialized"); } - public Task StartAsync(CancellationToken cancellationToken) + public Task StartAsync(IExecutionContext jobContext) { + ArgUtil.NotNull(jobContext, nameof(jobContext)); var port = ResolvePort(); Trace.Info($"Starting DAP debugger on port {port}"); - _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _jobContext = jobContext; + _cts = CancellationTokenSource.CreateLinkedTokenSource(jobContext.CancellationToken); _connectionTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _listener = new TcpListener(IPAddress.Loopback, port); @@ -123,13 +126,14 @@ namespace GitHub.Runner.Worker.Dap return Task.CompletedTask; } - public async Task WaitUntilReadyAsync(CancellationToken cancellationToken) + public async Task WaitUntilReadyAsync() { - if (!_started || _listener == null) + if (!_started || _listener == null || _jobContext == null) { return; } + var cancellationToken = _jobContext.CancellationToken; var timeoutMinutes = ResolveTimeout(); using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(timeoutMinutes)); using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); @@ -185,7 +189,6 @@ namespace GitHub.Runner.Worker.Dap try { Trace.Info("Stopping DAP debugger"); - _acceptConnections = false; _cts?.Cancel(); CleanupConnection(); @@ -264,7 +267,7 @@ namespace GitHub.Runner.Worker.Dap Trace.Info("Debug session cancelled"); } - public async Task OnStepStartingAsync(IStep step, IExecutionContext jobContext, CancellationToken cancellationToken) + public async Task OnStepStartingAsync(IStep step) { if (!IsActive) { @@ -275,7 +278,7 @@ namespace GitHub.Runner.Worker.Dap { bool isFirst = _isFirstStep; _isFirstStep = false; - await OnStepStartingAsync(step, jobContext, isFirst, cancellationToken); + await OnStepStartingAsync(step, isFirst); } catch (Exception ex) { @@ -374,6 +377,15 @@ namespace GitHub.Runner.Worker.Dap response.Command = request.Command; SendResponse(response); + + if (request.Command == "initialize") + { + SendEvent(new Event + { + EventType = "initialized" + }); + Trace.Info("Sent initialized event"); + } } catch (Exception ex) { @@ -431,7 +443,7 @@ namespace GitHub.Runner.Worker.Dap catch { /* listener already stopped */ } }); - while (_acceptConnections && !cancellationToken.IsCancellationRequested) + while (!cancellationToken.IsCancellationRequested) { try { @@ -456,27 +468,21 @@ namespace GitHub.Runner.Worker.Dap HandleClientDisconnected(); CleanupConnection(); } - catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested) - { - break; - } - catch (SocketException) when (cancellationToken.IsCancellationRequested) - { - break; - } catch (Exception ex) { - Trace.Warning($"Connection error ({ex.GetType().Name})"); CleanupConnection(); - if (!_acceptConnections || cancellationToken.IsCancellationRequested) + if (cancellationToken.IsCancellationRequested) { break; } + Trace.Error("Debugger connection error"); + Trace.Error(ex); + try { - await Task.Delay(100, cancellationToken); + await Task.Delay(ConnectionRetryDelayMilliseconds, cancellationToken); } catch (OperationCanceledException) { @@ -753,9 +759,10 @@ namespace GitHub.Runner.Worker.Dap Trace.Info("DAP handshake complete, session is ready"); } - internal async Task OnStepStartingAsync(IStep step, IExecutionContext jobContext, bool isFirstStep, CancellationToken cancellationToken) + internal async Task OnStepStartingAsync(IStep step, bool isFirstStep) { bool pauseOnNextStep; + CancellationToken cancellationToken; lock (_stateLock) { if (_state != DapSessionState.Ready && @@ -766,9 +773,9 @@ namespace GitHub.Runner.Worker.Dap } _currentStep = step; - _jobContext = jobContext; _currentStepIndex = _completedSteps.Count; pauseOnNextStep = _pauseOnNextStep; + cancellationToken = _jobContext?.CancellationToken ?? CancellationToken.None; } // Reset variable references so stale nested refs from the @@ -830,11 +837,6 @@ namespace GitHub.Runner.Worker.Dap }); } - private static TaskCompletionSource CreateHandshakeCompletionSource() - { - return new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - private Response HandleInitialize(Request request) { if (request.Arguments != null) @@ -883,18 +885,6 @@ namespace GitHub.Runner.Worker.Dap SupportsExceptionInfoRequest = false, }; - // Send initialized event after a brief delay to ensure the - // response is delivered first (DAP spec requirement) - _ = Task.Run(async () => - { - await Task.Delay(50); - SendEvent(new Event - { - EventType = "initialized" - }); - Trace.Info("Sent initialized event"); - }); - Trace.Info("Initialize request handled, capabilities sent"); return CreateResponse(request, true, body: capabilities); } diff --git a/src/Runner.Worker/Dap/IDapDebugger.cs b/src/Runner.Worker/Dap/IDapDebugger.cs index d895850da..72bff0175 100644 --- a/src/Runner.Worker/Dap/IDapDebugger.cs +++ b/src/Runner.Worker/Dap/IDapDebugger.cs @@ -1,5 +1,4 @@ -using System.Threading; -using System.Threading.Tasks; +using System.Threading.Tasks; using GitHub.Runner.Common; namespace GitHub.Runner.Worker.Dap @@ -17,9 +16,9 @@ namespace GitHub.Runner.Worker.Dap [ServiceLocator(Default = typeof(DapDebugger))] public interface IDapDebugger : IRunnerService { - Task StartAsync(CancellationToken cancellationToken); - Task WaitUntilReadyAsync(CancellationToken cancellationToken); - Task OnStepStartingAsync(IStep step, IExecutionContext jobContext, CancellationToken cancellationToken); + Task StartAsync(IExecutionContext jobContext); + Task WaitUntilReadyAsync(); + Task OnStepStartingAsync(IStep step); void OnStepCompleted(IStep step); Task OnJobCompletedAsync(); } diff --git a/src/Runner.Worker/JobRunner.cs b/src/Runner.Worker/JobRunner.cs index bb1c74d8b..10623bbef 100644 --- a/src/Runner.Worker/JobRunner.cs +++ b/src/Runner.Worker/JobRunner.cs @@ -189,7 +189,7 @@ namespace GitHub.Runner.Worker try { dapDebugger = HostContext.GetService(); - await dapDebugger.StartAsync(jobRequestCancellationToken); + await dapDebugger.StartAsync(jobContext); } catch (Exception ex) { @@ -248,7 +248,7 @@ namespace GitHub.Runner.Worker { try { - await dapDebugger.WaitUntilReadyAsync(jobRequestCancellationToken); + await dapDebugger.WaitUntilReadyAsync(); AddDebuggerConnectionTelemetry(jobContext, "Connected"); } catch (OperationCanceledException) when (jobRequestCancellationToken.IsCancellationRequested) diff --git a/src/Runner.Worker/StepsRunner.cs b/src/Runner.Worker/StepsRunner.cs index a10901200..21bdfa6f7 100644 --- a/src/Runner.Worker/StepsRunner.cs +++ b/src/Runner.Worker/StepsRunner.cs @@ -229,7 +229,7 @@ namespace GitHub.Runner.Worker else { // Pause for DAP debugger before step execution - await dapDebugger?.OnStepStartingAsync(step, jobContext, jobContext.CancellationToken); + await dapDebugger?.OnStepStartingAsync(step); // Run the step await RunStepAsync(step, jobContext.CancellationToken); diff --git a/src/Test/L0/Worker/DapDebuggerL0.cs b/src/Test/L0/Worker/DapDebuggerL0.cs index 1a9b18663..df6a49e74 100644 --- a/src/Test/L0/Worker/DapDebuggerL0.cs +++ b/src/Test/L0/Worker/DapDebuggerL0.cs @@ -6,6 +6,7 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; +using Moq; using GitHub.Runner.Worker; using GitHub.Runner.Worker.Dap; using Newtonsoft.Json; @@ -139,6 +140,16 @@ namespace GitHub.Runner.Common.Tests.Worker return Encoding.UTF8.GetString(body); } + private static Mock CreateJobContext(CancellationToken cancellationToken, string jobName = null) + { + var jobContext = new Mock(); + jobContext.Setup(x => x.CancellationToken).Returns(cancellationToken); + jobContext + .Setup(x => x.GetGitHubContext(It.IsAny())) + .Returns((string contextName) => string.Equals(contextName, "job", StringComparison.Ordinal) ? jobName : null); + return jobContext; + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] @@ -246,7 +257,8 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); using var client = await ConnectClientAsync(port); Assert.True(client.Connected); await _debugger.StopAsync(); @@ -266,7 +278,8 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); await _debugger.StopAsync(); }); } @@ -284,9 +297,10 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); - var waitTask = _debugger.WaitUntilReadyAsync(cts.Token); + var waitTask = _debugger.WaitUntilReadyAsync(); using var client = await ConnectClientAsync(port); await SendRequestAsync(client.GetStream(), new Request { @@ -302,6 +316,36 @@ namespace GitHub.Runner.Common.Tests.Worker } } + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Worker")] + public async Task StartStoresJobContextForThreadsRequest() + { + using (CreateTestContext()) + { + var port = GetFreePort(); + await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var jobContext = CreateJobContext(cts.Token, "ci-job"); + await _debugger.StartAsync(jobContext.Object); + using var client = await ConnectClientAsync(port); + var stream = client.GetStream(); + await SendRequestAsync(client.GetStream(), new Request + { + Seq = 1, + Type = "request", + Command = "threads" + }); + + var response = await ReadDapMessageAsync(stream, TimeSpan.FromSeconds(5)); + Assert.Contains("\"command\":\"threads\"", response); + Assert.Contains("\"name\":\"Job: ci-job\"", response); + await _debugger.StopAsync(); + }); + } + } + [Fact] [Trait("Level", "L0")] [Trait("Category", "Worker")] @@ -313,9 +357,10 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); - var waitTask = _debugger.WaitUntilReadyAsync(cts.Token); + var waitTask = _debugger.WaitUntilReadyAsync(); using var client = await ConnectClientAsync(port); await SendRequestAsync(client.GetStream(), new Request { @@ -355,9 +400,10 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); - var waitTask = _debugger.WaitUntilReadyAsync(cts.Token); + var waitTask = _debugger.WaitUntilReadyAsync(); using var client = await ConnectClientAsync(port); await SendRequestAsync(client.GetStream(), new Request { @@ -380,7 +426,7 @@ namespace GitHub.Runner.Common.Tests.Worker { using (CreateTestContext()) { - await _debugger.WaitUntilReadyAsync(CancellationToken.None); + await _debugger.WaitUntilReadyAsync(); } } @@ -395,9 +441,10 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); - var waitTask = _debugger.WaitUntilReadyAsync(cts.Token); + var waitTask = _debugger.WaitUntilReadyAsync(); await Task.Delay(50); cts.Cancel(); @@ -424,7 +471,8 @@ namespace GitHub.Runner.Common.Tests.Worker await WithEnvironmentVariableAsync(PortEnvironmentVariable, port.ToString(), async () => { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); - await _debugger.StartAsync(cts.Token); + var jobContext = CreateJobContext(cts.Token); + await _debugger.StartAsync(jobContext.Object); using var client = await ConnectClientAsync(port); var stream = client.GetStream();