From 0f44e87632dde84da1321b4feb6355281a28305d Mon Sep 17 00:00:00 2001
From: buffer <1045931706@qq.com>
Date: Thu, 22 Sep 2022 18:09:03 +0800
Subject: [PATCH] schedule: reduce the rule fit overhead (#5523)

close tikv/pd#5522

Signed-off-by: bufferflies <1045931706@qq.com>
---
 server/schedule/filter/filters.go           | 33 +++-------
 server/schedule/filter/filters_test.go      |  7 ++-
 server/schedule/placement/fit.go            | 70 ++++++++++++++++++---
 server/schedule/placement/fit_test.go       | 43 +++++++++++++
 server/schedulers/balance_benchmark_test.go | 41 ++++++++----
 server/schedulers/balance_region.go         |  6 +-
 6 files changed, 150 insertions(+), 50 deletions(-)

diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go
index 1bd74363e89..00a76c369bd 100644
--- a/server/schedule/filter/filters.go
+++ b/server/schedule/filter/filters.go
@@ -19,7 +19,6 @@ import (
 	"strconv"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
-	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/typeutil"
 	"github.com/tikv/pd/server/config"
@@ -27,7 +26,6 @@ import (
 	"github.com/tikv/pd/server/core/storelimit"
 	"github.com/tikv/pd/server/schedule/placement"
 	"github.com/tikv/pd/server/schedule/plan"
-	"go.uber.org/zap"
 )
 
 // SelectSourceStores selects stores that be selected as source store from the list.
@@ -586,12 +584,13 @@ func (f *ruleFitFilter) Source(options *config.PersistOptions, store *core.Store
 	return statusOK
 }
 
+// Target filters stores when they are selected as schedule targets.
+// It ensures that after replacing a peer with a new one, the isolation level will not decrease and
+// the replacement store can match the rule of the replaced peer.
+// For RegionA:[1,2,3], moving peer1 --> peer2 is not allowed because the peer count would not match the rule,
+// but transferring the role of peer1 --> peer2 is supported.
 func (f *ruleFitFilter) Target(options *config.PersistOptions, store *core.StoreInfo) *plan.Status {
-	region := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
-		f.region.GetPeers(), f.region.GetLeader(),
-		core.WithReplacePeerStore(f.srcStore, store.GetID()))
-	newFit := f.ruleManager.FitRegion(f.cluster, region)
-	if placement.CompareRegionFit(f.oldFit, newFit) <= 0 {
+	if f.oldFit.Replace(f.srcStore, store, f.region) {
 		return statusOK
 	}
 	return statusStoreNotMatchRule
@@ -639,25 +638,7 @@ func (f *ruleLeaderFitFilter) Source(options *config.PersistOptions, store *core
 }
 
 func (f *ruleLeaderFitFilter) Target(options *config.PersistOptions, store *core.StoreInfo) *plan.Status {
-	targetStoreID := store.GetID()
-	sourcePeer := f.region.GetStorePeer(f.srcLeaderStoreID)
-	targetPeer := f.region.GetStorePeer(targetStoreID)
-	newRegionOptions := []core.RegionCreateOption{core.WithLeader(targetPeer)}
-	if targetPeer == nil {
-		if !f.allowMoveLeader {
-			log.Warn("ruleLeaderFitFilter couldn't find peer on target Store", zap.Uint64("target-store", store.GetID()))
-			return statusStoreNotMatchRule
-		}
-		newRegionOptions = []core.RegionCreateOption{
-			core.WithReplacePeerStore(f.srcLeaderStoreID, targetStoreID),
-			core.WithLeader(&metapb.Peer{Id: sourcePeer.GetId(), StoreId: targetStoreID}),
-		}
-	}
-	copyRegion := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
-		f.region.GetPeers(), f.region.GetLeader(), newRegionOptions...,
-	)
-	newFit := f.ruleManager.FitRegion(f.cluster, copyRegion)
-	if placement.CompareRegionFit(f.oldFit, newFit) <= 0 {
+	if f.oldFit.Replace(f.srcLeaderStoreID, store, f.region) {
 		return statusOK
 	}
 	return statusStoreNotMatchRule
diff --git a/server/schedule/filter/filters_test.go b/server/schedule/filter/filters_test.go
index 782c17c320c..5bd6dc144ed 100644
--- a/server/schedule/filter/filters_test.go
+++ b/server/schedule/filter/filters_test.go
@@ -119,9 +119,12 @@ func TestRuleFitFilter(t *testing.T) {
 	}{
 		{1, 1, map[string]string{"zone": "z1"}, plan.StatusOK, plan.StatusOK},
 		{2, 1, map[string]string{"zone": "z1"}, plan.StatusOK, plan.StatusOK},
-		{3, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
+		// store 3 and store 1 are both peers of this region, so transferring the leader to store 3 is allowed.
+		{3, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusOK},
+		// the labels of store 4 and store 3 are the same, so the isolation score will decrease.
 		{4, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
-		{5, 1, map[string]string{"zone": "z3"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
+		// store 5 and store 1 are both peers of this region, so transferring the leader to store 5 is allowed.
+		{5, 1, map[string]string{"zone": "z3"}, plan.StatusOK, plan.StatusOK},
 		{6, 1, map[string]string{"zone": "z4"}, plan.StatusOK, plan.StatusOK},
 	}
 	// Init cluster
diff --git a/server/schedule/placement/fit.go b/server/schedule/placement/fit.go
index 769d935b9be..8d862fbe24d 100644
--- a/server/schedule/placement/fit.go
+++ b/server/schedule/placement/fit.go
@@ -52,6 +52,35 @@ func (f *RegionFit) IsCached() bool {
 	return f.mu.cached
 }
 
+// Replace returns true if the replacement store fits all constraints and its isolation score is not less than the original one.
+func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo, region *core.RegionInfo) bool {
+	fit := f.getRuleFitByStoreID(srcStoreID)
+	// check whether the target store fits all constraints.
+	if fit == nil || !MatchLabelConstraints(dstStore, fit.Rule.LabelConstraints) {
+		return false
+	}
+
+	// the members of the rule stay the same, so there is no need to check the score.
+	if fit.contain(dstStore.GetID()) {
+		return true
+	}
+
+	peers := newFitPeer(f.regionStores, region, fit.Peers, replaceFitPeerOpt(srcStoreID, dstStore))
+	score := isolationScore(peers, fit.Rule.LocationLabels)
+	return fit.IsolationScore <= score
+}
+
+func (f *RegionFit) getRuleFitByStoreID(storeID uint64) *RuleFit {
+	for _, rf := range f.RuleFits {
+		for _, p := range rf.Peers {
+			if p.GetStoreId() == storeID {
+				return rf
+			}
+		}
+	}
+	return nil
+}
+
 // IsSatisfied returns if the rules are properly satisfied.
 // It means all Rules are fulfilled and there is no orphan peers.
 func (f *RegionFit) IsSatisfied() bool {
@@ -123,6 +152,15 @@ func (f *RuleFit) IsSatisfied() bool {
 	return len(f.Peers) == f.Rule.Count && len(f.PeersWithDifferentRole) == 0
 }
 
+func (f *RuleFit) contain(storeID uint64) bool {
+	for _, p := range f.Peers {
+		if p.GetStoreId() == storeID {
+			return true
+		}
+	}
+	return false
+}
+
 func compareRuleFit(a, b *RuleFit) int {
 	switch {
 	case len(a.Peers) < len(b.Peers):
@@ -164,22 +202,40 @@ type fitWorker struct {
 	exit            bool
 }
 
-func newFitWorker(stores []*core.StoreInfo, region *core.RegionInfo, rules []*Rule) *fitWorker {
-	regionPeers := region.GetPeers()
-	peers := make([]*fitPeer, 0, len(regionPeers))
-	for _, p := range regionPeers {
-		peers = append(peers, &fitPeer{
+type fitPeerOpt func(peer *fitPeer)
+
+func replaceFitPeerOpt(srcStoreID uint64, dstStore *core.StoreInfo) fitPeerOpt {
+	return func(peer *fitPeer) {
+		if peer.Peer.GetStoreId() == srcStoreID {
+			peer.store = dstStore
+		}
+	}
+}
+
+func newFitPeer(stores []*core.StoreInfo, region *core.RegionInfo, fitPeers []*metapb.Peer, opts ...fitPeerOpt) []*fitPeer {
+	peers := make([]*fitPeer, len(fitPeers))
+	for i, p := range fitPeers {
+		peer := &fitPeer{
 			Peer:     p,
 			store:    getStoreByID(stores, p.GetStoreId()),
 			isLeader: region.GetLeader().GetId() == p.GetId(),
-		})
+		}
+		for _, opt := range opts {
+			opt(peer)
+		}
+		peers[i] = peer
 	}
 	// Sort peers to keep the match result deterministic.
 	sort.Slice(peers, func(i, j int) bool {
-		// Put healthy peers in front to priority to fit healthy peers.
+		// Put healthy peers in front so that healthy peers are fitted first.
 		si, sj := stateScore(region, peers[i].GetId()), stateScore(region, peers[j].GetId())
 		return si > sj || (si == sj && peers[i].GetId() < peers[j].GetId())
 	})
+	return peers
+}
+
+func newFitWorker(stores []*core.StoreInfo, region *core.RegionInfo, rules []*Rule) *fitWorker {
+	peers := newFitPeer(stores, region, region.GetPeers())
 	return &fitWorker{
 		stores:          stores,
diff --git a/server/schedule/placement/fit_test.go b/server/schedule/placement/fit_test.go
index 15ace4d817b..76f7096551b 100644
--- a/server/schedule/placement/fit_test.go
+++ b/server/schedule/placement/fit_test.go
@@ -39,6 +39,9 @@ func makeStores() StoreSet {
 					"host": fmt.Sprintf("host%d", host),
 					"id":   fmt.Sprintf("id%d", x),
 				}
+				if x == 5 {
+					labels["engine"] = "tiflash"
+				}
 				stores.SetStore(core.NewStoreInfoWithLabel(id, 0, labels))
 			}
 		}
@@ -109,6 +112,45 @@ func checkPeerMatch(peers []*metapb.Peer, expect string) bool {
 	return len(m) == 0
 }
 
+func TestReplace(t *testing.T) {
+	re := require.New(t)
+	stores := makeStores()
+
+	testCases := []struct {
+		region     string
+		rules      []string
+		srcStoreID uint64
+		dstStoreID uint64
+		ok         bool
+	}{
+		{"1111,2111,3111", []string{"3/voter//zone"}, 1111, 4111, true},
+		// replace fails when the target store doesn't match the rule.
+		{"1111,2111,3111", []string{"3/voter/zone=zone1+zone2+zone3/zone"}, 1111, 4111, false},
+		// replace fails when the isolation level decreases.
+		{"1111,2111,3111", []string{"3/voter//zone"}, 1111, 2113, false},
+		{"1111,2111,3111,1115_learner", []string{"3/voter//zone", "1/learner/engine=tiflash/host"}, 1115, 2115, true},
+		// replace fails when the target store is not tiflash.
+		{"1111,2111,3111,1115_learner", []string{"3/voter//zone", "1/learner/engine=tiflash/host"}, 1115, 1112, false},
+		{"1111_leader,2111,3111", []string{"1/leader/zone=zone1/zone", "2/voter//zone"}, 1111, 1112, true},
+		// replace fails when the leader does not match the leader constraint.
+		{"1111_leader,2111,3111", []string{"1/leader/zone=zone1/zone", "2/voter//zone"}, 1111, 2112, false},
+		// transfer leader
+		{"1111_leader,1121,1131", []string{"1/leader/host=host1+host2/host", "3/voter//host"}, 1111, 1121, true},
+		// replace fails when the leader does not match the leader constraint.
+		{"1111_leader,1121,1131", []string{"1/leader/host=host1+host2/host", "2/voter//host"}, 1111, 1131, false},
+	}
+	for _, tc := range testCases {
+		region := makeRegion(tc.region)
+		var rules []*Rule
+		for _, r := range tc.rules {
+			rules = append(rules, makeRule(r))
+		}
+		rf := fitRegion(stores.GetStores(), region, rules)
+		rf.regionStores = stores.GetStores()
+		re.Equal(rf.Replace(tc.srcStoreID, stores.GetStore(tc.dstStoreID), region), tc.ok)
+	}
+}
+
 func TestFitRegion(t *testing.T) {
 	re := require.New(t)
 	stores := makeStores()
@@ -154,6 +196,7 @@ func TestFitRegion(t *testing.T) {
 		}
 	}
 }
+
 func TestIsolationScore(t *testing.T) {
 	as := assert.New(t)
 	stores := makeStores()
diff --git a/server/schedulers/balance_benchmark_test.go b/server/schedulers/balance_benchmark_test.go
index 5abae66864c..4653d9aed21 100644
--- a/server/schedulers/balance_benchmark_test.go
+++ b/server/schedulers/balance_benchmark_test.go
@@ -19,20 +19,24 @@ import (
 	"testing"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/stretchr/testify/assert"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/schedule"
+	"github.com/tikv/pd/server/schedule/operator"
 	"github.com/tikv/pd/server/schedule/placement"
+	"github.com/tikv/pd/server/schedule/plan"
 )
 
 var (
-	zones = []string{"az1", "az2", "az3"}
-	racks = []string{"rack1", "rack2", "rack3"}
-	hosts = []string{"host1", "host2", "host3", "host4", "host5", "host6", "host7", "host8", "host9"}
-
-	regionCount  = 100
-	storeCount   = 81
-	tiflashCount = 9
+	zones = []string{"zone1", "zone2", "zone3"}
+	racks = []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}
+	hosts = []string{"host1", "host2", "host3", "host4", "host5", "host6",
+		"host7", "host8", "host9"}
+
+	regionCount  = 2000
+	storeCount   = len(zones) * len(racks) * len(hosts)
+	tiflashCount = 30
 )
 
 // newBenchCluster store region count is same with storeID and
@@ -53,7 +57,7 @@ func newBenchCluster(ctx context.Context, ruleEnable, labelEnable bool, tombston
 	if ruleEnable {
 		addTiflash(tc)
 	}
-	storeID, regionID := uint64(0), uint64(0)
+	storeID, regionID := uint64(1), uint64(1)
 	for _, host := range hosts {
 		for _, rack := range racks {
 			for _, az := range zones {
 				label["az"] = az
 				label["rack"] = rack
 				label["host"] = host
+				tc.AddLabelsStore(storeID, regionCount-int(storeID), label)
 				storeID++
-				tc.AddLabelsStore(storeID, int(storeID), label)
 			}
 			for j := 0; j < regionCount; j++ {
 				if ruleEnable {
 					learnID := regionID%uint64(tiflashCount) + uint64(storeCount)
-					tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 1, storeID - 2}, []uint64{learnID})
+					tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 2, storeID - 3}, []uint64{learnID})
 				} else {
-					tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 1, storeID - 2}, nil)
+					tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 2, storeID - 3}, nil)
 				}
 				regionID++
 			}
@@ -123,7 +127,11 @@ func addTiflash(tc *mockcluster.Cluster) {
 	for i := 0; i < tiflashCount; i++ {
 		label := make(map[string]string, 3)
 		label["engine"] = "tiflash"
-		tc.AddLabelsStore(uint64(storeCount+i), regionCount, label)
+		if i == tiflashCount-1 {
+			tc.AddLabelsStore(uint64(storeCount+i), 1, label)
+		} else {
+			tc.AddLabelsStore(uint64(storeCount+i), regionCount-storeCount-i, label)
+		}
 	}
 	rule := &placement.Rule{
 		GroupID:  "tiflash-override",
@@ -140,13 +148,20 @@
 func BenchmarkPlacementRule(b *testing.B) {
 	ctx := context.Background()
+	re := assert.New(b)
 	tc := newBenchCluster(ctx, true, true, false)
 	oc := schedule.NewOperatorController(ctx, nil, nil)
 	sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
 	b.ResetTimer()
+	var ops []*operator.Operator
+	var plans []plan.Plan
 	for i := 0; i < b.N; i++ {
-		sc.Schedule(tc, false)
+		ops, plans = sc.Schedule(tc, false)
 	}
+	b.StopTimer()
+	re.Len(plans, 0)
+	re.Len(ops, 1)
+	re.Contains(ops[0].String(), "to [191]")
 }
 
 func BenchmarkLabel(b *testing.B) {
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 32dcc020255..ecaccc9fb09 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -233,12 +233,14 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool)
 
 // transferPeer selects the best store to create a new peer to replace the old peer.
 func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Collector) *operator.Operator {
+	// The filters should be ordered by their CPU cost, from cheapest to most expensive.
+	// The more expensive a filter is, the later it should be placed.
 	filters := []filter.Filter{
 		filter.NewExcludedFilter(s.GetName(), nil, solver.region.GetStoreIDs()),
-		filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source),
-		filter.NewRegionScoreFilter(s.GetName(), solver.source, solver.GetOpts()),
 		filter.NewSpecialUseFilter(s.GetName()),
 		&filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true},
+		filter.NewRegionScoreFilter(s.GetName(), solver.source, solver.GetOpts()),
+		filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source),
 	}
 
 	candidates := filter.NewCandidates(solver.GetStores()).