From dea649a950fdbbac827a74e6e0b82baff3f0213e Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 16 Aug 2023 19:53:56 +0800 Subject: [PATCH 1/7] scatter Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 31 ++++++++++++++---------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index c31461eb06f..34f6a8a1d65 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strconv" "sync" "time" @@ -341,7 +342,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s continue } for { - candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context) + candidates := r.selectCandidates(group, region, oldFit, peer.GetStoreId(), selectedStores, context) newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context) targetPeers[newPeer.GetStoreId()] = newPeer selectedStores[newPeer.GetStoreId()] = struct{}{} @@ -363,7 +364,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s // FIXME: target leader only considers the ordinary stores, maybe we need to consider the // special engine stores if the engine supports to become a leader. But now there is only // one engine, tiflash, which does not support the leader, so don't consider it for now. - targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) + targetLeader, leaderHit := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) if targetLeader == 0 { scatterSkipNoLeaderCounter.Inc() return nil @@ -398,6 +399,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s if op != nil { scatterSuccessCounter.Inc() r.Put(targetPeers, targetLeader, group) + op.AdditionalInfos["group"] = group + op.AdditionalInfos["leader-hit"] = strconv.FormatUint(leaderHit, 10) op.SetPriorityLevel(constant.High) } return op @@ -432,7 +435,8 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. 
return region.GetLeader().GetStoreId() == targetLeader } -func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 { +func (r *RegionScatterer) selectCandidates(group string, region *core.RegionInfo, oldFit *placement.RegionFit, + sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 { sourceStore := r.cluster.GetStore(sourceStoreID) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore)) @@ -451,7 +455,7 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *plac maxStoreTotalCount := uint64(0) minStoreTotalCount := uint64(math.MaxUint64) for _, store := range stores { - count := context.selectedPeer.TotalCountByStore(store.GetID()) + count := context.selectedPeer.Get(store.GetID(), group) if count > maxStoreTotalCount { maxStoreTotalCount = count } @@ -460,7 +464,7 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *plac } } for _, store := range stores { - storeCount := context.selectedPeer.TotalCountByStore(store.GetID()) + storeCount := context.selectedPeer.Get(store.GetID(), group) // If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate. // If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store // could be selected as candidate. @@ -479,8 +483,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto } var newPeer *metapb.Peer minCount := uint64(math.MaxUint64) + sourceHit := uint64(math.MaxUint64) for _, storeID := range candidates { count := context.selectedPeer.Get(storeID, group) + if storeID == storeID { + sourceHit = count + } if count < minCount { minCount = count newPeer = &metapb.Peer{ @@ -489,11 +497,8 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto } } } - // if the source store have the least count, we don't need to scatter this peer - for _, storeID := range candidates { - if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount { - return peer - } + if sourceHit <= minCount { + return peer } if newPeer == nil { return peer @@ -503,11 +508,11 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. -func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 { +func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) (uint64, uint64) { sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) - return 0 + return 0, 0 } minStoreGroupLeader := uint64(math.MaxUint64) id := uint64(0) @@ -522,7 +527,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core. 
id = storeID } } - return id + return id, minStoreGroupLeader } // Put put the final distribution in the context no matter the operator was created From f8f38fd9e4eaa9ba0c720daf043eddbd4132c57d Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 16 Aug 2023 20:48:53 +0800 Subject: [PATCH 2/7] simply scatter Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 70 ++++++++----------- pkg/schedule/scatter/region_scatterer_test.go | 47 ++----------- 2 files changed, 33 insertions(+), 84 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 34f6a8a1d65..5e5246ee348 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -333,6 +333,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer + filterLen := len(context.filterFuncs) + 2 + filters := make([]filter.Filter, filterLen) + for i, filterFunc := range context.filterFuncs { + filters[i] = filterFunc() + } for _, peer := range peers { if _, ok := selectedStores[peer.GetStoreId()]; ok { if allowLeader(oldFit, peer) { @@ -342,8 +347,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s continue } for { - candidates := r.selectCandidates(group, region, oldFit, peer.GetStoreId(), selectedStores, context) - newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context) + sourceStore := r.cluster.GetStore(peer.GetStoreId()) + if sourceStore == nil { + log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) + continue + } + filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores) + filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) + newPeer := r.selectCandidates(context, group, peer, filters) targetPeers[newPeer.GetStoreId()] = newPeer selectedStores[newPeer.GetStoreId()] = struct{}{} // If the selected peer is a peer other than origin peer in this region, @@ -435,23 +446,8 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. 
 	return region.GetLeader().GetStoreId() == targetLeader
 }
 
