schedule: reduce the rule fit overhead (#5523)
close #5522

Signed-off-by: bufferflies <[email protected]>
bufferflies authored Sep 22, 2022
1 parent 8209515 commit 0f44e87
Showing 6 changed files with 150 additions and 50 deletions.
33 changes: 7 additions & 26 deletions server/schedule/filter/filters.go
@@ -19,15 +19,13 @@ import (
"strconv"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedule/plan"
"go.uber.org/zap"
)

// SelectSourceStores selects stores that can be used as source stores from the list.
@@ -586,12 +584,13 @@ func (f *ruleFitFilter) Source(options *config.PersistOptions, store *core.Store
return statusOK
}

// Target filters stores when selecting them as schedule targets.
// It ensures that, after replacing a peer with a new one, the isolation level will not decrease
// and the replacement store still matches the source rule.
// For example, for RegionA with peers on stores [1,2,3], moving peer1 --> peer2 is not allowed
// because the peer count would no longer match the rule, but transferring the role peer1 --> peer2 is supported.
func (f *ruleFitFilter) Target(options *config.PersistOptions, store *core.StoreInfo) *plan.Status {
region := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
f.region.GetPeers(), f.region.GetLeader(),
core.WithReplacePeerStore(f.srcStore, store.GetID()))
newFit := f.ruleManager.FitRegion(f.cluster, region)
if placement.CompareRegionFit(f.oldFit, newFit) <= 0 {
if f.oldFit.Replace(f.srcStore, store, f.region) {
return statusOK
}
return statusStoreNotMatchRule
@@ -639,25 +638,7 @@ func (f *ruleLeaderFitFilter) Source(options *config.PersistOptions, store *core
}

func (f *ruleLeaderFitFilter) Target(options *config.PersistOptions, store *core.StoreInfo) *plan.Status {
targetStoreID := store.GetID()
sourcePeer := f.region.GetStorePeer(f.srcLeaderStoreID)
targetPeer := f.region.GetStorePeer(targetStoreID)
newRegionOptions := []core.RegionCreateOption{core.WithLeader(targetPeer)}
if targetPeer == nil {
if !f.allowMoveLeader {
log.Warn("ruleLeaderFitFilter couldn't find peer on target Store", zap.Uint64("target-store", store.GetID()))
return statusStoreNotMatchRule
}
newRegionOptions = []core.RegionCreateOption{
core.WithReplacePeerStore(f.srcLeaderStoreID, targetStoreID),
core.WithLeader(&metapb.Peer{Id: sourcePeer.GetId(), StoreId: targetStoreID}),
}
}
copyRegion := createRegionForRuleFit(f.region.GetStartKey(), f.region.GetEndKey(),
f.region.GetPeers(), f.region.GetLeader(), newRegionOptions...,
)
newFit := f.ruleManager.FitRegion(f.cluster, copyRegion)
if placement.CompareRegionFit(f.oldFit, newFit) <= 0 {
if f.oldFit.Replace(f.srcLeaderStoreID, store, f.region) {
return statusOK
}
return statusStoreNotMatchRule
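
The Target check in filters.go above no longer builds a copied region and re-runs FitRegion for every candidate store; it asks the precomputed fit whether replacing a single peer keeps the rule satisfied. Below is a minimal, self-contained Go sketch of that idea; the Store, Rule, matches, isolationScore, and canReplace names are hypothetical simplifications for illustration, not PD's actual types or API. A replacement is acceptable only if the destination store satisfies the rule's label constraints and, unless it already hosts one of the rule's peers, does not lower the isolation score computed over the rule's location labels.

package main

import "fmt"

// Hypothetical, simplified stand-ins for PD's store and rule types.
type Store struct {
    ID     uint64
    Labels map[string]string
}

type Rule struct {
    LabelConstraints map[string]string // label key -> required value (simplified)
    LocationLabels   []string          // e.g. ["zone", "rack", "host"]
}

// matches reports whether the store satisfies every label constraint of the rule.
func matches(s *Store, r *Rule) bool {
    for k, v := range r.LabelConstraints {
        if s.Labels[k] != v {
            return false
        }
    }
    return true
}

// isolationScore counts how many distinct values the peers spread across for the
// first location label; PD's real score is more elaborate, this is only a sketch.
func isolationScore(stores []*Store, locationLabels []string) int {
    if len(locationLabels) == 0 {
        return 0
    }
    seen := map[string]struct{}{}
    for _, s := range stores {
        seen[s.Labels[locationLabels[0]]] = struct{}{}
    }
    return len(seen)
}

// canReplace mirrors the idea behind the new check: the destination must match
// the rule, and the isolation score must not drop after swapping src for dst.
func canReplace(peers []*Store, src, dst *Store, r *Rule) bool {
    if !matches(dst, r) {
        return false
    }
    for _, p := range peers {
        if p.ID == dst.ID {
            // dst already holds one of the rule's peers: only the role changes.
            return true
        }
    }
    old := isolationScore(peers, r.LocationLabels)
    replaced := make([]*Store, 0, len(peers))
    for _, p := range peers {
        if p.ID == src.ID {
            replaced = append(replaced, dst)
        } else {
            replaced = append(replaced, p)
        }
    }
    return isolationScore(replaced, r.LocationLabels) >= old
}

func main() {
    z := func(id uint64, zone string) *Store {
        return &Store{ID: id, Labels: map[string]string{"zone": zone}}
    }
    peers := []*Store{z(1, "z1"), z(2, "z2"), z(3, "z3")}
    rule := &Rule{LocationLabels: []string{"zone"}}
    fmt.Println(canReplace(peers, peers[0], z(4, "z4"), rule)) // true: isolation preserved
    fmt.Println(canReplace(peers, peers[0], z(5, "z2"), rule)) // false: two peers would share z2
    fmt.Println(canReplace(peers, peers[0], peers[1], rule))   // true: role transfer between existing peers
}

In the diff above, this corresponds to f.oldFit.Replace(f.srcStore, store, f.region) taking over from the createRegionForRuleFit + FitRegion + CompareRegionFit sequence.
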
7 changes: 5 additions & 2 deletions server/schedule/filter/filters_test.go
@@ -119,9 +119,12 @@ func TestRuleFitFilter(t *testing.T) {
}{
{1, 1, map[string]string{"zone": "z1"}, plan.StatusOK, plan.StatusOK},
{2, 1, map[string]string{"zone": "z1"}, plan.StatusOK, plan.StatusOK},
{3, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
// store 3 and store 1 are peers of this region, so transferring the leader to store 3 is allowed.
{3, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusOK},
// the labels of store 4 and store 3 are the same, so the isolation score would decrease.
{4, 1, map[string]string{"zone": "z2"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
{5, 1, map[string]string{"zone": "z3"}, plan.StatusOK, plan.StatusStoreNotMatchRule},
// store 5 and store 1 are peers of this region, so transferring the leader to store 5 is allowed.
{5, 1, map[string]string{"zone": "z3"}, plan.StatusOK, plan.StatusOK},
{6, 1, map[string]string{"zone": "z4"}, plan.StatusOK, plan.StatusOK},
}
// Init cluster
70 changes: 63 additions & 7 deletions server/schedule/placement/fit.go
@@ -52,6 +52,35 @@ func (f *RegionFit) IsCached() bool {
return f.mu.cached
}

// Replace returns true if the replacement store fits all constraints and the isolation score is not less than the original.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo, region *core.RegionInfo) bool {
fit := f.getRuleFitByStoreID(srcStoreID)
// check that the target store fits all constraints.
if fit == nil || !MatchLabelConstraints(dstStore, fit.Rule.LabelConstraints) {
return false
}

// the members of the rule stay the same, so there is no need to check the score.
if fit.contain(dstStore.GetID()) {
return true
}

peers := newFitPeer(f.regionStores, region, fit.Peers, replaceFitPeerOpt(srcStoreID, dstStore))
score := isolationScore(peers, fit.Rule.LocationLabels)
return fit.IsolationScore <= score
}

func (f *RegionFit) getRuleFitByStoreID(storeID uint64) *RuleFit {
for _, rf := range f.RuleFits {
for _, p := range rf.Peers {
if p.GetStoreId() == storeID {
return rf
}
}
}
return nil
}

// IsSatisfied returns if the rules are properly satisfied.
// It means all Rules are fulfilled and there are no orphan peers.
func (f *RegionFit) IsSatisfied() bool {
@@ -123,6 +152,15 @@ func (f *RuleFit) IsSatisfied() bool {
return len(f.Peers) == f.Rule.Count && len(f.PeersWithDifferentRole) == 0
}

func (f *RuleFit) contain(storeID uint64) bool {
for _, p := range f.Peers {
if p.GetStoreId() == storeID {
return true
}
}
return false
}

func compareRuleFit(a, b *RuleFit) int {
switch {
case len(a.Peers) < len(b.Peers):
@@ -164,22 +202,40 @@ type fitWorker struct {
exit bool
}

func newFitWorker(stores []*core.StoreInfo, region *core.RegionInfo, rules []*Rule) *fitWorker {
regionPeers := region.GetPeers()
peers := make([]*fitPeer, 0, len(regionPeers))
for _, p := range regionPeers {
peers = append(peers, &fitPeer{
type fitPeerOpt func(peer *fitPeer)

func replaceFitPeerOpt(srcStoreID uint64, dstStore *core.StoreInfo) fitPeerOpt {
return func(peer *fitPeer) {
if peer.Peer.GetStoreId() == srcStoreID {
peer.store = dstStore
}
}
}

func newFitPeer(stores []*core.StoreInfo, region *core.RegionInfo, fitPeers []*metapb.Peer, opts ...fitPeerOpt) []*fitPeer {
peers := make([]*fitPeer, len(fitPeers))
for i, p := range fitPeers {
peer := &fitPeer{
Peer: p,
store: getStoreByID(stores, p.GetStoreId()),
isLeader: region.GetLeader().GetId() == p.GetId(),
})
}
for _, opt := range opts {
opt(peer)
}
peers[i] = peer
}
// Sort peers to keep the match result deterministic.
sort.Slice(peers, func(i, j int) bool {
// Put healthy peers in front to priority to fit healthy peers.
// Put healthy peers in front so that healthy peers are fitted first.
si, sj := stateScore(region, peers[i].GetId()), stateScore(region, peers[j].GetId())
return si > sj || (si == sj && peers[i].GetId() < peers[j].GetId())
})
return peers
}

func newFitWorker(stores []*core.StoreInfo, region *core.RegionInfo, rules []*Rule) *fitWorker {
peers := newFitPeer(stores, region, region.GetPeers())

return &fitWorker{
stores: stores,
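
newFitPeer in fit.go above now accepts functional options so the Replace path can pretend a peer has moved to another store without rebuilding the whole region. A small stand-alone sketch of that option pattern follows; the StoreInfo, fitPeer, and newFitPeers definitions here are hypothetical simplifications, not the real PD types.

package main

import "fmt"

// Hypothetical, minimal versions of the types involved.
type StoreInfo struct{ ID uint64 }

type fitPeer struct {
    peerID  uint64
    storeID uint64
    store   *StoreInfo
}

// fitPeerOpt mutates a freshly built fitPeer before it is used for matching.
type fitPeerOpt func(*fitPeer)

// replaceFitPeerOpt redirects the peer that lives on srcStoreID to dstStore,
// so the fit can be evaluated "as if" the peer had already moved.
func replaceFitPeerOpt(srcStoreID uint64, dstStore *StoreInfo) fitPeerOpt {
    return func(p *fitPeer) {
        if p.storeID == srcStoreID {
            p.store = dstStore
        }
    }
}

// newFitPeers builds the peers and applies every option to each of them.
func newFitPeers(peerStores map[uint64]uint64, opts ...fitPeerOpt) []*fitPeer {
    peers := make([]*fitPeer, 0, len(peerStores))
    for peerID, storeID := range peerStores {
        p := &fitPeer{peerID: peerID, storeID: storeID, store: &StoreInfo{ID: storeID}}
        for _, opt := range opts {
            opt(p)
        }
        peers = append(peers, p)
    }
    return peers
}

func main() {
    dst := &StoreInfo{ID: 42}
    // Peer 11 currently lives on store 2; evaluate the fit as if it moved to store 42.
    peers := newFitPeers(map[uint64]uint64{10: 1, 11: 2, 12: 3}, replaceFitPeerOpt(2, dst))
    for _, p := range peers {
        fmt.Printf("peer %d -> store %d\n", p.peerID, p.store.ID)
    }
}

In the actual change, newFitWorker passes no options and behaves as before, while RegionFit.Replace passes replaceFitPeerOpt to evaluate the hypothetical placement.
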
43 changes: 43 additions & 0 deletions server/schedule/placement/fit_test.go
@@ -39,6 +39,9 @@ func makeStores() StoreSet {
"host": fmt.Sprintf("host%d", host),
"id": fmt.Sprintf("id%d", x),
}
if x == 5 {
labels["engine"] = "tiflash"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, 0, labels))
}
}
@@ -109,6 +112,45 @@ func checkPeerMatch(peers []*metapb.Peer, expect string) bool {
return len(m) == 0
}

func TestReplace(t *testing.T) {
re := require.New(t)
stores := makeStores()

testCases := []struct {
region string
rules []string
srcStoreID uint64
dstStoreID uint64
ok bool
}{
{"1111,2111,3111", []string{"3/voter//zone"}, 1111, 4111, true},
// replacement fails when the target store doesn't match the rule.
{"1111,2111,3111", []string{"3/voter/zone=zone1+zone2+zone3/zone"}, 1111, 4111, false},
// replacement fails when the isolation level decreases.
{"1111,2111,3111", []string{"3/voter//zone"}, 1111, 2113, false},
{"1111,2111,3111,1115_learner", []string{"3/voter//zone", "1/learner/engine=tiflash/host"}, 1115, 2115, true},
// replacement fails when the target store is not a tiflash store.
{"1111,2111,3111,1115_learner", []string{"3/voter//zone", "1/learner/engine=tiflash/host"}, 1115, 1112, false},
{"1111_lead,2111,3111", []string{"1/leader/zone=zone1/zone", "2/voter//zone"}, 1111, 1112, true},
// replacement fails when the leader does not match the leader constraint.
{"1111_leader,2111,3111", []string{"1/leader/zone=zone1/zone", "2/voter//zone"}, 1111, 2112, false},
// transfer leader
{"1111_leader,1121,1131", []string{"1/leader/host=host1+host2/host", "3/voter//host"}, 1111, 1121, true},
// replacement fails when the leader does not match the leader constraint.
{"1111_leader,1121,1131", []string{"1/leader/host=host1+host2/host", "2/voter//host"}, 1111, 1131, false},
}
for _, tc := range testCases {
region := makeRegion(tc.region)
var rules []*Rule
for _, r := range tc.rules {
rules = append(rules, makeRule(r))
}
rf := fitRegion(stores.GetStores(), region, rules)
rf.regionStores = stores.GetStores()
re.Equal(rf.Replace(tc.srcStoreID, stores.GetStore(tc.dstStoreID), region), tc.ok)
}
}

func TestFitRegion(t *testing.T) {
re := require.New(t)
stores := makeStores()
@@ -154,6 +196,7 @@ func TestFitRegion(t *testing.T) {
}
}
}

func TestIsolationScore(t *testing.T) {
as := assert.New(t)
stores := makeStores()
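
The rule strings in TestReplace above use the compact notation of this test file's helpers, which appears to read as count/role/labelConstraints/locationLabels (for example, "3/voter//zone" means three voters with no label constraint, isolated by the "zone" label, and "1/learner/engine=tiflash/host" means one learner constrained to engine=tiflash, isolated by host); region strings are comma-separated store IDs, optionally suffixed with a role such as _leader or _learner. A hedged sketch of a parser for that notation follows, assuming this reading of the format; parsedRule and parseRule are illustrative names, not the test's actual helpers.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parsedRule is a hypothetical holder for the shorthand used in the tests,
// read as: count/role/labelConstraints/locationLabels.
type parsedRule struct {
    Count          int
    Role           string
    Constraints    map[string][]string // label key -> allowed values
    LocationLabels []string
}

func parseRule(def string) (*parsedRule, error) {
    parts := strings.SplitN(def, "/", 4)
    if len(parts) != 4 {
        return nil, fmt.Errorf("expected 4 fields, got %q", def)
    }
    count, err := strconv.Atoi(parts[0])
    if err != nil {
        return nil, err
    }
    r := &parsedRule{Count: count, Role: parts[1], Constraints: map[string][]string{}}
    if parts[2] != "" {
        // Constraints look like "engine=tiflash" or "zone=zone1+zone2+zone3".
        for _, c := range strings.Split(parts[2], ",") {
            kv := strings.SplitN(c, "=", 2)
            if len(kv) != 2 {
                return nil, fmt.Errorf("bad constraint %q", c)
            }
            r.Constraints[kv[0]] = strings.Split(kv[1], "+")
        }
    }
    if parts[3] != "" {
        r.LocationLabels = strings.Split(parts[3], ",")
    }
    return r, nil
}

func main() {
    for _, def := range []string{"3/voter//zone", "1/learner/engine=tiflash/host"} {
        r, err := parseRule(def)
        fmt.Printf("%s => %+v (err=%v)\n", def, r, err)
    }
}
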
41 changes: 28 additions & 13 deletions server/schedulers/balance_benchmark_test.go
@@ -19,20 +19,24 @@ import (
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedule/plan"
)

var (
zones = []string{"az1", "az2", "az3"}
racks = []string{"rack1", "rack2", "rack3"}
hosts = []string{"host1", "host2", "host3", "host4", "host5", "host6", "host7", "host8", "host9"}

regionCount = 100
storeCount = 81
tiflashCount = 9
zones = []string{"zone1", "zone2", "zone3"}
racks = []string{"rack1", "rack2", "rack3", "rack4", "rack5", "rack6"}
hosts = []string{"host1", "host2", "host3", "host4", "host5", "host6",
"host7", "host8", "host9"}

regionCount = 2000
storeCount = len(zones) * len(racks) * len(hosts)
tiflashCount = 30
)

// newBenchCluster store region count is same with storeID and
@@ -53,23 +57,23 @@ func newBenchCluster(ctx context.Context, ruleEnable, labelEnable bool, tombston
if ruleEnable {
addTiflash(tc)
}
storeID, regionID := uint64(0), uint64(0)
storeID, regionID := uint64(1), uint64(1)
for _, host := range hosts {
for _, rack := range racks {
for _, az := range zones {
label := make(map[string]string, 3)
label["az"] = az
label["rack"] = rack
label["host"] = host
tc.AddLabelsStore(storeID, regionCount-int(storeID), label)
storeID++
tc.AddLabelsStore(storeID, int(storeID), label)
}
for j := 0; j < regionCount; j++ {
if ruleEnable {
learnID := regionID%uint64(tiflashCount) + uint64(storeCount)
tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 1, storeID - 2}, []uint64{learnID})
tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 2, storeID - 3}, []uint64{learnID})
} else {
tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 1, storeID - 2}, nil)
tc.AddRegionWithLearner(regionID, storeID-1, []uint64{storeID - 2, storeID - 3}, nil)
}
regionID++
}
@@ -123,7 +127,11 @@ func addTiflash(tc *mockcluster.Cluster) {
for i := 0; i < tiflashCount; i++ {
label := make(map[string]string, 3)
label["engine"] = "tiflash"
tc.AddLabelsStore(uint64(storeCount+i), regionCount, label)
if i == tiflashCount-1 {
tc.AddLabelsStore(uint64(storeCount+i), 1, label)
} else {
tc.AddLabelsStore(uint64(storeCount+i), regionCount-storeCount-i, label)
}
}
rule := &placement.Rule{
GroupID: "tiflash-override",
Expand All @@ -140,13 +148,20 @@ func addTiflash(tc *mockcluster.Cluster) {

func BenchmarkPlacementRule(b *testing.B) {
ctx := context.Background()
re := assert.New(b)
tc := newBenchCluster(ctx, true, true, false)
oc := schedule.NewOperatorController(ctx, nil, nil)
sc := newBalanceRegionScheduler(oc, &balanceRegionSchedulerConfig{}, []BalanceRegionCreateOption{WithBalanceRegionName(BalanceRegionType)}...)
b.ResetTimer()
var ops []*operator.Operator
var plans []plan.Plan
for i := 0; i < b.N; i++ {
sc.Schedule(tc, false)
ops, plans = sc.Schedule(tc, false)
}
b.StopTimer()
re.Len(plans, 0)
re.Len(ops, 1)
re.Contains(ops[0].String(), "to [191]")
}

func BenchmarkLabel(b *testing.B) {
6 changes: 4 additions & 2 deletions server/schedulers/balance_region.go
@@ -233,12 +233,14 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster, dryRun bool)

// transferPeer selects the best store to create a new peer to replace the old peer.
func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Collector) *operator.Operator {
// The filters should be ordered by their CPU cost:
// the more expensive a filter is, the later it should be placed.
filters := []filter.Filter{
filter.NewExcludedFilter(s.GetName(), nil, solver.region.GetStoreIDs()),
filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source),
filter.NewRegionScoreFilter(s.GetName(), solver.source, solver.GetOpts()),
filter.NewSpecialUseFilter(s.GetName()),
&filter.StoreStateFilter{ActionScope: s.GetName(), MoveRegion: true},
filter.NewRegionScoreFilter(s.GetName(), solver.source, solver.GetOpts()),
filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source),
}

candidates := filter.NewCandidates(solver.GetStores()).
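
The reordering in transferPeer follows the new comment: cheap filters run first so the expensive placement-rule safeguard only evaluates the stores that survive them. Here is a minimal, hypothetical Go sketch of that cheap-first pipeline; the Filter interface and the concrete filters below are illustrative stand-ins, not PD's filter package.

package main

import "fmt"

// Filter is a hypothetical, simplified candidate filter.
type Filter interface {
    Name() string
    Keep(storeID uint64) bool
}

type cheapFilter struct{ exclude map[uint64]bool }

func (f cheapFilter) Name() string        { return "excluded" }
func (f cheapFilter) Keep(id uint64) bool { return !f.exclude[id] }

type expensiveRuleFilter struct{ allowed map[uint64]bool }

func (f expensiveRuleFilter) Name() string        { return "rule-safeguard" }
func (f expensiveRuleFilter) Keep(id uint64) bool { return f.allowed[id] } // imagine a costly rule-fit check here

// selectCandidates applies filters in order; because each filter shrinks the
// candidate set, putting the expensive ones last minimizes their invocations.
func selectCandidates(stores []uint64, filters ...Filter) []uint64 {
    candidates := stores
    for _, f := range filters {
        next := candidates[:0:0]
        for _, id := range candidates {
            if f.Keep(id) {
                next = append(next, id)
            }
        }
        fmt.Printf("%s: %d -> %d candidates\n", f.Name(), len(candidates), len(next))
        candidates = next
    }
    return candidates
}

func main() {
    stores := []uint64{1, 2, 3, 4, 5}
    out := selectCandidates(stores,
        cheapFilter{exclude: map[uint64]bool{1: true, 2: true}}, // cheap set lookup runs first
        expensiveRuleFilter{allowed: map[uint64]bool{4: true}},  // costly check sees 3 stores, not 5
    )
    fmt.Println("targets:", out)
}

Because each filter narrows the candidate set, the costly rule-fit work runs on far fewer stores, which is the overhead this reordering (together with the cached-fit Replace above) is aimed at.
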
