Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rollouts getting stuck due to bad rs informer updates #3200

Merged
merged 10 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions rollout/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ func (c *rolloutContext) removeScaleDownDelay(rs *appsv1.ReplicaSet) error {
}
patch := fmt.Sprintf(removeScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error removing scale-down-deadline annotation from RS '%s': %w", rs.Name, err)
}
c.log.Infof("Removed '%s' annotation from RS '%s'", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name)
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error updating replicaset informer in removeScaleDownDelay: %w", err)
}
return err
}
Expand All @@ -60,9 +64,13 @@ func (c *rolloutContext) addScaleDownDelay(rs *appsv1.ReplicaSet, scaleDownDelay
deadline := timeutil.MetaNow().Add(scaleDownDelaySeconds).UTC().Format(time.RFC3339)
patch := fmt.Sprintf(addScaleDownAtAnnotationsPatch, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, deadline)
rs, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(ctx, rs.Name, patchtypes.JSONPatchType, []byte(patch), metav1.PatchOptions{})
if err == nil {
c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds)
c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error adding scale-down-deadline annotation to RS '%s': %w", rs.Name, err)
}
c.log.Infof("Set '%s' annotation on '%s' to %s (%s)", v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey, rs.Name, deadline, scaleDownDelaySeconds)
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return fmt.Errorf("error updating replicaset informer in addScaleDownDelay: %w", err)
}
return err
}
Expand Down
45 changes: 42 additions & 3 deletions rollout/replicaset_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rollout

import (
"fmt"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -134,8 +135,9 @@ func TestReconcileNewReplicaSet(t *testing.T) {
abortScaleDownAnnotated bool
abortScaleDownDelayPassed bool
expectedNewReplicas int
failRSUpdate bool
abort bool
}{

{
name: "New Replica Set matches rollout replica: No scale",
rolloutReplicas: 10,
Expand Down Expand Up @@ -163,6 +165,7 @@ func TestReconcileNewReplicaSet(t *testing.T) {
newReplicas: 10,
// ScaleDownOnAbort: true,
abortScaleDownDelaySeconds: 5,
abort: true,
abortScaleDownAnnotated: true,
abortScaleDownDelayPassed: true,
scaleExpected: true,
Expand All @@ -174,6 +177,7 @@ func TestReconcileNewReplicaSet(t *testing.T) {
newReplicas: 8,
// ScaleDownOnAbort: true,
abortScaleDownDelaySeconds: 5,
abort: true,
abortScaleDownAnnotated: true,
abortScaleDownDelayPassed: false,
scaleExpected: false,
Expand All @@ -184,10 +188,20 @@ func TestReconcileNewReplicaSet(t *testing.T) {
rolloutReplicas: 10,
newReplicas: 10,
abortScaleDownDelaySeconds: 5,
abort: true,
abortScaleDownAnnotated: false,
scaleExpected: false,
expectedNewReplicas: 0,
},
{
name: "Fail to update RS: No scale and add default annotation",
rolloutReplicas: 10,
newReplicas: 10,
scaleExpected: false,
failRSUpdate: true,
abort: true,
abortScaleDownDelaySeconds: -1,
},
}
for i := range tests {
test := tests[i]
Expand All @@ -199,6 +213,12 @@ func TestReconcileNewReplicaSet(t *testing.T) {
fake := fake.Clientset{}
k8sfake := k8sfake.Clientset{}

if test.failRSUpdate {
k8sfake.PrependReactor("patch", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
return true, &appsv1.ReplicaSet{}, fmt.Errorf("should not patch replica set")
})
}

f := newFixture(t)
defer f.Close()
f.objects = append(f.objects, rollout)
Expand Down Expand Up @@ -227,14 +247,21 @@ func TestReconcileNewReplicaSet(t *testing.T) {
},
}
roCtx.enqueueRolloutAfter = func(obj any, duration time.Duration) {}

rollout.Status.Abort = test.abort
roCtx.stableRS.Status.AvailableReplicas = int32(test.rolloutReplicas)
rollout.Spec.Strategy = v1alpha1.RolloutStrategy{
BlueGreen: &v1alpha1.BlueGreenStrategy{
AbortScaleDownDelaySeconds: &test.abortScaleDownDelaySeconds,
},
}

if test.abortScaleDownDelaySeconds > 0 {
rollout.Status.Abort = true
rollout.Spec.Strategy = v1alpha1.RolloutStrategy{
BlueGreen: &v1alpha1.BlueGreenStrategy{
AbortScaleDownDelaySeconds: &test.abortScaleDownDelaySeconds,
},
}

if test.abortScaleDownAnnotated {
var deadline string
if test.abortScaleDownDelayPassed {
Expand All @@ -246,7 +273,19 @@ func TestReconcileNewReplicaSet(t *testing.T) {
}
}

if test.abortScaleDownDelaySeconds < 0 {
rollout.Spec.Strategy = v1alpha1.RolloutStrategy{
BlueGreen: &v1alpha1.BlueGreenStrategy{
AbortScaleDownDelaySeconds: nil,
},
}
}

scaled, err := roCtx.reconcileNewReplicaSet()
if test.failRSUpdate {
assert.Error(t, err)
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
return
Expand Down
18 changes: 15 additions & 3 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func (c *rolloutContext) syncReplicaSetRevision() (*appsv1.ReplicaSet, error) {
return nil, fmt.Errorf("error updating replicaset revision: %v", err)
}
c.log.Infof("Synced revision on ReplicaSet '%s' to '%s'", rs.Name, newRevision)
c.replicaSetInformer.GetIndexer().Update(rs)
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
return nil, fmt.Errorf("error updating replicaset informer in syncReplicaSetRevision: %w", err)
}
return rs, nil
}

Expand Down Expand Up @@ -372,12 +375,21 @@ func (c *rolloutContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32,
if fullScaleDown && !c.shouldDelayScaleDownOnAbort() {
delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
}

rs, err = c.kubeclientset.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
if err != nil {
return scaled, rs, fmt.Errorf("error updating replicaset %s: %w", rs.Name, err)
}
err = c.replicaSetInformer.GetIndexer().Update(rs)
if err != nil {
err = fmt.Errorf("error updating replicaset informer in scaleReplicaSet: %w", err)
return scaled, rs, err
}

if sizeNeedsUpdate {
scaled = true
revision, _ := replicasetutil.Revision(rs)
c.recorder.Eventf(rollout, record.EventOptions{EventReason: conditions.ScalingReplicaSetReason}, conditions.ScalingReplicaSetMessage, scalingOperation, rs.Name, revision, oldScale, newScale)
c.replicaSetInformer.GetIndexer().Update(rs)
}
}
return scaled, rs, err
Expand Down
Loading