Files
actions-runner-controller/controllers/horizontal_runner_autoscaler_batch_scale.go
Cory Miller c91e76f169 Add golangci-lilnt to CI (#1794)
This introduces a linter to PRs to help with code reviews and code hygiene. I've also gone ahead and fixed (or ignored) the existing lints.

I've only setup the default linters right now. There are many more options that are documented at https://golangci-lint.run/.

The GitHub Action should add appropriate annotations to the lint job for the PR. Contributors can also lint locally using `make lint`.
2022-09-21 09:08:22 +09:00

207 lines
4.9 KiB
Go

package controllers
import (
"context"
"fmt"
"sync"
"time"
"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type batchScaler struct {
Ctx context.Context
Client client.Client
Log logr.Logger
interval time.Duration
queue chan *ScaleTarget
workerStart sync.Once
}
func newBatchScaler(ctx context.Context, client client.Client, log logr.Logger) *batchScaler {
return &batchScaler{
Ctx: ctx,
Client: client,
Log: log,
interval: 3 * time.Second,
}
}
type batchScaleOperation struct {
namespacedName types.NamespacedName
scaleOps []scaleOperation
}
type scaleOperation struct {
trigger v1alpha1.ScaleUpTrigger
log logr.Logger
}
// Add the scale target to the unbounded queue, blocking until the target is successfully added to the queue.
// All the targets in the queue are dequeued every 3 seconds, grouped by the HRA, and applied.
// In a happy path, batchScaler update each HRA only once, even though the HRA had two or more associated webhook events in the 3 seconds interval,
// which results in less K8s API calls and less HRA update conflicts in case your ARC installation receives a lot of webhook events
func (s *batchScaler) Add(st *ScaleTarget) {
if st == nil {
return
}
s.workerStart.Do(func() {
var expBackoff = []time.Duration{time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second, 16 * time.Second}
s.queue = make(chan *ScaleTarget)
log := s.Log
go func() {
log.Info("Starting batch worker")
defer log.Info("Stopped batch worker")
for {
select {
case <-s.Ctx.Done():
return
default:
}
log.V(2).Info("Batch worker is dequeueing operations")
batches := map[types.NamespacedName]batchScaleOperation{}
after := time.After(s.interval)
var ops uint
batch:
for {
select {
case <-after:
break batch
case st := <-s.queue:
nsName := types.NamespacedName{
Namespace: st.HorizontalRunnerAutoscaler.Namespace,
Name: st.HorizontalRunnerAutoscaler.Name,
}
b, ok := batches[nsName]
if !ok {
b = batchScaleOperation{
namespacedName: nsName,
}
}
b.scaleOps = append(b.scaleOps, scaleOperation{
log: *st.log,
trigger: st.ScaleUpTrigger,
})
batches[nsName] = b
ops++
}
}
log.V(2).Info("Batch worker dequeued operations", "ops", ops, "batches", len(batches))
retry:
for i := 0; ; i++ {
failed := map[types.NamespacedName]batchScaleOperation{}
for nsName, b := range batches {
b := b
if err := s.batchScale(context.Background(), b); err != nil {
log.V(2).Info("Failed to scale due to error", "error", err)
failed[nsName] = b
} else {
log.V(2).Info("Successfully ran batch scale", "hra", b.namespacedName)
}
}
if len(failed) == 0 {
break retry
}
batches = failed
delay := 16 * time.Second
if i < len(expBackoff) {
delay = expBackoff[i]
}
time.Sleep(delay)
}
}
}()
})
s.queue <- st
}
func (s *batchScaler) batchScale(ctx context.Context, batch batchScaleOperation) error {
var hra v1alpha1.HorizontalRunnerAutoscaler
if err := s.Client.Get(ctx, batch.namespacedName, &hra); err != nil {
return err
}
copy := hra.DeepCopy()
copy.Spec.CapacityReservations = getValidCapacityReservations(copy)
var added, completed int
for _, scale := range batch.scaleOps {
amount := 1
if scale.trigger.Amount != 0 {
amount = scale.trigger.Amount
}
scale.log.V(2).Info("Adding capacity reservation", "amount", amount)
if amount > 0 {
now := time.Now()
copy.Spec.CapacityReservations = append(copy.Spec.CapacityReservations, v1alpha1.CapacityReservation{
EffectiveTime: metav1.Time{Time: now},
ExpirationTime: metav1.Time{Time: now.Add(scale.trigger.Duration.Duration)},
Replicas: amount,
})
added += amount
} else if amount < 0 {
var reservations []v1alpha1.CapacityReservation
var found bool
for _, r := range copy.Spec.CapacityReservations {
if !found && r.Replicas+amount == 0 {
found = true
} else {
reservations = append(reservations, r)
}
}
copy.Spec.CapacityReservations = reservations
completed += amount
}
}
before := len(hra.Spec.CapacityReservations)
expired := before - len(copy.Spec.CapacityReservations)
after := len(copy.Spec.CapacityReservations)
s.Log.V(1).Info(
fmt.Sprintf("Updating hra %s for capacityReservations update", hra.Name),
"before", before,
"expired", expired,
"added", added,
"completed", completed,
"after", after,
)
if err := s.Client.Update(ctx, copy); err != nil {
return fmt.Errorf("updating horizontalrunnerautoscaler to add capacity reservation: %w", err)
}
return nil
}