decouple the scheduler and checker interfaces
Signed-off-by: Ryan Leung <[email protected]>
rleungx committed Jul 11, 2023
1 parent 3cce629 commit a2cda87
Showing 65 changed files with 550 additions and 485 deletions.
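
At a high level, the commit replaces the single GetOpts() sc.Config accessor with narrower views: a shared config read by both components, plus checker-specific and scheduler-specific configs, and it likewise narrows sche.ClusterInformer/ScheduleCluster to the CheckerCluster and SharedCluster views seen in the hunks below. As a rough, non-authoritative sketch only (the real interfaces in pkg/schedule/config are larger; the methods below are limited to those visible in this diff, and the grouping and embedding are assumptions rather than the repository's actual definitions), the split looks something like:

package config

import "time"

// SharedConfig: options read by both the checkers and the schedulers (sketch).
type SharedConfig interface {
    IsPlacementRulesEnabled() bool
    IsCrossTableMergeEnabled() bool
}

// CheckerConfig: the shared options plus checker-only ones (sketch).
type CheckerConfig interface {
    SharedConfig
    IsPlacementRulesCacheEnabled() bool
    GetSplitMergeInterval() time.Duration
}

// SchedulerConfig: the shared options plus scheduler-only ones (sketch).
type SchedulerConfig interface {
    SharedConfig
    // scheduler-side getters would be declared here
}

Splitting the config this way lets each component compile against only the options it actually reads, which is what allows the checker code below to drop GetOpts() in favor of GetCheckerConfig()/GetSharedConfig().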
18 changes: 14 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
@@ -88,9 +88,19 @@ func (mc *Cluster) GetStoreConfig() sc.StoreConfig {
     return mc.StoreConfigManager.GetStoreConfig()
 }

-// GetOpts returns the cluster configuration.
-func (mc *Cluster) GetOpts() sc.Config {
-    return mc.PersistOptions
+// GetCheckerConfig returns the checker config.
+func (mc *Cluster) GetCheckerConfig() sc.CheckerConfig {
+    return mc
 }
+
+// GetSchedulerConfig returns the scheduler config.
+func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfig {
+    return mc
+}
+
+// GetSharedConfig returns the shared config.
+func (mc *Cluster) GetSharedConfig() sc.SharedConfig {
+    return mc
+}

 // GetStorage returns the storage.
@@ -198,7 +208,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {

 func (mc *Cluster) initRuleManager() {
     if mc.RuleManager == nil {
-        mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetOpts())
+        mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig())
         mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels)
     }
 }
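
Note that every new getter on the mock returns mc itself: the one mock type satisfies all of the narrowed views, so each caller can ask for just the facet it needs. A standalone illustration of that pattern (the interface and method names here are hypothetical, not taken from PD):

package main

import "fmt"

// Hypothetical facet interfaces standing in for sc.SharedConfig / sc.CheckerConfig.
type sharedView interface{ MaxReplicas() int }

type checkerView interface {
    sharedView
    PlacementRulesCacheEnabled() bool
}

// mockCluster implements both facets, so its getters can simply return the
// receiver, mirroring how the mock cluster above returns mc.
type mockCluster struct{ replicas int }

func (m *mockCluster) MaxReplicas() int                 { return m.replicas }
func (m *mockCluster) PlacementRulesCacheEnabled() bool { return false }

func (m *mockCluster) GetSharedView() sharedView   { return m }
func (m *mockCluster) GetCheckerView() checkerView { return m }

func main() {
    mc := &mockCluster{replicas: 3}
    // A caller that only needs the shared options receives the narrow view.
    var shared sharedView = mc.GetSharedView()
    fmt.Println(shared.MaxReplicas())                          // 3
    fmt.Println(mc.GetCheckerView().PlacementRulesCacheEnabled()) // false
}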
8 changes: 4 additions & 4 deletions pkg/schedule/checker/checker_controller.go
@@ -37,8 +37,8 @@ var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("

 // Controller is used to manage all checkers.
 type Controller struct {
-    cluster sche.ClusterInformer
-    conf config.Config
+    cluster sche.CheckerCluster
+    conf config.CheckerConfig
     opController *operator.Controller
     learnerChecker *LearnerChecker
     replicaChecker *ReplicaChecker
@@ -53,7 +53,7 @@ type Controller struct {
 }

 // NewController create a new Controller.
-func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
+func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
     regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
     return &Controller{
         cluster: cluster,
@@ -87,7 +87,7 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
     }

     if c.conf.IsPlacementRulesEnabled() {
-        skipRuleCheck := c.cluster.GetOpts().IsPlacementRulesCacheEnabled() &&
+        skipRuleCheck := c.cluster.GetCheckerConfig().IsPlacementRulesCacheEnabled() &&
             c.cluster.GetRuleManager().IsRegionFitCached(c.cluster, region)
         if skipRuleCheck {
             // If the fit is fetched from cache, it seems that the region doesn't need check
4 changes: 2 additions & 2 deletions pkg/schedule/checker/joint_state_checker.go
@@ -26,7 +26,7 @@ import (
 // JointStateChecker ensures region is in joint state will leave.
 type JointStateChecker struct {
     PauseController
-    cluster sche.ClusterInformer
+    cluster sche.CheckerCluster
 }

 const jointStateCheckerName = "joint_state_checker"
@@ -41,7 +41,7 @@ var (
 )

 // NewJointStateChecker creates a joint state checker.
-func NewJointStateChecker(cluster sche.ClusterInformer) *JointStateChecker {
+func NewJointStateChecker(cluster sche.CheckerCluster) *JointStateChecker {
     return &JointStateChecker{
         cluster: cluster,
     }
4 changes: 2 additions & 2 deletions pkg/schedule/checker/learner_checker.go
@@ -25,7 +25,7 @@ import (
 // LearnerChecker ensures region has a learner will be promoted.
 type LearnerChecker struct {
     PauseController
-    cluster sche.ClusterInformer
+    cluster sche.CheckerCluster
 }

 var (
@@ -34,7 +34,7 @@ var (
 )

 // NewLearnerChecker creates a learner checker.
-func NewLearnerChecker(cluster sche.ClusterInformer) *LearnerChecker {
+func NewLearnerChecker(cluster sche.CheckerCluster) *LearnerChecker {
     return &LearnerChecker{
         cluster: cluster,
     }
16 changes: 8 additions & 8 deletions pkg/schedule/checker/merge_checker.go
@@ -76,14 +76,14 @@ var (
 // MergeChecker ensures region to merge with adjacent region when size is small
 type MergeChecker struct {
     PauseController
-    cluster sche.ScheduleCluster
-    conf config.Config
+    cluster sche.CheckerCluster
+    conf config.CheckerConfig
     splitCache *cache.TTLUint64
     startTime time.Time // it's used to judge whether server recently start.
 }

 // NewMergeChecker creates a merge checker.
-func NewMergeChecker(ctx context.Context, cluster sche.ScheduleCluster, conf config.Config) *MergeChecker {
+func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig) *MergeChecker {
     splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
     return &MergeChecker{
         cluster: cluster,
@@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
 }

 // AllowMerge returns true if two regions can be merged according to the key type.
-func AllowMerge(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool {
+func AllowMerge(cluster sche.SharedCluster, region, adjacent *core.RegionInfo) bool {
     var start, end []byte
     if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
         start, end = region.GetStartKey(), adjacent.GetEndKey()
@@ -266,7 +266,7 @@ func AllowMerge(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo)
     // We can consider using dependency injection techniques to optimize in
     // the future.

-    if cluster.GetOpts().IsPlacementRulesEnabled() {
+    if cluster.GetSharedConfig().IsPlacementRulesEnabled() {
         cl, ok := cluster.(interface{ GetRuleManager() *placement.RuleManager })
         if !ok || len(cl.GetRuleManager().GetSplitKeys(start, end)) > 0 {
             return false
@@ -283,10 +283,10 @@ func AllowMerge(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo)
         }
     }

-    policy := cluster.GetOpts().GetKeyType()
+    policy := cluster.GetSharedConfig().GetKeyType()
     switch policy {
     case constant.Table:
-        if cluster.GetOpts().IsCrossTableMergeEnabled() {
+        if cluster.GetSharedConfig().IsCrossTableMergeEnabled() {
             return true
         }
         return isTableIDSame(region, adjacent)
@@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
 // Check whether there is a peer of the adjacent region on an offline store,
 // while the source region has no peer on it. This is to prevent from bringing
 // any other peer into an offline store to slow down the offline process.
-func checkPeerStore(cluster sche.ScheduleCluster, region, adjacent *core.RegionInfo) bool {
+func checkPeerStore(cluster sche.SharedCluster, region, adjacent *core.RegionInfo) bool {
     regionStoreIDs := region.GetStoreIDs()
     for _, peer := range adjacent.GetPeers() {
         storeID := peer.GetStoreId()
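
In the AllowMerge hunk above, the cluster parameter is narrowed to sche.SharedCluster, which does not expose the rule manager through its static type; the function instead probes for that capability at runtime with an anonymous-interface type assertion, and the in-code comment flags dependency injection as a possible future cleanup. A minimal standalone sketch of that Go pattern, with purely illustrative names that are not taken from the repository:

package main

import "fmt"

// narrowCluster stands in for a deliberately small interface such as sche.SharedCluster.
type narrowCluster interface{ Name() string }

// ruleManager stands in for *placement.RuleManager.
type ruleManager struct{}

func (*ruleManager) GetSplitKeys(start, end []byte) [][]byte { return nil }

// fullCluster satisfies narrowCluster and also happens to expose a rule manager.
type fullCluster struct{}

func (*fullCluster) Name() string                 { return "full" }
func (*fullCluster) GetRuleManager() *ruleManager { return &ruleManager{} }

// allowMerge only sees narrowCluster; it discovers the optional capability with
// an anonymous-interface assertion, mirroring the check in AllowMerge above.
func allowMerge(c narrowCluster, start, end []byte) bool {
    cl, ok := c.(interface{ GetRuleManager() *ruleManager })
    if !ok || len(cl.GetRuleManager().GetSplitKeys(start, end)) > 0 {
        // No rule manager available, or a placement-rule boundary splits the range.
        return false
    }
    return true
}

func main() {
    fmt.Println(allowMerge(&fullCluster{}, []byte("a"), []byte("z"))) // true
}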
8 changes: 4 additions & 4 deletions pkg/schedule/checker/merge_checker_test.go
@@ -80,7 +80,7 @@ func (suite *mergeCheckerTestSuite) SetupTest() {
     for _, region := range suite.regions {
         suite.cluster.PutRegion(region)
     }
-    suite.mc = NewMergeChecker(suite.ctx, suite.cluster, suite.cluster.GetOpts())
+    suite.mc = NewMergeChecker(suite.ctx, suite.cluster, suite.cluster.GetCheckerConfig())
 }

 func (suite *mergeCheckerTestSuite) TearDownTest() {
@@ -461,9 +461,9 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
         tc.PutRegion(region)
     }

-    mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts())
+    mc := NewMergeChecker(suite.ctx, tc, tc.GetCheckerConfig())
     stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
-    oc := operator.NewController(suite.ctx, tc.GetBasicCluster(), tc.GetOpts(), stream)
+    oc := operator.NewController(suite.ctx, tc.GetBasicCluster(), tc.GetSharedConfig(), stream)

     regions[2] = regions[2].Clone(
         core.SetPeers([]*metapb.Peer{
@@ -530,7 +530,7 @@ func (suite *mergeCheckerTestSuite) TestCache() {
         suite.cluster.PutRegion(region)
     }

-    suite.mc = NewMergeChecker(suite.ctx, suite.cluster, suite.cluster.GetOpts())
+    suite.mc = NewMergeChecker(suite.ctx, suite.cluster, suite.cluster.GetCheckerConfig())

     ops := suite.mc.Check(suite.regions[1])
     suite.Nil(ops)
6 changes: 3 additions & 3 deletions pkg/schedule/checker/priority_inspector.go
@@ -29,13 +29,13 @@ const defaultPriorityQueueSize = 1280

 // PriorityInspector ensures high priority region should run first
 type PriorityInspector struct {
-    cluster sche.ClusterInformer
-    conf config.Config
+    cluster sche.CheckerCluster
+    conf config.CheckerConfig
     queue *cache.PriorityQueue
 }

 // NewPriorityInspector creates a priority inspector.
-func NewPriorityInspector(cluster sche.ClusterInformer, conf config.Config) *PriorityInspector {
+func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfig) *PriorityInspector {
     return &PriorityInspector{
         cluster: cluster,
         conf: conf,
4 changes: 2 additions & 2 deletions pkg/schedule/checker/priority_inspector_test.go
@@ -37,7 +37,7 @@ func TestCheckPriorityRegions(t *testing.T) {
     tc.AddLeaderRegion(2, 2, 3)
     tc.AddLeaderRegion(3, 2)

-    pc := NewPriorityInspector(tc, tc.GetOpts())
+    pc := NewPriorityInspector(tc, tc.GetCheckerConfig())
     checkPriorityRegionTest(re, pc, tc)
     opt.SetPlacementRuleEnabled(true)
     re.True(opt.IsPlacementRulesEnabled())
@@ -47,7 +47,7 @@ func checkPriorityRegionTest(re *require.Assertions, pc *PriorityInspector, tc *mockcluster.Cluster) {
 func checkPriorityRegionTest(re *require.Assertions, pc *PriorityInspector, tc *mockcluster.Cluster) {
     // case1: inspect region 1, it doesn't lack replica
     region := tc.GetRegion(1)
-    opt := tc.GetOpts()
+    opt := tc.GetCheckerConfig()
     pc.Inspect(region)
     re.Equal(0, pc.queue.Len())

6 changes: 3 additions & 3 deletions pkg/schedule/checker/replica_checker.go
@@ -61,13 +61,13 @@ var (
 // Location management, mainly used for cross data center deployment.
 type ReplicaChecker struct {
     PauseController
-    cluster sche.ClusterInformer
-    conf config.Config
+    cluster sche.CheckerCluster
+    conf config.CheckerConfig
     regionWaitingList cache.Cache
 }

 // NewReplicaChecker creates a replica checker.
-func NewReplicaChecker(cluster sche.ClusterInformer, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
+func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfig, regionWaitingList cache.Cache) *ReplicaChecker {
     return &ReplicaChecker{
         cluster: cluster,
         conf: conf,
20 changes: 10 additions & 10 deletions pkg/schedule/checker/replica_checker_test.go
@@ -50,7 +50,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() {
     suite.ctx, suite.cancel = context.WithCancel(context.Background())
     suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
     suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-    suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetOpts(), cache.NewDefaultCache(10))
+    suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10))
     stats := &pdpb.StoreStats{
         Capacity: 100,
         Available: 100,
@@ -207,7 +207,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() {
     tc := mockcluster.NewCluster(suite.ctx, opt)
     tc.SetMaxSnapshotCount(2)
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     // Add stores 1,2,3,4.
     tc.AddRegionStore(1, 4)
@@ -283,7 +283,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() {
     tc.AddRegionStore(1, 1)
     tc.AddRegionStore(2, 1)

-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     // now region peer in store 1,2,3.but we just have store 1,2
     // This happens only in recovering the PD tc
@@ -301,7 +301,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() {
     tc.SetMaxReplicas(3)
     tc.SetLocationLabels([]string{"zone", "rack", "host"})

-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))
     tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
     tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
     tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"})
@@ -352,7 +352,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() {
     tc.SetMaxReplicas(3)
     tc.SetLocationLabels([]string{"zone", "rack", "host"})

-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
     tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -431,7 +431,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() {
     tc.SetMaxReplicas(5)
     tc.SetLocationLabels([]string{"zone", "host"})

-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"})
     tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"})
@@ -459,7 +459,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() {
     tc := mockcluster.NewCluster(suite.ctx, opt)
     tc.SetLocationLabels([]string{"zone"})
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
     tc.UpdateStorageRatio(1, 0.5, 0.5)
@@ -494,7 +494,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() {
     opt := mockconfig.NewTestOptions()
     tc := mockcluster.NewCluster(suite.ctx, opt)
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddRegionStore(1, 100)
     tc.AddRegionStore(2, 100)
@@ -526,7 +526,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() {
     tc := mockcluster.NewCluster(suite.ctx, opt)
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
     tc.SetLocationLabels([]string{"zone"})
-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
     tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
@@ -557,7 +557,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() {
     tc := mockcluster.NewCluster(suite.ctx, opt)
     tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
     tc.SetLocationLabels([]string{"zone"})
-    rc := NewReplicaChecker(tc, tc.GetOpts(), cache.NewDefaultCache(10))
+    rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10))

     tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
     tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
12 changes: 6 additions & 6 deletions pkg/schedule/checker/replica_strategy.go
@@ -27,7 +27,7 @@ import (
 // exists to allow replica_checker and rule_checker to reuse common logics.
 type ReplicaStrategy struct {
     checkerName string // replica-checker / rule-checker
-    cluster sche.ClusterInformer
+    cluster sche.CheckerCluster
     locationLabels []string
     isolationLevel string
     region *core.RegionInfo
@@ -77,13 +77,13 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
     isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
     strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
     targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
-        FilterTarget(s.cluster.GetOpts(), nil, nil, filters...).
+        FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
         KeepTheTopStores(isolationComparer, false) // greater isolation score is better
     if targetCandidate.Len() == 0 {
         return 0, false
     }
-    target := targetCandidate.FilterTarget(s.cluster.GetOpts(), nil, nil, strictStateFilter).
-        PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), true) // less region score is better
+    target := targetCandidate.FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, strictStateFilter).
+        PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), true) // less region score is better
     if target == nil {
         return 0, true // filter by temporary states
     }
@@ -139,9 +139,9 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
         level = constant.Urgent
     }
     source := filter.NewCandidates(coLocationStores).
-        FilterSource(s.cluster.GetOpts(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
+        FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
         KeepTheTopStores(isolationComparer, true).
-        PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetOpts()), false)
+        PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
     if source == nil {
         log.Debug("no removable store", zap.Uint64("region-id", s.region.GetID()))
         return 0