From ab92e4edc305917c21b4d6d751da22c93bb373dd Mon Sep 17 00:00:00 2001
From: Nikola Jokic
Date: Fri, 17 May 2024 15:12:16 +0200
Subject: [PATCH] Re-use the last desired patch on empty batch (#3453)

---
 cmd/ghalistener/worker/worker.go      |  85 +++--
 cmd/ghalistener/worker/worker_test.go | 326 ++++++++++++++++++
 .../ephemeralrunnerset_controller.go  |   3 -
 3 files changed, 376 insertions(+), 38 deletions(-)
 create mode 100644 cmd/ghalistener/worker/worker_test.go

diff --git a/cmd/ghalistener/worker/worker.go b/cmd/ghalistener/worker/worker.go
index 25fb90e1..9d6266bf 100644
--- a/cmd/ghalistener/worker/worker.go
+++ b/cmd/ghalistener/worker/worker.go
@@ -38,20 +38,20 @@ type Config struct {
 // The Worker's role is to process the messages it receives from the listener.
 // It then initiates Kubernetes API requests to carry out the necessary actions.
 type Worker struct {
-	clientset   *kubernetes.Clientset
-	config      Config
-	lastPatch   int
-	lastPatchID int
-	logger      *logr.Logger
+	clientset *kubernetes.Clientset
+	config    Config
+	lastPatch int
+	patchSeq  int
+	logger    *logr.Logger
 }
 
 var _ listener.Handler = (*Worker)(nil)
 
 func New(config Config, options ...Option) (*Worker, error) {
 	w := &Worker{
-		config:      config,
-		lastPatch:   -1,
-		lastPatchID: -1,
+		config:    config,
+		lastPatch: -1,
+		patchSeq:  -1,
 	}
 
 	conf, err := rest.InClusterConfig()
@@ -163,27 +163,8 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
 // The function then scales the ephemeral runner set by applying the merge patch.
 // Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
 // If any error occurs during the process, it returns an error with a descriptive message.
-func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
-	// Max runners should always be set by the resource builder either to the configured value,
-	// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
-	targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
-
-	logValues := []any{
-		"assigned job", count,
-		"decision", targetRunnerCount,
-		"min", w.config.MinRunners,
-		"max", w.config.MaxRunners,
-		"currentRunnerCount", w.lastPatch,
-		"jobsCompleted", jobsCompleted,
-	}
-
-	if count == 0 && jobsCompleted == 0 {
-		w.lastPatchID = 0
-	} else {
-		w.lastPatchID++
-	}
-
-	w.lastPatch = targetRunnerCount
+func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) {
+	patchID := w.setDesiredWorkerState(count, jobsCompleted)
 
 	original, err := json.Marshal(
 		&v1alpha1.EphemeralRunnerSet{
@@ -200,8 +181,8 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
 	patch, err := json.Marshal(
 		&v1alpha1.EphemeralRunnerSet{
 			Spec: v1alpha1.EphemeralRunnerSetSpec{
-				Replicas: targetRunnerCount,
-				PatchID:  w.lastPatchID,
+				Replicas: w.lastPatch,
+				PatchID:  patchID,
 			},
 		},
 	)
@@ -210,14 +191,13 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
 		return 0, err
 	}
 
+	w.logger.Info("Compare", "original", string(original), "patch", string(patch))
 	mergePatch, err := jsonpatch.CreateMergePatch(original, patch)
 	if err != nil {
 		return 0, fmt.Errorf("failed to create merge patch json for ephemeral runner set: %w", err)
 	}
 
-	w.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch))
-
-	w.logger.Info("Scaling ephemeral runner set", logValues...)
+	w.logger.Info("Preparing EphemeralRunnerSet update", "json", string(mergePatch))
 
 	patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{}
 	err = w.clientset.RESTClient().
@@ -238,5 +218,40 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
 		"name", w.config.EphemeralRunnerSetName,
 		"replicas", patchedEphemeralRunnerSet.Spec.Replicas,
 	)
-	return targetRunnerCount, nil
+	return w.lastPatch, nil
+}
+
+// setDesiredWorkerState calculates the desired state of the worker based on the desired count and the number of jobs completed.
+func (w *Worker) setDesiredWorkerState(count, jobsCompleted int) int {
+	// Max runners should always be set by the resource builder either to the configured value,
+	// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
+	targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
+	w.patchSeq++
+	desiredPatchID := w.patchSeq
+
+	if count == 0 && jobsCompleted == 0 { // empty batch
+		targetRunnerCount = max(w.lastPatch, targetRunnerCount)
+		if targetRunnerCount == w.config.MinRunners {
+			// We have an empty batch, and the last patch was the min runners.
+			// Since this is an empty batch and we are at the min runners, they should all be idle.
+			// If the controller accidentally created a few extra pods (during scale-down events),
+			// forcing patch ID 0 here lets the controller scale back down to the min runners.
+			// However, it is important to keep the patch sequence increasing so we don't ignore a batch.
+			desiredPatchID = 0
+		}
+	}
+
+	w.lastPatch = targetRunnerCount
+
+	w.logger.Info(
+		"Calculated target runner count",
+		"assigned job", count,
+		"decision", targetRunnerCount,
+		"min", w.config.MinRunners,
+		"max", w.config.MaxRunners,
+		"currentRunnerCount", w.lastPatch,
+		"jobsCompleted", jobsCompleted,
+	)
+
+	return desiredPatchID
+}
diff --git a/cmd/ghalistener/worker/worker_test.go b/cmd/ghalistener/worker/worker_test.go
new file mode 100644
index 00000000..d009bccf
--- /dev/null
+++ b/cmd/ghalistener/worker/worker_test.go
@@ -0,0 +1,326 @@
+package worker
+
+import (
+	"math"
+	"testing"
+
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSetDesiredWorkerState_MinMaxDefaults(t *testing.T) {
+	logger := logr.Discard()
+	newEmptyWorker := func() *Worker {
+		return &Worker{
+			config: Config{
+				MinRunners: 0,
+				MaxRunners: math.MaxInt32,
+			},
+			lastPatch: -1,
+			patchSeq:  -1,
+			logger:    &logger,
+		}
+	}
+
+	t.Run("init calculate with acquired 0", func(t *testing.T) {
+		w := newEmptyWorker()
+		patchID := w.setDesiredWorkerState(0, 0)
+		assert.Equal(t, 0, w.lastPatch)
+		assert.Equal(t, 0, w.patchSeq)
+		assert.Equal(t, 0, patchID)
+	})
+
+	t.Run("init calculate with acquired 1", func(t *testing.T) {
+		w := newEmptyWorker()
+		patchID := w.setDesiredWorkerState(1, 0)
+		assert.Equal(t, 1, w.lastPatch)
+		assert.Equal(t, 0, w.patchSeq)
+		assert.Equal(t, 0, patchID)
+	})
+
+	t.Run("increment patch when job done", func(t *testing.T) {
+		w := newEmptyWorker()
+		patchID := w.setDesiredWorkerState(1, 0)
+		assert.Equal(t, 0, patchID)
+		patchID = w.setDesiredWorkerState(0, 1)
+		assert.Equal(t, 1, patchID)
+		assert.Equal(t, 0, w.lastPatch)
+		assert.Equal(t, 1, w.patchSeq)
+	})
+
+	t.Run("increment patch when called with same parameters", func(t *testing.T) {
+		w := newEmptyWorker()
+		patchID := w.setDesiredWorkerState(1, 0)
+		assert.Equal(t, 0, patchID)
+		patchID = w.setDesiredWorkerState(1, 0)
+		assert.Equal(t, 1, patchID)
+		assert.Equal(t, 1, w.lastPatch)
+		assert.Equal(t, 1, w.patchSeq)
+	})
+
+ t.Run("calculate desired scale when acquired > 0 and completed > 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(1, 1) + assert.Equal(t, 0, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("re-use the last state when acquired == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(1, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 1, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("adjust when acquired == 0 and completed == 1", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(1, 1) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) +} + +func TestSetDesiredWorkerState_MinSet(t *testing.T) { + logger := logr.Discard() + newEmptyWorker := func() *Worker { + return &Worker{ + config: Config{ + MinRunners: 1, + MaxRunners: math.MaxInt32, + }, + lastPatch: -1, + patchSeq: -1, + logger: &logger, + } + } + + t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 1, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("request back to 0 on job done", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("desired patch is 0 but sequence continues on empty batch and min runners", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(3, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 4, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + + patchID = w.setDesiredWorkerState(0, 3) + assert.Equal(t, 1, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + + // Empty batch on min runners + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) // forcing the state + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 2, w.patchSeq) + }) + +} + +func TestSetDesiredWorkerState_MaxSet(t *testing.T) { + logger := logr.Discard() + newEmptyWorker := func() *Worker { + return &Worker{ + config: Config{ + MinRunners: 0, + MaxRunners: 5, + }, + lastPatch: -1, + patchSeq: -1, + logger: &logger, + } + } + + t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 1, patchID) + assert.Equal(t, 2, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("request 
back to 0 on job done", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("scale up to max when count > max", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(6, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 5, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("scale to max when count == max", func(t *testing.T) { + w := newEmptyWorker() + w.setDesiredWorkerState(5, 0) + assert.Equal(t, 5, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("scale to max when count > max and completed > 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(1, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(6, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 5, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("scale back to 0 when count was > max", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(6, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("force 0 on empty batch and last patch == min runners", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(3, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + + patchID = w.setDesiredWorkerState(0, 3) + assert.Equal(t, 1, patchID) + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + + // Empty batch on min runners + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) // forcing the state + assert.Equal(t, 0, w.lastPatch) + assert.Equal(t, 2, w.patchSeq) + }) +} + +func TestSetDesiredWorkerState_MinMaxSet(t *testing.T) { + logger := logr.Discard() + newEmptyWorker := func() *Worker { + return &Worker{ + config: Config{ + MinRunners: 1, + MaxRunners: 3, + }, + lastPatch: -1, + patchSeq: -1, + logger: &logger, + } + } + + t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 1, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("scale to min when count == 0", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(2, 0) + assert.Equal(t, 0, patchID) + patchID = w.setDesiredWorkerState(0, 1) + assert.Equal(t, 1, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + }) + + t.Run("scale up to max when count > max", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(4, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + }) + + t.Run("scale to max when count == max", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(3, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 0, 
w.patchSeq) + }) + + t.Run("force 0 on empty batch and last patch == min runners", func(t *testing.T) { + w := newEmptyWorker() + patchID := w.setDesiredWorkerState(3, 0) + assert.Equal(t, 0, patchID) + assert.Equal(t, 3, w.lastPatch) + assert.Equal(t, 0, w.patchSeq) + + patchID = w.setDesiredWorkerState(0, 3) + assert.Equal(t, 1, patchID) + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 1, w.patchSeq) + + // Empty batch on min runners + patchID = w.setDesiredWorkerState(0, 0) + assert.Equal(t, 0, patchID) // forcing the state + assert.Equal(t, 1, w.lastPatch) + assert.Equal(t, 2, w.patchSeq) + }) +} diff --git a/controllers/actions.github.com/ephemeralrunnerset_controller.go b/controllers/actions.github.com/ephemeralrunnerset_controller.go index 4c3692b8..c5d166a5 100644 --- a/controllers/actions.github.com/ephemeralrunnerset_controller.go +++ b/controllers/actions.github.com/ephemeralrunnerset_controller.go @@ -213,9 +213,6 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R // on the next batch case ephemeralRunnerSet.Spec.PatchID == 0 && total > ephemeralRunnerSet.Spec.Replicas: count := total - ephemeralRunnerSet.Spec.Replicas - if count <= 0 { - break - } log.Info("Deleting ephemeral runners (scale down)", "count", count) if err := r.deleteIdleEphemeralRunners( ctx,