Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support specified-delete in asts #1734

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
)

// Realistic value for maximum in-flight requests when processing in parallel mode.
Expand Down Expand Up @@ -622,11 +623,17 @@
}
}

// handle specified deleted pod under maxUnavailable constrain
// NOTE: specified deletion is not constraint by partition setting
specifiedDeletedPods, err := ssc.handleSpecifiedDeletedPods(set, status, currentRevision, updateRevision, replicas, maxUnavailable, unavailablePods)
if err != nil {
return status, err

Check warning on line 630 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L630

Added line #L630 was not covered by tests
}

updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes)
// update pods in sequence
for _, target := range updateIndexes {

// the target is already up-to-date, go to next
if getPodRevision(replicas[target]) == updateRevision.Name {
continue
Expand Down Expand Up @@ -667,22 +674,26 @@
}

// delete the Pod if it is not already terminating and does not match the update revision.
if !isTerminating(replicas[target]) {
if !specifiedDeletedPods.Has(replicas[target].Name) && !isTerminating(replicas[target]) {
// todo validate in-place for pub
inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions)
if inplaceUpdateErr != nil {
return status, inplaceUpdateErr
}
// if pod is inplacing or actual deleting, decrease revision
revisionNeedDecrease := inplacing
ABNER-1 marked this conversation as resolved.
Show resolved Hide resolved
if !inplacing {
klog.V(2).InfoS("StatefulSet terminating Pod for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return status, err
} else {
revisionNeedDecrease = actualDeleting
}
}
// mark target as unavailable because it's updated
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
if revisionNeedDecrease && getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--
}
}
Expand All @@ -691,22 +702,63 @@
return status, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) {
func (ssc *defaultStatefulSetControl) handleSpecifiedDeletedPods(
set *appsv1beta1.StatefulSet,
status *appsv1beta1.StatefulSetStatus,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
replicas []*v1.Pod,
maxUnavailable int,
unavailablePods sets.String) (sets.String, error) {
specifiedDeletedPods := sets.NewString()
for target := len(replicas) - 1; target >= 0; target-- {
if replicas[target] == nil || !specifieddelete.IsSpecifiedDelete(replicas[target]) {
continue
}
// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// why skip here rather than return?
// case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready
// when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't
// pod 2 and pod 1 can be deleted because they were unavailable
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
klog.V(4).InfoS("StatefulSet was waiting for unavailable Pods to update, blocked pod",
"statefulSet", klog.KObj(set), "unavailablePods", unavailablePods.List(), "blockedPod", klog.KObj(replicas[target]))
continue
}

specifiedDeletedPods.Insert(replicas[target].Name)
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return specifiedDeletedPods, err

Check warning on line 731 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L731

Added line #L731 was not covered by tests
} else if actualDeleting {
// if actual deleted, update revision count in status
if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--

Check warning on line 735 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L735

Added line #L735 was not covered by tests
} else if getPodRevision(replicas[target]) == updateRevision.Name {
status.UpdatedReplicas--

Check warning on line 737 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L737

Added line #L737 was not covered by tests
}
}
// mark target as unavailable because it's deleting or pre-deleting
unavailablePods.Insert(replicas[target].Name)
}
return specifiedDeletedPods, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (modified, actualDeleting bool, err error) {
if set.Spec.Lifecycle != nil && lifecycle.IsPodHooked(set.Spec.Lifecycle.PreDelete, pod) {
markPodNotReady := set.Spec.Lifecycle.PreDelete.MarkPodNotReady
if updated, _, err := ssc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete, markPodNotReady); err != nil {
return false, err
return false, false, err

Check warning on line 750 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L750

Added line #L750 was not covered by tests
} else if updated {
klog.V(3).InfoS("StatefulSet scaling update pod lifecycle to PreparingDelete", "statefulSet", klog.KObj(set), "pod", klog.KObj(pod))
return true, nil
return true, false, nil

Check warning on line 753 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L753

Added line #L753 was not covered by tests
}
return false, nil
return false, false, nil

Check warning on line 755 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L755

Added line #L755 was not covered by tests
}
if err := ssc.podControl.DeleteStatefulPod(set, pod); err != nil {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
return false, err
return false, false, err
}
return true, nil
return true, true, nil
}

func (ssc *defaultStatefulSetControl) refreshPodState(set *appsv1beta1.StatefulSet, pod *v1.Pod, updateRevision string) (bool, time.Duration, error) {
Expand Down Expand Up @@ -992,7 +1044,7 @@
logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))

modified, err := ssc.deletePod(set, condemned[i])
modified, _, err := ssc.deletePod(set, condemned[i])
if err != nil || (monotonic && modified) {
return true, err
}
Expand Down Expand Up @@ -1035,7 +1087,7 @@
// regardless of the exit code.
if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
if replicas[i].DeletionTimestamp == nil {
if _, err := ssc.deletePod(set, replicas[i]); err != nil {
if _, _, err := ssc.deletePod(set, replicas[i]); err != nil {

Check warning on line 1090 in pkg/controller/statefulset/stateful_set_control.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/statefulset/stateful_set_control.go#L1090

Added line #L1090 was not covered by tests
return true, false, err
}
}
Expand Down
164 changes: 164 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -54,6 +55,7 @@ import (
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -2215,6 +2217,153 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
}
}

func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
set := burst(newStatefulSet(6))
var partition int32 = 3
var maxUnavailable = intstr.FromInt(3)
set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
return &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
MaxUnavailable: &maxUnavailable,
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
}
}(),
}

client := fake.NewSimpleClientset()
kruiseClient := kruisefake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(client, kruiseClient)
defer close(stop)
if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
t.Fatal(err)
}
set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatal(err)
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}

// set pod 0 to specified delete
originalPods, err := spc.setPodSpecifiedDelete(set, 0)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(originalPods))

// start to update
set.Spec.Template.Spec.Containers[0].Image = "foo"

// first update pod 5 only because pod 0 is specified deleted
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// inplace update 5 and create 0
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
sort.Sort(ascendingOrdinal(pods))
_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 is old image and pod 5/4 is new image
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx")

// set pod 1/2/5 to specified deleted and pod 0/4/5 to ready
spc.setPodSpecifiedDelete(set, 0)
spc.setPodSpecifiedDelete(set, 1)
spc.setPodSpecifiedDelete(set, 2)
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
sort.Sort(ascendingOrdinal(originalPods))

// create new pod for 1/2/5, do not update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}

_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 still undeleted
_, exist = pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, exist)
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx")

// set pod 3 to specified deleted and all pod to ready => pod3 will be deleted and updated
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 3)
sort.Sort(ascendingOrdinal(originalPods))
// create new pod for 3, do not inplace-update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
assert.Equal(t, pods[3].Spec.Containers[0].Image, "foo")
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -3119,6 +3268,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
return om.podsLister.Pods(set.Namespace).List(selector)
}

func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return om.podsLister.Pods(set.Namespace).List(selector)
}

var _ StatefulPodControlObjectManager = &fakeObjectManager{}

type fakeStatefulSetStatusUpdater struct {
Expand Down
Loading
Loading