diff --git a/server/api/config.go b/server/api/config.go
index e075095d7a98..242594265d1c 100644
--- a/server/api/config.go
+++ b/server/api/config.go
@@ -180,6 +180,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
 	case "label-property": // TODO: support changing label-property
 	case "keyspace":
 		return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
+	case "micro-service":
+		return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value)
 	}
 	return errors.Errorf("config prefix %s not found", kp[0])
 }
@@ -200,6 +202,23 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va
 	return err
 }
 
+// updateMicroServiceConfig sets key to value in the micro-service section of
+// config via jsonutil.AddKeyValue and, when that reports a change, hands the
+// section to the server through SetMicroServiceConfig.
+func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error {
+	changed, exists, err := jsonutil.AddKeyValue(&config.MicroService, key, value)
+	switch {
+	case err != nil:
+		return err
+	case !exists:
+		return errors.Errorf("config item %s not found", key)
+	case !changed:
+		// AddKeyValue reported no change, so there is nothing to store.
+		return nil
+	}
+	return h.svr.SetMicroServiceConfig(config.MicroService)
+}
+
 func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
 	updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
 	if err != nil {
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index ecbd40e25828..826f3a472163 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -353,7 +353,10 @@ var once sync.Once
 func (c *RaftCluster) checkServices() {
 	if c.isAPIServiceMode {
 		servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
-		if err != nil || len(servers) == 0 {
+		// Start the scheduling jobs here only when dynamic switch is enabled
+		// and discovery failed or returned no scheduling server.
+		dynamicSwitch := c.opt.GetMicroServiceConfig().IsDynamicSwitchEnabled()
+		if dynamicSwitch && (err != nil || len(servers) == 0) {
 			c.startSchedulingJobs(c, c.hbstreams)
 			c.independentServices.Delete(mcsutils.SchedulingServiceName)
 		} else {
diff --git a/server/config/config.go b/server/config/config.go
index da6b0e29e075..89f01528c32b 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -165,6 +165,8 @@ type Config struct {
 
 	Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`
 
+	MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"`
+
 	Controller rm.ControllerConfig `toml:"controller" json:"controller"`
 }
 
@@ -249,6 +251,8 @@ const (
 	defaultCheckRegionSplitInterval = 50 * time.Millisecond
 	minCheckRegionSplitInterval     = 1 * time.Millisecond
 	maxCheckRegionSplitInterval     = 100 * time.Millisecond
+
+	defaultEnableDynamicSwitch = true
 )
 
 // Special keys for Labels
@@ -461,6 +465,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
 
 	c.Keyspace.adjust(configMetaData.Child("keyspace"))
 
+	c.MicroService.adjust(configMetaData.Child("micro-service"))
+
 	c.Security.Encryption.Adjust()
 
 	if len(c.Log.Format) == 0 {
@@ -847,6 +853,31 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
 	}
 }
 
+// MicroServiceConfig is the configuration for micro service.
+type MicroServiceConfig struct {
+	EnableDynamicSwitch bool `toml:"enable-dynamic-switch" json:"enable-dynamic-switch,string"`
+}
+
+// adjust applies defaultEnableDynamicSwitch when meta does not define
+// enable-dynamic-switch.
+func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
+	if meta.IsDefined("enable-dynamic-switch") {
+		return
+	}
+	c.EnableDynamicSwitch = defaultEnableDynamicSwitch
+}
+
+// Clone returns a pointer to a copy of the micro service config.
+func (c *MicroServiceConfig) Clone() *MicroServiceConfig {
+	dup := *c
+	return &dup
+}
+
+// IsDynamicSwitchEnabled reports the value of EnableDynamicSwitch.
+func (c *MicroServiceConfig) IsDynamicSwitchEnabled() bool {
+	return c.EnableDynamicSwitch
+}
+
 // KeyspaceConfig is the configuration for keyspace management.
 type KeyspaceConfig struct {
 	// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 0fa1804b8796..e383f519e635 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -52,6 +52,7 @@ type PersistOptions struct {
 	replicationMode atomic.Value
 	labelProperty   atomic.Value
 	keyspace        atomic.Value
+	microService    atomic.Value
 	storeConfig     atomic.Value
 	clusterVersion  unsafe.Pointer
 }
@@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
 	o.replicationMode.Store(&cfg.ReplicationMode)
 	o.labelProperty.Store(cfg.LabelProperty)
 	o.keyspace.Store(&cfg.Keyspace)
+	o.microService.Store(&cfg.MicroService)
 	// storeConfig will be fetched from TiKV later,
 	// set it to an empty config here first.
 	o.storeConfig.Store(&sc.StoreConfig{})
@@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
 	o.keyspace.Store(cfg)
 }
 
+// GetMicroServiceConfig returns the micro service configuration currently held by the options.
+func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig {
+	return o.microService.Load().(*MicroServiceConfig)
+}
+
+// SetMicroServiceConfig atomically replaces the stored micro service configuration with cfg.
+func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) {
+	o.microService.Store(cfg)
+}
+
 // GetStoreConfig returns the store config.
 func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig {
 	return o.storeConfig.Load().(*sc.StoreConfig)
@@ -768,6 +780,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
 			ReplicationMode: *o.GetReplicationModeConfig(),
 			LabelProperty:   o.GetLabelPropertyConfig(),
 			Keyspace:        *o.GetKeyspaceConfig(),
+			MicroService:    *o.GetMicroServiceConfig(),
 			ClusterVersion:  *o.GetClusterVersion(),
 		},
 		StoreConfig: *o.GetStoreConfig(),
@@ -799,6 +812,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
 		o.replicationMode.Store(&cfg.ReplicationMode)
 		o.labelProperty.Store(cfg.LabelProperty)
 		o.keyspace.Store(&cfg.Keyspace)
+		o.microService.Store(&cfg.MicroService)
 		o.storeConfig.Store(&cfg.StoreConfig)
 		o.SetClusterVersion(&cfg.ClusterVersion)
 	}
