Skip to content

Commit

Permalink
support specified-delete in asts
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Sep 12, 2024
1 parent be1a79e commit 16b2ecf
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 3 deletions.
51 changes: 49 additions & 2 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
)

// Realistic value for maximum in-flight requests when processing in parallel mode.
Expand Down Expand Up @@ -622,11 +623,17 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}
}

// handle specified deleted pod under maxUnavailable constrain
// NOTE: specified deletion is not constraint by partition setting
specifiedDeletedPods, err := ssc.handleSpecifiedDeletedPods(set, status, currentRevision, updateRevision, replicas, maxUnavailable, unavailablePods)
if err != nil {
return status, err

Check warning on line 630 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L630

Added line #L630 was not covered by tests
}

updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes)
// update pods in sequence
for _, target := range updateIndexes {

// the target is already up-to-date, go to next
if getPodRevision(replicas[target]) == updateRevision.Name {
continue
Expand Down Expand Up @@ -667,7 +674,7 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}

// delete the Pod if it is not already terminating and does not match the update revision.
if !isTerminating(replicas[target]) {
if !specifiedDeletedPods.Has(replicas[target].Name) && !isTerminating(replicas[target]) {
// todo validate in-place for pub
inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions)
if inplaceUpdateErr != nil {
Expand All @@ -691,6 +698,46 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
return status, nil
}

func (ssc *defaultStatefulSetControl) handleSpecifiedDeletedPods(
set *appsv1beta1.StatefulSet,
status *appsv1beta1.StatefulSetStatus,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
replicas []*v1.Pod,
maxUnavailable int,
unavailablePods sets.String) (sets.String, error) {
specifiedDeletedPods := sets.NewString()
for target := len(replicas) - 1; target >= 0; target-- {
if replicas[target] == nil || !specifieddelete.IsSpecifiedDelete(replicas[target]) {
continue
}
// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// why skip here rather than return?
// case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready
// when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't
// pod 2 and pod 1 can be deleted because they were unavailable
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
klog.V(4).InfoS("StatefulSet was waiting for unavailable Pods to update, blocked pod",
"statefulSet", klog.KObj(set), "unavailablePods", unavailablePods.List(), "blockedPod", klog.KObj(replicas[target]))
continue

Check warning on line 722 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L720-L722

Added lines #L720 - L722 were not covered by tests
}

specifiedDeletedPods.Insert(replicas[target].Name)
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
return specifiedDeletedPods, err

Check warning on line 727 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L727

Added line #L727 was not covered by tests
}
// mark target as unavailable because it's updating
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--

Check warning on line 733 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L733

Added line #L733 was not covered by tests
} else if getPodRevision(replicas[target]) == updateRevision.Name {
status.UpdatedReplicas--

Check warning on line 735 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L735

Added line #L735 was not covered by tests
}
}
return specifiedDeletedPods, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) {
if set.Spec.Lifecycle != nil && lifecycle.IsPodHooked(set.Spec.Lifecycle.PreDelete, pod) {
markPodNotReady := set.Spec.Lifecycle.PreDelete.MarkPodNotReady
Expand Down
130 changes: 130 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -54,6 +55,7 @@ import (
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -2215,6 +2217,119 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
}
}

func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
set := burst(newStatefulSet(6))
var partition int32 = 3
var maxUnavailable = intstr.FromInt(3)
set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
return &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
MaxUnavailable: &maxUnavailable,
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
}
}(),
}

client := fake.NewSimpleClientset()
kruiseClient := kruisefake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(client, kruiseClient)
defer close(stop)
if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
t.Fatal(err)
}
set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatal(err)
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}

// set pod 0 to specified delete
originalPods, err := spc.setPodSpecifiedDelete(set, 0)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(originalPods))

// start to update
set.Spec.Template.Spec.Containers[0].Image = "foo"

// first update pod 5 only because pod 0 is specified deleted
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// inplace update 5 and create 0
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
sort.Sort(ascendingOrdinal(pods))
_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 is old image and pod 5/4 is new image
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx")

// set pod 1/2/5 to specified deleted and pod 0/4/5 to ready
spc.setPodSpecifiedDelete(set, 1)
spc.setPodSpecifiedDelete(set, 2)
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
sort.Sort(ascendingOrdinal(originalPods))

// create new pod for 1/2/5, do not update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}

_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx")
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -3119,6 +3234,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
return om.podsLister.Pods(set.Namespace).List(selector)
}

func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return om.podsLister.Pods(set.Namespace).List(selector)
}

var _ StatefulPodControlObjectManager = &fakeObjectManager{}

type fakeStatefulSetStatusUpdater struct {
Expand Down
138 changes: 138 additions & 0 deletions test/e2e/apps/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,36 @@ var _ = SIGDescribe("AppStatefulSetStorage", func() {
}
validateExpandVCT(2, injectFn, updateFn, true)
})

