Compare commits

..

1 Commits

Author SHA1 Message Date
JoannaaKL
3abc186ede Update TaskAgentHttpClientBase.cs 2023-02-16 16:11:37 +01:00
37 changed files with 243 additions and 12229 deletions

View File

@@ -24,26 +24,11 @@ RUN export DOCKER_ARCH=x86_64 \
FROM mcr.microsoft.com/dotnet/runtime-deps:6.0 FROM mcr.microsoft.com/dotnet/runtime-deps:6.0
ENV DEBIAN_FRONTEND=noninteractive ENV RUNNER_ALLOW_RUNASROOT=1
ENV RUNNER_MANUALLY_TRAP_SIG=1 ENV RUNNER_MANUALLY_TRAP_SIG=1
ENV ACTIONS_RUNNER_PRINT_LOG_TO_STDOUT=1 ENV ACTIONS_RUNNER_PRINT_LOG_TO_STDOUT=1
RUN apt-get update -y \ WORKDIR /actions-runner
&& apt-get install -y --no-install-recommends \ COPY --from=build /actions-runner .
sudo \
&& rm -rf /var/lib/apt/lists/*
RUN adduser --disabled-password --gecos "" --uid 1001 runner \ RUN install -o root -g root -m 755 docker/* /usr/bin/ && rm -rf docker
&& groupadd docker --gid 123 \
&& usermod -aG sudo runner \
&& usermod -aG docker runner \
&& echo "%sudo ALL=(ALL:ALL) NOPASSWD:ALL" > /etc/sudoers \
&& echo "Defaults env_keep += \"DEBIAN_FRONTEND\"" >> /etc/sudoers
WORKDIR /home/runner
COPY --chown=runner:docker --from=build /actions-runner .
RUN install -o root -g root -m 755 docker/* /usr/bin/ && rm -rf docker
USER runner

View File

@@ -1,3 +0,0 @@
dist/
lib/
node_modules/

View File

@@ -1,61 +0,0 @@
{
"plugins": ["@typescript-eslint"],
"extends": ["plugin:github/recommended"],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 9,
"sourceType": "module",
"project": "./tsconfig.json"
},
"rules": {
"eslint-comments/no-use": "off",
"import/no-namespace": "off",
"no-console": "off",
"no-unused-vars": "off",
"@typescript-eslint/no-unused-vars": "error",
"@typescript-eslint/explicit-member-accessibility": ["error", {"accessibility": "no-public"}],
"@typescript-eslint/no-require-imports": "error",
"@typescript-eslint/array-type": "error",
"@typescript-eslint/await-thenable": "error",
"@typescript-eslint/naming-convention": [
"error",
{
"selector": "default",
"format": ["camelCase"]
}
],
"camelcase": "off",
"@typescript-eslint/explicit-function-return-type": ["error", {"allowExpressions": true}],
"@typescript-eslint/func-call-spacing": ["error", "never"],
"@typescript-eslint/no-array-constructor": "error",
"@typescript-eslint/no-empty-interface": "error",
"@typescript-eslint/no-explicit-any": "error",
"@typescript-eslint/no-extraneous-class": "error",
"@typescript-eslint/no-for-in-array": "error",
"@typescript-eslint/no-inferrable-types": "error",
"@typescript-eslint/no-misused-new": "error",
"@typescript-eslint/no-namespace": "error",
"@typescript-eslint/no-non-null-assertion": "warn",
"@typescript-eslint/no-unnecessary-qualifier": "error",
"@typescript-eslint/no-unnecessary-type-assertion": "error",
"@typescript-eslint/no-useless-constructor": "error",
"@typescript-eslint/no-var-requires": "error",
"@typescript-eslint/prefer-for-of": "warn",
"@typescript-eslint/prefer-function-type": "warn",
"@typescript-eslint/prefer-includes": "error",
"@typescript-eslint/prefer-string-starts-ends-with": "error",
"@typescript-eslint/promise-function-async": "error",
"@typescript-eslint/require-array-sort-compare": "error",
"@typescript-eslint/restrict-plus-operands": "error",
"@typescript-eslint/semi": ["error", "never"],
"@typescript-eslint/type-annotation-spacing": "error",
"@typescript-eslint/unbound-method": "error",
"filenames/match-regex" : "off",
"github/no-then" : 1, // warning
"semi": "off"
},
"env": {
"node": true,
"es6": true
}
}

View File

@@ -1,3 +0,0 @@
dist/
lib/
node_modules/

View File

@@ -1,11 +0,0 @@
{
"printWidth": 80,
"tabWidth": 2,
"useTabs": false,
"semi": false,
"singleQuote": true,
"trailingComma": "none",
"bracketSpacing": false,
"arrowParens": "avoid",
"parser": "typescript"
}

View File

@@ -1,4 +0,0 @@
To compile this package (output will be stored in `Misc/layoutbin`) run `npm install && npm run all`.
> Note: this package also needs to be recompiled for dependabot PRs updating one of
> its dependencies.

File diff suppressed because it is too large Load Diff

View File

@@ -1,36 +0,0 @@
{
"name": "hashFiles",
"version": "1.0.0",
"description": "GitHub Actions HashFiles() expression function",
"main": "lib/hashFiles.js",
"scripts": {
"build": "tsc",
"format": "prettier --write **/*.ts",
"format-check": "prettier --check **/*.ts",
"lint": "eslint src/**/*.ts",
"pack": "ncc build -o ../../layoutbin/hashFilesV2",
"all": "npm run build && npm run format && npm run lint && npm run pack"
},
"repository": {
"type": "git",
"url": "git+https://github.com/actions/runner.git"
},
"keywords": [
"actions"
],
"author": "GitHub Actions",
"license": "MIT",
"dependencies": {
"@actions/glob": "^0.4.0"
},
"devDependencies": {
"@types/node": "^12.7.12",
"@typescript-eslint/parser": "^5.15.0",
"@vercel/ncc": "^0.36.0",
"eslint": "^8.11.0",
"eslint-plugin-github": "^4.3.5",
"eslint-plugin-prettier": "^4.2.1",
"prettier": "^2.8.4",
"typescript": "^3.6.4"
}
}

View File

@@ -1,31 +0,0 @@
import * as glob from '@actions/glob'
async function run(): Promise<void> {
// arg0 -> node
// arg1 -> hashFiles.js
// env[followSymbolicLinks] = true/null
// env[patterns] -> glob patterns
let followSymbolicLinks = false
const matchPatterns = process.env.patterns || ''
if (process.env.followSymbolicLinks === 'true') {
console.log('Follow symbolic links')
followSymbolicLinks = true
}
const githubWorkspace = process.cwd()
console.log(`Match Pattern: ${matchPatterns}`)
try {
const result = await glob.hashFiles(
matchPatterns,
githubWorkspace,
{followSymbolicLinks},
true
)
console.error(`__OUTPUT__${result}__OUTPUT__`)
process.exit(0)
} catch (error) {
console.log(error)
process.exit(1)
}
}
run()

View File

@@ -1,12 +0,0 @@
{
"compilerOptions": {
"target": "es6", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019' or 'ESNEXT'. */
"module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', or 'ESNext'. */
"outDir": "./lib", /* Redirect output structure to the directory. */
"rootDir": "./src", /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */
"strict": true, /* Enable all strict type-checking options. */
"noImplicitAny": true, /* Raise error on expressions and declarations with an implied 'any' type. */
"esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */
},
"exclude": ["node_modules", "**/*.test.ts"]
}

View File

