Skip to content

Commit

Permalink
master(dm): specify worker when create upstream source (#4167)
Browse files Browse the repository at this point in the history
close #4169
  • Loading branch information
jerrylisl authored Jan 28, 2022
1 parent 14f8c4f commit 2b45869
Show file tree
Hide file tree
Showing 12 changed files with 585 additions and 418 deletions.
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ ErrSchedulerStopRelayOnSpecified,[code=46029:class=scheduler:scope=internal:leve
ErrSchedulerStartRelayOnBound,[code=46030:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `start-relay` with worker name now, Workaround: Please stop relay by `stop-relay` without worker name first."
ErrSchedulerStopRelayOnBound,[code=46031:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` automatically for bound worker, so it can't `stop-relay` with worker name now, Workaround: Please use `stop-relay` without worker name."
ErrSchedulerPauseTaskForTransferSource,[code=46032:class=scheduler:scope=internal:level=low], "Message: failed to auto pause tasks %s when transfer-source, Workaround: Please pause task by `dmctl pause-task`."
ErrSchedulerWorkerNotFree,[code=46033:class=scheduler:scope=internal:level=low], "Message: dm-worker with name %s not free"
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrCtlLoadTLSCfg,[code=48003:class=dmctl:scope=internal:level=high], "Message: can not load tls config, Workaround: Please ensure that the tls certificate is accessible on the node currently running dmctl."
Expand Down
24 changes: 20 additions & 4 deletions dm/dm/ctl/master/operate_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (
// NewOperateSourceCmd creates a OperateSource command.
//
// The returned command runs operateSourceFunc and registers two flags:
//   - --print-sample-config / -p (bool, default false): print sample config file of source.
//   - --worker / -w (string, default ""): specify bound worker for created source.
func NewOperateSourceCmd() *cobra.Command {
	cmd := &cobra.Command{
		// A struct literal may name each field only once; keep the usage line
		// that advertises the `-w worker` flag registered below.
		Use:   "operate-source <operate-type> [config-file ...] [-w worker] [--print-sample-config]",
		Short: "`create`/`update`/`stop`/`show` upstream MySQL/MariaDB source",
		RunE:  operateSourceFunc,
	}
	cmd.Flags().BoolP("print-sample-config", "p", false, "print sample config file of source")
	cmd.Flags().StringP("worker", "w", "", "specify bound worker for created source")
	return cmd
}

Expand Down Expand Up @@ -85,6 +86,20 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error {
return errors.New("please check output to see error")
}

var specifyWorker string
if op == pb.SourceOp_StartSource {
specifyWorker, err = cmd.Flags().GetString("worker")
if err != nil {
common.PrintLinesf("error in parse `--worker`")
return err
}
if specifyWorker != "" {
if len(cmd.Flags().Args()) > 2 {
common.PrintLinesf("operate-source create can't create multiple sources when specify worker")
}
}
}

contents := make([]string, 0, len(cmd.Flags().Args())-1)
sourceID := make([]string, 0, len(cmd.Flags().Args())-1)
sources, err := common.GetSourceArgs(cmd)
Expand Down Expand Up @@ -132,9 +147,10 @@ func operateSourceFunc(cmd *cobra.Command, _ []string) error {
ctx,
"OperateSource",
&pb.OperateSourceRequest{
Config: contents,
Op: op,
SourceID: sourceID,
Config: contents,
Op: op,
SourceID: sourceID,
WorkerName: specifyWorker,
},
&resp,
)
Expand Down
47 changes: 41 additions & 6 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s *Scheduler) CloseAllWorkers() {
}
}

// AddSourceCfg adds the upstream source config to the cluster.
// AddSourceCfg adds the upstream source config to the cluster, and tries to bind the source to a free worker.
// NOTE: please verify the config before call this.
func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error {
s.mu.Lock()
Expand All @@ -327,11 +327,49 @@ func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error {
return terror.ErrSchedulerNotStarted.Generate()
}

err := s.addSource(cfg)
if err != nil {
return err
}

// try to bound it to a Free worker.
_, err = s.tryBoundForSource(cfg.SourceID)
return err
}

// AddSourceCfgWithWorker adds the upstream source config to the cluster and
// binds the new source to the worker named workerName.
//
// The scheduler mutex is held for the whole call. The worker is validated
// before the config is added, so a rejected worker leaves no source config
// behind. It returns:
//   - ErrSchedulerNotStarted if the scheduler has not been started,
//   - ErrSchedulerWorkerNotExist if no worker is registered under workerName,
//   - ErrSchedulerWorkerNotFree if that worker's stage is not WorkerFree,
//   - any error reported by addSource or boundSourceToWorker.
//
// NOTE: please verify the config before call this.
func (s *Scheduler) AddSourceCfgWithWorker(cfg *config.SourceConfig, workerName string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if started := s.started.Load(); !started {
		return terror.ErrSchedulerNotStarted.Generate()
	}

	// The requested worker must both exist and currently be Free.
	target, found := s.workers[workerName]
	switch {
	case !found:
		return terror.ErrSchedulerWorkerNotExist.Generate(workerName)
	case target.stage != WorkerFree:
		return terror.ErrSchedulerWorkerNotFree.Generate(workerName)
	}

	// Record the source config first, then bind it to the chosen worker.
	err := s.addSource(cfg)
	if err != nil {
		return err
	}
	return s.boundSourceToWorker(cfg.SourceID, target)
}

// addSource adds the upstream source config to the cluster.
func (s *Scheduler) addSource(cfg *config.SourceConfig) error {
// 1. check whether exists.
if _, ok := s.sourceCfgs[cfg.SourceID]; ok {
return terror.ErrSchedulerSourceCfgExist.Generate(cfg.SourceID)
}

// 2. put the config into etcd.
_, err := ha.PutSourceCfg(s.etcdCli, cfg)
if err != nil {
Expand All @@ -341,10 +379,7 @@ func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error {
// 3. record the config in the scheduler.
s.sourceCfgs[cfg.SourceID] = cfg
s.unbounds[cfg.SourceID] = struct{}{}

// 4. try to bound it to a Free worker.
_, err = s.tryBoundForSource(cfg.SourceID)
return err
return nil
}

// UpdateSourceCfg update the upstream source config to the cluster.
Expand Down
37 changes: 35 additions & 2 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {

// not started scheduler can't do anything.
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSourceCfg(sourceCfg1)), IsTrue)
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSourceCfgWithWorker(sourceCfg1, workerName1)), IsTrue)
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.RemoveSourceCfg(sourceID1)), IsTrue)
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSubTasks(false, subtaskCfg1)), IsTrue)
Expand Down Expand Up @@ -406,7 +407,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
t.relayStageMatch(c, s, sourceID2, pb.Stage_Running)
rebuildScheduler(ctx)

// CASE 4.4: start a task with two sources.
// CASE 4.4.1: start a task with two sources.
// can't add more than one tasks at a time now.
c.Assert(terror.ErrSchedulerMultiTask.Equal(s.AddSubTasks(false, subtaskCfg1, subtaskCfg21)), IsTrue)
// task2' config and stage not exists before.
Expand All @@ -423,7 +424,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
t.subTaskStageMatch(c, s, taskName2, sourceID2, pb.Stage_Running)
rebuildScheduler(ctx)

// CASE 4.4.1 fail to stop any task.
// CASE 4.4.2 fail to stop any task.
// can call without tasks or sources, return without error, but take no effect.
c.Assert(s.RemoveSubTasks("", sourceID1), IsNil)
c.Assert(s.RemoveSubTasks(taskName1), IsNil)
Expand Down Expand Up @@ -481,6 +482,38 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
t.relayStageMatch(c, s, sourceID2, pb.Stage_InvalidStage)
rebuildScheduler(ctx)

// CASE 4.7.1: add source2 with specify worker1
// source2 not exist, worker1 is bound
t.sourceCfgNotExist(c, s, sourceID2)
t.workerBound(c, s, ha.NewSourceBound(sourceID1, workerName1))
c.Assert(terror.ErrSchedulerWorkerNotFree.Equal(s.AddSourceCfgWithWorker(&sourceCfg2, workerName1)), IsTrue)
// source2 is not created because expected worker1 is already bound
t.sourceCfgNotExist(c, s, sourceID2)
rebuildScheduler(ctx)

// CASE 4.7.2: add source2 with specify worker2
// source2 not exist, worker2 should be free
t.sourceCfgNotExist(c, s, sourceID2)
t.workerFree(c, s, workerName2)
c.Assert(s.AddSourceCfgWithWorker(&sourceCfg2, workerName2), IsNil)
t.workerBound(c, s, ha.NewSourceBound(sourceID2, workerName2))
t.sourceBounds(c, s, []string{sourceID1, sourceID2}, []string{})
c.Assert(s.StartRelay(sourceID2, []string{workerName2}), IsNil)
t.relayStageMatch(c, s, sourceID2, pb.Stage_Running)
rebuildScheduler(ctx)

// CASE 4.7.3: remove source2 again.
c.Assert(s.StopRelay(sourceID2, []string{workerName2}), IsNil)
c.Assert(s.RemoveSourceCfg(sourceID2), IsNil)
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.RemoveSourceCfg(sourceID2)), IsTrue) // already removed.
// source2 removed.
t.sourceCfgNotExist(c, s, sourceID2)
// worker2 become Free now.
t.workerFree(c, s, workerName2)
t.sourceBounds(c, s, []string{sourceID1}, []string{})
t.relayStageMatch(c, s, sourceID2, pb.Stage_InvalidStage)
rebuildScheduler(ctx)

// CASE 4.8: worker1 become offline.
// before shutdown, worker1 bound source
t.workerBound(c, s, ha.NewSourceBound(sourceID1, workerName1))
Expand Down
7 changes: 6 additions & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,12 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
err error
)
for _, cfg := range cfgs {
err = s.scheduler.AddSourceCfg(cfg)
// add source with worker when specify a worker name
if req.WorkerName != "" {
err = s.scheduler.AddSourceCfgWithWorker(cfg, req.WorkerName)
} else {
err = s.scheduler.AddSourceCfg(cfg)
}
// return first error and try to revert, so user could copy-paste same start command after error
if err != nil {
resp.Msg = err.Error()
Expand Down
Loading

0 comments on commit 2b45869

Please sign in to comment.