
Commit

Merge pull request #5 from databricks/PLAT-111199
 [PLAT-111199] Terminate Statefulset only if enough pods are available
yuchen-db authored Jul 3, 2024
2 parents 72f7399 + 97779c6 commit dd54955
Showing 5 changed files with 29 additions and 57 deletions.
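In short, the commit drops the fixed delay between StatefulSets and instead gates pod termination on the StatefulSet's Available replica count: when minReadySeconds is set, a pod only counts as healthy once it has been Ready for that long, so the operator waits for genuine availability before taking down the next pod. The following is a minimal sketch of that gating math, not the operator's actual code; podsToTerminate is a hypothetical helper name, and only the status-field arithmetic mirrors the diff below.

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// podsToTerminate returns how many pods may be taken down right now, honoring
// maxUnavailable and counting a pod as unhealthy when it is not Available
// (if minReadySeconds is set) rather than merely not Ready.
func podsToTerminate(sts *appsv1.StatefulSet, maxUnavailable, podsToUpdate int) int {
	var numNotAvailable int
	if sts.Spec.MinReadySeconds > 0 {
		numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas)
	} else {
		numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas)
	}
	return max(0, min(maxUnavailable-numNotAvailable, podsToUpdate))
}

func main() {
	sts := &appsv1.StatefulSet{}
	sts.Spec.MinReadySeconds = 60
	sts.Status.Replicas = 3
	sts.Status.ReadyReplicas = 3     // every pod passes its readiness probe
	sts.Status.AvailableReplicas = 2 // but one has not been Ready for 60s yet

	// With maxUnavailable=1 and two pods still on the old revision, nothing is
	// terminated until the restarted pod becomes Available again.
	fmt.Println(podsToTerminate(sts, 1, 2)) // prints 0
}

Because availability already provides the pacing, the wall-clock throttles become redundant, which is why the -delay-between-sts flag and the deprecated -deletion-interval flag are removed below.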
cmd/rollout-operator/main.go (6 changes: 1 addition & 5 deletions)
@@ -62,8 +62,6 @@ type config struct {

useZoneTracker bool
zoneTrackerConfigMapName string
- deletionInterval time.Duration
- delayBetweenSts time.Duration
}

func (cfg *config) register(fs *flag.FlagSet) {
@@ -90,8 +88,6 @@ func (cfg *config) register(fs *flag.FlagSet) {

fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
- fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod")
- fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")
}

func (cfg config) validate() error {
@@ -175,7 +171,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
- c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger)
+ c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
integration/manifests_mock_service_test.go (1 change: 0 additions & 1 deletion)
@@ -22,7 +22,6 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl
_, err := api.AppsV1().StatefulSets(namespace).Create(ctx, mockServiceStatefulSet(name, "1", true), metav1.CreateOptions{})
require.NoError(t, err, "Can't create StatefulSet")
}
-
{
_, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceService(name), metav1.CreateOptions{})
require.NoError(t, err, "Can't create Service")
integration/manifests_rollout_operator_test.go (1 change: 0 additions & 1 deletion)
@@ -59,7 +59,6 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
fmt.Sprintf("-kubernetes.namespace=%s", namespace),
"-reconcile.interval=1s",
"-log.level=debug",
"-delay-between-sts=0s",
}
if webhook {
args = append(args,
pkg/controller/controller.go (60 changes: 17 additions & 43 deletions)
@@ -66,11 +66,6 @@ type RolloutController struct {
// Used to signal when the controller should stop.
stopCh chan struct{}

- //deletion interval related
- delayBetweenSts time.Duration
- lastUpdatedSts *v1.StatefulSet
- lastUpdatedTime time.Time
-
// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
@@ -82,7 +77,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

- func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -114,9 +109,6 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
- delayBetweenSts: delayBetweenSts,
- lastUpdatedSts: nil,
- lastUpdatedTime: time.Time{},
groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_group_reconciles_total",
Help: "Total number of reconciles started for a specific rollout group.",
@@ -217,7 +209,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
defer span.Finish()

- level.Info(c.logger).Log("msg", "reconcile started")
+ level.Info(c.logger).Log("msg", "reconcile started (minReadySeconds version)")

sets, err := c.listStatefulSetsWithRolloutGroup()
if err != nil {
@@ -310,8 +302,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
if len(notReadySets) == 1 {
level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name)
sets = util.MoveStatefulSetToFront(sets, notReadySets[0])
- // sts could become not ready by other activities like UKI, we should consider this as last updated sts
- c.updateLastUpdatedSts(notReadySets[0])
}

for _, sts := range sets {
@@ -325,7 +315,6 @@
if ongoing {
// Do not continue with other StatefulSets because this StatefulSet
// update is still ongoing.
- c.updateLastUpdatedSts(sts)
return nil
}
}
@@ -499,20 +488,6 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
return pods, nil
}

- func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool {
- if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name {
- // no need to wait within the same sts updates
- return true
- }
- return time.Since(c.lastUpdatedTime) > c.delayBetweenSts
- }
-
- func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) {
- c.lastUpdatedSts = sts
- c.lastUpdatedTime = time.Now()
- level.Debug(c.logger).Log("msg", "updated last updated sts", "sts", sts.Name, "time", c.lastUpdatedTime)
- }
-
func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)

@@ -522,26 +497,19 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
}

if len(podsToUpdate) > 0 {
- if !c.canUpdateSts(sts) {
- // only check canUpdateSts if the current sts has pods to be updated
- level.Info(c.logger).Log("msg", "delaying reconcile between StatefulSets updates",
- "curr", sts.Name, "last", c.lastUpdatedSts.Name,
- "delay", c.delayBetweenSts, "pods_to_update", len(podsToUpdate))
- time.Sleep(c.delayBetweenSts)
- // MUST return here:
- // 1. pods state could change during the delay period, scan the entire cluster again
- // 2. since we didn't actually update current sts, the last updated sts should not be changed
- // 3. throw errors to not proceed with other StatefulSets
- return false, errors.New("delaying reconcile between StatefulSets updates")
- }
-
maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger)
- numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+ var numNotAvailable int
+ if sts.Spec.MinReadySeconds > 0 {
+ level.Info(c.logger).Log("msg", "StatefulSet has minReadySeconds set, waiting before terminating pods", "statefulset", sts.Name, "min_ready_seconds", sts.Spec.MinReadySeconds)
+ numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas)
+ } else {
+ numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+ }

// Compute the number of pods we should update, honoring the configured maxUnavailable.
numPods := max(0, min(
- maxUnavailable-numNotReady, // No more than the configured maxUnavailable (including not-Ready pods).
- len(podsToUpdate), // No more than the total number of pods that need to be updated.
+ maxUnavailable-numNotAvailable, // No more than the configured maxUnavailable (including not-Ready pods).
+ len(podsToUpdate), // No more than the total number of pods that need to be updated.
))

if numPods == 0 {
@@ -551,6 +519,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"pods_to_update", len(podsToUpdate),
"replicas", sts.Status.Replicas,
"ready_replicas", sts.Status.ReadyReplicas,
"available_replicas", sts.Status.AvailableReplicas,
"max_unavailable", maxUnavailable)

return true, nil
@@ -584,6 +553,11 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"statefulset", sts.Name)

return true, nil
+ } else if sts.Spec.MinReadySeconds > 0 && sts.Status.Replicas != sts.Status.AvailableReplicas {
+ level.Info(c.logger).Log(
+ "msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas",
+ "statefulset", sts.Name)
+ return true, nil
}

// At this point there are no pods to update, so we can update the currentRevision in the StatefulSet.
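A second effect of the controller.go changes above: even once every pod runs the latest revision and reports Ready, the rollout is still treated as ongoing until all replicas are Available whenever minReadySeconds is set (the new else-if branch at the end of updateStatefulSetPods). Roughly, and reusing the appsv1 alias from the sketch near the top of this page, the condition condenses to the hypothetical helper below; it is a simplification, not the real method.

// stillOngoing is a hypothetical stand-in for the end-of-update checks after
// this commit: the controller keeps reporting the StatefulSet update as ongoing
// (and so does not move on to the next one) until every replica is Ready and,
// when minReadySeconds is set, Available as well.
func stillOngoing(sts *appsv1.StatefulSet) bool {
	notReady := sts.Status.Replicas != sts.Status.ReadyReplicas
	notAvailable := sts.Spec.MinReadySeconds > 0 &&
		sts.Status.Replicas != sts.Status.AvailableReplicas
	return notReady || notAvailable
}

This replaces the removed canUpdateSts/updateLastUpdatedSts bookkeeping: rather than remembering which StatefulSet was touched last and sleeping a fixed delayBetweenSts, the controller simply refuses to proceed while the cluster reports insufficient available pods.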
pkg/controller/controller_test.go (18 changes: 11 additions & 7 deletions)
@@ -165,6 +165,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 2
+ sts.Status.AvailableReplicas = 2
}),
},
pods: []runtime.Object{
@@ -183,6 +184,7 @@
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 1
+ sts.Status.AvailableReplicas = 1
}),
},
pods: []runtime.Object{
@@ -540,7 +542,7 @@

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -825,7 +827,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -928,7 +930,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -1010,10 +1012,11 @@ func mockStatefulSet(name string, overrides ...func(sts *v1.StatefulSet)) *v1.St
},
},
Status: v1.StatefulSetStatus{
- Replicas: 3,
- ReadyReplicas: 3,
- CurrentRevision: testLastRevisionHash,
- UpdateRevision: testLastRevisionHash,
+ Replicas: 3,
+ ReadyReplicas: 3,
+ AvailableReplicas: 3,
+ CurrentRevision: testLastRevisionHash,
+ UpdateRevision: testLastRevisionHash,
},
}

@@ -1067,6 +1070,7 @@ func withReplicas(totalReplicas, readyReplicas int32) func(sts *v1.StatefulSet)
sts.Spec.Replicas = &totalReplicas
sts.Status.Replicas = totalReplicas
sts.Status.ReadyReplicas = readyReplicas
+ sts.Status.AvailableReplicas = readyReplicas
}
}

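The test fixtures now populate Status.AvailableReplicas wherever they set ReadyReplicas, keeping the existing cases consistent with the new availability check. To exercise the path where Available lags behind Ready, an override in the style of withReplicas could look like the sketch below; withAvailableReplicas is a hypothetical helper (using the file's v1 alias for k8s.io/api/apps/v1) and is not part of this commit.

// Hypothetical test override, following the withReplicas pattern above.
func withAvailableReplicas(availableReplicas int32) func(sts *v1.StatefulSet) {
	return func(sts *v1.StatefulSet) {
		sts.Spec.MinReadySeconds = 60 // arbitrary example value
		sts.Status.AvailableReplicas = availableReplicas
	}
}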
