Mirror of https://github.com/actions/actions-runner-controller.git (synced 2025-12-11 12:06:57 +00:00)

refactor: Make RunnerReplicaSet and Runner backed by the same logic that backs RunnerSet
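The core idea of the refactor is that a Runner (behind RunnerReplicaSet) and a StatefulSet (behind RunnerSet) are both driven through one shared owner abstraction. Below is a minimal sketch of that abstraction as implied by the methods appearing in this diff; the exact upstream interface definition may differ.

```go
package controllers

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// owner is a sketch of the abstraction that both ownerRunner and ownerStatefulSet
// satisfy, so syncRunnerPodsOwners can scale, annotate, and delete either kind of
// pod owner without caring which controller it belongs to. Method names are taken
// from this diff; the embedded client.Object mirrors the Object field assigned in
// getPodsForOwner.
type owner interface {
	client.Object

	pods(ctx context.Context, c client.Client) ([]corev1.Pod, error)
	templateHash() (string, bool)
	withAnnotation(k, v string) client.Object
	synced() bool
}
```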
@@ -10,6 +10,7 @@ import (
	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -26,6 +27,7 @@ type podsForOwner struct {
	runner *v1alpha1.Runner
	statefulSet *appsv1.StatefulSet
	owner owner
	object client.Object
	synced bool
	pods []corev1.Pod
}
@@ -52,6 +54,9 @@ func (r *ownerRunner) pods(ctx context.Context, c client.Client) ([]corev1.Pod,
	var pod corev1.Pod

	if err := c.Get(ctx, types.NamespacedName{Namespace: r.Runner.Namespace, Name: r.Runner.Name}, &pod); err != nil {
		if errors.IsNotFound(err) {
			return nil, nil
		}
		r.Log.Error(err, "Failed to get pod managed by runner")
		return nil, err
	}
@@ -70,7 +75,7 @@ func (r *ownerRunner) withAnnotation(k, v string) client.Object {
}

func (r *ownerRunner) synced() bool {
	return true
	return r.Runner.Status.Phase != ""
}

type ownerStatefulSet struct {
@@ -104,7 +109,7 @@ func (s *ownerStatefulSet) pods(ctx context.Context, c client.Client) ([]corev1.
}

func (s *ownerStatefulSet) templateHash() (string, bool) {
	return getStatefulSetTemplateHash(s.StatefulSet)
	return getRunnerTemplateHash(s.StatefulSet)
}

func (s *ownerStatefulSet) withAnnotation(k, v string) client.Object {
@@ -132,23 +137,26 @@ func getPodsForOwner(ctx context.Context, c client.Client, log logr.Logger, o cl
		owner owner
		runner *v1alpha1.Runner
		statefulSet *appsv1.StatefulSet
		object client.Object
	)

	switch v := o.(type) {
	case *v1alpha1.Runner:
		owner = &ownerRunner{
			Object: v,
			Log: log,
			Runner: v,
			Object: v,
		}
		runner = v
		object = v
	case *appsv1.StatefulSet:
		owner = &ownerStatefulSet{
			Object: v,
			Log: log,
			StatefulSet: v,
			Object: v,
		}
		statefulSet = v
		object = v
	default:
		return nil, fmt.Errorf("BUG: Unsupported runner pods owner %v(%T)", v, v)
	}
@@ -209,19 +217,14 @@ func getPodsForOwner(ctx context.Context, c client.Client, log logr.Logger, o cl
		runner: runner,
		statefulSet: statefulSet,
		owner: owner,
		object: object,
		synced: synced,
		pods: pods,
	}, nil
}

func getRunnerTemplateHash(r *v1alpha1.Runner) (string, bool) {
	hash, ok := r.Labels[LabelKeyRunnerTemplateHash]

	return hash, ok
}

func getStatefulSetTemplateHash(rs *appsv1.StatefulSet) (string, bool) {
	hash, ok := rs.Labels[LabelKeyRunnerTemplateHash]
func getRunnerTemplateHash(r client.Object) (string, bool) {
	hash, ok := r.GetLabels()[LabelKeyRunnerTemplateHash]

	return hash, ok
}
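The two typed hash helpers can collapse into one because every API type handled here satisfies controller-runtime's client.Object, which exposes GetLabels(). A small self-contained sketch of the same idea, using a stand-in label key rather than the project's LabelKeyRunnerTemplateHash constant:

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// templateHashLabel stands in for LabelKeyRunnerTemplateHash.
const templateHashLabel = "example.com/runner-template-hash"

// templateHash accepts any API object, whether a *v1alpha1.Runner or an
// *appsv1.StatefulSet, because client.Object provides GetLabels().
func templateHash(o client.Object) (string, bool) {
	hash, ok := o.GetLabels()[templateHashLabel]
	return hash, ok
}

func main() {
	ss := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{templateHashLabel: "5c9f7b8d4"},
		},
	}
	fmt.Println(templateHash(ss)) // prints: 5c9f7b8d4 true
}
```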
@@ -235,7 +238,18 @@ type result struct {
	currentObjects []*podsForOwner
}

func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger, effectiveTime *metav1.Time, newDesiredReplicas int, desiredTemplateHash string, create client.Object, ephemeral bool, owners []client.Object) (*result, error) {
// Why must `create` be a function rather than a client.Object? Because we use it to create one or more objects on scale up.
//
// We use client.Create to create the necessary number of client.Objects, and client.Create mutates the passed object on a successful creation.
// It seems to set .Revision at least, and the presence of .Revision makes a subsequent client.Create fail due to the K8s restriction that an object being created
// can't have .Revision.
// Now, imagine that you are to add 2 runner replicas on scale up.
// We create one resource object per replica, which ends up making 2 client.Create calls.
// If we reused a single client.Object across those client.Create calls, only the first call would succeed.
// The second call would fail because the first call mutated the client.Object to have .Revision.
// Passing a factory function of client.Object and creating a brand-new client.Object per client.Create call resolves this issue,
// allowing us to create two or more replicas in one reconciliation loop without being rejected by K8s.
func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger, effectiveTime *metav1.Time, newDesiredReplicas int, create func() client.Object, ephemeral bool, owners []client.Object) (*result, error) {
	state, err := collectPodsForOwners(ctx, c, log, owners)
	if err != nil || state == nil {
		return nil, err
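To make the mutation problem described in that comment concrete, here is a minimal sketch (assumed names, not the upstream code) of a scale-up helper taking the same kind of factory; building a fresh object per call is what keeps the second and later Create calls from being rejected.

```go
package controllers

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// scaleUp is a hypothetical helper illustrating why syncRunnerPodsOwners takes
// `create func() client.Object` instead of a single client.Object.
func scaleUp(ctx context.Context, c client.Client, num int, create func() client.Object) error {
	// If we built one object up front and reused it, the first c.Create would
	// succeed and write server-assigned metadata (such as resourceVersion) back
	// into that object; the API server would then reject the second c.Create,
	// since an object to be created must not already carry a resourceVersion.
	for i := 0; i < num; i++ {
		if err := c.Create(ctx, create()); err != nil { // brand-new object per call
			return err
		}
	}

	return nil
}
```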
@@ -265,6 +279,13 @@ func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger,
	// Even though the error message includes "Forbidden", this error's reason is "Invalid".
	// So we used to match these errors by using errors.IsInvalid. But that's another story...

	desiredTemplateHash, ok := getRunnerTemplateHash(create())
	if !ok {
		log.Info("Failed to get template hash of desired owner resource. It must be in an invalid state. Please manually delete the owner so that it is recreated")

		return nil, nil
	}

	currentObjects := podsForOwnersPerTemplateHash[desiredTemplateHash]

	sort.SliceStable(currentObjects, func(i, j int) bool {
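The map indexed here, podsForOwnersPerTemplateHash, is not defined in the hunks shown. A sketch of how such a grouping is presumably built, reusing the field names visible elsewhere in this diff (the actual upstream construction may differ):

```go
package controllers

// groupByTemplateHash is a hypothetical helper showing how collected owners could
// be bucketed by template hash. Owners created from an older template land in a
// different bucket, so they never count toward the desired replicas of the
// current template.
func groupByTemplateHash(collected []*podsForOwner) map[string][]*podsForOwner {
	byHash := map[string][]*podsForOwner{}

	for _, res := range collected {
		// res.templateHash mirrors the field used by collectPodsForOwners below.
		byHash[res.templateHash] = append(byHash[res.templateHash], res)
	}

	return byHash
}
```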
@@ -289,13 +310,20 @@ func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger,
		regTimeout += ss.regTimeout
	}

	numOwners := len(owners)

	var hashes []string
	for h, _ := range state.podsForOwners {
		hashes = append(hashes, h)
	}

	log.V(2).Info(
		"Found some pods across owner(s)",
		"pending", pending,
		"running", running,
		"regTimeout", regTimeout,
		"desired", newDesiredReplicas,
		"owners", len(owners),
		"owners", numOwners,
	)

	maybeRunning := pending + running
@@ -307,15 +335,39 @@ func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger,

		for i := 0; i < num; i++ {
			// Add more replicas
			if err := c.Create(ctx, create); err != nil {
			if err := c.Create(ctx, create()); err != nil {
				return nil, err
			}
		}

		log.V(2).Info("Created object(s) to add more replicas", "num", num)
		log.V(1).Info("Created replica(s)",
			"created", num,
			"templateHashDesired", desiredTemplateHash,
			"replicasDesired", newDesiredReplicas,
			"replicasMaybeRunning", maybeRunning,
			"templateHashObserved", hashes,
		)

		return nil, nil
	} else if newDesiredReplicas <= running {
		// If you use ephemeral runners with the webhook-based autoscaler and the runner controller is working normally,
		// you're unlikely to fall into this branch.
		//
		// That's because all the stakeholders work like this:
		//
		// 1. A runner pod completes with the runner container exiting with code 0
		// 2. The ARC runner controller detects the pod completion and marks the owner (Runner or StatefulSet) resource on K8s for deletion (=Runner.DeletionTimestamp becomes non-zero)
		// 3. GitHub triggers a corresponding workflow_job "completed" webhook event
		// 4. The ARC github-webhook-server (webhook-based autoscaler) receives the webhook event and updates the HRA, removing the oldest capacity reservation
		// 5. The ARC horizontalrunnerautoscaler controller updates the RunnerDeployment's desired replicas based on capacity reservations
		// 6. The ARC runnerdeployment controller updates the RunnerReplicaSet's desired replicas
		// 7. (We're here) The ARC runnerset or runnerreplicaset controller starts reconciling the owner resource (StatefulSet or Runner)
		//
		// In a normally working ARC installation, the runner that was used to run the workflow job should already have been
		// marked for deletion by the runner controller.
		// This runnerreplicaset controller doesn't count such marked runners toward the `running` value, hence you're unlikely to
		// fall into this branch when you're using ephemeral runners with the webhook-based autoscaler.

		var retained int

		var delete []*podsForOwner
@@ -354,7 +406,7 @@ func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger,
		if _, ok := getAnnotation(ss.owner, AnnotationKeyUnregistrationRequestTimestamp); !ok {
			updated := ss.owner.withAnnotation(AnnotationKeyUnregistrationRequestTimestamp, time.Now().Format(time.RFC3339))

			if err := c.Patch(ctx, updated, client.MergeFrom(ss.owner)); err != nil {
			if err := c.Patch(ctx, updated, client.MergeFrom(ss.object)); err != nil {
				log.Error(err, fmt.Sprintf("Failed to patch object to have %s annotation", AnnotationKeyUnregistrationRequestTimestamp))
				return nil, err
			}
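The patch calls above follow a common controller-runtime pattern: build an updated copy carrying the extra annotation, then send only the difference computed against the unmodified object. A minimal sketch with assumed helper names (this is not the upstream withAnnotation implementation):

```go
package controllers

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// patchAnnotation is a hypothetical helper showing the annotate-then-merge-patch
// pattern: `updated` carries the new annotation, while the unmodified `obj` serves
// as the base that client.MergeFrom diffs against, so only the added annotation is
// sent to the API server.
func patchAnnotation(ctx context.Context, c client.Client, obj client.Object, key, value string) error {
	updated, ok := obj.DeepCopyObject().(client.Object)
	if !ok {
		return fmt.Errorf("unexpected object type %T", obj)
	}

	annotations := updated.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations[key] = value
	updated.SetAnnotations(annotations)

	return c.Patch(ctx, updated, client.MergeFrom(obj))
}
```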
@@ -379,7 +431,7 @@ func syncRunnerPodsOwners(ctx context.Context, c client.Client, log logr.Logger,
	for _, ss := range sss {
		if ss.templateHash != desiredTemplateHash {
			if ss.owner.GetDeletionTimestamp().IsZero() {
				if err := c.Delete(ctx, ss.owner); err != nil {
				if err := c.Delete(ctx, ss.object); err != nil {
					log.Error(err, "Unable to delete object")
					return nil, err
				}
@@ -417,6 +469,12 @@ func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger,
			return nil, err
		}

		if res.templateHash == "" {
			log.Info("validation error: runner pod owner must have template hash", "object", res.object)

			return nil, nil
		}

		// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
		//
		// If the runner is already marked for deletion (=has a non-zero deletion timestamp) by the runner controller (can be caused by an ephemeral runner completion)
@@ -429,7 +487,7 @@ func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger,

		// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
		if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); ok {
			if err := c.Delete(ctx, res.owner); err != nil {
			if err := c.Delete(ctx, res.object); err != nil {
				log.Error(err, "Failed to delete owner")
				return nil, err
			}
@@ -454,7 +512,7 @@ func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger,
		if _, ok := getAnnotation(res.owner, AnnotationKeyUnregistrationCompleteTimestamp); !ok {
			updated := res.owner.withAnnotation(AnnotationKeyUnregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))

			if err := c.Patch(ctx, updated, client.MergeFrom(res.owner)); err != nil {
			if err := c.Patch(ctx, updated, client.MergeFrom(res.object)); err != nil {
				log.Error(err, fmt.Sprintf("Failed to patch owner to have %s annotation", AnnotationKeyUnregistrationCompleteTimestamp))
				return nil, err
			}
@@ -494,6 +552,8 @@ func collectPodsForOwners(ctx context.Context, c client.Client, log logr.Logger,
		}

		if !res.synced {
			log.V(1).Info("Skipped reconciliation because owner is not synced yet", "pods", res.pods)

			return nil, nil
		}