Prevent runnerset pod unregistration until it gets runner ID

This eliminates a race condition that caused runners to be terminated prematurely when RunnerSet triggered unregistration of a StatefulSet that had been added only a few seconds earlier.
Yusuke Kuoka
2022-03-01 02:28:15 +00:00
parent 15b402bb32
commit a3072c110d
4 changed files with 225 additions and 64 deletions
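
The diff below covers only runnerset_controller.go; the guard that gives this commit its title (not unregistering a runner pod before it has obtained its runner ID) presumably lives in the other changed files. As a rough, hypothetical sketch of that kind of guard — runnerIDGetter and shouldDelayUnregistration are illustrative names, not helpers from this commit:

package controllers

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// runnerIDGetter is a hypothetical lookup of the registered runner ID for a runner pod name,
// e.g. backed by the GitHub Actions API. It is an illustrative stand-in, not part of this commit.
type runnerIDGetter func(ctx context.Context, runnerName string) (id int64, found bool, err error)

// shouldDelayUnregistration sketches the idea behind this commit: a runner pod must not be
// unregistered (and therefore not deleted) until the runner has registered and obtained its ID,
// otherwise a pod created just seconds ago can be torn down mid-registration.
func shouldDelayUnregistration(ctx context.Context, getID runnerIDGetter, pod *corev1.Pod) (ctrl.Result, bool, error) {
	_, found, err := getID(ctx, pod.Name)
	if err != nil {
		return ctrl.Result{}, false, err
	}
	if !found {
		// Runner ID not assigned yet: requeue and retry instead of starting unregistration now.
		return ctrl.Result{RequeueAfter: 10 * time.Second}, true, nil
	}
	return ctrl.Result{}, false, nil
}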


@@ -18,6 +18,7 @@ package controllers
import (
"context"
"fmt"
"reflect"
"sort"
"time"
@@ -154,10 +155,53 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}
// Statefulset termination process 4/4: Let Kubernetes cascade-delete the statefulset and the pods.
if !res.statefulset.DeletionTimestamp.IsZero() {
continue
}
// Statefulset termination process 3/4: Set the deletionTimestamp to let Kubernetes start a cascade deletion of the statefulset and the pods.
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationCompleteTimestamp); ok {
if err := r.Client.Delete(ctx, res.statefulset); err != nil {
log.Error(err, "Failed to delete statefulset")
return ctrl.Result{}, err
}
continue
}
// Statefulset termination process 2/4: Set unregistrationCompleteTimestamp only if all the pods managed by the statefulset
// have either unregistered or are being deleted.
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationRequestTimestamp); ok {
var deletionSafe int
for _, po := range res.pods {
if _, ok := getAnnotation(&po.ObjectMeta, unregistrationCompleteTimestamp); ok {
deletionSafe++
} else if !po.DeletionTimestamp.IsZero() {
deletionSafe++
}
}
log.V(2).Info("Marking statefulset for unregistration completion", "deletionSafe", deletionSafe, "total", res.total)
if deletionSafe == res.total {
if _, ok := getAnnotation(&res.statefulset.ObjectMeta, unregistrationCompleteTimestamp); !ok {
updated := res.statefulset.DeepCopy()
setAnnotation(&updated.ObjectMeta, unregistrationCompleteTimestamp, time.Now().Format(time.RFC3339))
if err := r.Client.Patch(ctx, updated, client.MergeFrom(res.statefulset)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", unregistrationCompleteTimestamp))
return ctrl.Result{}, err
}
log.V(2).Info("Redundant statefulset has been annotated to start the deletion")
} else {
log.V(2).Info("BUG: Redundant statefulset was already annotated to start the deletion")
}
}
continue
}
if res.statefulset.Annotations != nil {
if a, ok := res.statefulset.Annotations[SyncTimeAnnotationKey]; ok {
t, err := time.Parse(time.RFC3339, a)
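
The hunk above relies on getAnnotation, setAnnotation, and the unregistrationRequestTimestamp / unregistrationCompleteTimestamp annotation keys, none of which are defined in this diff. A minimal sketch of what those helpers plausibly look like — the key strings below are placeholders, not the repository's actual values:

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// Assumed shape of the annotation helpers used above; the real definitions live elsewhere
// in the repository, and the key strings here are assumptions.
const (
	unregistrationRequestTimestamp  = "actions-runner-controller/unregistration-request-timestamp"
	unregistrationCompleteTimestamp = "actions-runner-controller/unregistration-complete-timestamp"
)

func getAnnotation(meta *metav1.ObjectMeta, key string) (string, bool) {
	if meta.Annotations == nil {
		return "", false
	}
	v, ok := meta.Annotations[key]
	return v, ok
}

func setAnnotation(meta *metav1.ObjectMeta, key, value string) {
	if meta.Annotations == nil {
		meta.Annotations = map[string]string{}
	}
	meta.Annotations[key] = value
}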
@@ -250,34 +294,58 @@ func (r *RunnerSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
log.V(2).Info("Created statefulset(s) to add more replicas", "num", num)
return ctrl.Result{}, nil
- } else if newDesiredReplicas < running {
+ } else if newDesiredReplicas <= running {
var retained int
- var lastIndex int
+ var delete []*podsForStatefulset
for i := len(currentStatefulSets) - 1; i >= 0; i-- {
ss := currentStatefulSets[i]
- retained += ss.running
- if retained >= newDesiredReplicas {
- lastIndex = i
- break
+ if ss.running == 0 || retained >= newDesiredReplicas {
+ delete = append(delete, ss)
+ } else if retained < newDesiredReplicas {
+ retained += ss.running
}
}
if retained == newDesiredReplicas {
- for i := 0; i < lastIndex; i++ {
- ss := currentStatefulSets[i]
+ for _, ss := range delete {
+ log := log.WithValues("statefulset", types.NamespacedName{Namespace: ss.statefulset.Namespace, Name: ss.statefulset.Name})
- if err := r.Client.Delete(ctx, ss.statefulset); err != nil {
- return ctrl.Result{}, err
// Statefulset termination process 1/4: Set unregistrationRequestTimestamp only after all the pods managed by the statefulset have
// started the unregistration process.
//
// NOTE: We just mark it instead of immediately starting the deletion process.
// Otherwise, the runner pod may hit terminationGracePeriod before the unregistration completes (the max terminationGracePeriod is limited to 1h by K8s, and a job can run for longer than that),
// or actions/runner may potentially misbehave on SIGTERM immediately sent by K8s.
// We'd better unregister first and then start a pod deletion process.
// The annotation works as a marker to start our own pod unregistration and deletion process.
for _, po := range ss.pods {
if _, err := annotatePodOnce(ctx, r.Client, log, &po, unregistrationRequestTimestamp, time.Now().Format(time.RFC3339)); err != nil {
return ctrl.Result{}, err
}
}
if _, ok := getAnnotation(&ss.statefulset.ObjectMeta, unregistrationRequestTimestamp); !ok {
updated := ss.statefulset.DeepCopy()
setAnnotation(&updated.ObjectMeta, unregistrationRequestTimestamp, time.Now().Format(time.RFC3339))
if err := r.Client.Patch(ctx, updated, client.MergeFrom(ss.statefulset)); err != nil {
log.Error(err, fmt.Sprintf("Failed to patch statefulset to have %s annotation", unregistrationRequestTimestamp))
return ctrl.Result{}, err
}
log.V(2).Info("Redundant statefulset has been annotated to start the unregistration before deletion")
} else {
log.V(2).Info("BUG: Redundant statefulset was already annotated")
}
log.V(2).Info("Deleted redundant statefulset", "i", i, "lastIndex", lastIndex)
}
return ctrl.Result{}, err
} else if retained > newDesiredReplicas {
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
log.V(2).Info("Waiting sync before scale down", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
return ctrl.Result{}, nil
} else {
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas, "lastIndex", lastIndex)
log.Info("Invalid state", "retained", retained, "newDesiredReplicas", newDesiredReplicas)
panic("crashed due to invalid state")
}
}
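
annotatePodOnce, called in the hunk above, is not part of this file's diff either; it is presumably added in one of the other changed files. A plausible sketch, assuming it idempotently sets a pod annotation via a merge patch:

import (
	"context"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Plausible sketch of annotatePodOnce as used above: add the annotation only if it is not
// already present, using a merge patch. The helper actually added by this commit may differ.
func annotatePodOnce(ctx context.Context, c client.Client, log logr.Logger, pod *corev1.Pod, key, value string) (*corev1.Pod, error) {
	if pod == nil {
		return nil, nil
	}
	if _, ok := getAnnotation(&pod.ObjectMeta, key); ok {
		return pod, nil
	}
	updated := pod.DeepCopy()
	setAnnotation(&updated.ObjectMeta, key, value)
	if err := c.Patch(ctx, updated, client.MergeFrom(pod)); err != nil {
		log.Error(err, "Failed to annotate pod", "key", key)
		return nil, err
	}
	return updated, nil
}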
@@ -352,11 +420,15 @@ func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log log
var completed, running, terminating, pending, total int
var pods []corev1.Pod
for _, pod := range podList.Items {
if owner := metav1.GetControllerOf(&pod); owner == nil || owner.Kind != "StatefulSet" || owner.Name != ss.Name {
continue
}
pods = append(pods, pod)
total++
if runnerPodOrContainerIsStopped(&pod) {
@@ -385,7 +457,7 @@ func (r *RunnerSetReconciler) getPodsForStatefulset(ctx context.Context, log log
pending: pending,
templateHash: templateHash,
statefulset: ss,
- pods: podList.Items,
+ pods: pods,
}, nil
}
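
runnerPodOrContainerIsStopped, used in getPodsForStatefulset above, is likewise defined elsewhere in the repository. As an assumption about its behaviour, it likely treats a pod as stopped when the pod has finished or its runner container has terminated, roughly:

// Assumed behaviour of runnerPodOrContainerIsStopped; the real helper may check more fields,
// and the container name "runner" is an assumption here.
func runnerPodOrContainerIsStopped(pod *corev1.Pod) bool {
	if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
		return true
	}
	for _, s := range pod.Status.ContainerStatuses {
		if s.Name == "runner" && s.State.Terminated != nil {
			return true
		}
	}
	return false
}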