From c87782afb2fb4e193a2cac2514910721d765ae34 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 6 Jul 2023 17:02:15 +0800 Subject: [PATCH] schedule: move schedule plan to the right place (#6769) ref tikv/pd#5839 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/schedule/diagnostic_manager.go | 4 +- .../{schedulers => plan}/balance_plan.go | 105 +++---- pkg/schedule/plan/balance_plan_test.go | 271 +++++++++++++++++ pkg/schedule/schedulers/balance_leader.go | 66 ++--- pkg/schedule/schedulers/balance_plan_test.go | 272 ------------------ pkg/schedule/schedulers/balance_region.go | 72 ++--- pkg/schedule/schedulers/balance_test.go | 18 +- pkg/schedule/schedulers/balance_witness.go | 34 +-- pkg/schedule/schedulers/utils.go | 37 +-- 9 files changed, 443 insertions(+), 436 deletions(-) rename pkg/schedule/{schedulers => plan}/balance_plan.go (59%) create mode 100644 pkg/schedule/plan/balance_plan_test.go delete mode 100644 pkg/schedule/schedulers/balance_plan_test.go diff --git a/pkg/schedule/diagnostic_manager.go b/pkg/schedule/diagnostic_manager.go index f6610520858..3f83d13baad 100644 --- a/pkg/schedule/diagnostic_manager.go +++ b/pkg/schedule/diagnostic_manager.go @@ -51,8 +51,8 @@ const ( // DiagnosableSummaryFunc includes all implementations of plan.Summary. // And it also includes all schedulers which pd support to diagnose. var DiagnosableSummaryFunc = map[string]plan.Summary{ - schedulers.BalanceRegionName: schedulers.BalancePlanSummary, - schedulers.BalanceLeaderName: schedulers.BalancePlanSummary, + schedulers.BalanceRegionName: plan.BalancePlanSummary, + schedulers.BalanceLeaderName: plan.BalancePlanSummary, } type diagnosticManager struct { diff --git a/pkg/schedule/schedulers/balance_plan.go b/pkg/schedule/plan/balance_plan.go similarity index 59% rename from pkg/schedule/schedulers/balance_plan.go rename to pkg/schedule/plan/balance_plan.go index fc38de7a39b..819a00f94e7 100644 --- a/pkg/schedule/schedulers/balance_plan.go +++ b/pkg/schedule/plan/balance_plan.go @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedulers +package plan import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/schedule/plan" ) const ( @@ -29,82 +28,90 @@ const ( // createOperator ) -type balanceSchedulerPlan struct { - source *core.StoreInfo - target *core.StoreInfo - region *core.RegionInfo - status *plan.Status - step int +// BalanceSchedulerPlan is a plan for balance scheduler +type BalanceSchedulerPlan struct { + Source *core.StoreInfo + Target *core.StoreInfo + Region *core.RegionInfo + Status *Status + Step int } // NewBalanceSchedulerPlan returns a new balanceSchedulerBasePlan -func NewBalanceSchedulerPlan() *balanceSchedulerPlan { - basePlan := &balanceSchedulerPlan{ - status: plan.NewStatus(plan.StatusOK), +func NewBalanceSchedulerPlan() *BalanceSchedulerPlan { + basePlan := &BalanceSchedulerPlan{ + Status: NewStatus(StatusOK), } return basePlan } -func (p *balanceSchedulerPlan) GetStep() int { - return p.step +// GetStep is used to get current step of plan. +func (p *BalanceSchedulerPlan) GetStep() int { + return p.Step } -func (p *balanceSchedulerPlan) SetResource(resource interface{}) { - switch p.step { +// SetResource is used to set resource for current step. 
+func (p *BalanceSchedulerPlan) SetResource(resource interface{}) {
+	switch p.Step {
 	// for balance-region/leader scheduler, the first step is selecting stores as source candidates.
 	case pickSource:
-		p.source = resource.(*core.StoreInfo)
+		p.Source = resource.(*core.StoreInfo)
 	// the second step is selecting region from source store.
 	case pickRegion:
-		p.region = resource.(*core.RegionInfo)
+		p.Region = resource.(*core.RegionInfo)
 	// the third step is selecting stores as target candidates.
 	case pickTarget:
-		p.target = resource.(*core.StoreInfo)
+		p.Target = resource.(*core.StoreInfo)
 	}
 }
 
-func (p *balanceSchedulerPlan) SetResourceWithStep(resource interface{}, step int) {
-	p.step = step
+// SetResourceWithStep is used to set the resource for a specific step.
+func (p *BalanceSchedulerPlan) SetResourceWithStep(resource interface{}, step int) {
+	p.Step = step
 	p.SetResource(resource)
 }
 
-func (p *balanceSchedulerPlan) GetResource(step int) uint64 {
-	if p.step < step {
+// GetResource is used to get the resource of a specific step.
+func (p *BalanceSchedulerPlan) GetResource(step int) uint64 {
+	if p.Step < step {
 		return 0
 	}
 	// Please use with care. Add a nil check if needed in the future.
 	switch step {
 	case pickSource:
-		return p.source.GetID()
+		return p.Source.GetID()
 	case pickRegion:
-		return p.region.GetID()
+		return p.Region.GetID()
 	case pickTarget:
-		return p.target.GetID()
+		return p.Target.GetID()
 	}
 	return 0
 }
 
-func (p *balanceSchedulerPlan) GetStatus() *plan.Status {
-	return p.status
+// GetStatus is used to get the status of the plan.
+func (p *BalanceSchedulerPlan) GetStatus() *Status {
+	return p.Status
 }
 
-func (p *balanceSchedulerPlan) SetStatus(status *plan.Status) {
-	p.status = status
+// SetStatus is used to set the status of the plan.
+func (p *BalanceSchedulerPlan) SetStatus(status *Status) {
+	p.Status = status
 }
 
-func (p *balanceSchedulerPlan) Clone(opts ...plan.Option) plan.Plan {
-	plan := &balanceSchedulerPlan{
-		status: p.status,
+// Clone is used to make a copy of the plan.
+func (p *BalanceSchedulerPlan) Clone(opts ...Option) Plan { + plan := &BalanceSchedulerPlan{ + Status: p.Status, } - plan.step = p.step - if p.step > pickSource { - plan.source = p.source + plan.Step = p.Step + if p.Step > pickSource { + plan.Source = p.Source } - if p.step > pickRegion { - plan.region = p.region + if p.Step > pickRegion { + plan.Region = p.Region } - if p.step > pickTarget { - plan.target = p.target + if p.Step > pickTarget { + plan.Target = p.Target } for _, opt := range opts { opt(plan) @@ -113,15 +120,15 @@ func (p *balanceSchedulerPlan) Clone(opts ...plan.Option) plan.Plan { } // BalancePlanSummary is used to summarize for BalancePlan -func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) { +func BalancePlanSummary(plans []Plan) (map[uint64]Status, bool, error) { // storeStatusCounter is used to count the number of various statuses of each store - storeStatusCounter := make(map[uint64]map[plan.Status]int) + storeStatusCounter := make(map[uint64]map[Status]int) // statusCounter is used to count the number of status which is regarded as best status of each store - statusCounter := make(map[uint64]plan.Status) + statusCounter := make(map[uint64]Status) storeMaxStep := make(map[uint64]int) normal := true for _, pi := range plans { - p, ok := pi.(*balanceSchedulerPlan) + p, ok := pi.(*BalanceSchedulerPlan) if !ok { return nil, false, errs.ErrDiagnosticLoadPlan } @@ -134,7 +141,7 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) // `step == pickRegion` is a special processing in summary, because we want to exclude the factor of region // and consider the failure as the status of source store. if step == pickRegion { - store = p.source.GetID() + store = p.Source.GetID() } else { store = p.GetResource(step) } @@ -143,20 +150,20 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) maxStep = -1 } if step > maxStep { - storeStatusCounter[store] = make(map[plan.Status]int) + storeStatusCounter[store] = make(map[Status]int) storeMaxStep[store] = step } else if step < maxStep { continue } - if !p.status.IsNormal() { + if !p.Status.IsNormal() { normal = false } - storeStatusCounter[store][*p.status]++ + storeStatusCounter[store][*p.Status]++ } for id, store := range storeStatusCounter { max := 0 - curStat := *plan.NewStatus(plan.StatusOK) + curStat := *NewStatus(StatusOK) for stat, c := range store { if balancePlanStatusComparer(max, curStat, c, stat) { max = c @@ -169,7 +176,7 @@ func BalancePlanSummary(plans []plan.Plan) (map[uint64]plan.Status, bool, error) } // balancePlanStatusComparer returns true if new status is better than old one. -func balancePlanStatusComparer(oldStatusCount int, oldStatus plan.Status, newStatusCount int, newStatus plan.Status) bool { +func balancePlanStatusComparer(oldStatusCount int, oldStatus Status, newStatusCount int, newStatus Status) bool { if newStatus.Priority() != oldStatus.Priority() { return newStatus.Priority() > oldStatus.Priority() } diff --git a/pkg/schedule/plan/balance_plan_test.go b/pkg/schedule/plan/balance_plan_test.go new file mode 100644 index 00000000000..59ad637d5c8 --- /dev/null +++ b/pkg/schedule/plan/balance_plan_test.go @@ -0,0 +1,271 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import ( + "context" + "testing" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/core" +) + +type balanceSchedulerPlanAnalyzeTestSuite struct { + suite.Suite + + stores []*core.StoreInfo + regions []*core.RegionInfo + check func(map[uint64]Status, map[uint64]*Status) bool + ctx context.Context + cancel context.CancelFunc +} + +func TestBalanceSchedulerPlanAnalyzerTestSuite(t *testing.T) { + suite.Run(t, new(balanceSchedulerPlanAnalyzeTestSuite)) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) SetupSuite() { + suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.check = func(output map[uint64]Status, expects map[uint64]*Status) bool { + for id, Status := range expects { + outputStatus, ok := output[id] + if !ok { + return false + } + if outputStatus != *Status { + return false + } + } + return true + } + suite.stores = []*core.StoreInfo{ + core.NewStoreInfo( + &metapb.Store{ + Id: 1, + }, + ), + core.NewStoreInfo( + &metapb.Store{ + Id: 2, + }, + ), + core.NewStoreInfo( + &metapb.Store{ + Id: 3, + }, + ), + core.NewStoreInfo( + &metapb.Store{ + Id: 4, + }, + ), + core.NewStoreInfo( + &metapb.Store{ + Id: 5, + }, + ), + } + suite.regions = []*core.RegionInfo{ + core.NewRegionInfo( + &metapb.Region{ + Id: 1, + }, + &metapb.Peer{ + Id: 1, + StoreId: 1, + }, + ), + core.NewRegionInfo( + &metapb.Region{ + Id: 2, + }, + &metapb.Peer{ + Id: 2, + StoreId: 2, + }, + ), + core.NewRegionInfo( + &metapb.Region{ + Id: 3, + }, + &metapb.Peer{ + Id: 3, + StoreId: 3, + }, + ), + } +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TearDownSuite() { + suite.cancel() +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult1() { + plans := make([]Plan, 0) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 2, Target: suite.stores[0], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 2, Target: suite.stores[1], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 2, Target: suite.stores[2], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 2, Target: suite.stores[3], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 2, Target: suite.stores[4], Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 2, Target: suite.stores[0], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 2, Target: suite.stores[1], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 2, Target: suite.stores[2], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 2, Target: suite.stores[3], Status: 
NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 2, Target: suite.stores[4], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 2, Target: suite.stores[0], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 2, Target: suite.stores[1], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 2, Target: suite.stores[2], Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 2, Target: suite.stores[3], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 2, Target: suite.stores[4], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 2, Target: suite.stores[0], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 2, Target: suite.stores[1], Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 2, Target: suite.stores[2], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 2, Target: suite.stores[3], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 2, Target: suite.stores[4], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 2, Target: suite.stores[0], Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 2, Target: suite.stores[1], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 2, Target: suite.stores[2], Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 2, Target: suite.stores[3], Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 2, Target: suite.stores[4], Status: NewStatus(StatusStoreScoreDisallowed)}) + statuses, isNormal, err := BalancePlanSummary(plans) + suite.NoError(err) + suite.True(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusStoreNotMatchRule), + 2: NewStatus(StatusStoreNotMatchRule), + 3: NewStatus(StatusStoreNotMatchRule), + 4: NewStatus(StatusStoreNotMatchRule), + 5: NewStatus(StatusStoreNotMatchRule), + })) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult2() { + plans := make([]Plan, 0) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Step: 0, Status: NewStatus(StatusStoreDown)}) + statuses, isNormal, 
err := BalancePlanSummary(plans) + suite.NoError(err) + suite.False(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusStoreDown), + 2: NewStatus(StatusStoreDown), + 3: NewStatus(StatusStoreDown), + 4: NewStatus(StatusStoreDown), + 5: NewStatus(StatusStoreDown), + })) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult3() { + plans := make([]Plan, 0) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Region: suite.regions[1], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Region: suite.regions[1], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + statuses, isNormal, err := BalancePlanSummary(plans) + suite.NoError(err) + suite.False(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusRegionNotMatchRule), + 2: NewStatus(StatusRegionNotMatchRule), + 3: NewStatus(StatusRegionNotMatchRule), + 4: NewStatus(StatusRegionNotMatchRule), + })) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult4() { + plans := make([]Plan, 0) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 0, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[0], Step: 2, Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[1], Step: 2, Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[2], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[3], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[4], Step: 2, Status: NewStatus(StatusStoreDown)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[0], Step: 2, Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[1], Step: 2, Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[2], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[3], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[4], Step: 2, Status: NewStatus(StatusStoreDown)}) + statuses, isNormal, err := BalancePlanSummary(plans) + 
suite.NoError(err) + suite.False(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusStoreAlreadyHasPeer), + 2: NewStatus(StatusStoreAlreadyHasPeer), + 3: NewStatus(StatusStoreNotMatchRule), + 4: NewStatus(StatusStoreNotMatchRule), + 5: NewStatus(StatusStoreDown), + })) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() { + plans := make([]Plan, 0) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[4], Step: 0, Status: NewStatus(StatusStoreRemoveLimitThrottled)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[3], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[2], Region: suite.regions[0], Step: 1, Status: NewStatus(StatusRegionNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[0], Step: 2, Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[1], Step: 2, Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[2], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[1], Target: suite.stores[3], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[0], Step: 2, Status: NewStatus(StatusStoreAlreadyHasPeer)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[1], Step: 3, Status: NewStatus(StatusStoreScoreDisallowed)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[2], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + plans = append(plans, &BalanceSchedulerPlan{Source: suite.stores[0], Target: suite.stores[3], Step: 2, Status: NewStatus(StatusStoreNotMatchRule)}) + statuses, isNormal, err := BalancePlanSummary(plans) + suite.NoError(err) + suite.False(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusStoreAlreadyHasPeer), + 2: NewStatus(StatusStoreAlreadyHasPeer), + 3: NewStatus(StatusStoreNotMatchRule), + 4: NewStatus(StatusStoreNotMatchRule), + 5: NewStatus(StatusStoreRemoveLimitThrottled), + })) +} + +func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult6() { + basePlan := NewBalanceSchedulerPlan() + collector := NewCollector(basePlan) + collector.Collect(SetResourceWithStep(suite.stores[0], 2), SetStatus(NewStatus(StatusStoreDown))) + collector.Collect(SetResourceWithStep(suite.stores[1], 2), SetStatus(NewStatus(StatusStoreDown))) + collector.Collect(SetResourceWithStep(suite.stores[2], 2), SetStatus(NewStatus(StatusStoreDown))) + collector.Collect(SetResourceWithStep(suite.stores[3], 2), SetStatus(NewStatus(StatusStoreDown))) + collector.Collect(SetResourceWithStep(suite.stores[4], 2), SetStatus(NewStatus(StatusStoreDown))) + basePlan.Source = suite.stores[0] + basePlan.Step++ + collector.Collect(SetResource(suite.regions[0]), SetStatus(NewStatus(StatusRegionNoLeader))) + statuses, isNormal, err := BalancePlanSummary(collector.GetPlans()) + suite.NoError(err) + suite.False(isNormal) + suite.True(suite.check(statuses, + map[uint64]*Status{ + 1: NewStatus(StatusStoreDown), + 2: NewStatus(StatusStoreDown), + 3: NewStatus(StatusStoreDown), + 4: NewStatus(StatusStoreDown), 
+			5: NewStatus(StatusStoreDown),
+		}))
+}
diff --git a/pkg/schedule/schedulers/balance_leader.go b/pkg/schedule/schedulers/balance_leader.go
index 54773c1de0f..3c65ddbcee8 100644
--- a/pkg/schedule/schedulers/balance_leader.go
+++ b/pkg/schedule/schedulers/balance_leader.go
@@ -327,7 +327,7 @@ func (cs *candidateStores) resortStoreWithPos(pos int) {
 func (l *balanceLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 	l.conf.mu.RLock()
 	defer l.conf.mu.RUnlock()
-	basePlan := NewBalanceSchedulerPlan()
+	basePlan := plan.NewBalanceSchedulerPlan()
 	var collector *plan.Collector
 	if dryRun {
 		collector = plan.NewCollector(basePlan)
@@ -381,16 +381,16 @@ func (l *balanceLeaderScheduler) Schedule(cluster sche.ScheduleCluster, dryRun b
 func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler,
 	ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator {
 	store := cs.getStore()
-	ssolver.step++
-	defer func() { ssolver.step-- }()
+	ssolver.Step++
+	defer func() { ssolver.Step-- }()
 	retryLimit := l.retryQuota.GetLimit(store)
 	var creator func(*solver, *plan.Collector) *operator.Operator
 	switch dir {
 	case transferOut:
-		ssolver.source, ssolver.target = store, nil
+		ssolver.Source, ssolver.Target = store, nil
 		creator = l.transferLeaderOut
 	case transferIn:
-		ssolver.source, ssolver.target = nil, store
+		ssolver.Source, ssolver.Target = nil, store
 		creator = l.transferLeaderIn
 	}
 	var op *operator.Operator
@@ -416,7 +416,7 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s
 	usedRegions[op.RegionID()] = struct{}{}
 	candidateUpdateStores := make([][]int, len(candidates))
 	for id, candidate := range candidates {
-		storesIDs := candidate.binarySearchStores(plan.source, plan.target)
+		storesIDs := candidate.binarySearchStores(plan.Source, plan.Target)
 		candidateUpdateStores[id] = storesIDs
 	}
 	operator.AddOpInfluence(op, plan.opInfluence, plan.ScheduleCluster.GetBasicCluster())
@@ -431,27 +431,27 @@ func makeInfluence(op *operator.Operator, plan *solver, usedRegions map[uint64]s
 // It randomly selects a healthy region from the source store, then picks
 // the best follower peer and transfers the leader.
 func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *plan.Collector) *operator.Operator {
-	solver.region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.Ranges),
+	solver.Region = filter.SelectOneRegion(solver.RandLeaderRegions(solver.SourceStoreID(), l.conf.Ranges),
 		collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
-	if solver.region == nil {
+	if solver.Region == nil {
 		log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.SourceStoreID()))
 		balanceLeaderNoLeaderRegionCounter.Inc()
 		return nil
 	}
-	if solver.IsRegionHot(solver.region) {
-		log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+	if solver.IsRegionHot(solver.Region) {
+		log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 		if collector != nil {
-			collector.Collect(plan.SetResource(solver.region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot)))
+			collector.Collect(plan.SetResource(solver.Region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot)))
 		}
 		balanceLeaderRegionHotCounter.Inc()
 		return nil
 	}
-	solver.step++
-	defer func() { solver.step-- }()
-	targets := solver.GetFollowerStores(solver.region)
+	solver.Step++
+	defer func() { solver.Step-- }()
+	targets := solver.GetFollowerStores(solver.Region)
 	finalFilters := l.filters
 	conf := solver.GetOpts()
-	if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil {
+	if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil {
 		finalFilters = append(l.filters, leaderFilter)
 	}
 	targets = filter.SelectTargetStores(targets, finalFilters, conf, collector, l.filterCounter)
@@ -461,12 +461,12 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl
 		jOp := solver.GetOpInfluence(targets[j].GetID())
 		return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp)
 	})
-	for _, solver.target = range targets {
+	for _, solver.Target = range targets {
 		if op := l.createOperator(solver, collector); op != nil {
 			return op
 		}
 	}
-	log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+	log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 	balanceLeaderNoTargetStoreCounter.Inc()
 	return nil
 }
@@ -475,24 +475,24 @@ func (l *balanceLeaderScheduler) transferLeaderOut(solver *solver, collector *pl
 // It randomly selects a healthy region from the target store, then picks
 // the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *plan.Collector) *operator.Operator { - solver.region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges), + solver.Region = filter.SelectOneRegion(solver.RandFollowerRegions(solver.TargetStoreID(), l.conf.Ranges), nil, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter()) - if solver.region == nil { + if solver.Region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", solver.TargetStoreID())) balanceLeaderNoFollowerRegionCounter.Inc() return nil } - if solver.IsRegionHot(solver.region) { - log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID())) + if solver.IsRegionHot(solver.Region) { + log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderRegionHotCounter.Inc() return nil } - leaderStoreID := solver.region.GetLeader().GetStoreId() - solver.source = solver.GetStore(leaderStoreID) - if solver.source == nil { + leaderStoreID := solver.Region.GetLeader().GetStoreId() + solver.Source = solver.GetStore(leaderStoreID) + if solver.Source == nil { log.Debug("region has no leader or leader store cannot be found", zap.String("scheduler", l.GetName()), - zap.Uint64("region-id", solver.region.GetID()), + zap.Uint64("region-id", solver.Region.GetID()), zap.Uint64("store-id", leaderStoreID), ) balanceLeaderNoLeaderRegionCounter.Inc() @@ -500,14 +500,14 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla } finalFilters := l.filters conf := solver.GetOpts() - if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, false /*allowMoveLeader*/); leaderFilter != nil { + if leaderFilter := filter.NewPlacementLeaderSafeguard(l.GetName(), conf, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, false /*allowMoveLeader*/); leaderFilter != nil { finalFilters = append(l.filters, leaderFilter) } - target := filter.NewCandidates([]*core.StoreInfo{solver.target}). + target := filter.NewCandidates([]*core.StoreInfo{solver.Target}). FilterTarget(conf, nil, l.filterCounter, finalFilters...). PickFirst() if target == nil { - log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.region.GetID())) + log.Debug("region has no target store", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", solver.Region.GetID())) balanceLeaderNoTargetStoreCounter.Inc() return nil } @@ -519,8 +519,8 @@ func (l *balanceLeaderScheduler) transferLeaderIn(solver *solver, collector *pla // no new operator need to be created, otherwise create an operator that transfers // the leader from the source store to the target store for the region. func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator { - solver.step++ - defer func() { solver.step-- }() + solver.Step++ + defer func() { solver.Step-- }() solver.sourceScore, solver.targetScore = solver.sourceStoreScore(l.GetName()), solver.targetStoreScore(l.GetName()) if !solver.shouldBalance(l.GetName()) { balanceLeaderSkipCounter.Inc() @@ -529,9 +529,9 @@ func (l *balanceLeaderScheduler) createOperator(solver *solver, collector *plan. 
} return nil } - solver.step++ - defer func() { solver.step-- }() - op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.region, solver.region.GetLeader().GetStoreId(), solver.TargetStoreID(), []uint64{}, operator.OpLeader) + solver.Step++ + defer func() { solver.Step-- }() + op, err := operator.CreateTransferLeaderOperator(BalanceLeaderType, solver, solver.Region, solver.Region.GetLeader().GetStoreId(), solver.TargetStoreID(), []uint64{}, operator.OpLeader) if err != nil { log.Debug("fail to create balance leader operator", errs.ZapError(err)) if collector != nil { diff --git a/pkg/schedule/schedulers/balance_plan_test.go b/pkg/schedule/schedulers/balance_plan_test.go deleted file mode 100644 index 0599ee9eadc..00000000000 --- a/pkg/schedule/schedulers/balance_plan_test.go +++ /dev/null @@ -1,272 +0,0 @@ -// Copyright 2022 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package schedulers - -import ( - "context" - "testing" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/plan" -) - -type balanceSchedulerPlanAnalyzeTestSuite struct { - suite.Suite - - stores []*core.StoreInfo - regions []*core.RegionInfo - check func(map[uint64]plan.Status, map[uint64]*plan.Status) bool - ctx context.Context - cancel context.CancelFunc -} - -func TestBalanceSchedulerPlanAnalyzerTestSuite(t *testing.T) { - suite.Run(t, new(balanceSchedulerPlanAnalyzeTestSuite)) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) SetupSuite() { - suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.check = func(output map[uint64]plan.Status, expects map[uint64]*plan.Status) bool { - for id, status := range expects { - outputStatus, ok := output[id] - if !ok { - return false - } - if outputStatus != *status { - return false - } - } - return true - } - suite.stores = []*core.StoreInfo{ - core.NewStoreInfo( - &metapb.Store{ - Id: 1, - }, - ), - core.NewStoreInfo( - &metapb.Store{ - Id: 2, - }, - ), - core.NewStoreInfo( - &metapb.Store{ - Id: 3, - }, - ), - core.NewStoreInfo( - &metapb.Store{ - Id: 4, - }, - ), - core.NewStoreInfo( - &metapb.Store{ - Id: 5, - }, - ), - } - suite.regions = []*core.RegionInfo{ - core.NewRegionInfo( - &metapb.Region{ - Id: 1, - }, - &metapb.Peer{ - Id: 1, - StoreId: 1, - }, - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 2, - }, - &metapb.Peer{ - Id: 2, - StoreId: 2, - }, - ), - core.NewRegionInfo( - &metapb.Region{ - Id: 3, - }, - &metapb.Peer{ - Id: 3, - StoreId: 3, - }, - ), - } -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TearDownSuite() { - suite.cancel() -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult1() { - plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 2, target: suite.stores[0], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: 
suite.stores[4], step: 2, target: suite.stores[1], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 2, target: suite.stores[2], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 2, target: suite.stores[3], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 2, target: suite.stores[4], status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 2, target: suite.stores[0], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 2, target: suite.stores[1], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 2, target: suite.stores[2], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 2, target: suite.stores[3], status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 2, target: suite.stores[4], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 2, target: suite.stores[0], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 2, target: suite.stores[1], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 2, target: suite.stores[2], status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 2, target: suite.stores[3], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 2, target: suite.stores[4], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 2, target: suite.stores[0], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 2, target: suite.stores[1], status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 2, target: suite.stores[2], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 2, target: suite.stores[3], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 2, target: suite.stores[4], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], step: 2, target: suite.stores[0], status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], step: 2, target: suite.stores[1], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], step: 2, target: suite.stores[2], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, 
&balanceSchedulerPlan{source: suite.stores[0], step: 2, target: suite.stores[3], status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], step: 2, target: suite.stores[4], status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - statuses, isNormal, err := BalancePlanSummary(plans) - suite.NoError(err) - suite.True(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusStoreNotMatchRule), - 2: plan.NewStatus(plan.StatusStoreNotMatchRule), - 3: plan.NewStatus(plan.StatusStoreNotMatchRule), - 4: plan.NewStatus(plan.StatusStoreNotMatchRule), - 5: plan.NewStatus(plan.StatusStoreNotMatchRule), - })) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult2() { - plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - statuses, isNormal, err := BalancePlanSummary(plans) - suite.NoError(err) - suite.False(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusStoreDown), - 2: plan.NewStatus(plan.StatusStoreDown), - 3: plan.NewStatus(plan.StatusStoreDown), - 4: plan.NewStatus(plan.StatusStoreDown), - 5: plan.NewStatus(plan.StatusStoreDown), - })) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult3() { - plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], region: suite.regions[1], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], region: suite.regions[1], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - statuses, isNormal, err := BalancePlanSummary(plans) - suite.NoError(err) - suite.False(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusRegionNotMatchRule), - 2: plan.NewStatus(plan.StatusRegionNotMatchRule), - 3: plan.NewStatus(plan.StatusRegionNotMatchRule), - 4: plan.NewStatus(plan.StatusRegionNotMatchRule), - })) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult4() { - plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, 
&balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[4], step: 2, status: plan.NewStatus(plan.StatusStoreDown)}) - statuses, isNormal, err := BalancePlanSummary(plans) - suite.NoError(err) - suite.False(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusStoreAlreadyHasPeer), - 2: plan.NewStatus(plan.StatusStoreAlreadyHasPeer), - 3: plan.NewStatus(plan.StatusStoreNotMatchRule), - 4: plan.NewStatus(plan.StatusStoreNotMatchRule), - 5: plan.NewStatus(plan.StatusStoreDown), - })) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult5() { - plans := make([]plan.Plan, 0) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[4], step: 0, status: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[3], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[2], region: suite.regions[0], step: 1, status: plan.NewStatus(plan.StatusRegionNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[1], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[1], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[0], step: 2, status: plan.NewStatus(plan.StatusStoreAlreadyHasPeer)}) - plans = 
append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[1], step: 3, status: plan.NewStatus(plan.StatusStoreScoreDisallowed)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[2], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - plans = append(plans, &balanceSchedulerPlan{source: suite.stores[0], target: suite.stores[3], step: 2, status: plan.NewStatus(plan.StatusStoreNotMatchRule)}) - statuses, isNormal, err := BalancePlanSummary(plans) - suite.NoError(err) - suite.False(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusStoreAlreadyHasPeer), - 2: plan.NewStatus(plan.StatusStoreAlreadyHasPeer), - 3: plan.NewStatus(plan.StatusStoreNotMatchRule), - 4: plan.NewStatus(plan.StatusStoreNotMatchRule), - 5: plan.NewStatus(plan.StatusStoreRemoveLimitThrottled), - })) -} - -func (suite *balanceSchedulerPlanAnalyzeTestSuite) TestAnalyzerResult6() { - basePlan := NewBalanceSchedulerPlan() - collector := plan.NewCollector(basePlan) - collector.Collect(plan.SetResourceWithStep(suite.stores[0], 2), plan.SetStatus(plan.NewStatus(plan.StatusStoreDown))) - collector.Collect(plan.SetResourceWithStep(suite.stores[1], 2), plan.SetStatus(plan.NewStatus(plan.StatusStoreDown))) - collector.Collect(plan.SetResourceWithStep(suite.stores[2], 2), plan.SetStatus(plan.NewStatus(plan.StatusStoreDown))) - collector.Collect(plan.SetResourceWithStep(suite.stores[3], 2), plan.SetStatus(plan.NewStatus(plan.StatusStoreDown))) - collector.Collect(plan.SetResourceWithStep(suite.stores[4], 2), plan.SetStatus(plan.NewStatus(plan.StatusStoreDown))) - basePlan.source = suite.stores[0] - basePlan.step++ - collector.Collect(plan.SetResource(suite.regions[0]), plan.SetStatus(plan.NewStatus(plan.StatusRegionNoLeader))) - statuses, isNormal, err := BalancePlanSummary(collector.GetPlans()) - suite.NoError(err) - suite.False(isNormal) - suite.True(suite.check(statuses, - map[uint64]*plan.Status{ - 1: plan.NewStatus(plan.StatusStoreDown), - 2: plan.NewStatus(plan.StatusStoreDown), - 3: plan.NewStatus(plan.StatusStoreDown), - 4: plan.NewStatus(plan.StatusStoreDown), - 5: plan.NewStatus(plan.StatusStoreDown), - })) -} diff --git a/pkg/schedule/schedulers/balance_region.go b/pkg/schedule/schedulers/balance_region.go index c3dc4cf9d68..86705045e48 100644 --- a/pkg/schedule/schedulers/balance_region.go +++ b/pkg/schedule/schedulers/balance_region.go @@ -122,7 +122,7 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster) } func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) { - basePlan := NewBalanceSchedulerPlan() + basePlan := plan.NewBalanceSchedulerPlan() var collector *plan.Collector if dryRun { collector = plan.NewCollector(basePlan) @@ -160,12 +160,12 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun b collector.Collect(plan.SetResource(sourceStores[0]), plan.SetStatus(plan.NewStatus(plan.StatusStoreScoreDisallowed))) } - solver.step++ + solver.Step++ var sourceIndex int // sourcesStore is sorted by region score desc, so we pick the first store as source store. 
-	for sourceIndex, solver.source = range sourceStores {
-		retryLimit := s.retryQuota.GetLimit(solver.source)
+	for sourceIndex, solver.Source = range sourceStores {
+		retryLimit := s.retryQuota.GetLimit(solver.Source)
 		solver.sourceScore = solver.sourceStoreScore(s.GetName())
 		if sourceIndex == len(sourceStores)-1 {
 			break
@@ -173,58 +173,58 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun b
 		for i := 0; i < retryLimit; i++ {
 			// First pick the region that has a pending peer.
 			// A pending region may mean the disk is overloaded, so remove the pending region first.
-			solver.region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
+			solver.Region = filter.SelectOneRegion(cluster.RandPendingRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
 				append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()))...)
-			if solver.region == nil {
+			if solver.Region == nil {
 				// Then pick the region that has a follower in the source store.
-				solver.region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
+				solver.Region = filter.SelectOneRegion(cluster.RandFollowerRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
 					append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...)
 			}
-			if solver.region == nil {
+			if solver.Region == nil {
 				// Then pick the region that has the leader in the source store.
-				solver.region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
+				solver.Region = filter.SelectOneRegion(cluster.RandLeaderRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
 					append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...)
 			}
-			if solver.region == nil {
+			if solver.Region == nil {
 				// Finally, pick a learner.
-				solver.region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
+				solver.Region = filter.SelectOneRegion(cluster.RandLearnerRegions(solver.SourceStoreID(), s.conf.Ranges), collector,
 					append(baseRegionFilters, filter.NewRegionWitnessFilter(solver.SourceStoreID()), pendingFilter)...)
 			}
-			if solver.region == nil {
+			if solver.Region == nil {
 				balanceRegionNoRegionCounter.Inc()
 				continue
 			}
-			log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+			log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 			// Skip hot regions.
-			if cluster.IsRegionHot(solver.region) {
-				log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+			if cluster.IsRegionHot(solver.Region) {
+				log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 				if collector != nil {
-					collector.Collect(plan.SetResource(solver.region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot)))
+					collector.Collect(plan.SetResource(solver.Region), plan.SetStatus(plan.NewStatus(plan.StatusRegionHot)))
 				}
 				balanceRegionHotCounter.Inc()
 				continue
 			}
 			// Check region leader
-			if solver.region.GetLeader() == nil {
-				log.Warn("region have no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+			if solver.Region.GetLeader() == nil {
+				log.Warn("region has no leader", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 				if collector != nil {
-					collector.Collect(plan.SetResource(solver.region), plan.SetStatus(plan.NewStatus(plan.StatusRegionNoLeader)))
+					collector.Collect(plan.SetResource(solver.Region), plan.SetStatus(plan.NewStatus(plan.StatusRegionNoLeader)))
 				}
 				balanceRegionNoLeaderCounter.Inc()
 				continue
 			}
-			solver.step++
+			solver.Step++
 			// The replica filter caches the last region fit, and the selection picks only the first region that
 			// satisfies all the filters, so the cached region fit must belong to the scheduled region.
 			solver.fit = replicaFilter.(*filter.RegionReplicatedFilter).GetFit()
 			if op := s.transferPeer(solver, collector, sourceStores[sourceIndex+1:], faultTargets); op != nil {
-				s.retryQuota.ResetLimit(solver.source)
+				s.retryQuota.ResetLimit(solver.Source)
 				op.Counters = append(op.Counters, balanceRegionNewOpCounter)
 				return []*operator.Operator{op}, collector.GetPlans()
 			}
-			solver.step--
+			solver.Step--
 		}
-		s.retryQuota.Attenuate(solver.source)
+		s.retryQuota.Attenuate(solver.Source)
 	}
 	s.filterCounter.Flush()
 	s.retryQuota.GC(stores)
@@ -233,7 +233,7 @@ func (s *balanceRegionScheduler) Schedule(cluster sche.ScheduleCluster, dryRun b
 
 // transferPeer selects the best store to create a new peer to replace the old peer.
 func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Collector, dstStores []*core.StoreInfo, faultStores []*core.StoreInfo) *operator.Operator {
-	excludeTargets := solver.region.GetStoreIDs()
+	excludeTargets := solver.Region.GetStoreIDs()
 	for _, store := range faultStores {
 		excludeTargets[store.GetID()] = struct{}{}
 	}
@@ -242,20 +242,20 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co
 	filters := []filter.Filter{
 		filter.NewExcludedFilter(s.GetName(), nil, excludeTargets),
 		filter.NewPlacementSafeguard(s.GetName(), solver.GetOpts(), solver.GetBasicCluster(), solver.GetRuleManager(),
-			solver.region, solver.source, solver.fit),
+			solver.Region, solver.Source, solver.fit),
 	}
 
 	candidates := filter.NewCandidates(dstStores).FilterTarget(solver.GetOpts(), collector, s.filterCounter, filters...)
 	if len(candidates.Stores) != 0 {
-		solver.step++
+		solver.Step++
 	}
 	// candidates are sorted by region score desc, so we pick the last store as target store.
for i := range candidates.Stores { - solver.target = candidates.Stores[len(candidates.Stores)-i-1] + solver.Target = candidates.Stores[len(candidates.Stores)-i-1] solver.targetScore = solver.targetStoreScore(s.GetName()) - regionID := solver.region.GetID() - sourceID := solver.source.GetID() - targetID := solver.target.GetID() + regionID := solver.Region.GetID() + sourceID := solver.Source.GetID() + targetID := solver.Target.GetID() log.Debug("candidate store", zap.Uint64("region-id", regionID), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID)) if !solver.shouldBalance(s.GetName()) { @@ -266,10 +266,10 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co continue } - oldPeer := solver.region.GetStorePeer(sourceID) - newPeer := &metapb.Peer{StoreId: solver.target.GetID(), Role: oldPeer.Role} - solver.step++ - op, err := operator.CreateMovePeerOperator(BalanceRegionType, solver, solver.region, operator.OpRegion, oldPeer.GetStoreId(), newPeer) + oldPeer := solver.Region.GetStorePeer(sourceID) + newPeer := &metapb.Peer{StoreId: solver.Target.GetID(), Role: oldPeer.Role} + solver.Step++ + op, err := operator.CreateMovePeerOperator(BalanceRegionType, solver, solver.Region, operator.OpRegion, oldPeer.GetStoreId(), newPeer) if err != nil { balanceRegionCreateOpFailCounter.Inc() if collector != nil { @@ -280,7 +280,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co if collector != nil { collector.Collect() } - solver.step-- + solver.Step-- sourceLabel := strconv.FormatUint(sourceID, 10) targetLabel := strconv.FormatUint(targetID, 10) op.FinishedCounters = append(op.FinishedCounters, @@ -296,7 +296,7 @@ func (s *balanceRegionScheduler) transferPeer(solver *solver, collector *plan.Co balanceRegionNoReplacementCounter.Inc() if len(candidates.Stores) != 0 { - solver.step-- + solver.Step-- } return nil } diff --git a/pkg/schedule/schedulers/balance_test.go b/pkg/schedule/schedulers/balance_test.go index 8fa0294a572..fd07cb1d04b 100644 --- a/pkg/schedule/schedulers/balance_test.go +++ b/pkg/schedule/schedulers/balance_test.go @@ -64,16 +64,16 @@ func TestInfluenceAmp(t *testing.T) { tc.AddLeaderRegion(1, 1, 2) region := tc.GetRegion(1).Clone(core.SetApproximateSize(R)) tc.PutRegion(region) - basePlan := NewBalanceSchedulerPlan() + basePlan := plan.NewBalanceSchedulerPlan() solver := newSolver(basePlan, kind, tc, influence) - solver.source, solver.target, solver.region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1) + solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1) solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("") re.True(solver.shouldBalance("")) // It will not schedule if the diff region count is greater than the sum // of TolerantSizeRatio and influenceAmp*2. 
tc.AddRegionStore(1, int(100+influenceAmp+2))
- solver.source = tc.GetStore(1)
+ solver.Source = tc.GetStore(1)
 solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
 re.False(solver.shouldBalance(""))
 re.Less(solver.sourceScore-solver.targetScore, float64(1))
@@ -149,9 +149,9 @@ func TestShouldBalance(t *testing.T) {
 tc.PutRegion(region)
 tc.SetLeaderSchedulePolicy(testCase.kind.String())
 kind := constant.NewScheduleKind(constant.LeaderKind, testCase.kind)
- basePlan := NewBalanceSchedulerPlan()
+ basePlan := plan.NewBalanceSchedulerPlan()
 solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
- solver.source, solver.target, solver.region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
+ solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
 solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
 re.Equal(testCase.expectedResult, solver.shouldBalance(""))
 }
@@ -163,9 +163,9 @@ func TestShouldBalance(t *testing.T) {
 region := tc.GetRegion(1).Clone(core.SetApproximateSize(testCase.regionSize))
 tc.PutRegion(region)
 kind := constant.NewScheduleKind(constant.RegionKind, testCase.kind)
- basePlan := NewBalanceSchedulerPlan()
+ basePlan := plan.NewBalanceSchedulerPlan()
 solver := newSolver(basePlan, kind, tc, oc.GetOpInfluence(tc.GetBasicCluster()))
- solver.source, solver.target, solver.region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
+ solver.Source, solver.Target, solver.Region = tc.GetStore(1), tc.GetStore(2), tc.GetRegion(1)
 solver.sourceScore, solver.targetScore = solver.sourceStoreScore(""), solver.targetStoreScore("")
 re.Equal(testCase.expectedResult, solver.shouldBalance(""))
 }
@@ -213,9 +213,9 @@ func TestTolerantRatio(t *testing.T) {
 }
 for _, t := range tbl {
 tc.SetTolerantSizeRatio(t.ratio)
- basePlan := NewBalanceSchedulerPlan()
+ basePlan := plan.NewBalanceSchedulerPlan()
 solver := newSolver(basePlan, t.kind, tc, operator.OpInfluence{})
- solver.region = region
+ solver.Region = region
 sourceScore := t.expectTolerantResource(t.kind)
 targetScore := solver.getTolerantResource()
diff --git a/pkg/schedule/schedulers/balance_witness.go b/pkg/schedule/schedulers/balance_witness.go
index e2881123aee..6e575c0f932 100644
--- a/pkg/schedule/schedulers/balance_witness.go
+++ b/pkg/schedule/schedulers/balance_witness.go
@@ -220,7 +220,7 @@ func (b *balanceWitnessScheduler) IsScheduleAllowed(cluster sche.ScheduleCluster
 func (b *balanceWitnessScheduler) Schedule(cluster sche.ScheduleCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) {
 b.conf.mu.RLock()
 defer b.conf.mu.RUnlock()
- basePlan := NewBalanceSchedulerPlan()
+ basePlan := plan.NewBalanceSchedulerPlan()
 var collector *plan.Collector
 if dryRun {
 collector = plan.NewCollector(basePlan)
@@ -257,10 +257,10 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessScheduler, ssolver *solver, usedRegions map[uint64]struct{}, collector *plan.Collector) *operator.Operator {
 store := cs.getStore()
- ssolver.step++
- defer func() { ssolver.step-- }()
+ ssolver.Step++
+ defer func() { ssolver.Step-- }()
 retryLimit := b.retryQuota.GetLimit(store)
- ssolver.source, ssolver.target = store, nil
+ ssolver.Source, ssolver.Target = store, nil
 var op *operator.Operator
 for i := 0; i < retryLimit; i++ {
 schedulerCounter.WithLabelValues(b.GetName(), "total").Inc()
@@ -285,19 +285,19 @@ func createTransferWitnessOperator(cs *candidateStores, b *balanceWitnessSchedul
 // It randomly selects a healthy region from the source store, then picks
 // the best follower peer and transfers the witness.
 func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *plan.Collector) *operator.Operator {
- solver.region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.SourceStoreID(), b.conf.Ranges),
+ solver.Region = filter.SelectOneRegion(solver.RandWitnessRegions(solver.SourceStoreID(), b.conf.Ranges),
 collector, filter.NewRegionPendingFilter(), filter.NewRegionDownFilter())
- if solver.region == nil {
+ if solver.Region == nil {
 log.Debug("store has no witness", zap.String("scheduler", b.GetName()), zap.Uint64("store-id", solver.SourceStoreID()))
 schedulerCounter.WithLabelValues(b.GetName(), "no-witness-region").Inc()
 return nil
 }
- solver.step++
- defer func() { solver.step-- }()
- targets := solver.GetNonWitnessVoterStores(solver.region)
+ solver.Step++
+ defer func() { solver.Step-- }()
+ targets := solver.GetNonWitnessVoterStores(solver.Region)
 finalFilters := b.filters
 opts := solver.GetOpts()
- if witnessFilter := filter.NewPlacementWitnessSafeguard(b.GetName(), opts, solver.GetBasicCluster(), solver.GetRuleManager(), solver.region, solver.source, solver.fit); witnessFilter != nil {
+ if witnessFilter := filter.NewPlacementWitnessSafeguard(b.GetName(), opts, solver.GetBasicCluster(), solver.GetRuleManager(), solver.Region, solver.Source, solver.fit); witnessFilter != nil {
 finalFilters = append(b.filters, witnessFilter)
 }
 targets = filter.SelectTargetStores(targets, finalFilters, opts, collector, b.filterCounter)
@@ -306,12 +306,12 @@ func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *
 jOp := solver.GetOpInfluence(targets[j].GetID())
 return targets[i].WitnessScore(iOp) < targets[j].WitnessScore(jOp)
 })
- for _, solver.target = range targets {
+ for _, solver.Target = range targets {
 if op := b.createOperator(solver, collector); op != nil {
 return op
 }
 }
- log.Debug("region has no target store", zap.String("scheduler", b.GetName()), zap.Uint64("region-id", solver.region.GetID()))
+ log.Debug("region has no target store", zap.String("scheduler", b.GetName()), zap.Uint64("region-id", solver.Region.GetID()))
 schedulerCounter.WithLabelValues(b.GetName(), "no-target-store").Inc()
 return nil
 }
@@ -321,8 +321,8 @@ func (b *balanceWitnessScheduler) transferWitnessOut(solver *solver, collector *
 // no new operator needs to be created; otherwise, create an operator that transfers
 // the witness from the source store to the target store for the region.
func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan.Collector) *operator.Operator {
- solver.step++
- defer func() { solver.step-- }()
+ solver.Step++
+ defer func() { solver.Step-- }()
 solver.sourceScore, solver.targetScore = solver.sourceStoreScore(b.GetName()), solver.targetStoreScore(b.GetName())
 if !solver.shouldBalance(b.GetName()) {
 schedulerCounter.WithLabelValues(b.GetName(), "skip").Inc()
@@ -331,9 +331,9 @@ func (b *balanceWitnessScheduler) createOperator(solver *solver, collector *plan
 }
 return nil
 }
- solver.step++
- defer func() { solver.step-- }()
- op, err := operator.CreateMoveWitnessOperator(BalanceWitnessType, solver, solver.region, solver.SourceStoreID(), solver.TargetStoreID())
+ solver.Step++
+ defer func() { solver.Step-- }()
+ op, err := operator.CreateMoveWitnessOperator(BalanceWitnessType, solver, solver.Region, solver.SourceStoreID(), solver.TargetStoreID())
 if err != nil {
 log.Debug("fail to create balance witness operator", errs.ZapError(err))
 return nil
diff --git a/pkg/schedule/schedulers/utils.go b/pkg/schedule/schedulers/utils.go
index 5b034e79842..afeb28044c7 100644
--- a/pkg/schedule/schedulers/utils.go
+++ b/pkg/schedule/schedulers/utils.go
@@ -26,6 +26,7 @@ import (
 sche "github.com/tikv/pd/pkg/schedule/core"
 "github.com/tikv/pd/pkg/schedule/operator"
 "github.com/tikv/pd/pkg/schedule/placement"
+ "github.com/tikv/pd/pkg/schedule/plan"
 "github.com/tikv/pd/pkg/statistics"
 "go.uber.org/zap"
 )
@@ -42,7 +43,7 @@ const (
 )
 type solver struct {
- *balanceSchedulerPlan
+ *plan.BalanceSchedulerPlan
 sche.ScheduleCluster
 kind constant.ScheduleKind
 opInfluence operator.OpInfluence
@@ -54,9 +55,9 @@ type solver struct {
 targetScore float64
 }
-func newSolver(basePlan *balanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.ScheduleCluster, opInfluence operator.OpInfluence) *solver {
+func newSolver(basePlan *plan.BalanceSchedulerPlan, kind constant.ScheduleKind, cluster sche.ScheduleCluster, opInfluence operator.OpInfluence) *solver {
 return &solver{
- balanceSchedulerPlan: basePlan,
+ BalanceSchedulerPlan: basePlan,
 ScheduleCluster: cluster,
 kind: kind,
 opInfluence: opInfluence,
@@ -69,7 +70,7 @@ func (p *solver) GetOpInfluence(storeID uint64) int64 {
 }
 func (p *solver) SourceStoreID() uint64 {
- return p.source.GetID()
+ return p.Source.GetID()
 }
 func (p *solver) SourceMetricLabel() string {
@@ -77,7 +78,7 @@ func (p *solver) SourceMetricLabel() string {
 }
 func (p *solver) TargetStoreID() uint64 {
- return p.target.GetID()
+ return p.Target.GetID()
 }
 func (p *solver) TargetMetricLabel() string {
@@ -85,7 +86,7 @@ func (p *solver) TargetMetricLabel() string {
 }
 func (p *solver) sourceStoreScore(scheduleName string) float64 {
- sourceID := p.source.GetID()
+ sourceID := p.Source.GetID()
 tolerantResource := p.getTolerantResource()
 // to avoid scheduling too much: if A's score is only a little greater than B's and C's,
 // we want A to move out one region, not two
@@ -103,19 +104,19 @@ func (p *solver) sourceStoreScore(scheduleName string) float64 {
 switch p.kind.Resource {
 case constant.LeaderKind:
 sourceDelta := influence - tolerantResource
- score = p.source.LeaderScore(p.kind.Policy, sourceDelta)
+ score = p.Source.LeaderScore(p.kind.Policy, sourceDelta)
 case constant.RegionKind:
 sourceDelta := influence*influenceAmp - tolerantResource
- score = p.source.RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), sourceDelta)
+ score = p.Source.RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), sourceDelta)
 case constant.WitnessKind:
 sourceDelta := influence - tolerantResource
- score = p.source.WitnessScore(sourceDelta)
+ score = p.Source.WitnessScore(sourceDelta)
 }
 return score
 }
 func (p *solver) targetStoreScore(scheduleName string) float64 {
- targetID := p.target.GetID()
+ targetID := p.Target.GetID()
 // to avoid scheduling too much: if A's score is less than B's and C's by a small margin,
 // we want A to take in one region, not two
 tolerantResource := p.getTolerantResource()
@@ -134,13 +135,13 @@ func (p *solver) targetStoreScore(scheduleName string) float64 {
 switch p.kind.Resource {
 case constant.LeaderKind:
 targetDelta := influence + tolerantResource
- score = p.target.LeaderScore(p.kind.Policy, targetDelta)
+ score = p.Target.LeaderScore(p.kind.Policy, targetDelta)
 case constant.RegionKind:
 targetDelta := influence*influenceAmp + tolerantResource
- score = p.target.RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), targetDelta)
+ score = p.Target.RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), targetDelta)
 case constant.WitnessKind:
 targetDelta := influence + tolerantResource
- score = p.target.WitnessScore(targetDelta)
+ score = p.Target.WitnessScore(targetDelta)
 }
 return score
 }
@@ -151,16 +152,16 @@ func (p *solver) shouldBalance(scheduleName string) bool {
 // The reason we use max(regionSize, averageRegionSize) to check is:
 // 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
 // 2. prevent moving huge regions, leading to over-balance.
- sourceID := p.source.GetID()
- targetID := p.target.GetID()
+ sourceID := p.Source.GetID()
+ targetID := p.Target.GetID()
 // Make sure that, after the move, the source score is still greater than the target score.
 shouldBalance := p.sourceScore > p.targetScore
 if !shouldBalance && log.GetLevel() <= zap.DebugLevel {
 log.Debug("skip balance "+p.kind.Resource.String(),
- zap.String("scheduler", scheduleName), zap.Uint64("region-id", p.region.GetID()), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID),
- zap.Int64("source-size", p.source.GetRegionSize()), zap.Float64("source-score", p.sourceScore),
- zap.Int64("target-size", p.target.GetRegionSize()), zap.Float64("target-score", p.targetScore),
+ zap.String("scheduler", scheduleName), zap.Uint64("region-id", p.Region.GetID()), zap.Uint64("source-store", sourceID), zap.Uint64("target-store", targetID),
+ zap.Int64("source-size", p.Source.GetRegionSize()), zap.Float64("source-score", p.sourceScore),
+ zap.Int64("target-size", p.Target.GetRegionSize()), zap.Float64("target-score", p.targetScore),
 zap.Int64("average-region-size", p.GetAverageRegionSize()), zap.Int64("tolerant-resource", p.getTolerantResource()))
 }
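
A note on the Step bookkeeping that recurs throughout these hunks: every scheduler bumps the plan's step counter before attempting the next pick stage (source, region, target) and rolls it back when that stage fails, so a dry-run collector can attribute each rejected candidate to the exact stage that rejected it. The following is a minimal, self-contained sketch of that push/pop pattern; toyPlan and tryStage are illustrative stand-ins, not PD types.

package main

import "fmt"

// toyPlan mirrors the Step bookkeeping used by the solver in this patch;
// the type and field names are illustrative, not the actual PD types.
type toyPlan struct {
	Step int
}

// tryStage advances the plan to the next stage, runs pick, and rolls the
// step back if the stage fails, so a diagnostic collector observing the
// plan sees the stage at which a candidate was rejected.
func tryStage(p *toyPlan, pick func() bool) bool {
	p.Step++
	if !pick() {
		p.Step--
		return false
	}
	return true
}

func main() {
	p := &toyPlan{}
	ok := tryStage(p, func() bool { return true }) // e.g. picking a source succeeds
	fmt.Println(ok, p.Step)                        // true 1
	ok = tryStage(p, func() bool { return false }) // e.g. picking a region fails
	fmt.Println(ok, p.Step)                        // false 1
}

The deferred decrements in createTransferWitnessOperator and createOperator above are the same idea, expressed with defer so the rollback also runs on early returns.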
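The target loop in transferPeer above relies on the candidate ordering: candidates come back sorted by region score in descending order, so indexing with len(candidates.Stores)-i-1 walks them from the lowest-scored store upward. A standalone sketch of the same trick, with an illustrative store type standing in for *core.StoreInfo:

package main

import (
	"fmt"
	"sort"
)

// store is a stand-in for *core.StoreInfo; only the fields needed to
// demonstrate the ordering are kept.
type store struct {
	id    uint64
	score float64
}

func main() {
	stores := []store{{1, 90}, {2, 70}, {3, 50}}
	// Sort by score in descending order, as the candidate list is.
	sort.Slice(stores, func(i, j int) bool { return stores[i].score > stores[j].score })
	for i := range stores {
		// Walk from the end: the last element has the lowest score,
		// so the least-loaded store is tried first as the target.
		target := stores[len(stores)-i-1]
		fmt.Printf("try store %d (score %.0f)\n", target.id, target.score)
	}
}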
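Finally, the scoring pair in utils.go applies mirrored deltas: sourceStoreScore subtracts the tolerant resource from the pending operator influence, targetStoreScore adds it, and shouldBalance then requires the adjusted source score to stay above the adjusted target score. Below is a simplified model of that comparison; the real scores come from LeaderScore, RegionScore, or WitnessScore, so the linear arithmetic here is an assumption made purely for illustration.

package main

import "fmt"

// shouldBalance condenses the comparison in solver.shouldBalance: the
// tolerant resource widens the gap that the source and target scores must
// keep after accounting for in-flight influence. All values are illustrative.
func shouldBalance(sourceScore, targetScore, influence, tolerant float64) bool {
	adjustedSource := sourceScore + (influence - tolerant) // delta lowers the source side
	adjustedTarget := targetScore + (influence + tolerant) // delta raises the target side
	return adjustedSource > adjustedTarget
}

func main() {
	// A small gap within the tolerant range is skipped...
	fmt.Println(shouldBalance(105, 100, 0, 5)) // false
	// ...while a gap larger than the tolerance triggers a move.
	fmt.Println(shouldBalance(120, 100, 0, 5)) // true
}

This is the damping that the TolerantSizeRatio tests above exercise: widening the tolerance suppresses moves between stores whose scores are already close.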