@@ -24,8 +24,7 @@ if (exitServiceAfterNFailures <= 0) {
exitServiceAfterNFailures = NaN; exitServiceAfterNFailures = NaN;
} }
var unknownFailureRetryCount = 0; var consecutiveFailureCount = 0;
var retriableFailureRetryCount = 0;
var gracefulShutdown = function () { var gracefulShutdown = function () {
console.log("Shutting down runner listener"); console.log("Shutting down runner listener");
@@ -63,8 +62,7 @@ var runService = function () {
listener.stdout.on("data", (data) => { listener.stdout.on("data", (data) => {
if (data.toString("utf8").includes("Listening for Jobs")) { if (data.toString("utf8").includes("Listening for Jobs")) {
unknownFailureRetryCount = 0; consecutiveFailureCount = 0;
retriableFailureRetryCount = 0;
} }
process.stdout.write(data.toString("utf8")); process.stdout.write(data.toString("utf8"));
}); });
@@ -94,38 +92,24 @@ var runService = function () {
console.log( console.log(
"Runner listener exit with retryable error, re-launch runner in 5 seconds." "Runner listener exit with retryable error, re-launch runner in 5 seconds."
); );
unknownFailureRetryCount = 0; consecutiveFailureCount = 0;
retriableFailureRetryCount++;
if (retriableFailureRetryCount >= 10) {
console.error(
"Stopping the runner after 10 consecutive re-tryable failures"
);
stopping = true;
}
} else if (code === 3 || code === 4) { } else if (code === 3 || code === 4) {
console.log( console.log(
"Runner listener exit because of updating, re-launch runner in 5 seconds." "Runner listener exit because of updating, re-launch runner in 5 seconds."
); );
unknownFailureRetryCount = 0; consecutiveFailureCount = 0;
retriableFailureRetryCount++;
if (retriableFailureRetryCount >= 10) {
console.error(
"Stopping the runner after 10 consecutive re-tryable failures"
);
stopping = true;
}
} else { } else {
var messagePrefix = "Runner listener exit with undefined return code"; var messagePrefix = "Runner listener exit with undefined return code";
unknownFailureRetryCount++; consecutiveFailureCount++;
retriableFailureRetryCount = 0;
if ( if (
!isNaN(exitServiceAfterNFailures) && !isNaN(exitServiceAfterNFailures) &&
unknownFailureRetryCount >= exitServiceAfterNFailures consecutiveFailureCount >= exitServiceAfterNFailures
) { ) {
console.error( console.error(
`${messagePrefix}, exiting service after ${unknownFailureRetryCount} consecutive failures` `${messagePrefix}, exiting service after ${consecutiveFailureCount} consecutive failures`
); );
stopping = true gracefulShutdown();
return;
} else { } else {
console.log(`${messagePrefix}, re-launch runner in 5 seconds.`); console.log(`${messagePrefix}, re-launch runner in 5 seconds.`);
} }

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,6 @@ checkScripts/downloadCert.js
checkScripts/makeWebRequest.js checkScripts/makeWebRequest.js
darwin.svc.sh.template darwin.svc.sh.template
hashFiles/index.js hashFiles/index.js
hashFilesV2/index.js
installdependencies.sh installdependencies.sh
macos-run-invoker.js macos-run-invoker.js
Microsoft.IdentityModel.Logging.dll Microsoft.IdentityModel.Logging.dll

View File

@@ -1,4 +1,4 @@
using System; using System;
namespace GitHub.Runner.Common namespace GitHub.Runner.Common
{ {
@@ -156,7 +156,6 @@ namespace GitHub.Runner.Common
public static readonly string Node12Warning = "DistributedTask.AddWarningToNode12Action"; public static readonly string Node12Warning = "DistributedTask.AddWarningToNode12Action";
public static readonly string UseContainerPathForTemplate = "DistributedTask.UseContainerPathForTemplate"; public static readonly string UseContainerPathForTemplate = "DistributedTask.UseContainerPathForTemplate";
public static readonly string AllowRunnerContainerHooks = "DistributedTask.AllowRunnerContainerHooks"; public static readonly string AllowRunnerContainerHooks = "DistributedTask.AllowRunnerContainerHooks";
public static readonly string UseGlobHashFiles = "DistributedTask.UseGlobHashFiles";
} }
public static readonly string InternalTelemetryIssueDataKey = "_internal_telemetry"; public static readonly string InternalTelemetryIssueDataKey = "_internal_telemetry";

View File

@@ -30,9 +30,7 @@ namespace GitHub.Runner.Common
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken); Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken); Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
@@ -318,7 +316,7 @@ namespace GitHub.Runner.Common
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken); return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
} }
public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
{ {
if (_resultsClient != null) if (_resultsClient != null)
{ {
@@ -327,23 +325,6 @@ namespace GitHub.Runner.Common
throw new InvalidOperationException("Results client is not initialized."); throw new InvalidOperationException("Results client is not initialized.");
} }
public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task CreateResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsJobLogAsync(planId, jobId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}
public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken) public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
{ {

View File

@@ -20,7 +20,7 @@ namespace GitHub.Runner.Common
void Start(Pipelines.AgentJobRequestMessage jobRequest); void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null); void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines); void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource);
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord); void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
} }
@@ -31,7 +31,7 @@ namespace GitHub.Runner.Common
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000); private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000);
// Job message information // Job message information
private Guid _scopeIdentifier; private Guid _scopeIdentifier;
@@ -46,7 +46,7 @@ namespace GitHub.Runner.Common
// queue for file upload (log file or attachment) // queue for file upload (log file or attachment)
private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new(); private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new();
private readonly ConcurrentQueue<ResultsUploadFileInfo> _resultsFileUploadQueue = new(); private readonly ConcurrentQueue<SummaryUploadFileInfo> _summaryFileUploadQueue = new();
// queue for timeline or timeline record update (one queue per timeline) // queue for timeline or timeline record update (one queue per timeline)
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new(); private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new();
@@ -60,7 +60,7 @@ namespace GitHub.Runner.Common
// Task for each queue's dequeue process // Task for each queue's dequeue process
private Task _webConsoleLineDequeueTask; private Task _webConsoleLineDequeueTask;
private Task _fileUploadDequeueTask; private Task _fileUploadDequeueTask;
private Task _resultsUploadDequeueTask; private Task _summaryUploadDequeueTask;
private Task _timelineUpdateDequeueTask; private Task _timelineUpdateDequeueTask;
// common // common
@@ -84,9 +84,6 @@ namespace GitHub.Runner.Common
private bool _webConsoleLineAggressiveDequeue = true; private bool _webConsoleLineAggressiveDequeue = true;
private bool _firstConsoleOutputs = true; private bool _firstConsoleOutputs = true;
private bool _resultsClientInitiated = false;
private delegate Task ResultsFileUploadHandler(ResultsUploadFileInfo file);
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
@@ -112,9 +109,9 @@ namespace GitHub.Runner.Common
{ {
Trace.Info("Initializing results client"); Trace.Info("Initializing results client");
_jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken); _jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken);
_resultsClientInitiated = true;
} }
if (_queueInProcess) if (_queueInProcess)
{ {
Trace.Info("No-opt, all queue process tasks are running."); Trace.Info("No-opt, all queue process tasks are running.");
@@ -143,12 +140,12 @@ namespace GitHub.Runner.Common
_fileUploadDequeueTask = ProcessFilesUploadQueueAsync(); _fileUploadDequeueTask = ProcessFilesUploadQueueAsync();
Trace.Info("Start results file upload queue."); Trace.Info("Start results file upload queue.");
_resultsUploadDequeueTask = ProcessResultsUploadQueueAsync(); _summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync();
Trace.Info("Start process timeline update queue."); Trace.Info("Start process timeline update queue.");
_timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync(); _timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync();
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask }; _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask };
_queueInProcess = true; _queueInProcess = true;
} }
@@ -179,9 +176,9 @@ namespace GitHub.Runner.Common
await ProcessFilesUploadQueueAsync(runOnce: true); await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained."); Trace.Info("File upload queue drained.");
Trace.Verbose("Draining results upload queue."); Trace.Verbose("Draining results summary upload queue.");
await ProcessResultsUploadQueueAsync(runOnce: true); await ProcessSummaryUploadQueueAsync(runOnce: true);
Trace.Info("Results upload queue drained."); Trace.Info("Results summary upload queue drained.");
// ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown // ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
// if there is any timeline records that failed to update contains output variabls. // if there is any timeline records that failed to update contains output variabls.
@@ -233,43 +230,21 @@ namespace GitHub.Runner.Common
_fileUploadQueue.Enqueue(newFile); _fileUploadQueue.Enqueue(newFile);
} }
public void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines) public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource)
{ {
if (!_resultsClientInitiated)
{
Trace.Verbose("Skipping results upload");
try
{
if (deleteSource)
{
File.Delete(path);
}
}
catch (Exception ex)
{
Trace.Info("Catch exception during delete skipped results upload file.");
Trace.Error(ex);
}
return;
}
// all parameter not null, file path exist. // all parameter not null, file path exist.
var newFile = new ResultsUploadFileInfo() var newFile = new SummaryUploadFileInfo()
{ {
Name = name, Name = name,
Path = path, Path = path,
Type = type,
PlanId = _planId.ToString(), PlanId = _planId.ToString(),
JobId = _jobTimelineRecordId.ToString(), JobId = _jobTimelineRecordId.ToString(),
RecordId = timelineRecordId, StepId = stepRecordId.ToString(),
DeleteSource = deleteSource, DeleteSource = deleteSource
Finalize = finalize,
FirstBlock = firstBlock,
TotalLines = totalLines,
}; };
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, timelineRecordId); Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId);
_resultsFileUploadQueue.Enqueue(newFile); _summaryFileUploadQueue.Enqueue(newFile);
} }
public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord) public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord)
@@ -462,18 +437,18 @@ namespace GitHub.Runner.Common
} }
} }
private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
{ {
Trace.Info("Starting results-based upload queue..."); Trace.Info("Starting results-based upload queue...");
while (!_jobCompletionSource.Task.IsCompleted || runOnce) while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{ {
List<ResultsUploadFileInfo> filesToUpload = new(); List<SummaryUploadFileInfo> filesToUpload = new();
ResultsUploadFileInfo dequeueFile; SummaryUploadFileInfo dequeueFile;
while (_resultsFileUploadQueue.TryDequeue(out dequeueFile)) while (_summaryFileUploadQueue.TryDequeue(out dequeueFile))
{ {
filesToUpload.Add(dequeueFile); filesToUpload.Add(dequeueFile);
// process at most 10 file uploads. // process at most 10 file upload.
if (!runOnce && filesToUpload.Count > 10) if (!runOnce && filesToUpload.Count > 10)
{ {
break; break;
@@ -484,7 +459,7 @@ namespace GitHub.Runner.Common
{ {
if (runOnce) if (runOnce)
{ {
Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service."); Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service.");
} }
int errorCount = 0; int errorCount = 0;
@@ -492,27 +467,11 @@ namespace GitHub.Runner.Common
{ {
try try
{ {
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase)) await UploadSummaryFile(file);
{
await UploadSummaryFile(file);
}
else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase))
{
if (file.RecordId != _jobTimelineRecordId)
{
Trace.Info($"Got a step log file to send to results service.");
await UploadResultsStepLogFile(file);
}
else if (file.RecordId == _jobTimelineRecordId)
{
Trace.Info($"Got a job log file to send to results service.");
await UploadResultsJobLogFile(file);
}
}
} }
catch (Exception ex) catch (Exception ex)
{ {
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" }; var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure; issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;
var telemetryRecord = new TimelineRecord() var telemetryRecord = new TimelineRecord()
@@ -522,13 +481,16 @@ namespace GitHub.Runner.Common
telemetryRecord.Issues.Add(issue); telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord); QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);
Trace.Info("Catch exception during file upload to results, keep going since the process is best effort."); Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort.");
Trace.Error(ex); Trace.Error(ex);
}
finally
{
errorCount++; errorCount++;
} }
} }
Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount); Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
} }
if (runOnce) if (runOnce)
@@ -537,7 +499,7 @@ namespace GitHub.Runner.Common
} }
else else
{ {
await Task.Delay(_delayForResultsUploadDequeue); await Task.Delay(_delayForSummaryUploadDequeue);
} }
} }
} }
@@ -814,45 +776,16 @@ namespace GitHub.Runner.Common
} }
} }
private async Task UploadSummaryFile(ResultsUploadFileInfo file) private async Task UploadSummaryFile(SummaryUploadFileInfo file)
{
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler summaryHandler = async (file) =>
{
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None);
};
await UploadResultsFile(file, summaryHandler);
}
private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file)
{
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler stepLogHandler = async (file) =>
{
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None);
};
await UploadResultsFile(file, stepLogHandler);
}
private async Task UploadResultsJobLogFile(ResultsUploadFileInfo file)
{
Trace.Info($"Starting upload of job log file to results service {file.Name}, {file.Path}");
ResultsFileUploadHandler jobLogHandler = async (file) =>
{
await _jobServer.CreateResultsJobLogAsync(file.PlanId, file.JobId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None);
};
await UploadResultsFile(file, jobLogHandler);
}
private async Task UploadResultsFile(ResultsUploadFileInfo file, ResultsFileUploadHandler uploadHandler)
{ {
bool uploadSucceed = false; bool uploadSucceed = false;
try try
{ {
await uploadHandler(file); // Upload the step summary
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token);
uploadSucceed = true; uploadSucceed = true;
} }
finally finally
@@ -865,7 +798,7 @@ namespace GitHub.Runner.Common
} }
catch (Exception ex) catch (Exception ex)
{ {
Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results."); Trace.Info("Catch exception during delete success results uploaded summary file.");
Trace.Error(ex); Trace.Error(ex);
} }
} }
@@ -889,18 +822,14 @@ namespace GitHub.Runner.Common
public bool DeleteSource { get; set; } public bool DeleteSource { get; set; }
} }
internal class ResultsUploadFileInfo internal class SummaryUploadFileInfo
{ {
public string Name { get; set; } public string Name { get; set; }
public string Type { get; set; }
public string Path { get; set; } public string Path { get; set; }
public string PlanId { get; set; } public string PlanId { get; set; }
public string JobId { get; set; } public string JobId { get; set; }
public Guid RecordId { get; set; } public string StepId { get; set; }
public bool DeleteSource { get; set; } public bool DeleteSource { get; set; }
public bool Finalize { get; set; }
public bool FirstBlock { get; set; }
public long TotalLines { get; set; }
} }

