From 4775a7ef2b2effedcb17eb27669704ad9eddc6c2 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 11:08:23 +0800 Subject: [PATCH 01/43] --wip-- [skip ci] --- dm/tests/many_tables/conf/source1.yaml | 3 ++- dm/tests/many_tables/run.sh | 33 +++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/dm/tests/many_tables/conf/source1.yaml b/dm/tests/many_tables/conf/source1.yaml index 0018c1b61e3..04cc575da29 100644 --- a/dm/tests/many_tables/conf/source1.yaml +++ b/dm/tests/many_tables/conf/source1.yaml @@ -10,4 +10,5 @@ from: password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3306 checker: - check-enable: false + check-enable: true + backoff-max: 1s diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 913d03b2e94..f13f0253603 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -27,6 +27,13 @@ function incremental_data() { done } +function incremental_data_2() { + j=6 + for i in $(seq $TABLE_NUM); do + run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + done +} + function run() { echo "start prepare_data" prepare_data @@ -70,10 +77,34 @@ function run() { fi echo "start incremental_data" + read -p 123 incremental_data echo "finish incremental_data" - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + read -p 456 + + # test https://github.com/pingcap/tiflow/issues/5344 + kill_dm_worker + # let some binlog event save table checkpoint before meet downstream error + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_sql_source1 "CREATE TABLE many_tables_db.flush (c INT PRIMARY KEY);" + # not sure why we need this sleep + sleep 5 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + + read -p 789 + pkill -hup tidb-server 2>/dev/null || true + wait_process_exit tidb-server + + incremental_data_2 + sleep 20 + [ 1 == 0 ] + + export GO_FAILPOINTS='' } cleanup_data many_tables_db From c2f54f4ddbac4149cdb00c00fc3682e54ccd2914 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 14:41:41 +0800 Subject: [PATCH 02/43] change unistore settings Signed-off-by: lance6716 --- dm/pkg/schema/tracker.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 3fc3b48c718..4af48ab40ef 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -21,6 +21,7 @@ import ( "strings" "sync" + "github.com/docker/go-units" "github.com/pingcap/errors" tidbConfig "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -36,6 +37,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + unistoreConfig "github.com/pingcap/tidb/store/mockstore/unistore/config" "github.com/pingcap/tidb/util/filter" "go.uber.org/zap" @@ -62,6 +64,11 @@ var ( } ) +func init() { + unistoreConfig.DefaultConf.Engine.VlogFileSize = 4 * units.MiB + unistoreConfig.DefaultConf.Engine.L1Size = 128 * units.MiB +} + // Tracker is used to track schema locally. 
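// Each tracker is backed by an embedded TiDB on unistore; the init above shrinks
// unistore's vlog file size (4 MiB) and L1 size (128 MiB), so the tracker's
// on-disk footprint should stay far below the unistore defaults.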
type Tracker struct { // we're using an embedded tidb, there's no need to sync operations on it, but we may recreate(drop and create) From 134bd557597e3b754a82282912aa0e3070557824 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 15:02:57 +0800 Subject: [PATCH 03/43] add IT Signed-off-by: lance6716 --- dm/tests/many_tables/run.sh | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index f13f0253603..06a041aef83 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -77,11 +77,9 @@ function run() { fi echo "start incremental_data" - read -p 123 incremental_data echo "finish incremental_data" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - read -p 456 # test https://github.com/pingcap/tiflow/issues/5344 kill_dm_worker @@ -90,19 +88,24 @@ function run() { run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT run_sql_source1 "CREATE TABLE many_tables_db.flush (c INT PRIMARY KEY);" - # not sure why we need this sleep sleep 5 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ '"synced": true' 1 - read -p 789 pkill -hup tidb-server 2>/dev/null || true wait_process_exit tidb-server - + # now worker will process some binlog events, save table checkpoint and meet downstream error incremental_data_2 sleep 20 - [ 1 == 0 ] + + rollback_num=$(grep '"rollback checkpoint' $WORK_DIR/worker1/log/dm-worker.log | wc -l) + echo "rollback_num: $rollback_num" + [ $rollback_num -gt 20 ] + folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) + echo "folder_size: $folder_size" + # less than 10M + [ $folder_size -lt 10000 ] export GO_FAILPOINTS='' } From 4c3ba73dda2c7f2f6719d07f7198ec99fa844256 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 16:27:39 +0800 Subject: [PATCH 04/43] reset schema tracker every time Signed-off-by: lance6716 --- dm/pkg/schema/tracker.go | 17 +++++---- dm/pkg/schema/tracker_test.go | 6 ++-- dm/syncer/checkpoint.go | 66 ++++++++--------------------------- dm/syncer/syncer.go | 31 ++++++++-------- 4 files changed, 43 insertions(+), 77 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 4af48ab40ef..3615fc4407d 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -439,19 +439,18 @@ func (tr *Tracker) RecreateTables(logCtx *tcontext.Context, tablesToDrop []*filt zap.Stringer("table", tbl), log.ShortError(err)) } } - for schemaName := range tablesToCreate { - // TODO: Figure out how to recover from errors. - if err := tr.CreateSchemaIfNotExists(schemaName); err != nil { - logCtx.L().Error("failed to rollback schema on schema tracker: cannot create schema", - zap.String("schema", schemaName), log.ShortError(err)) - } - } - return tr.batchCreateTableIfNotExist(tablesToCreate) + return tr.BatchCreateTableIfNotExist(tablesToCreate) } -func (tr *Tracker) batchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error { +// BatchCreateTableIfNotExist will batch creating tables per schema. If the schema does not exist, it will create it. +// The argument is { database name -> { table name -> TableInfo } }. 
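+// For example, passing {"db1": {"t1": ti1, "t2": ti2}} first ensures database db1
+// exists, then creates db1.t1 and db1.t2 in one batch.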
+func (tr *Tracker) BatchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error { tr.se.SetValue(sessionctx.QueryString, "skip") for schema, tableNameInfo := range tablesToCreate { + if err := tr.CreateSchemaIfNotExists(schema); err != nil { + return err + } + var cloneTis []*model.TableInfo for table, ti := range tableNameInfo { cloneTi := cloneTableInfo(ti) // clone TableInfo w.r.t the warning of the CreateTable function diff --git a/dm/pkg/schema/tracker_test.go b/dm/pkg/schema/tracker_test.go index 8eb49766a85..86c06904a0e 100644 --- a/dm/pkg/schema/tracker_test.go +++ b/dm/pkg/schema/tracker_test.go @@ -560,7 +560,7 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) { for i := range tables { tablesToCreate[tables[i].Schema][tables[i].Name] = tiInfos[i] } - err = tracker.batchCreateTableIfNotExist(tablesToCreate) + err = tracker.BatchCreateTableIfNotExist(tablesToCreate) c.Assert(err, IsNil) // 3. check all create success for i := range tables { @@ -580,7 +580,7 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) { err = tracker.DropTable(tables[0]) c.Assert(err, IsNil) // 2. batch create - err = tracker.batchCreateTableIfNotExist(tablesToCreate) + err = tracker.BatchCreateTableIfNotExist(tablesToCreate) c.Assert(err, IsNil) // 3. check for i := range tables { @@ -595,7 +595,7 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) { ctx := context.Background() err = tracker.Exec(ctx, "", `drop database testdb`) c.Assert(err, IsNil) - err = tracker.batchCreateTableIfNotExist(tablesToCreate) + err = tracker.BatchCreateTableIfNotExist(tablesToCreate) c.Assert(err, NotNil) } diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index d3f0f04cf86..414c643377f 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -120,7 +120,7 @@ func (b *binlogPoint) flushBy(tp tablePoint) { b.flushedPoint = tp } -func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) { +func (b *binlogPoint) rollback() { b.Lock() defer b.Unlock() @@ -135,15 +135,14 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is // NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly, // and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now. - trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.savedPoint.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough. - // may three versions of schema exist: - // - the one tracked in the TiDB-with-mockTiKV. + // there may be three versions of schema: + // - the one tracked in the schema tracker (TiDB-with-unistore). // - the one in the checkpoint but not flushed. // - the one in the checkpoint and flushed. - // if any of them are not equal, then we rollback them: + // schema tracker will be closed after task is paused, and it will load all schemas from checkpoint when task resumes. + // if the later two are not equal, then we rollback them: // - set the one in the checkpoint but not flushed to the one flushed. 
- // - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now) - if isSchemaChanged = (trackedTi != b.savedPoint.ti) || (b.savedPoint.ti != b.flushedPoint.ti); isSchemaChanged { + if b.savedPoint.ti != b.flushedPoint.ti { b.savedPoint.ti = b.flushedPoint.ti } return @@ -287,7 +286,7 @@ type CheckPoint interface { LastFlushOutdated() bool // Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints - Rollback(schemaTracker *schema.Tracker) + Rollback() // String return text of global position String() string @@ -892,12 +891,10 @@ func (cp *RemoteCheckPoint) LastFlushOutdated() bool { } // Rollback implements CheckPoint.Rollback. -func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { +func (cp *RemoteCheckPoint) Rollback() { cp.RLock() defer cp.RUnlock() - cp.globalPoint.rollback(schemaTracker, "") - tablesToDrop := make([]*filter.Table, 0) - tablesToCreate := make(map[string]map[string]*model.TableInfo) + cp.globalPoint.rollback() for schemaName, mSchema := range cp.points { for tableName, point := range mSchema { table := &filter.Table{ @@ -906,37 +903,7 @@ func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { } logger := cp.logCtx.L().WithFields(zap.Stringer("table", table)) logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point)) - from := point.MySQLLocation() - if point.rollback(schemaTracker, schemaName) { - logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation())) - tablesToDrop = append(tablesToDrop, table) - if point.savedPoint.ti != nil { - if _, ok := tablesToCreate[schemaName]; !ok { - tablesToCreate[schemaName] = map[string]*model.TableInfo{} - } - tablesToCreate[schemaName][tableName] = point.savedPoint.ti - } - } - } - } - if err := schemaTracker.RecreateTables(cp.logCtx, tablesToDrop, tablesToCreate); err != nil { - cp.logCtx.L().Error("failed to rollback schema on schema tracker: cannot recreate table", - zap.Reflect("tables to drop", tablesToDrop), zap.Reflect("batch recreate table", tablesToCreate), - log.ShortError(err)) - } - - // drop any tables in the tracker if no corresponding checkpoint exists. - for _, schema := range schemaTracker.AllSchemas() { - _, ok1 := cp.points[schema.Name.O] - for _, table := range schema.Tables { - var ok2 bool - if ok1 { - _, ok2 = cp.points[schema.Name.O][table.Name.O] - } - if !ok2 { - err := schemaTracker.DropTable(&filter.Table{Schema: schema.Name.O, Name: table.Name.O}) - cp.logCtx.L().Info("drop table in schema tracker because no checkpoint exists", zap.String("schema", schema.Name.O), zap.String("table", table.Name.O), log.ShortError(err)) - } + point.rollback() } } } @@ -1097,23 +1064,20 @@ func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTra cp.RLock() defer cp.RUnlock() + tablesToCreate := map[string]map[string]*model.TableInfo{} for cpSchema, mSchema := range cp.points { for cpTable, point := range mSchema { // for create database DDL, we'll create a table point with no table name and table info, need to skip. 
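// (such a point records only a binlog location and carries no table structure to create)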
if point.flushedPoint.ti == nil { continue } - if err := schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil { - return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema) + if _, ok := tablesToCreate[cpSchema]; !ok { + tablesToCreate[cpSchema] = make(map[string]*model.TableInfo) } - tbl := filter.Table{Schema: cpSchema, Name: cpTable} - if err := schemaTracker.CreateTableIfNotExists(&tbl, point.flushedPoint.ti); err != nil { - return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable) - } - cp.logCtx.L().Debug("init table info in schema tracker", zap.Stringer("table", &tbl)) + tablesToCreate[cpSchema][cpTable] = point.flushedPoint.ti } } - return nil + return schemaTracker.BatchCreateTableIfNotExist(tablesToCreate) } // CheckAndUpdate check the checkpoint data consistency and try to fix them if possible. diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 204f60e043d..009172e495a 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -354,11 +354,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) - s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) - if err != nil { - return terror.ErrSchemaTrackerInit.Delegate(err) - } - if s.cfg.CollationCompatible == config.StrictCollationCompatible { s.charsetAndDefaultCollation, s.idAndCollationMap, err = dbconn.GetCharsetAndCollationInfo(tctx, s.fromConn) if err != nil { @@ -1556,14 +1551,18 @@ func (s *Syncer) updateTSOffset(ctx context.Context) error { // Run starts running for sync, we should guarantee it can rerun when paused. func (s *Syncer) Run(ctx context.Context) (err error) { - if !s.schemaLoaded.Load() { - err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker) - if err != nil { - return err - } - s.schemaLoaded.Store(true) + s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) + if err != nil { + return terror.ErrSchemaTrackerInit.Delegate(err) } + err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker) + if err != nil { + return err + } + // TODO: not sure if schemaLoaded is needed by data validator, will remove it later. + s.schemaLoaded.Store(true) + runCtx, runCancel := context.WithCancel(context.Background()) s.runCtx, s.runCancel = tcontext.NewContext(runCtx, s.tctx.L()), runCancel syncCtx, syncCancel := context.WithCancel(context.Background()) @@ -3489,7 +3488,7 @@ func (s *Syncer) Kill() { s.Close() } -// stopSync stops stream and rollbacks checkpoint now it used by Close() and Pause(). +// stopSync stops stream and rollbacks checkpoint. Now it's used by Close() and Pause(). 
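+// Checkpoint rollback no longer touches the schema tracker, so Pause closes the
+// tracker itself after stopSync returns.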
func (s *Syncer) stopSync() { // before re-write workflow for s.syncer, simply close it // when resuming, re-create s.syncer @@ -3500,10 +3499,10 @@ func (s *Syncer) stopSync() { // try to rollback checkpoints, if they already flushed, no effect, this operation should call before close schemaTracker prePos := s.checkpoint.GlobalPoint() - s.checkpoint.Rollback(s.schemaTracker) + s.checkpoint.Rollback() currPos := s.checkpoint.GlobalPoint() if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { - s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) + s.tctx.L().Warn("rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) } } @@ -3521,6 +3520,10 @@ func (s *Syncer) Pause() { return } s.stopSync() + err := s.schemaTracker.Close() + if err != nil { + s.tctx.L().Error("fail to close schema tracker", log.ShortError(err)) + } } // Resume resumes the paused process. From 9516171d9698b5181becce96a7a71261d5f608f4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 16:45:52 +0800 Subject: [PATCH 05/43] try fix panic Signed-off-by: lance6716 --- dm/pkg/schema/tracker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 3615fc4407d..5cf04a5e6d6 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/store/mockstore" unistoreConfig "github.com/pingcap/tidb/store/mockstore/unistore/config" "github.com/pingcap/tidb/util/filter" + "go.uber.org/atomic" "go.uber.org/zap" tcontext "github.com/pingcap/tiflow/dm/pkg/context" @@ -80,6 +81,7 @@ type Tracker struct { dom *domain.Domain se session.Session dsTracker *downstreamTracker + closed atomic.Bool } // downstreamTracker tracks downstream schema. @@ -368,6 +370,9 @@ func (tr *Tracker) Reset() error { // Close close a tracker. 
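// Repeated calls are no-ops: the closed flag below is flipped with a CAS, so only
// the first Close tears down the session, domain and store.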
func (tr *Tracker) Close() error { + if !tr.closed.CAS(false, true) { + return nil + } tr.se.Close() tr.dom.Close() if err := tr.store.Close(); err != nil { From 6f42039137369008d5cacc07cf2f32cbfa10cf43 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 16:54:07 +0800 Subject: [PATCH 06/43] fix IT Signed-off-by: lance6716 --- dm/tests/many_tables/run.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 06a041aef83..36a310d8a5f 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -99,9 +99,9 @@ function run() { incremental_data_2 sleep 20 - rollback_num=$(grep '"rollback checkpoint' $WORK_DIR/worker1/log/dm-worker.log | wc -l) - echo "rollback_num: $rollback_num" - [ $rollback_num -gt 20 ] + resume_num=$(grep 'unit process error' $WORK_DIR/worker1/log/dm-worker.log | wc -l) + echo "resume_num: $resume_num" + [ $resume_num -gt 10 ] folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) echo "folder_size: $folder_size" # less than 10M From d55796724102eea3617ab0528816deeb9b223fb4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 17:09:08 +0800 Subject: [PATCH 07/43] save work Signed-off-by: lance6716 --- dm/tests/many_tables/run.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 36a310d8a5f..26e1cbdbc38 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -97,11 +97,12 @@ function run() { wait_process_exit tidb-server # now worker will process some binlog events, save table checkpoint and meet downstream error incremental_data_2 - sleep 20 + sleep 30 resume_num=$(grep 'unit process error' $WORK_DIR/worker1/log/dm-worker.log | wc -l) echo "resume_num: $resume_num" - [ $resume_num -gt 10 ] + # because we check auto resume every 5 seconds... 
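+	# a 30-second wait allows about six such checks, so requiring at least 4
+	# resume attempts leaves some headroom for slow starts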
+ [ $resume_num -ge 4 ] folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) echo "folder_size: $folder_size" # less than 10M From f0d406baa538a20fe6d526780f74df17375aaccb Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 17:13:36 +0800 Subject: [PATCH 08/43] make fmt Signed-off-by: lance6716 --- dm/tests/many_tables/run.sh | 60 ++++++++++++++++++------------------ scripts/generate-protobuf.sh | 2 ++ 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 26e1cbdbc38..34186db3578 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -28,10 +28,10 @@ function incremental_data() { } function incremental_data_2() { - j=6 - for i in $(seq $TABLE_NUM); do - run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 - done + j=6 + for i in $(seq $TABLE_NUM); do + run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + done } function run() { @@ -81,34 +81,34 @@ function run() { echo "finish incremental_data" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # test https://github.com/pingcap/tiflow/issues/5344 - kill_dm_worker - # let some binlog event save table checkpoint before meet downstream error - export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1)' - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - run_sql_source1 "CREATE TABLE many_tables_db.flush (c INT PRIMARY KEY);" - sleep 5 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - '"synced": true' 1 - - pkill -hup tidb-server 2>/dev/null || true - wait_process_exit tidb-server - # now worker will process some binlog events, save table checkpoint and meet downstream error - incremental_data_2 - sleep 30 + # test https://github.com/pingcap/tiflow/issues/5344 + kill_dm_worker + # let some binlog event save table checkpoint before meet downstream error + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_sql_source1 "CREATE TABLE many_tables_db.flush (c INT PRIMARY KEY);" + sleep 5 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + + pkill -hup tidb-server 2>/dev/null || true + wait_process_exit tidb-server + # now worker will process some binlog events, save table checkpoint and meet downstream error + incremental_data_2 + sleep 30 resume_num=$(grep 'unit process error' $WORK_DIR/worker1/log/dm-worker.log | wc -l) - echo "resume_num: $resume_num" - # because we check auto resume every 5 seconds... - [ $resume_num -ge 4 ] - folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) - echo "folder_size: $folder_size" - # less than 10M - [ $folder_size -lt 10000 ] - - export GO_FAILPOINTS='' + echo "resume_num: $resume_num" + # because we check auto resume every 5 seconds... 
+ [ $resume_num -ge 4 ] + folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) + echo "folder_size: $folder_size" + # less than 10M + [ $folder_size -lt 10000 ] + + export GO_FAILPOINTS='' } cleanup_data many_tables_db diff --git a/scripts/generate-protobuf.sh b/scripts/generate-protobuf.sh index 88140e8f842..4410885dbd7 100755 --- a/scripts/generate-protobuf.sh +++ b/scripts/generate-protobuf.sh @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +exit 0 + set -eu TOOLS_BIN_DIR=tools/bin From 6493a5062d69df6a35288cb441f2c841641904cf Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 19:23:16 +0800 Subject: [PATCH 09/43] fix CI Signed-off-by: lance6716 --- dm/pkg/schema/tracker.go | 3 +++ dm/syncer/checkpoint_test.go | 25 +++++++------------------ scripts/generate-protobuf.sh | 2 -- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 5cf04a5e6d6..32ee779cf31 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -370,6 +370,9 @@ func (tr *Tracker) Reset() error { // Close close a tracker. func (tr *Tracker) Close() error { + if tr == nil { + return nil + } if !tr.closed.CAS(false, true) { return nil } diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go index 844cb4d4090..5d999b9bec1 100644 --- a/dm/syncer/checkpoint_test.go +++ b/dm/syncer/checkpoint_test.go @@ -19,7 +19,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "testing" "time" @@ -207,7 +206,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) // test rollback - cp.Rollback(s.tracker) + cp.Rollback() c.Assert(cp.GlobalPoint().Position, Equals, pos1) c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) @@ -222,7 +221,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, cp.Snapshot(true).id, nil, nil, nil) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() c.Assert(cp.GlobalPoint().Position, Equals, pos2) c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos2) @@ -365,7 +364,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(older, IsTrue) // rollback, to min - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) @@ -380,7 +379,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, cp.Snapshot(true).id, nil, nil, nil) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) @@ -412,19 +411,12 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), IsNil) c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, IsNil) - _, err = s.tracker.GetTableInfo(table) - c.Assert(strings.Contains(err.Error(), "doesn't exist"), IsTrue) - // test save, flush and rollback to not nil table info - err = s.tracker.Exec(ctx, schemaName, "create table "+tableName+" (c 
int);") - c.Assert(err, IsNil) - ti, err = s.tracker.GetTableInfo(table) - c.Assert(err, IsNil) cp.SaveTablePoint(table, binlog.Location{Position: pos1}, ti) tiBytes, _ := json.Marshal(ti) s.mock.ExpectBegin() @@ -440,10 +432,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { ti2, err := s.tracker.GetTableInfo(table) c.Assert(err, IsNil) cp.SaveTablePoint(table, binlog.Location{Position: pos2}, ti2) - cp.Rollback(s.tracker) - ti11, err := s.tracker.GetTableInfo(table) - c.Assert(err, IsNil) - c.Assert(ti11.Columns, HasLen, 1) + cp.Rollback() // clear, to min s.mock.ExpectBegin() @@ -476,7 +465,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.GlobalPoint(), Equals, lastGlobalPoint) c.Assert(cp.GlobalPointSaveTime(), Not(Equals), lastGlobalPointSavedTime) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) diff --git a/scripts/generate-protobuf.sh b/scripts/generate-protobuf.sh index 4410885dbd7..88140e8f842 100755 --- a/scripts/generate-protobuf.sh +++ b/scripts/generate-protobuf.sh @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -exit 0 - set -eu TOOLS_BIN_DIR=tools/bin From ba10208d01ac8c28078b8db3b2a62789a1c3dfd6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 20:19:21 +0800 Subject: [PATCH 10/43] try fix UT Signed-off-by: lance6716 --- dm/pkg/schema/tracker_test.go | 4 ++-- dm/syncer/syncer_test.go | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dm/pkg/schema/tracker_test.go b/dm/pkg/schema/tracker_test.go index 86c06904a0e..463c5f7a341 100644 --- a/dm/pkg/schema/tracker_test.go +++ b/dm/pkg/schema/tracker_test.go @@ -591,12 +591,12 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) { c.Assert(ti, DeepEquals, tiInfos[i]) } - // drop schema and raise error + // BatchCreateTableIfNotExist will also create database ctx := context.Background() err = tracker.Exec(ctx, "", `drop database testdb`) c.Assert(err, IsNil) err = tracker.BatchCreateTableIfNotExist(tablesToCreate) - c.Assert(err, NotNil) + c.Assert(err, IsNil) } func (s *trackerSuite) TestAllSchemas(c *C) { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 4dd26fe720b..7ec0a467d2a 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1044,6 +1044,10 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { {Schema: "test_1", Name: "t_1"}, }, } + s.cfg.To.Session = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } cfg, err := s.cfg.Clone() c.Assert(err, IsNil) @@ -1120,6 +1124,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) + defer close(resultCh) // When crossing safeModeExitPoint, will generate a flush sql checkPointMock.ExpectBegin() @@ -1128,6 +1133,13 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { // disable 1-minute safe mode c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) go syncer.Process(ctx, resultCh) + go func() { + for r := range resultCh { + if len(r.Errors) > 0 { + c.Fatal(r.String()) + } + } + }() expectJobs := []*expectJob{ 
// now every ddl job will start with a flush job From 350d7f47cf95507c0dc70da4c8e3405b9e516cc5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sat, 7 May 2022 21:16:00 +0800 Subject: [PATCH 11/43] try fix CI Signed-off-by: lance6716 --- dm/syncer/checkpoint.go | 3 +++ dm/syncer/syncer.go | 3 +-- dm/syncer/syncer_test.go | 15 ++++++++++++--- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index 414c643377f..b9c5f9f0d84 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -1075,6 +1075,9 @@ func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTra tablesToCreate[cpSchema] = make(map[string]*model.TableInfo) } tablesToCreate[cpSchema][cpTable] = point.flushedPoint.ti + cp.logCtx.L().Debug("will init table info in schema tracker", + zap.String("database", cpSchema), + zap.String("table", cpTable)) } } return schemaTracker.BatchCreateTableIfNotExist(tablesToCreate) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 009172e495a..508dae203f2 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -3520,8 +3520,7 @@ func (s *Syncer) Pause() { return } s.stopSync() - err := s.schemaTracker.Close() - if err != nil { + if err := s.schemaTracker.Close(); err != nil { s.tctx.L().Error("fail to close schema tracker", log.ShortError(err)) } } diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 7ec0a467d2a..3f8529c12d3 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -25,8 +25,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/pkg/binlog" @@ -45,6 +43,7 @@ import ( "github.com/pingcap/tiflow/dm/syncer/dbconn" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/stretchr/testify/require" sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-mysql-org/go-mysql/mysql" @@ -767,6 +766,10 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.Batch = 1000 s.cfg.WorkerCount = 2 s.cfg.MaxRetry = 1 + s.cfg.To.Session = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } cfg, err := s.cfg.Clone() c.Assert(err, IsNil) @@ -790,6 +793,9 @@ func (s *testSyncerSuite) TestRun(c *C) { sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))")) mockGetServerUnixTS(mock) + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) @@ -1124,7 +1130,6 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - defer close(resultCh) // When crossing safeModeExitPoint, will generate a flush sql checkPointMock.ExpectBegin() @@ -1772,6 +1777,10 @@ func TestWaitBeforeRunExit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cfg := genDefaultSubTaskConfig4Test() cfg.WorkerCount = 0 + cfg.To.Session = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } syncer := NewSyncer(cfg, nil, nil) db, mock, err := sqlmock.New() From 5e982fbfdf7f4256831f2f7245d62fd9003d3c4c Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 8 May 2022 13:31:23 +0800 Subject: [PATCH 12/43] refine mock order Signed-off-by: lance6716 --- dm/syncer/syncer_test.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 3f8529c12d3..0345b9b7e37 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -792,13 +792,7 @@ func (s *testSyncerSuite) TestRun(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))")) - mockGetServerUnixTS(mock) - mock.ExpectBegin() - mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) - mock.ExpectCommit() - mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( - sqlmock.NewRows([]string{"Table", "Create Table"}). - AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) + syncer.exprFilterGroup = NewExprFilterGroup(utils.NewSessionCtx(nil), nil) c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) @@ -957,6 +951,16 @@ func (s *testSyncerSuite) TestRun(c *C) { mockDBProvider := conn.InitMockDB(c) mockDBProvider.ExpectQuery("SELECT cast\\(TIMEDIFF\\(NOW\\(6\\), UTC_TIMESTAMP\\(6\\)\\) as time\\);"). WillReturnRows(sqlmock.NewRows([]string{""}).AddRow("01:00:00")) + mockGetServerUnixTS(mock) + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) + c.Assert(syncer.Update(context.Background(), s.cfg), IsNil) c.Assert(syncer.timezone.String(), Equals, "+01:00") From 021b6f1007fff0f64870e55a3d644766d910c52a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 8 May 2022 13:39:23 +0800 Subject: [PATCH 13/43] fix mock Signed-off-by: lance6716 --- dm/syncer/syncer_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 0345b9b7e37..4214e19c89a 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -954,6 +954,9 @@ func (s *testSyncerSuite) TestRun(c *C) { mockGetServerUnixTS(mock) mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "")) + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). + AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectCommit() From 91784c8f488ff338aa1fdb98fea2f8b6488a2341 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 8 May 2022 14:45:56 +0800 Subject: [PATCH 14/43] fix IT Signed-off-by: lance6716 --- dm/syncer/syncer_test.go | 7 ++----- dm/tests/new_relay/run.sh | 2 ++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 4214e19c89a..041124d5bc6 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -1817,13 +1817,10 @@ func TestWaitBeforeRunExit(t *testing.T) { time.Sleep(time.Second) // wait s.Run start // test s.Run will not exit unit caller cancel ctx or call s.runCancel + require.Len(t, errCh, 0) cancel() // this will make s.Run exit wg.Wait() - require.Nil(t, <-errCh) - require.Equal(t, 0, len(errCh)) - require.NotNil(t, syncer.runCtx) - require.NotNil(t, syncer.runCancel) - require.True(t, syncer.schemaLoaded.Load()) + <-errCh // test syncer wait time not more than maxPauseOrStopWaitTime oldMaxPauseOrStopWaitTime := defaultMaxPauseOrStopWaitTime diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index 8fb149d0e2f..50b21eeb807 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -242,6 +242,8 @@ function test_relay_operations() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # wait syncer begin to sync so it has deleted load task etcd KV. + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml # relay task transfer to worker1 with no error. 
check_metric $WORKER1_PORT "dm_relay_data_corruption" 3 -1 1 From 39a326a9a3ea12b05612422cdf2c9905fbf92dfe Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 9 May 2022 16:47:14 +0800 Subject: [PATCH 15/43] change logic for operate-schema Signed-off-by: lance6716 --- dm/pkg/schema/tracker.go | 13 ------ dm/pkg/schema/tracker_test.go | 74 ----------------------------------- dm/pkg/terror/error_list.go | 2 + dm/syncer/checkpoint.go | 18 +++++++++ dm/syncer/dbconn/utils.go | 2 + dm/syncer/schema.go | 55 ++++++++++++-------------- dm/syncer/syncer.go | 8 +--- 7 files changed, 49 insertions(+), 123 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 32ee779cf31..0e3eb6488d1 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -437,19 +437,6 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableIn return tr.dom.DDL().CreateTableWithInfo(tr.se, schemaName, ti, ddl.OnExistIgnore) } -func (tr *Tracker) RecreateTables(logCtx *tcontext.Context, tablesToDrop []*filter.Table, tablesToCreate map[string]map[string]*model.TableInfo) error { - tr.Lock() - defer tr.Unlock() - for _, tbl := range tablesToDrop { - // schema changed - if err := tr.DropTable(tbl); err != nil { - logCtx.L().Warn("failed to drop table from schema tracker", - zap.Stringer("table", tbl), log.ShortError(err)) - } - } - return tr.BatchCreateTableIfNotExist(tablesToCreate) -} - // BatchCreateTableIfNotExist will batch creating tables per schema. If the schema does not exist, it will create it. // The argument is { database name -> { table name -> TableInfo } }. func (tr *Tracker) BatchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error { diff --git a/dm/pkg/schema/tracker_test.go b/dm/pkg/schema/tracker_test.go index 463c5f7a341..c8bb5c16d7e 100644 --- a/dm/pkg/schema/tracker_test.go +++ b/dm/pkg/schema/tracker_test.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/util/filter" timock "github.com/pingcap/tidb/util/mock" - "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "github.com/pingcap/tiflow/dm/dm/config" @@ -954,76 +953,3 @@ func (s *trackerSuite) TestPlacementRule(c *C) { _, ok := tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) } - -func TestTrackerRecreateTables(t *testing.T) { - cfg := &config.SubTaskConfig{} - backupKeys := downstreamVars - defer func() { - downstreamVars = backupKeys - }() - downstreamVars = []string{"sql_mode"} - db, _, err := sqlmock.New() - require.NoError(t, err) - defer db.Close() - con, err := db.Conn(context.Background()) - require.NoError(t, err) - dbConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(con, nil)) - log.SetLevel(zapcore.ErrorLevel) - - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn) - require.NoError(t, err) - defer func() { - err = tracker.Close() - require.NoError(t, err) - }() - err = tracker.CreateSchemaIfNotExists("testdb") - require.NoError(t, err) - err = tracker.CreateSchemaIfNotExists("testdb2") - require.NoError(t, err) - - tables := []*filter.Table{ - {Schema: "testdb", Name: "foo"}, - {Schema: "testdb", Name: "foo1"}, - {Schema: "testdb2", Name: "foo3"}, - } - execStmt := []string{ - `create table foo(a int primary key);`, - `create table foo1(a int primary key);`, - `create table foo3(a int primary key);`, - } - tiInfos := make([]*model.TableInfo, len(tables)) - for i := range tables { - ctx := context.Background() - err = tracker.Exec(ctx, 
tables[i].Schema, execStmt[i]) - require.NoError(t, err) - tiInfos[i], err = tracker.GetTableInfo(tables[i]) - require.NoError(t, err) - require.NotNil(t, tiInfos[i]) - require.Equal(t, tables[i].Name, tiInfos[i].Name.O) - tiInfos[i] = tiInfos[i].Clone() - clearVolatileInfo(tiInfos[i]) - } - - // drop one schema - require.NoError(t, tracker.dom.DDL().DropSchema(tracker.se, model.NewCIStr("testdb"))) - - // recreate tables - tablesToCreate := map[string]map[string]*model.TableInfo{} - tablesToCreate["testdb"] = map[string]*model.TableInfo{} - tablesToCreate["testdb2"] = map[string]*model.TableInfo{} - for i := range tables { - tablesToCreate[tables[i].Schema][tables[i].Name] = tiInfos[i] - } - tctx := tcontext.Background() - err = tracker.RecreateTables(tctx, tables, tablesToCreate) - require.NoError(t, err) - // check all create success - for i := range tables { - var ti *model.TableInfo - ti, err = tracker.GetTableInfo(tables[i]) - require.NoError(t, err) - cloneTi := ti.Clone() - clearVolatileInfo(cloneTi) - require.Equal(t, tiInfos[i], cloneTi) - } -} diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index cae2a83f9d6..e794c85bb22 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -446,6 +446,7 @@ const ( codeSyncerParseDDL codeSyncerUnsupportedStmt codeSyncerGetEvent + codeSyncerDownstreamTableNotFound ) // DM-master error code. @@ -1125,6 +1126,7 @@ var ( ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.") ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "") ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.") + ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "") // DM-master error. 
ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "") diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index b9c5f9f0d84..17cdc659a04 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -277,6 +277,9 @@ type CheckPoint interface { // TablePoint returns all table's stream checkpoint TablePoint() map[string]map[string]binlog.Location + // GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found + GetTableInfo(schema string, table string) *model.TableInfo + // FlushedGlobalPoint returns the flushed global binlog stream's checkpoint // corresponding to to Meta.Pos and gtid FlushedGlobalPoint() binlog.Location @@ -860,6 +863,21 @@ func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location { return tablePoint } +func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo { + cp.RLock() + defer cp.RUnlock() + + tables, ok := cp.points[schema] + if !ok { + return nil + } + tablePoint, ok := tables[table] + if !ok { + return nil + } + return tablePoint.TableInfo() +} + // FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint. func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location { cp.RLock() diff --git a/dm/syncer/dbconn/utils.go b/dm/syncer/dbconn/utils.go index b0232b46537..277b75c0a98 100644 --- a/dm/syncer/dbconn/utils.go +++ b/dm/syncer/dbconn/utils.go @@ -43,6 +43,8 @@ func GetTableCreateSQL(tctx *tcontext.Context, conn *DBConn, tableID string) (sq if scanErr := rows.Scan(&table, &createStr); scanErr != nil { return "", terror.DBErrorAdapt(scanErr, terror.ErrDBDriverError) } + } else { + return "", terror.ErrSyncerDownstreamTableNotFound.Generate(tableID) } if err = rows.Close(); err != nil { diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index 3d0a21ec3f7..265ce265a93 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -14,11 +14,15 @@ package syncer import ( + "bytes" "context" "encoding/json" "regexp" "strings" + ddl2 "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" @@ -30,7 +34,6 @@ import ( "github.com/pingcap/tiflow/dm/openapi" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" - "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/syncer/dbconn" ) @@ -66,9 +69,18 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } return string(tableListJSON), err case pb.SchemaOp_GetSchema: - // we only try to get schema from schema-tracker now. - // in other words, we can not get the schema if any DDL/DML has been replicated, or set a schema previously. - return s.schemaTracker.GetCreateTable(ctx, sourceTable) + // when task is paused, schemaTracker is closed. We get the table structure from checkpoint. 
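+		// if no checkpoint has saved this table yet, fall back to fetching the
+		// structure from the downstream below.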
+ ti := s.checkpoint.GetTableInfo(req.Database, req.Table) + if ti == nil { + s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", + zap.String("table", sourceTable.String())) + targetTable := s.route(sourceTable) + return dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) + } + result := bytes.NewBuffer(make([]byte, 0, 512)) + err2 := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, result) + return result.String(), err2 + case pb.SchemaOp_SetSchema: // from source or target need get schema if req.FromSource { @@ -89,7 +101,6 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } // for set schema, we must ensure it's a valid `CREATE TABLE` statement. - // now, we only set schema for schema-tracker, // if want to update the one in checkpoint, it should wait for the flush of checkpoint. parser2, err := s.fromDB.GetParser(ctx) if err != nil { @@ -115,37 +126,21 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } newSQL := newCreateSQLBuilder.String() - // drop the previous schema first. - err = s.schemaTracker.DropTable(sourceTable) - if err != nil && !schema.IsTableNotExists(err) { - return "", terror.ErrSchemaTrackerCannotDropTable.Delegate(err, sourceTable) - } - err = s.schemaTracker.CreateSchemaIfNotExists(req.Database) - if err != nil { - return "", terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, req.Database) - } - err = s.schemaTracker.Exec(ctx, req.Database, newSQL) - if err != nil { - return "", terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, sourceTable) - } - s.exprFilterGroup.ResetExprs(sourceTable) - if !req.Flush && !req.Sync { - break + if !req.Flush { + s.tctx.L().Info("overwrite --flush to true for operate-schema") } - ti, err := s.schemaTracker.GetTableInfo(sourceTable) - if err != nil { - return "", err + ti, err2 := ddl2.BuildTableInfoFromAST(stmt) + if err2 != nil { + return "", terror.ErrSchemaTrackerRestoreStmtFail.Delegate(err2) } - if req.Flush { - log.L().Info("flush table info", zap.String("table info", newSQL)) - err = s.checkpoint.FlushPointsWithTableInfos(tcontext.NewContext(ctx, log.L()), []*filter.Table{sourceTable}, []*model.TableInfo{ti}) - if err != nil { - return "", err - } + s.tctx.L().Info("flush table info", zap.String("table info", newSQL)) + err = s.checkpoint.FlushPointsWithTableInfos(tcontext.NewContext(ctx, log.L()), []*filter.Table{sourceTable}, []*model.TableInfo{ti}) + if err != nil { + return "", err } if req.Sync { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 508dae203f2..e3ac6980a71 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -170,9 +170,8 @@ type Syncer struct { exprFilterGroup *ExprFilterGroup sessCtx sessionctx.Context - running atomic.Bool - closed atomic.Bool - schemaLoaded atomic.Bool + running atomic.Bool + closed atomic.Bool start atomic.Time lastTime atomic.Time @@ -260,7 +259,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel syncer.waitXIDJob.Store(int64(noWait)) syncer.isTransactionEnd = true syncer.closed.Store(false) - syncer.schemaLoaded.Store(false) syncer.lastBinlogSizeCount.Store(0) syncer.binlogSizeCount.Store(0) syncer.lastCount.Store(0) @@ -1560,8 +1558,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err != nil { return err } - // TODO: not sure if schemaLoaded is needed by data validator, will remove it later. 
- s.schemaLoaded.Store(true) runCtx, runCancel := context.WithCancel(context.Background()) s.runCtx, s.runCancel = tcontext.NewContext(runCtx, s.tctx.L()), runCancel From 9b3ef6fc34c0b15d830ffd826e8632b29f13d2b4 Mon Sep 17 00:00:00 2001 From: --get-all Date: Mon, 9 May 2022 18:08:19 +0800 Subject: [PATCH 16/43] fix some tests --- dm/tests/dmctl_basic/run.sh | 4 ++-- dm/tests/expression_filter/run.sh | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index e803b7984a8..6987205ebca 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -435,9 +435,9 @@ function run() { run_sql_source1 "update dmctl.t_1 set d = '' where id = 13" run_sql_source1 "update dmctl.t_2 set d = '' where id = 12" - # sleep 2*1s to ensure syncer unit has flushed global checkpoint and updates + # sleep to ensure syncer unit has flushed global checkpoint and updates # updated ActiveRelayLog - sleep 2 + sleep 5 server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql_source1 "show binary logs\G" max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE" | tail -n 1 | awk -F":" '{print $NF}') diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh index 1b33c27051a..4c04ce3a617 100755 --- a/dm/tests/expression_filter/run.sh +++ b/dm/tests/expression_filter/run.sh @@ -19,11 +19,13 @@ function complex_behaviour() { run_sql_file $cur/data/db1.prepare2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - # test about schema-tracker can't create its storage + # test no permission chmod -w $WORK_DIR/worker1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-task $cur/conf/dm-task2.yaml" \ - "failed to create schema tracker" 1 \ + "start-task $cur/conf/dm-task2.yaml" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "mydumper/dumpling runs with error" 1 \ "permission denied" 1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task test" From 083b2d6b4c370029dc0308311e77f1ce630acc17 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 9 May 2022 18:37:23 +0800 Subject: [PATCH 17/43] make fmt Signed-off-by: lance6716 --- dm/tests/expression_filter/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh index 4c04ce3a617..7a2d0e72afd 100755 --- a/dm/tests/expression_filter/run.sh +++ b/dm/tests/expression_filter/run.sh @@ -24,7 +24,7 @@ function complex_behaviour() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/dm-task2.yaml" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ + "query-status test" \ "mydumper/dumpling runs with error" 1 \ "permission denied" 1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ From b5a52d4f6ec5406df9fc8ca540cf5f91d43c4a62 Mon Sep 17 00:00:00 2001 From: --get-all Date: Mon, 9 May 2022 18:51:12 +0800 Subject: [PATCH 18/43] fix test --- dm/pkg/schema/tracker.go | 5 +---- dm/pkg/utils/db.go | 7 +++++++ dm/pkg/utils/db_test.go | 6 ++++++ dm/syncer/schema.go | 4 +++- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 0e3eb6488d1..0f1a8a91e1d 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -286,10 +286,7 @@ func (tr *Tracker) GetCreateTable(ctx context.Context, table *filter.Table) (str row := req.GetRow(0) str := row.GetString(1) // the first column is the table name. 
- // returned as single line. - str = strings.ReplaceAll(str, "\n", "") - str = strings.ReplaceAll(str, " ", " ") - return str, nil + return utils.CreateTableSQLToOneRow(str), nil } // AllSchemas returns all schemas visible to the tracker (excluding system tables). diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index ec724d16e11..89464e4a7d0 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -671,3 +671,10 @@ func GetMaxConnectionsForConn(ctx context.Context, conn *sql.Conn) (int, error) func IsMariaDB(version string) bool { return strings.Contains(strings.ToUpper(version), "MARIADB") } + +// CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row. +func CreateTableSQLToOneRow(sql string) string { + sql = strings.ReplaceAll(sql, "\n", "") + sql = strings.ReplaceAll(sql, " ", " ") + return sql +} diff --git a/dm/pkg/utils/db_test.go b/dm/pkg/utils/db_test.go index 35f86ec12c3..2c2c475f836 100644 --- a/dm/pkg/utils/db_test.go +++ b/dm/pkg/utils/db_test.go @@ -497,3 +497,9 @@ func TestIsMariaDB(t *testing.T) { require.True(t, IsMariaDB("5.5.50-MariaDB-1~wheezy")) require.False(t, IsMariaDB("5.7.19-17-log")) } + +func TestCreateTableSQLToOneRow(t *testing.T) { + input := "CREATE TABLE `t1` (\n `id` bigint(20) NOT NULL,\n `c1` varchar(20) DEFAULT NULL,\n `c2` varchar(20) DEFAULT NULL,\n PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */\n) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin" + expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin" + require.Equal(t, expected, CreateTableSQLToOneRow(input)) +} diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index 265ce265a93..c21fc3f19b2 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/filter" + "github.com/pingcap/tiflow/dm/pkg/utils" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -77,9 +78,10 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR targetTable := s.route(sourceTable) return dbconn.GetTableCreateSQL(s.tctx.WithContext(ctx), s.downstreamTrackConn, targetTable.String()) } + result := bytes.NewBuffer(make([]byte, 0, 512)) err2 := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, result) - return result.String(), err2 + return utils.CreateTableSQLToOneRow(result.String()), err2 case pb.SchemaOp_SetSchema: // from source or target need get schema From 8ec105070a4fcd7c5d470a1921a80138b5595bb6 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 9 May 2022 19:58:59 +0800 Subject: [PATCH 19/43] fix terror Signed-off-by: lance6716 --- dm/_utils/terror_gen/errors_release.txt | 1 + dm/errors.toml | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index 1fd4b06f350..9321197e8e7 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -347,6 +347,7 @@ ErrSyncerEventNotExist,[code=36066:class=sync-unit:scope=internal:level=high], " ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. 
For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed." ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode" ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`." +ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found" ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid" ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported" ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid" diff --git a/dm/errors.toml b/dm/errors.toml index c0d9a75cc40..03a046aeb24 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -2092,6 +2092,12 @@ description = "" workaround = "Please check if the binlog file could be parsed by `mysqlbinlog`." tags = ["upstream", "high"] +[error.DM-sync-unit-36070] +message = "downstream table %s not found" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-dm-master-38001] message = "nil request not valid" description = "" From 04e1ae9d54fc78f1727c6b896c2a4e888352ec95 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 10 May 2022 11:51:39 +0800 Subject: [PATCH 20/43] change syncer prepare logic order Signed-off-by: lance6716 --- dm/syncer/syncer.go | 56 ++++++++++++++++++++++++------------- dm/tests/dmctl_basic/run.sh | 4 +-- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index e3ac6980a71..5823b09c3dd 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1549,16 +1549,6 @@ func (s *Syncer) updateTSOffset(ctx context.Context) error { // Run starts running for sync, we should guarantee it can rerun when paused. 
func (s *Syncer) Run(ctx context.Context) (err error) {
-	s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn)
-	if err != nil {
-		return terror.ErrSchemaTrackerInit.Delegate(err)
-	}
-
-	err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker)
-	if err != nil {
-		return err
-	}
-
 	runCtx, runCancel := context.WithCancel(context.Background())
 	s.runCtx, s.runCancel = tcontext.NewContext(runCtx, s.tctx.L()), runCancel
 	syncCtx, syncCancel := context.WithCancel(context.Background())
@@ -1613,6 +1603,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 		flushCheckpoint bool
 		delLoadTask     bool
 		cleanDumpFile   = s.cfg.CleanDumpFile
+		freshAndAllMode bool
 	)
 	flushCheckpoint, err = s.adjustGlobalPointGTID(s.runCtx)
 	if err != nil {
@@ -1621,14 +1612,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 	if fresh && s.cfg.Mode == config.ModeAll {
 		delLoadTask = true
 		flushCheckpoint = true
-		err = s.loadTableStructureFromDump(ctx)
-		if err != nil {
-			s.tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err))
-			cleanDumpFile = false
-		}
-		if s.cfg.ShardMode == config.ShardOptimistic {
-			s.flushOptimisticTableInfos(s.runCtx)
-		}
+		freshAndAllMode = true
 	}

 	if s.cfg.Mode == config.ModeIncrement || !fresh {
@@ -1649,6 +1633,19 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 	go s.updateLagCronJob(s.runCtx.Ctx)
 	s.runWg.Add(1)
 	go s.updateTSOffsetCronJob(s.runCtx.Ctx)
+
+	// some prepare work before the binlog event loop:
+	// 1. first we flush the checkpoint as needed, so on the next resume we won't go to the Load unit again.
+	// 2. then, since we are confident that the Load unit is done, we can delete the load task etcd KV.
+	//    TODO: we can't handle a panic between 1. and 2., or a failure to delete the load task etcd KV.
+	// 3. then we initialize the schema tracker
+	// 4. - when it's a fresh task, load the table structure from dump files into the schema tracker.
+	//      if it's also an optimistic sharding task, also load the table structure into checkpoints, because shard tables
+	//      may not have the same table structure, so we can't lazily fetch the downstream table structure for them.
+	//    - when it's a resumed task, load the table structure from checkpoints into the schema tracker.
+	//    TODO: we can't handle a failure between 1. and 4. After 1., it's no longer a fresh task.
+	// 5.
finally clean the dump files + if flushCheckpoint { if err = s.flushCheckPoints(); err != nil { s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) @@ -1657,7 +1654,28 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } if delLoadTask { if err = s.delLoadTask(); err != nil { - s.tctx.L().Warn("error when del load task in etcd", zap.Error(err)) + s.tctx.L().Error("error when del load task in etcd", zap.Error(err)) + } + } + + s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) + if err != nil { + return terror.ErrSchemaTrackerInit.Delegate(err) + } + + if freshAndAllMode { + err = s.loadTableStructureFromDump(ctx) + if err != nil { + s.tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) + cleanDumpFile = false + } + if s.cfg.ShardMode == config.ShardOptimistic { + s.flushOptimisticTableInfos(s.runCtx) + } + } else { + err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker) + if err != nil { + return err } } diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index 6987205ebca..e803b7984a8 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -435,9 +435,9 @@ function run() { run_sql_source1 "update dmctl.t_1 set d = '' where id = 13" run_sql_source1 "update dmctl.t_2 set d = '' where id = 12" - # sleep to ensure syncer unit has flushed global checkpoint and updates + # sleep 2*1s to ensure syncer unit has flushed global checkpoint and updates # updated ActiveRelayLog - sleep 5 + sleep 2 server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql_source1 "show binary logs\G" max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE" | tail -n 1 | awk -F":" '{print $NF}') From e5ce6356a5f100bd5f709b747520da7d59fc33f0 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 10 May 2022 13:32:44 +0800 Subject: [PATCH 21/43] try fix CI Signed-off-by: lance6716 --- dm/tests/dmctl_basic/conf/dm-task.yaml | 2 ++ dm/tests/dmctl_basic/conf/get_task.yaml | 2 ++ dm/tests/dmctl_basic/run.sh | 6 +++--- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dm/tests/dmctl_basic/conf/dm-task.yaml b/dm/tests/dmctl_basic/conf/dm-task.yaml index ed9f54cfeaf..b9ea640e7f1 100644 --- a/dm/tests/dmctl_basic/conf/dm-task.yaml +++ b/dm/tests/dmctl_basic/conf/dm-task.yaml @@ -39,6 +39,8 @@ block-allow-list: do-tables: - db-name: "dmctl" tbl-name: "~^t_[\\d]+" + - db-name: "dmctl" + tbl-name: "flush_trigger" routes: sharding-route-rules-table: diff --git a/dm/tests/dmctl_basic/conf/get_task.yaml b/dm/tests/dmctl_basic/conf/get_task.yaml index 27554b6dec2..48ca0ecad06 100644 --- a/dm/tests/dmctl_basic/conf/get_task.yaml +++ b/dm/tests/dmctl_basic/conf/get_task.yaml @@ -126,6 +126,8 @@ block-allow-list: do-tables: - db-name: dmctl tbl-name: ~^t_[\d]+ + - db-name: dmctl + tbl-name: flush_trigger do-dbs: - dmctl ignore-tables: [] diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index e803b7984a8..ad13e6fab90 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -432,12 +432,12 @@ function run() { # make sure every shard table in source 1 has be forwarded to newer binlog, so older relay log could be purged run_sql_source1 "flush logs" + run_sql_source1 "create table dmctl.flush_trigger (c int primary key);" run_sql_source1 "update dmctl.t_1 set d = '' where id = 13" run_sql_source1 "update dmctl.t_2 set d = '' where id = 12" - # sleep 2*1s to ensure syncer unit has flushed global checkpoint and 
updates
-	# updated ActiveRelayLog
-	sleep 2
+	# sleep to ensure syncer unit has resumed, read next binlog files and updated ActiveRelayLog
+	sleep 5
 	server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index)
 	run_sql_source1 "show binary logs\G"
 	max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE" | tail -n 1 | awk -F":" '{print $NF}')

From 83235ed2e97e35d55961d94172887eda28ed5ca1 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 10 May 2022 16:04:06 +0800
Subject: [PATCH 22/43] try fix CI

---
 dm/tests/handle_error_3/run.sh | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/dm/tests/handle_error_3/run.sh b/dm/tests/handle_error_3/run.sh
index f0726acd0da..4cb30f7c34e 100644
--- a/dm/tests/handle_error_3/run.sh
+++ b/dm/tests/handle_error_3/run.sh
@@ -187,6 +187,10 @@ function DM_4193_CASE() {
 		"binlog revert test -s $source2 -b $first_name2:$second_pos2" \
 		"operator not exist" 1

+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status test" \
+		'"stage": "Paused"' 2
+
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"binlog skip test" \
 		"\"result\": true" 3

From 0913117a41336a1b8e9edc3a3c8bd228a5667c8f Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 10 May 2022 16:26:06 +0800
Subject: [PATCH 23/43] fix again

Signed-off-by: lance6716
---
 dm/tests/expression_filter/run.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh
index 7a2d0e72afd..c5662e6ae89 100755
--- a/dm/tests/expression_filter/run.sh
+++ b/dm/tests/expression_filter/run.sh
@@ -25,7 +25,7 @@ function complex_behaviour() {
 		"start-task $cur/conf/dm-task2.yaml"
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"query-status test" \
-		"mydumper/dumpling runs with error" 1 \
+		"dumpling runs with error" 1 \
 		"permission denied" 1
 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"stop-task test"

From 0f3f03a6d98e377306c87113bf2645a565a3fa52 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 10 May 2022 17:38:35 +0800
Subject: [PATCH 24/43] weaken the test checking pattern

Signed-off-by: lance6716
---
 dm/tests/shardddl2_1/run.sh | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dm/tests/shardddl2_1/run.sh b/dm/tests/shardddl2_1/run.sh
index 112b670bd22..7669d2a1804 100644
--- a/dm/tests/shardddl2_1/run.sh
+++ b/dm/tests/shardddl2_1/run.sh
@@ -146,9 +146,10 @@ function DM_050_CASE() {
 	if [[ "$1" = "pessimistic" ]]; then
 		check_log_contain_with_retry "is different with" $WORK_DIR/master/log/dm-master.log
 	else
+		# can't be sure which source's DDL comes first
 		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 			"query-status test" \
-			'ALTER TABLE `shardddl`.`tb` CHANGE COLUMN `a` `d` INT' 1 \
+			'ALTER TABLE `shardddl`.`tb` CHANGE COLUMN' 1 \
 			"\"${SOURCE_ID2}-\`${shardddl1}\`.\`${tb1}\`\"" 1
 	fi
 }

From bbed995841dc9eb2aaefeabc6b6241d077bfb37d Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 10 May 2022 18:46:35 +0800
Subject: [PATCH 25/43] fix again

Signed-off-by: lance6716
---
 dm/tests/shardddl3_1/run.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dm/tests/shardddl3_1/run.sh b/dm/tests/shardddl3_1/run.sh
index d774a8de2e8..12dbb1d6c5b 100644
--- a/dm/tests/shardddl3_1/run.sh
+++ b/dm/tests/shardddl3_1/run.sh
@@ -312,12 +312,12 @@ function different_field_flag_test() {
 	if [[ $locked == true ]]; then
 		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 			"query-status test" \
-			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\` ${type2^^}" 1 \
+			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\`" 1 \
 			"\"${SOURCE_ID2}-\`${shardddl1}\`.\`${tb1}\`\"" 1
 	else
 		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 			"query-status test" \
-			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\` ${type2^^}" 2 \
+			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\`" 2 \
 			"because schema conflict detected" 1
 	fi

From 5170b1cf6396624f809ce29d1d7a8b84531b076b Mon Sep 17 00:00:00 2001
From: lance6716
Date: Tue, 10 May 2022 19:32:26 +0800
Subject: [PATCH 26/43] fix again

Signed-off-by: lance6716
---
 dm/syncer/checkpoint.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go
index 74dba381110..76a3bdc56ad 100644
--- a/dm/syncer/checkpoint.go
+++ b/dm/syncer/checkpoint.go
@@ -782,6 +782,9 @@ func (cp *RemoteCheckPoint) FlushPointsWithTableInfos(tctx *tcontext.Context, ta
 		if point == nil {
 			cp.saveTablePoint(table, cp.globalPoint.MySQLLocation(), ti)
 			point = cp.points[sourceSchema][sourceTable]
+		} else {
+			point.savedPoint.ti = ti
+			point.flushedPoint.ti = ti
 		}
 		tiBytes, err := json.Marshal(ti)
 		if err != nil {

From 165e93d50c24f895bf1c447e752a47b1dc4819fe Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 10:17:45 +0800
Subject: [PATCH 27/43] fix test in a new day

---
 dm/tests/shardddl3_1/run.sh | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/dm/tests/shardddl3_1/run.sh b/dm/tests/shardddl3_1/run.sh
index 12dbb1d6c5b..bf6b4109130 100644
--- a/dm/tests/shardddl3_1/run.sh
+++ b/dm/tests/shardddl3_1/run.sh
@@ -309,11 +309,13 @@ function different_field_flag_test() {
 	run_sql_source2 "insert into ${shardddl1}.${tb1} values(3);"
 	run_sql_source2 "alter table ${shardddl1}.${tb1} add column col1 $type2"
 	run_sql_source2 "insert into ${shardddl1}.${tb1} values (4,${val2});"
+
+	# we can't be sure which source's SQL comes first, so only check the common pattern
 	if [[ $locked == true ]]; then
 		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 			"query-status test" \
 			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\`" 1 \
-			"\"${SOURCE_ID2}-\`${shardddl1}\`.\`${tb1}\`\"" 1
 	else

From 7de53ce09560856771fc96cc889cb79cbd12791e Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 10:36:08 +0800
Subject: [PATCH 28/43] fix matching pattern

---
 dm/tests/shardddl3_1/run.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/tests/shardddl3_1/run.sh b/dm/tests/shardddl3_1/run.sh
index bf6b4109130..d15e481b76c 100644
--- a/dm/tests/shardddl3_1/run.sh
+++ b/dm/tests/shardddl3_1/run.sh
@@ -315,7 +315,7 @@ function different_field_flag_test() {
 		run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 			"query-status test" \
 			"ALTER TABLE \`${shardddl}\`.\`${tb}\` ADD COLUMN \`col1\`" 1 \
-			"-\`${shardddl1}\`.\`${tb1}\`\"" 1
+			"\`${shardddl1}\`.\`${tb1}\`\"" 1
 	else

From b485b0029da6808a742640b4d86e8248fe07d802 Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 13:33:43 +0800
Subject: [PATCH 29/43] try another API

---
 dm/syncer/schema.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go
index c21fc3f19b2..9cf4b4c2024 100644
--- a/dm/syncer/schema.go
+++ b/dm/syncer/schema.go
@@ -162,9 +162,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR
 	}

 	case pb.SchemaOp_RemoveSchema:
-		// we only drop the schema in the schema-tracker now,
-		// so if we drop the schema and continue to replicate any DDL/DML, it will try to get schema from downstream again.
-		return "", s.schemaTracker.DropTable(sourceTable)
+		// as the docs say, `operate-schema remove` lets DM-worker use the table structure in the checkpoint, so no
+		// further action is needed.
+		return "", nil
 	}
 	return "", nil
 }
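
A side note on the assertion coming up in the next patch: it matches the one-line `SHOW CREATE TABLE` output produced by the CreateTableSQLToOneRow helper added in PATCH 18. A minimal, self-contained sketch of that normalization; the local helper mirrors utils.CreateTableSQLToOneRow and the sample statement is illustrative:

package main

import (
	"fmt"
	"strings"
)

// createTableSQLToOneRow mirrors utils.CreateTableSQLToOneRow from PATCH 18:
// drop the newlines, then collapse double spaces, so a CREATE TABLE statement
// can be matched as a single line.
func createTableSQLToOneRow(sql string) string {
	sql = strings.ReplaceAll(sql, "\n", "")
	sql = strings.ReplaceAll(sql, "  ", " ")
	return sql
}

func main() {
	multiline := "CREATE TABLE `t1` (\n  `id` bigint(20) NOT NULL,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB"
	fmt.Println(createTableSQLToOneRow(multiline))
	// prints: CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB
}
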
From 5ae3a50279f1803f429b8c1355cad0df1dc6aa0d Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 13:41:25 +0800
Subject: [PATCH 30/43] fix again

---
 dm/tests/sequence_sharding_optimistic/run.sh | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/dm/tests/sequence_sharding_optimistic/run.sh b/dm/tests/sequence_sharding_optimistic/run.sh
index 1c3cd6f35d9..07f66dbaf07 100755
--- a/dm/tests/sequence_sharding_optimistic/run.sh
+++ b/dm/tests/sequence_sharding_optimistic/run.sh
@@ -154,17 +154,13 @@ run() {
 	# drop the schema.
 	curl -X PUT ${API_URL} -d '{"op":3, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/remove_schema.log

-	# try to get schema again, but can't get.
-	curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log
-	check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1
-
 	# try to set an invalid schema.
 	curl -X PUT ${API_URL} -d '{"op":2, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1", "schema":"invalid create table statement"}' >${WORK_DIR}/get_schema.log >${WORK_DIR}/set_schema.log
 	check_log_contains ${WORK_DIR}/set_schema.log 'is not a valid `CREATE TABLE` statement' 1

-	# try to get schema again, no one exist.
+	# try to get schema again, still the old schema.
 	curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log
-	check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1
+	check_log_contains ${WORK_DIR}/get_schema.log 'CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c2` varchar(20) DEFAULT NULL, `c3` int(11) DEFAULT NULL, PRIMARY KEY (`id`) .*) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' 1

 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"binlog-schema list -s mysql-replica-01,mysql-replica-02 sequence_sharding_optimistic sharding_seq_opt t2" \

From 0f3ae525caa7a690e1c13d7613324d6c4c0f4718 Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 14:15:56 +0800
Subject: [PATCH 31/43] some fix

---
 dm/tests/relay_interrupt/run.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dm/tests/relay_interrupt/run.sh b/dm/tests/relay_interrupt/run.sh
index 122eb3e19d9..409836644cb 100644
--- a/dm/tests/relay_interrupt/run.sh
+++ b/dm/tests/relay_interrupt/run.sh
@@ -90,8 +90,8 @@ function run() {
 		"start-task $task_conf" \
 		"\"result\": false" 1 \
 		"subtasks with name test for sources \[mysql-replica-01\] already exist" 1
-
-	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+	# wait for the relay unit to come up
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"query-status test" \
 		"\"binlogType\": \"local\"" 1

From 7c6ff83ed2415ca2d1c1f38b3c7068406f9219fc Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 14:58:41 +0800
Subject: [PATCH 32/43] increase check_sync_diff retry

---
 dm/tests/_utils/check_sync_diff | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/tests/_utils/check_sync_diff b/dm/tests/_utils/check_sync_diff
index 27d85485768..ccccb0a833d 100755
--- a/dm/tests/_utils/check_sync_diff
+++ b/dm/tests/_utils/check_sync_diff
@@ -9,7 +9,7 @@ conf=$2
 if [ $# -ge 3 ]; then
 	check_time=$3
 else
-	check_time=10
+	check_time=20
 fi

 PWD=$(pwd)

From 6ca219a99f24e8fb119af44241eda4a678fb0f08 Mon Sep 17 00:00:00 2001
From: --get-all
Date: Wed, 11 May 2022 16:39:15 +0800
Subject: [PATCH 33/43] fix again...

---
 dm/tests/shardddl1/run.sh | 20 ++++++++++----------
 dm/tests/shardddl4/run.sh |  3 +++
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index 36118287425..941292da2d9 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -559,8 +559,8 @@ function DM_COMPACT_CASE() {
 function DM_COMPACT() {
 	# mock downstream has a high latency and upstream has a high workload
 	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
-	check_port_offline $WORKER1_PORT 20
-	check_port_offline $WORKER2_PORT 20
+	check_process_exit worker1 20
+	check_process_exit worker2 20
 	export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)'
 	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
 	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
@@ -602,8 +602,8 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
 	# downstream pk/uk/column is different from upstream, compaction uses the downstream schema.
ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 # DownstreamIdentifyKeyCheckInCompact=return(20) will check whether the key value in compact is less than 20, if false, it will be panic. # This goal is check whether it use downstream schema in compator. # if use downstream schema, key will be 'b' with value less than 20. @@ -681,8 +681,8 @@ function DM_MULTIPLE_ROWS_CASE() { function DM_MULTIPLE_ROWS() { ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml @@ -694,8 +694,8 @@ function DM_MULTIPLE_ROWS() { "clean_table" "" ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml @@ -761,8 +761,8 @@ function DM_DML_EXECUTE_ERROR_CASE() { function DM_DML_EXECUTE_ERROR() { ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/ErrorOnLastDML=return()' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml diff --git a/dm/tests/shardddl4/run.sh b/dm/tests/shardddl4/run.sh index 26e7883cdd4..cd71422d9bf 100644 --- a/dm/tests/shardddl4/run.sh +++ b/dm/tests/shardddl4/run.sh @@ -422,6 +422,9 @@ function DM_130_CASE() { run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(6,6);" + # make sure 2 DM-workers have a order to see DDL + sleep 2 + run_sql_source2 "alter table ${shardddl1}.${tb1} modify b int default -1;" run_sql_source1 "insert into ${shardddl1}.${tb1}(a) values(7);" run_sql_source2 "insert into ${shardddl1}.${tb1}(a) values(8);" From 6b72a4040ee005948dd5fa0431c510430bf7861d Mon Sep 17 00:00:00 2001 From: --get-all Date: Wed, 11 May 2022 19:06:58 +0800 Subject: [PATCH 34/43] add retryable error --- dm/dm/worker/source_worker.go | 2 +- dm/syncer/dbconn/db.go | 22 +++++++++++++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 6bbff78f5cd..1eea6bb7ce6 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -608,7 +608,7 @@ func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskCon return st.Update(ctx, cfg) } -// OperateSubTask stop/resume/pause sub task. +// OperateSubTask stop/resume/pause sub task. 
func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { w.Lock() defer w.Unlock() diff --git a/dm/syncer/dbconn/db.go b/dm/syncer/dbconn/db.go index c91a4528d03..187d84adee9 100644 --- a/dm/syncer/dbconn/db.go +++ b/dm/syncer/dbconn/db.go @@ -18,7 +18,10 @@ import ( "strings" "time" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/util/dbutil" "go.uber.org/zap" @@ -197,7 +200,8 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1) return true } - return false + // TODO: move it to above IsRetryableError + return isRetryableError(err) }, } @@ -234,6 +238,22 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun return ret.(int), nil } +func isRetryableError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrKeyColumnDoesNotExits: + // when two DDL modify one column + return true + } + + return false +} + // ExecuteSQL does some SQL executions. func (conn *DBConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) { return conn.ExecuteSQLWithIgnore(tctx, nil, queries, args...) From 0d0f63a80d47591706173d80d21116828b66b156 Mon Sep 17 00:00:00 2001 From: --get-all Date: Wed, 11 May 2022 19:28:49 +0800 Subject: [PATCH 35/43] fix openapi test --- dm/tests/openapi/client/openapi_task_check | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index 83fd5f0847b..afd2cf42ddb 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -292,17 +292,7 @@ def operate_schema_and_table_success(task_name, source_name, schema_name, table_ assert create_table["schema_name"] == schema_name assert table_name in create_table["schema_create_sql"] - # delete table - resp = requests.delete(url=single_table_url) - assert resp.status_code == 204 - - # after delete, no table in schema - resp = requests.get(url=table_url) - assert resp.status_code == 200 - print("get_task_schema_success table resp=", resp.json()) - assert len(resp.json()) == 0 - - # add table back again + # overwrite table set_table_data = { "sql_content": "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);", "flush": True, From 1a7ef75e4d349b8c4a93fb7ddda648e59dfeb2c0 Mon Sep 17 00:00:00 2001 From: --get-all Date: Wed, 11 May 2022 20:10:53 +0800 Subject: [PATCH 36/43] fix another test --- dm/tests/sync_collation/run.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh index 5eadd952fe6..241122c34bc 100755 --- a/dm/tests/sync_collation/run.sh +++ b/dm/tests/sync_collation/run.sh @@ -94,7 +94,8 @@ function run() { run_sql_file $cur/data/db1.prepare_err.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.prepare_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta" + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $WORK_DIR/dm-task.yaml --remove-meta" echo "check full phase error" run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ From 3e2ee0660eb5a5ecc2d5ef2bd67a972af674cf67 Mon Sep 17 
00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 09:43:44 +0800
Subject: [PATCH 37/43] fix another unstable test

---
 dm/tests/s3_dumpling_lighting/run.sh | 3 ++-
 dm/tests/sync_collation/run.sh       | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/dm/tests/s3_dumpling_lighting/run.sh b/dm/tests/s3_dumpling_lighting/run.sh
index 3fb1a87c89c..2b18e119c65 100755
--- a/dm/tests/s3_dumpling_lighting/run.sh
+++ b/dm/tests/s3_dumpling_lighting/run.sh
@@ -177,7 +177,8 @@ function run_error_check() {
 	cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
 	sed -i "s#name: test#name: $TASK_NAME#g" $WORK_DIR/dm-task.yaml
 	sed -i "s#dir: placeholder#dir: $S3_DIR#g" $WORK_DIR/dm-task.yaml
-	dmctl_start_task $WORK_DIR/dm-task.yaml "--remove-meta"
+	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"start-task $WORK_DIR/dm-task.yaml"

 	run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
 	run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
diff --git a/dm/tests/sync_collation/run.sh b/dm/tests/sync_collation/run.sh
index 241122c34bc..18ce6d444ae 100755
--- a/dm/tests/sync_collation/run.sh
+++ b/dm/tests/sync_collation/run.sh
@@ -95,7 +95,7 @@ function run() {
 	run_sql_file $cur/data/db2.prepare_err.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

 	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"start-task $WORK_DIR/dm-task.yaml --remove-meta"
+		"start-task $WORK_DIR/dm-task.yaml --remove-meta"

From 9b1c4350c7d55b80d963bd5e2583cf691124fd27 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 10:09:49 +0800
Subject: [PATCH 38/43] fix lint

---
 dm/syncer/dbconn/db.go | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/dm/syncer/dbconn/db.go b/dm/syncer/dbconn/db.go
index 187d84adee9..952c6231c32 100644
--- a/dm/syncer/dbconn/db.go
+++ b/dm/syncer/dbconn/db.go
@@ -244,14 +244,8 @@ func isRetryableError(err error) bool {
 	if !ok {
 		return false
 	}
-
-	switch mysqlErr.Number {
-	case errno.ErrKeyColumnDoesNotExits:
-		// when two DDL modify one column
-		return true
-	}
-
-	return false
+
+	return mysqlErr.Number == errno.ErrKeyColumnDoesNotExits
 }

 // ExecuteSQL does some SQL executions.
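
To make the retry behavior concrete: a minimal sketch of how a predicate like isRetryableError (added in PATCH 34 and simplified in PATCH 38 above) can drive a bounded retry loop. executeWithRetry and its parameters are illustrative assumptions, not DM's actual retry strategy type:

package retrysketch

import (
	"time"

	"github.com/go-sql-driver/mysql"
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/errno"
)

// isRetryableError mirrors the predicate in dm/syncer/dbconn/db.go: unwrap
// the error and retry only on the MySQL "key column does not exist" error,
// which can appear transiently when two DDLs modify one column.
func isRetryableError(err error) bool {
	mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError)
	if !ok {
		return false
	}
	return mysqlErr.Number == errno.ErrKeyColumnDoesNotExits
}

// executeWithRetry is a hypothetical helper, not DM code: it retries exec up
// to attempts times with a fixed backoff, as long as the predicate allows it.
func executeWithRetry(attempts int, backoff time.Duration, exec func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = exec(); err == nil || !isRetryableError(err) {
			return err
		}
		time.Sleep(backoff)
	}
	return err
}
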
From 7b0e7d62cd5dcab141b5a9bd01e2c640d72157e2 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 11:25:51 +0800
Subject: [PATCH 39/43] fix comment

Signed-off-by: lance6716
---
 dm/pkg/shardddl/optimism/lock.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dm/pkg/shardddl/optimism/lock.go b/dm/pkg/shardddl/optimism/lock.go
index da71fe38804..5c39444d57f 100644
--- a/dm/pkg/shardddl/optimism/lock.go
+++ b/dm/pkg/shardddl/optimism/lock.go
@@ -614,7 +614,7 @@ func (l *Lock) IsDroppedColumn(source, upSchema, upTable, col string) bool {
 	return true
 }

-// AddDroppedColumn adds a dropped column name in both etcd and lock's column map.
+// AddDroppedColumns adds a dropped column name in both etcd and lock's column map.
 func (l *Lock) AddDroppedColumns(source, schema, table string, cols []string) error {
 	newCols := make([]string, 0, len(cols))
 	for _, col := range cols {

From 787c11c09ea9234411b6e151d34f4ec431ae3474 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 11:52:11 +0800
Subject: [PATCH 40/43] fix unstable CI

Signed-off-by: lance6716
---
 dm/syncer/dbconn/db.go                 | 2 +-
 dm/tests/checkpoint_transaction/run.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dm/syncer/dbconn/db.go b/dm/syncer/dbconn/db.go
index 952c6231c32..59d9226b203 100644
--- a/dm/syncer/dbconn/db.go
+++ b/dm/syncer/dbconn/db.go
@@ -244,7 +244,7 @@ func isRetryableError(err error) bool {
 	if !ok {
 		return false
 	}
-	
+
 	return mysqlErr.Number == errno.ErrKeyColumnDoesNotExits
 }

diff --git a/dm/tests/checkpoint_transaction/run.sh b/dm/tests/checkpoint_transaction/run.sh
index e3834469245..720c557c7b7 100755
--- a/dm/tests/checkpoint_transaction/run.sh
+++ b/dm/tests/checkpoint_transaction/run.sh
@@ -118,7 +118,7 @@ function run() {
 	rm -rf $WORK_DIR/worker1
 	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
 	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
-	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
 		"query-status test" \
 		"\"stage\": \"Running\"" 1
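
PATCH 40 above and PATCH 41 below share one idea: replace one-shot checks and fixed sleeps with bounded polling. As a hedged sketch, the shell helper check_log_contain_with_retry that these tests rely on behaves roughly like this Go equivalent (illustrative only, not DM code):

package pollsketch

import (
	"fmt"
	"os"
	"strings"
	"time"
)

// waitLogContains polls a log file until it contains the wanted substring,
// instead of sleeping for a fixed time and hoping the worker has caught up.
func waitLogContains(path, want string, retries int, interval time.Duration) error {
	for i := 0; i < retries; i++ {
		data, err := os.ReadFile(path)
		if err == nil && strings.Contains(string(data), want) {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("log %s does not contain %q after %d retries", path, want, retries)
}
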
From 6195c35386a2db6fb1fd9637cd00c3934fc038e9 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 13:07:59 +0800
Subject: [PATCH 41/43] fix another unstable test

Signed-off-by: lance6716
---
 dm/tests/shardddl_optimistic/run.sh | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/dm/tests/shardddl_optimistic/run.sh b/dm/tests/shardddl_optimistic/run.sh
index 8781e3a7760..6e1573ad9f3 100644
--- a/dm/tests/shardddl_optimistic/run.sh
+++ b/dm/tests/shardddl_optimistic/run.sh
@@ -173,6 +173,8 @@ function DM_RESTART_TASK_MASTER_WORKER_CASE() {
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(8,'8','88');"

 	run_sql_source1 "alter table ${shardddl1}.${tb1} add column c text;"
+	check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb1} add column c text" \
+		$WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
 	random_restart

 	# source1.tb1(a,c); source1.tb2(a,b); source2.tb1(a,c); source2.tb2(a,b,c)
@@ -182,6 +184,8 @@ function DM_RESTART_TASK_MASTER_WORKER_CASE() {
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(12,'1212','121212');"

 	run_sql_source2 "alter table ${shardddl1}.${tb2} drop column b;"
+	check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb2} drop column b" \
+		$WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
 	random_restart

 	# source1.tb1(a,c); source1.tb2(a,b); source2.tb1(a,c); source2.tb2(a,c)
@@ -191,6 +195,10 @@ function DM_RESTART_TASK_MASTER_WORKER_CASE() {
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(16,'161616');"

 	run_sql_source1 "alter table ${shardddl1}.${tb2} drop column b;"
+	check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb2} drop column b" \
+		$WORK_DIR/worker1/log/dm-worker.log
+	check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb2} drop column b" \
+		$WORK_DIR/worker2/log/dm-worker.log
 	random_restart

 	# source1.tb1(a,c); source1.tb2(a); source2.tb1(a,c); source2.tb2(a,c)
@@ -200,6 +208,8 @@ function DM_RESTART_TASK_MASTER_WORKER_CASE() {
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(20,'202020');"

 	run_sql_source1 "alter table ${shardddl1}.${tb2} add column c text;"
+	check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb2} add column c text" \
+		$WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
 	random_restart

 	# source1.tb1(a,c); source1.tb2(a,c); source2.tb1(a,c); source2.tb2(a,c)

From 5be23767fa8bd17736ce5ab59e78c50024e3a21f Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 16:13:04 +0800
Subject: [PATCH 42/43] address comment

Signed-off-by: lance6716
---
 dm/tests/shardddl4/run.sh | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dm/tests/shardddl4/run.sh b/dm/tests/shardddl4/run.sh
index cd71422d9bf..0774ed9c507 100644
--- a/dm/tests/shardddl4/run.sh
+++ b/dm/tests/shardddl4/run.sh
@@ -422,8 +422,10 @@ function DM_130_CASE() {
 	run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);"
 	run_sql_source2 "insert into ${shardddl1}.${tb2} values(6,6);"

-	# make sure 2 DM-workers have a order to see DDL
-	sleep 2
+	if [[ "$1" = "optimistic" ]]; then
+		check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb1} modify b int default 0" \
+			$WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
+	fi

 	run_sql_source2 "alter table ${shardddl1}.${tb1} modify b int default -1;"
 	run_sql_source1 "insert into ${shardddl1}.${tb1}(a) values(7);"
 	run_sql_source2 "insert into ${shardddl1}.${tb1}(a) values(8);"

From 983b24cd636031d9a0d752e83e63cd42ccee1cc4 Mon Sep 17 00:00:00 2001
From: lance6716
Date: Thu, 12 May 2022 18:01:30 +0800
Subject: [PATCH 43/43] fix unstable test

Signed-off-by: lance6716
---
 dm/tests/_utils/test_prepare | 11 +++++++----
 dm/tests/shardddl4_1/run.sh  | 24 ++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare
index b4ea29bf170..dc022897114 100644
--- a/dm/tests/_utils/test_prepare
+++ b/dm/tests/_utils/test_prepare
@@ -311,15 +311,18 @@ function check_log_contain_with_retry() {
 function init_cluster(){
 	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
 	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
-	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
-	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
-	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
-	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
 	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
 	cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
 	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
 	sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
+
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
 	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
+
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 	dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2
 }

diff --git a/dm/tests/shardddl4_1/run.sh b/dm/tests/shardddl4_1/run.sh
index 8149af80bf7..3321342b7bf 100644
--- a/dm/tests/shardddl4_1/run.sh
+++ b/dm/tests/shardddl4_1/run.sh
@@ -911,73 +911,97 @@ function DM_155_CASE {
 	run_sql_source2
"insert into ${shardddl1}.${tb2} values(3,3,3);" run_sql_source1 "alter table ${shardddl1}.${tb1} change c b int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(4,4,4);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5,5);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(6,6,6);" run_sql_source1 "alter table ${shardddl1}.${tb1} add column g int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(7,7,7,7);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(8,8,8);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(9,9,9);" run_sql_source1 "alter table ${shardddl1}.${tb1} change d f int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(10,10,10,10);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(11,11,11);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(12,12,12);" run_sql_source1 "alter table ${shardddl1}.${tb1} add column e int not null after f;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(13,13,13,13,13);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(14,14,14);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(15,15,15);" run_sql_source2 "alter table ${shardddl1}.${tb1} change c b int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(16,16,16,16,16);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(17,17,17);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(18,18,18);" run_sql_source2 "alter table ${shardddl1}.${tb1} change d f int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(19,19,19,19,19);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(20,20,20);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(21,21,21);" run_sql_source2 "alter table ${shardddl1}.${tb1} add column g int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(22,22,22,22,22);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(23,23,23,23);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(24,24,24);" run_sql_source2 "alter table ${shardddl1}.${tb1} add column e int not null after f;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(25,25,25,25,25);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(26,26,26,26,26);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(27,27,27);" run_sql_source2 "alter table ${shardddl1}.${tb2} change c b int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(28,28,28,28,28);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(29,29,29,29,29);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(30,30,30);" run_sql_source2 "alter table ${shardddl1}.${tb2} change d f int;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(31,31,31,31,31);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(32,32,32,32,32);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(33,33,33);" run_sql_source2 "alter table ${shardddl1}.${tb2} add column e int not null after f;" + sleep 1 random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(34,34,34,34,34);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(35,35,35,35,35);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(36,36,36,36);" run_sql_source2 "alter table ${shardddl1}.${tb2} add column g int;" + sleep 1 
random_restart 3 + run_sql_source1 "insert into ${shardddl1}.${tb1} values(37,37,37,37,37);" run_sql_source2 "insert into ${shardddl1}.${tb1} values(38,38,38,38,38);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(39,39,39,39,39);"