diff --git a/server/server.go b/server/server.go
index fcf71922a093..d8f46cedcf8e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -939,6 +939,7 @@ func (s *Server) GetConfig() *config.Config {
 	cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
 	cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
 	cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
+	cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone()
 	cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
 	cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
 	if s.storage == nil {
@@ -977,6 +978,31 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
 	return nil
 }
 
+// GetMicroServiceConfig gets the micro service config information.
+// The returned value is a clone of the config held by the persist options.
+func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig {
+	return s.persistOptions.GetMicroServiceConfig().Clone()
+}
+
+// SetMicroServiceConfig sets the micro service config information.
+// The new config is installed first and then persisted to storage; if
+// persisting fails, the previous config is restored and the error is returned.
+func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error {
+	old := s.persistOptions.GetMicroServiceConfig()
+	s.persistOptions.SetMicroServiceConfig(&cfg)
+	if err := s.persistOptions.Persist(s.storage); err != nil {
+		// Persisting failed: put the previous in-memory config back.
+		s.persistOptions.SetMicroServiceConfig(old)
+		log.Error("failed to update micro service config",
+			zap.Reflect("new", cfg),
+			zap.Reflect("old", old),
+			errs.ZapError(err))
+		return err
+	}
+	log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
+	return nil
+}
+
 // GetScheduleConfig gets the balance config information.
 func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
 	return s.persistOptions.GetScheduleConfig().Clone()
diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go
index 0c0442300d90..7fbe507d5027 100644
--- a/tests/integrations/mcs/scheduling/server_test.go
+++ b/tests/integrations/mcs/scheduling/server_test.go
@@ -240,6 +240,51 @@ func (suite *serverTestSuite) TestDynamicSwitch() {
 	})
 }
 
+func (suite *serverTestSuite) TestDisableDynamicSwitch() {
+	re := suite.Require()
+
+	// API server will execute scheduling jobs since there is no scheduler server.
+	testutil.Eventually(re, func() bool {
+		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
+	})
+	leaderServer := suite.pdLeader.GetServer()
+	// After disabling dynamic switch, the API server will stop scheduling.
+	conf := leaderServer.GetMicroServiceConfig()
+	conf.EnableDynamicSwitch = false
+	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
+	testutil.Eventually(re, func() bool {
+		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
+	})
+	// Enable dynamic switch again, the API server will restart scheduling.
+	conf.EnableDynamicSwitch = true
+	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
+	testutil.Eventually(re, func() bool {
+		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
+	})
+
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+	// After scheduling server is started, API server will not execute scheduling jobs.
+	testutil.Eventually(re, func() bool {
+		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
+	})
+	// Scheduling server is responsible for executing scheduling jobs.
+	testutil.Eventually(re, func() bool {
+		return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning()
+	})
+	// Disable dynamic switch and stop scheduling server. API server won't execute scheduling jobs again.
+	conf.EnableDynamicSwitch = false
+	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
+	tc.GetPrimaryServer().Close()
+	// NOTE(review): the fixed sleep presumably lets the API server's service check run before the assertion below - confirm.
+	time.Sleep(time.Second)
+	testutil.Eventually(re, func() bool {
+		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
+	})
+}
+
 func (suite *serverTestSuite) TestSchedulerSync() {
 	re := suite.Require()
 	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go
index c63160a32e5e..bd66698ec513 100644
--- a/tests/pdctl/config/config_test.go
+++ b/tests/pdctl/config/config_test.go
@@ -906,6 +906,38 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *tests.TestCluster) {
 	re.Equal(int(3), conf.FlowRoundByDigit)
 }
 
+func (suite *configTestSuite) TestMicroServiceConfig() {
+	suite.env.RunTestInTwoModes(suite.checkMicroServiceConfig)
+}
+
+// checkMicroServiceConfig verifies that enable-dynamic-switch is initially
+// true and that `config set` through pdctl turns it off on the server.
+func (suite *configTestSuite) checkMicroServiceConfig(cluster *tests.TestCluster) {
+	re := suite.Require()
+	leaderServer := cluster.GetLeaderServer()
+	pdAddr := leaderServer.GetAddr()
+	cmd := pdctlCmd.GetRootCmd()
+
+	store := &metapb.Store{
+		Id:            1,
+		State:         metapb.StoreState_Up,
+		LastHeartbeat: time.Now().UnixNano(),
+	}
+	tests.MustPutStore(re, cluster, store)
+	svr := leaderServer.GetServer()
+	output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "all")
+	re.NoError(err)
+	cfg := config.Config{}
+	re.NoError(json.Unmarshal(output, &cfg))
+	re.True(svr.GetMicroServiceConfig().EnableDynamicSwitch)
+	re.True(cfg.MicroService.EnableDynamicSwitch)
+	// config set enable-dynamic-switch
+	args := []string{"-u", pdAddr, "config", "set", "enable-dynamic-switch", "false"}
+	_, err = pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	re.False(svr.GetMicroServiceConfig().EnableDynamicSwitch)
+}
+
 func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
 	re.Len(b, len(a))
 	for i := 0; i < len(a); i++ {