Re-use the last desired patch on empty batch (#3453)

This commit is contained in:
Nikola Jokic
2024-05-17 15:12:16 +02:00
committed by GitHub
parent fa7a4f584e
commit ab92e4edc3
3 changed files with 376 additions and 38 deletions

View File

@@ -41,7 +41,7 @@ type Worker struct {
clientset *kubernetes.Clientset
config Config
lastPatch int
lastPatchID int
patchSeq int
logger *logr.Logger
}
@@ -51,7 +51,7 @@ func New(config Config, options ...Option) (*Worker, error) {
w := &Worker{
config: config,
lastPatch: -1,
lastPatchID: -1,
patchSeq: -1,
}
conf, err := rest.InClusterConfig()
@@ -163,27 +163,8 @@ func (w *Worker) HandleJobStarted(ctx context.Context, jobInfo *actions.JobStart
// The function then scales the ephemeral runner set by applying the merge patch.
// Finally, it logs the scaled ephemeral runner set details and returns nil if successful.
// If any error occurs during the process, it returns an error with a descriptive message.
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCompleted int) (int, error) {
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
logValues := []any{
"assigned job", count,
"decision", targetRunnerCount,
"min", w.config.MinRunners,
"max", w.config.MaxRunners,
"currentRunnerCount", w.lastPatch,
"jobsCompleted", jobsCompleted,
}
if count == 0 && jobsCompleted == 0 {
w.lastPatchID = 0
} else {
w.lastPatchID++
}
w.lastPatch = targetRunnerCount
func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count, jobsCompleted int) (int, error) {
patchID := w.setDesiredWorkerState(count, jobsCompleted)
original, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
@@ -200,8 +181,8 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
patch, err := json.Marshal(
&v1alpha1.EphemeralRunnerSet{
Spec: v1alpha1.EphemeralRunnerSetSpec{
Replicas: targetRunnerCount,
PatchID: w.lastPatchID,
Replicas: w.lastPatch,
PatchID: patchID,
},
},
)
@@ -210,14 +191,13 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
return 0, err
}
w.logger.Info("Compare", "original", string(original), "patch", string(patch))
mergePatch, err := jsonpatch.CreateMergePatch(original, patch)
if err != nil {
return 0, fmt.Errorf("failed to create merge patch json for ephemeral runner set: %w", err)
}
w.logger.Info("Created merge patch json for EphemeralRunnerSet update", "json", string(mergePatch))
w.logger.Info("Scaling ephemeral runner set", logValues...)
w.logger.Info("Preparing EphemeralRunnerSet update", "json", string(mergePatch))
patchedEphemeralRunnerSet := &v1alpha1.EphemeralRunnerSet{}
err = w.clientset.RESTClient().
@@ -238,5 +218,40 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
"name", w.config.EphemeralRunnerSetName,
"replicas", patchedEphemeralRunnerSet.Spec.Replicas,
)
return targetRunnerCount, nil
return w.lastPatch, nil
}
// calculateDesiredState calculates the desired state of the worker based on the desired count and the the number of jobs completed.
func (w *Worker) setDesiredWorkerState(count, jobsCompleted int) int {
// Max runners should always be set by the resource builder either to the configured value,
// or the maximum int32 (resourcebuilder.newAutoScalingListener()).
targetRunnerCount := min(w.config.MinRunners+count, w.config.MaxRunners)
w.patchSeq++
desiredPatchID := w.patchSeq
if count == 0 && jobsCompleted == 0 { // empty batch
targetRunnerCount = max(w.lastPatch, targetRunnerCount)
if targetRunnerCount == w.config.MinRunners {
// We have an empty batch, and the last patch was the min runners.
// Since this is an empty batch, and we are at the min runners, they should all be idle.
// If controller created few more pods on accident (during scale down events),
// this situation allows the controller to scale down to the min runners.
// However, it is important to keep the patch sequence increasing so we don't ignore one batch.
desiredPatchID = 0
}
}
w.lastPatch = targetRunnerCount
w.logger.Info(
"Calculated target runner count",
"assigned job", count,
"decision", targetRunnerCount,
"min", w.config.MinRunners,
"max", w.config.MaxRunners,
"currentRunnerCount", w.lastPatch,
"jobsCompleted", jobsCompleted,
)
return desiredPatchID
}

View File

