diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index 6659e8ae59f..16042503565 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -189,8 +189,8 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { h.summaryPendingInfluence() + stores := cluster.GetStores() storesStat := cluster.GetStoresStats() - minHotDegree := cluster.GetHotRegionCacheHitsThreshold() { // update read statistics regionRead := cluster.RegionReadStats() @@ -198,6 +198,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { storeKey := storesStat.GetStoresKeysReadStat() hotRegionThreshold := getHotRegionThreshold(storesStat, read) h.stLoadInfos[readLeader] = summaryStoresLoad( + stores, storeByte, storeKey, h.pendingSums[readLeader], @@ -213,6 +214,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { storeKey := storesStat.GetStoresKeysWriteStat() hotRegionThreshold := getHotRegionThreshold(storesStat, write) h.stLoadInfos[writeLeader] = summaryStoresLoad( + stores, storeByte, storeKey, h.pendingSums[writeLeader], @@ -222,6 +224,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) { write, core.LeaderKind, mixed) h.stLoadInfos[writePeer] = summaryStoresLoad( + stores, storeByte, storeKey, h.pendingSums[writePeer], @@ -291,6 +294,7 @@ func (h *hotScheduler) gcRegionPendings() { // Load information of all available stores. func summaryStoresLoad( + stores []*core.StoreInfo, storeByteRate map[uint64]float64, storeKeyRate map[uint64]float64, pendings map[uint64]Influence, @@ -307,7 +311,12 @@ func summaryStoresLoad( allCount := 0.0 // Stores without byte rate statistics is not available to schedule. - for id, byteRate := range storeByteRate { + for _, store := range stores { + id := store.GetID() + byteRate, ok := storeByteRate[id] + if !ok { + continue + } keyRate := storeKeyRate[id] // Find all hot peers first @@ -349,6 +358,7 @@ func summaryStoresLoad( // Construct store load info. loadDetail[id] = &storeLoadDetail{ + Store: store, LoadPred: stLoadPred, HotPeers: hotPeers, } @@ -611,10 +621,6 @@ func (bs *balanceSolver) solve() []*operator.Operator { func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail) for id, detail := range bs.stLoadDetail { - if bs.cluster.GetStore(id) == nil { - log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore)) - continue - } if len(detail.HotPeers) == 0 { continue } @@ -744,7 +750,7 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo { func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { var ( filters []filter.Filter - candidates []*core.StoreInfo + candidates []*storeLoadDetail ) switch bs.opTy { case movePeer: @@ -752,10 +758,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { if bs.cluster.IsPlacementRulesEnabled() { scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.cur.region, bs.cur.srcStoreID) } else { - srcStore := bs.cluster.GetStore(bs.cur.srcStoreID) - if srcStore == nil { - return nil - } + srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.cur.region), srcStore) } @@ -767,8 +770,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { scoreGuard, } - for storeID := range bs.stLoadDetail { - candidates = append(candidates, bs.cluster.GetStore(storeID)) + for _, detail := range bs.stLoadDetail { + candidates = append(candidates, detail) } case transferLeader: @@ -778,9 +781,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion), } - for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) { - if _, ok := bs.stLoadDetail[store.GetID()]; ok { - candidates = append(candidates, store) + for _, peer := range bs.cur.region.GetFollowers() { + if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok { + candidates = append(candidates, detail) } } @@ -790,15 +793,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail { return bs.pickDstStores(filters, candidates) } -func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail { +func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail { ret := make(map[uint64]*storeLoadDetail, len(candidates)) dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() - for _, store := range candidates { + for _, detail := range candidates { + store := detail.Store if filter.Target(bs.cluster, store, filters) { - detail := bs.stLoadDetail[store.GetID()] if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Future.ExpByteRate && detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Future.ExpKeyRate { - ret[store.GetID()] = bs.stLoadDetail[store.GetID()] + ret[store.GetID()] = detail balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc() } balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc() diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index b7caeefd6b9..692ddd2c907 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -132,12 +132,14 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op } func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator { + stores := cluster.GetStores() storesStats := cluster.GetStoresStats() minHotDegree := cluster.GetHotRegionCacheHitsThreshold() switch typ { case read: hotRegionThreshold := getHotRegionThreshold(storesStats, read) s.stLoadInfos[readLeader] = summaryStoresLoad( + stores, storesStats.GetStoresBytesReadStat(), storesStats.GetStoresKeysReadStat(), map[uint64]Influence{}, @@ -149,6 +151,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) [] case write: hotRegionThreshold := getHotRegionThreshold(storesStats, write) s.stLoadInfos[writeLeader] = summaryStoresLoad( + stores, storesStats.GetStoresBytesWriteStat(), storesStats.GetStoresKeysWriteStat(), map[uint64]Influence{}, diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index b0932c577e3..166ed211eb1 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -342,6 +342,7 @@ func maxLoad(a, b *storeLoad) *storeLoad { } type storeLoadDetail struct { + Store *core.StoreInfo LoadPred *storeLoadPred HotPeers []*statistics.HotPeerStat }