Include self correction on empty batch and avoid removing pending runners when cluster is busy (#3426)

This commit is contained in:
Nikola Jokic
2024-04-16 12:55:25 +02:00
committed by GitHub
parent 98854ef9c0
commit 963ae48a3f
5 changed files with 566 additions and 228 deletions

View File

@@ -164,6 +164,11 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) error {
} }
if msg == nil { if msg == nil {
_, err := handler.HandleDesiredRunnerCount(ctx, 0, 0)
if err != nil {
return fmt.Errorf("handling nil message failed: %w", err)
}
continue continue
} }

View File

@@ -177,12 +177,12 @@ func (w *Worker) HandleDesiredRunnerCount(ctx context.Context, count int, jobsCo
"jobsCompleted", jobsCompleted, "jobsCompleted", jobsCompleted,
} }
if w.lastPatch == targetRunnerCount && jobsCompleted == 0 { if count == 0 && jobsCompleted == 0 {
w.logger.Info("Skipping patch", logValues...) w.lastPatchID = 0
return targetRunnerCount, nil } else {
w.lastPatchID++
} }
w.lastPatchID++
w.lastPatch = targetRunnerCount w.lastPatch = targetRunnerCount
original, err := json.Marshal( original, err := json.Marshal(

View File

@@ -277,6 +277,7 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
// need to scale down to 0 // need to scale down to 0
err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) { err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) {
obj.Spec.Replicas = 0 obj.Spec.Replicas = 0
obj.Spec.PatchID = 0
}) })
if err != nil { if err != nil {
log.Error(err, "Failed to patch runner set to set desired count to 0") log.Error(err, "Failed to patch runner set to set desired count to 0")

View File

@@ -197,7 +197,6 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
log.Error(err, "failed to cleanup finished ephemeral runners") log.Error(err, "failed to cleanup finished ephemeral runners")
} }
}() }()
log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas) log.Info("Scaling comparison", "current", total, "desired", ephemeralRunnerSet.Spec.Replicas)
switch { switch {
case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up case total < ephemeralRunnerSet.Spec.Replicas: // Handle scale up
@@ -208,8 +207,16 @@ func (r *EphemeralRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{}, err return ctrl.Result{}, err
} }
case total > ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario. case ephemeralRunnerSet.Spec.PatchID > 0 && total >= ephemeralRunnerSet.Spec.Replicas: // Handle scale down scenario.
// If ephemeral runner did not yet update the phase to succeeded, but the scale down
// request is issued, we should ignore the scale down request.
// Eventually, the ephemeral runner will be cleaned up on the next patch request, which happens
// on the next batch
case ephemeralRunnerSet.Spec.PatchID == 0 && total > ephemeralRunnerSet.Spec.Replicas:
count := total - ephemeralRunnerSet.Spec.Replicas count := total - ephemeralRunnerSet.Spec.Replicas
if count <= 0 {
break
}
log.Info("Deleting ephemeral runners (scale down)", "count", count) log.Info("Deleting ephemeral runners (scale down)", "count", count)
if err := r.deleteIdleEphemeralRunners( if err := r.deleteIdleEphemeralRunners(
ctx, ctx,
@@ -428,6 +435,9 @@ func (r *EphemeralRunnerSetReconciler) createProxySecret(ctx context.Context, ep
// When this happens, the next reconcile loop will try to delete the remaining ephemeral runners // When this happens, the next reconcile loop will try to delete the remaining ephemeral runners
// after we get notified by any of the `v1alpha1.EphemeralRunner.Status` updates. // after we get notified by any of the `v1alpha1.EphemeralRunner.Status` updates.
func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners []*v1alpha1.EphemeralRunner, count int, log logr.Logger) error { func (r *EphemeralRunnerSetReconciler) deleteIdleEphemeralRunners(ctx context.Context, ephemeralRunnerSet *v1alpha1.EphemeralRunnerSet, pendingEphemeralRunners, runningEphemeralRunners []*v1alpha1.EphemeralRunner, count int, log logr.Logger) error {
if count <= 0 {
return nil
}
runners := newEphemeralRunnerStepper(pendingEphemeralRunners, runningEphemeralRunners) runners := newEphemeralRunnerStepper(pendingEphemeralRunners, runningEphemeralRunners)
if runners.len() == 0 { if runners.len() == 0 {
log.Info("No pending or running ephemeral runners running at this time for scale down") log.Info("No pending or running ephemeral runners running at this time for scale down")

View File

@@ -4,7 +4,6 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/base64" "encoding/base64"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@@ -275,21 +274,18 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
}) })
Context("When a new EphemeralRunnerSet scale up and down", func() { Context("When a new EphemeralRunnerSet scale up and down", func() {
It("Should scale only on patch ID change", func() { It("Should scale up with patch ID 0", func() {
created := new(actionsv1alpha1.EphemeralRunnerSet) ers := new(actionsv1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, created) err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
patchID := 1 updated := ers.DeepCopy()
// Scale up the EphemeralRunnerSet
updated := created.DeepCopy()
updated.Spec.Replicas = 5 updated.Spec.Replicas = 5
updated.Spec.PatchID = patchID updated.Spec.PatchID = 0
err = k8sClient.Update(ctx, updated)
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
// Wait for the EphemeralRunnerSet to be scaled up
runnerList := new(actionsv1alpha1.EphemeralRunnerList) runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually( Eventually(
func() (int, error) { func() (int, error) {
@@ -298,110 +294,282 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return -1, err return -1, err
} }
// Set status to simulate a configured EphemeralRunner
refetch := false
for i, runner := range runnerList.Items {
if runner.Status.RunnerId == 0 {
updatedRunner := runner.DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
updatedRunner.Status.RunnerId = i + 100
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
refetch = true
}
}
if refetch {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
}
return len(runnerList.Items), nil return len(runnerList.Items), nil
}, },
ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval, ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created") ).Should(BeEquivalentTo(5), "5 EphemeralRunner should be created")
})
// Mark one of the EphemeralRunner as finished It("Should scale up when patch ID changes", func() {
finishedRunner := runnerList.Items[4].DeepCopy() ers := new(actionsv1alpha1.EphemeralRunnerSet)
finishedRunner.Status.Phase = corev1.PodSucceeded err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
err = k8sClient.Status().Patch(ctx, finishedRunner, client.MergeFrom(&runnerList.Items[4])) Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated := ers.DeepCopy()
updated.Spec.Replicas = 1
updated.Spec.PatchID = 0
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "1 EphemeralRunner should be created")
ers = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
})
It("Should clean up finished ephemeral runner when scaling down", func() {
ers := new(actionsv1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated := ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
updatedRunner := runnerList.Items[0].DeepCopy()
updatedRunner.Status.Phase = corev1.PodSucceeded
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Wait for the finished EphemeralRunner to be set to succeeded updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Keep the ephemeral runner until the next patch
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "1 EphemeralRunner should be up")
// The listener was slower to patch the completed, but we should still have 1 running
ers = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = ers.DeepCopy()
updated.Spec.Replicas = 1
updated.Spec.PatchID = 2
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(1), "1 Ephemeral runner should be up")
})
It("Should keep finished ephemeral runners until patch id changes", func() {
ers := new(actionsv1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated := ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
updatedRunner := runnerList.Items[0].DeepCopy()
updatedRunner.Status.Phase = corev1.PodSucceeded
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.Phase = corev1.PodPending
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// confirm they are not deleted
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Consistently(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
5*time.Second,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
})
It("Should handle double scale up", func() {
ers := new(actionsv1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated := ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
updatedRunner := runnerList.Items[0].DeepCopy()
updatedRunner.Status.Phase = corev1.PodSucceeded
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
ers = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = ers.DeepCopy()
updated.Spec.Replicas = 3
updated.Spec.PatchID = 2
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
// We should have 3 runners, and have no Succeeded ones
Eventually( Eventually(
func() error { func() error {
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil { if err != nil {
return err return err
} }
for _, runner := range runnerList.Items { if len(runnerList.Items) != 3 {
if runner.Name != finishedRunner.Name { return fmt.Errorf("Expected 3 runners, got %d", len(runnerList.Items))
continue
}
if runner.Status.Phase != corev1.PodSucceeded {
return fmt.Errorf("EphemeralRunner is not finished")
}
// found pod succeeded
return nil
}
return errors.New("Finished ephemeral runner is not found")
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(Succeed(), "Finished EphemeralRunner should be deleted")
// After one ephemeral runner is finished, simulate job done patch
patchID++
original := new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = original.DeepCopy()
updated.Spec.PatchID = patchID
updated.Spec.Replicas = 4
err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
// Only finished ephemeral runner should be deleted
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
} }
for _, runner := range runnerList.Items { for _, runner := range runnerList.Items {
if runner.Status.Phase == corev1.PodSucceeded { if runner.Status.Phase == corev1.PodSucceeded {
return -1, fmt.Errorf("Finished EphemeralRunner should be deleted") return fmt.Errorf("Runner %s is in Succeeded phase", runner.Name)
} }
} }
return len(runnerList.Items), nil return nil
}, },
ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval, ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(4), "4 EphemeralRunner should be created") ).Should(BeNil(), "3 EphemeralRunner should be created and none should be in Succeeded phase")
})
// Scaling down the EphemeralRunnerSet It("Should handle scale down without removing pending runners", func() {
patchID++ ers := new(actionsv1alpha1.EphemeralRunnerSet)
original = new(actionsv1alpha1.EphemeralRunnerSet) err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = original.DeepCopy()
updated.Spec.PatchID = patchID updated := ers.DeepCopy()
updated.Spec.Replicas = 3 updated.Spec.Replicas = 2
err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
// Wait for the EphemeralRunnerSet to be scaled down runnerList := new(actionsv1alpha1.EphemeralRunnerList)
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually( Eventually(
func() (int, error) { func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
@@ -409,150 +577,215 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return -1, err return -1, err
} }
// Set status to simulate a configured EphemeralRunner
refetch := false
for i, runner := range runnerList.Items {
if runner.Status.RunnerId == 0 {
updatedRunner := runner.DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
updatedRunner.Status.RunnerId = i + 100
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
refetch = true
}
}
if refetch {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(3), "3 EphemeralRunner should be created")
// We will not scale down runner that is running jobs
runningRunner := runnerList.Items[0].DeepCopy()
runningRunner.Status.JobRequestId = 1000
err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
runningRunner = runnerList.Items[1].DeepCopy()
runningRunner.Status.JobRequestId = 1001
err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Scale down to 1 while 2 are running
patchID++
original = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = original.DeepCopy()
updated.Spec.PatchID = patchID
updated.Spec.Replicas = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(original))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
// Wait for the EphemeralRunnerSet to be scaled down to 2 since we still have 2 runner running jobs
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
// Set status to simulate a configured EphemeralRunner
refetch := false
for i, runner := range runnerList.Items {
if runner.Status.RunnerId == 0 {
updatedRunner := runner.DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
updatedRunner.Status.RunnerId = i + 100
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
refetch = true
}
}
if refetch {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
}
return len(runnerList.Items), nil return len(runnerList.Items), nil
}, },
ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval, ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") ).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
// We will not scale down failed runner updatedRunner := runnerList.Items[0].DeepCopy()
failedRunner := runnerList.Items[0].DeepCopy() updatedRunner.Status.Phase = corev1.PodSucceeded
failedRunner.Status.Phase = corev1.PodFailed err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
err = k8sClient.Status().Patch(ctx, failedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner") Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.Phase = corev1.PodPending
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Wait for these statuses to actually be updated
runnerList = new(actionsv1alpha1.EphemeralRunnerList) runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually( Eventually(
func() (int, error) { func() error {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil { if err != nil {
return -1, err return err
} }
pending := 0
// Set status to simulate a configured EphemeralRunner succeeded := 0
refetch := false for _, runner := range runnerList.Items {
for i, runner := range runnerList.Items { switch runner.Status.Phase {
if runner.Status.RunnerId == 0 { case corev1.PodSucceeded:
updatedRunner := runner.DeepCopy() succeeded++
updatedRunner.Status.Phase = corev1.PodRunning case corev1.PodPending:
updatedRunner.Status.RunnerId = i + 100 pending++
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
refetch = true
} }
} }
if refetch { if pending != 1 && succeeded != 1 {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace)) return fmt.Errorf("Expected 1 runner in Pending and 1 in Succeeded, got %d in Pending and %d in Succeeded", pending, succeeded)
if err != nil {
return -1, err
}
} }
return len(runnerList.Items), nil return nil
}, },
ephemeralRunnerSetTestTimeout, ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval, ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created") ).Should(BeNil(), "1 EphemeralRunner should be in Pending and 1 in Succeeded phase")
// We will scale down to 0 when the running job is completed and the failed runner is deleted // Scale down to 0, while 1 is still pending. This simulates the difference between the desired and actual state
runningRunner = runnerList.Items[1].DeepCopy() ers = new(actionsv1alpha1.EphemeralRunnerSet)
runningRunner.Status.Phase = corev1.PodSucceeded err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
err = k8sClient.Status().Patch(ctx, runningRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
err = k8sClient.Delete(ctx, &runnerList.Items[0])
Expect(err).NotTo(HaveOccurred(), "failed to delete EphemeralRunner")
// Scale down to 0 while 1 ephemeral runner is failed
patchID++
original = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, original)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = original.DeepCopy()
updated.Spec.PatchID = patchID updated = ers.DeepCopy()
updated.Spec.Replicas = 0 updated.Spec.Replicas = 0
err = k8sClient.Patch(ctx, updated, client.MergeFrom(original)) updated.Spec.PatchID = 2
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet") Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
// Wait for the EphemeralRunnerSet to be scaled down to 0 runnerList = new(actionsv1alpha1.EphemeralRunnerList)
// We should have 1 runner up and pending
Eventually(
func() error {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return err
}
if len(runnerList.Items) != 1 {
return fmt.Errorf("Expected 1 runner, got %d", len(runnerList.Items))
}
if runnerList.Items[0].Status.Phase != corev1.PodPending {
return fmt.Errorf("Expected runner to be in Pending, got %s", runnerList.Items[0].Status.Phase)
}
return nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeNil(), "1 EphemeralRunner should be created and in Pending phase")
// Now, the ephemeral runner finally is done and we can scale down to 0
updatedRunner = runnerList.Items[0].DeepCopy()
updatedRunner.Status.Phase = corev1.PodSucceeded
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "2 EphemeralRunner should be created")
})
It("Should kill pending and running runners if they are up for some reason and the batch contains no jobs", func() {
ers := new(actionsv1alpha1.EphemeralRunnerSet)
err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated := ers.DeepCopy()
updated.Spec.Replicas = 2
updated.Spec.PatchID = 1
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList := new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")
// Put one runner in Pending and one in Running
updatedRunner := runnerList.Items[0].DeepCopy()
updatedRunner.Status.Phase = corev1.PodPending
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Wait for these statuses to actually be updated
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually(
func() error {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return err
}
pending := 0
running := 0
for _, runner := range runnerList.Items {
switch runner.Status.Phase {
case corev1.PodPending:
pending++
case corev1.PodRunning:
running++
}
}
if pending != 1 && running != 1 {
return fmt.Errorf("Expected 1 runner in Pending and 1 in Running, got %d in Pending and %d in Running", pending, running)
}
return nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeNil(), "1 EphemeralRunner should be in Pending and 1 in Running phase")
// Scale down to 0 with patch ID 0. This forces the scale down to self correct on empty batch
ers = new(actionsv1alpha1.EphemeralRunnerSet)
err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")
updated = ers.DeepCopy()
updated.Spec.Replicas = 0
updated.Spec.PatchID = 0
err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")
runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Consistently(
func() (int, error) {
err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
if err != nil {
return -1, err
}
return len(runnerList.Items), nil
},
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(2), "2 EphemeralRunner should be up since they don't have an ID yet")
// Now, let's say ephemeral runner controller patched these ephemeral runners with the registration.
updatedRunner = runnerList.Items[0].DeepCopy()
updatedRunner.Status.RunnerId = 1
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
updatedRunner = runnerList.Items[1].DeepCopy()
updatedRunner.Status.RunnerId = 2
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
// Now, eventually, they should be deleted
runnerList = new(actionsv1alpha1.EphemeralRunnerList) runnerList = new(actionsv1alpha1.EphemeralRunnerList)
Eventually( Eventually(
func() (int, error) { func() (int, error) {
@@ -561,31 +794,120 @@ var _ = Describe("Test EphemeralRunnerSet controller", func() {
return -1, err return -1, err
} }
// Set status to simulate a configured EphemeralRunner return len(runnerList.Items), nil
refetch := false
for i, runner := range runnerList.Items {
if runner.Status.RunnerId == 0 {
updatedRunner := runner.DeepCopy()
updatedRunner.Status.Phase = corev1.PodRunning
updatedRunner.Status.RunnerId = i + 100
err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runner))
Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")
refetch = true
}
}
if refetch { },
ephemeralRunnerSetTestTimeout,
ephemeralRunnerSetTestInterval,
).Should(BeEquivalentTo(0), "0 EphemeralRunner should exist")
})
// Verifies the replacement flow: with the desired replica count held at 2,
// advancing Spec.PatchID must cause the controller to replace a finished
// (Succeeded) runner with a fresh one while leaving the Running runner alone.
It("Should replace finished ephemeral runners with new ones", func() {
	// Scale the set up to 2 runners with an initial patch ID.
	ers := new(actionsv1alpha1.EphemeralRunnerSet)
	err := k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
	Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")

	updated := ers.DeepCopy()
	updated.Spec.Replicas = 2
	updated.Spec.PatchID = 1
	err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
	Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")

	// Wait until the controller has created both runners.
	runnerList := new(actionsv1alpha1.EphemeralRunnerList)
	Eventually(
		func() (int, error) {
			err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
			if err != nil {
				return -1, err
			}
			return len(runnerList.Items), nil
		},
		ephemeralRunnerSetTestTimeout,
		ephemeralRunnerSetTestInterval,
	).Should(BeEquivalentTo(2), "2 EphemeralRunner should be created")

	// Put one runner in Succeeded and one in Running
	updatedRunner := runnerList.Items[0].DeepCopy()
	updatedRunner.Status.Phase = corev1.PodSucceeded
	err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[0]))
	Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")

	updatedRunner = runnerList.Items[1].DeepCopy()
	updatedRunner.Status.Phase = corev1.PodRunning
	err = k8sClient.Status().Patch(ctx, updatedRunner, client.MergeFrom(&runnerList.Items[1]))
	Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunner")

	// Wait for these statuses to actually be updated
	runnerList = new(actionsv1alpha1.EphemeralRunnerList)
	Eventually(
		func() error {
			err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
			if err != nil {
				return err
			}
			succeeded := 0
			running := 0
			for _, runner := range runnerList.Items {
				switch runner.Status.Phase {
				case corev1.PodSucceeded:
					succeeded++
				case corev1.PodRunning:
					running++
				}
			}
			// NOTE(review): the original condition used "&&", which only
			// reported a mismatch when BOTH counts were wrong; "||" enforces
			// both expectations, matching the message below.
			if succeeded != 1 || running != 1 {
				return fmt.Errorf("Expected 1 runner in Succeeded and 1 in Running, got %d in Succeeded and %d in Running", succeeded, running)
			}
			return nil
		},
		ephemeralRunnerSetTestTimeout,
		ephemeralRunnerSetTestInterval,
	).Should(BeNil(), "1 EphemeralRunner should be in Succeeded and 1 in Running phase")

	// Now, let's simulate replacement. The desired count is still 2.
	// This simulates that we got 1 job assigned, and 1 job completed.
	ers = new(actionsv1alpha1.EphemeralRunnerSet)
	err = k8sClient.Get(ctx, client.ObjectKey{Name: ephemeralRunnerSet.Name, Namespace: ephemeralRunnerSet.Namespace}, ers)
	Expect(err).NotTo(HaveOccurred(), "failed to get EphemeralRunnerSet")

	updated = ers.DeepCopy()
	updated.Spec.Replicas = 2
	updated.Spec.PatchID = 2
	err = k8sClient.Patch(ctx, updated, client.MergeFrom(ers))
	Expect(err).NotTo(HaveOccurred(), "failed to update EphemeralRunnerSet")

	// The finished runner should be replaced: eventually 2 runners exist
	// again and none of them remains in the Succeeded phase.
	runnerList = new(actionsv1alpha1.EphemeralRunnerList)
	Eventually(
		func() error {
			err := k8sClient.List(ctx, runnerList, client.InNamespace(ephemeralRunnerSet.Namespace))
			if err != nil {
				return err
			}
			if len(runnerList.Items) != 2 {
				return fmt.Errorf("Expected 2 runners, got %d", len(runnerList.Items))
			}
			for _, runner := range runnerList.Items {
				if runner.Status.Phase == corev1.PodSucceeded {
					return fmt.Errorf("Expected no runners in Succeeded phase, got one")
				}
			}
			return nil
		},
		ephemeralRunnerSetTestTimeout,
		ephemeralRunnerSetTestInterval,
	).Should(BeNil(), "2 EphemeralRunner should be created and none should be in Succeeded phase")
})
It("Should update status on Ephemeral Runner state changes", func() { It("Should update status on Ephemeral Runner state changes", func() {