@@ -0,0 +1,326 @@
package worker
import (
"math"
"testing"
"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
)
func TestSetDesiredWorkerState_MinMaxDefaults(t *testing.T) {
logger := logr.Discard()
newEmptyWorker := func() *Worker {
return &Worker{
config: Config{
MinRunners: 0,
MaxRunners: math.MaxInt32,
},
lastPatch: -1,
patchSeq: -1,
logger: &logger,
}
}
t.Run("init calculate with acquired 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
assert.Equal(t, 0, patchID)
})
t.Run("init calculate with acquired 1", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 0)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
assert.Equal(t, 0, patchID)
})
t.Run("increment patch when job done", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("increment patch when called with same parameters", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(1, 0)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("calculate desired scale when acquired > 0 and completed > 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 1)
assert.Equal(t, 0, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("re-use the last state when acquired == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("adjust when acquired == 0 and completed == 1", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 1)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
}
func TestSetDesiredWorkerState_MinSet(t *testing.T) {
logger := logr.Discard()
newEmptyWorker := func() *Worker {
return &Worker{
config: Config{
MinRunners: 1,
MaxRunners: math.MaxInt32,
},
lastPatch: -1,
patchSeq: -1,
logger: &logger,
}
}
t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 1, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("request back to 0 on job done", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("desired patch is 0 but sequence continues on empty batch and min runners", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(3, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 4, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
patchID = w.setDesiredWorkerState(0, 3)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
// Empty batch on min runners
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID) // forcing the state
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 2, w.patchSeq)
})
}
func TestSetDesiredWorkerState_MaxSet(t *testing.T) {
logger := logr.Discard()
newEmptyWorker := func() *Worker {
return &Worker{
config: Config{
MinRunners: 0,
MaxRunners: 5,
},
lastPatch: -1,
patchSeq: -1,
logger: &logger,
}
}
t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 1, patchID)
assert.Equal(t, 2, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("request back to 0 on job done", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("scale up to max when count > max", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(6, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 5, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("scale to max when count == max", func(t *testing.T) {
w := newEmptyWorker()
w.setDesiredWorkerState(5, 0)
assert.Equal(t, 5, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("scale to max when count > max and completed > 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(1, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(6, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 5, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("scale back to 0 when count was > max", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(6, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("force 0 on empty batch and last patch == min runners", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(3, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
patchID = w.setDesiredWorkerState(0, 3)
assert.Equal(t, 1, patchID)
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
// Empty batch on min runners
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID) // forcing the state
assert.Equal(t, 0, w.lastPatch)
assert.Equal(t, 2, w.patchSeq)
})
}
func TestSetDesiredWorkerState_MinMaxSet(t *testing.T) {
logger := logr.Discard()
newEmptyWorker := func() *Worker {
return &Worker{
config: Config{
MinRunners: 1,
MaxRunners: 3,
},
lastPatch: -1,
patchSeq: -1,
logger: &logger,
}
}
t.Run("initial scale when acquired == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("re-use the old state on count == 0 and completed == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 1, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("scale to min when count == 0", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(2, 0)
assert.Equal(t, 0, patchID)
patchID = w.setDesiredWorkerState(0, 1)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
})
t.Run("scale up to max when count > max", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(4, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("scale to max when count == max", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(3, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
})
t.Run("force 0 on empty batch and last patch == min runners", func(t *testing.T) {
w := newEmptyWorker()
patchID := w.setDesiredWorkerState(3, 0)
assert.Equal(t, 0, patchID)
assert.Equal(t, 3, w.lastPatch)
assert.Equal(t, 0, w.patchSeq)
patchID = w.setDesiredWorkerState(0, 3)
assert.Equal(t, 1, patchID)
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 1, w.patchSeq)
// Empty batch on min runners
patchID = w.setDesiredWorkerState(0, 0)
assert.Equal(t, 0, patchID) // forcing the state
assert.Equal(t, 1, w.lastPatch)
assert.Equal(t, 2, w.patchSeq)
})
}

View File

@@ -213,9 +213,6 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
// on the next batch
case ephemeralRunnerSet.Spec.PatchID == 0 && total > ephemeralRunnerSet.Spec.Replicas:
count := total - ephemeralRunnerSet.Spec.Replicas
if count <= 0 {
break
}
log.Info("Deleting ephemeral runners (scale down)", "count", count)
if err := r.deleteIdleEphemeralRunners(
ctx,