Mirror of https://github.com/actions/actions-runner-controller.git, synced 2025-12-11 03:57:01 +00:00
Update unconsumed HRA capacity reservation's expiration more frequently and consistently (#2502)
Co-authored-by: Yusuke Kuoka <ykuoka@gmail.com>
@@ -44,8 +44,8 @@ type scaleOperation struct {

// Add the scale target to the unbounded queue, blocking until the target is successfully added to the queue.
// All the targets in the queue are dequeued every 3 seconds, grouped by the HRA, and applied.
// In a happy path, batchScaler update each HRA only once, even though the HRA had two or more associated webhook events in the 3 seconds interval,
// which results in less K8s API calls and less HRA update conflicts in case your ARC installation receives a lot of webhook events
// In a happy path, batchScaler updates each HRA only once, even though the HRA had two or more associated webhook events in the 3 seconds interval,
// which results in fewer K8s API calls and fewer HRA update conflicts in case your ARC installation receives a lot of webhook events
func (s *batchScaler) Add(st *ScaleTarget) {
	if st == nil {
		return
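The comments above describe the batching model: scale targets are queued as they arrive, drained every 3 seconds, grouped by HRA, and each group is applied with a single update. A minimal, self-contained sketch of that idea follows; the names (scaleTarget, the queue buffer size, the two-batch cutoff) are illustrative assumptions, not the controller's real types or configuration.

package main

import (
	"fmt"
	"time"
)

// scaleTarget is a stand-in for ARC's ScaleTarget: one webhook event worth of scaling.
type scaleTarget struct {
	HRAName string
	Amount  int
}

func main() {
	queue := make(chan scaleTarget, 1024) // ARC uses an unbounded queue; a buffer is enough for this sketch

	// Producer: webhook events arrive and are enqueued immediately.
	go func() {
		for i := 0; i < 5; i++ {
			queue <- scaleTarget{HRAName: "example-hra", Amount: 1}
			time.Sleep(500 * time.Millisecond)
		}
	}()

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	// Consumer: every 3 seconds, drain whatever accumulated, group it by HRA,
	// and apply one update per HRA instead of one update per event.
	for batchNo := 0; batchNo < 2; batchNo++ {
		<-ticker.C

		batch := map[string]int{}
	drain:
		for {
			select {
			case st := <-queue:
				batch[st.HRAName] += st.Amount
			default:
				break drain
			}
		}

		for hra, amount := range batch {
			fmt.Printf("batch %d: single update to %s (net amount %+d)\n", batchNo, hra, amount)
		}
	}
}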
@@ -142,87 +142,130 @@ func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation)
		return err
	}

	now := time.Now()

	copy, err := s.planBatchScale(ctx, batch, &hra, now)
	if err != nil {
		return err
	}

	if err := s.Client.Patch(ctx, copy, client.MergeFrom(&hra)); err != nil {
		return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
	}

	return nil
}

func (s *batchScaler) planBatchScale(ctx context.Context, batch batchScaleOperation, hra *v1alpha1.HorizontalRunnerAutoscaler, now time.Time) (*v1alpha1.HorizontalRunnerAutoscaler, error) {
	copy := hra.DeepCopy()

	if hra.Spec.MaxReplicas != nil && len(copy.Spec.CapacityReservations) > *copy.Spec.MaxReplicas {
		// We have more reservations than MaxReplicas, meaning that we previously
		// could not scale up to meet a capacity demand because we had hit MaxReplicas.
		// Therefore, there are reservations that are starved for capacity. We extend the
		// expiration time on these starved reservations because the "duration" is meant
		// to apply to reservations that have launched replicas, not replicas in the backlog.
		// Of course, if MaxReplicas is nil, then there is no max to hit, and we do not need this adjustment.
		// See https://github.com/actions/actions-runner-controller/issues/2254 for more context.

		// Extend the expiration time of all the reservations not yet assigned to replicas.
		//
		// Note that we assume that the two scenarios are equivalent here.
		// The first case is where the number of reservations becomes greater than MaxReplicas.
		// The second case is where MaxReplicas becomes greater than the number of reservations.
		// Presuming that HRA.spec.scaleTriggers[].duration means "the duration until the reservation expires after a corresponding runner was deployed",
		// this is correct.
		//
		// In other words, we settle on a capacity reservation's ExpirationTime only after the corresponding runner is "about to be" deployed.
		// It's "about to be deployed" not "deployed" because we have no way to correlate a capacity reservation and the runner;
		// the best we can do here is to simulate the desired behavior by reading MaxReplicas and assuming it will be equal to the number of active runners soon.
		//
		// Perhaps we could use RunnerDeployment.Status.Replicas or RunnerSet.Status.Replicas instead of MaxReplicas as a better source of "the number of active runners".
		// However, note that the status is not guaranteed to be up-to-date.
		// It might not be that easy to decide which is better to use.
		for i := *hra.Spec.MaxReplicas; i < len(copy.Spec.CapacityReservations); i++ {
			// Let's say maxReplicas=3 and the workflow job of status=completed results in deleting the first capacity reservation
			// copy.Spec.CapacityReservations[i] where i=0.
			// We are interested in at least four reservations and runners:
			// i=0 - already included in the current desired replicas, but may be about to be deleted
			// i=1-2 - already included in the current desired replicas
			// i=3 - not yet included in the current desired replicas, might have expired while waiting in the queue
			//
			// i=3 is especially important here: if we didn't reset the expiration time of this reservation,
			// it might expire before it is assigned to a runner, due to the delay between the time the
			// expiration timer starts and the time a runner becomes available.
			//
			// Why is there such a delay? Because of how ARC implements the scale duration and expiration:
			// the expiration timer starts when the reservation is created, while the runner is created only after
			// the corresponding reservation fits within maxReplicas.
			//
			// We address that by resetting the expiration time for the fourth (i=3 in the above example)
			// and subsequent reservations whenever a batch is run (which is when expired reservations get deleted).

			// There is no guarantee that all the reservations have the same duration, and even if there were,
			// at this point we have lost the reference to the duration that was intended.
			// However, we can compute the intended duration from the existing interval.
			//
			// In other words, updating HRA.spec.scaleTriggers[].duration does not delay a capacity reservation's expiration any longer
			// than the "intended" duration, which is the duration of the trigger when the reservation was created.
			duration := copy.Spec.CapacityReservations[i].ExpirationTime.Time.Sub(copy.Spec.CapacityReservations[i].EffectiveTime.Time)
			copy.Spec.CapacityReservations[i].EffectiveTime = metav1.Time{Time: now}
			copy.Spec.CapacityReservations[i].ExpirationTime = metav1.Time{Time: now.Add(duration)}
		}
	}
	// Now we can filter out any expired reservations from consideration.
	// This could leave us with 0 reservations left.
	copy.Spec.CapacityReservations = getValidCapacityReservations(copy)
	before := len(hra.Spec.CapacityReservations)
	expired := before - len(copy.Spec.CapacityReservations)

	var added, completed int

	for _, scale := range batch.scaleOps {
		amount := 1
		amount := scale.trigger.Amount

		if scale.trigger.Amount != 0 {
			amount = scale.trigger.Amount
		}

		scale.log.V(2).Info("Adding capacity reservation", "amount", amount)

		now := time.Now()
		// We do not track if a webhook-based scale-down event matches an expired capacity reservation
		// or a job for which the scale-up event was never received. This means that scale-down
		// events could drive capacity reservations into the negative numbers if we let it.
		// We ensure capacity never falls below zero, but that also means that the
		// final number of capacity reservations depends on the order in which events come in.
		// If capacity is at zero and we get a scale-down followed by a scale-up,
		// the scale-down will be ignored and we will end up with a desired capacity of 1.
		// However, if we get the scale-up first, the scale-down will drive desired capacity back to zero.
		// This could be fixed by matching events' `workflow_job.run_id` with capacity reservations,
		// but that would be a lot of work. So for now we allow for some slop, and hope that
		// GitHub provides a better autoscaling solution soon.
		if amount > 0 {
			copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
				EffectiveTime:  metav1.Time{Time: now},
				ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
				Replicas:       amount,
			})
			scale.log.V(2).Info("Adding capacity reservation", "amount", amount)

			// Parts of this function require that Spec.CapacityReservations.Replicas always equals 1.
			// Enforce that rule no matter what the `amount` value is
			for i := 0; i < amount; i++ {
				copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
					EffectiveTime:  metav1.Time{Time: now},
					ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
					Replicas:       1,
				})
			}
			added += amount
		} else if amount < 0 {
			var reservations []v1alpha1.CapacityReservation
			scale.log.V(2).Info("Removing capacity reservation", "amount", -amount)

			var (
				found    bool
				foundIdx int
			)

			for i, r := range copy.Spec.CapacityReservations {
				r := r
				if !found && r.Replicas+amount == 0 {
					found = true
					foundIdx = i
				} else {
					// Note that we nil-check max replicas because this "fix" is needed only when there is an upper limit on the number of runners.
					// In other words, you don't need to reset effective time and expiration time when there is no max replicas.
					// That's because the desired replicas would already contain the reservation since its creation.
					if found && copy.Spec.MaxReplicas != nil && i > foundIdx+*copy.Spec.MaxReplicas {
						// Update newer CapacityReservations' time to now to trigger reconcile.
						// Without this, we might get stuck at minReplicas unnecessarily long.
						// That is, we might not scale up after an ephemeral runner has been deleted
						// until a new scale up, all runners finish, or DefaultRunnerPodRecreationDelayAfterWebhookScale elapses.
						// See https://github.com/actions/actions-runner-controller/issues/2254 for more context.
						r.EffectiveTime = metav1.Time{Time: now}

						// We also reset the scale trigger expiration time, so that you don't need to tweak
						// the scale trigger duration depending on maxReplicas.
						// A detailed explanation follows.
						//
						// Let's say maxReplicas=3 and the workflow job of status=canceled results in deleting the first capacity reservation, hence i=0.
						// We are interested in at least four reservations and runners:
						// i=0 - already included in the current desired replicas, but just got deleted
						// i=1-2 - already included in the current desired replicas
						// i=3 - not yet included in the current desired replicas, might have expired while waiting in the queue
						//
						// i=3 is especially important here: if we didn't reset the expiration time of the 3rd reservation,
						// it might expire before a corresponding runner is created, due to the delay between when the expiration timer starts and when the runner is created.
						//
						// Why is there such a delay? Because of how ARC implements the scale duration and expiration:
						// the expiration timer starts when the reservation is created, while the runner is created only after the corresponding reservation fits within maxReplicas.
						//
						// We address that by resetting the expiration time for the fourth (i=3 in the above example) and subsequent reservations when the first reservation gets cancelled.
						r.ExpirationTime = metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)}
					}

					reservations = append(reservations, r)
				}
				// Remove the requested number of reservations unless there are not that many left
				if len(copy.Spec.CapacityReservations) > -amount {
					copy.Spec.CapacityReservations = copy.Spec.CapacityReservations[-amount:]
				} else {
					copy.Spec.CapacityReservations = nil
				}

				copy.Spec.CapacityReservations = reservations

				completed += amount
				// This "completed" represents the number of completed and therefore removed runners in this batch,
				// which is logged later.
				// As the amount is negative for a scale-down trigger, we make the "completed" amount positive by negating the amount.
				// That way, the user can see the number of removed runners (like 3), rather than the delta (like -3) in the number of runners.
				completed -= amount
			}
		}

		before := len(hra.Spec.CapacityReservations)
		expired := before - len(copy.Spec.CapacityReservations)
		after := len(copy.Spec.CapacityReservations)

		s.Log.V(1).Info(
@@ -234,9 +277,5 @@ func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation)
		"after", after,
	)

	if err := s.Client.Patch(ctx, copy, client.MergeFrom(&hra)); err != nil {
		return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
	}

	return nil
	return copy, nil
}
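The planBatchScale comments above explain how the expiration of a not-yet-consumed reservation is extended: the originally intended duration is recovered as ExpirationTime minus EffectiveTime and re-applied from "now", but only for reservations beyond MaxReplicas. A small stand-alone sketch of that arithmetic, using plain time.Time values rather than the real v1alpha1.CapacityReservation type:

package main

import (
	"fmt"
	"time"
)

// reservation is a stand-in for a capacity reservation's two timestamps.
type reservation struct {
	EffectiveTime  time.Time
	ExpirationTime time.Time
}

// extendStarved re-applies each starved reservation's original duration from now,
// starting at index maxReplicas, mirroring the loop in planBatchScale.
func extendStarved(reservations []reservation, maxReplicas int, now time.Time) {
	for i := maxReplicas; i < len(reservations); i++ {
		// Recover the intended duration from the existing interval, so a later change
		// to the trigger duration does not alter reservations that were already made.
		duration := reservations[i].ExpirationTime.Sub(reservations[i].EffectiveTime)
		reservations[i].EffectiveTime = now
		reservations[i].ExpirationTime = now.Add(duration)
	}
}

func main() {
	t0 := time.Now()
	expiry := 10 * time.Second
	interval := 3 * time.Second

	// Two reservations created 3 seconds apart, each with a 10-second duration,
	// but maxReplicas=1, so the second one is still waiting for a runner.
	rs := []reservation{
		{EffectiveTime: t0, ExpirationTime: t0.Add(expiry)},
		{EffectiveTime: t0.Add(interval), ExpirationTime: t0.Add(interval).Add(expiry)},
	}

	now := t0.Add(2 * interval) // the batch runs at t0+6s
	extendStarved(rs, 1, now)

	for i, r := range rs {
		fmt.Printf("reservation %d expires at +%v\n", i, r.ExpirationTime.Sub(t0))
	}
	// Reservation 0 keeps its original expiry (+10s); reservation 1 is pushed out
	// to +16s, which matches the "scale up" expectation in TestPlanBatchScale below.
}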
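The batchScale hunk above also notes that scale-down events are not matched to specific reservations, that capacity is clamped at zero, and that the final count therefore depends on event order. A tiny sketch of that effect, with a plain counter standing in for the reservation list:

package main

import "fmt"

// apply adds a scale amount to the reservation count, never letting it go negative.
func apply(count, amount int) int {
	count += amount
	if count < 0 {
		count = 0 // capacity never falls below zero
	}
	return count
}

func main() {
	// Scale-down first, then scale-up: the scale-down is ignored, leaving 1.
	a := apply(apply(0, -1), +1)
	// Scale-up first, then scale-down: they cancel out, leaving 0.
	b := apply(apply(0, +1), -1)
	fmt.Println(a, b) // 1 0
}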
@@ -0,0 +1,166 @@
package actionssummerwindnet

import (
	"context"
	"testing"
	"time"

	"github.com/actions/actions-runner-controller/apis/actions.summerwind.net/v1alpha1"
	"github.com/go-logr/logr"
	"github.com/stretchr/testify/require"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPlanBatchScale(t *testing.T) {
	s := &batchScaler{Log: logr.Discard()}

	var (
		expiry   = 10 * time.Second
		interval = 3 * time.Second

		t0 = time.Now()
		t1 = t0.Add(interval)
		t2 = t1.Add(interval)
	)

	check := func(t *testing.T, amount int, newExpiry time.Duration, wantReservations []v1alpha1.CapacityReservation) {
		t.Helper()

		var (
			op = batchScaleOperation{
				scaleOps: []scaleOperation{
					{
						log: logr.Discard(),
						trigger: v1alpha1.ScaleUpTrigger{
							Amount:   amount,
							Duration: metav1.Duration{Duration: newExpiry},
						},
					},
				},
			}

			hra = &v1alpha1.HorizontalRunnerAutoscaler{
				Spec: v1alpha1.HorizontalRunnerAutoscalerSpec{
					MaxReplicas: intPtr(1),
					ScaleUpTriggers: []v1alpha1.ScaleUpTrigger{
						{
							Amount:   1,
							Duration: metav1.Duration{Duration: newExpiry},
						},
					},
					CapacityReservations: []v1alpha1.CapacityReservation{
						{
							EffectiveTime:  metav1.NewTime(t0),
							ExpirationTime: metav1.NewTime(t0.Add(expiry)),
							Replicas:       1,
						},
						{
							EffectiveTime:  metav1.NewTime(t1),
							ExpirationTime: metav1.NewTime(t1.Add(expiry)),
							Replicas:       1,
						},
					},
				},
			}
		)

		want := hra.DeepCopy()

		want.Spec.CapacityReservations = wantReservations

		got, err := s.planBatchScale(context.Background(), op, hra, t2)

		require.NoError(t, err)
		require.Equal(t, want, got)
	}
t.Run("scale up", func(t *testing.T) {
|
||||
check(t, 1, expiry, []v1alpha1.CapacityReservation{
|
||||
{
|
||||
// This is kept based on t0 because it falls within maxReplicas
|
||||
// i.e. the corresponding runner has assumbed to be already deployed.
|
||||
EffectiveTime: metav1.NewTime(t0),
|
||||
ExpirationTime: metav1.NewTime(t0.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
{
|
||||
// Updated from t1 to t2 due to this exceeded maxReplicas
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
{
|
||||
// This is based on t2(=now) because it has been added just now.
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("scale up reuses previous scale trigger duration for extension", func(t *testing.T) {
|
||||
newExpiry := expiry + time.Second
|
||||
check(t, 1, newExpiry, []v1alpha1.CapacityReservation{
|
||||
{
|
||||
// This is kept based on t0 because it falls within maxReplicas
|
||||
// i.e. the corresponding runner has assumbed to be already deployed.
|
||||
EffectiveTime: metav1.NewTime(t0),
|
||||
ExpirationTime: metav1.NewTime(t0.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
{
|
||||
// Updated from t1 to t2 due to this exceeded maxReplicas
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
{
|
||||
// This is based on t2(=now) because it has been added just now.
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(newExpiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("scale down", func(t *testing.T) {
|
||||
check(t, -1, expiry, []v1alpha1.CapacityReservation{
|
||||
{
|
||||
// Updated from t1 to t2 due to this exceeded maxReplicas
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("scale down is not affected by new scale trigger duration", func(t *testing.T) {
|
||||
check(t, -1, expiry+time.Second, []v1alpha1.CapacityReservation{
|
||||
{
|
||||
// Updated from t1 to t2 due to this exceeded maxReplicas
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
// TODO: Keep refreshing the expiry date even when there are no other scale down/up triggers before the expiration
|
||||
t.Run("extension", func(t *testing.T) {
|
||||
check(t, 0, expiry, []v1alpha1.CapacityReservation{
|
||||
{
|
||||
// This is kept based on t0 because it falls within maxReplicas
|
||||
// i.e. the corresponding runner has assumbed to be already deployed.
|
||||
EffectiveTime: metav1.NewTime(t0),
|
||||
ExpirationTime: metav1.NewTime(t0.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
{
|
||||
// Updated from t1 to t2 due to this exceeded maxReplicas
|
||||
EffectiveTime: metav1.NewTime(t2),
|
||||
ExpirationTime: metav1.NewTime(t2.Add(expiry)),
|
||||
Replicas: 1,
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -376,19 +376,24 @@ func TestGetRequest(t *testing.T) {

func TestGetValidCapacityReservations(t *testing.T) {
	now := time.Now()
	duration, _ := time.ParseDuration("10m")
	effectiveTime := now.Add(-duration)

	hra := &actionsv1alpha1.HorizontalRunnerAutoscaler{
		Spec: actionsv1alpha1.HorizontalRunnerAutoscalerSpec{
			CapacityReservations: []actionsv1alpha1.CapacityReservation{
				{
					EffectiveTime:  metav1.Time{Time: effectiveTime.Add(-time.Second)},
					ExpirationTime: metav1.Time{Time: now.Add(-time.Second)},
					Replicas:       1,
				},
				{
					EffectiveTime:  metav1.Time{Time: effectiveTime},
					ExpirationTime: metav1.Time{Time: now},
					Replicas:       2,
				},
				{
					EffectiveTime:  metav1.Time{Time: effectiveTime.Add(time.Second)},
					ExpirationTime: metav1.Time{Time: now.Add(time.Second)},
					Replicas:       3,
				},