diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 4fa48f4fc..050839662 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -63,6 +63,7 @@ type config struct {
     useZoneTracker           bool
     zoneTrackerConfigMapName string
     deletionInterval         time.Duration
+    delayBetweenSts          time.Duration
 }
 
 func (cfg *config) register(fs *flag.FlagSet) {
@@ -89,7 +90,8 @@ func (cfg *config) register(fs *flag.FlagSet) {
 
     fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
     fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
-    fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "time to wait before actually terminating the pod")
+    fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated] Time to wait before actually terminating the pod")
+    fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "Time to wait between updates of different StatefulSets in the same rollout group")
 }
 
 func (cfg config) validate() error {
@@ -173,7 +175,7 @@ func main() {
     maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)
 
     // Init the controller.
-    c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.deletionInterval, reg, logger)
+    c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger)
     check(errors.Wrap(c.Init(), "failed to init controller"))
 
     // Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go
index fa926888f..f87211e10 100644
--- a/integration/manifests_rollout_operator_test.go
+++ b/integration/manifests_rollout_operator_test.go
@@ -59,7 +59,7 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
         fmt.Sprintf("-kubernetes.namespace=%s", namespace),
         "-reconcile.interval=1s",
         "-log.level=debug",
-        "-deletion-interval=0s",
+        "-delay-between-sts=0s",
     }
     if webhook {
         args = append(args,
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 6033edb07..cbdf5ec8a 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -67,9 +67,9 @@ type RolloutController struct {
     stopCh chan struct{}
 
     //deletion interval related
-    deletionInterval  time.Duration
-    //TODO: convert this to a map of rollout group to deletion ready name
-    deletionReadyTime time.Time
+    delayBetweenSts time.Duration
+    lastUpdatedSts  *v1.StatefulSet
+    lastUpdatedTime time.Time
 
     // Metrics.
     groupReconcileTotal *prometheus.CounterVec
@@ -82,7 +82,7 @@ type RolloutController struct {
     discoveredGroups map[string]struct{}
 }
 
-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
     namespaceOpt := informers.WithNamespace(namespace)
 
     // Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -114,7 +114,9 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
         logger:           logger,
         stopCh:           make(chan struct{}),
         discoveredGroups: map[string]struct{}{},
-        deletionInterval: deletionInterval,
+        delayBetweenSts:  delayBetweenSts,
+        lastUpdatedSts:   nil,
+        lastUpdatedTime:  time.Time{},
         groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
             Name: "rollout_operator_group_reconciles_total",
             Help: "Total number of reconciles started for a specific rollout group.",
@@ -294,17 +296,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
         }
     }
 
-    //if the cluster is stable, and deletion ready time is not set, then set the deletion ready time
-    if len(notReadySets) == 0 && c.deletionReadyTime.IsZero() {
-        c.deletionReadyTime = time.Now().Add(c.deletionInterval)
-        level.Info(c.logger).Log("msg", "sts group healthy, setting deletion ready time if empty ", "deletionReadyTime", c.deletionReadyTime)
-
-        //reset deletionReadyTime since the cluster is not ready anymore
-    } else if len(notReadySets) != 0 && !c.deletionReadyTime.IsZero() {
-        c.deletionReadyTime = time.Time{}
-        level.Info(c.logger).Log("msg", "sts group unhealthy, reset non-empty deletion ready time ", "deletionReadyTime", c.deletionReadyTime)
-    }
-
     // Ensure there are not 2+ StatefulSets with not-Ready pods. If there are, we shouldn't proceed
     // rolling out pods and we should wait until these pods are Ready. The reason is that if there are
     // unavailable pods in multiple StatefulSets, this could lead to an outage, so we want pods to
@@ -319,19 +310,29 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
     if len(notReadySets) == 1 {
         level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name)
         sets = util.MoveStatefulSetToFront(sets, notReadySets[0])
+        // The sts could become not-Ready because of other activities (like UKI), so record it as the last updated sts.
+        c.updateLastUpdatedSts(notReadySets[0])
     }
 
     for _, sts := range sets {
+        if !c.canUpdateSts(sts) {
+            level.Info(c.logger).Log("msg", "delaying reconcile, the delay since the last updated sts has not elapsed yet",
+                "last", c.lastUpdatedSts.Name, "curr", sts.Name, "delay", c.delayBetweenSts)
+            time.Sleep(c.delayBetweenSts)
+            return nil
+        }
         ongoing, err := c.updateStatefulSetPods(ctx, sts)
         if err != nil {
             // Do not continue with other StatefulSets because this StatefulSet
             // is expected to be successfully updated before proceeding.
+            c.updateLastUpdatedSts(sts)
             return errors.Wrapf(err, "failed to update StatefulSet %s", sts.Name)
         }
 
         if ongoing {
             // Do not continue with other StatefulSets because this StatefulSet
             // update is still ongoing.
+            c.updateLastUpdatedSts(sts)
             return nil
         }
     }
@@ -505,6 +506,19 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
     return pods, nil
 }
 
+func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool {
+    if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name {
+        // No need to wait between updates of the same sts.
+        return true
+    }
+    return time.Since(c.lastUpdatedTime) > c.delayBetweenSts
+}
+
+func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) {
+    c.lastUpdatedSts = sts
+    c.lastUpdatedTime = time.Now()
+}
+
 func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
     level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)
 
@@ -535,26 +549,10 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
         return true, nil
     }
 
-    deletionHappened := false
-
-    now := time.Now()
-    // Check if deletionReadyTime is set
-    if c.deletionReadyTime.IsZero() {
-        // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval
-        c.deletionReadyTime = now.Add(c.deletionInterval)
-        level.Info(c.logger).Log("msg", "Scheduled future deletion for sts pods", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
-    }
-
-    if now.Before(c.deletionReadyTime) {
-        level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
-        return true, nil // Not yet time to delete; skip this loop iteration
-    }
-
     for _, pod := range podsToUpdate[:numPods] {
         // Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking
         // if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something
         // similar too).
-
         if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp != nil {
             level.Debug(c.logger).Log("msg", fmt.Sprintf("waiting for pod %s to be terminated", pod.Name))
             continue
@@ -563,17 +561,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
         level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name))
         if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
             return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name)
-        } else {
-            deletionHappened = true
-            level.Info(c.logger).Log("msg", fmt.Sprintf("pod %s successfully terminated", pod.Name), "deletionReadyTime", c.deletionReadyTime)
         }
-
-    }
-
-    //make sure no other pods can be deleted
-    if deletionHappened {
-        c.deletionReadyTime = time.Time{}
-        level.Info(c.logger).Log("msg", "reset deletion ready time since deletion just happened")
     }
 
     return true, nil
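
Not part of the diff above: a minimal unit-test sketch of how the new gate is intended to behave, assuming it lives in package controller next to canUpdateSts/updateLastUpdatedSts. The test name and the StatefulSet names are illustrative only and do not exist in this change.

```go
package controller

import (
	"testing"
	"time"

	v1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Illustrative only: exercises the delay gate introduced in this change.
func TestCanUpdateStsDelaysOtherStatefulSets(t *testing.T) {
	c := &RolloutController{delayBetweenSts: 5 * time.Minute}

	stsA := &v1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: "ingester-zone-a"}}
	stsB := &v1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: "ingester-zone-b"}}

	// With no update recorded yet, any StatefulSet may proceed.
	if !c.canUpdateSts(stsA) {
		t.Fatal("expected update to be allowed when no StatefulSet has been updated yet")
	}

	// Record an update of zone-a: the same StatefulSet is never delayed ...
	c.updateLastUpdatedSts(stsA)
	if !c.canUpdateSts(stsA) {
		t.Fatal("expected the same StatefulSet to be allowed")
	}
	// ... but a different StatefulSet must wait for delayBetweenSts to elapse.
	if c.canUpdateSts(stsB) {
		t.Fatal("expected a different StatefulSet to be delayed")
	}

	// Pretend the delay has already elapsed.
	c.lastUpdatedTime = time.Now().Add(-10 * time.Minute)
	if !c.canUpdateSts(stsB) {
		t.Fatal("expected a different StatefulSet to be allowed after the delay")
	}
}
```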