Skip to content

Commit

Permalink
*: move scheduling-related config definitions into pkg (#6857)
Browse files Browse the repository at this point in the history
ref #5839

Move scheduling-related config definitions into `pkg` to decouple the `server` dependency.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Jul 28, 2023
1 parent 31343e0 commit fe52361
Show file tree
Hide file tree
Showing 47 changed files with 1,195 additions and 1,093 deletions.
49 changes: 25 additions & 24 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,131 +17,132 @@ package mockcluster
import (
"time"

sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
)

// SetMaxMergeRegionSize updates the MaxMergeRegionSize configuration.
func (mc *Cluster) SetMaxMergeRegionSize(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
}

// SetMaxMergeRegionKeys updates the MaxMergeRegionKeys configuration.
func (mc *Cluster) SetMaxMergeRegionKeys(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
}

// SetSplitMergeInterval updates the SplitMergeInterval configuration.
func (mc *Cluster) SetSplitMergeInterval(v time.Duration) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
}

// SetEnableOneWayMerge updates the EnableOneWayMerge configuration.
func (mc *Cluster) SetEnableOneWayMerge(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableOneWayMerge = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableOneWayMerge = v })
}

// SetMaxSnapshotCount updates the MaxSnapshotCount configuration.
func (mc *Cluster) SetMaxSnapshotCount(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
}

// SetEnableMakeUpReplica updates the EnableMakeUpReplica configuration.
func (mc *Cluster) SetEnableMakeUpReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableMakeUpReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableMakeUpReplica = v })
}

// SetEnableRemoveExtraReplica updates the EnableRemoveExtraReplica configuration.
func (mc *Cluster) SetEnableRemoveExtraReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
}

// SetEnableLocationReplacement updates the EnableLocationReplacement configuration.
func (mc *Cluster) SetEnableLocationReplacement(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableLocationReplacement = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableLocationReplacement = v })
}

// SetEnableRemoveDownReplica updates the EnableRemoveDownReplica configuration.
func (mc *Cluster) SetEnableRemoveDownReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveDownReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveDownReplica = v })
}

// SetEnableReplaceOfflineReplica updates the EnableReplaceOfflineReplica configuration.
func (mc *Cluster) SetEnableReplaceOfflineReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
}

// SetLeaderSchedulePolicy updates the LeaderSchedulePolicy configuration.
func (mc *Cluster) SetLeaderSchedulePolicy(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderSchedulePolicy = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderSchedulePolicy = v })
}

// SetTolerantSizeRatio updates the TolerantSizeRatio configuration.
func (mc *Cluster) SetTolerantSizeRatio(v float64) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.TolerantSizeRatio = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.TolerantSizeRatio = v })
}

// SetRegionScoreFormulaVersion updates the RegionScoreFormulaVersion configuration.
func (mc *Cluster) SetRegionScoreFormulaVersion(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
}

// SetLeaderScheduleLimit updates the LeaderScheduleLimit configuration.
func (mc *Cluster) SetLeaderScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
}

// SetRegionScheduleLimit updates the RegionScheduleLimit configuration.
func (mc *Cluster) SetRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
}

// SetMergeScheduleLimit updates the MergeScheduleLimit configuration.
func (mc *Cluster) SetMergeScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
}

// SetHotRegionScheduleLimit updates the HotRegionScheduleLimit configuration.
func (mc *Cluster) SetHotRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
}

// SetHotRegionCacheHitsThreshold updates the HotRegionCacheHitsThreshold configuration.
func (mc *Cluster) SetHotRegionCacheHitsThreshold(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
}

// SetEnablePlacementRules updates the EnablePlacementRules configuration.
func (mc *Cluster) SetEnablePlacementRules(v bool) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.EnablePlacementRules = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.EnablePlacementRules = v })
if v {
mc.initRuleManager()
}
}

// SetMaxReplicas updates the maxReplicas configuration.
func (mc *Cluster) SetMaxReplicas(v int) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.MaxReplicas = uint64(v) })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.MaxReplicas = uint64(v) })
}

// SetLocationLabels updates the LocationLabels configuration.
func (mc *Cluster) SetLocationLabels(v []string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.LocationLabels = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.LocationLabels = v })
}

// SetIsolationLevel updates the IsolationLevel configuration.
func (mc *Cluster) SetIsolationLevel(v string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.IsolationLevel = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.IsolationLevel = v })
}

func (mc *Cluster) updateScheduleConfig(f func(*config.ScheduleConfig)) {
func (mc *Cluster) updateScheduleConfig(f func(*sc.ScheduleConfig)) {
s := mc.GetScheduleConfig().Clone()
f(s)
mc.SetScheduleConfig(s)
}

func (mc *Cluster) updateReplicationConfig(f func(*config.ReplicationConfig)) {
func (mc *Cluster) updateReplicationConfig(f func(*sc.ReplicationConfig)) {
r := mc.GetReplicationConfig().Clone()
f(r)
mc.SetReplicationConfig(r)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
}

// GetStoreConfig returns the store config.
func (mc *Cluster) GetStoreConfig() sc.StoreConfig {
func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider {
return mc.StoreConfigManager.GetStoreConfig()
}

// GetCheckerConfig returns the checker config.
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfig {
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider {
return mc
}

// GetSchedulerConfig returns the scheduler config.
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfig {
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider {
return mc
}

// GetSharedConfig returns the shared config.
func (mc *Cluster) GetSharedConfig() sc.SharedConfig {
func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider {
return mc
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range config.DefaultSchedulers {
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
}
c := config.NewConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("
// Controller is used to manage all checkers.
type Controller struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
opController *operator.Controller
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
Expand All @@ -53,7 +53,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ var (
type MergeChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/priority_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ const defaultPriorityQueueSize = 1280
// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfig) *PriorityInspector {
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ var (
type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfig, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
Loading

0 comments on commit fe52361

Please sign in to comment.