Add option to use the Kubernetes scheduler for workflow pods (#111)

* Add option to use kube scheduler

This should only be used when ReadWriteMany volumes are supported or when running on a single-node cluster (see the sketch below).

* Add option to set timeout for prepare job

If the kube scheduler is used to hold jobs until sufficient resources are available,
the prepare job needs to wait longer for the workflow pod to reach the Running phase.
The timeout mostly needs an increase when many jobs are triggered at once and together
exceed the resources available in the cluster; those workflows can then be handled
gracefully once sufficient resources become available again (see the usage sketch below).

* Skip the name override warning when the names match or the container is the job extension

* Add a guard that warns and falls back to the default when the timeout is not a positive integer

* Write out ReadWriteMany in full
Wout Van De Wiel, 2023-10-31 12:51:09 +01:00 (committed by GitHub)
parent 496287d61d
commit 90a6236466
4 changed files with 62 additions and 7 deletions
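
The scheduling change itself is a small gate, inlined into createPod and createJob in the diffs below: with ACTIONS_RUNNER_USE_KUBE_SCHEDULER unset (the default), the workflow pod stays pinned to the runner's own node; with it set to "true", nodeName is left unset and placement is delegated to the kube scheduler. A minimal sketch, with applyScheduling as a hypothetical stand-in for the inlined logic:

import * as k8s from '@kubernetes/client-node'

// Hypothetical helper mirroring the gate added to createPod/createJob.
function applyScheduling(spec: k8s.V1PodSpec, currentNodeName: string): void {
  if (process.env['ACTIONS_RUNNER_USE_KUBE_SCHEDULER'] !== 'true') {
    // Default: pin the workflow pod to the runner's node, so the shared
    // work volume never has to be mounted from another node.
    spec.nodeName = currentNodeName
  }
  // Otherwise nodeName stays unset and the scheduler may pick any node,
  // which is why the shared work volume must be ReadWriteMany unless the
  // cluster has a single node.
}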
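
The new timeout override falls back to the 10-minute default for unset or invalid values. A minimal Jest-style sketch of that behavior (the import path is an assumption; the diffs below show getPrepareJobTimeoutSeconds exported from the k8s index):

import { getPrepareJobTimeoutSeconds } from '../src/k8s'

it('falls back to the default for unset or invalid timeouts', () => {
  delete process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS']
  expect(getPrepareJobTimeoutSeconds()).toBe(10 * 60) // default: 10 min

  process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS'] = '1800'
  expect(getPrepareJobTimeoutSeconds()).toBe(1800) // valid override wins

  process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS'] = 'soon'
  expect(getPrepareJobTimeoutSeconds()).toBe(10 * 60) // warns, then falls back

  delete process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS']
})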

View File

@@ -13,7 +13,8 @@ import {
   createPod,
   isPodContainerAlpine,
   prunePods,
-  waitForPodPhases
+  waitForPodPhases,
+  getPrepareJobTimeoutSeconds
 } from '../k8s'
 import {
   containerVolumes,
@@ -91,7 +92,8 @@ export async function prepareJob(
     await waitForPodPhases(
       createdPod.metadata.name,
       new Set([PodPhase.RUNNING]),
-      new Set([PodPhase.PENDING])
+      new Set([PodPhase.PENDING]),
+      getPrepareJobTimeoutSeconds()
     )
   } catch (err) {
     await prunePods()

View File

@@ -10,7 +10,12 @@ import {
   getVolumeClaimName,
   RunnerInstanceLabel
 } from '../hooks/constants'
-import { PodPhase, mergePodSpecWithOptions, mergeObjectMeta } from './utils'
+import {
+  PodPhase,
+  mergePodSpecWithOptions,
+  mergeObjectMeta,
+  useKubeScheduler
+} from './utils'
 
 const kc = new k8s.KubeConfig()
@@ -20,6 +25,8 @@ const k8sApi = kc.makeApiClient(k8s.CoreV1Api)
 const k8sBatchV1Api = kc.makeApiClient(k8s.BatchV1Api)
 const k8sAuthorizationV1Api = kc.makeApiClient(k8s.AuthorizationV1Api)
 
+const DEFAULT_WAIT_FOR_POD_TIME_SECONDS = 10 * 60 // 10 min
+
 export const POD_VOLUME_NAME = 'work'
 
 export const requiredPermissions = [
@@ -86,7 +93,11 @@ export async function createPod(
   appPod.spec = new k8s.V1PodSpec()
   appPod.spec.containers = containers
   appPod.spec.restartPolicy = 'Never'
-  appPod.spec.nodeName = await getCurrentNodeName()
+
+  if (!useKubeScheduler()) {
+    appPod.spec.nodeName = await getCurrentNodeName()
+  }
+
   const claimName = getVolumeClaimName()
   appPod.spec.volumes = [
     {
@@ -142,7 +153,10 @@ export async function createJob(
   job.spec.template.metadata.annotations = {}
   job.spec.template.spec.containers = [container]
   job.spec.template.spec.restartPolicy = 'Never'
-  job.spec.template.spec.nodeName = await getCurrentNodeName()
+
+  if (!useKubeScheduler()) {
+    job.spec.template.spec.nodeName = await getCurrentNodeName()
+  }
 
   const claimName = getVolumeClaimName()
   job.spec.template.spec.volumes = [
@@ -346,7 +360,7 @@ export async function waitForPodPhases(
   podName: string,
   awaitingPhases: Set<PodPhase>,
   backOffPhases: Set<PodPhase>,
-  maxTimeSeconds = 10 * 60 // 10 min
+  maxTimeSeconds = DEFAULT_WAIT_FOR_POD_TIME_SECONDS
 ): Promise<void> {
   const backOffManager = new BackOffManager(maxTimeSeconds)
   let phase: PodPhase = PodPhase.UNKNOWN
@@ -369,6 +383,25 @@
   }
 }
 
+export function getPrepareJobTimeoutSeconds(): number {
+  const envTimeoutSeconds =
+    process.env['ACTIONS_RUNNER_PREPARE_JOB_TIMEOUT_SECONDS']
+
+  if (!envTimeoutSeconds) {
+    return DEFAULT_WAIT_FOR_POD_TIME_SECONDS
+  }
+
+  const timeoutSeconds = parseInt(envTimeoutSeconds, 10)
+  if (!timeoutSeconds || timeoutSeconds <= 0) {
+    core.warning(
+      `Prepare job timeout is invalid ("${timeoutSeconds}"): use an int > 0`
+    )
+    return DEFAULT_WAIT_FOR_POD_TIME_SECONDS
+  }
+
+  return timeoutSeconds
+}
+
 async function getPodPhase(podName: string): Promise<PodPhase> {
   const podPhaseLookup = new Set<string>([
     PodPhase.PENDING,

View File

@@ -6,11 +6,13 @@ import { Mount } from 'hooklib'
 import * as path from 'path'
 import { v1 as uuidv4 } from 'uuid'
 import { POD_VOLUME_NAME } from './index'
+import { JOB_CONTAINER_EXTENSION_NAME } from '../hooks/constants'
 
 export const DEFAULT_CONTAINER_ENTRY_POINT_ARGS = [`-f`, `/dev/null`]
 export const DEFAULT_CONTAINER_ENTRY_POINT = 'tail'
 export const ENV_HOOK_TEMPLATE_PATH = 'ACTIONS_RUNNER_CONTAINER_HOOK_TEMPLATE'
+export const ENV_USE_KUBE_SCHEDULER = 'ACTIONS_RUNNER_USE_KUBE_SCHEDULER'
 
 export function containerVolumes(
   userMountVolumes: Mount[] = [],
@@ -177,7 +179,9 @@ export function mergeContainerWithOptions(
 ): void {
   for (const [key, value] of Object.entries(from)) {
     if (key === 'name') {
-      core.warning("Skipping name override: name can't be overwritten")
+      if (value !== base.name && value !== JOB_CONTAINER_EXTENSION_NAME) {
+        core.warning("Skipping name override: name can't be overwritten")
+      }
       continue
     } else if (key === 'image') {
       core.warning("Skipping image override: image can't be overwritten")
@@ -257,6 +261,10 @@ export function readExtensionFromFile(): k8s.V1PodTemplateSpec | undefined {
   return doc as k8s.V1PodTemplateSpec
 }
 
+export function useKubeScheduler(): boolean {
+  return process.env[ENV_USE_KUBE_SCHEDULER] === 'true'
+}
+
 export enum PodPhase {
   PENDING = 'Pending',
   RUNNING = 'Running',

View File

@@ -5,6 +5,7 @@ import { createContainerSpec, prepareJob } from '../src/hooks/prepare-job'
 import { TestHelper } from './test-setup'
 import {
   ENV_HOOK_TEMPLATE_PATH,
+  ENV_USE_KUBE_SCHEDULER,
   generateContainerName,
   readExtensionFromFile
 } from '../src/k8s/utils'
@@ -130,6 +131,17 @@ describe('Prepare job', () => {
     expect(got.spec?.containers[2].args).toEqual(['-c', 'sleep 60'])
   })
 
+  it('should not throw exception using kube scheduler', async () => {
+    // only for ReadWriteMany volumes or single node cluster
+    process.env[ENV_USE_KUBE_SCHEDULER] = 'true'
+
+    await expect(
+      prepareJob(prepareJobData.args, prepareJobOutputFilePath)
+    ).resolves.not.toThrow()
+
+    delete process.env[ENV_USE_KUBE_SCHEDULER]
+  })
+
   test.each([undefined, null, []])(
     'should not throw exception when portMapping=%p',
     async pm => {