Files
runner/src/Runner.Common/ProcessChannel.cs
jeff 42fe704132 Add message size in chars and bytes (#1100)
* Add message size in chars and bytes

* Log hash of message body
2021-06-10 15:58:56 -04:00

104 lines
3.8 KiB
C#

using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using GitHub.Runner.Sdk;
namespace GitHub.Runner.Common
{
public delegate void StartProcessDelegate(string pipeHandleOut, string pipeHandleIn);
public enum MessageType
{
NotInitialized = -1,
NewJobRequest = 1,
CancelRequest = 2,
RunnerShutdown = 3,
OperatingSystemShutdown = 4
}
public struct WorkerMessage
{
public MessageType MessageType;
public string Body;
public WorkerMessage(MessageType messageType, string body)
{
MessageType = messageType;
Body = body;
}
}
[ServiceLocator(Default = typeof(ProcessChannel))]
public interface IProcessChannel : IDisposable, IRunnerService
{
void StartServer(StartProcessDelegate startProcess);
void StartClient(string pipeNameInput, string pipeNameOutput);
Task SendAsync(MessageType messageType, string body, CancellationToken cancellationToken);
Task<WorkerMessage> ReceiveAsync(CancellationToken cancellationToken);
}
public sealed class ProcessChannel : RunnerService, IProcessChannel
{
private AnonymousPipeServerStream _inServer;
private AnonymousPipeServerStream _outServer;
private AnonymousPipeClientStream _inClient;
private AnonymousPipeClientStream _outClient;
private StreamString _writeStream;
private StreamString _readStream;
public void StartServer(StartProcessDelegate startProcess)
{
_outServer = new AnonymousPipeServerStream(PipeDirection.Out, HandleInheritability.Inheritable);
_inServer = new AnonymousPipeServerStream(PipeDirection.In, HandleInheritability.Inheritable);
_readStream = new StreamString(_inServer);
_writeStream = new StreamString(_outServer);
startProcess(_outServer.GetClientHandleAsString(), _inServer.GetClientHandleAsString());
_outServer.DisposeLocalCopyOfClientHandle();
_inServer.DisposeLocalCopyOfClientHandle();
}
public void StartClient(string pipeNameInput, string pipeNameOutput)
{
_inClient = new AnonymousPipeClientStream(PipeDirection.In, pipeNameInput);
_outClient = new AnonymousPipeClientStream(PipeDirection.Out, pipeNameOutput);
_readStream = new StreamString(_inClient);
_writeStream = new StreamString(_outClient);
}
public async Task SendAsync(MessageType messageType, string body, CancellationToken cancellationToken)
{
Trace.Info($"Sending message of length {body.Length}, with hash '{IOUtil.GetSha256Hash(body)}'");
await _writeStream.WriteInt32Async((int)messageType, cancellationToken);
await _writeStream.WriteStringAsync(body, cancellationToken);
}
public async Task<WorkerMessage> ReceiveAsync(CancellationToken cancellationToken)
{
WorkerMessage result = new WorkerMessage(MessageType.NotInitialized, string.Empty);
result.MessageType = (MessageType)await _readStream.ReadInt32Async(cancellationToken);
result.Body = await _readStream.ReadStringAsync(cancellationToken);
Trace.Info($"Receiving message of length {result.Body.Length}, with hash '{IOUtil.GetSha256Hash(result.Body)}'");
return result;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing)
{
_inServer?.Dispose();
_outServer?.Dispose();
_inClient?.Dispose();
_outClient?.Dispose();
}
}
}
}