From 4974552822be36d5c03659b53f4a1789788f31e7 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 14 Feb 2022 17:55:38 +0800 Subject: [PATCH] syncer(dm): implement start-task --start-time (#4485) close pingcap/tiflow#4106 --- dm/_utils/terror_gen/errors_release.txt | 1 + dm/dm/config/task.go | 9 +- dm/dm/config/task_cli_args.go | 11 ++- dm/dm/master/server.go | 12 ++- dm/dm/worker/source_worker.go | 2 +- dm/errors.toml | 6 ++ dm/pkg/binlog/pos_finder.go | 3 + dm/pkg/terror/error_list.go | 2 + dm/syncer/checkpoint.go | 40 ++++++++- dm/syncer/syncer.go | 103 ++++++++++++++++++++-- dm/tests/duplicate_event/run.sh | 1 + dm/tests/start_task/conf/dm-worker2.toml | 2 + dm/tests/start_task/run.sh | 106 +++++++++++++++++++++++ 13 files changed, 282 insertions(+), 16 deletions(-) create mode 100644 dm/tests/start_task/conf/dm-worker2.toml diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index fda955674ea..1a772597b12 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -186,6 +186,7 @@ ErrConfigInvalidLoadMode,[code=20053:class=config:scope=internal:level=medium], ErrConfigInvalidDuplicateResolution,[code=20054:class=config:scope=internal:level=medium], "Message: invalid load on-duplicate '%s', Workaround: Please choose a valid value in ['replace', 'error', 'ignore']" ErrConfigValidationMode,[code=20055:class=config:scope=internal:level=high], "Message: invalid validation mode, Workaround: Please check `validation-mode` config in task configuration file." ErrContinuousValidatorCfgNotFound,[code=20056:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s continuous validator config %s not exist, Workaround: Please check the `continuous-validator-config-name` config in task configuration file." +ErrConfigStartTimeTooLate,[code=20057:class=config:scope=internal:level=high], "Message: start-time %s is too late, no binlog location matches it, Workaround: Please check the `--start-time` is expected or try again later." ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high] ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename" ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high] diff --git a/dm/dm/config/task.go b/dm/dm/config/task.go index 0a1601aa153..69f536236e0 100644 --- a/dm/dm/config/task.go +++ b/dm/dm/config/task.go @@ -514,7 +514,14 @@ const ( validatorIdx ) -// adjust adjusts and verifies config. +// Adjust adjusts and verifies config. +func (c *TaskConfig) Adjust() error { + if c == nil { + return terror.ErrConfigYamlTransform.New("task config is nil") + } + return c.adjust() +} + func (c *TaskConfig) adjust() error { if len(c.Name) == 0 { return terror.ErrConfigNeedUniqueTaskName.Generate() diff --git a/dm/dm/config/task_cli_args.go b/dm/dm/config/task_cli_args.go index 9f1bcedf420..b8c0a09d6ef 100644 --- a/dm/dm/config/task_cli_args.go +++ b/dm/dm/config/task_cli_args.go @@ -20,6 +20,11 @@ import ( "github.com/pingcap/tiflow/dm/pkg/terror" ) +const ( + StartTimeFormat = "2006-01-02 15:04:05" + StartTimeFormat2 = "2006-01-02T15:04:05" +) + // TaskCliArgs is the task command line arguments, these arguments have higher priority than the config file and // downstream checkpoint, but may need to be removed after the first time they take effect. type TaskCliArgs struct { @@ -46,10 +51,10 @@ func (t *TaskCliArgs) Verify() error { if t.StartTime == "" { return nil } - _, err := time.Parse("2006-01-02 15:04:05", t.StartTime) + _, err := time.Parse(StartTimeFormat, t.StartTime) if err == nil { return nil } - _, err = time.Parse("2006-01-02T15:04:05", t.StartTime) - return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05'") + _, err = time.Parse(StartTimeFormat2, t.StartTime) + return terror.Annotate(err, "error while parse start-time, expected in the format like '2006-01-02 15:04:05' or '2006-01-02T15:04:05'") } diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index 310a6836d7a..365c1a84964 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tiflow/dm/dm/master/workerrpc" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/dm/unit" + "github.com/pingcap/tiflow/dm/pkg/binlog" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/cputil" @@ -1545,14 +1546,21 @@ func (s *Server) generateSubTask( task string, cliArgs *config.TaskCliArgs, ) (*config.TaskConfig, []*config.SubTaskConfig, error) { + var err error cfg := config.NewTaskConfig() // bypass the meta check by set any value. If start-time is specified, DM-worker will not use meta field. if cliArgs != nil && cliArgs.StartTime != "" { + err = cfg.RawDecode(task) + if err != nil { + return nil, nil, terror.WithClass(err, terror.ClassDMMaster) + } for _, inst := range cfg.MySQLInstances { - inst.Meta = &config.Meta{BinLogName: cliArgs.StartTime} + inst.Meta = &config.Meta{BinLogName: binlog.FakeBinlogName} } + err = cfg.Adjust() + } else { + err = cfg.Decode(task) } - err := cfg.Decode(task) if err != nil { return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index b322602bc2e..851a3245317 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -551,7 +551,7 @@ func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage, vali } // directly put cfg into subTaskHolder - // the unique of subtask should be assured by etcd + // the uniqueness of subtask should be assured by etcd st := NewSubTask(cfg, w.etcdClient, w.name) w.subTaskHolder.recordSubTask(st) if w.closed.Load() { diff --git a/dm/errors.toml b/dm/errors.toml index 9e2376a51f7..c7cee3f0b32 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -1126,6 +1126,12 @@ description = "" workaround = "Please check the `continuous-validator-config-name` config in task configuration file." tags = ["internal", "medium"] +[error.DM-config-20057] +message = "start-time %s is too late, no binlog location matches it" +description = "" +workaround = "Please check the `--start-time` is expected or try again later." +tags = ["internal", "high"] + [error.DM-binlog-op-22001] message = "" description = "" diff --git a/dm/pkg/binlog/pos_finder.go b/dm/pkg/binlog/pos_finder.go index ccd35bccebb..8f911f8253e 100644 --- a/dm/pkg/binlog/pos_finder.go +++ b/dm/pkg/binlog/pos_finder.go @@ -30,6 +30,9 @@ import ( "github.com/pingcap/tiflow/dm/pkg/utils" ) +// FakeBinlogName is used to bypass the checking of meta in task config when start-task with --start-time. +const FakeBinlogName = "start-task with --start-time" + type binlogPosFinder struct { remote bool tctx *tcontext.Context diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 7737bfb9374..7e8cdf03058 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -251,6 +251,7 @@ const ( codeConfigInvalidLoadDuplicateResolution codeConfigValidationMode codeContinuousValidatorCfgNotFound + codeConfigStartTimeTooLate ) // Binlog operation error code list. @@ -914,6 +915,7 @@ var ( ErrConfigInvalidDuplicateResolution = New(codeConfigInvalidLoadDuplicateResolution, ClassConfig, ScopeInternal, LevelMedium, "invalid load on-duplicate '%s'", "Please choose a valid value in ['replace', 'error', 'ignore']") ErrConfigValidationMode = New(codeConfigValidationMode, ClassConfig, ScopeInternal, LevelHigh, "invalid validation mode", "Please check `validation-mode` config in task configuration file.") ErrContinuousValidatorCfgNotFound = New(codeContinuousValidatorCfgNotFound, ClassConfig, ScopeInternal, LevelMedium, "mysql-instance(%d)'s continuous validator config %s not exist", "Please check the `continuous-validator-config-name` config in task configuration file.") + ErrConfigStartTimeTooLate = New(codeConfigStartTimeTooLate, ClassConfig, ScopeInternal, LevelHigh, "start-time %s is too late, no binlog location matches it", "Please check the `--start-time` is expected or try again later.") // Binlog operation error. ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "") diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index 833bde9f798..c5b12461287 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -25,6 +25,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/pkg/binlog" "github.com/pingcap/tiflow/dm/pkg/conn" @@ -227,6 +228,9 @@ type CheckPoint interface { // DeleteTablePoint deletes checkpoint for specified table in memory and storage DeleteTablePoint(tctx *tcontext.Context, table *filter.Table) error + // DeleteAllTablePoint deletes all checkpoints for table in memory and storage + DeleteAllTablePoint(tctx *tcontext.Context) error + // DeleteSchemaPoint deletes checkpoint for specified schema DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error @@ -237,10 +241,13 @@ type CheckPoint interface { // corresponding to Meta.Save SaveGlobalPoint(point binlog.Location) + // SaveGlobalPointForcibly saves the global binlog stream's checkpoint forcibly. + SaveGlobalPointForcibly(location binlog.Location) + // Snapshot make a snapshot of current checkpoint Snapshot(isSyncFlush bool) *SnapshotInfo - // FlushGlobalPointsExcept flushes the global checkpoint and tables' + // FlushPointsExcept flushes the global checkpoint and tables' // checkpoints except exceptTables, it also flushes SQLs with Args providing // by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only. // @exceptTables: [[schema, table]... ] @@ -551,6 +558,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, table *filt return nil } +// DeleteAllTablePoint implements CheckPoint.DeleteAllTablePoint. +func (cp *RemoteCheckPoint) DeleteAllTablePoint(tctx *tcontext.Context) error { + cp.Lock() + defer cp.Unlock() + + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() + cp.logCtx.L().Info("delete all table checkpoint") + _, err := cp.dbConn.ExecuteSQL( + tctx2, + []string{`DELETE FROM ` + cp.tableName + ` WHERE id = ? AND is_global = ?`}, + []interface{}{cp.id, false}, + ) + if err != nil { + return err + } + cp.points = make(map[string]map[string]*binlogPoint) + return nil +} + // DeleteSchemaPoint implements CheckPoint.DeleteSchemaPoint. func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSchema string) error { cp.Lock() @@ -614,7 +641,16 @@ func (cp *RemoteCheckPoint) SaveGlobalPoint(location binlog.Location) { } } -// FlushPointsExcept implements CheckPoint.FlushSnapshotPointsExcept. +// SaveGlobalPointForcibly implements CheckPoint.SaveGlobalPointForcibly. +func (cp *RemoteCheckPoint) SaveGlobalPointForcibly(location binlog.Location) { + cp.Lock() + defer cp.Unlock() + + cp.logCtx.L().Info("reset global checkpoint", zap.Stringer("location", location)) + cp.globalPoint = newBinlogPoint(location, binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) +} + +// FlushPointsExcept implements CheckPoint.FlushPointsExcept. func (cp *RemoteCheckPoint) FlushPointsExcept( tctx *tcontext.Context, snapshotID int, diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 68eb7ef9ba7..96ebb22d793 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -125,6 +125,7 @@ type Syncer struct { cfg *config.SubTaskConfig syncCfg replication.BinlogSyncerConfig + cliArgs *config.TaskCliArgs sgk *ShardingGroupKeeper // keeper to keep all sharding (sub) group in this syncer pessimist *shardddl.Pessimist // shard DDL pessimist @@ -439,11 +440,13 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } // when Init syncer, set active relay log info - err = s.setInitActiveRelayLog(ctx) - if err != nil { - return err + if s.cfg.Meta == nil || s.cfg.Meta.BinLogName != binlog.FakeBinlogName { + err = s.setInitActiveRelayLog(ctx) + if err != nil { + return err + } + rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) } - rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) s.reset() return nil @@ -1257,6 +1260,17 @@ func (s *Syncer) afterFlushCheckpoint(task *checkpointFlushTask) error { s.lastCheckpointFlushedTime = now s.logAndClearFilteredStatistics() + + if s.cliArgs != nil && s.cliArgs.StartTime != "" { + clone := *s.cliArgs + clone.StartTime = "" + err2 := ha.PutTaskCliArgs(s.cli, s.cfg.Name, []string{s.cfg.SourceID}, clone) + if err2 != nil { + s.tctx.L().Error("failed to clean start-time in task cli args", zap.Error(err2)) + } else { + s.cliArgs.StartTime = "" + } + } return nil } @@ -1477,11 +1491,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } }() - // some initialization that can't be put in Syncer.Init fresh, err := s.IsFreshTask(runCtx) if err != nil { return err - } else if fresh { + } + + // task command line arguments have the highest priority + // dm-syncer and other usage may not have a etcdCli, so we check it first + skipLoadMeta := false + if s.cli != nil { + s.cliArgs, err = ha.GetTaskCliArgs(s.cli, s.cfg.Name, s.cfg.SourceID) + if err != nil { + s.tctx.L().Error("failed to get task cli args", zap.Error(err)) + } + if s.cliArgs != nil && s.cliArgs.StartTime != "" { + err = s.setGlobalPointByTime(tctx, s.cliArgs.StartTime) + if terror.ErrConfigStartTimeTooLate.Equal(err) { + return err + } + skipLoadMeta = err == nil + } + } + + // some initialization that can't be put in Syncer.Init + if fresh && !skipLoadMeta { // for fresh task, we try to load checkpoints from meta (file or config item) err = s.checkpoint.LoadMeta() if err != nil { @@ -3636,7 +3669,8 @@ func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) { // 1. GTID is not enabled // 2. location already has GTID position // 3. location is totally new, has no position info - if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" { + // 4. location is too early thus not a COMMIT location, which happens when it's reset by other logic + if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" || location.Position.Pos == 4 { return false, nil } // set enableGTID to false for new streamerController @@ -3728,3 +3762,58 @@ func (s *Syncer) flushOptimisticTableInfos(tctx *tcontext.Context) { tctx.L().Error("failed to flush table points with table infos", log.ShortError(err)) } } + +func (s *Syncer) setGlobalPointByTime(tctx *tcontext.Context, timeStr string) error { + // we support two layout + t, err := time.ParseInLocation(config.StartTimeFormat, timeStr, s.timezone) + if err != nil { + t, err = time.ParseInLocation(config.StartTimeFormat2, timeStr, s.timezone) + } + if err != nil { + return err + } + + var ( + loc *binlog.Location + posTp binlog.PosType + ) + + if s.relay != nil { + subDir := s.relay.Status(nil).(*pb.RelayStatus).RelaySubDir + relayDir := path.Join(s.cfg.RelayDir, subDir) + finder := binlog.NewLocalBinlogPosFinder(tctx, s.cfg.EnableGTID, s.cfg.Flavor, relayDir) + loc, posTp, err = finder.FindByTimestamp(t.Unix()) + } else { + finder := binlog.NewRemoteBinlogPosFinder(tctx, s.fromDB.BaseDB.DB, s.syncCfg, s.cfg.EnableGTID) + loc, posTp, err = finder.FindByTimestamp(t.Unix()) + } + if err != nil { + s.tctx.L().Error("fail to find binlog position by timestamp", + zap.Time("time", t), + zap.Error(err)) + return err + } + + switch posTp { + case binlog.InRangeBinlogPos: + s.tctx.L().Info("find binlog position by timestamp", + zap.String("time", timeStr), + zap.Stringer("pos", loc)) + case binlog.BelowLowerBoundBinlogPos: + s.tctx.L().Warn("fail to find binlog location by timestamp because the timestamp is too early, will use the earliest binlog location", + zap.String("time", timeStr), + zap.Any("location", loc)) + case binlog.AboveUpperBoundBinlogPos: + return terror.ErrConfigStartTimeTooLate.Generate(timeStr) + } + + err = s.checkpoint.DeleteAllTablePoint(tctx) + if err != nil { + return err + } + s.checkpoint.SaveGlobalPointForcibly(*loc) + s.tctx.L().Info("Will replicate from the specified time, the location recorded in checkpoint and config file will be ignored", + zap.String("time", timeStr), + zap.Any("locationOfTheTime", loc)) + return nil +} diff --git a/dm/tests/duplicate_event/run.sh b/dm/tests/duplicate_event/run.sh index 5d15329d025..85561e9f559 100644 --- a/dm/tests/duplicate_event/run.sh +++ b/dm/tests/duplicate_event/run.sh @@ -76,6 +76,7 @@ function run_with_prepared_source_config() { server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index) relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}') + echo "binlog_pos: $binlog_pos relay_log_size: $relay_log_size" [ "$binlog_pos" -eq "$relay_log_size" ] echo "============== run_with_prepared_source_config success ===================" diff --git a/dm/tests/start_task/conf/dm-worker2.toml b/dm/tests/start_task/conf/dm-worker2.toml new file mode 100644 index 00000000000..010e21c73eb --- /dev/null +++ b/dm/tests/start_task/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/dm/tests/start_task/run.sh b/dm/tests/start_task/run.sh index 90e51cb24f7..c725983852f 100644 --- a/dm/tests/start_task/run.sh +++ b/dm/tests/start_task/run.sh @@ -62,7 +62,113 @@ function lazy_init_tracker() { cleanup_process } +function start_task_by_time() { + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(0)' + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $cur/conf/source1.yaml $SOURCE_ID1 + + run_sql_source1 'DROP DATABASE if exists start_task;' + run_sql_source1 'CREATE DATABASE start_task;' + run_sql_source1 'CREATE TABLE start_task.t1 (c INT PRIMARY KEY);' + + sleep 2 + start_time=$(date '+%Y-%m-%d %T') # 2022-01-26 17:32:22 + sleep 2 + + run_sql_source1 'CREATE TABLE start_task.t2 (c INT PRIMARY KEY);' + run_sql_source1 'INSERT INTO start_task.t2 VALUES (1), (2);INSERT INTO start_task.t2 VALUES (3), (4);' + + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i "s/task-mode: all/task-mode: incremental/g" $WORK_DIR/dm-task.yaml + + # test with relay + + run_sql_tidb 'DROP DATABASE if exists start_task;CREATE DATABASE start_task;' + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --start-time '$start_time'" \ + "\"result\": true" 2 + + run_sql_tidb_with_retry "show tables in start_task;" "t2" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'start_task';" "count(1): 1" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + + # test without relay and safe mode + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-relay -s $SOURCE_ID1" \ + "\"result\": true" 1 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"relayStatus\": null" 1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "transfer-source $SOURCE_ID1 worker2" \ + "\"result\": true" 1 + + run_sql_tidb 'DROP DATABASE if exists start_task;CREATE DATABASE start_task;' + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --start-time '$start_time'" \ + "\"result\": true" 2 + + run_sql_tidb_with_retry "show tables in start_task;" "t2" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'start_task';" "count(1): 1" + + # no duplicate entry error + check_log_contain_with_retry "enable safe-mode for safe mode exit point, will exit at" $WORK_DIR/worker2/log/dm-worker.log + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 2 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + + # test too early + + run_sql_tidb 'DROP DATABASE if exists start_task;CREATE DATABASE start_task;' + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --start-time '1995-03-07 01:02:03'" \ + "\"result\": true" 2 + + run_sql_tidb_with_retry "show tables in start_task;" "t1" + run_sql_tidb_with_retry "SELECT count(1) FROM information_schema.tables WHERE table_schema = 'start_task';" "count(1): 2" + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + + # test too late + + run_sql_tidb 'DROP DATABASE if exists start_task;CREATE DATABASE start_task;' + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --start-time '2037-12-12 01:02:03'" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Paused\"" 1 \ + "no binlog location matches it" 1 + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test" \ + "\"result\": true" 2 + + export GO_FAILPOINTS='' + cleanup_process + cleanup_data start_task +} + function run() { + start_task_by_time lazy_init_tracker failpoints=( # 1152 is ErrAbortingConnection