Skip to content

Commit

Permalink
support specified-delete in asts (#1734)
Browse files Browse the repository at this point in the history
Signed-off-by: Abner-1 <[email protected]>
  • Loading branch information
ABNER-1 committed Sep 19, 2024
1 parent a67947e commit 1dd8871
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 19 deletions.
85 changes: 67 additions & 18 deletions pkg/controller/statefulset/stateful_set_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
intstrutil "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/history"
Expand All @@ -42,6 +43,7 @@ import (
imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
)

// StatefulSetControlInterface implements the control logic for updating StatefulSets and their children Pods. It is implemented
Expand Down Expand Up @@ -499,7 +501,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
set.Namespace,
set.Name,
replicas[i].Name)
if _, err := ssc.deletePod(set, replicas[i]); err != nil {
if _, _, err := ssc.deletePod(set, replicas[i]); err != nil {
return &status, err
}
if getPodRevision(replicas[i]) == currentRevision.Name {
Expand Down Expand Up @@ -684,7 +686,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
set.Name,
condemned[target].Name)

modified, err := ssc.deletePod(set, condemned[target])
modified, _, err := ssc.deletePod(set, condemned[target])
if err != nil || modified {
return &status, err
}
Expand Down Expand Up @@ -796,11 +798,17 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}
}

// handle specified deleted pod under maxUnavailable constrain
// NOTE: specified deletion is not constraint by partition setting
specifiedDeletedPods, err := ssc.handleSpecifiedDeletedPods(set, status, currentRevision, updateRevision, replicas, maxUnavailable, unavailablePods)
if err != nil {
return status, err
}

updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas)
klog.V(3).Infof("Prepare to update pods indexes %v for StatefulSet %s", updateIndexes, getStatefulSetKey(set))
// update pods in sequence
for _, target := range updateIndexes {

// the target is already up-to-date, go to next
if getPodRevision(replicas[target]) == updateRevision.Name {
continue
Expand All @@ -819,25 +827,26 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
}

// delete the Pod if it is not already terminating and does not match the update revision.
if !isTerminating(replicas[target]) {
if !specifiedDeletedPods.Has(replicas[target].Name) && !isTerminating(replicas[target]) {
// todo validate in-place for pub
inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions)
if inplaceUpdateErr != nil {
return status, inplaceUpdateErr
}
// if pod is inplacing or actual deleting, decrease revision
revisionNeedDecrease := inplacing
if !inplacing {
klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
set.Namespace,
set.Name,
replicas[target].Name)
if _, err := ssc.deletePod(set, replicas[target]); err != nil {
klog.V(2).InfoS("StatefulSet terminating Pod for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return status, err
} else {
revisionNeedDecrease = actualDeleting
}
}
// mark target as unavailable because it's updated
unavailablePods.Insert(replicas[target].Name)

if getPodRevision(replicas[target]) == currentRevision.Name {
if revisionNeedDecrease && getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--
}
}
Expand All @@ -846,23 +855,63 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods(
return status, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) {
func (ssc *defaultStatefulSetControl) handleSpecifiedDeletedPods(
set *appsv1beta1.StatefulSet,
status *appsv1beta1.StatefulSetStatus,
currentRevision *apps.ControllerRevision,
updateRevision *apps.ControllerRevision,
replicas []*v1.Pod,
maxUnavailable int,
unavailablePods sets.String) (sets.String, error) {
specifiedDeletedPods := sets.NewString()
for target := len(replicas) - 1; target >= 0; target-- {
if replicas[target] == nil || !specifieddelete.IsSpecifiedDelete(replicas[target]) {
continue
}
// the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it,
// why skip here rather than return?
// case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready
// when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't
// pod 2 and pod 1 can be deleted because they were unavailable
if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) {
klog.V(4).InfoS("StatefulSet was waiting for unavailable Pods to update, blocked pod",
"statefulSet", klog.KObj(set), "unavailablePods", unavailablePods.List(), "blockedPod", klog.KObj(replicas[target]))
continue
}

specifiedDeletedPods.Insert(replicas[target].Name)
if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil {
return specifiedDeletedPods, err
} else if actualDeleting {
// if actual deleted, update revision count in status
if getPodRevision(replicas[target]) == currentRevision.Name {
status.CurrentReplicas--
} else if getPodRevision(replicas[target]) == updateRevision.Name {
status.UpdatedReplicas--
}
}
// mark target as unavailable because it's deleting or pre-deleting
unavailablePods.Insert(replicas[target].Name)
}
return specifiedDeletedPods, nil
}

func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (modified, actualDeleting bool, err error) {
if set.Spec.Lifecycle != nil && lifecycle.IsPodHooked(set.Spec.Lifecycle.PreDelete, pod) {
markPodNotReady := set.Spec.Lifecycle.PreDelete.MarkPodNotReady
if updated, _, err := ssc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete, markPodNotReady); err != nil {
return false, err
return false, false, err
} else if updated {
klog.V(3).Infof("StatefulSet %s scaling update pod %s lifecycle to PreparingDelete",
getStatefulSetKey(set), pod.Name)
return true, nil
klog.V(3).InfoS("StatefulSet scaling update pod lifecycle to PreparingDelete", "statefulSet", klog.KObj(set), "pod", klog.KObj(pod))
return true, false, nil
}
return false, nil
return false, false, nil
}
if err := ssc.podControl.DeleteStatefulPod(set, pod); err != nil {
ssc.recorder.Eventf(set, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err)
return false, err
return false, false, err
}
return true, nil
return true, true, nil
}

