Skip to content

Commit

Permalink
add a switch for dynamically enable scheduling service
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Dec 20, 2023
1 parent cfbc9b9 commit 58eceaa
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 1 deletion.
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
case "micro-service":
return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand All @@ -200,6 +202,22 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va
return err
}

func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.MicroService, key, value)
if err != nil {
return err
}

if !found {
return errors.Errorf("config item %s not found", key)
}

if updated {
err = h.svr.SetMicroServiceConfig(config.MicroService)
}
return err
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ var once sync.Once
func (c *RaftCluster) checkServices() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil || len(servers) == 0 {
if c.opt.GetMicroServiceConfig().IsDynamicSwitchEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
} else {
Expand Down
28 changes: 28 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type Config struct {

Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`

MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"`

Controller rm.ControllerConfig `toml:"controller" json:"controller"`
}

Expand Down Expand Up @@ -249,6 +251,8 @@ const (
defaultCheckRegionSplitInterval = 50 * time.Millisecond
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableDynamicSwitch = true
)

// Special keys for Labels
Expand Down Expand Up @@ -461,6 +465,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.Keyspace.adjust(configMetaData.Child("keyspace"))

c.MicroService.adjust(configMetaData.Child("micro-service"))

c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
Expand Down Expand Up @@ -847,6 +853,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
}
}

// MicroServiceConfig is the configuration for micro service.
type MicroServiceConfig struct {
EnableDynamicSwitch bool `toml:"enable-dynamic-switch" json:"enable-dynamic-switch,string"`
}

func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-dynamic-switch") {
c.EnableDynamicSwitch = defaultEnableDynamicSwitch
}
}

// Clone returns a copy of micro service config.
func (c *MicroServiceConfig) Clone() *MicroServiceConfig {
cfg := *c
return &cfg
}

// IsDynamicSwitchEnabled returns whether to enable dynamic switch.
func (c *MicroServiceConfig) IsDynamicSwitchEnabled() bool {
return c.EnableDynamicSwitch
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
14 changes: 14 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PersistOptions struct {
replicationMode atomic.Value
labelProperty atomic.Value
keyspace atomic.Value
microService atomic.Value
storeConfig atomic.Value
clusterVersion unsafe.Pointer
}
Expand All @@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
// storeConfig will be fetched from TiKV later,
// set it to an empty config here first.
o.storeConfig.Store(&sc.StoreConfig{})
Expand Down Expand Up @@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
o.keyspace.Store(cfg)
}

// GetMicroServiceConfig returns the micro service configuration.
func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig {
return o.microService.Load().(*MicroServiceConfig)
}

// SetMicroServiceConfig sets the micro service configuration.
func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) {
o.microService.Store(cfg)
}

// GetStoreConfig returns the store config.
func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
Expand Down Expand Up @@ -768,6 +780,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
ReplicationMode: *o.GetReplicationModeConfig(),
LabelProperty: o.GetLabelPropertyConfig(),
Keyspace: *o.GetKeyspaceConfig(),
MicroService: *o.GetMicroServiceConfig(),
ClusterVersion: *o.GetClusterVersion(),
},
StoreConfig: *o.GetStoreConfig(),
Expand Down Expand Up @@ -799,6 +812,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
o.storeConfig.Store(&cfg.StoreConfig)
o.SetClusterVersion(&cfg.ClusterVersion)
}
Expand Down
22 changes: 22 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ func (s *Server) GetConfig() *config.Config {
cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone()
cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
if s.storage == nil {
Expand Down Expand Up @@ -977,6 +978,27 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
return nil
}

// GetMicroServiceConfig gets the micro service config information.
func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig {
return s.persistOptions.GetMicroServiceConfig().Clone()
}

// SetMicroServiceConfig sets the micro service config information.
func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error {
old := s.persistOptions.GetMicroServiceConfig()
s.persistOptions.SetMicroServiceConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetMicroServiceConfig(old)
log.Error("failed to update micro service config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}

// GetScheduleConfig gets the balance config information.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistOptions.GetScheduleConfig().Clone()
Expand Down
44 changes: 44 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,50 @@ func (suite *serverTestSuite) TestDynamicSwitch() {
})
}

func (suite *serverTestSuite) TestDisableDynamicSwitch() {
re := suite.Require()

// API server will execute scheduling jobs since there is no scheduler server.
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
leaderServer := suite.pdLeader.GetServer()
// After Disabling dynamic switch, the API server will stop scheduling.
conf := leaderServer.GetMicroServiceConfig().Clone()
conf.EnableDynamicSwitch = false
leaderServer.SetMicroServiceConfig(*conf)
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
// Enable dynamic switch again, the API server will restart scheduling.
conf.EnableDynamicSwitch = true
leaderServer.SetMicroServiceConfig(*conf)
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})

tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForPrimaryServing(re)
// After scheduling server is started, API server will not execute scheduling jobs.
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
// Scheduling server is responsible for executing scheduling jobs.
testutil.Eventually(re, func() bool {
return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning()
})
// Disable dynamic switch and stop scheduling server. API server won't execute scheduling jobs again.
conf.EnableDynamicSwitch = false
leaderServer.SetMicroServiceConfig(*conf)
tc.GetPrimaryServer().Close()
time.Sleep(time.Second)
testutil.Eventually(re, func() bool {
return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
}

func (suite *serverTestSuite) TestSchedulerSync() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
Expand Down
30 changes: 30 additions & 0 deletions tests/pdctl/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,36 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) {
re.Equal(int(3), conf.FlowRoundByDigit)
}

func (suite *configTestSuite) TestMicroServiceConfig() {
suite.env.RunTestInTwoModes(suite.checkMicroServiceConfig)
}

func (suite *configTestSuite) checkMicroServiceConfig(cluster *tests.TestCluster) {
re := suite.Require()
leaderServer := cluster.GetLeaderServer()
pdAddr := leaderServer.GetAddr()
cmd := pdctlCmd.GetRootCmd()

store := &metapb.Store{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
}
tests.MustPutStore(re, cluster, store)
svr := leaderServer.GetServer()
output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "all")
re.NoError(err)
cfg := config.Config{}
re.NoError(json.Unmarshal(output, &cfg))
re.True(svr.GetMicroServiceConfig().EnableDynamicSwitch)
re.True(cfg.MicroService.EnableDynamicSwitch)
// config set enable-dynamic-switch <value>
args := []string{"-u", pdAddr, "config", "set", "enable-dynamic-switch", "false"}
_, err = pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
re.False(svr.GetMicroServiceConfig().EnableDynamicSwitch)
}

func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
re.Len(b, len(a))
for i := 0; i < len(a); i++ {
Expand Down

0 comments on commit 58eceaa

Please sign in to comment.