Merge pull request #2 from jnyi/PLAT-106952
[PLAT-106952] Only delay with a longer interval between StatefulSet updates
jnyi authored Apr 27, 2024
2 parents bbfeada + 6a870a2 commit 9d7b864
Showing 3 changed files with 35 additions and 45 deletions.
6 changes: 4 additions & 2 deletions cmd/rollout-operator/main.go
@@ -63,6 +63,7 @@ type config struct {
useZoneTracker bool
zoneTrackerConfigMapName string
deletionInterval time.Duration
delayBetweenSts time.Duration
}

func (cfg *config) register(fs *flag.FlagSet) {
@@ -89,7 +90,8 @@ func (cfg *config) register(fs *flag.FlagSet) {

fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "time to wait before actually terminating the pod")
fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated] time to wait before actually terminating the pod")
fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")
}

func (cfg config) validate() error {
@@ -173,7 +175,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.deletionInterval, reg, logger)
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
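For reference, below is a minimal, hypothetical sketch of how a time.Duration flag such as the new -delay-between-sts option parses. The flag name and 5m default mirror the fs.DurationVar call added above, but the program itself is not the operator's actual main().

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("rollout-operator", flag.ExitOnError)

	var delayBetweenSts time.Duration
	fs.DurationVar(&delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")

	// An explicit value overrides the 5m default; any time.ParseDuration
	// string such as "30s", "5m" or "1h" is accepted.
	_ = fs.Parse([]string{"-delay-between-sts=30s"})

	fmt.Println(delayBetweenSts) // 30s
}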
2 changes: 1 addition & 1 deletion integration/manifests_rollout_operator_test.go
@@ -59,7 +59,7 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
fmt.Sprintf("-kubernetes.namespace=%s", namespace),
"-reconcile.interval=1s",
"-log.level=debug",
"-deletion-interval=0s",
"-delay-between-sts=0s",
}
if webhook {
args = append(args,
72 changes: 30 additions & 42 deletions pkg/controller/controller.go
@@ -67,9 +67,9 @@ type RolloutController struct {
stopCh chan struct{}

//deletion interval related
deletionInterval time.Duration
//TODO: convert this to a map of rollout group to deletion ready name
deletionReadyTime time.Time
delayBetweenSts time.Duration
lastUpdatedSts *v1.StatefulSet
lastUpdatedTime time.Time

// Metrics.
groupReconcileTotal *prometheus.CounterVec
@@ -82,7 +82,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -114,7 +114,9 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
deletionInterval: deletionInterval,
delayBetweenSts: delayBetweenSts,
lastUpdatedSts: nil,
lastUpdatedTime: time.Time{},
groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_group_reconciles_total",
Help: "Total number of reconciles started for a specific rollout group.",
@@ -294,17 +296,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
}
}

//if the cluster is stable, and deletion ready time is not set, then set the deletion ready time
if len(notReadySets) == 0 && c.deletionReadyTime.IsZero() {
c.deletionReadyTime = time.Now().Add(c.deletionInterval)
level.Info(c.logger).Log("msg", "sts group healthy, setting deletion ready time if empty ", "deletionReadyTime", c.deletionReadyTime)

//reset deletionReadyTime since the cluster is not ready anymore
} else if len(notReadySets) != 0 && !c.deletionReadyTime.IsZero() {
c.deletionReadyTime = time.Time{}
level.Info(c.logger).Log("msg", "sts group unhealthy, reset non-empty deletion ready time ", "deletionReadyTime", c.deletionReadyTime)
}

// Ensure there are not 2+ StatefulSets with not-Ready pods. If there are, we shouldn't proceed
// rolling out pods and we should wait until these pods are Ready. The reason is that if there are
// unavailable pods in multiple StatefulSets, this could lead to an outage, so we want pods to
@@ -319,19 +310,29 @@
if len(notReadySets) == 1 {
level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name)
sets = util.MoveStatefulSetToFront(sets, notReadySets[0])
// sts could become not ready by other activities like UKI, we should consider this as last updated sts
c.updateLastUpdatedSts(notReadySets[0])
}

for _, sts := range sets {
if !c.canUpdateSts(sts) {
level.Info(c.logger).Log("msg", "delaying reconcile due to last updated sts is not ready yet",
"last", c.lastUpdatedSts.Name, "curr", sts.Name, "delay", c.delayBetweenSts)
time.Sleep(c.delayBetweenSts)
return nil
}
ongoing, err := c.updateStatefulSetPods(ctx, sts)
if err != nil {
// Do not continue with other StatefulSets because this StatefulSet
// is expected to be successfully updated before proceeding.
c.updateLastUpdatedSts(sts)
return errors.Wrapf(err, "failed to update StatefulSet %s", sts.Name)
}

if ongoing {
// Do not continue with other StatefulSets because this StatefulSet
// update is still ongoing.
c.updateLastUpdatedSts(sts)
return nil
}
}
@@ -505,6 +506,19 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
return pods, nil
}

func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool {
if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name {
// no need to wait within the same sts updates
return true
}
return time.Since(c.lastUpdatedTime) > c.delayBetweenSts
}

func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) {
c.lastUpdatedSts = sts
c.lastUpdatedTime = time.Now()
}

func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)

@@ -535,26 +549,10 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
return true, nil
}

deletionHappened := false

now := time.Now()
// Check if deletionReadyTime is set
if c.deletionReadyTime.IsZero() {
// If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval
c.deletionReadyTime = now.Add(c.deletionInterval)
level.Info(c.logger).Log("msg", "Scheduled future deletion for sts pods", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
}

if now.Before(c.deletionReadyTime) {
level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
return true, nil // Not yet time to delete; skip this loop iteration
}

for _, pod := range podsToUpdate[:numPods] {
// Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking
// if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something
// similar too).

if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp != nil {
level.Debug(c.logger).Log("msg", fmt.Sprintf("waiting for pod %s to be terminated", pod.Name))
continue
Expand All @@ -563,17 +561,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name))
if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name)
} else {
deletionHappened = true
level.Info(c.logger).Log("msg", fmt.Sprintf("pod %s successfully terminated", pod.Name), "deletionReadyTime", c.deletionReadyTime)
}

}

//make sure no other pods can be deleted
if deletionHappened {
c.deletionReadyTime = time.Time{}
level.Info(c.logger).Log("msg", "reset deletion ready time since deletion just happened")
}

return true, nil
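To illustrate the behaviour introduced above, here is a hypothetical, self-contained sketch of the delay gate: updates that stay within the same StatefulSet proceed immediately, while moving on to a different StatefulSet must wait until delayBetweenSts has elapsed since the last recorded update. The gate type and the ingester-zone-a/b names are illustrative only, not part of the operator.

package main

import (
	"fmt"
	"time"
)

// gate mirrors the controller's lastUpdatedSts / lastUpdatedTime bookkeeping,
// reduced to a string key instead of a *v1.StatefulSet.
type gate struct {
	delayBetweenSts time.Duration
	lastUpdatedSts  string
	lastUpdatedTime time.Time
}

func (g *gate) canUpdate(sts string) bool {
	if g.lastUpdatedSts == "" || g.lastUpdatedSts == sts {
		// No delay is applied while updates stay within the same StatefulSet.
		return true
	}
	return time.Since(g.lastUpdatedTime) > g.delayBetweenSts
}

func (g *gate) markUpdated(sts string) {
	g.lastUpdatedSts = sts
	g.lastUpdatedTime = time.Now()
}

func main() {
	g := &gate{delayBetweenSts: 2 * time.Second}
	g.markUpdated("ingester-zone-a")

	fmt.Println(g.canUpdate("ingester-zone-a")) // true: same StatefulSet
	fmt.Println(g.canUpdate("ingester-zone-b")) // false: delay not yet elapsed

	time.Sleep(3 * time.Second)
	fmt.Println(g.canUpdate("ingester-zone-b")) // true: delay elapsed
}

Setting -delay-between-sts=0s, as the integration test above does, effectively disables the gate, since time.Since(lastUpdatedTime) exceeds a zero delay on the next reconcile.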