-func (r *RegionScatterer) selectCandidates(group string, region *core.RegionInfo, oldFit *placement.RegionFit,
-	sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
-	sourceStore := r.cluster.GetStore(sourceStoreID)
-	if sourceStore == nil {
-		log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
-		return nil
-	}
-	filters := []filter.Filter{
-		filter.NewExcludedFilter(r.name, nil, selectedStores),
-	}
-	scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
-	for _, filterFunc := range context.filterFuncs {
-		filters = append(filters, filterFunc())
-	}
-	filters = append(filters, scoreGuard)
+func (r *RegionScatterer) selectCandidates(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
 	stores := r.cluster.GetStores()
-	candidates := make([]uint64, 0)
 	maxStoreTotalCount := uint64(0)
 	minStoreTotalCount := uint64(math.MaxUint64)
 	for _, store := range stores {
@@ -463,37 +459,27 @@ func (r *RegionScatterer) selectCandidates(group string, region *core.RegionInfo
 			minStoreTotalCount = count
 		}
 	}
+
+	var newPeer *metapb.Peer
+	minCount := uint64(math.MaxUint64)
+	sourceHit := uint64(math.MaxUint64)
 	for _, store := range stores {
 		storeCount := context.selectedPeer.Get(store.GetID(), group)
+		if store.GetID() == peer.GetStoreId() {
+			sourceHit = storeCount
+		}
 		// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
 		// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
 		// could be selected as candidate.
 		if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
 			if filter.Target(r.cluster.GetSharedConfig(), store, filters) {
-				candidates = append(candidates, store.GetID())
-			}
-		}
-	}
-	return candidates
-}
-
-func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
-	if len(candidates) < 1 {
-		return peer
-	}
-	var newPeer *metapb.Peer
-	minCount := uint64(math.MaxUint64)
-	sourceHit := uint64(math.MaxUint64)
-	for _, storeID := range candidates {
-		count := context.selectedPeer.Get(storeID, group)
-		if storeID == storeID {
-			sourceHit = count
-		}
-		if count < minCount {
-			minCount = count
-			newPeer = &metapb.Peer{
-				StoreId: storeID,
-				Role:    peer.GetRole(),
+				if storeCount < minCount {
+					minCount = storeCount
+					newPeer = &metapb.Peer{
+						StoreId: store.GetID(),
+						Role:    peer.GetRole(),
+					}
+				}
 			}
 		}
 	}
diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go
index c0724e481f6..d531f62c661 100644
--- a/pkg/schedule/scatter/region_scatterer_test.go
+++ b/pkg/schedule/scatter/region_scatterer_test.go
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"math/rand"
 	"strconv"
 	"sync"
 	"testing"
@@ -533,48 +532,11 @@ func TestSelectedStoreGC(t *testing.T) {
 	re.False(ok)
 }
 
