scheduler: reduce GetStore in hot-region-scheduler (#3870) #3909

Merged
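This change threads a single cluster.GetStores() call through the hot-region scheduler: the []*core.StoreInfo slice is passed into summaryStoresLoad, each store's *core.StoreInfo is cached on its storeLoadDetail, and the later passes (filterSrcStores, filterDstStores, pickDstStores) read that cached Store instead of calling cluster.GetStore per store ID. Below is a minimal sketch of the before/after access pattern; Cluster, StoreInfo, and LoadDetail are simplified stand-ins for the pd types, not the actual API.

package main

// Simplified stand-ins for pd's opt.Cluster, core.StoreInfo, and the
// scheduler's storeLoadDetail; the real types live in the pd repository.
type StoreInfo struct{ ID uint64 }

type Cluster struct{ stores map[uint64]*StoreInfo }

func (c *Cluster) GetStore(id uint64) *StoreInfo { return c.stores[id] }

func (c *Cluster) GetStores() []*StoreInfo {
	out := make([]*StoreInfo, 0, len(c.stores))
	for _, s := range c.stores {
		out = append(out, s)
	}
	return out
}

type LoadDetail struct {
	Store    *StoreInfo // cached once, reused by every later filter pass
	ByteRate float64
}

// Before: keyed iteration over the byte-rate map, with repeated GetStore
// calls (and nil checks) wherever a *StoreInfo is needed later.
func summarizeBefore(c *Cluster, byteRate map[uint64]float64) map[uint64]*LoadDetail {
	details := make(map[uint64]*LoadDetail)
	for id, rate := range byteRate {
		if c.GetStore(id) == nil { // store may be missing from the cluster
			continue
		}
		details[id] = &LoadDetail{ByteRate: rate}
	}
	return details
}

// After: fetch the store list once and attach each StoreInfo to its detail,
// skipping stores that have no byte-rate statistics.
func summarizeAfter(c *Cluster, byteRate map[uint64]float64) map[uint64]*LoadDetail {
	details := make(map[uint64]*LoadDetail)
	for _, store := range c.GetStores() {
		rate, ok := byteRate[store.ID]
		if !ok {
			continue
		}
		details[store.ID] = &LoadDetail{Store: store, ByteRate: rate}
	}
	return details
}

func main() {}

Because every detail now carries a live *core.StoreInfo, the nil-store checks in filterSrcStores and filterDstStores become unnecessary, which is exactly what the hunks below remove.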
43 changes: 23 additions & 20 deletions server/schedulers/hot_region.go
@@ -189,15 +189,16 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
 func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 
+	stores := cluster.GetStores()
 	storesStat := cluster.GetStoresStats()
 
 	minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
 	{ // update read statistics
 		regionRead := cluster.RegionReadStats()
 		storeByte := storesStat.GetStoresBytesReadStat()
 		storeKey := storesStat.GetStoresKeysReadStat()
 		hotRegionThreshold := getHotRegionThreshold(storesStat, read)
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storeByte,
 			storeKey,
 			h.pendingSums[readLeader],
@@ -213,6 +214,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 		storeKey := storesStat.GetStoresKeysWriteStat()
 		hotRegionThreshold := getHotRegionThreshold(storesStat, write)
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storeByte,
 			storeKey,
 			h.pendingSums[writeLeader],
@@ -222,6 +224,7 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 			write, core.LeaderKind, mixed)
 
 		h.stLoadInfos[writePeer] = summaryStoresLoad(
+			stores,
 			storeByte,
 			storeKey,
 			h.pendingSums[writePeer],
@@ -291,6 +294,7 @@ func (h *hotScheduler) gcRegionPendings() {
 
 // Load information of all available stores.
 func summaryStoresLoad(
+	stores []*core.StoreInfo,
 	storeByteRate map[uint64]float64,
 	storeKeyRate map[uint64]float64,
 	pendings map[uint64]Influence,
@@ -307,7 +311,12 @@ func summaryStoresLoad(
 	allCount := 0.0
 
 	// Stores without byte rate statistics is not available to schedule.
-	for id, byteRate := range storeByteRate {
+	for _, store := range stores {
+		id := store.GetID()
+		byteRate, ok := storeByteRate[id]
+		if !ok {
+			continue
+		}
 		keyRate := storeKeyRate[id]
 
 		// Find all hot peers first
@@ -349,6 +358,7 @@ func summaryStoresLoad(
 
 		// Construct store load info.
 		loadDetail[id] = &storeLoadDetail{
+			Store:    store,
 			LoadPred: stLoadPred,
 			HotPeers: hotPeers,
 		}
@@ -611,10 +621,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail)
 	for id, detail := range bs.stLoadDetail {
-		if bs.cluster.GetStore(id) == nil {
-			log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
-			continue
-		}
 		if len(detail.HotPeers) == 0 {
 			continue
 		}
@@ -744,18 +750,15 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
 func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	var (
 		filters    []filter.Filter
-		candidates []*core.StoreInfo
+		candidates []*storeLoadDetail
 	)
 	switch bs.opTy {
 	case movePeer:
 		var scoreGuard filter.Filter
 		if bs.cluster.IsPlacementRulesEnabled() {
 			scoreGuard = filter.NewRuleFitFilter(bs.sche.GetName(), bs.cluster, bs.cur.region, bs.cur.srcStoreID)
 		} else {
-			srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
-			if srcStore == nil {
-				return nil
-			}
+			srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
 			scoreGuard = filter.NewDistinctScoreFilter(bs.sche.GetName(), bs.cluster.GetLocationLabels(), bs.cluster.GetRegionStores(bs.cur.region), srcStore)
 		}
@@ -767,8 +770,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			scoreGuard,
 		}
 
-		for storeID := range bs.stLoadDetail {
-			candidates = append(candidates, bs.cluster.GetStore(storeID))
+		for _, detail := range bs.stLoadDetail {
+			candidates = append(candidates, detail)
 		}
 
 	case transferLeader:
@@ -778,9 +781,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			filter.NewSpecialUseFilter(bs.sche.GetName(), filter.SpecialUseHotRegion),
 		}
 
-		for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
-			if _, ok := bs.stLoadDetail[store.GetID()]; ok {
-				candidates = append(candidates, store)
+		for _, peer := range bs.cur.region.GetFollowers() {
+			if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
+				candidates = append(candidates, detail)
 			}
 		}
 
@@ -790,15 +793,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	return bs.pickDstStores(filters, candidates)
 }
 
-func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
+func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail, len(candidates))
 	dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
-	for _, store := range candidates {
+	for _, detail := range candidates {
+		store := detail.Store
 		if filter.Target(bs.cluster, store, filters) {
-			detail := bs.stLoadDetail[store.GetID()]
 			if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Future.ExpByteRate &&
 				detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Future.ExpKeyRate {
-				ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
+				ret[store.GetID()] = detail
 				balanceHotRegionCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
 			}
 			balanceHotRegionCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10), bs.rwTy.String()).Inc()
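The destination-side rework carries the same idea into candidate selection: candidates are []*storeLoadDetail rather than []*core.StoreInfo, so each candidate already holds its cached store and load prediction, and pickDstStores needs no per-candidate GetStore or stLoadDetail lookup. A minimal sketch of that shape follows, using simplified stand-in types (Store, Detail, and passesFilters are illustrative, not pd APIs).

package main

// Illustrative stand-ins; the real scheduler uses *core.StoreInfo and
// *storeLoadDetail from the pd codebase.
type Store struct{ ID uint64 }

type Detail struct {
	Store       *Store  // cached when the load summary was built
	ByteRate    float64 // current predicted byte rate
	ExpByteRate float64 // expected byte rate if load moved here
}

// passesFilters is a placeholder for pd's filter.Target chain.
func passesFilters(s *Store) bool { return s != nil }

// pickDst mirrors the reworked pickDstStores shape: candidates carry their
// load detail, so no per-candidate GetStore or ID-keyed map lookup is needed.
func pickDst(candidates []*Detail, tolerance float64) map[uint64]*Detail {
	picked := make(map[uint64]*Detail, len(candidates))
	for _, detail := range candidates {
		store := detail.Store
		if !passesFilters(store) {
			continue
		}
		if detail.ByteRate*tolerance < detail.ExpByteRate {
			picked[store.ID] = detail
		}
	}
	return picked
}

func main() {}

Keeping the ID-keyed result map preserves the existing caller contract while the per-candidate lookups go away.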
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
@@ -132,12 +132,14 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
 }
 
 func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
+	stores := cluster.GetStores()
 	storesStats := cluster.GetStoresStats()
 	minHotDegree := cluster.GetHotRegionCacheHitsThreshold()
 	switch typ {
 	case read:
 		hotRegionThreshold := getHotRegionThreshold(storesStats, read)
 		s.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storesStats.GetStoresBytesReadStat(),
 			storesStats.GetStoresKeysReadStat(),
 			map[uint64]Influence{},
@@ -149,6 +151,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
 	case write:
 		hotRegionThreshold := getHotRegionThreshold(storesStats, write)
 		s.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storesStats.GetStoresBytesWriteStat(),
 			storesStats.GetStoresKeysWriteStat(),
 			map[uint64]Influence{},
1 change: 1 addition & 0 deletions server/schedulers/utils.go
@@ -342,6 +342,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
 }
 
 type storeLoadDetail struct {
+	Store    *core.StoreInfo
 	LoadPred *storeLoadPred
 	HotPeers []*statistics.HotPeerStat
 }