Skip to content

Commit

Permalink
fix schedulers maybe lost after restarting
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 27, 2020
1 parent 05d6873 commit 02171fb
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 12 deletions.
8 changes: 6 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Server interface {
GetHBStreams() opt.HeartbeatStreams
GetRaftCluster() *RaftCluster
GetBasicCluster() *core.BasicCluster
GetSchedulersCallback() func()
}

// RaftCluster is used for cluster config management.
Expand Down Expand Up @@ -101,6 +102,8 @@ type RaftCluster struct {

ruleManager *placement.RuleManager
client *clientv3.Client

schedulersCallback func()
}

// Status saves some state information.
Expand Down Expand Up @@ -170,7 +173,7 @@ func (c *RaftCluster) loadBootstrapTime() (time.Time, error) {
}

// InitCluster initializes the raft cluster.
func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) {
func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster, cb func()) {
c.core = basicCluster
c.opt = opt
c.storage = storage
Expand All @@ -180,6 +183,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.ScheduleOption, s
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.hotSpotCache = statistics.NewHotCache()
c.schedulersCallback = cb
}

// Start starts a cluster.
Expand All @@ -192,7 +196,7 @@ func (c *RaftCluster) Start(s Server) error {
return nil
}

c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage(), s.GetBasicCluster())
c.InitCluster(s.GetAllocator(), s.GetScheduleOption(), s.GetStorage(), s.GetBasicCluster(), s.GetSchedulersCallback())
cluster, err := c.LoadClusterInfo()
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func newTestCluster(opt *config.ScheduleOption) *testCluster {

func newTestRaftCluster(id id.Allocator, opt *config.ScheduleOption, storage *core.Storage, basicCluster *core.BasicCluster) *RaftCluster {
rc := &RaftCluster{}
rc.InitCluster(id, opt, storage, basicCluster)
rc.InitCluster(id, opt, storage, basicCluster, func() {})
return rc
}

Expand Down
3 changes: 3 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string)
go c.runScheduler(s)
c.schedulers[s.GetName()] = s
c.cluster.opt.AddSchedulerCfg(s.GetType(), args)
c.cluster.schedulersCallback()

return nil
}
Expand Down Expand Up @@ -503,6 +504,8 @@ func (c *coordinator) removeScheduler(name string) error {
log.Error("can not remove the scheduler config", zap.Error(err))
}
}

c.cluster.schedulersCallback()
return err
}

Expand Down
8 changes: 8 additions & 0 deletions server/config_manager/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,14 @@ func update(config map[string]interface{}, configName []string, value string) er
return errors.Errorf("failed to parse version: %v", err.Error())
}
container[configName[0]] = cv
} else if configName[0] == "schedulers" {
var tmp map[string]interface{}
_, err := toml.Decode(value, &tmp)
if err != nil {
return errors.Errorf("failed to decode schedulers: %v", err.Error())
}
config[configName[0]] = tmp["schedulers"]
return nil
} else if _, err := toml.Decode(value, &container); err != nil {
if !strings.Contains(err.Error(), "bare keys") {
return errors.Errorf("failed to decode value: %v", err.Error())
Expand Down
7 changes: 3 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -230,9 +229,9 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
log.Info("put store ok", zap.Stringer("store", store))
v := rc.OnStoreVersionChange()
if s.GetConfig().EnableDynamicConfig && v != nil {
status := s.updateConfigManager("cluster-version", v.String())
if status.GetCode() != configpb.StatusCode_OK {
log.Error("failed to update the cluster version", zap.Error(errors.New(status.GetMessage())))
err := s.updateConfigManager("cluster-version", v.String())
if err != nil {
log.Error("failed to update the cluster version", zap.Error(err))
}
}
CheckPDVersion(s.scheduleOpt)
Expand Down
30 changes: 27 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,26 @@ func (s *Server) GetAllocator() *id.AllocatorImpl {
return s.idAllocator
}

// GetSchedulersCallback returns a callback that pushes the server's current
// scheduler list to the config manager under the "schedule.schedulers" entry.
//
// The returned function does nothing unless EnableDynamicConfig is set in the
// server config. It reports failures through the logger rather than returning
// them. If the scheduler list cannot be encoded as TOML, the update is skipped
// so that an empty or partially written document is never sent to the config
// manager in place of the real scheduler list.
func (s *Server) GetSchedulersCallback() func() {
	return func() {
		if !s.GetConfig().EnableDynamicConfig {
			return
		}

		// Wrap the list in a table keyed by "schedulers" so the encoded TOML
		// document carries that key at its top level.
		payload := map[string]interface{}{
			"schedulers": s.GetScheduleConfig().Schedulers,
		}

		var buf bytes.Buffer
		if err := toml.NewEncoder(&buf).Encode(payload); err != nil {
			log.Error("failed to encode config", zap.Error(err))
			// Do not fall through: buf may be empty or truncated here, and
			// sending it would replace the stored schedulers with bad data.
			return
		}

		if err := s.updateConfigManager("schedule.schedulers", buf.String()); err != nil {
			log.Error("failed to update the schedulers", zap.Error(err))
		}
	}
}

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return s.cfg.Name
Expand Down Expand Up @@ -908,15 +928,19 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
return nil
}

func (s *Server) updateConfigManager(name, value string) *configpb.Status {
func (s *Server) updateConfigManager(name, value string) error {
configManager := s.GetConfigManager()
globalVersion := configManager.GetGlobalConfigs(Component).GetVersion()
version := &configpb.Version{Global: globalVersion}
entries := []*configpb.ConfigEntry{{Name: name, Value: value}}
configManager.Lock()
defer configManager.Unlock()
_, status := configManager.UpdateGlobal(Component, version, entries)
return status
configManager.Unlock()
if status.GetCode() != configpb.StatusCode_OK {
return errors.New(status.GetMessage())
}

return configManager.Persist(s.GetStorage())
}

// GetLabelProperty returns the whole label property config.
Expand Down
4 changes: 2 additions & 2 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
rc := cluster.NewRaftCluster(s.ctx, svr.GetClusterRootPath(), svr.ClusterID(), syncer.NewRegionSyncer(svr), svr.GetClient())

// Cluster is not bootstrapped.
rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage(), svr.GetBasicCluster())
rc.InitCluster(svr.GetAllocator(), svr.GetScheduleOption(), svr.GetStorage(), svr.GetBasicCluster(), func() {})
raftCluster, err := rc.LoadClusterInfo()
c.Assert(err, IsNil)
c.Assert(raftCluster, IsNil)
Expand Down Expand Up @@ -663,7 +663,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) {
c.Assert(storage.Flush(), IsNil)

raftCluster = &cluster.RaftCluster{}
raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster)
raftCluster.InitCluster(mockid.NewIDAllocator(), opt, storage, basicCluster, func() {})
raftCluster, err = raftCluster.LoadClusterInfo()
c.Assert(err, IsNil)
c.Assert(raftCluster, NotNil)
Expand Down

0 comments on commit 02171fb

Please sign in to comment.