From 90a623646609119a50512724d7bc83fc0e85e9de Mon Sep 17 00:00:00 2001 From: Wout Van De Wiel Date: Tue, 31 Oct 2023 12:51:09 +0100 Subject: [PATCH] Add option to use the kubernetes scheduler for workflow pods (#111) * Add option to use kube scheduler This should only be used when rwx volumes are supported or when using a single node cluster. * Add option to set timeout for prepare job If the kube scheduler is used to hold jobs until sufficient resources are available, then prepare job needs to wait for a longer period until the workflow pod is running. This timeout will mostly need an increase in cases where many jobs are triggered which together exceed the resources available in the cluster. The workflows can then be gracefully handled later when sufficient resources become available again. * Skip name override warning when names match or job extension * Add guard for positive timeouts with a warning * Write out ReadWriteMany in full --- packages/k8s/src/hooks/prepare-job.ts | 6 ++-- packages/k8s/src/k8s/index.ts | 41 +++++++++++++++++++++++--- packages/k8s/src/k8s/utils.ts | 10 ++++++- packages/k8s/tests/prepare-job-test.ts | 12 ++++++++ 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/packages/k8s/src/hooks/prepare-job.ts b/packages/k8s/src/hooks/prepare-job.ts index 1a3b0a5..05153a4 100644 --- a/packages/k8s/src/hooks/prepare-job.ts +++ b/packages/k8s/src/hooks/prepare-job.ts @@ -13,7 +13,8 @@ import { createPod, isPodContainerAlpine, prunePods, - waitForPodPhases + waitForPodPhases, + getPrepareJobTimeoutSeconds } from '../k8s' import { containerVolumes, @@ -91,7 +92,8 @@ export async function prepareJob( await waitForPodPhases( createdPod.metadata.name, new Set([PodPhase.RUNNING]), - new Set([PodPhase.PENDING]) + new Set([PodPhase.PENDING]), + getPrepareJobTimeoutSeconds() ) } catch (err) { await prunePods() diff --git a/packages/k8s/src/k8s/index.ts b/packages/k8s/src/k8s/index.ts index 7a64788..5e45954 100644 --- a/packages/k8s/src/k8s/index.ts +++ b/packages/k8s/src/k8s/index.ts @@ -10,7 +10,12 @@ import { getVolumeClaimName, RunnerInstanceLabel } from '../hooks/constants' -import { PodPhase, mergePodSpecWithOptions, mergeObjectMeta } from './utils' +import { + PodPhase, + mergePodSpecWithOptions, + mergeObjectMeta, + useKubeScheduler +} from './utils' const kc = new k8s.KubeConfig() @@ -20,6 +25,8 @@ const k8sApi = kc.makeApiClient(k8s.CoreV1Api) const k8sBatchV1Api = kc.makeApiClient(k8s.BatchV1Api) const k8sAuthorizationV1Api = kc.makeApiClient(k8s.AuthorizationV1Api) +const DEFAULT_WAIT_FOR_POD_TIME_SECONDS = 10 * 60 // 10 min + export const POD_VOLUME_NAME = 'work' export const requiredPermissions = [ @@ -86,7 +93,11 @@ export async function createPod( appPod.spec = new k8s.V1PodSpec() appPod.spec.containers = containers appPod.spec.restartPolicy = 'Never' - appPod.spec.nodeName = await getCurrentNodeName() + + if (!useKubeScheduler()) { + appPod.spec.nodeName = await getCurrentNodeName() + } + const claimName = getVolumeClaimName() appPod.spec.volumes = [ { @@ -142,7 +153,10 @@ export async function createJob( job.spec.template.metadata.annotations = {} job.spec.template.spec.containers = [container] job.spec.template.spec.restartPolicy = 'Never' - job.spec.template.spec.nodeName = await getCurrentNodeName() + + if (!useKubeScheduler()) { + job.spec.template.spec.nodeName = await getCurrentNodeName() + } const claimName = getVolumeClaimName() job.spec.template.spec.volumes = [ @@ -346,7 +360,7 @@ export async function waitForPodPhases( podName: string, awaitingPhases: Set, backOffPhases: Set, - maxTimeSeconds = 10 * 60 // 10 min + maxTimeSeconds = DEFAULT_WAIT_FOR_POD_TIME_SECONDS ): Promise { const backOffManager = new BackOffManager(maxTimeSeconds) let phase: PodPhase = PodPhase.UNKNOWN @@ -369,6 +383,25 @@ export async function waitForPodPhases( } } +export function getPrepareJobTimeoutSeconds(): number { + const envTimeoutSeconds = + process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS'] + + if (!envTimeoutSeconds) { + return DEFAULT_WAIT_FOR_POD_TIME_SECONDS + } + + const timeoutSeconds = parseInt(envTimeoutSeconds, 10) + if (!timeoutSeconds || timeoutSeconds <= 0) { + core.warning( + `Prepare job timeout is invalid ("${timeoutSeconds}"): use an int > 0` + ) + return DEFAULT_WAIT_FOR_POD_TIME_SECONDS + } + + return timeoutSeconds +} + async function getPodPhase(podName: string): Promise { const podPhaseLookup = new Set([ PodPhase.PENDING, diff --git a/packages/k8s/src/k8s/utils.ts b/packages/k8s/src/k8s/utils.ts index 86f3d06..860b412 100644 --- a/packages/k8s/src/k8s/utils.ts +++ b/packages/k8s/src/k8s/utils.ts @@ -6,11 +6,13 @@ import { Mount } from 'hooklib' import * as path from 'path' import { v1 as uuidv4 } from 'uuid' import { POD_VOLUME_NAME } from './index' +import { JOB_CONTAINER_EXTENSION_NAME } from '../hooks/constants' export const DEFAULT_CONTAINER_ENTRY_POINT_ARGS = [`-f`, `/dev/null`] export const DEFAULT_CONTAINER_ENTRY_POINT = 'tail' export const ENV_HOOK_TEMPLATE_PATH = 'ACTIONS_RUNNER_CONTAINER_HOOK_TEMPLATE' +export const ENV_USE_KUBE_SCHEDULER = 'ACTIONS_RUNNER_USE_KUBE_SCHEDULER' export function containerVolumes( userMountVolumes: Mount[] = [], @@ -177,7 +179,9 @@ export function mergeContainerWithOptions( ): void { for (const [key, value] of Object.entries(from)) { if (key === 'name') { - core.warning("Skipping name override: name can't be overwritten") + if (value !== base.name && value !== JOB_CONTAINER_EXTENSION_NAME) { + core.warning("Skipping name override: name can't be overwritten") + } continue } else if (key === 'image') { core.warning("Skipping image override: image can't be overwritten") @@ -257,6 +261,10 @@ export function readExtensionFromFile(): k8s.V1PodTemplateSpec | undefined { return doc as k8s.V1PodTemplateSpec } +export function useKubeScheduler(): boolean { + return process.env[ENV_USE_KUBE_SCHEDULER] === 'true' +} + export enum PodPhase { PENDING = 'Pending', RUNNING = 'Running', diff --git a/packages/k8s/tests/prepare-job-test.ts b/packages/k8s/tests/prepare-job-test.ts index adb8c1b..7047fa0 100644 --- a/packages/k8s/tests/prepare-job-test.ts +++ b/packages/k8s/tests/prepare-job-test.ts @@ -5,6 +5,7 @@ import { createContainerSpec, prepareJob } from '../src/hooks/prepare-job' import { TestHelper } from './test-setup' import { ENV_HOOK_TEMPLATE_PATH, + ENV_USE_KUBE_SCHEDULER, generateContainerName, readExtensionFromFile } from '../src/k8s/utils' @@ -130,6 +131,17 @@ describe('Prepare job', () => { expect(got.spec?.containers[2].args).toEqual(['-c', 'sleep 60']) }) + it('should not throw exception using kube scheduler', async () => { + // only for ReadWriteMany volumes or single node cluster + process.env[ENV_USE_KUBE_SCHEDULER] = 'true' + + await expect( + prepareJob(prepareJobData.args, prepareJobOutputFilePath) + ).resolves.not.toThrow() + + delete process.env[ENV_USE_KUBE_SCHEDULER] + }) + test.each([undefined, null, []])( 'should not throw exception when portMapping=%p', async pm => {