mirror of
https://github.com/actions/actions-runner-controller.git
synced 2025-12-10 11:41:27 +00:00
* Make webhook-based scale operation asynchronous This prevents race condition in the webhook-based autoscaler when it received another webhook event while processing another webhook event and both ended up scaling up the same horizontal runner autoscaler. Ref #1321 * Fix typos * Update rather than Patch HRA to avoid race among webhook-based autoscaler servers * Batch capacity reservation updates for efficient use of apiserver * Fix potential never-ending HRA update conflicts in batch update * Extract batchScaler out of webhook-based autoscaler for testability * Fix log levels and batch scaler hang on start * Correlate webhook event with scale trigger amount in logs * Fix log message
56 lines
1.6 KiB
Go
56 lines
1.6 KiB
Go
package controllers
|
|
|
|
import (
|
|
"context"
|
|
)
|
|
|
|
// worker is a worker that has a non-blocking bounded queue of scale targets, dequeues scale target and executes the scale operation one by one.
|
|
type worker struct {
|
|
scaleTargetQueue chan *ScaleTarget
|
|
work func(*ScaleTarget)
|
|
done chan struct{}
|
|
}
|
|
|
|
func newWorker(ctx context.Context, queueLimit int, work func(*ScaleTarget)) *worker {
|
|
w := &worker{
|
|
scaleTargetQueue: make(chan *ScaleTarget, queueLimit),
|
|
work: work,
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
go func() {
|
|
defer close(w.done)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case t := <-w.scaleTargetQueue:
|
|
work(t)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return w
|
|
}
|
|
|
|
// Add the scale target to the bounded queue, returning the result as a bool value. It returns true on successful enqueue, and returns false otherwise.
|
|
// When returned false, the queue is already full so the enqueue operation must be retried later.
|
|
// If the enqueue was triggered by an external source and there's no intermediate queue that we can use,
|
|
// you must instruct the source to resend the original request later.
|
|
// In case you're building a webhook server around this worker, this means that you must return a http error to the webhook server,
|
|
// so that (hopefully) the sender can resend the webhook event later, or at least the human operator can notice or be notified about the
|
|
// webhook develiery failure so that a manual retry can be done later.
|
|
func (w *worker) Add(st *ScaleTarget) bool {
|
|
select {
|
|
case w.scaleTargetQueue <- st:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (w *worker) Done() chan struct{} {
|
|
return w.done
|
|
}
|