framework.ConformanceIt("should perform rolling updates with specified-deleted with pvc", func() {
ginkgo.By("Creating a new StatefulSet")
vms := []v1.VolumeMount{}
for i := 0; i < 1; i++ {
vms = append(vms, v1.VolumeMount{
Name: fmt.Sprintf("data%d", i),
MountPath: fmt.Sprintf("/data%d", i),
})
}
ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 4, vms, nil, labels)
injectSC(appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType, ss, appsv1beta1.OnPodRollingUpdateVolumeClaimUpdateStrategyType, canExpandSC)

updateFn := func(update *appsv1beta1.StatefulSet) {
update.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("20")
update.Spec.VolumeClaimUpdateStrategy = appsv1beta1.VolumeClaimUpdateStrategy{
Type: appsv1beta1.OnPodRollingUpdateVolumeClaimUpdateStrategyType,
}
resizeVCT(update, newSize, 2)
}
testWithSpecifiedDeleted(c, kc, ns, ss, updateFn)
notEqualPVCNum := 0
waitForPVCCapacity(context.TODO(), c, kc, ss, func(pvc, template resource.Quantity) bool {
if pvc.Cmp(template) != 0 {
notEqualPVCNum++
}
return true
})
gomega.Expect(2).To(gomega.Equal(notEqualPVCNum))
})
})

ginkgo.Describe("Resize PVC with rollback", func() {
Expand Down Expand Up @@ -1659,6 +1689,16 @@ var _ = SIGDescribe("StatefulSet", func() {
gomega.Expect(pods.Items[i].Labels["test-update"]).To(gomega.Equal("yes"))
}
})

/*
Testname: StatefulSet, Specified delete
Description: Specified delete pod MUST under maxUnavailable constrain.
*/
framework.ConformanceIt("should perform rolling updates with specified-deleted", func() {
ginkgo.By("Creating a new StatefulSet")
ss = framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels)
testWithSpecifiedDeleted(c, kc, ns, ss)
})
})

//ginkgo.Describe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() {
Expand Down Expand Up @@ -2799,3 +2839,101 @@ func waitForPVCCapacity(ctx context.Context, c clientset.Interface, kc kruisecli
return true, nil
})
}

// This function is used by two tests to test StatefulSet rollbacks: one using
// PVCs and one using no storage.
func testWithSpecifiedDeleted(c clientset.Interface, kc kruiseclientset.Interface, ns string, ss *appsv1beta1.StatefulSet,
fns ...func(update *appsv1beta1.StatefulSet)) {
sst := framework.NewStatefulSetTester(c, kc)
*(ss.Spec.Replicas) = 4
ss, err := kc.AppsV1beta1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
ss = sst.WaitForStatus(ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
gomega.Expect(currentRevision).To(gomega.Equal(updateRevision),
fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s",
ss.Namespace, ss.Name, updateRevision, currentRevision))
pods := sst.GetPodList(ss)
for i := range pods.Items {
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision),
fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
pods.Items[i].Labels[apps.StatefulSetRevisionLabel],
currentRevision))
}
specifiedDeletePod := func(idx int) {
sst.SortStatefulPods(pods)
oldUid := pods.Items[idx].UID
err = setPodSpecifiedDelete(c, ns, pods.Items[idx].Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ss = sst.WaitForStatus(ss)
name := pods.Items[idx].Name
// wait be deleted
sst.WaitForState(ss, func(set2 *appsv1beta1.StatefulSet, pods2 *v1.PodList) (bool, error) {
ss = set2
pods = pods2
for i := range pods.Items {
if pods.Items[i].Name == name {
return pods.Items[i].UID != oldUid, nil
}
}
return false, nil
})
sst.WaitForPodReady(ss, pods.Items[idx].Name)
pods = sst.GetPodList(ss)
sst.SortStatefulPods(pods)
}
specifiedDeletePod(1)
newImage := NewNginxImage
oldImage := ss.Spec.Template.Spec.Containers[0].Image

ginkgo.By(fmt.Sprintf("Updating StatefulSet template: update image from %s to %s", oldImage, newImage))
gomega.Expect(oldImage).NotTo(gomega.Equal(newImage), "Incorrect test setup: should update to a different image")
var partition int32 = 2
ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Image = newImage
if update.Spec.UpdateStrategy.RollingUpdate == nil {
update.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{}
}
update.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
}
for _, fn := range fns {
fn(update)
}
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
specifiedDeletePod(2)

ginkgo.By("Creating a new revision")
ss = sst.WaitForStatus(ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
gomega.Expect(currentRevision).NotTo(gomega.Equal(updateRevision),
"Current revision should not equal update revision during rolling update")
specifiedDeletePod(1)
for i := range pods.Items {
if i >= int(partition) {
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(updateRevision),
fmt.Sprintf("Pod %s/%s revision %s is not equal to updated revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
pods.Items[i].Labels[apps.StatefulSetRevisionLabel],
updateRevision))
} else {
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision),
fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
pods.Items[i].Labels[apps.StatefulSetRevisionLabel],
currentRevision))
}
}
}

func setPodSpecifiedDelete(c clientset.Interface, ns, name string) error {
_, err := c.CoreV1().Pods(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(`{"metadata":{"labels":{"apps.kruise.io/specified-delete":"true"}}}`), metav1.PatchOptions{})
return err
}
Loading

0 comments on commit 16b2ecf

Please sign in to comment.