Add DrainJobsMode (aka UpdateStrategy feature) (#2569)

commit 8afef51c8b
parent 032443fcfd
Author:    Bassem Dghaidi
Date:      2023-05-23 13:42:30 +02:00
Committer: GitHub
9 changed files with 450 additions and 40 deletions

controllers/actions.github.com/autoscalingrunnerset_controller.go

@@ -49,6 +49,24 @@ const (
	runnerScaleSetNameAnnotationKey = "runner-scale-set-name"
)

// UpdateStrategy defines how the controller should handle upgrades while jobs are running.
type UpdateStrategy string

const (
	// "immediate": (default) The controller immediately applies the change, causing the
	// listener and ephemeral runner set to be recreated. If there are pending or running
	// jobs, this can overprovision runners. That should not be a problem at small scale,
	// but it can lead to a significant increase in resource usage when many jobs run
	// concurrently.
	UpdateStrategyImmediate = UpdateStrategy("immediate")

	// "eventual": The controller removes the listener and ephemeral runner set
	// immediately, but does not recreate them (to apply the change) until all
	// pending and running jobs have completed. Applying the change can take longer,
	// but runners are never overprovisioned.
	UpdateStrategyEventual = UpdateStrategy("eventual")
)

// AutoscalingRunnerSetReconciler reconciles an AutoscalingRunnerSet object
type AutoscalingRunnerSetReconciler struct {
	client.Client
@@ -57,6 +75,7 @@ type AutoscalingRunnerSetReconciler struct {
	ControllerNamespace                           string
	DefaultRunnerScaleSetListenerImage            string
	DefaultRunnerScaleSetListenerImagePullSecrets []string
	UpdateStrategy                                UpdateStrategy
	ActionsClient                                 actions.MultiClient

	resourceBuilder resourceBuilder
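
How the new UpdateStrategy field gets populated is not part of this excerpt (the commit touches nine files; main.go and the Helm charts are not shown here). As a rough sketch of one way to wire it from a CLI flag — the flag name "update-strategy" and the validation below are assumptions for illustration, not necessarily what the commit does:

// Hypothetical wiring, e.g. in main.go; uses only the stdlib "flag" and "log" packages.
func setupReconciler() *AutoscalingRunnerSetReconciler {
	updateStrategy := flag.String("update-strategy", string(UpdateStrategyImmediate),
		`resource update strategy: "immediate" or "eventual"`)
	flag.Parse()

	// Reject anything other than the two supported strategies.
	switch UpdateStrategy(*updateStrategy) {
	case UpdateStrategyImmediate, UpdateStrategyEventual:
		// valid value, pass it through
	default:
		log.Fatalf("invalid --update-strategy %q", *updateStrategy)
	}

	return &AutoscalingRunnerSetReconciler{
		UpdateStrategy: UpdateStrategy(*updateStrategy),
		// Client, Scheme, Log, and the remaining fields elided.
	}
}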
@@ -218,7 +237,48 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
log.Info("Find existing ephemeral runner set", "name", runnerSet.Name, "specHash", runnerSet.Labels[labelKeyRunnerSpecHash])
}
// Make sure the AutoscalingListener is up and running in the controller namespace
listener := new(v1alpha1.AutoscalingListener)
listenerFound := true
if err := r.Get(ctx, client.ObjectKey{Namespace: r.ControllerNamespace, Name: scaleSetListenerName(autoscalingRunnerSet)}, listener); err != nil {
if !kerrors.IsNotFound(err) {
log.Error(err, "Failed to get AutoscalingListener resource")
return ctrl.Result{}, err
}
listenerFound = false
log.Info("AutoscalingListener does not exist.")
}
// Our listener pod is out of date, so we need to delete it to get a new recreate.
if listenerFound && (listener.Labels[labelKeyRunnerSpecHash] != autoscalingRunnerSet.ListenerSpecHash()) {
log.Info("RunnerScaleSetListener is out of date. Deleting it so that it is recreated", "name", listener.Name)
if err := r.Delete(ctx, listener); err != nil {
if kerrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
log.Error(err, "Failed to delete AutoscalingListener resource")
return ctrl.Result{}, err
}
log.Info("Deleted RunnerScaleSetListener since existing one is out of date")
return ctrl.Result{}, nil
}
if desiredSpecHash != latestRunnerSet.Labels[labelKeyRunnerSpecHash] {
if r.drainingJobs(&latestRunnerSet.Status) {
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Waiting for the running and pending runners to finish:", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners)
log.Info("Scaling down the number of desired replicas to 0")
// We are in the process of draining the jobs. The listener has been deleted and the ephemeral runner set replicas
// need to scale down to 0
err := patch(ctx, r.Client, latestRunnerSet, func(obj *v1alpha1.EphemeralRunnerSet) {
obj.Spec.Replicas = 0
})
if err != nil {
log.Error(err, "Failed to patch runner set to set desired count to 0")
}
return ctrl.Result{}, err
}
log.Info("Latest runner set spec hash does not match the current autoscaling runner set. Creating a new runner set")
return r.createEphemeralRunnerSet(ctx, autoscalingRunnerSet, log)
}
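
The scale-down above goes through a repository-internal patch helper whose definition is outside this diff. A minimal sketch consistent with the call site — assuming the helper computes a merge patch from a deep copy of the object — could look like this:

// Sketch only; the real helper lives elsewhere in the repo and may differ.
// Uses context and sigs.k8s.io/controller-runtime/pkg/client.
func patch[T client.Object](ctx context.Context, c client.Client, obj T, update func(obj T)) error {
	// Remember the original state so MergeFrom can compute the patch delta.
	original := obj.DeepCopyObject().(T)
	update(obj)
	return c.Patch(ctx, obj, client.MergeFrom(original))
}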
@@ -234,30 +294,13 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
	}

	// Make sure the AutoscalingListener is up and running in the controller namespace
	if !listenerFound {
		if r.drainingJobs(&latestRunnerSet.Status) {
			log.Info("Waiting for running and pending runners to finish before creating a new AutoscalingListener", "running", latestRunnerSet.Status.RunningEphemeralRunners, "pending", latestRunnerSet.Status.PendingEphemeralRunners)
			return ctrl.Result{}, nil
		}

		log.Info("Creating a new AutoscalingListener for the runner set", "ephemeralRunnerSetName", latestRunnerSet.Name)
		return r.createAutoScalingListenerForRunnerSet(ctx, autoscalingRunnerSet, latestRunnerSet, log)
	}

	// Update the status of autoscaling runner set.
@@ -276,6 +319,16 @@ func (r *AutoscalingRunnerSetReconciler) Reconcile(ctx context.Context, req ctrl
	return ctrl.Result{}, nil
}

// drainingJobs prevents overprovisioning of runners.
// We reach this code path when the runner scale set has been patched with a new runner spec,
// but there are still running or pending ephemeral runners. The safest approach is to wait
// for those runners to finish before creating a new runner set.
func (r *AutoscalingRunnerSetReconciler) drainingJobs(latestRunnerSetStatus *v1alpha1.EphemeralRunnerSetStatus) bool {
	return r.UpdateStrategy == UpdateStrategyEventual &&
		latestRunnerSetStatus.RunningEphemeralRunners+latestRunnerSetStatus.PendingEphemeralRunners > 0
}
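
A short illustrative snippet (not part of the commit) spelling out the semantics, reusing the types defined above:

r := &AutoscalingRunnerSetReconciler{UpdateStrategy: UpdateStrategyEventual}
status := &v1alpha1.EphemeralRunnerSetStatus{
	RunningEphemeralRunners: 2,
	PendingEphemeralRunners: 3,
}
fmt.Println(r.drainingJobs(status)) // true: five runners are still busy, so hold the update back
r.UpdateStrategy = UpdateStrategyImmediate
fmt.Println(r.drainingJobs(status)) // false: the immediate strategy never waits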
func (r *AutoscalingRunnerSetReconciler) cleanupListener(ctx context.Context, autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet, logger logr.Logger) (done bool, err error) {
	logger.Info("Cleaning up the listener")
	var listener v1alpha1.AutoscalingListener

controllers/actions.github.com/autoscalingrunnerset_controller_test.go

@@ -42,6 +42,7 @@ const (
var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
	var ctx context.Context
	var mgr ctrl.Manager
	var controller *AutoscalingRunnerSetReconciler
	var autoscalingNS *corev1.Namespace
	var autoscalingRunnerSet *v1alpha1.AutoscalingRunnerSet
	var configSecret *corev1.Secret
@@ -63,7 +64,7 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
		autoscalingNS, mgr = createNamespace(GinkgoT(), k8sClient)
		configSecret = createDefaultSecret(GinkgoT(), k8sClient, autoscalingNS.Name)

		controller = &AutoscalingRunnerSetReconciler{
			Client: mgr.GetClient(),
			Scheme: mgr.GetScheme(),
			Log:    logf.Log,
@@ -424,6 +425,110 @@ var _ = Describe("Test AutoScalingRunnerSet controller", Ordered, func() {
		})
	})

	Context("When updating an AutoscalingRunnerSet with running or pending jobs", func() {
		It("should wait for running and pending jobs to finish before applying the update when the update strategy is set to eventual", func() {
			// Switch the update strategy to eventual (drain jobs)
			controller.UpdateStrategy = UpdateStrategyEventual

			// Wait until the listener is created
			listener := new(v1alpha1.AutoscalingListener)
			Eventually(
				func() error {
					return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener)
				},
				autoscalingRunnerSetTestTimeout,
				autoscalingRunnerSetTestInterval,
			).Should(Succeed(), "Listener should be created")
			// Wait until the ephemeral runner set is created
			Eventually(
				func() (int, error) {
					runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
					err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
					if err != nil {
						return 0, err
					}
					return len(runnerSetList.Items), nil
				},
				autoscalingRunnerSetTestTimeout,
				autoscalingRunnerSetTestInterval,
			).Should(BeEquivalentTo(1), "Only one EphemeralRunnerSet should be created")

			runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
			err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
			Expect(err).NotTo(HaveOccurred(), "failed to list EphemeralRunnerSets")

			// Emulate running and pending jobs
			runnerSet := runnerSetList.Items[0]
			activeRunnerSet := runnerSet.DeepCopy()
			activeRunnerSet.Status.CurrentReplicas = 6
			activeRunnerSet.Status.FailedEphemeralRunners = 1
			activeRunnerSet.Status.RunningEphemeralRunners = 2
			activeRunnerSet.Status.PendingEphemeralRunners = 3

			desiredStatus := v1alpha1.AutoscalingRunnerSetStatus{
				CurrentRunners:          activeRunnerSet.Status.CurrentReplicas,
				State:                   "",
				PendingEphemeralRunners: activeRunnerSet.Status.PendingEphemeralRunners,
				RunningEphemeralRunners: activeRunnerSet.Status.RunningEphemeralRunners,
				FailedEphemeralRunners:  activeRunnerSet.Status.FailedEphemeralRunners,
			}

			err = k8sClient.Status().Patch(ctx, activeRunnerSet, client.MergeFrom(&runnerSet))
			Expect(err).NotTo(HaveOccurred(), "Failed to patch runner set status")
			Eventually(
				func() (v1alpha1.AutoscalingRunnerSetStatus, error) {
					updated := new(v1alpha1.AutoscalingRunnerSet)
					err := k8sClient.Get(ctx, client.ObjectKey{Name: autoscalingRunnerSet.Name, Namespace: autoscalingRunnerSet.Namespace}, updated)
					if err != nil {
						return v1alpha1.AutoscalingRunnerSetStatus{}, fmt.Errorf("failed to get AutoScalingRunnerSet: %w", err)
					}
					return updated.Status, nil
				},
				autoscalingRunnerSetTestTimeout,
				autoscalingRunnerSetTestInterval,
			).Should(BeEquivalentTo(desiredStatus), "AutoScalingRunnerSet status should be updated")

			// Patch the AutoScalingRunnerSet image, which should trigger
			// the recreation of the listener and ephemeral runner set
			patched := autoscalingRunnerSet.DeepCopy()
			patched.Spec.Template.Spec = corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:  "runner",
						Image: "ghcr.io/actions/abcd:1.1.1",
					},
				},
			}
			err = k8sClient.Patch(ctx, patched, client.MergeFrom(autoscalingRunnerSet))
			Expect(err).NotTo(HaveOccurred(), "failed to patch AutoScalingRunnerSet")
			autoscalingRunnerSet = patched.DeepCopy()
			// The EphemeralRunnerSet should not be recreated
			Consistently(
				func() (string, error) {
					runnerSetList := new(v1alpha1.EphemeralRunnerSetList)
					err := k8sClient.List(ctx, runnerSetList, client.InNamespace(autoscalingRunnerSet.Namespace))
					Expect(err).NotTo(HaveOccurred(), "failed to list EphemeralRunnerSets")
					return runnerSetList.Items[0].Name, nil
				},
				autoscalingRunnerSetTestTimeout,
				autoscalingRunnerSetTestInterval,
			).Should(Equal(activeRunnerSet.Name), "The EphemeralRunnerSet should not be recreated")

			// The listener should not be recreated
			Consistently(
				func() error {
					return k8sClient.Get(ctx, client.ObjectKey{Name: scaleSetListenerName(autoscalingRunnerSet), Namespace: autoscalingRunnerSet.Namespace}, listener)
				},
				autoscalingRunnerSetTestTimeout,
				autoscalingRunnerSetTestInterval,
			).ShouldNot(Succeed(), "Listener should not be recreated")
		})
	})
It("Should update Status on EphemeralRunnerSet status Update", func() {
ars := new(v1alpha1.AutoscalingRunnerSet)
Eventually(
@@ -1617,10 +1722,14 @@ var _ = Describe("Test resource version and build version mismatch", func() {
		startManagers(GinkgoT(), mgr)

		Eventually(
			func() bool {
				ars := new(v1alpha1.AutoscalingRunnerSet)
				err := k8sClient.Get(ctx, types.NamespacedName{Namespace: autoscalingRunnerSet.Namespace, Name: autoscalingRunnerSet.Name}, ars)
				return errors.IsNotFound(err)
			},
			autoscalingRunnerSetTestTimeout,
			autoscalingRunnerSetTestInterval,
		).Should(BeTrue())
	})
})