View File

@@ -21,12 +21,6 @@ namespace GitHub.Runner.Common
// 8 MB // 8 MB
public const int PageSize = 8 * 1024 * 1024; public const int PageSize = 8 * 1024 * 1024;
// For Results
public static string BlocksFolder = "blocks";
// 2 MB
public const int BlockSize = 2 * 1024 * 1024;
private Guid _timelineId; private Guid _timelineId;
private Guid _timelineRecordId; private Guid _timelineRecordId;
private FileStream _pageData; private FileStream _pageData;
@@ -38,13 +32,6 @@ namespace GitHub.Runner.Common
private string _pagesFolder; private string _pagesFolder;
private IJobServerQueue _jobServerQueue; private IJobServerQueue _jobServerQueue;
private string _resultsDataFileName;
private FileStream _resultsBlockData;
private StreamWriter _resultsBlockWriter;
private string _resultsBlockFolder;
private int _blockByteCount;
private int _blockCount;
public long TotalLines => _totalLines; public long TotalLines => _totalLines;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
@@ -52,10 +39,8 @@ namespace GitHub.Runner.Common
base.Initialize(hostContext); base.Initialize(hostContext);
_totalLines = 0; _totalLines = 0;
_pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder); _pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder);
Directory.CreateDirectory(_pagesFolder);
_resultsBlockFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), BlocksFolder);
Directory.CreateDirectory(_resultsBlockFolder);
_jobServerQueue = HostContext.GetService<IJobServerQueue>(); _jobServerQueue = HostContext.GetService<IJobServerQueue>();
Directory.CreateDirectory(_pagesFolder);
} }
public void Setup(Guid timelineId, Guid timelineRecordId) public void Setup(Guid timelineId, Guid timelineRecordId)
@@ -75,17 +60,11 @@ namespace GitHub.Runner.Common
// lazy creation on write // lazy creation on write
if (_pageWriter == null) if (_pageWriter == null)
{ {
NewPage(); Create();
}
if (_resultsBlockWriter == null)
{
NewBlock();
} }
string line = $"{DateTime.UtcNow.ToString("O")} {message}"; string line = $"{DateTime.UtcNow.ToString("O")} {message}";
_pageWriter.WriteLine(line); _pageWriter.WriteLine(line);
_resultsBlockWriter.WriteLine(line);
_totalLines++; _totalLines++;
if (line.IndexOf('\n') != -1) if (line.IndexOf('\n') != -1)
@@ -99,25 +78,21 @@ namespace GitHub.Runner.Common
} }
} }
var bytes = System.Text.Encoding.UTF8.GetByteCount(line); _byteCount += System.Text.Encoding.UTF8.GetByteCount(line);
_byteCount += bytes;
_blockByteCount += bytes;
if (_byteCount >= PageSize) if (_byteCount >= PageSize)
{ {
NewPage(); NewPage();
} }
if (_blockByteCount >= BlockSize)
{
NewBlock();
}
} }
public void End() public void End()
{ {
EndPage(); EndPage();
EndBlock(true); }
private void Create()
{
NewPage();
} }
private void NewPage() private void NewPage()
@@ -142,27 +117,5 @@ namespace GitHub.Runner.Common
_jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true); _jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true);
} }
} }
private void NewBlock()
{
EndBlock(false);
_blockByteCount = 0;
_resultsDataFileName = Path.Combine(_resultsBlockFolder, $"{_timelineId}_{_timelineRecordId}.{++_blockCount}");
_resultsBlockData = new FileStream(_resultsDataFileName, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.ReadWrite);
_resultsBlockWriter = new StreamWriter(_resultsBlockData, System.Text.Encoding.UTF8);
}
private void EndBlock(bool finalize)
{
if (_resultsBlockWriter != null)
{
_resultsBlockWriter.Flush();
_resultsBlockData.Flush();
_resultsBlockWriter.Dispose();
_resultsBlockWriter = null;
_resultsBlockData = null;
_jobServerQueue.QueueResultsUpload(_timelineRecordId, "ResultsLog", _resultsDataFileName, "Results.Core.Log", deleteSource: true, finalize, firstBlock: _resultsDataFileName.EndsWith(".1"), totalLines: _totalLines);
}
}
} }
} }

View File

@@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -7,7 +7,6 @@ using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
using GitHub.Services.Common; using GitHub.Services.Common;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi.RawClient; using Sdk.WebApi.WebApi.RawClient;
namespace GitHub.Runner.Common namespace GitHub.Runner.Common
@@ -20,8 +19,6 @@ namespace GitHub.Runner.Common
Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token); Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);
Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary<String, VariableValue> outputs, IList<StepResult> stepResults, CancellationToken token); Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary<String, VariableValue> outputs, IList<StepResult> stepResults, CancellationToken token);
Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken token);
} }
public sealed class RunServer : RunnerService, IRunServer public sealed class RunServer : RunnerService, IRunServer
@@ -67,18 +64,5 @@ namespace GitHub.Runner.Common
return RetryRequest( return RetryRequest(
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, cancellationToken), cancellationToken); async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, cancellationToken), cancellationToken);
} }
public Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken)
{
CheckConnection();
var renewJobResponse = RetryRequest<RenewJobResponse>(
async () => await _runServiceHttpClient.RenewJobAsync(requestUri, planId, jobId, cancellationToken), cancellationToken);
if (renewJobResponse == null)
{
throw new TaskOrchestrationJobNotFoundException(jobId.ToString());
}
return renewJobResponse;
}
} }
} }

View File

@@ -1,14 +0,0 @@
namespace GitHub.Runner.Common.Util
{
using System;
using GitHub.DistributedTask.WebApi;
public static class MessageUtil
{
public static bool IsRunServiceJob(string messageType)
{
return string.Equals(messageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase);
}
}
}

View File

@@ -1,3 +0,0 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Test")]

View File

