diff --git a/server/api/config.go b/server/api/config.go index f87331d5e09..6037de650a0 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -181,6 +181,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa case "label-property": // TODO: support changing label-property case "keyspace": return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value) + case "micro-service": + return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value) } return errors.Errorf("config prefix %s not found", kp[0]) } @@ -201,6 +203,22 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va return err } +func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error { + updated, found, err := jsonutil.AddKeyValue(&config.MicroService, key, value) + if err != nil { + return err + } + + if !found { + return errors.Errorf("config item %s not found", key) + } + + if updated { + err = h.svr.SetMicroServiceConfig(config.MicroService) + } + return err +} + func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error { updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value) if err != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 20a4a7f0bfc..d426edd4e53 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -355,7 +355,7 @@ var once sync.Once func (c *RaftCluster) checkServices() { if c.isAPIServiceMode { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) - if err != nil || len(servers) == 0 { + if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) c.independentServices.Delete(mcsutils.SchedulingServiceName) } else { diff --git a/server/config/config.go b/server/config/config.go index da6b0e29e07..25e13d59652 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -165,6 +165,8 @@ type Config struct { Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"` + MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"` + Controller rm.ControllerConfig `toml:"controller" json:"controller"` } @@ -249,6 +251,8 @@ const ( defaultCheckRegionSplitInterval = 50 * time.Millisecond minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond + + defaultEnableSchedulingFallback = true ) // Special keys for Labels @@ -461,6 +465,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { c.Keyspace.adjust(configMetaData.Child("keyspace")) + c.MicroService.adjust(configMetaData.Child("micro-service")) + c.Security.Encryption.Adjust() if len(c.Log.Format) == 0 { @@ -847,6 +853,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { } } +// MicroServiceConfig is the configuration for micro service. +type MicroServiceConfig struct { + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` +} + +func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("enable-scheduling-fallback") { + c.EnableSchedulingFallback = defaultEnableSchedulingFallback + } +} + +// Clone returns a copy of micro service config. +func (c *MicroServiceConfig) Clone() *MicroServiceConfig { + cfg := *c + return &cfg +} + +// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to api service. +func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool { + return c.EnableSchedulingFallback +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 0fa1804b879..e383f519e63 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -52,6 +52,7 @@ type PersistOptions struct { replicationMode atomic.Value labelProperty atomic.Value keyspace atomic.Value + microService atomic.Value storeConfig atomic.Value clusterVersion unsafe.Pointer } @@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.keyspace.Store(&cfg.Keyspace) + o.microService.Store(&cfg.MicroService) // storeConfig will be fetched from TiKV later, // set it to an empty config here first. o.storeConfig.Store(&sc.StoreConfig{}) @@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) { o.keyspace.Store(cfg) } +// GetMicroServiceConfig returns the micro service configuration. +func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig { + return o.microService.Load().(*MicroServiceConfig) +} + +// SetMicroServiceConfig sets the micro service configuration. +func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) { + o.microService.Store(cfg) +} + // GetStoreConfig returns the store config. func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig { return o.storeConfig.Load().(*sc.StoreConfig) @@ -768,6 +780,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error { ReplicationMode: *o.GetReplicationModeConfig(), LabelProperty: o.GetLabelPropertyConfig(), Keyspace: *o.GetKeyspaceConfig(), + MicroService: *o.GetMicroServiceConfig(), ClusterVersion: *o.GetClusterVersion(), }, StoreConfig: *o.GetStoreConfig(), @@ -799,6 +812,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error { o.replicationMode.Store(&cfg.ReplicationMode) o.labelProperty.Store(cfg.LabelProperty) o.keyspace.Store(&cfg.Keyspace) + o.microService.Store(&cfg.MicroService) o.storeConfig.Store(&cfg.StoreConfig) o.SetClusterVersion(&cfg.ClusterVersion) } diff --git a/server/server.go b/server/server.go index bcd99bb9c48..5d930baae2b 100644 --- a/server/server.go +++ b/server/server.go @@ -952,6 +952,7 @@ func (s *Server) GetConfig() *config.Config { cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone() cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig() cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone() + cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone() cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone() cfg.ClusterVersion = *s.persistOptions.GetClusterVersion() if s.storage == nil { @@ -990,6 +991,27 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error { return nil } +// GetMicroServiceConfig gets the micro service config information. +func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig { + return s.persistOptions.GetMicroServiceConfig().Clone() +} + +// SetMicroServiceConfig sets the micro service config information. +func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error { + old := s.persistOptions.GetMicroServiceConfig() + s.persistOptions.SetMicroServiceConfig(&cfg) + if err := s.persistOptions.Persist(s.storage); err != nil { + s.persistOptions.SetMicroServiceConfig(old) + log.Error("failed to update micro service config", + zap.Reflect("new", cfg), + zap.Reflect("old", old), + errs.ZapError(err)) + return err + } + log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) + return nil +} + // GetScheduleConfig gets the balance config information. func (s *Server) GetScheduleConfig() *sc.ScheduleConfig { return s.persistOptions.GetScheduleConfig().Clone() diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 164c6ffdc7d..63bac2c4415 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -203,9 +203,14 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() { }) } -func (suite *serverTestSuite) TestDynamicSwitch() { +func (suite *serverTestSuite) TestSchedulingServiceFallback() { re := suite.Require() - // API server will execute scheduling jobs since there is no scheduler server. + leaderServer := suite.pdLeader.GetServer() + conf := leaderServer.GetMicroServiceConfig().Clone() + // Change back to the default value. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + // API server will execute scheduling jobs since there is no scheduling server. testutil.Eventually(re, func() bool { return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) @@ -241,6 +246,50 @@ func (suite *serverTestSuite) TestDynamicSwitch() { }) } +func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { + re := suite.Require() + + // API server will execute scheduling jobs since there is no scheduling server. + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + leaderServer := suite.pdLeader.GetServer() + // After Disabling scheduling service fallback, the API server will stop scheduling. + conf := leaderServer.GetMicroServiceConfig().Clone() + conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Enable scheduling service fallback again, the API server will restart scheduling. + conf.EnableSchedulingFallback = true + leaderServer.SetMicroServiceConfig(*conf) + testutil.Eventually(re, func() bool { + return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + + tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForPrimaryServing(re) + // After scheduling server is started, API server will not execute scheduling jobs. + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) + // Scheduling server is responsible for executing scheduling jobs. + testutil.Eventually(re, func() bool { + return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning() + }) + // Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again. + conf.EnableSchedulingFallback = false + leaderServer.SetMicroServiceConfig(*conf) + tc.GetPrimaryServer().Close() + time.Sleep(time.Second) + testutil.Eventually(re, func() bool { + return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() + }) +} + func (suite *serverTestSuite) TestSchedulerSync() { re := suite.Require() tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index e21bd82c720..cd3f2ac34dc 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -466,7 +466,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re)) re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config tu.Eventually(re, func() bool { return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable }) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 9819e821d29..a845d2f3e05 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -1437,7 +1437,7 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl err = tu.CheckPostJSON(testDialClient, u, reqData, tu.StatusOK(re)) re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config tu.Eventually(re, func() bool { return !sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() }) diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index d5d9eba6c9e..4a585851227 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -1111,6 +1111,36 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *pdTests.TestCluster) re.Equal(int(3), conf.FlowRoundByDigit) } +func (suite *configTestSuite) TestMicroServiceConfig() { + suite.env.RunTestInTwoModes(suite.checkMicroServiceConfig) +} + +func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + cmd := ctl.GetRootCmd() + + store := &metapb.Store{ + Id: 1, + State: metapb.StoreState_Up, + LastHeartbeat: time.Now().UnixNano(), + } + pdTests.MustPutStore(re, cluster, store) + svr := leaderServer.GetServer() + output, err := tests.ExecuteCommand(cmd, "-u", pdAddr, "config", "show", "all") + re.NoError(err) + cfg := config.Config{} + re.NoError(json.Unmarshal(output, &cfg)) + re.True(svr.GetMicroServiceConfig().EnableSchedulingFallback) + re.True(cfg.MicroService.EnableSchedulingFallback) + // config set enable-scheduling-fallback + args := []string{"-u", pdAddr, "config", "set", "enable-scheduling-fallback", "false"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) + re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback) +} + func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) { re.Len(b, len(a)) for i := 0; i < len(a); i++ { diff --git a/tools/pd-ctl/tests/operator/operator_test.go b/tools/pd-ctl/tests/operator/operator_test.go index bf361f962ce..335e6635948 100644 --- a/tools/pd-ctl/tests/operator/operator_test.go +++ b/tools/pd-ctl/tests/operator/operator_test.go @@ -226,7 +226,7 @@ func (suite *operatorTestSuite) checkOperator(cluster *pdTests.TestCluster) { _, err = tests.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true") re.NoError(err) if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { - // wait for the scheduler server to update the config + // wait for the scheduling server to update the config testutil.Eventually(re, func() bool { return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() })