From 02171fb728d743eeb28ef2bb74ec9bbbbde84da4 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 25 Feb 2020 17:54:40 +0800 Subject: [PATCH] fix schedulers maybe lost after restarting Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 8 +++++-- server/cluster/cluster_test.go | 2 +- server/cluster/coordinator.go | 3 +++ server/config_manager/config_manager.go | 8 +++++++ server/grpc_service.go | 7 +++--- server/server.go | 30 ++++++++++++++++++++++--- tests/server/cluster/cluster_test.go | 4 ++-- 7 files changed, 50 insertions(+), 12 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 7129bca4774..2c766499506 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -59,6 +59,7 @@ type Server interface { GetHBStreams() opt.HeartbeatStreams GetRaftCluster() *RaftCluster GetBasicCluster() *core.BasicCluster + GetSchedulersCallback() func() } // RaftCluster is used for cluster config management. @@ -101,6 +102,8 @@ type RaftCluster struct { ruleManager *placement.RuleManager client *clientv3.Client + + schedulersCallback func() } // Status saves some state information. @@ -170,7 +173,7 @@ func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { } // InitCluster initializes the raft cluster. -func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) { +func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster, cb func()) { c.core = basicCluster c.opt = opt c.storage = storage @@ -180,6 +183,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, s c.prepareChecker = newPrepareChecker() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.hotSpotCache = statistics.NewHotCache() + c.schedulersCallback = cb } // Start starts a cluster. @@ -192,7 +196,7 @@ func (c *RaftCluster) Start(s Server) error { return nil } - c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage(), s.GetBasicCluster()) + c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage(), s.GetBasicCluster(), s.GetSchedulersCallback()) cluster, err := c.LoadClusterInfo() if err != nil { return err diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c4fad97ca56..daef44df6e6 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -635,7 +635,7 @@ func newTestCluster(opt *config.ScheduleOption) *testCluster { func newTestRaftCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster { rc := &RaftCluster{} - rc.InitCluster(id, opt, storage, basicCluster) + rc.InitCluster(id, opt, storage, basicCluster, func() {}) return rc } diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index b5e9de9d50e..c1f593cb1fd 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -472,6 +472,7 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) go c.runScheduler(s) c.schedulers[s.GetName()] = s c.cluster.opt.AddSchedulerCfg(s.GetType(), args) + c.cluster.schedulersCallback() return nil } @@ -503,6 +504,8 @@ func (c *coordinator) removeScheduler(name string) error { log.Error("can not remove the scheduler config", zap.Error(err)) } } + + c.cluster.schedulersCallback() return err } diff --git a/server/config_manager/config_manager.go b/server/config_manager/config_manager.go index 2d8157d68e4..aa4101c5e70 100644 --- a/server/config_manager/config_manager.go +++ b/server/config_manager/config_manager.go @@ -523,6 +523,14 @@ func update(config map[string]interface{}, configName []string, value string) er return errors.Errorf("failed to parse version: %v", err.Error()) } container[configName[0]] = cv + } else if configName[0] == "schedulers" { + var tmp map[string]interface{} + _, err := toml.Decode(value, &tmp) + if err != nil { + return errors.Errorf("failed to decode schedulers: %v", err.Error()) + } + config[configName[0]] = tmp["schedulers"] + return nil } else if _, err := toml.Decode(value, &container); err != nil { if !strings.Contains(err.Error(), "bare keys") { return errors.Errorf("failed to decode value: %v", err.Error()) diff --git a/server/grpc_service.go b/server/grpc_service.go index 25b9208d03c..f5718db7601 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -21,7 +21,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/kvproto/pkg/configpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -230,9 +229,9 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* log.Info("put store ok", zap.Stringer("store", store)) v := rc.OnStoreVersionChange() if s.GetConfig().EnableDynamicConfig && v != nil { - status := s.updateConfigManager("cluster-version", v.String()) - if status.GetCode() != configpb.StatusCode_OK { - log.Error("failed to update the cluster version", zap.Error(errors.New(status.GetMessage()))) + err := s.updateConfigManager("cluster-version", v.String()) + if err != nil { + log.Error("failed to update the cluster version", zap.Error(err)) } } CheckPDVersion(s.scheduleOpt) diff --git a/server/server.go b/server/server.go index f969c680401..d65d6506efc 100644 --- a/server/server.go +++ b/server/server.go @@ -688,6 +688,26 @@ func (s *Server) GetAllocator() *id.AllocatorImpl { return s.idAllocator } +// GetSchedulersCallback returns a callback function to update config manager. +func (s *Server) GetSchedulersCallback() func() { + return func() { + if s.GetConfig().EnableDynamicConfig { + value := s.GetScheduleConfig().Schedulers + tmp := map[string]interface{}{ + "schedulers": value, + } + var buf bytes.Buffer + if err := toml.NewEncoder(&buf).Encode(tmp); err != nil { + log.Error("failed to encode config", zap.Error(err)) + } + + if err := s.updateConfigManager("schedule.schedulers", buf.String()); err != nil { + log.Error("failed to update the schedulers", zap.Error(err)) + } + } + } +} + // Name returns the unique etcd Name for this server in etcd cluster. func (s *Server) Name() string { return s.cfg.Name @@ -908,15 +928,19 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { return nil } -func (s *Server) updateConfigManager(name, value string) *configpb.Status { +func (s *Server) updateConfigManager(name, value string) error { configManager := s.GetConfigManager() globalVersion := configManager.GetGlobalConfigs(Component).GetVersion() version := &configpb.Version{Global: globalVersion} entries := []*configpb.ConfigEntry{{Name: name, Value: value}} configManager.Lock() - defer configManager.Unlock() _, status := configManager.UpdateGlobal(Component, version, entries) - return status + configManager.Unlock() + if status.GetCode() != configpb.StatusCode_OK { + return errors.New(status.GetMessage()) + } + + return configManager.Persist(s.GetStorage()) } // GetLabelProperty returns the whole label property config. diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 702ed347d40..5cdda941700 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -624,7 +624,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { rc := cluster.NewRaftCluster(s.ctx, svr.GetClusterRootPath(), svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient()) // Cluster is not bootstrapped. - rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage(), svr.GetBasicCluster()) + rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage(), svr.GetBasicCluster(), func() {}) raftCluster, err := rc.LoadClusterInfo() c.Assert(err, IsNil) c.Assert(raftCluster, IsNil) @@ -663,7 +663,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { c.Assert(storage.Flush(), IsNil) raftCluster = &cluster.RaftCluster{} - raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster) + raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster, func() {}) raftCluster, err = raftCluster.LoadClusterInfo() c.Assert(err, IsNil) c.Assert(raftCluster, NotNil)