Mirror of https://github.com/actions/actions-runner-controller.git, synced 2025-12-10 19:50:30 +00:00
This will work on GHES but not on GitHub Enterprise Cloud, due to the excessive number of GitHub API calls required. More work is needed, such as adding a cache layer to the GitHub client, to make it usable on GitHub Enterprise Cloud.

Fixes additional cases from https://github.com/actions-runner-controller/actions-runner-controller/pull/1012

If GitHub auth is provided in the webhooks controller, then runner groups with custom visibility are supported. Otherwise, all runner groups are assumed to be visible to all repositories.

`getScaleUpTargetWithFunction()` checks whether there is a matching HRA using the following flow:

1. Search for **repository** HRAs - if one is found, it ends here
2. Get available HRAs in k8s
3. Compute visible runner groups
   a. If GitHub auth is provided - get all the runner groups that are visible to the repository of the incoming webhook using GitHub API calls.
   b. If GitHub auth is not provided - assume all runner groups are visible to all repositories
4. Search for **default organization** runners (a.k.a. runners from the organization's visible default runner group) with matching labels
5. Search for **default enterprise** runners (a.k.a. runners from the enterprise's visible default runner group) with matching labels
6. Search for **custom organization runner groups** with matching labels
7. Search for **custom enterprise runner groups** with matching labels

A minimal sketch of this lookup order is shown below.

Co-authored-by: Yusuke Kuoka <ykuoka@gmail.com>
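As a rough, hypothetical illustration of steps 1 and 4-7 (not code from this file; `candidateKeys` and its parameters are invented for this example, though the key formats match the `enterpriseKey`, `organizationalRunnerGroupKey`, and `enterpriseRunnerGroupKey` helpers defined at the bottom of the file):

```go
package main

import "fmt"

// candidateKeys sketches the HRA lookup order: the repository key first,
// then the default organization and enterprise runner groups, and finally
// any custom organization/enterprise runner groups. The real controller
// resolves each key via scaleTarget(key) and stops at the first match.
func candidateKeys(owner, repo, enterprise string, orgGroups, entGroups []string) []string {
	keys := []string{owner + "/" + repo}           // 1. repository HRAs win outright
	keys = append(keys, owner)                     // 4. default organization runner group
	keys = append(keys, "enterprises/"+enterprise) // 5. default enterprise runner group
	for _, g := range orgGroups {                  // 6. custom organization runner groups
		keys = append(keys, owner+"/group/"+g)
	}
	for _, g := range entGroups { // 7. custom enterprise runner groups
		keys = append(keys, "enterprises/"+enterprise+"/group/"+g)
	}
	return keys
}

func main() {
	fmt.Println(candidateKeys("my-org", "my-repo", "my-ent", []string{"ci-group"}, nil))
	// [my-org/my-repo my-org enterprises/my-ent my-org/group/ci-group]
}
```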
904 lines
27 KiB
Go
/*
Copyright 2020 The actions-runner-controller authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/go-logr/logr"
	gogithub "github.com/google/go-github/v39/github"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/actions-runner-controller/actions-runner-controller/api/v1alpha1"
	"github.com/actions-runner-controller/actions-runner-controller/github"
	"github.com/actions-runner-controller/actions-runner-controller/simulator"
)

const (
	scaleTargetKey = "scaleTarget"

	keyPrefixEnterprise = "enterprises/"
	keyRunnerGroup      = "/group/"
)

// HorizontalRunnerAutoscalerGitHubWebhook autoscales a HorizontalRunnerAutoscaler and the RunnerDeployment on each
// GitHub Webhook received
type HorizontalRunnerAutoscalerGitHubWebhook struct {
	client.Client
	Log      logr.Logger
	Recorder record.EventRecorder
	Scheme   *runtime.Scheme

	// SecretKeyBytes is the byte representation of the Webhook secret token
	// that the administrator generated and specified in the GitHub Web UI.
	SecretKeyBytes []byte

	// GitHubClient is the GitHub client used to discover runner groups assigned to a repository
	GitHubClient *github.Client

	// Namespace is the namespace to watch for HorizontalRunnerAutoscaler's to be
	// scaled on Webhook.
	// Set to empty to let it watch all namespaces.
	Namespace string
	Name      string
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
	return ctrl.Result{}, nil
}

// +kubebuilder:rbac:groups=actions.summerwind.dev,resources=horizontalrunnerautoscalers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=actions.summerwind.dev,resources=horizontalrunnerautoscalers/finalizers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=actions.summerwind.dev,resources=horizontalrunnerautoscalers/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) Handle(w http.ResponseWriter, r *http.Request) {
	var (
		ok bool

		err error
	)

	defer func() {
		if !ok {
			w.WriteHeader(http.StatusInternalServerError)

			if err != nil {
				msg := err.Error()
				if written, err := w.Write([]byte(msg)); err != nil {
					autoscaler.Log.Error(err, "failed writing http error response", "msg", msg, "written", written)
				}
			}
		}
	}()

	defer func() {
		if r.Body != nil {
			r.Body.Close()
		}
	}()

	// respond ok to GET / e.g. for health check
	if r.Method == http.MethodGet {
		ok = true
		fmt.Fprintln(w, "webhook server is running")
		return
	}

	var payload []byte

	if len(autoscaler.SecretKeyBytes) > 0 {
		payload, err = gogithub.ValidatePayload(r, autoscaler.SecretKeyBytes)
		if err != nil {
			autoscaler.Log.Error(err, "error validating request body")

			return
		}
	} else {
		payload, err = ioutil.ReadAll(r.Body)
		if err != nil {
			autoscaler.Log.Error(err, "error reading request body")

			return
		}
	}

	webhookType := gogithub.WebHookType(r)
	event, err := gogithub.ParseWebHook(webhookType, payload)
	if err != nil {
		var s string
		if payload != nil {
			s = string(payload)
		}

		autoscaler.Log.Error(err, "could not parse webhook", "webhookType", webhookType, "payload", s)

		return
	}

	var target *ScaleTarget

	log := autoscaler.Log.WithValues(
		"event", webhookType,
		"hookID", r.Header.Get("X-GitHub-Hook-ID"),
		"delivery", r.Header.Get("X-GitHub-Delivery"),
	)

	var enterpriseEvent struct {
		Enterprise struct {
			Slug string `json:"slug,omitempty"`
		} `json:"enterprise,omitempty"`
	}
	if err := json.Unmarshal(payload, &enterpriseEvent); err != nil {
		var s string
		if payload != nil {
			s = string(payload)
		}
		autoscaler.Log.Error(err, "could not parse webhook payload for extracting enterprise slug", "webhookType", webhookType, "payload", s)
	}
	enterpriseSlug := enterpriseEvent.Enterprise.Slug

	switch e := event.(type) {
	case *gogithub.PushEvent:
		target, err = autoscaler.getScaleUpTarget(
			context.TODO(),
			log,
			e.Repo.GetName(),
			e.Repo.Owner.GetLogin(),
			e.Repo.Owner.GetType(),
			// Most go-github Event types don't seem to contain the Enterprise(.Slug) fields
			// we need, so we parse them ourselves.
			enterpriseSlug,
			autoscaler.MatchPushEvent(e),
		)
	case *gogithub.PullRequestEvent:
		target, err = autoscaler.getScaleUpTarget(
			context.TODO(),
			log,
			e.Repo.GetName(),
			e.Repo.Owner.GetLogin(),
			e.Repo.Owner.GetType(),
			// Most go-github Event types don't seem to contain the Enterprise(.Slug) fields
			// we need, so we parse them ourselves.
			enterpriseSlug,
			autoscaler.MatchPullRequestEvent(e),
		)

		if pullRequest := e.PullRequest; pullRequest != nil {
			log = log.WithValues(
				"pullRequest.base.ref", e.PullRequest.Base.GetRef(),
				"action", e.GetAction(),
			)
		}
	case *gogithub.CheckRunEvent:
		target, err = autoscaler.getScaleUpTarget(
			context.TODO(),
			log,
			e.Repo.GetName(),
			e.Repo.Owner.GetLogin(),
			e.Repo.Owner.GetType(),
			// Most go-github Event types don't seem to contain the Enterprise(.Slug) fields
			// we need, so we parse them ourselves.
			enterpriseSlug,
			autoscaler.MatchCheckRunEvent(e),
		)

		if checkRun := e.GetCheckRun(); checkRun != nil {
			log = log.WithValues(
				"checkRun.status", checkRun.GetStatus(),
				"action", e.GetAction(),
			)
		}
	case *gogithub.WorkflowJobEvent:
		if workflowJob := e.GetWorkflowJob(); workflowJob != nil {
			log = log.WithValues(
				"workflowJob.status", workflowJob.GetStatus(),
				"workflowJob.labels", workflowJob.Labels,
				"repository.name", e.Repo.GetName(),
				"repository.owner.login", e.Repo.Owner.GetLogin(),
				"repository.owner.type", e.Repo.Owner.GetType(),
				"enterprise.slug", enterpriseSlug,
				"action", e.GetAction(),
			)
		}

		labels := e.WorkflowJob.Labels

		switch action := e.GetAction(); action {
		case "queued", "completed":
			target, err = autoscaler.getJobScaleUpTargetForRepoOrOrg(
				context.TODO(),
				log,
				e.Repo.GetName(),
				e.Repo.Owner.GetLogin(),
				e.Repo.Owner.GetType(),
				enterpriseSlug,
				labels,
			)

			if target != nil {
				if e.GetAction() == "queued" {
					target.Amount = 1
				} else if e.GetAction() == "completed" {
					// A negative amount is processed in the tryScale func as a scale-down request
					// that erases the oldest CapacityReservation with the same amount.
					// If the first CapacityReservation was created with Replicas=1, this negative scale target erases it,
					// so that the resulting desired replica count decreases by 1.
					target.Amount = -1
				}
			}
		default:
			ok = true

			w.WriteHeader(http.StatusOK)

			log.V(2).Info("Received and ignored a workflow_job event as it triggers neither scale-up nor scale-down", "action", action)

			return
		}
	case *gogithub.PingEvent:
		ok = true

		w.WriteHeader(http.StatusOK)

		msg := "pong"

		if written, err := w.Write([]byte(msg)); err != nil {
			log.Error(err, "failed writing http response", "msg", msg, "written", written)
		}

		log.Info("received ping event")

		return
	default:
		log.Info("unknown event type", "eventType", webhookType)

		return
	}

	if err != nil {
		log.Error(err, "handling webhook event")

		return
	}

	if target == nil {
		log.Info(
			"Scale target not found. If this is unexpected, ensure that there is exactly one repository-wide or organizational runner deployment that matches this webhook event",
		)

		msg := "no horizontalrunnerautoscaler to scale for this github event"

		ok = true

		w.WriteHeader(http.StatusOK)

		if written, err := w.Write([]byte(msg)); err != nil {
			log.Error(err, "failed writing http response", "msg", msg, "written", written)
		}

		return
	}

	if err := autoscaler.tryScale(context.TODO(), target); err != nil {
		log.Error(err, "could not scale up")

		return
	}

	ok = true

	w.WriteHeader(http.StatusOK)

	msg := fmt.Sprintf("scaled %s by %d", target.Name, target.Amount)

	autoscaler.Log.Info(msg)

	if written, err := w.Write([]byte(msg)); err != nil {
		log.Error(err, "failed writing http response", "msg", msg, "written", written)
	}
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) findHRAsByKey(ctx context.Context, value string) ([]v1alpha1.HorizontalRunnerAutoscaler, error) {
	ns := autoscaler.Namespace

	var defaultListOpts []client.ListOption

	if ns != "" {
		defaultListOpts = append(defaultListOpts, client.InNamespace(ns))
	}

	var hras []v1alpha1.HorizontalRunnerAutoscaler

	if value != "" {
		opts := append([]client.ListOption{}, defaultListOpts...)
		opts = append(opts, client.MatchingFields{scaleTargetKey: value})

		if autoscaler.Namespace != "" {
			opts = append(opts, client.InNamespace(autoscaler.Namespace))
		}

		var hraList v1alpha1.HorizontalRunnerAutoscalerList

		if err := autoscaler.List(ctx, &hraList, opts...); err != nil {
			return nil, err
		}

		for _, d := range hraList.Items {
			hras = append(hras, d)
		}
	}

	return hras, nil
}

func matchTriggerConditionAgainstEvent(types []string, eventAction *string) bool {
	if len(types) == 0 {
		return true
	}

	if eventAction == nil {
		return false
	}

	for _, tpe := range types {
		if tpe == *eventAction {
			return true
		}
	}

	return false
}

type ScaleTarget struct {
	v1alpha1.HorizontalRunnerAutoscaler
	v1alpha1.ScaleUpTrigger
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) searchScaleTargets(hras []v1alpha1.HorizontalRunnerAutoscaler, f func(v1alpha1.ScaleUpTrigger) bool) []ScaleTarget {
	var matched []ScaleTarget

	for _, hra := range hras {
		if !hra.ObjectMeta.DeletionTimestamp.IsZero() {
			continue
		}

		for _, scaleUpTrigger := range hra.Spec.ScaleUpTriggers {
			if !f(scaleUpTrigger) {
				continue
			}

			matched = append(matched, ScaleTarget{
				HorizontalRunnerAutoscaler: hra,
				ScaleUpTrigger:             scaleUpTrigger,
			})
		}
	}

	return matched
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getScaleTarget(ctx context.Context, name string, f func(v1alpha1.ScaleUpTrigger) bool) (*ScaleTarget, error) {
	hras, err := autoscaler.findHRAsByKey(ctx, name)
	if err != nil {
		return nil, err
	}

	autoscaler.Log.V(1).Info(fmt.Sprintf("Found %d HRAs by key", len(hras)), "key", name)

	targets := autoscaler.searchScaleTargets(hras, f)

	n := len(targets)

	if n == 0 {
		return nil, nil
	}

	if n > 1 {
		var scaleTargetIDs []string

		for _, t := range targets {
			scaleTargetIDs = append(scaleTargetIDs, t.HorizontalRunnerAutoscaler.Name)
		}

		autoscaler.Log.Info(
			"Found too many scale targets: "+
				"It must be exactly one to avoid ambiguity. "+
				"Either set Namespace for the webhook-based autoscaler to let it only find HRAs in the namespace, "+
				"or update Repository, Organization, or Enterprise fields in your RunnerDeployment resources to fix the ambiguity.",
			"scaleTargets", strings.Join(scaleTargetIDs, ","))

		return nil, nil
	}

	return &targets[0], nil
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getScaleUpTarget(ctx context.Context, log logr.Logger, repo, owner, ownerType, enterprise string, f func(v1alpha1.ScaleUpTrigger) bool) (*ScaleTarget, error) {
	scaleTarget := func(value string) (*ScaleTarget, error) {
		return autoscaler.getScaleTarget(ctx, value, f)
	}
	return autoscaler.getScaleUpTargetWithFunction(ctx, log, repo, owner, ownerType, enterprise, scaleTarget)
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getJobScaleUpTargetForRepoOrOrg(
	ctx context.Context, log logr.Logger, repo, owner, ownerType, enterprise string, labels []string,
) (*ScaleTarget, error) {

	scaleTarget := func(value string) (*ScaleTarget, error) {
		return autoscaler.getJobScaleTarget(ctx, value, labels)
	}
	return autoscaler.getScaleUpTargetWithFunction(ctx, log, repo, owner, ownerType, enterprise, scaleTarget)
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getScaleUpTargetWithFunction(
	ctx context.Context, log logr.Logger, repo, owner, ownerType, enterprise string, scaleTarget func(value string) (*ScaleTarget, error)) (*ScaleTarget, error) {

	repositoryRunnerKey := owner + "/" + repo

	// Search for repository HRAs
	if target, err := scaleTarget(repositoryRunnerKey); err != nil {
		log.Error(err, "finding repository-wide runner", "repository", repositoryRunnerKey)
		return nil, err
	} else if target != nil {
		log.Info("job scale up target is repository-wide runners", "repository", repo)
		return target, nil
	}

	if ownerType == "User" {
		log.V(1).Info("user repositories not supported", "owner", owner)
		return nil, nil
	}

	// Find the potential runner groups first to avoid spending API queries needlessly. Once/if GitHub provides an
	// API to find related/linked runner groups for a specific repository, this logic could be removed.
	managedRunnerGroups, err := autoscaler.getManagedRunnerGroupsFromHRAs(ctx, enterprise, owner)
	if err != nil {
		log.Error(err, "finding potential organization/enterprise runner groups from HRAs", "organization", owner)
		return nil, err
	}
	if managedRunnerGroups.IsEmpty() {
		log.V(1).Info("no repository/organizational/enterprise runner found",
			"repository", repositoryRunnerKey,
			"organization", owner,
			"enterprise", enterprise,
		)
	}

	var visibleGroups *simulator.VisibleRunnerGroups
	if autoscaler.GitHubClient != nil {
		simu := &simulator.Simulator{
			Client: autoscaler.GitHubClient,
		}
		// Get the organization runner groups and enterprise runner groups available to a repository.
		// These are the sum of runner groups whose repository access is "All repositories" and runner groups
		// that owner/repo has access to. The list also includes the default runner group if the repository has access to it.
		visibleGroups, err = simu.GetRunnerGroupsVisibleToRepository(ctx, owner, repositoryRunnerKey, managedRunnerGroups)
		log.V(1).Info("Searching in runner groups", "groups", visibleGroups)
		if err != nil {
			log.Error(err, "Unable to find runner groups from repository", "organization", owner, "repository", repo)
			return nil, fmt.Errorf("error while finding visible runner groups: %v", err)
		}
	} else {
		// For backwards compatibility, if GitHub authentication is not configured, we assume all runner groups have
		// visibility=all, to honor the previous implementation; therefore any available enterprise/organization runner
		// is a potential target for scaling. This also avoids the extra API calls made by
		// GitHubClient.GetRunnerGroupsVisibleToRepository in case users are not using custom visibility on their runner
		// groups, or are using only default runner groups.
		visibleGroups = managedRunnerGroups
	}

	scaleTargetKey := func(rg simulator.RunnerGroup) string {
		switch rg.Kind {
		case simulator.Default:
			switch rg.Scope {
			case simulator.Organization:
				return owner
			case simulator.Enterprise:
				return enterpriseKey(enterprise)
			}
		case simulator.Custom:
			switch rg.Scope {
			case simulator.Organization:
				return organizationalRunnerGroupKey(owner, rg.Name)
			case simulator.Enterprise:
				return enterpriseRunnerGroupKey(enterprise, rg.Name)
			}
		}
		return ""
	}

	log.Info("groups", "groups", visibleGroups)

	var t *ScaleTarget

	traverseErr := visibleGroups.Traverse(func(rg simulator.RunnerGroup) (bool, error) {
		key := scaleTargetKey(rg)

		target, err := scaleTarget(key)

		if err != nil {
			log.Error(err, "finding runner group", "enterprise", enterprise, "organization", owner, "repository", repo, "key", key)
			return false, err
		} else if target == nil {
			return false, nil
		}

		t = target
		log.Info("job scale up target found", "enterprise", enterprise, "organization", owner, "repository", repo, "key", key)

		return true, nil
	})

	if traverseErr != nil {
		return nil, traverseErr
	}

	if t == nil {
		log.V(1).Info("no repository/organizational/enterprise runner found",
			"repository", repositoryRunnerKey,
			"organization", owner,
			"enterprise", enterprise,
		)
	}

	return t, nil
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getManagedRunnerGroupsFromHRAs(ctx context.Context, enterprise, org string) (*simulator.VisibleRunnerGroups, error) {
	groups := simulator.NewVisibleRunnerGroups()
	ns := autoscaler.Namespace

	var defaultListOpts []client.ListOption
	if ns != "" {
		defaultListOpts = append(defaultListOpts, client.InNamespace(ns))
	}

	opts := append([]client.ListOption{}, defaultListOpts...)
	if autoscaler.Namespace != "" {
		opts = append(opts, client.InNamespace(autoscaler.Namespace))
	}

	var hraList v1alpha1.HorizontalRunnerAutoscalerList
	if err := autoscaler.List(ctx, &hraList, opts...); err != nil {
		return groups, err
	}

	for _, hra := range hraList.Items {
		var o, e, g string

		kind := hra.Spec.ScaleTargetRef.Kind
		switch kind {
		case "RunnerSet":
			var rs v1alpha1.RunnerSet
			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rs); err != nil {
				return groups, err
			}
			o, e, g = rs.Spec.Organization, rs.Spec.Enterprise, rs.Spec.Group
		case "RunnerDeployment", "":
			var rd v1alpha1.RunnerDeployment
			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rd); err != nil {
				return groups, err
			}
			o, e, g = rd.Spec.Template.Spec.Organization, rd.Spec.Template.Spec.Enterprise, rd.Spec.Template.Spec.Group
		default:
			return nil, fmt.Errorf("unsupported scale target kind: %v", kind)
		}

		if g == "" {
			continue
		}

		if e == "" && o == "" {
			autoscaler.Log.V(1).Info(
				"invalid runner group config in scale target: spec.group must be set along with either spec.enterprise or spec.organization",
				"scaleTargetKind", kind,
				"group", g,
				"enterprise", e,
				"organization", o,
			)

			continue
		}

		if e != enterprise && o != org {
			continue
		}

		rg := simulator.NewRunnerGroupFromProperties(e, o, g)

		if err := groups.Add(rg); err != nil {
			return groups, fmt.Errorf("failed adding visible group from HRA %s/%s: %w", hra.Namespace, hra.Name, err)
		}
	}
	return groups, nil
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) getJobScaleTarget(ctx context.Context, name string, labels []string) (*ScaleTarget, error) {
	hras, err := autoscaler.findHRAsByKey(ctx, name)
	if err != nil {
		return nil, err
	}

	autoscaler.Log.V(1).Info(fmt.Sprintf("Found %d HRAs by key", len(hras)), "key", name)

HRA:
	for _, hra := range hras {
		if !hra.ObjectMeta.DeletionTimestamp.IsZero() {
			continue
		}

		if len(hra.Spec.ScaleUpTriggers) > 1 {
			autoscaler.Log.V(1).Info("Skipping this HRA as it has too many ScaleUpTriggers to be used in workflow_job based scaling", "hra", hra.Name)

			continue
		}

		var duration metav1.Duration

		if len(hra.Spec.ScaleUpTriggers) > 0 {
			duration = hra.Spec.ScaleUpTriggers[0].Duration
		}

		if duration.Duration <= 0 {
			// Try to release the reserved capacity after at least 10 minutes by default,
			// so that we don't end up with reserved capacity remaining forever in case GitHub somehow stopped sending us "completed" workflow_job events.
			// GitHub usually sends us those, but nothing is 100% guaranteed, e.g. in case something went wrong on GitHub's side :)
			// Probably we'd better make this configurable via custom resources in the future?
			duration.Duration = 10 * time.Minute
		}

		switch hra.Spec.ScaleTargetRef.Kind {
		case "RunnerSet":
			var rs v1alpha1.RunnerSet

			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rs); err != nil {
				return nil, err
			}

			// Ensure that the RunnerSet-managed runners have all the labels requested by the workflow_job.
			for _, l := range labels {
				var matched bool

				// ignore the "self-hosted" label as all instances here are self-hosted
				if l == "self-hosted" {
					continue
				}

				// TODO: labels related to OS and architecture need to be explicitly declared or the current implementation will not be able to find them.

				for _, l2 := range rs.Spec.Labels {
					if l == l2 {
						matched = true
						break
					}
				}

				if !matched {
					continue HRA
				}
			}

			return &ScaleTarget{HorizontalRunnerAutoscaler: hra, ScaleUpTrigger: v1alpha1.ScaleUpTrigger{Duration: duration}}, nil
		case "RunnerDeployment", "":
			var rd v1alpha1.RunnerDeployment

			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rd); err != nil {
				return nil, err
			}

			// Ensure that the RunnerDeployment-managed runners have all the labels requested by the workflow_job.
			for _, l := range labels {
				var matched bool

				// ignore the "self-hosted" label as all instances here are self-hosted
				if l == "self-hosted" {
					continue
				}

				// TODO: labels related to OS and architecture need to be explicitly declared or the current implementation will not be able to find them.

				for _, l2 := range rd.Spec.Template.Spec.Labels {
					if l == l2 {
						matched = true
						break
					}
				}

				if !matched {
					continue HRA
				}
			}

			return &ScaleTarget{HorizontalRunnerAutoscaler: hra, ScaleUpTrigger: v1alpha1.ScaleUpTrigger{Duration: duration}}, nil
		default:
			return nil, fmt.Errorf("unsupported scaleTargetRef.kind: %v", hra.Spec.ScaleTargetRef.Kind)
		}
	}

	return nil, nil
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) tryScale(ctx context.Context, target *ScaleTarget) error {
	if target == nil {
		return nil
	}

	copy := target.HorizontalRunnerAutoscaler.DeepCopy()

	amount := 1

	if target.ScaleUpTrigger.Amount != 0 {
		amount = target.ScaleUpTrigger.Amount
	}

	capacityReservations := getValidCapacityReservations(copy)

	if amount > 0 {
		copy.Spec.CapacityReservations = append(capacityReservations, v1alpha1.CapacityReservation{
			ExpirationTime: metav1.Time{Time: time.Now().Add(target.ScaleUpTrigger.Duration.Duration)},
			Replicas:       amount,
		})
	} else if amount < 0 {
		// A negative amount is a scale-down request: drop the oldest capacity
		// reservation whose replica count cancels out the amount, so the
		// resulting desired replica count decreases accordingly.
		var reservations []v1alpha1.CapacityReservation

		var found bool

		for _, r := range capacityReservations {
			if !found && r.Replicas+amount == 0 {
				found = true
			} else {
				reservations = append(reservations, r)
			}
		}

		copy.Spec.CapacityReservations = reservations
	}

	autoscaler.Log.Info(
		"Patching hra for capacityReservations update",
		"before", target.HorizontalRunnerAutoscaler.Spec.CapacityReservations,
		"after", copy.Spec.CapacityReservations,
	)

	if err := autoscaler.Client.Patch(ctx, copy, client.MergeFrom(&target.HorizontalRunnerAutoscaler)); err != nil {
		return fmt.Errorf("patching horizontalrunnerautoscaler to add capacity reservation: %w", err)
	}

	return nil
}

func getValidCapacityReservations(autoscaler *v1alpha1.HorizontalRunnerAutoscaler) []v1alpha1.CapacityReservation {
	var capacityReservations []v1alpha1.CapacityReservation

	now := time.Now()

	for _, reservation := range autoscaler.Spec.CapacityReservations {
		if reservation.ExpirationTime.Time.After(now) {
			capacityReservations = append(capacityReservations, reservation)
		}
	}

	return capacityReservations
}

func (autoscaler *HorizontalRunnerAutoscalerGitHubWebhook) SetupWithManager(mgr ctrl.Manager) error {
	name := "webhookbasedautoscaler"
	if autoscaler.Name != "" {
		name = autoscaler.Name
	}

	autoscaler.Recorder = mgr.GetEventRecorderFor(name)

	if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v1alpha1.HorizontalRunnerAutoscaler{}, scaleTargetKey, func(rawObj client.Object) []string {
		hra := rawObj.(*v1alpha1.HorizontalRunnerAutoscaler)

		if hra.Spec.ScaleTargetRef.Name == "" {
			return nil
		}

		switch hra.Spec.ScaleTargetRef.Kind {
		case "", "RunnerDeployment":
			var rd v1alpha1.RunnerDeployment
			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rd); err != nil {
				autoscaler.Log.V(1).Info(fmt.Sprintf("RunnerDeployment not found with scale target ref name %s for hra %s", hra.Spec.ScaleTargetRef.Name, hra.Name))
				return nil
			}

			keys := []string{}
			if rd.Spec.Template.Spec.Repository != "" {
				keys = append(keys, rd.Spec.Template.Spec.Repository) // Repository runners
			}
			if rd.Spec.Template.Spec.Organization != "" {
				if group := rd.Spec.Template.Spec.Group; group != "" {
					keys = append(keys, organizationalRunnerGroupKey(rd.Spec.Template.Spec.Organization, rd.Spec.Template.Spec.Group)) // Organization runner groups
				} else {
					keys = append(keys, rd.Spec.Template.Spec.Organization) // Organization runners
				}
			}
			if enterprise := rd.Spec.Template.Spec.Enterprise; enterprise != "" {
				if group := rd.Spec.Template.Spec.Group; group != "" {
					keys = append(keys, enterpriseRunnerGroupKey(enterprise, rd.Spec.Template.Spec.Group)) // Enterprise runner groups
				} else {
					keys = append(keys, enterpriseKey(enterprise)) // Enterprise runners
				}
			}
			autoscaler.Log.V(1).Info(fmt.Sprintf("HRA keys indexed for HRA %s: %v", hra.Name, keys))
			return keys
		case "RunnerSet":
			var rs v1alpha1.RunnerSet
			if err := autoscaler.Client.Get(context.Background(), types.NamespacedName{Namespace: hra.Namespace, Name: hra.Spec.ScaleTargetRef.Name}, &rs); err != nil {
				autoscaler.Log.V(1).Info(fmt.Sprintf("RunnerSet not found with scale target ref name %s for hra %s", hra.Spec.ScaleTargetRef.Name, hra.Name))
				return nil
			}

			keys := []string{}
			if rs.Spec.Repository != "" {
				keys = append(keys, rs.Spec.Repository) // Repository runners
			}
			if rs.Spec.Organization != "" {
				keys = append(keys, rs.Spec.Organization) // Organization runners
				if group := rs.Spec.Group; group != "" {
					keys = append(keys, organizationalRunnerGroupKey(rs.Spec.Organization, rs.Spec.Group)) // Organization runner groups
				}
			}
			if enterprise := rs.Spec.Enterprise; enterprise != "" {
				keys = append(keys, enterpriseKey(enterprise)) // Enterprise runners
				if group := rs.Spec.Group; group != "" {
					keys = append(keys, enterpriseRunnerGroupKey(enterprise, rs.Spec.Group)) // Enterprise runner groups
				}
			}
			autoscaler.Log.V(1).Info(fmt.Sprintf("HRA keys indexed for HRA %s: %v", hra.Name, keys))
			return keys
		}

		return nil
	}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&v1alpha1.HorizontalRunnerAutoscaler{}).
		Named(name).
		Complete(autoscaler)
}

func enterpriseKey(name string) string {
	return keyPrefixEnterprise + name
}

func organizationalRunnerGroupKey(owner, group string) string {
	return owner + keyRunnerGroup + group
}

func enterpriseRunnerGroupKey(enterprise, group string) string {
	return keyPrefixEnterprise + enterprise + keyRunnerGroup + group
}
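Since `Handle` validates payloads with `gogithub.ValidatePayload` whenever `SecretKeyBytes` is set, exercising the endpoint manually requires a signed request. Below is a minimal, hypothetical client sketch; the endpoint URL, secret, and payload are placeholders rather than values from this file, and it simply signs a `workflow_job` payload the way GitHub does (HMAC-SHA256 over the body, sent as `X-Hub-Signature-256`):

```go
package main

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/http"
)

func main() {
	secret := []byte("my-webhook-secret") // assumption: must match the controller's SecretKeyBytes
	payload := []byte(`{"action":"queued","workflow_job":{"labels":["self-hosted"]}}`)

	// Compute the signature GitHub would attach to this delivery.
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	sig := "sha256=" + hex.EncodeToString(mac.Sum(nil))

	// Placeholder URL; point this at wherever the webhook server listens.
	req, err := http.NewRequest("POST", "http://localhost:8000/", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-GitHub-Event", "workflow_job") // consumed by gogithub.WebHookType
	req.Header.Set("X-Hub-Signature-256", sig)       // consumed by gogithub.ValidatePayload

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```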