-// TestRegionFromDifferentGroups test the multi regions. each region have its own group.
-// After scatter, the distribution for the whole cluster should be well.
-func TestRegionFromDifferentGroups(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - opt := mockconfig.NewTestOptions() - tc := mockcluster.NewCluster(ctx, opt) - stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetSharedConfig(), stream) - // Add 6 stores. - storeCount := 6 - for i := uint64(1); i <= uint64(storeCount); i++ { - tc.AddRegionStore(i, 0) - } - scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) - regionCount := 50 - for i := 1; i <= regionCount; i++ { - p := rand.Perm(storeCount) - scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i), false) - } - check := func(ss *selectedStores) { - max := uint64(0) - min := uint64(math.MaxUint64) - for i := uint64(1); i <= uint64(storeCount); i++ { - count := ss.TotalCountByStore(i) - if count > max { - max = count - } - if count < min { - min = count - } - } - re.LessOrEqual(max-min, uint64(2)) - } - check(scatterer.ordinaryEngine.selectedPeer) -} - func TestRegionHasLearner(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + group := "group" opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) @@ -617,14 +579,14 @@ func TestRegionHasLearner(t *testing.T) { scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions) regionCount := 50 for i := 1; i <= regionCount; i++ { - _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group", false) + _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group, false) re.NoError(err) } check := func(ss *selectedStores) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= max; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -639,7 +601,7 @@ func TestRegionHasLearner(t *testing.T) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= voterCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -691,6 +653,7 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2) op := scatterer.scatterRegion(region, group, false) re.False(isPeerCountChanged(op)) + re.Equal(group, op.AdditionalInfos["group"]) } } From e77d70aceab10051f096946665426cbe79f75c99 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 16 Aug 2023 20:52:21 +0800 Subject: [PATCH 3/7] remove total count Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 20 ------------------- pkg/schedule/scatter/region_scatterer_test.go | 2 +- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 5e5246ee348..c86727c7a66 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -107,26 +107,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64, return s.getDistributionByGroupLocked(group) } -// TotalCountByStore counts the total count by store -func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 { 
- s.mu.RLock() - defer s.mu.RUnlock() - groups := s.groupDistribution.GetAllID() - totalCount := uint64(0) - for _, group := range groups { - storeDistribution, ok := s.getDistributionByGroupLocked(group) - if !ok { - continue - } - count, ok := storeDistribution[storeID] - if !ok { - continue - } - totalCount += count - } - return totalCount -} - // getDistributionByGroupLocked should be called with lock func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) { if result, ok := s.groupDistribution.Get(group); ok { diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go index d531f62c661..1fa1405e748 100644 --- a/pkg/schedule/scatter/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -612,7 +612,7 @@ func TestRegionHasLearner(t *testing.T) { re.LessOrEqual(max-2, uint64(regionCount)/voterCount) re.LessOrEqual(min-1, uint64(regionCount)/voterCount) for i := voterCount + 1; i <= storeCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) re.LessOrEqual(count, uint64(0)) } } From 3540f7123fd1d5cd7814797e8e1199504b02183b Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 16 Aug 2023 20:59:32 +0800 Subject: [PATCH 4/7] rename Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index c86727c7a66..16ae4983b96 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -326,15 +326,15 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s // It is both sourcePeer and targetPeer itself, no need to select. continue } + sourceStore := r.cluster.GetStore(peer.GetStoreId()) + if sourceStore == nil { + log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) + continue + } + filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) for { - sourceStore := r.cluster.GetStore(peer.GetStoreId()) - if sourceStore == nil { - log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) - continue - } filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores) - filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit) - newPeer := r.selectCandidates(context, group, peer, filters) + newPeer := r.selectNewPeer(context, group, peer, filters) targetPeers[newPeer.GetStoreId()] = newPeer selectedStores[newPeer.GetStoreId()] = struct{}{} // If the selected peer is a peer other than origin peer in this region, @@ -426,7 +426,7 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. 
return region.GetLeader().GetStoreId() == targetLeader } -func (r *RegionScatterer) selectCandidates(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer { +func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer { stores := r.cluster.GetStores() maxStoreTotalCount := uint64(0) minStoreTotalCount := uint64(math.MaxUint64) From 0654696bb485d971951eeabaae49fcc4193c6151 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Fri, 18 Aug 2023 10:23:29 +0800 Subject: [PATCH 5/7] pass ut Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 16ae4983b96..0995639b1b7 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -474,7 +474,8 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. -func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) (uint64, uint64) { +func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, + leaderCandidateStores []uint64, context engineContext) (leaderID uint64, hit uint64) { sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) From 78b52bdde9b5caf9d0d30f9204295748ba062f9c Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Tue, 22 Aug 2023 17:37:56 +0800 Subject: [PATCH 6/7] addition Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/schedule/scatter/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go index 1fa1405e748..bc5fd9c9e29 100644 --- a/pkg/schedule/scatter/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -653,7 +653,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) { region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2) op := scatterer.scatterRegion(region, group, false) re.False(isPeerCountChanged(op)) - re.Equal(group, op.AdditionalInfos["group"]) + if op != nil { + re.Equal(group, op.AdditionalInfos["group"]) + } } } From 8412f8ce4b852da9cd7751b0e36a901b6182431d Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 30 Aug 2023 16:02:27 +0800 Subject: [PATCH 7/7] address comment Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/schedule/scatter/region_scatterer.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 0995639b1b7..a676afca6cf 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -318,6 +318,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s for i, filterFunc := range 
context.filterFuncs {
 			filters[i] = filterFunc()
 		}