@@ -7,7 +7,6 @@ using System.Text;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common; using GitHub.Runner.Common;
using GitHub.Runner.Common.Util; using GitHub.Runner.Common.Util;
@@ -59,8 +58,6 @@ namespace GitHub.Runner.Listener
public event EventHandler<JobStatusEventArgs> JobStatus; public event EventHandler<JobStatusEventArgs> JobStatus;
private bool _isRunServiceJob;
public override void Initialize(IHostContext hostContext) public override void Initialize(IHostContext hostContext)
{ {
base.Initialize(hostContext); base.Initialize(hostContext);
@@ -89,8 +86,6 @@ namespace GitHub.Runner.Listener
{ {
Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received."); Trace.Info($"Job request {jobRequestMessage.RequestId} for plan {jobRequestMessage.Plan.PlanId} job {jobRequestMessage.JobId} received.");
_isRunServiceJob = MessageUtil.IsRunServiceJob(jobRequestMessage.MessageType);
WorkerDispatcher currentDispatch = null; WorkerDispatcher currentDispatch = null;
if (_jobDispatchedQueue.Count > 0) if (_jobDispatchedQueue.Count > 0)
{ {
@@ -244,13 +239,6 @@ namespace GitHub.Runner.Listener
return; return;
} }
if (this._isRunServiceJob)
{
Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
jobDispatch.WorkerCancellationTokenSource.Cancel();
return;
}
// based on the current design, server will only send one job for a given runner at a time. // based on the current design, server will only send one job for a given runner at a time.
// if the runner received a new job request while a previous job request is still running, this typically indicates two situations // if the runner received a new job request while a previous job request is still running, this typically indicates two situations
// 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest // 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest
@@ -379,11 +367,9 @@ namespace GitHub.Runner.Listener
long requestId = message.RequestId; long requestId = message.RequestId;
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
// start renew job request // start renew job request
Trace.Info($"Start renew job request {requestId} for job {message.JobId}."); Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
Task renewJobRequest = RenewJobRequestAsync(message, systemConnection, _poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token); Task renewJobRequest = RenewJobRequestAsync(_poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);
// wait till first renew succeed or job request is cancelled // wait till first renew succeed or job request is cancelled
// not even start worker if the first renew fail // not even start worker if the first renew fail
@@ -440,7 +426,7 @@ namespace GitHub.Runner.Listener
{ {
workerOutput.Add(stdout.Data); workerOutput.Add(stdout.Data);
} }
if (printToStdout) if (printToStdout)
{ {
term.WriteLine(stdout.Data, skipTracing: true); term.WriteLine(stdout.Data, skipTracing: true);
@@ -522,6 +508,7 @@ namespace GitHub.Runner.Listener
// we get first jobrequest renew succeed and start the worker process with the job message. // we get first jobrequest renew succeed and start the worker process with the job message.
// send notification to machine provisioner. // send notification to machine provisioner.
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"]; var accessToken = systemConnection?.Authorization?.Parameters["AccessToken"];
notification.JobStarted(message.JobId, accessToken, systemConnection.Url); notification.JobStarted(message.JobId, accessToken, systemConnection.Url);
@@ -544,8 +531,11 @@ namespace GitHub.Runner.Listener
detailInfo = string.Join(Environment.NewLine, workerOutput); detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
await jobServer.ConnectAsync(jobConnection);
var jobServer = await InitializeJobServerAsync(systemConnection);
await LogWorkerProcessUnhandledException(jobServer, message, detailInfo); await LogWorkerProcessUnhandledException(jobServer, message, detailInfo);
// Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space. // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space.
@@ -685,128 +675,9 @@ namespace GitHub.Runner.Listener
} }
} }
internal async Task RenewJobRequestAsync(Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token) public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
{
if (this._isRunServiceJob)
{
var runServer = await GetRunServerAsync(systemConnection);
await RenewJobRequestAsync(runServer, message.Plan.PlanId, message.JobId, firstJobRequestRenewed, token);
}
else
{
var runnerServer = HostContext.GetService<IRunnerServer>();
await RenewJobRequestAsync(runnerServer, poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, token);
}
}
private async Task RenewJobRequestAsync(IRunServer runServer, Guid planId, Guid jobId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
{
TaskAgentJobRequest request = null;
int firstRenewRetryLimit = 5;
int encounteringError = 0;
// renew lock during job running.
// stop renew only if cancellation token for lock renew task been signal or exception still happen after retry.
while (!token.IsCancellationRequested)
{
try
{
var renewResponse = await runServer.RenewJobAsync(planId, jobId, token);
Trace.Info($"Successfully renew job {jobId}, job is valid till {renewResponse.LockedUntil}");
if (!firstJobRequestRenewed.Task.IsCompleted)
{
// fire first renew succeed event.
firstJobRequestRenewed.TrySetResult(0);
}
if (encounteringError > 0)
{
encounteringError = 0;
HostContext.WritePerfCounter("JobRenewRecovered");
}
// renew again after 60 sec delay
await HostContext.Delay(TimeSpan.FromSeconds(60), token);
}
catch (TaskOrchestrationJobNotFoundException)
{
// no need for retry. the job is not valid anymore.
Trace.Info($"TaskAgentJobNotFoundException received when renew job {jobId}, job is no longer valid, stop renew job request.");
return;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// OperationCanceledException may caused by http timeout or _lockRenewalTokenSource.Cance();
// Stop renew only on cancellation token fired.
Trace.Info($"job renew has been cancelled, stop renew job {jobId}.");
return;
}
catch (Exception ex)
{
Trace.Error($"Catch exception during renew runner job {jobId}.");
Trace.Error(ex);
encounteringError++;
// retry
TimeSpan remainingTime = TimeSpan.Zero;
if (!firstJobRequestRenewed.Task.IsCompleted)
{
// retry 5 times every 10 sec for the first renew
if (firstRenewRetryLimit-- > 0)
{
remainingTime = TimeSpan.FromSeconds(10);
}
}
else
{
// retry till reach lockeduntil + 5 mins extra buffer.
remainingTime = request.LockedUntil.Value + TimeSpan.FromMinutes(5) - DateTime.UtcNow;
}
if (remainingTime > TimeSpan.Zero)
{
TimeSpan delayTime;
if (!firstJobRequestRenewed.Task.IsCompleted)
{
Trace.Info($"Retrying lock renewal for job {jobId}. The first job renew request has failed.");
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
}
else
{
Trace.Info($"Retrying lock renewal for job {jobId}. Job is valid until {request.LockedUntil.Value}.");
if (encounteringError > 5)
{
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(30));
}
else
{
delayTime = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
}
}
try
{
// back-off before next retry.
await HostContext.Delay(delayTime, token);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info($"job renew has been cancelled, stop renew job {jobId}.");
}
}
else
{
Trace.Info($"Lock renewal has run out of retry, stop renew lock for job {jobId}.");
HostContext.WritePerfCounter("JobRenewReachLimit");
return;
}
}
}
}
private async Task RenewJobRequestAsync(IRunnerServer runnerServer, int poolId, long requestId, Guid lockToken, string orchestrationId, TaskCompletionSource<int> firstJobRequestRenewed, CancellationToken token)
{ {
var runnerServer = HostContext.GetService<IRunnerServer>();
TaskAgentJobRequest request = null; TaskAgentJobRequest request = null;
int firstRenewRetryLimit = 5; int firstRenewRetryLimit = 5;
int encounteringError = 0; int encounteringError = 0;
@@ -969,93 +840,90 @@ namespace GitHub.Runner.Listener
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection)); var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
ArgUtil.NotNull(systemConnection, nameof(systemConnection)); ArgUtil.NotNull(systemConnection, nameof(systemConnection));
var server = await InitializeJobServerAsync(systemConnection); var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
if (server is IJobServer jobServer) await jobServer.ConnectAsync(jobConnection);
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
var updatedRecords = new List<TimelineRecord>();
var logPages = new Dictionary<Guid, Dictionary<int, string>>();
var logRecords = new Dictionary<Guid, TimelineRecord>();
foreach (var log in logs)
{ {
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); var logName = Path.GetFileNameWithoutExtension(log);
var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries);
var updatedRecords = new List<TimelineRecord>(); if (logNameParts.Length != 3)
var logPages = new Dictionary<Guid, Dictionary<int, string>>();
var logRecords = new Dictionary<Guid, TimelineRecord>();
foreach (var log in logs)
{ {
var logName = Path.GetFileNameWithoutExtension(log); Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
var logNameParts = logName.Split('_', StringSplitOptions.RemoveEmptyEntries); continue;
if (logNameParts.Length != 3) }
{ var logPageSeperator = logName.IndexOf('_');
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'."); var logRecordId = Guid.Empty;
continue; var pageNumber = 0;
}
var logPageSeperator = logName.IndexOf('_');
var logRecordId = Guid.Empty;
var pageNumber = 0;
if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id) if (!Guid.TryParse(logNameParts[0], out Guid timelineId) || timelineId != timeline.Id)
{ {
Trace.Warning($"log file '{log}' is not belongs to current job"); Trace.Warning($"log file '{log}' is not belongs to current job");
continue; continue;
}
if (!Guid.TryParse(logNameParts[1], out logRecordId))
{
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
continue;
}
if (!int.TryParse(logNameParts[2], out pageNumber))
{
Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
continue;
}
var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
if (record != null)
{
if (!logPages.ContainsKey(record.Id))
{
logPages[record.Id] = new Dictionary<int, string>();
logRecords[record.Id] = record;
}
logPages[record.Id][pageNumber] = log;
}
} }
foreach (var pages in logPages) if (!Guid.TryParse(logNameParts[1], out logRecordId))
{ {
var record = logRecords[pages.Key]; Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
if (record.Log == null) continue;
{
// Create the log
record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));
// Need to post timeline record updates to reflect the log creation
updatedRecords.Add(record.Clone());
}
for (var i = 1; i <= pages.Value.Count; i++)
{
var logFile = pages.Value[i];
// Upload the contents
using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
}
Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
IOUtil.DeleteFile(logFile);
}
} }
if (updatedRecords.Count > 0) if (!int.TryParse(logNameParts[2], out pageNumber))
{ {
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None); Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_GUID_INT'.");
continue;
}
var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
if (record != null)
{
if (!logPages.ContainsKey(record.Id))
{
logPages[record.Id] = new Dictionary<int, string>();
logRecords[record.Id] = record;
}
logPages[record.Id][pageNumber] = log;
} }
} }
else
foreach (var pages in logPages)
{ {
Trace.Info("Job server does not support log upload yet."); var record = logRecords[pages.Key];
if (record.Log == null)
{
// Create the log
record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));
// Need to post timeline record updates to reflect the log creation
updatedRecords.Add(record.Clone());
}
for (var i = 1; i <= pages.Value.Count; i++)
{
var logFile = pages.Value[i];
// Upload the contents
using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
}
Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
IOUtil.DeleteFile(logFile);
}
}
if (updatedRecords.Count > 0)
{
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
} }
} }
catch (Exception ex) catch (Exception ex)
@@ -1075,12 +943,6 @@ namespace GitHub.Runner.Listener
return; return;
} }
if (this._isRunServiceJob)
{
Trace.Verbose($"Skip FinishAgentRequest call from Listener because MessageType is {message.MessageType}");
return;
}
var runnerServer = HostContext.GetService<IRunnerServer>(); var runnerServer = HostContext.GetService<IRunnerServer>();
int completeJobRequestRetryLimit = 5; int completeJobRequestRetryLimit = 5;
List<Exception> exceptions = new(); List<Exception> exceptions = new();
@@ -1117,117 +979,66 @@ namespace GitHub.Runner.Listener
} }
// log an error issue to job level timeline record // log an error issue to job level timeline record
private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string errorMessage) private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, string errorMessage)
{ {
if (server is IJobServer jobServer) try
{ {
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);
ArgUtil.NotNull(timeline, nameof(timeline));
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
try try
{ {
var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); if (!string.IsNullOrEmpty(errorMessage) &&
ArgUtil.NotNull(timeline, nameof(timeline)); message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) &&
StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value))
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));
try
{ {
if (!string.IsNullOrEmpty(errorMessage) && // the trace should be best effort and not affect any job result
message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) && var match = _invalidJsonRegex.Match(errorMessage);
StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value)) if (match.Success &&
match.Groups.Count == 2)
{ {
// the trace should be best effort and not affect any job result var jsonPosition = int.Parse(match.Groups[1].Value);
var match = _invalidJsonRegex.Match(errorMessage); var serializedJobMessage = JsonUtility.ToString(message);
if (match.Success && var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20);
match.Groups.Count == 2) errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}";
{
var jsonPosition = int.Parse(match.Groups[1].Value);
var serializedJobMessage = JsonUtility.ToString(message);
var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20);
errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}";
}
} }
} }
catch (Exception ex)
{
Trace.Error(ex);
errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}";
}
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage };
unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
jobRecord.ErrorCount++;
jobRecord.Issues.Add(unhandledExceptionIssue);
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
} }
catch (Exception ex) catch (Exception ex)
{ {
Trace.Error("Fail to report unhandled exception from Runner.Worker process");
Trace.Error(ex); Trace.Error(ex);
errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}";
} }
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage };
unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
jobRecord.ErrorCount++;
jobRecord.Issues.Add(unhandledExceptionIssue);
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
} }
else catch (Exception ex)
{ {
Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", errorMessage); Trace.Error("Fail to report unhandled exception from Runner.Worker process");
return; Trace.Error(ex);
} }
} }
// raise job completed event to fail the job. // raise job completed event to fail the job.
private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message) private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message)
{ {
if (server is IJobServer jobServer) try
{ {
try var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
{ await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed);
await jobServer.RaisePlanEventAsync<JobCompletedEvent>(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None);
}
catch (Exception ex)
{
Trace.Error("Fail to raise JobCompletedEvent back to service.");
Trace.Error(ex);
}
} }
else if (server is IRunServer runServer) catch (Exception ex)
{ {
try Trace.Error("Fail to raise JobCompletedEvent back to service.");
{ Trace.Error(ex);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, CancellationToken.None);
}
catch (Exception ex)
{
Trace.Error("Fail to raise job completion back to service.");
Trace.Error(ex);
}
} }
else
{
throw new NotSupportedException($"Server type {server.GetType().FullName} is not supported.");
}
}
private async Task<IRunnerService> InitializeJobServerAsync(ServiceEndpoint systemConnection)
{
if (this._isRunServiceJob)
{
return await GetRunServerAsync(systemConnection);
}
else
{
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);
await jobServer.ConnectAsync(jobConnection);
return jobServer;
}
}
private async Task<IRunServer> GetRunServerAsync(ServiceEndpoint systemConnection)
{
var runServer = HostContext.GetService<IRunServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
return runServer;
} }
private class WorkerDispatcher : IDisposable private class WorkerDispatcher : IDisposable