func (ssc *defaultStatefulSetControl) refreshPodState(set *appsv1beta1.StatefulSet, pod *v1.Pod, updateRevision string) (bool, time.Duration, error) {
Expand Down
164 changes: 164 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -50,6 +51,7 @@ import (
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -2211,6 +2213,153 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) {
}
}

func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) {
set := burst(newStatefulSet(6))
var partition int32 = 3
var maxUnavailable = intstr.FromInt(3)
set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy {
return &appsv1beta1.RollingUpdateStatefulSetStrategy{
Partition: &partition,
MaxUnavailable: &maxUnavailable,
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
}
}(),
}

client := fake.NewSimpleClientset()
kruiseClient := kruisefake.NewSimpleClientset(set)
spc, _, ssc, stop := setupController(client, kruiseClient)
defer close(stop)
if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
t.Fatal(err)
}
set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Fatal(err)
}
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Fatal(err)
}

// set pod 0 to specified delete
originalPods, err := spc.setPodSpecifiedDelete(set, 0)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(originalPods))

// start to update
set.Spec.Template.Spec.Containers[0].Image = "foo"

// first update pod 5 only because pod 0 is specified deleted
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// inplace update 5 and create 0
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
sort.Sort(ascendingOrdinal(pods))
_, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 is old image and pod 5/4 is new image
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx")

// set pod 1/2/5 to specified deleted and pod 0/4/5 to ready
spc.setPodSpecifiedDelete(set, 0)
spc.setPodSpecifiedDelete(set, 1)
spc.setPodSpecifiedDelete(set, 2)
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 5)
sort.Sort(ascendingOrdinal(originalPods))

// create new pod for 1/2/5, do not update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}

_, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
_, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, !exist)
// pod 0 still undeleted
_, exist = pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey]
assert.True(t, exist)
assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo")
assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx")
assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx")

// set pod 3 to specified deleted and all pod to ready => pod3 will be deleted and updated
for i := 0; i < 6; i++ {
spc.setPodRunning(set, i)
spc.setPodReady(set, i)
}
originalPods, _ = spc.setPodSpecifiedDelete(set, 3)
sort.Sort(ascendingOrdinal(originalPods))
// create new pod for 3, do not inplace-update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}

// create new pods 5 and inplace update 3
if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatal(err)
}
sort.Sort(ascendingOrdinal(pods))
if len(pods) != 6 {
t.Fatalf("Expected create pods 5, got pods %v", pods)
}
assert.Equal(t, pods[3].Spec.Containers[0].Image, "foo")
}

func TestStatefulSetControlInPlaceUpdate(t *testing.T) {
set := burst(newStatefulSet(3))
var partition int32 = 1
Expand Down Expand Up @@ -3107,6 +3256,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi
return om.podsLister.Pods(set.Namespace).List(selector)
}

func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) {
pod := newStatefulSetPod(set, ordinal)
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true"
fakeResourceVersion(pod)
om.podsIndexer.Update(pod)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
return nil, err
}
return om.podsLister.Pods(set.Namespace).List(selector)
}

var _ StatefulPodControlObjectManager = &fakeObjectManager{}

type fakeStatefulSetStatusUpdater struct {
Expand Down
Loading

0 comments on commit 1dd8871

Please sign in to comment.