From 74ac9622fddf0584b4c6ed96f75911853c835ffa Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 15 Jun 2022 11:22:33 +0800 Subject: [PATCH] tracker(dm): close and recreate tracker when pause and resume (#5350) (#5419) (#5636) close pingcap/tiflow#5344 --- dm/_utils/terror_gen/errors_release.txt | 1 + dm/dm/worker/source_worker.go | 2 +- dm/errors.toml | 6 + dm/pkg/schema/tracker.go | 20 +++- dm/pkg/terror/error_list.go | 2 + dm/pkg/utils/db.go | 7 ++ dm/pkg/utils/db_test.go | 8 ++ dm/syncer/checkpoint.go | 115 +++++++++++-------- dm/syncer/checkpoint_test.go | 25 ++-- dm/syncer/dbconn/db.go | 16 ++- dm/syncer/schema.go | 63 +++++----- dm/syncer/syncer.go | 50 ++++++-- dm/syncer/syncer_test.go | 27 ++++- dm/tests/_utils/check_process_exit | 22 ++++ dm/tests/_utils/check_sync_diff | 2 +- dm/tests/_utils/test_prepare | 11 +- dm/tests/dmctl_basic/conf/dm-task.yaml | 2 + dm/tests/dmctl_basic/conf/get_task.yaml | 2 + dm/tests/dmctl_basic/run.sh | 6 +- dm/tests/expression_filter/run.sh | 8 +- dm/tests/handle_error_3/run.sh | 4 + dm/tests/many_tables/conf/source1.yaml | 3 +- dm/tests/many_tables/run.sh | 37 +++++- dm/tests/new_relay/run.sh | 2 + dm/tests/openapi/client/openapi_task_check | 12 +- dm/tests/relay_interrupt/run.sh | 4 +- dm/tests/sequence_sharding_optimistic/run.sh | 13 +-- dm/tests/shardddl1/run.sh | 20 ++-- dm/tests/shardddl4/run.sh | 5 + dm/tests/start_task/run.sh | 4 +- 30 files changed, 333 insertions(+), 166 deletions(-) create mode 100755 dm/tests/_utils/check_process_exit diff --git a/dm/_utils/terror_gen/errors_release.txt b/dm/_utils/terror_gen/errors_release.txt index c81497876da..1e97991f2c7 100644 --- a/dm/_utils/terror_gen/errors_release.txt +++ b/dm/_utils/terror_gen/errors_release.txt @@ -336,6 +336,7 @@ ErrSyncerReplaceEventNotExist,[code=36066:class=sync-unit:scope=internal:level=h ErrSyncerParseDDL,[code=36067:class=sync-unit:scope=internal:level=high], "Message: parse DDL: %s, Workaround: Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed." ErrSyncerUnsupportedStmt,[code=36068:class=sync-unit:scope=internal:level=high], "Message: `%s` statement not supported in %s mode" ErrSyncerGetEvent,[code=36069:class=sync-unit:scope=upstream:level=high], "Message: get binlog event error: %v, Workaround: Please check if the binlog file could be parsed by `mysqlbinlog`." +ErrSyncerDownstreamTableNotFound,[code=36070:class=sync-unit:scope=internal:level=high], "Message: downstream table %s not found" ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium], "Message: nil request not valid" ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium], "Message: op %s not supported" ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium], "Message: operate request without --sharding specified not valid" diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 5989da6adee..0b1ef910c94 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -540,7 +540,7 @@ func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskCon return st.Update(ctx, cfg) } -// OperateSubTask stop/resume/pause sub task. +// OperateSubTask stop/resume/pause sub task. 
func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { w.Lock() defer w.Unlock() diff --git a/dm/errors.toml b/dm/errors.toml index 9fd7af01ece..e07679a6174 100644 --- a/dm/errors.toml +++ b/dm/errors.toml @@ -2026,6 +2026,12 @@ description = "" workaround = "Please check if the binlog file could be parsed by `mysqlbinlog`." tags = ["upstream", "high"] +[error.DM-sync-unit-36070] +message = "downstream table %s not found" +description = "" +workaround = "" +tags = ["internal", "high"] + [error.DM-dm-master-38001] message = "nil request not valid" description = "" diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 43e47c1c4aa..4b4e757ee8e 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -20,6 +20,7 @@ import ( "os" "strings" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/pkg/filter" tidbConfig "github.com/pingcap/tidb/config" @@ -35,7 +36,9 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + unistoreConfig "github.com/pingcap/tidb/store/mockstore/unistore/config" "github.com/pingcap/tidb/types" + "go.uber.org/atomic" "go.uber.org/zap" tcontext "github.com/pingcap/tiflow/dm/pkg/context" @@ -60,6 +63,11 @@ var ( } ) +func init() { + unistoreConfig.DefaultConf.Engine.VlogFileSize = 4 * units.MiB + unistoreConfig.DefaultConf.Engine.L1Size = 128 * units.MiB +} + // Tracker is used to track schema locally. type Tracker struct { storePath string @@ -67,6 +75,7 @@ type Tracker struct { dom *domain.Domain se session.Session dsTracker *downstreamTracker + closed atomic.Bool } // downstreamTracker tracks downstream schema. @@ -265,10 +274,7 @@ func (tr *Tracker) GetCreateTable(ctx context.Context, table *filter.Table) (str row := req.GetRow(0) str := row.GetString(1) // the first column is the table name. - // returned as single line. - str = strings.ReplaceAll(str, "\n", "") - str = strings.ReplaceAll(str, " ", " ") - return str, nil + return utils.CreateTableSQLToOneRow(str), nil } // AllSchemas returns all schemas visible to the tracker (excluding system tables). @@ -348,6 +354,12 @@ func (tr *Tracker) Reset() error { // Close close a tracker. func (tr *Tracker) Close() error { + if tr == nil { + return nil + } + if !tr.closed.CAS(false, true) { + return nil + } tr.se.Close() tr.dom.Close() if err := tr.store.Close(); err != nil { diff --git a/dm/pkg/terror/error_list.go b/dm/pkg/terror/error_list.go index 347192f5719..7003df2fb90 100644 --- a/dm/pkg/terror/error_list.go +++ b/dm/pkg/terror/error_list.go @@ -433,6 +433,7 @@ const ( codeSyncerParseDDL codeSyncerUnsupportedStmt codeSyncerGetEvent + codeSyncerDownstreamTableNotFound ) // DM-master error code. @@ -1067,6 +1068,7 @@ var ( ErrSyncerParseDDL = New(codeSyncerParseDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "parse DDL: %s", "Please confirm your DDL statement is correct and needed. For TiDB compatible DDL, see https://docs.pingcap.com/tidb/stable/mysql-compatibility#ddl. 
You can use `handle-error` command to skip or replace the DDL or add a binlog filter rule to ignore it if the DDL is not needed.") ErrSyncerUnsupportedStmt = New(codeSyncerUnsupportedStmt, ClassSyncUnit, ScopeInternal, LevelHigh, "`%s` statement not supported in %s mode", "") ErrSyncerGetEvent = New(codeSyncerGetEvent, ClassSyncUnit, ScopeUpstream, LevelHigh, "get binlog event error: %v", "Please check if the binlog file could be parsed by `mysqlbinlog`.") + ErrSyncerDownstreamTableNotFound = New(codeSyncerDownstreamTableNotFound, ClassSyncUnit, ScopeInternal, LevelHigh, "downstream table %s not found", "") // DM-master error. ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid", "") diff --git a/dm/pkg/utils/db.go b/dm/pkg/utils/db.go index c582a016823..806b48165d0 100644 --- a/dm/pkg/utils/db.go +++ b/dm/pkg/utils/db.go @@ -632,3 +632,10 @@ func GetTableCreateSQL(ctx context.Context, conn *sql.Conn, tableID string) (sql } return createStr, nil } + +// CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row. +func CreateTableSQLToOneRow(sql string) string { + sql = strings.ReplaceAll(sql, "\n", "") + sql = strings.ReplaceAll(sql, " ", " ") + return sql +} diff --git a/dm/pkg/utils/db_test.go b/dm/pkg/utils/db_test.go index 3687f1eaf82..04b640d2973 100644 --- a/dm/pkg/utils/db_test.go +++ b/dm/pkg/utils/db_test.go @@ -16,6 +16,7 @@ package utils import ( "context" "strconv" + "testing" "time" "github.com/DATA-DOG/go-sqlmock" @@ -25,6 +26,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" tmysql "github.com/pingcap/tidb/parser/mysql" + "github.com/stretchr/testify/require" "github.com/pingcap/tiflow/dm/pkg/gtid" ) @@ -458,3 +460,9 @@ func (t *testDBSuite) TestAddGSetWithPurged(c *C) { c.Assert(originSet, DeepEquals, tc.originGSet) } } + +func TestCreateTableSQLToOneRow(t *testing.T) { + input := "CREATE TABLE `t1` (\n `id` bigint(20) NOT NULL,\n `c1` varchar(20) DEFAULT NULL,\n `c2` varchar(20) DEFAULT NULL,\n PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */\n) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin" + expected := "CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) /*T![clustered_index] NONCLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin" + require.Equal(t, expected, CreateTableSQLToOneRow(input)) +} diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index 3c38a0d58cb..87d6260bc91 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -105,7 +105,7 @@ func (b *binlogPoint) flush() { b.flushedTI = b.ti } -func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (isSchemaChanged bool) { +func (b *binlogPoint) rollback() { b.Lock() defer b.Unlock() @@ -120,18 +120,16 @@ func (b *binlogPoint) rollback(schemaTracker *schema.Tracker, schema string) (is // NOTE: no `Equal` function for `model.TableInfo` exists now, so we compare `pointer` directly, // and after a new DDL applied to the schema, the returned pointer of `model.TableInfo` changed now. - trackedTi, _ := schemaTracker.GetTableInfo(&filter.Table{Schema: schema, Name: b.ti.Name.O}) // ignore the returned error, only compare `trackerTi` is enough. - // may three versions of schema exist: - // - the one tracked in the TiDB-with-mockTiKV. + // there may be three versions of schema: + // - the one tracked in the schema tracker (TiDB-with-unistore). 
// - the one in the checkpoint but not flushed. // - the one in the checkpoint and flushed. - // if any of them are not equal, then we rollback them: + // schema tracker will be closed after task is paused, and it will load all schemas from checkpoint when task resumes. + // if the later two are not equal, then we rollback them: // - set the one in the checkpoint but not flushed to the one flushed. - // - set the one tracked to the one in the checkpoint by the caller of this method (both flushed and not flushed are the same now) - if isSchemaChanged = (trackedTi != b.ti) || (b.ti != b.flushedTI); isSchemaChanged { + if b.ti != b.flushedTI { b.ti = b.flushedTI } - return } func (b *binlogPoint) outOfDate() bool { @@ -237,6 +235,9 @@ type CheckPoint interface { // TablePoint returns all table's stream checkpoint TablePoint() map[string]map[string]binlog.Location + // GetTableInfo returns the saved table info from table checkpoint for the given table, return nil when not found + GetTableInfo(schema string, table string) *model.TableInfo + // FlushedGlobalPoint returns the flushed global binlog stream's checkpoint // corresponding to to Meta.Pos and gtid FlushedGlobalPoint() binlog.Location @@ -250,13 +251,16 @@ type CheckPoint interface { GetFlushedTableInfo(table *filter.Table) *model.TableInfo // Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints - Rollback(schemaTracker *schema.Tracker) + Rollback() // String return text of global position String() string // CheckAndUpdate check the checkpoint data consistency and try to fix them if possible CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error + + // LoadIntoSchemaTracker loads table infos of all points into schema tracker. + LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error } // RemoteCheckPoint implements CheckPoint @@ -596,10 +600,16 @@ func (cp *RemoteCheckPoint) FlushPointWithTableInfo(tctx *tcontext.Context, tabl args := make([][]interface{}, 0, 10) point := newBinlogPoint(binlog.NewLocation(cp.cfg.Flavor), binlog.NewLocation(cp.cfg.Flavor), nil, nil, cp.cfg.EnableGTID) - if tablePoints, ok := cp.points[sourceSchema]; ok { - if p, ok2 := tablePoints[sourceTable]; ok2 { - point = p - } + tablePoints, ok := cp.points[sourceSchema] + if !ok { + tablePoints = map[string]*binlogPoint{} + cp.points[sourceSchema] = tablePoints + } + p, ok2 := tablePoints[sourceTable] + if ok2 { + point = p + } else { + tablePoints[sourceTable] = point } tiBytes, err := json.Marshal(ti) @@ -676,6 +686,21 @@ func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]binlog.Location { return tablePoint } +func (cp *RemoteCheckPoint) GetTableInfo(schema string, table string) *model.TableInfo { + cp.RLock() + defer cp.RUnlock() + + tables, ok := cp.points[schema] + if !ok { + return nil + } + tablePoint, ok := tables[table] + if !ok { + return nil + } + return tablePoint.TableInfo() +} + // FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint. func (cp *RemoteCheckPoint) FlushedGlobalPoint() binlog.Location { cp.RLock() @@ -700,10 +725,10 @@ func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { } // Rollback implements CheckPoint.Rollback. 
-func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { +func (cp *RemoteCheckPoint) Rollback() { cp.RLock() defer cp.RUnlock() - cp.globalPoint.rollback(schemaTracker, "") + cp.globalPoint.rollback() for schemaName, mSchema := range cp.points { for tableName, point := range mSchema { table := &filter.Table{ @@ -712,38 +737,7 @@ func (cp *RemoteCheckPoint) Rollback(schemaTracker *schema.Tracker) { } logger := cp.logCtx.L().WithFields(zap.Stringer("table", table)) logger.Debug("try to rollback checkpoint", log.WrapStringerField("checkpoint", point)) - from := point.MySQLLocation() - if point.rollback(schemaTracker, schemaName) { - logger.Info("rollback checkpoint", zap.Stringer("from", from), zap.Stringer("to", point.FlushedMySQLLocation())) - // schema changed - if err := schemaTracker.DropTable(table); err != nil { - logger.Warn("failed to drop table from schema tracker", log.ShortError(err)) - } - if point.ti != nil { - // TODO: Figure out how to recover from errors. - if err := schemaTracker.CreateSchemaIfNotExists(schemaName); err != nil { - logger.Error("failed to rollback schema on schema tracker: cannot create schema", log.ShortError(err)) - } - if err := schemaTracker.CreateTableIfNotExists(table, point.ti); err != nil { - logger.Error("failed to rollback schema on schema tracker: cannot create table", log.ShortError(err)) - } - } - } - } - } - - // drop any tables in the tracker if no corresponding checkpoint exists. - for _, schema := range schemaTracker.AllSchemas() { - _, ok1 := cp.points[schema.Name.O] - for _, table := range schema.Tables { - var ok2 bool - if ok1 { - _, ok2 = cp.points[schema.Name.O][table.Name.O] - } - if !ok2 { - err := schemaTracker.DropTable(&filter.Table{Schema: schema.Name.O, Name: table.Name.O}) - cp.logCtx.L().Info("drop table in schema tracker because no checkpoint exists", zap.String("schema", schema.Name.O), zap.String("table", table.Name.O), log.ShortError(err)) - } + point.rollback() } } } @@ -899,6 +893,33 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { return terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream) } +// LoadIntoSchemaTracker loads table infos of all points into schema tracker. +func (cp *RemoteCheckPoint) LoadIntoSchemaTracker(ctx context.Context, schemaTracker *schema.Tracker) error { + cp.RLock() + defer cp.RUnlock() + + for cpSchema, mSchema := range cp.points { + err := schemaTracker.CreateSchemaIfNotExists(cpSchema) + if err != nil { + return err + } + for cpTable, point := range mSchema { + // for create database DDL, we'll create a table point with no table name and table info, need to skip. + if point.flushedTI == nil { + continue + } + cp.logCtx.L().Debug("will init table info in schema tracker", + zap.String("database", cpSchema), + zap.String("table", cpTable)) + err := schemaTracker.CreateTableIfNotExists(&filter.Table{Schema: cpSchema, Name: cpTable}, point.flushedTI) + if err != nil { + return err + } + } + } + return nil +} + // CheckAndUpdate check the checkpoint data consistency and try to fix them if possible. 
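A minimal sketch of the resume-time flow these checkpoint changes enable, assuming only the CheckPoint and schema.Tracker methods shown in this patch (illustrative, not part of the diff; the real wiring lives in syncer.Run further below):

// rebuildTracker recreates a fresh schema tracker for a resumed task and seeds
// it from flushed checkpoints; table points that only record a CREATE DATABASE
// (nil table info) are skipped inside LoadIntoSchemaTracker.
func rebuildTracker(ctx context.Context, cfg *config.SubTaskConfig, cp CheckPoint, conn *dbconn.DBConn) (*schema.Tracker, error) {
	tr, err := schema.NewTracker(ctx, cfg.Name, cfg.To.Session, conn)
	if err != nil {
		return nil, terror.ErrSchemaTrackerInit.Delegate(err)
	}
	if err := cp.LoadIntoSchemaTracker(ctx, tr); err != nil {
		_ = tr.Close() // Close is nil-safe and idempotent after this patch (guarded by an atomic CAS)
		return nil, err
	}
	return tr, nil
}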
func (cp *RemoteCheckPoint) CheckAndUpdate(ctx context.Context, schemas map[string]string, tables map[string]map[string]string) error { cp.Lock() diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go index 4564d630bfd..81ddedbbfe4 100644 --- a/dm/syncer/checkpoint_test.go +++ b/dm/syncer/checkpoint_test.go @@ -19,7 +19,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/pkg/binlog" @@ -201,7 +200,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) // test rollback - cp.Rollback(s.tracker) + cp.Rollback() c.Assert(cp.GlobalPoint().Position, Equals, pos1) c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1) @@ -216,7 +215,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() c.Assert(cp.GlobalPoint().Position, Equals, pos2) c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos2) @@ -354,7 +353,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(older, IsTrue) // rollback, to min - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) @@ -369,7 +368,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, nil, nil, nil) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsTrue) @@ -401,19 +400,12 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(rcp.points[schemaName][tableName].TableInfo(), NotNil) c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() rcp = cp.(*RemoteCheckPoint) c.Assert(rcp.points[schemaName][tableName].TableInfo(), IsNil) c.Assert(rcp.points[schemaName][tableName].flushedTI, IsNil) - _, err = s.tracker.GetTableInfo(table) - c.Assert(strings.Contains(err.Error(), "doesn't exist"), IsTrue) - // test save, flush and rollback to not nil table info - err = s.tracker.Exec(ctx, schemaName, "create table "+tableName+" (c int);") - c.Assert(err, IsNil) - ti, err = s.tracker.GetTableInfo(table) - c.Assert(err, IsNil) cp.SaveTablePoint(table, binlog.Location{Position: pos1}, ti) tiBytes, _ := json.Marshal(ti) s.mock.ExpectBegin() @@ -425,10 +417,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { ti2, err := s.tracker.GetTableInfo(table) c.Assert(err, IsNil) cp.SaveTablePoint(table, binlog.Location{Position: pos2}, ti2) - cp.Rollback(s.tracker) - ti11, err := s.tracker.GetTableInfo(table) - c.Assert(err, IsNil) - c.Assert(ti11.Columns, HasLen, 1) + cp.Rollback() // clear, to min s.mock.ExpectBegin() @@ -456,7 +445,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { s.mock.ExpectCommit() err = cp.FlushPointsExcept(tctx, []*filter.Table{table}, nil, nil) c.Assert(err, IsNil) - cp.Rollback(s.tracker) + cp.Rollback() older = cp.IsOlderThanTablePoint(table, binlog.Location{Position: pos1}, false) c.Assert(older, IsFalse) diff --git a/dm/syncer/dbconn/db.go b/dm/syncer/dbconn/db.go index 379ef4a6de2..8625e71b106 100644 --- a/dm/syncer/dbconn/db.go +++ b/dm/syncer/dbconn/db.go @@ -18,8 +18,11 @@ import ( "strings" "time" + 
"github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb/errno" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" @@ -190,7 +193,8 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.Cfg.Name).Add(1) return true } - return false + // TODO: move it to above IsRetryableError + return isRetryableError(err) }, } @@ -227,6 +231,16 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun return ret.(int), nil } +func isRetryableError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + return mysqlErr.Number == errno.ErrKeyColumnDoesNotExits +} + // ExecuteSQL does some SQL executions. func (conn *DBConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) { return conn.ExecuteSQLWithIgnore(tctx, nil, queries, args...) diff --git a/dm/syncer/schema.go b/dm/syncer/schema.go index a7d1c2360f5..e4ffbb02432 100644 --- a/dm/syncer/schema.go +++ b/dm/syncer/schema.go @@ -14,21 +14,25 @@ package syncer import ( + "bytes" "context" "encoding/json" "strings" "github.com/pingcap/tidb-tools/pkg/filter" + ddl2 "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/dm/pkg/utils" "go.uber.org/zap" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" - "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -61,12 +65,21 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } return string(tableListJSON), err case pb.SchemaOp_GetSchema: - // we only try to get schema from schema-tracker now. - // in other words, we can not get the schema if any DDL/DML has been replicated, or set a schema previously. - return s.schemaTracker.GetCreateTable(ctx, sourceTable) + // when task is paused, schemaTracker is closed. We get the table structure from checkpoint. + ti := s.checkpoint.GetTableInfo(req.Database, req.Table) + if ti == nil { + s.tctx.L().Info("table schema is not in checkpoint, fetch from downstream", + zap.String("table", sourceTable.String())) + targetTable := s.route(sourceTable) + return utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String()) + } + + result := bytes.NewBuffer(make([]byte, 0, 512)) + err2 := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, result) + return utils.CreateTableSQLToOneRow(result.String()), err2 + case pb.SchemaOp_SetSchema: // for set schema, we must ensure it's a valid `CREATE TABLE` statement. - // now, we only set schema for schema-tracker, // if want to update the one in checkpoint, it should wait for the flush of checkpoint. parser2, err := s.fromDB.GetParser(ctx) if err != nil { @@ -92,37 +105,21 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } newSQL := newCreateSQLBuilder.String() - // drop the previous schema first. 
- err = s.schemaTracker.DropTable(sourceTable) - if err != nil && !schema.IsTableNotExists(err) { - return "", terror.ErrSchemaTrackerCannotDropTable.Delegate(err, sourceTable) - } - err = s.schemaTracker.CreateSchemaIfNotExists(req.Database) - if err != nil { - return "", terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, req.Database) - } - err = s.schemaTracker.Exec(ctx, req.Database, newSQL) - if err != nil { - return "", terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, sourceTable) - } - s.exprFilterGroup.ResetExprs(sourceTable) - if !req.Flush && !req.Sync { - break + if !req.Flush { + s.tctx.L().Info("overwrite --flush to true for operate-schema") } - ti, err := s.schemaTracker.GetTableInfo(sourceTable) - if err != nil { - return "", err + ti, err2 := ddl2.BuildTableInfoFromAST(stmt) + if err2 != nil { + return "", terror.ErrSchemaTrackerRestoreStmtFail.Delegate(err2) } - if req.Flush { - log.L().Info("flush table info", zap.String("table info", newSQL)) - err = s.checkpoint.FlushPointWithTableInfo(tcontext.NewContext(ctx, log.L()), sourceTable, ti) - if err != nil { - return "", err - } + s.tctx.L().Info("flush table info", zap.String("table info", newSQL)) + err = s.checkpoint.FlushPointWithTableInfo(tcontext.NewContext(ctx, log.L()), sourceTable, ti) + if err != nil { + return "", err } if req.Sync { @@ -142,9 +139,9 @@ func (s *Syncer) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaR } case pb.SchemaOp_RemoveSchema: - // we only drop the schema in the schema-tracker now, - // so if we drop the schema and continue to replicate any DDL/DML, it will try to get schema from downstream again. - return "", s.schemaTracker.DropTable(sourceTable) + // as the doc says, `operate-schema remove` will let DM-worker use table structure in checkpoint, which does not + // need further actions. 
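A minimal sketch of the GetSchema path after this change, assuming the helpers introduced in this patch (illustrative only): the table structure is read from the checkpoint when present and otherwise fetched from the routed downstream table, because the schema tracker may be closed while the task is paused.

// getCreateSQL mirrors the new SchemaOp_GetSchema branch above.
func getCreateSQL(ctx context.Context, s *Syncer, sourceTable *filter.Table) (string, error) {
	if ti := s.checkpoint.GetTableInfo(sourceTable.Schema, sourceTable.Name); ti != nil {
		buf := bytes.NewBuffer(make([]byte, 0, 512))
		if err := executor.ConstructResultOfShowCreateTable(s.sessCtx, ti, autoid.Allocators{}, buf); err != nil {
			return "", err
		}
		return utils.CreateTableSQLToOneRow(buf.String()), nil
	}
	// not in the checkpoint yet: fall back to the downstream table structure
	targetTable := s.route(sourceTable)
	return utils.GetTableCreateSQL(ctx, s.downstreamTrackConn.BaseConn.DBConn, targetTable.String())
}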
+ return "", nil } return "", nil } diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index e3ecf9f2ffc..16520d897cd 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -321,11 +321,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "close-DBs", Fn: s.closeDBs}) - s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) - if err != nil { - return terror.ErrSchemaTrackerInit.Delegate(err) - } - s.streamerController = NewStreamerController(s.notifier, s.syncCfg, s.cfg.EnableGTID, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList) @@ -675,7 +670,7 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { // try to rollback checkpoints, if they already flushed, no effect prePos := s.checkpoint.GlobalPoint() - s.checkpoint.Rollback(s.schemaTracker) + s.checkpoint.Rollback() currPos := s.checkpoint.GlobalPoint() if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) @@ -1346,6 +1341,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { flushCheckpoint bool delLoadTask bool cleanDumpFile = s.cfg.CleanDumpFile + freshAndAllMode bool ) flushCheckpoint, err = s.adjustGlobalPointGTID(tctx) if err != nil { @@ -1354,15 +1350,24 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if s.cfg.Mode == config.ModeAll && fresh { delLoadTask = true flushCheckpoint = true - err = s.loadTableStructureFromDump(ctx) - if err != nil { - tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) - cleanDumpFile = false - } - } else { + freshAndAllMode = true + } + if s.cfg.Mode == config.ModeIncrement || !fresh { cleanDumpFile = false } + // some prepare work before the binlog event loop: + // 1. first we flush checkpoint as needed, so in next resume we won't go to Load unit. + // 2. then since we are confident that Load unit is done we can delete the load task etcd KV. + // TODO: we can't handle panic between 1. and 2., or fail to delete the load task etcd KV. + // 3. then we initiate schema tracker + // 4. - when it's a fresh task, load the table structure from dump files into schema tracker. + // if it's also a optimistic sharding task, also load the table structure into checkpoints because shard tables + // may not have same table structure so we can't fetch the downstream table structure for them lazily. + // - when it's a resumed task, load the table structure from checkpoints into schema tracker. + // TODO: we can't handle failure between 1. and 4. After 1. it's not a fresh task. + // 5. 
finally clean the dump files + if flushCheckpoint { if err = s.flushCheckPoints(); err != nil { tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) @@ -1374,6 +1379,24 @@ func (s *Syncer) Run(ctx context.Context) (err error) { tctx.L().Warn("error when del load task in etcd", zap.Error(err)) } } + + s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn) + if err != nil { + return terror.ErrSchemaTrackerInit.Delegate(err) + } + + if freshAndAllMode { + err = s.loadTableStructureFromDump(ctx) + if err != nil { + tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) + cleanDumpFile = false + } + } else { + err = s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker) + if err != nil { + return err + } + } if cleanDumpFile { tctx.L().Info("try to remove all dump files") if err = os.RemoveAll(s.cfg.Dir); err != nil { @@ -3125,6 +3148,9 @@ func (s *Syncer) Pause() { return } s.stopSync() + if err := s.schemaTracker.Close(); err != nil { + s.tctx.L().Error("fail to close schema tracker", log.ShortError(err)) + } } // Resume resumes the paused process. diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index b924a935546..7209d5302ca 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -770,6 +770,10 @@ func (s *testSyncerSuite) TestRun(c *C) { s.cfg.Batch = 1000 s.cfg.WorkerCount = 2 s.cfg.MaxRetry = 1 + s.cfg.To.Session = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } cfg, err := s.cfg.Clone() c.Assert(err, IsNil) @@ -792,10 +796,6 @@ func (s *testSyncerSuite) TestRun(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))")) - s.mockGetServerUnixTS(mock) - mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( - sqlmock.NewRows([]string{"Table", "Create Table"}). - AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) syncer.exprFilterGroup = NewExprFilterGroup(utils.NewSessionCtx(nil), nil) c.Assert(err, IsNil) @@ -947,6 +947,14 @@ func (s *testSyncerSuite) TestRun(c *C) { mockDBProvider := conn.InitMockDB(c) mockDBProvider.ExpectQuery("SELECT cast\\(TIMEDIFF\\(NOW\\(6\\), UTC_TIMESTAMP\\(6\\)\\) as time\\);"). WillReturnRows(sqlmock.NewRows([]string{""}).AddRow("01:00:00")) + s.mockGetServerUnixTS(mock) + mock.ExpectBegin() + mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", pmysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectCommit() + mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( + sqlmock.NewRows([]string{"Table", "Create Table"}). 
+ AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) + c.Assert(syncer.Update(context.Background(), s.cfg), IsNil) c.Assert(syncer.timezone.String(), Equals, "+01:00") @@ -1032,6 +1040,10 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { {Schema: "test_1", Name: "t_1"}, }, } + s.cfg.To.Session = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } cfg, err := s.cfg.Clone() c.Assert(err, IsNil) @@ -1110,6 +1122,13 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { // disable 1-minute safe mode c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds", "return(0)"), IsNil) go syncer.Process(ctx, resultCh) + go func() { + for r := range resultCh { + if len(r.Errors) > 0 { + c.Fatal(r.String()) + } + } + }() expectJobs := []*expectJob{ // now every ddl job will start with a flush job diff --git a/dm/tests/_utils/check_process_exit b/dm/tests/_utils/check_process_exit new file mode 100755 index 00000000000..f2f9aa932e6 --- /dev/null +++ b/dm/tests/_utils/check_process_exit @@ -0,0 +1,22 @@ +#!/bin/bash +# parameter 1: name +# parameter 2: retry count + +name=$1 +retry=$2 + +i=0 +while [ $i -lt $retry ]; do + ret=$(ps aux | grep $name | sed '/.*grep.*/d' | sed '/.*check_process_exit.*/d' | wc -l) + if [ $ret -lt 1 ]; then + break + fi + ((i++)) + echo "wait for process $name exits the $i-th time" + sleep 1 +done + +if [ $i -ge $retry ]; then + echo "process $name exits timeout" + exit 1 +fi diff --git a/dm/tests/_utils/check_sync_diff b/dm/tests/_utils/check_sync_diff index 2b7bcd17bdf..632eda279fe 100755 --- a/dm/tests/_utils/check_sync_diff +++ b/dm/tests/_utils/check_sync_diff @@ -9,7 +9,7 @@ conf=$2 if [ $# -ge 3 ]; then check_time=$3 else - check_time=10 + check_time=20 fi PWD=$(pwd) diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare index ff24c716c5c..a91e1755372 100644 --- a/dm/tests/_utils/test_prepare +++ b/dm/tests/_utils/test_prepare @@ -307,15 +307,18 @@ function check_log_contain_with_retry() { function init_cluster(){ run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 } diff --git a/dm/tests/dmctl_basic/conf/dm-task.yaml b/dm/tests/dmctl_basic/conf/dm-task.yaml index ed9f54cfeaf..b9ea640e7f1 100644 --- a/dm/tests/dmctl_basic/conf/dm-task.yaml +++ 
b/dm/tests/dmctl_basic/conf/dm-task.yaml @@ -39,6 +39,8 @@ block-allow-list: do-tables: - db-name: "dmctl" tbl-name: "~^t_[\\d]+" + - db-name: "dmctl" + tbl-name: "flush_trigger" routes: sharding-route-rules-table: diff --git a/dm/tests/dmctl_basic/conf/get_task.yaml b/dm/tests/dmctl_basic/conf/get_task.yaml index e26a00dae17..c7f79655a9b 100644 --- a/dm/tests/dmctl_basic/conf/get_task.yaml +++ b/dm/tests/dmctl_basic/conf/get_task.yaml @@ -123,6 +123,8 @@ block-allow-list: do-tables: - db-name: dmctl tbl-name: ~^t_[\d]+ + - db-name: dmctl + tbl-name: flush_trigger do-dbs: - dmctl ignore-tables: [] diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh index 25b56a061f8..22c361f3074 100755 --- a/dm/tests/dmctl_basic/run.sh +++ b/dm/tests/dmctl_basic/run.sh @@ -379,12 +379,12 @@ function run() { # make sure every shard table in source 1 has be forwarded to newer binlog, so older relay log could be purged run_sql_source1 "flush logs" + run_sql_source1 "create table dmctl.flush_trigger (c int primary key);" run_sql_source1 "update dmctl.t_1 set d = '' where id = 13" run_sql_source1 "update dmctl.t_2 set d = '' where id = 12" - # sleep 2*1s to ensure syncer unit has flushed global checkpoint and updates - # updated ActiveRelayLog - sleep 2 + # sleep to ensure syncer unit has resumed, read next binlog files and updated ActiveRelayLog + sleep 5 server_uuid=$(tail -n 1 $WORK_DIR/worker1/relay_log/server-uuid.index) run_sql_source1 "show binary logs\G" max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE" | tail -n 1 | awk -F":" '{print $NF}') diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh index 1b33c27051a..c5662e6ae89 100755 --- a/dm/tests/expression_filter/run.sh +++ b/dm/tests/expression_filter/run.sh @@ -19,11 +19,13 @@ function complex_behaviour() { run_sql_file $cur/data/db1.prepare2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - # test about schema-tracker can't create its storage + # test no permission chmod -w $WORK_DIR/worker1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-task $cur/conf/dm-task2.yaml" \ - "failed to create schema tracker" 1 \ + "start-task $cur/conf/dm-task2.yaml" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "dumpling runs with error" 1 \ "permission denied" 1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "stop-task test" diff --git a/dm/tests/handle_error_3/run.sh b/dm/tests/handle_error_3/run.sh index f0726acd0da..4cb30f7c34e 100644 --- a/dm/tests/handle_error_3/run.sh +++ b/dm/tests/handle_error_3/run.sh @@ -187,6 +187,10 @@ function DM_4193_CASE() { "binlog revert test -s $source2 -b $first_name2:$second_pos2" \ "operator not exist" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"stage": "Paused"' 2 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "binlog skip test" \ "\"result\": true" 3 diff --git a/dm/tests/many_tables/conf/source1.yaml b/dm/tests/many_tables/conf/source1.yaml index 0018c1b61e3..04cc575da29 100644 --- a/dm/tests/many_tables/conf/source1.yaml +++ b/dm/tests/many_tables/conf/source1.yaml @@ -10,4 +10,5 @@ from: password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3306 checker: - check-enable: false + check-enable: true + backoff-max: 1s diff --git a/dm/tests/many_tables/run.sh b/dm/tests/many_tables/run.sh index 4291c34c624..1ec0154ad7d 100644 --- a/dm/tests/many_tables/run.sh +++ b/dm/tests/many_tables/run.sh @@ -27,6 +27,13 @@ function incremental_data() { done } +function incremental_data_2() { + j=6 + 
for i in $(seq $TABLE_NUM); do + run_sql "INSERT INTO many_tables_db.t$i VALUES ($j,${j}000$j);" $MYSQL_PORT1 $MYSQL_PASSWORD1 + done +} + function run() { echo "start prepare_data" prepare_data @@ -69,8 +76,36 @@ function run() { echo "start incremental_data" incremental_data echo "finish incremental_data" - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # test https://github.com/pingcap/tiflow/issues/5344 + kill_dm_worker + # let some binlog event save table checkpoint before meet downstream error + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_sql_source1 "CREATE TABLE many_tables_db.flush (c INT PRIMARY KEY);" + sleep 5 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + + pkill -hup tidb-server 2>/dev/null || true + wait_process_exit tidb-server + # now worker will process some binlog events, save table checkpoint and meet downstream error + incremental_data_2 + sleep 30 + + resume_num=$(grep 'unit process error' $WORK_DIR/worker1/log/dm-worker.log | wc -l) + echo "resume_num: $resume_num" + # because we check auto resume every 5 seconds... + [ $resume_num -ge 4 ] + folder_size=$(du -d0 $WORK_DIR/worker1/ --exclude="$WORK_DIR/worker1/log" | cut -f1) + echo "folder_size: $folder_size" + # less than 10M + [ $folder_size -lt 10000 ] + + export GO_FAILPOINTS='' } cleanup_data many_tables_db diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index 42f0fda61ce..f4162ba1db4 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -245,6 +245,8 @@ function run() { check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + # wait syncer begin to sync so it has deleted load task etcd KV. + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml # relay task tranfer to worker1 with no error. 
check_metric $WORKER1_PORT "dm_relay_data_corruption" 3 -1 1 diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check index ac763a8aa36..04824830cb4 100755 --- a/dm/tests/openapi/client/openapi_task_check +++ b/dm/tests/openapi/client/openapi_task_check @@ -239,17 +239,7 @@ def operate_schema_and_table_success(task_name, source_name, schema_name, table_ assert create_table["schema_name"] == schema_name assert table_name in create_table["schema_create_sql"] - # delete table - resp = requests.delete(url=single_table_url) - assert resp.status_code == 204 - - # after delete, no table in schema - resp = requests.get(url=table_url) - assert resp.status_code == 200 - print("get_task_schema_success table resp=", resp.json()) - assert len(resp.json()) == 0 - - # add table back again + # overwrite table set_table_data = { "sql_content": "CREATE TABLE openapi.t1(i TINYINT, j INT UNIQUE KEY);", "flush": True, diff --git a/dm/tests/relay_interrupt/run.sh b/dm/tests/relay_interrupt/run.sh index 4ba58be24a9..8ffa6de84c6 100644 --- a/dm/tests/relay_interrupt/run.sh +++ b/dm/tests/relay_interrupt/run.sh @@ -94,8 +94,8 @@ function run() { "start-task $task_conf" \ "\"result\": false" 1 \ "subtasks with name test for sources \[mysql-replica-01\] already exist" 1 - - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + # wait relay unit up + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ "\"binlogType\": \"local\"" 1 diff --git a/dm/tests/sequence_sharding_optimistic/run.sh b/dm/tests/sequence_sharding_optimistic/run.sh index d414646e899..b6cc06ccbe6 100755 --- a/dm/tests/sequence_sharding_optimistic/run.sh +++ b/dm/tests/sequence_sharding_optimistic/run.sh @@ -71,8 +71,9 @@ run() { "\"stage\": \"Paused\"" 2 # try to get schema for the table, but can't get because no DDL/DML replicated yet. - curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log - check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1 + # uncomment it after #5824 + # curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log + # check_log_contains ${WORK_DIR}/get_schema.log 'CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c1` varchar(20) DEFAULT NULL, `c2` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`) .*) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' 1 # resume task manually. run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ @@ -164,17 +165,13 @@ run() { # drop the schema. curl -X PUT ${API_URL} -d '{"op":3, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/remove_schema.log - # try to get schema again, but can't get. - curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log - check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1 - # try to set an invalid schema. 
curl -X PUT ${API_URL} -d '{"op":2, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1", "schema":"invalid create table statement"}' >${WORK_DIR}/get_schema.log >${WORK_DIR}/set_schema.log check_log_contains ${WORK_DIR}/set_schema.log 'is not a valid `CREATE TABLE` statement' 1 - # try to get schema again, no one exist. + # try to get schema again, still the old schema. curl -X PUT ${API_URL} -d '{"op":1, "task":"sequence_sharding_optimistic", "sources": ["mysql-replica-01"], "database":"sharding_seq_opt", "table":"t1"}' >${WORK_DIR}/get_schema.log - check_log_contains ${WORK_DIR}/get_schema.log "Table 'sharding_seq_opt.t1' doesn't exist" 1 + check_log_contains ${WORK_DIR}/get_schema.log 'CREATE TABLE `t1` ( `id` bigint(20) NOT NULL, `c2` varchar(20) DEFAULT NULL, `c3` int(11) DEFAULT NULL, PRIMARY KEY (`id`) .*) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin' 1 run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "binlog-schema -s mysql-replica-01,mysql-replica-02 sequence_sharding_optimistic sharding_seq_opt t2" \ diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh index 290e573a54c..9e6bee3d682 100644 --- a/dm/tests/shardddl1/run.sh +++ b/dm/tests/shardddl1/run.sh @@ -594,8 +594,8 @@ function DM_COMPACT_CASE() { function DM_COMPACT() { # mock downstream has a high latency and upstream has a high workload ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml @@ -637,8 +637,8 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() { function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() { # downstream pk/uk/column is diffrent with upstream, compact use downstream schema. ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 # DownstreamIdentifyKeyCheckInCompact=return(20) will check whether the key value in compact is less than 20, if false, it will be panic. # This goal is check whether it use downstream schema in compator. # if use downstream schema, key will be 'b' with value less than 20. 
@@ -716,8 +716,8 @@ function DM_MULTIPLE_ROWS_CASE() { function DM_MULTIPLE_ROWS() { ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml @@ -729,8 +729,8 @@ function DM_MULTIPLE_ROWS() { "clean_table" "" ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml @@ -796,8 +796,8 @@ function DM_DML_EXECUTE_ERROR_CASE() { function DM_DML_EXECUTE_ERROR() { ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true - check_port_offline $WORKER1_PORT 20 - check_port_offline $WORKER2_PORT 20 + check_process_exit worker1 20 + check_process_exit worker2 20 export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/ErrorOnLastDML=return()' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml diff --git a/dm/tests/shardddl4/run.sh b/dm/tests/shardddl4/run.sh index 0bd6426a434..9ff6582f08f 100644 --- a/dm/tests/shardddl4/run.sh +++ b/dm/tests/shardddl4/run.sh @@ -422,6 +422,11 @@ function DM_130_CASE() { run_sql_source2 "insert into ${shardddl1}.${tb1} values(5,5);" run_sql_source2 "insert into ${shardddl1}.${tb2} values(6,6);" + if [[ "$1" = "optimistic" ]]; then + check_log_contain_with_retry "finish to handle ddls in optimistic shard mode.*alter table ${shardddl1}.${tb1} modify b int default 0" \ + $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + fi + run_sql_source2 "alter table ${shardddl1}.${tb1} modify b int default -1;" run_sql_source1 "insert into ${shardddl1}.${tb1}(a) values(7);" run_sql_source2 "insert into ${shardddl1}.${tb1}(a) values(8);" diff --git a/dm/tests/start_task/run.sh b/dm/tests/start_task/run.sh index 90e51cb24f7..2e79fc57933 100644 --- a/dm/tests/start_task/run.sh +++ b/dm/tests/start_task/run.sh @@ -55,8 +55,8 @@ function lazy_init_tracker() { done check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 20 - check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t50' 1 - check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t51' + check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'init table info.*t50' 1 + check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'init table info.*t51' cleanup_data start_task cleanup_process
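A closing sketch of the pause/resume tracker lifecycle this patch implements, assuming the Syncer fields and helpers shown in the diff; the wrapper function names are hypothetical and illustrative only.

// prepareTrackerOnRun shows the intended order in syncer.Run: recreate the
// tracker, then seed it from dump files (fresh all-mode task) or from flushed
// checkpoints (resumed task).
func prepareTrackerOnRun(ctx context.Context, s *Syncer, freshAndAllMode bool) error {
	var err error
	s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn)
	if err != nil {
		return terror.ErrSchemaTrackerInit.Delegate(err)
	}
	if freshAndAllMode {
		return s.loadTableStructureFromDump(ctx)
	}
	return s.checkpoint.LoadIntoSchemaTracker(ctx, s.schemaTracker)
}

// closeTrackerOnPause shows the matching Pause-side step; Close is nil-safe and
// idempotent after this patch, so pausing before Run has created the tracker is harmless.
func closeTrackerOnPause(s *Syncer) {
	s.stopSync()
	if err := s.schemaTracker.Close(); err != nil {
		s.tctx.L().Error("fail to close schema tracker", log.ShortError(err))
	}
}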