View File

@@ -9,7 +9,6 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common; using GitHub.Runner.Common;
using GitHub.Runner.Common.Util;
using GitHub.Runner.Listener.Check; using GitHub.Runner.Listener.Check;
using GitHub.Runner.Listener.Configuration; using GitHub.Runner.Listener.Configuration;
using GitHub.Runner.Sdk; using GitHub.Runner.Sdk;
@@ -137,7 +136,7 @@ namespace GitHub.Runner.Listener
if (command.Remove) if (command.Remove)
{ {
// only remove local config files and exit // only remove local config files and exit
if (command.RemoveLocalConfig) if(command.RemoveLocalConfig)
{ {
configManager.DeleteLocalRunnerConfig(); configManager.DeleteLocalRunnerConfig();
return Constants.Runner.ReturnCode.Success; return Constants.Runner.ReturnCode.Success;
@@ -503,7 +502,7 @@ namespace GitHub.Runner.Listener
} }
} }
// Broker flow // Broker flow
else if (MessageUtil.IsRunServiceJob(message.MessageType)) else if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase))
{ {
if (autoUpdateInProgress || runOnceJobReceived) if (autoUpdateInProgress || runOnceJobReceived)
{ {

View File

@@ -164,6 +164,7 @@ namespace GitHub.Runner.Sdk
{ {
continue; continue;
} }
_noProxyList.Add(noProxyInfo); _noProxyList.Add(noProxyInfo);
} }
} }
@@ -206,11 +207,6 @@ namespace GitHub.Runner.Sdk
{ {
foreach (var noProxy in _noProxyList) foreach (var noProxy in _noProxyList)
{ {
// bypass on wildcard no_proxy
if (string.Equals(noProxy.Host, "*", StringComparison.OrdinalIgnoreCase))
{
return true;
}
var matchHost = false; var matchHost = false;
var matchPort = false; var matchPort = false;

View File

@@ -181,7 +181,7 @@ namespace GitHub.Runner.Worker
else else
{ {
var templateEvaluator = ExecutionContext.ToPipelineTemplateEvaluator(); var templateEvaluator = ExecutionContext.ToPipelineTemplateEvaluator();
inputs = templateEvaluator.EvaluateStepInputs(Action.Inputs, ExecutionContext.ExpressionValues, ExecutionContext.ExpressionFunctions, ExecutionContext.ToExpressionState()); inputs = templateEvaluator.EvaluateStepInputs(Action.Inputs, ExecutionContext.ExpressionValues, ExecutionContext.ExpressionFunctions);
} }
var userInputs = new HashSet<string>(StringComparer.OrdinalIgnoreCase); var userInputs = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
@@ -355,7 +355,7 @@ namespace GitHub.Runner.Worker
{ {
DictionaryContextData expressionValues = ExecutionContext.GetExpressionValues(stepHost); DictionaryContextData expressionValues = ExecutionContext.GetExpressionValues(stepHost);
var templateEvaluator = ExecutionContext.ToPipelineTemplateEvaluator(); var templateEvaluator = ExecutionContext.ToPipelineTemplateEvaluator();
var inputs = templateEvaluator.EvaluateStepInputs(Action.Inputs, expressionValues, ExecutionContext.ExpressionFunctions, ExecutionContext.ToExpressionState()); var inputs = templateEvaluator.EvaluateStepInputs(Action.Inputs, expressionValues, ExecutionContext.ExpressionFunctions);
return inputs; return inputs;
} }

View File

@@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
@@ -81,7 +81,7 @@ namespace GitHub.Runner.Worker
// logging // logging
long Write(string tag, string message); long Write(string tag, string message);
void QueueAttachFile(string type, string name, string filePath); void QueueAttachFile(string type, string name, string filePath);
void QueueSummaryFile(string name, string filePath, Guid stepRecordId); void QueueSummaryFile(string name, string filePath, Guid stepRecordId);
// timeline record update methods // timeline record update methods
void Start(string currentOperation = null); void Start(string currentOperation = null);
@@ -871,7 +871,8 @@ namespace GitHub.Runner.Worker
{ {
throw new FileNotFoundException($"Can't upload (name:{name}) file: {filePath}. File does not exist."); throw new FileNotFoundException($"Can't upload (name:{name}) file: {filePath}. File does not exist.");
} }
_jobServerQueue.QueueResultsUpload(stepRecordId, name, filePath, ChecksAttachmentType.StepSummary, deleteSource: false, finalize: true, firstBlock: true, totalLines: 0);
_jobServerQueue.QueueSummaryUpload(stepRecordId, name, filePath, deleteSource: false);
} }
// Add OnMatcherChanged // Add OnMatcherChanged

View File

@@ -29,8 +29,6 @@ namespace GitHub.Runner.Worker.Expressions
githubContext.TryGetValue(PipelineTemplateConstants.Workspace, out var workspace); githubContext.TryGetValue(PipelineTemplateConstants.Workspace, out var workspace);
var workspaceData = workspace as StringContextData; var workspaceData = workspace as StringContextData;
ArgUtil.NotNull(workspaceData, nameof(workspaceData)); ArgUtil.NotNull(workspaceData, nameof(workspaceData));
var executionContext = templateContext.State[nameof(IExecutionContext)] as IExecutionContext;
ArgUtil.NotNull(executionContext, nameof(executionContext));
string githubWorkspace = workspaceData.Value; string githubWorkspace = workspaceData.Value;
bool followSymlink = false; bool followSymlink = false;
@@ -66,15 +64,7 @@ namespace GitHub.Runner.Worker.Expressions
string runnerRoot = new DirectoryInfo(binDir).Parent.FullName; string runnerRoot = new DirectoryInfo(binDir).Parent.FullName;
string node = Path.Combine(runnerRoot, "externals", NodeUtil.GetInternalNodeVersion(), "bin", $"node{IOUtil.ExeExtension}"); string node = Path.Combine(runnerRoot, "externals", NodeUtil.GetInternalNodeVersion(), "bin", $"node{IOUtil.ExeExtension}");
//Feature flag to fetch a new version of hashFiles script string hashFilesScript = Path.Combine(binDir, "hashFiles");
string hashFilesScript = string.Empty;
var isGlobHashFilesEnabled = executionContext.Global.Variables.GetBoolean("DistributedTask.UseGlobHashFiles") ?? false;
if (isGlobHashFilesEnabled)
{
hashFilesScript = Path.Combine(binDir, "hashFilesV2");
}
hashFilesScript = Path.Combine(binDir, "hashFiles");
var hashResult = string.Empty; var hashResult = string.Empty;
var p = new ProcessInvoker(new HashFilesTrace(context.Trace)); var p = new ProcessInvoker(new HashFilesTrace(context.Trace));
p.ErrorDataReceived += ((_, data) => p.ErrorDataReceived += ((_, data) =>
@@ -146,4 +136,4 @@ namespace GitHub.Runner.Worker.Expressions
} }
} }
} }
} }

View File

@@ -6,7 +6,6 @@ using System.Net.Http;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common; using GitHub.Runner.Common;
using GitHub.Runner.Common.Util; using GitHub.Runner.Common.Util;
@@ -20,7 +19,7 @@ namespace GitHub.Runner.Worker
[ServiceLocator(Default = typeof(JobRunner))] [ServiceLocator(Default = typeof(JobRunner))]
public interface IJobRunner : IRunnerService public interface IJobRunner : IRunnerService
{ {
Task<TaskResult> RunAsync(AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken); Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken);
} }
public sealed class JobRunner : RunnerService, IJobRunner public sealed class JobRunner : RunnerService, IJobRunner
@@ -29,7 +28,7 @@ namespace GitHub.Runner.Worker
private RunnerSettings _runnerSettings; private RunnerSettings _runnerSettings;
private ITempDirectoryManager _tempDirectoryManager; private ITempDirectoryManager _tempDirectoryManager;
public async Task<TaskResult> RunAsync(AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken) public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken)
{ {
// Validate parameters. // Validate parameters.
Trace.Entering(); Trace.Entering();
@@ -43,14 +42,14 @@ namespace GitHub.Runner.Worker
IRunnerService server = null; IRunnerService server = null;
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase)); ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
if (MessageUtil.IsRunServiceJob(message.MessageType)) if (string.Equals(message.MessageType, JobRequestMessageTypes.RunnerJobRequest, StringComparison.OrdinalIgnoreCase))
{ {
var runServer = HostContext.GetService<IRunServer>(); var runServer = HostContext.GetService<IRunServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
await runServer.ConnectAsync(systemConnection.Url, jobServerCredential); await runServer.ConnectAsync(systemConnection.Url, jobServerCredential);
server = runServer; server = runServer;
} }
else else
{ {
// Setup the job server and job server queue. // Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>(); var jobServer = HostContext.GetService<IJobServer>();
@@ -66,7 +65,7 @@ namespace GitHub.Runner.Worker
_jobServerQueue.Start(message); _jobServerQueue.Start(message);
server = jobServer; server = jobServer;
} }
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}"); HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");

View File

@@ -116,7 +116,7 @@ namespace GitHub.DistributedTask.WebApi
return; return;
} }
} }
////dfxgsdzf
/// <summary> /// <summary>
/// [Preview API] Get information about an agent. /// [Preview API] Get information about an agent.
/// </summary> /// </summary>

View File

