Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add a switch to dynamically enable scheduling service #7595

Merged
merged 4 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
case "micro-service":
return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand All @@ -201,6 +203,22 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va
return err
}

func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.MicroService, key, value)
if err != nil {
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetMicroServiceConfig(config.MicroService)
}
return err
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ var once sync.Once
func (c *RaftCluster) checkServices() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil || len(servers) == 0 {
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
} else {
Expand Down
28 changes: 28 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type Config struct {

Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`

MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"`

Controller rm.ControllerConfig `toml:"controller" json:"controller"`
}

Expand Down Expand Up @@ -249,6 +251,8 @@ const (
defaultCheckRegionSplitInterval = 50 * time.Millisecond
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
)

// Special keys for Labels
Expand Down Expand Up @@ -461,6 +465,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.Keyspace.adjust(configMetaData.Child("keyspace"))

c.MicroService.adjust(configMetaData.Child("micro-service"))

c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
Expand Down Expand Up @@ -847,6 +853,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
}
}

// MicroServiceConfig is the configuration for micro service.
type MicroServiceConfig struct {
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
}

func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-scheduling-fallback") {
c.EnableSchedulingFallback = defaultEnableSchedulingFallback
}
}

// Clone returns a copy of micro service config.
func (c *MicroServiceConfig) Clone() *MicroServiceConfig {
cfg := *c
return &cfg
}

// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to api service.
func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
14 changes: 14 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PersistOptions struct {
replicationMode atomic.Value
labelProperty atomic.Value
keyspace atomic.Value
microService atomic.Value
storeConfig atomic.Value
clusterVersion unsafe.Pointer
}
Expand All @@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
// storeConfig will be fetched from TiKV later,
// set it to an empty config here first.
o.storeConfig.Store(&sc.StoreConfig{})
Expand Down Expand Up @@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
o.keyspace.Store(cfg)
}

// GetMicroServiceConfig returns the micro service configuration.
func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig {
return o.microService.Load().(*MicroServiceConfig)
}

// SetMicroServiceConfig sets the micro service configuration.
func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) {
o.microService.Store(cfg)
}

// GetStoreConfig returns the store config.
func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
Expand Down Expand Up @@ -768,6 +780,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
ReplicationMode: *o.GetReplicationModeConfig(),
LabelProperty: o.GetLabelPropertyConfig(),
Keyspace: *o.GetKeyspaceConfig(),
MicroService: *o.GetMicroServiceConfig(),
ClusterVersion: *o.GetClusterVersion(),
},
StoreConfig: *o.GetStoreConfig(),
Expand Down Expand Up @@ -799,6 +812,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
o.storeConfig.Store(&cfg.StoreConfig)
o.SetClusterVersion(&cfg.ClusterVersion)
}
Expand Down
22 changes: 22 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ func (s *Server) GetConfig() *config.Config {
cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone()
cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
if s.storage == nil {
Expand Down Expand Up @@ -990,6 +991,27 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
return nil
}

// GetMicroServiceConfig gets the micro service config information.
func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig {
return s.persistOptions.GetMicroServiceConfig().Clone()
}

// SetMicroServiceConfig sets the micro service config information.
func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error {
old := s.persistOptions.GetMicroServiceConfig()
s.persistOptions.SetMicroServiceConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetMicroServiceConfig(old)
log.Error("failed to update micro service config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}

// GetScheduleConfig gets the balance config information.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistOptions.GetScheduleConfig().Clone()
Expand Down
53 changes: 51 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,14 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() {
})
}

func (suite *serverTestSuite) TestDynamicSwitch() {
func (suite *serverTestSuite) TestSchedulingServiceFallback() {
re := suite.Require()
// API server will execute scheduling jobs since there is no scheduler server.
leaderServer := suite.pdLeader.GetServer()
conf := leaderServer.GetMicroServiceConfig().Clone()
// Change back to the default value.
conf.EnableSchedulingFallback = true
leaderServer.SetMicroServiceConfig(*conf)
// API server will execute scheduling jobs since there is no scheduling server.
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
Expand Down Expand Up @@ -241,6 +246,50 @@ func (suite *serverTestSuite) TestDynamicSwitch() {
})
}

func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {
re := suite.Require()

// API server will execute scheduling jobs since there is no scheduling server.
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
leaderServer := suite.pdLeader.GetServer()
// After Disabling scheduling service fallback, the API server will stop scheduling.
conf := leaderServer.GetMicroServiceConfig().Clone()
conf.EnableSchedulingFallback = false
leaderServer.SetMicroServiceConfig(*conf)
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
// Enable scheduling service fallback again, the API server will restart scheduling.
conf.EnableSchedulingFallback = true
leaderServer.SetMicroServiceConfig(*conf)
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
// After scheduling server is started, API server will not execute scheduling jobs.
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
// Scheduling server is responsible for executing scheduling jobs.
testutil.Eventually(re, func() bool {
return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning()
})
// Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again.
conf.EnableSchedulingFallback = false
leaderServer.SetMicroServiceConfig(*conf)
tc.GetPrimaryServer().Close()
time.Sleep(time.Second)
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
}

func (suite *serverTestSuite) TestSchedulerSync() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
Expand Down
2 changes: 1 addition & 1 deletion tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re))
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
tu.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable
})
Expand Down
2 changes: 1 addition & 1 deletion tests/server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl
err = tu.CheckPostJSON(testDialClient, u, reqData, tu.StatusOK(re))
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
tu.Eventually(re, func() bool {
return !sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled()
})
Expand Down
30 changes: 30 additions & 0 deletions tools/pd-ctl/tests/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,36 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *pdTests.TestCluster)
re.Equal(int(3), conf.FlowRoundByDigit)
}

func (suite *configTestSuite) TestMicroServiceConfig() {
suite.env.RunTestInTwoModes(suite.checkMicroServiceConfig)
}

func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestCluster) {
re := suite.Require()
leaderServer := cluster.GetLeaderServer()
pdAddr := leaderServer.GetAddr()
cmd := ctl.GetRootCmd()

store := &metapb.Store{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
}
pdTests.MustPutStore(re, cluster, store)
svr := leaderServer.GetServer()
output, err := tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "all")
re.NoError(err)
cfg := config.Config{}
re.NoError(json.Unmarshal(output, &cfg))
re.True(svr.GetMicroServiceConfig().EnableSchedulingFallback)
re.True(cfg.MicroService.EnableSchedulingFallback)
// config set enable-scheduling-fallback <value>
args := []string{"-u", pdAddr, "config", "set", "enable-scheduling-fallback", "false"}
_, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback)
}

func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
re.Len(b, len(a))
for i := 0; i < len(a); i++ {
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (suite *operatorTestSuite) checkOperator(cluster *pdTests.TestCluster) {
_, err = tests.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true")
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
testutil.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled()
})
Expand Down
Loading