Skip to content

Commit

Permalink
decideScalingOutNotUpdatedPods
Browse files Browse the repository at this point in the history
  • Loading branch information
ColdsteelRail committed Jun 12, 2024
1 parent a915611 commit 9f6b1b2
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
4 changes: 3 additions & 1 deletion pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,13 +545,15 @@ func (r *RealSyncControl) Scale(
if pod, err = r.podControl.CreatePod(newPod); err != nil {
return err
}
// add revision to resourceContext ID if create pod succeeded
availableIDContext.Put(podcontext.RevisionContextDataKey, revision.Name)
// add an expectation for this pod creation, before next reconciling
return collasetutils.ActiveExpectations.ExpectCreate(cls, expectations.Pod, pod.Name)
})
if len(createFailedIDs) > 0 || succCount > 0 {
for len(createFailedIDs) > 0 {
id := <-createFailedIDs
// clear revision from resourceContext ID if create failed
ownedIDs[id].Remove(podcontext.RevisionContextDataKey)
}
logger.V(1).Info("try to update ResourceContext for CollaSet after scaling out")
Expand Down Expand Up @@ -796,7 +798,7 @@ func (r *RealSyncControl) Update(
}

// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
podToUpdate := decidePodToUpdate(cls, podUpdateInfos, ownedIDs, resources.UpdatedRevision)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)
updating := false
Expand Down
59 changes: 55 additions & 4 deletions pkg/controllers/collaset/synccontrol/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -173,12 +174,17 @@ func attachPodUpdateInfo(ctx context.Context, cls *appsv1alpha1.CollaSet, pods [
return podUpdateInfoList, nil
}

func decidePodToUpdate(cls *appsv1alpha1.CollaSet, podInfos []*PodUpdateInfo) []*PodUpdateInfo {
func decidePodToUpdate(
cls *appsv1alpha1.CollaSet,
podInfos []*PodUpdateInfo,
ownedIDs map[int]*appsv1alpha1.ContextDetail,
updatedRevision *appsv1.ControllerRevision) []*PodUpdateInfo {

if cls.Spec.UpdateStrategy.RollingUpdate != nil && cls.Spec.UpdateStrategy.RollingUpdate.ByLabel != nil {
return decidePodToUpdateByLabel(cls, podInfos)
}

return decidePodToUpdateByPartition(cls, podInfos)
return decidePodToUpdateByPartition(cls, podInfos, ownedIDs, updatedRevision)
}

func decidePodToUpdateByLabel(_ *appsv1alpha1.CollaSet, podInfos []*PodUpdateInfo) (podToUpdate []*PodUpdateInfo) {
Expand Down Expand Up @@ -207,7 +213,12 @@ func decidePodToUpdateByLabel(_ *appsv1alpha1.CollaSet, podInfos []*PodUpdateInf
return podToUpdate
}

func decidePodToUpdateByPartition(cls *appsv1alpha1.CollaSet, podInfos []*PodUpdateInfo) (podToUpdate []*PodUpdateInfo) {
func decidePodToUpdateByPartition(
cls *appsv1alpha1.CollaSet,
podInfos []*PodUpdateInfo,
ownedIDs map[int]*appsv1alpha1.ContextDetail,
updatedRevision *appsv1.ControllerRevision) (podToUpdate []*PodUpdateInfo) {

filteredPodInfos := filterReplacingNewCreatedPod(podInfos)
if cls.Spec.UpdateStrategy.RollingUpdate == nil ||
cls.Spec.UpdateStrategy.RollingUpdate.ByPartition.Partition == nil {
Expand All @@ -221,7 +232,9 @@ func decidePodToUpdateByPartition(cls *appsv1alpha1.CollaSet, podInfos []*PodUpd
if partition >= podsNum {
return podToUpdate
}
podToUpdate = ordered[:podsNum-partition]

notCreatedOld := minInt(partition, decideNotCreatedOldPods(podInfos, ownedIDs, updatedRevision))
podToUpdate = ordered[:podsNum-partition+notCreatedOld]
for i := podsNum - partition; i < podsNum; i++ {
if podInfos[i].PodDecorationChanged {
podToUpdate = append(podToUpdate, podInfos[i])
Expand All @@ -230,6 +243,44 @@ func decidePodToUpdateByPartition(cls *appsv1alpha1.CollaSet, podInfos []*PodUpd
return podToUpdate
}

func decideNotCreatedOldPods(
podInfos []*PodUpdateInfo,
ownedIDs map[int]*appsv1alpha1.ContextDetail,
updatedRevision *appsv1.ControllerRevision) int {

mapIDToPod := make(map[string]*PodUpdateInfo)
for _, pod := range podInfos {
id, exist := pod.Labels[appsv1alpha1.PodInstanceIDLabelKey]
if !exist {
return 0
}
mapIDToPod[id] = pod
}

var idToUpdate []*appsv1alpha1.ContextDetail
for _, contextDetail := range ownedIDs {
revision, exist := contextDetail.Data[podcontext.RevisionContextDataKey]
if !exist || revision == updatedRevision.Name {
continue
}
if _, exist := contextDetail.Data[ScaleInContextDataKey]; exist {
continue
}
if _, exist := contextDetail.Data[ReplaceOriginPodIDContextDataKey]; exist {
continue
}
idToUpdate = append(idToUpdate, contextDetail)
}

notCreated := 0
for _, contextDetail := range idToUpdate {
if _, exist := mapIDToPod[strconv.Itoa(contextDetail.ID)]; !exist {
notCreated++
}
}
return notCreated
}

// filter these pods in replacing and is new created pod
func filterReplacingNewCreatedPod(podInfos []*PodUpdateInfo) (filteredPodInfos []*PodUpdateInfo) {
for _, podInfo := range podInfos {
Expand Down

0 comments on commit 9f6b1b2

Please sign in to comment.