@@ -1,4 +1,4 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel; using System.ComponentModel;
using System.Linq; using System.Linq;
@@ -164,14 +164,13 @@ namespace GitHub.DistributedTask.Pipelines.ObjectTemplating
public Dictionary<String, String> EvaluateStepInputs( public Dictionary<String, String> EvaluateStepInputs(
TemplateToken token, TemplateToken token,
DictionaryContextData contextData, DictionaryContextData contextData,
IList<IFunctionInfo> expressionFunctions, IList<IFunctionInfo> expressionFunctions)
IEnumerable<KeyValuePair<String, Object>> expressionState = null)
{ {
var result = default(Dictionary<String, String>); var result = default(Dictionary<String, String>);
if (token != null && token.Type != TokenType.Null) if (token != null && token.Type != TokenType.Null)
{ {
var context = CreateContext(contextData, expressionFunctions, expressionState); var context = CreateContext(contextData, expressionFunctions);
try try
{ {
token = TemplateEvaluator.Evaluate(context, PipelineTemplateConstants.StepWith, token, 0, null, omitHeader: true); token = TemplateEvaluator.Evaluate(context, PipelineTemplateConstants.StepWith, token, 0, null, omitHeader: true);

View File

@@ -1,4 +1,4 @@
using GitHub.Services.Common; using GitHub.Services.Common;
using GitHub.Services.WebApi; using GitHub.Services.WebApi;
using System; using System;
using System.Runtime.Serialization; using System.Runtime.Serialization;
@@ -100,7 +100,6 @@ namespace GitHub.DistributedTask.WebApi
public static readonly String Summary = "DistributedTask.Core.Summary"; public static readonly String Summary = "DistributedTask.Core.Summary";
public static readonly String FileAttachment = "DistributedTask.Core.FileAttachment"; public static readonly String FileAttachment = "DistributedTask.Core.FileAttachment";
public static readonly String DiagnosticLog = "DistributedTask.Core.DiagnosticLog"; public static readonly String DiagnosticLog = "DistributedTask.Core.DiagnosticLog";
public static readonly String ResultsLog = "Results.Core.Log";
} }
[GenerateAllConstants] [GenerateAllConstants]

View File

@@ -1,15 +0,0 @@
using System;
using System.Runtime.Serialization;
namespace GitHub.Actions.RunService.WebApi
{
[DataContract]
public class RenewJobRequest
{
[DataMember(Name = "planId", EmitDefaultValue = false)]
public Guid PlanID { get; set; }
[DataMember(Name = "jobId", EmitDefaultValue = false)]
public Guid JobID { get; set; }
}
}

View File

@@ -1,16 +0,0 @@
using System;
using System.Runtime.Serialization;
namespace Sdk.RSWebApi.Contracts
{
[DataContract]
public class RenewJobResponse
{
[DataMember]
public DateTime LockedUntil
{
get;
internal set;
}
}
}

View File

@@ -8,7 +8,6 @@ using GitHub.DistributedTask.WebApi;
using GitHub.Services.Common; using GitHub.Services.Common;
using GitHub.Services.OAuth; using GitHub.Services.OAuth;
using GitHub.Services.WebApi; using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Sdk.WebApi.WebApi; using Sdk.WebApi.WebApi;
namespace GitHub.Actions.RunService.WebApi namespace GitHub.Actions.RunService.WebApi
@@ -99,29 +98,6 @@ namespace GitHub.Actions.RunService.WebApi
var requestContent = new ObjectContent<CompleteJobRequest>(payload, new VssJsonMediaTypeFormatter(true)); var requestContent = new ObjectContent<CompleteJobRequest>(payload, new VssJsonMediaTypeFormatter(true));
return SendAsync( return SendAsync(
httpMethod,
requestUri,
content: requestContent,
cancellationToken: cancellationToken);
}
public Task<RenewJobResponse> RenewJobAsync(
Uri requestUri,
Guid planId,
Guid jobId,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
var payload = new RenewJobRequest()
{
PlanID = planId,
JobID = jobId
};
requestUri = new Uri(requestUri, "renewjob");
var requestContent = new ObjectContent<RenewJobRequest>(payload, new VssJsonMediaTypeFormatter(true));
return SendAsync<RenewJobResponse>(
httpMethod, httpMethod,
requestUri, requestUri,
content: requestContent, content: requestContent,

View File

@@ -46,81 +46,7 @@ namespace GitHub.Services.Results.Contracts
[DataContract] [DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class GetSignedJobLogsURLRequest public class CreateStepSummaryMetadataResponse
{
[DataMember]
public string WorkflowJobRunBackendId;
[DataMember]
public string WorkflowRunBackendId;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class GetSignedJobLogsURLResponse
{
[DataMember]
public string LogsUrl;
[DataMember]
public string BlobStorageType;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class GetSignedStepLogsURLRequest
{
[DataMember]
public string WorkflowJobRunBackendId;
[DataMember]
public string WorkflowRunBackendId;
[DataMember]
public string StepBackendId;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class GetSignedStepLogsURLResponse
{
[DataMember]
public string LogsUrl;
[DataMember]
public string BlobStorageType;
[DataMember]
public long SoftSizeLimit;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class JobLogsMetadataCreate
{
[DataMember]
public string WorkflowRunBackendId;
[DataMember]
public string WorkflowJobRunBackendId;
[DataMember]
public string UploadedAt;
[DataMember]
public long LineCount;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class StepLogsMetadataCreate
{
[DataMember]
public string WorkflowRunBackendId;
[DataMember]
public string WorkflowJobRunBackendId;
[DataMember]
public string StepBackendId;
[DataMember]
public string UploadedAt;
[DataMember]
public long LineCount;
}
[DataContract]
[JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))]
public class CreateMetadataResponse
{ {
[DataMember] [DataMember]
public bool Ok; public bool Ok;

View File

@@ -24,138 +24,68 @@ namespace GitHub.Services.Results.Client
m_formatter = new JsonMediaTypeFormatter(); m_formatter = new JsonMediaTypeFormatter();
} }
// Get Sas URL calls public async Task<GetSignedStepSummaryURLResponse> GetStepSummaryUploadUrlAsync(string planId, string jobId, string stepId, CancellationToken cancellationToken)
private async Task<T> GetResultsSignedURLResponse<R, T>(Uri uri, CancellationToken cancellationToken, R request)
{ {
using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, uri)) var request = new GetSignedStepSummaryURLRequest()
{
WorkflowJobRunBackendId= jobId,
WorkflowRunBackendId= planId,
StepBackendId= stepId
};
var stepSummaryUploadRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/GetStepSummarySignedBlobURL");
using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepSummaryUploadRequest))
{ {
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token);
requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json"));
using (HttpContent content = new ObjectContent<R>(request, m_formatter)) using (HttpContent content = new ObjectContent<GetSignedStepSummaryURLRequest>(request, m_formatter))
{ {
requestMessage.Content = content; requestMessage.Content = content;
using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken))
{ {
return await ReadJsonContentAsync<T>(response, cancellationToken); return await ReadJsonContentAsync<GetSignedStepSummaryURLResponse>(response, cancellationToken);
} }
} }
} }
} }
private async Task<GetSignedStepSummaryURLResponse> GetStepSummaryUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken) private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, string stepId, long size, CancellationToken cancellationToken)
{ {
var request = new GetSignedStepSummaryURLRequest() var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK");
var request = new StepSummaryMetadataCreate()
{ {
WorkflowJobRunBackendId = jobId, WorkflowJobRunBackendId= jobId,
WorkflowRunBackendId = planId, WorkflowRunBackendId= planId,
StepBackendId = stepId.ToString() StepBackendId = stepId,
Size = size,
UploadedAt = timestamp
}; };
var getStepSummarySignedBlobURLEndpoint = new Uri(m_resultsServiceUrl, Constants.GetStepSummarySignedBlobURL); var stepSummaryUploadCompleteRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/CreateStepSummaryMetadata");
return await GetResultsSignedURLResponse<GetSignedStepSummaryURLRequest, GetSignedStepSummaryURLResponse>(getStepSummarySignedBlobURLEndpoint, cancellationToken, request); using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepSummaryUploadCompleteRequest))
}
private async Task<GetSignedStepLogsURLResponse> GetStepLogUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken)
{
var request = new GetSignedStepLogsURLRequest()
{
WorkflowJobRunBackendId = jobId,
WorkflowRunBackendId = planId,
StepBackendId = stepId.ToString(),
};
var getStepLogsSignedBlobURLEndpoint = new Uri(m_resultsServiceUrl, Constants.GetStepLogsSignedBlobURL);
return await GetResultsSignedURLResponse<GetSignedStepLogsURLRequest, GetSignedStepLogsURLResponse>(getStepLogsSignedBlobURLEndpoint, cancellationToken, request);
}
private async Task<GetSignedJobLogsURLResponse> GetJobLogUploadUrlAsync(string planId, string jobId, CancellationToken cancellationToken)
{
var request = new GetSignedJobLogsURLRequest()
{
WorkflowJobRunBackendId = jobId,
WorkflowRunBackendId = planId,
};
var getJobLogsSignedBlobURLEndpoint = new Uri(m_resultsServiceUrl, Constants.GetJobLogsSignedBlobURL);
return await GetResultsSignedURLResponse<GetSignedJobLogsURLRequest, GetSignedJobLogsURLResponse>(getJobLogsSignedBlobURLEndpoint, cancellationToken, request);
}
// Create metadata calls
private async Task CreateMetadata<R>(Uri uri, CancellationToken cancellationToken, R request, string timestamp)
{
using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, uri))
{ {
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token);
requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json"));
using (HttpContent content = new ObjectContent<R>(request, m_formatter)) using (HttpContent content = new ObjectContent<StepSummaryMetadataCreate>(request, m_formatter))
{ {
requestMessage.Content = content; requestMessage.Content = content;
using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken))
{ {
var jsonResponse = await ReadJsonContentAsync<CreateMetadataResponse>(response, cancellationToken); var jsonResponse = await ReadJsonContentAsync<CreateStepSummaryMetadataResponse>(response, cancellationToken);
if (!jsonResponse.Ok) if (!jsonResponse.Ok)
{ {
throw new Exception($"Failed to mark {typeof(R).Name} upload as complete, status code: {response.StatusCode}, ok: {jsonResponse.Ok}, timestamp: {timestamp}"); throw new Exception($"Failed to mark step summary upload as complete, status code: {response.StatusCode}, ok: {jsonResponse.Ok}, size: {size}, timestamp: {timestamp}");
} }
} }
} }
} }
} }
private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, Guid stepId, long size, CancellationToken cancellationToken) private async Task<HttpResponseMessage> UploadFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken)
{
var timestamp = DateTime.UtcNow.ToString(Constants.TimestampFormat);
var request = new StepSummaryMetadataCreate()
{
WorkflowJobRunBackendId = jobId,
WorkflowRunBackendId = planId,
StepBackendId = stepId.ToString(),
Size = size,
UploadedAt = timestamp
};
var createStepSummaryMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepSummaryMetadata);
await CreateMetadata<StepSummaryMetadataCreate>(createStepSummaryMetadataEndpoint, cancellationToken, request, timestamp);
}
private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken)
{
var timestamp = DateTime.UtcNow.ToString(Constants.TimestampFormat);
var request = new StepLogsMetadataCreate()
{
WorkflowJobRunBackendId = jobId,
WorkflowRunBackendId = planId,
StepBackendId = stepId.ToString(),
UploadedAt = timestamp,
LineCount = lineCount,
};
var createStepLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateStepLogsMetadata);
await CreateMetadata<StepLogsMetadataCreate>(createStepLogsMetadataEndpoint, cancellationToken, request, timestamp);
}
private async Task JobLogUploadCompleteAsync(string planId, string jobId, long lineCount, CancellationToken cancellationToken)
{
var timestamp = DateTime.UtcNow.ToString(Constants.TimestampFormat);
var request = new JobLogsMetadataCreate()
{
WorkflowJobRunBackendId = jobId,
WorkflowRunBackendId = planId,
UploadedAt = timestamp,
LineCount = lineCount,
};
var createJobLogsMetadataEndpoint = new Uri(m_resultsServiceUrl, Constants.CreateJobLogsMetadata);
await CreateMetadata<JobLogsMetadataCreate>(createJobLogsMetadataEndpoint, cancellationToken, request, timestamp);
}
private async Task<HttpResponseMessage> UploadBlockFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken)
{ {
// Upload the file to the url // Upload the file to the url
var request = new HttpRequestMessage(HttpMethod.Put, url) var request = new HttpRequestMessage(HttpMethod.Put, url)
@@ -165,7 +95,7 @@ namespace GitHub.Services.Results.Client
if (blobStorageType == BlobStorageTypes.AzureBlobStorage) if (blobStorageType == BlobStorageTypes.AzureBlobStorage)
{ {
request.Content.Headers.Add(Constants.AzureBlobTypeHeader, Constants.AzureBlockBlob); request.Content.Headers.Add("x-ms-blob-type", "BlockBlob");
} }
using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken)) using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken))
@@ -178,55 +108,8 @@ namespace GitHub.Services.Results.Client
} }
} }
private async Task<HttpResponseMessage> CreateAppendFileAsync(string url, string blobStorageType, CancellationToken cancellationToken)
{
var request = new HttpRequestMessage(HttpMethod.Put, url)
{
Content = new StringContent("")
};
if (blobStorageType == BlobStorageTypes.AzureBlobStorage)
{
request.Content.Headers.Add(Constants.AzureBlobTypeHeader, Constants.AzureAppendBlob);
request.Content.Headers.Add("Content-Length", "0");
}
using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken))
{
if (!response.IsSuccessStatusCode)
{
throw new Exception($"Failed to create append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}");
}
return response;
}
}
private async Task<HttpResponseMessage> UploadAppendFileAsync(string url, string blobStorageType, FileStream file, bool finalize, long fileSize, CancellationToken cancellationToken)
{
var comp = finalize ? "&comp=appendblock&seal=true" : "&comp=appendblock";
// Upload the file to the url
var request = new HttpRequestMessage(HttpMethod.Put, url + comp)
{
Content = new StreamContent(file)
};
if (blobStorageType == BlobStorageTypes.AzureBlobStorage)
{
request.Content.Headers.Add("Content-Length", fileSize.ToString());
request.Content.Headers.Add(Constants.AzureBlobSealedHeader, finalize.ToString());
}
using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken))
{
if (!response.IsSuccessStatusCode)
{
throw new Exception($"Failed to upload append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}, object: {response}, fileSize: {fileSize}");
}
return response;
}
}
// Handle file upload for step summary // Handle file upload for step summary
public async Task UploadStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) public async Task UploadStepSummaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
{ {
// Get the upload url // Get the upload url
var uploadUrlResponse = await GetStepSummaryUploadUrlAsync(planId, jobId, stepId, cancellationToken); var uploadUrlResponse = await GetStepSummaryUploadUrlAsync(planId, jobId, stepId, cancellationToken);
@@ -245,97 +128,15 @@ namespace GitHub.Services.Results.Client
// Upload the file // Upload the file
using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)) using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
{ {
var response = await UploadBlockFileAsync(uploadUrlResponse.SummaryUrl, uploadUrlResponse.BlobStorageType, fileStream, cancellationToken); var response = await UploadFileAsync(uploadUrlResponse.SummaryUrl, uploadUrlResponse.BlobStorageType, fileStream, cancellationToken);
} }
// Send step summary upload complete message // Send step summary upload complete message
await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken); await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken);
} }
// Handle file upload for step log
public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
// Get the upload url
var uploadUrlResponse = await GetStepLogUploadUrlAsync(planId, jobId, stepId, cancellationToken);
if (uploadUrlResponse == null || uploadUrlResponse.LogsUrl == null)
{
throw new Exception("Failed to get step log upload url");
}
// Create the Append blob
if (firstBlock)
{
await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken);
}
// Upload content
var fileSize = new FileInfo(file).Length;
using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
{
var response = await UploadAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, fileStream, finalize, fileSize, cancellationToken);
}
// Update metadata
if (finalize)
{
// Send step log upload complete message
await StepLogUploadCompleteAsync(planId, jobId, stepId, lineCount, cancellationToken);
}
}
// Handle file upload for job log
public async Task UploadResultsJobLogAsync(string planId, string jobId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
// Get the upload url
var uploadUrlResponse = await GetJobLogUploadUrlAsync(planId, jobId, cancellationToken);
if (uploadUrlResponse == null || uploadUrlResponse.LogsUrl == null)
{
throw new Exception("Failed to get job log upload url");
}
// Create the Append blob
if (firstBlock)
{
await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken);
}
// Upload content
var fileSize = new FileInfo(file).Length;
using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true))
{
var response = await UploadAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, fileStream, finalize, fileSize, cancellationToken);
}
// Update metadata
if (finalize)
{
// Send step log upload complete message
await JobLogUploadCompleteAsync(planId, jobId, lineCount, cancellationToken);
}
}
private MediaTypeFormatter m_formatter; private MediaTypeFormatter m_formatter;
private Uri m_resultsServiceUrl; private Uri m_resultsServiceUrl;
private string m_token; private string m_token;
} }
// Constants specific to results
public static class Constants
{
public static readonly string TimestampFormat = "yyyy-MM-dd'T'HH:mm:ss.fffK";
public static readonly string ResultsReceiverTwirpEndpoint = "twirp/results.services.receiver.Receiver/";
public static readonly string GetStepSummarySignedBlobURL = ResultsReceiverTwirpEndpoint + "GetStepSummarySignedBlobURL";
public static readonly string CreateStepSummaryMetadata = ResultsReceiverTwirpEndpoint + "CreateStepSummaryMetadata";
public static readonly string GetStepLogsSignedBlobURL = ResultsReceiverTwirpEndpoint + "GetStepLogsSignedBlobURL";
public static readonly string CreateStepLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateStepLogsMetadata";
public static readonly string GetJobLogsSignedBlobURL = ResultsReceiverTwirpEndpoint + "GetJobLogsSignedBlobURL";
public static readonly string CreateJobLogsMetadata = ResultsReceiverTwirpEndpoint + "CreateJobLogsMetadata";
public static readonly string AzureBlobSealedHeader = "x-ms-blob-sealed";
public static readonly string AzureBlobTypeHeader = "x-ms-blob-type";
public static readonly string AzureBlockBlob = "BlockBlob";
public static readonly string AzureAppendBlob = "AppendBlob";
}
} }

