Skip to content

Commit

Permalink
support specified-delete in asts
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Sep 11, 2024
1 parent be1a79e commit cf50f43
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 1 deletion.
28 changes: 27 additions & 1 deletion pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
)

// Realistic value for maximum in-flight requests when processing in parallel mode.
Expand Down Expand Up @@ -622,11 +623,36 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}
}

// handle specified deleted pod, and protected by maxUnavailable
for target := len(replicas) - 1; target >= 0; target-- {
if replicas[target] == nil {
continue
}
// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// why skip here rather than return?
// case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready
// when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't
// pod 2 and pod1 can be deleted because they were unavailable
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
continue
}
if specifieddelete.IsSpecifiedDelete(replicas[target]) {
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
return status, err

Check warning on line 641 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L641

Added line #L641 was not covered by tests
}
// mark target as unavailable because it's updated
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--

Check warning on line 647 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L647

Added line #L647 was not covered by tests
}
continue
}
}
updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes)
// update pods in sequence
for _, target := range updateIndexes {

// the target is already up-to-date, go to next
if getPodRevision(replicas[target]) == updateRevision.Name {
continue
Expand Down
130 changes: 130 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -54,6 +55,7 @@ import (
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -2215,6 +2217,119 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
}
}

func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
set := burst(newStatefulSet(6))
var partition int32 = 3
var maxUnavailable = intstr.FromInt(3)
set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
return &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
MaxUnavailable: &maxUnavailable,
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
}
}(),
}

client := fake.NewSimpleClientset()
kruiseClient := kruisefake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(client, kruiseClient)
defer close(stop)
if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
t.Fatal(err)
}
set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatal(err)
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}

// set pod 0 to specified delete
originalPods, err := spc.setPodSpecifiedDelete(set, 0)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(originalPods))

// start to update
set.Spec.Template.Spec.Containers[0].Image = "foo"

// first update pod 5 only because pod 0 is specified deleted
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// inplace update 5 and create 0
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
sort.Sort(ascendingOrdinal(pods))
_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 is old image and pod 5/4 is new image
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx")

// set pod 1/2/5 to specified deleted and pod 0/4/5 to ready
spc.setPodSpecifiedDelete(set, 1)
spc.setPodSpecifiedDelete(set, 2)
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
sort.Sort(ascendingOrdinal(originalPods))

// create new pod for 1/2/5, do not update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}

_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx")
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -3119,6 +3234,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
return om.podsLister.Pods(set.Namespace).List(selector)
}

func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return om.podsLister.Pods(set.Namespace).List(selector)
}

var _ StatefulPodControlObjectManager = &fakeObjectManager{}

type fakeStatefulSetStatusUpdater struct {
Expand Down

0 comments on commit cf50f43

Please sign in to comment.