+		filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
 		for _, peer := range peers {
 			if _, ok := selectedStores[peer.GetStoreId()]; ok {
 				if allowLeader(oldFit, peer) {
@@ -333,7 +334,6 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
 			}
 			filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
 			for {
-				filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
 				newPeer := r.selectNewPeer(context, group, peer, filters)
 				targetPeers[newPeer.GetStoreId()] = newPeer
 				selectedStores[newPeer.GetStoreId()] = struct{}{}
@@ -355,7 +355,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
 	// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
 	// special engine stores if the engine supports to become a leader. But now there is only
 	// one engine, tiflash, which does not support the leader, so don't consider it for now.
-	targetLeader, leaderHit := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
+	targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
 	if targetLeader == 0 {
 		scatterSkipNoLeaderCounter.Inc()
 		return nil
@@ -391,7 +391,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string, s
 		scatterSuccessCounter.Inc()
 		r.Put(targetPeers, targetLeader, group)
 		op.AdditionalInfos["group"] = group
-		op.AdditionalInfos["leader-hit"] = strconv.FormatUint(leaderHit, 10)
+		op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
 		op.SetPriorityLevel(constant.High)
 	}
 	return op
@@ -426,6 +426,12 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
 	return region.GetLeader().GetStoreId() == targetLeader
 }
 
+// selectNewPeer returns the new peer whose store has the fewest picked count.
+// It keeps the origin peer if the origin store's picked count is already equal to the fewest picked count.
+// It can be divided into three steps:
+// 1. find the max picked count and the min picked count.
+// 2. if the max picked count equals the min picked count, the picked counts of all stores are the same, so return the origin peer.
+// 3. otherwise, select the store whose picked count is the minimum and which passes all the filters.
 func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
 	stores := r.cluster.GetStores()
 	maxStoreTotalCount := uint64(0)
@@ -442,11 +448,11 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee
 
 	var newPeer *metapb.Peer
 	minCount := uint64(math.MaxUint64)
-	sourceHit := uint64(math.MaxUint64)
+	originStorePickedCount := uint64(math.MaxUint64)
 	for _, store := range stores {
 		storeCount := context.selectedPeer.Get(store.GetID(), group)
 		if store.GetID() == peer.GetStoreId() {
-			sourceHit = storeCount
+			originStorePickedCount = storeCount
 		}
 		// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store @@ -463,7 +469,7 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee } } } - if sourceHit <= minCount { + if originStorePickedCount <= minCount { return peer } if newPeer == nil { @@ -475,7 +481,7 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, - leaderCandidateStores []uint64, context engineContext) (leaderID uint64, hit uint64) { + leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) { sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))