View File

@@ -1,17 +1,12 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Reflection; using System.Reflection;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using GitHub.DistributedTask.ObjectTemplating.Tokens;
using GitHub.DistributedTask.Pipelines;
using GitHub.DistributedTask.Pipelines.ContextData;
using GitHub.DistributedTask.WebApi; using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Listener; using GitHub.Runner.Listener;
using GitHub.Runner.Listener.Configuration;
using GitHub.Services.WebApi; using GitHub.Services.WebApi;
using Moq; using Moq;
using Sdk.RSWebApi.Contracts;
using Xunit; using Xunit;
using Pipelines = GitHub.DistributedTask.Pipelines; using Pipelines = GitHub.DistributedTask.Pipelines;
@@ -23,8 +18,6 @@ namespace GitHub.Runner.Common.Tests.Listener
private Mock<IProcessChannel> _processChannel; private Mock<IProcessChannel> _processChannel;
private Mock<IProcessInvoker> _processInvoker; private Mock<IProcessInvoker> _processInvoker;
private Mock<IRunnerServer> _runnerServer; private Mock<IRunnerServer> _runnerServer;
private Mock<IRunServer> _runServer;
private Mock<IConfigurationStore> _configurationStore; private Mock<IConfigurationStore> _configurationStore;
public JobDispatcherL0() public JobDispatcherL0()
@@ -32,7 +25,6 @@ namespace GitHub.Runner.Common.Tests.Listener
_processChannel = new Mock<IProcessChannel>(); _processChannel = new Mock<IProcessChannel>();
_processInvoker = new Mock<IProcessInvoker>(); _processInvoker = new Mock<IProcessInvoker>();
_runnerServer = new Mock<IRunnerServer>(); _runnerServer = new Mock<IRunnerServer>();
_runServer = new Mock<IRunServer>();
_configurationStore = new Mock<IConfigurationStore>(); _configurationStore = new Mock<IConfigurationStore>();
} }
@@ -147,7 +139,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully);
_runnerServer.Verify(x => x.RenewAgentRequestAsync(It.IsAny<int>(), It.IsAny<long>(), It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Exactly(5)); _runnerServer.Verify(x => x.RenewAgentRequestAsync(It.IsAny<int>(), It.IsAny<long>(), It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Exactly(5));
@@ -205,7 +197,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed.");
Assert.False(cancellationTokenSource.IsCancellationRequested); Assert.False(cancellationTokenSource.IsCancellationRequested);
@@ -213,75 +205,6 @@ namespace GitHub.Runner.Common.Tests.Listener
} }
} }
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void DispatcherRenewJobOnRunServiceStopOnJobNotFoundExceptions()
{
//Arrange
using (var hc = new TestHostContext(this))
{
int poolId = 1;
Int64 requestId = 1000;
int count = 0;
var trace = hc.GetTrace(nameof(DispatcherRenewJobOnRunServiceStopOnJobNotFoundExceptions));
TaskCompletionSource<int> firstJobRequestRenewed = new();
CancellationTokenSource cancellationTokenSource = new();
TaskAgentJobRequest request = new();
PropertyInfo lockUntilProperty = request.GetType().GetProperty("LockedUntil", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public);
Assert.NotNull(lockUntilProperty);
lockUntilProperty.SetValue(request, DateTime.UtcNow.AddMinutes(5));
hc.SetSingleton<IRunServer>(_runServer.Object);
hc.SetSingleton<IConfigurationStore>(_configurationStore.Object);
_configurationStore.Setup(x => x.GetSettings()).Returns(new RunnerSettings() { PoolId = 1 });
_ = _runServer.Setup(x => x.RenewJobAsync(It.IsAny<Guid>(), It.IsAny<Guid>(), It.IsAny<CancellationToken>()))
.Returns(() =>
{
count++;
if (!firstJobRequestRenewed.Task.IsCompletedSuccessfully)
{
trace.Info("First renew happens.");
}
if (count < 5)
{
var response = new RenewJobResponse()
{
LockedUntil = request.LockedUntil.Value
};
return Task.FromResult<RenewJobResponse>(response);
}
else if (count == 5)
{
cancellationTokenSource.CancelAfter(10000);
throw new TaskOrchestrationJobNotFoundException("");
}
else
{
throw new InvalidOperationException("Should not reach here.");
}
});
var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc);
EnableRunServiceJobForJobDispatcher(jobDispatcher);
// Set the value of the _isRunServiceJob field to true
var isRunServiceJobField = typeof(JobDispatcher).GetField("_isRunServiceJob", BindingFlags.NonPublic | BindingFlags.Instance);
isRunServiceJobField.SetValue(jobDispatcher, true);
await jobDispatcher.RenewJobRequestAsync(GetAgentJobRequestMessage(), GetServiceEndpoint(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed.");
Assert.False(cancellationTokenSource.IsCancellationRequested);
_runServer.Verify(x => x.RenewJobAsync(It.IsAny<Guid>(), It.IsAny<Guid>(), It.IsAny<CancellationToken>()), Times.Exactly(5));
}
}
[Fact] [Fact]
[Trait("Level", "L0")] [Trait("Level", "L0")]
[Trait("Category", "Runner")] [Trait("Category", "Runner")]
@@ -333,7 +256,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed.");
Assert.False(cancellationTokenSource.IsCancellationRequested); Assert.False(cancellationTokenSource.IsCancellationRequested);
@@ -389,9 +312,8 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
// Act // Act
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
// Assert // Assert
_configurationStore.Verify(x => x.SaveSettings(It.Is<RunnerSettings>(settings => settings.AgentName == newName)), Times.Once); _configurationStore.Verify(x => x.SaveSettings(It.Is<RunnerSettings>(settings => settings.AgentName == newName)), Times.Once);
@@ -446,7 +368,7 @@ namespace GitHub.Runner.Common.Tests.Listener
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
// Act // Act
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
// Assert // Assert
_configurationStore.Verify(x => x.SaveSettings(It.IsAny<RunnerSettings>()), Times.Never); _configurationStore.Verify(x => x.SaveSettings(It.IsAny<RunnerSettings>()), Times.Never);
@@ -499,7 +421,7 @@ namespace GitHub.Runner.Common.Tests.Listener
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
// Act // Act
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), 0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(0, 0, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
// Assert // Assert
_configurationStore.Verify(x => x.SaveSettings(It.IsAny<RunnerSettings>()), Times.Never); _configurationStore.Verify(x => x.SaveSettings(It.IsAny<RunnerSettings>()), Times.Never);
@@ -557,7 +479,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed.");
Assert.True(cancellationTokenSource.IsCancellationRequested); Assert.True(cancellationTokenSource.IsCancellationRequested);
@@ -614,7 +536,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.False(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should failed."); Assert.False(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should failed.");
Assert.False(cancellationTokenSource.IsCancellationRequested); Assert.False(cancellationTokenSource.IsCancellationRequested);
@@ -678,7 +600,7 @@ namespace GitHub.Runner.Common.Tests.Listener
var jobDispatcher = new JobDispatcher(); var jobDispatcher = new JobDispatcher();
jobDispatcher.Initialize(hc); jobDispatcher.Initialize(hc);
await jobDispatcher.RenewJobRequestAsync(It.IsAny<AgentJobRequestMessage>(), It.IsAny<ServiceEndpoint>(), poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token); await jobDispatcher.RenewJobRequestAsync(poolId, requestId, Guid.Empty, Guid.NewGuid().ToString(), firstJobRequestRenewed, cancellationTokenSource.Token);
Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed."); Assert.True(firstJobRequestRenewed.Task.IsCompletedSuccessfully, "First renew should succeed.");
Assert.False(cancellationTokenSource.IsCancellationRequested); Assert.False(cancellationTokenSource.IsCancellationRequested);
@@ -737,78 +659,5 @@ namespace GitHub.Runner.Common.Tests.Listener
Assert.True(jobDispatcher.RunOnceJobCompleted.Task.Result, "JobDispatcher should set task complete token to 'TRUE' for one time agent."); Assert.True(jobDispatcher.RunOnceJobCompleted.Task.Result, "JobDispatcher should set task complete token to 'TRUE' for one time agent.");
} }
} }
private static void EnableRunServiceJobForJobDispatcher(JobDispatcher jobDispatcher)
{
// Set the value of the _isRunServiceJob field to true
var isRunServiceJobField = typeof(JobDispatcher).GetField("_isRunServiceJob", BindingFlags.NonPublic | BindingFlags.Instance);
isRunServiceJobField.SetValue(jobDispatcher, true);
}
private static ServiceEndpoint GetServiceEndpoint()
{
var serviceEndpoint = new ServiceEndpoint
{
Authorization = new EndpointAuthorization
{
Scheme = EndpointAuthorizationSchemes.OAuth
}
};
serviceEndpoint.Authorization.Parameters.Add("AccessToken", "token");
return serviceEndpoint;
}
private static AgentJobRequestMessage GetAgentJobRequestMessage()
{
var message = new AgentJobRequestMessage(
new TaskOrchestrationPlanReference()
{
PlanType = "Build",
PlanId = Guid.NewGuid(),
Version = 1
},
new TimelineReference()
{
Id = Guid.NewGuid()
},
Guid.NewGuid(),
"jobDisplayName",
"jobName",
null,
null,
new List<TemplateToken>(),
new Dictionary<string, VariableValue>()
{
{
"variables",
new VariableValue()
{
IsSecret = false,
Value = "variables"
}
}
},
new List<MaskHint>()
{
new MaskHint()
{
Type = MaskType.Variable,
Value = "maskHints"
}
},
new JobResources(),
new DictionaryContextData(),
new WorkspaceOptions(),
new List<JobStep>(),
new List<string>()
{
"fileTable"
},
null,
new List<TemplateToken>(),
new ActionsEnvironmentReference("env")
);
return message;
}
} }
} }

View File

@@ -351,7 +351,7 @@ namespace GitHub.Runner.Common.Tests
Assert.False(proxy.IsBypassed(new Uri("https://actions.com"))); Assert.False(proxy.IsBypassed(new Uri("https://actions.com")));
Assert.False(proxy.IsBypassed(new Uri("https://ggithub.com"))); Assert.False(proxy.IsBypassed(new Uri("https://ggithub.com")));
Assert.False(proxy.IsBypassed(new Uri("https://github.comm"))); Assert.False(proxy.IsBypassed(new Uri("https://github.comm")));
Assert.False(proxy.IsBypassed(new Uri("https://google.com"))); // no_proxy has '.google.com', specifying only subdomains bypass Assert.False(proxy.IsBypassed(new Uri("https://google.com")));
Assert.False(proxy.IsBypassed(new Uri("https://example.com"))); Assert.False(proxy.IsBypassed(new Uri("https://example.com")));
Assert.False(proxy.IsBypassed(new Uri("http://example.com:333"))); Assert.False(proxy.IsBypassed(new Uri("http://example.com:333")));
Assert.False(proxy.IsBypassed(new Uri("http://192.168.0.123:123"))); Assert.False(proxy.IsBypassed(new Uri("http://192.168.0.123:123")));
@@ -374,76 +374,6 @@ namespace GitHub.Runner.Common.Tests
CleanProxyEnv(); CleanProxyEnv();
} }
} }
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Common")]
public void BypassAllOnWildcardNoProxy()
{
try
{
Environment.SetEnvironmentVariable("http_proxy", "http://user1:pass1%40@127.0.0.1:8888");
Environment.SetEnvironmentVariable("https_proxy", "http://user2:pass2%40@127.0.0.1:9999");
Environment.SetEnvironmentVariable("no_proxy", "example.com, * , example2.com");
var proxy = new RunnerWebProxy();
Assert.True(proxy.IsBypassed(new Uri("http://actions.com")));
Assert.True(proxy.IsBypassed(new Uri("http://localhost")));
Assert.True(proxy.IsBypassed(new Uri("http://127.0.0.1:8080")));
Assert.True(proxy.IsBypassed(new Uri("https://actions.com")));
Assert.True(proxy.IsBypassed(new Uri("https://localhost")));
Assert.True(proxy.IsBypassed(new Uri("https://127.0.0.1:8080")));
}
finally
{
CleanProxyEnv();
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Common")]
public void IgnoreWildcardInNoProxySubdomain()
{
try
{
Environment.SetEnvironmentVariable("http_proxy", "http://user1:pass1%40@127.0.0.1:8888");
Environment.SetEnvironmentVariable("https_proxy", "http://user2:pass2%40@127.0.0.1:9999");
Environment.SetEnvironmentVariable("no_proxy", "*.example.com");
var proxy = new RunnerWebProxy();
Assert.False(proxy.IsBypassed(new Uri("http://sub.example.com")));
Assert.False(proxy.IsBypassed(new Uri("http://example.com")));
}
finally
{
CleanProxyEnv();
}
}
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Common")]
public void WildcardNoProxyWorksWhenOtherNoProxyAreAround()
{
try
{
Environment.SetEnvironmentVariable("http_proxy", "http://user1:pass1%40@127.0.0.1:8888");
Environment.SetEnvironmentVariable("https_proxy", "http://user2:pass2%40@127.0.0.1:9999");
Environment.SetEnvironmentVariable("no_proxy", "example.com,*,example2.com");
var proxy = new RunnerWebProxy();
Assert.True(proxy.IsBypassed(new Uri("http://actions.com")));
Assert.True(proxy.IsBypassed(new Uri("http://localhost")));
Assert.True(proxy.IsBypassed(new Uri("http://127.0.0.1:8080")));
Assert.True(proxy.IsBypassed(new Uri("https://actions.com")));
Assert.True(proxy.IsBypassed(new Uri("https://localhost")));
Assert.True(proxy.IsBypassed(new Uri("https://127.0.0.1:8080")));
}
finally
{
CleanProxyEnv();
}
}
[Fact] [Fact]
[Trait("Level", "L0")] [Trait("Level", "L0")]