From 1fb41f91bb048860c924b4752dee4f22137635d2 Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Mon, 14 Feb 2022 19:13:38 +0800
Subject: [PATCH] checker(dm): support optimistic checker (#4329)

close pingcap/tiflow#4328
---
 dm/checker/check_test.go                    |  63 ++++++++--
 dm/checker/checker.go                       |  52 ++++++--
 dm/dm/worker/subtask.go                     |   4 +-
 dm/pkg/checker/table_structure.go           | 127 ++++++++++++++++++++
 dm/pkg/checker/table_structure_test.go      |  45 +++++++
 dm/tests/dmctl_basic/conf/diff_config2.toml |  50 ++++++++
 dm/tests/dmctl_basic/conf/dm-task4.yaml     |  63 ++++++++++
 dm/tests/dmctl_basic/data/db1.prepare.sql   |   3 +
 dm/tests/dmctl_basic/run.sh                 |   3 +
 go.mod                                      |   1 +
 go.sum                                      |   3 +-
 11 files changed, 395 insertions(+), 19 deletions(-)
 create mode 100644 dm/tests/dmctl_basic/conf/diff_config2.toml
 create mode 100644 dm/tests/dmctl_basic/conf/dm-task4.yaml

diff --git a/dm/checker/check_test.go b/dm/checker/check_test.go
index 45668d2354a..58f8ae372a8 100644
--- a/dm/checker/check_test.go
+++ b/dm/checker/check_test.go
@@ -19,11 +19,14 @@ import (
     "testing"
 
     "github.com/DATA-DOG/go-sqlmock"
-
+    gmysql "github.com/go-sql-driver/mysql"
     router "github.com/pingcap/tidb-tools/pkg/table-router"
+    "github.com/pingcap/tidb/parser/mysql"
+
     "github.com/pingcap/tiflow/dm/dm/config"
     "github.com/pingcap/tiflow/dm/dm/ctl/common"
     "github.com/pingcap/tiflow/dm/pkg/conn"
+    "github.com/pingcap/tiflow/dm/pkg/cputil"
 
     tc "github.com/pingcap/check"
 )
@@ -37,9 +40,11 @@ type testCheckerSuite struct{}
 var _ = tc.Suite(&testCheckerSuite{})
 
 var (
-    schema = "db_1"
-    tb1    = "t_1"
-    tb2    = "t_2"
+    schema     = "db_1"
+    tb1        = "t_1"
+    tb2        = "t_2"
+    metaSchema = "dm_meta"
+    taskName   = "test"
 )
 
 func ignoreExcept(itemMap map[string]struct{}) []string {
@@ -287,6 +292,9 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) {
 func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
     cfgs := []*config.SubTaskConfig{
         {
+            MetaSchema: metaSchema,
+            Name:       taskName,
+            ShardMode:  config.ShardPessimistic,
             RouteRules: []*router.TableRule{
                 {
                     SchemaPattern: schema,
@@ -304,11 +312,16 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
           b int(11) DEFAULT NULL
         ) ENGINE=InnoDB DEFAULT CHARSET=latin1`
     createTable2 := `CREATE TABLE %s (
-        id int(11) DEFAULT NULL,
-        c int(11) DEFAULT NULL
-    ) ENGINE=InnoDB DEFAULT CHARSET=latin1`
-
+          id int(11) DEFAULT NULL,
+          c int(11) DEFAULT NULL
+        ) ENGINE=InnoDB DEFAULT CHARSET=latin1`
+    errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
+    createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
+    // test different column definition
     mock := initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
     mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
     mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -320,7 +333,11 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
     c.Assert(err, tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*")
     c.Assert(len(msg), tc.Equals, 0)
 
+    // test success check
     mock = initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
     mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
     mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -331,11 +348,24 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
     msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
     c.Assert(err, tc.IsNil)
     c.Assert(msg, tc.Equals, CheckTaskSuccess)
+
+    // test exist checkpoint
+    mock = initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnRows(sqlmock.
+        NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LoaderCheckpoint(taskName), ""))
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnRows(sqlmock.
+        NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LightningCheckpoint(taskName), ""))
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnRows(sqlmock.
+        NewRows([]string{"Table", "Create Table"}).AddRow(cputil.SyncerCheckpoint(taskName), ""))
+    msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
+    c.Assert(msg, tc.Equals, CheckTaskSuccess)
+    c.Assert(err, tc.IsNil)
 }
 
 func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
     cfgs := []*config.SubTaskConfig{
         {
+            ShardMode: config.ShardPessimistic,
             RouteRules: []*router.TableRule{
                 {
                     SchemaPattern: schema,
@@ -362,7 +392,12 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
           UNIQUE KEY u_b(b)
         ) ENGINE=InnoDB DEFAULT CHARSET=latin1`
 
+    errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
+    createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
     mock := initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
     mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
     mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -375,6 +410,9 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
     c.Assert(err, tc.IsNil)
 
     mock = initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
     mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
     mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -413,8 +451,15 @@ func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) {
           PRIMARY KEY (id),
           UNIQUE KEY u_b(b)
         ) ENGINE=InnoDB DEFAULT CHARSET=latin1`
-
+    errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
+    createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
     mock := initMockDB(c)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
+    mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
     mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
     mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
diff --git a/dm/checker/checker.go b/dm/checker/checker.go
index 0ae60fea15a..dc688ca89ba 100644
--- a/dm/checker/checker.go
+++ b/dm/checker/checker.go
@@ -31,6 +31,7 @@ import (
     "github.com/pingcap/tiflow/dm/pkg/checker"
     "github.com/pingcap/tiflow/dm/pkg/conn"
     tcontext "github.com/pingcap/tiflow/dm/pkg/context"
+    "github.com/pingcap/tiflow/dm/pkg/cputil"
     "github.com/pingcap/tiflow/dm/pkg/dumpling"
     fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
     "github.com/pingcap/tiflow/dm/pkg/log"
@@ -44,6 +45,7 @@ import (
     "github.com/pingcap/tidb-tools/pkg/filter"
     router "github.com/pingcap/tidb-tools/pkg/table-router"
     "github.com/pingcap/tidb/dumpling/export"
+    "github.com/pingcap/tidb/parser/mysql"
     "go.uber.org/atomic"
     "go.uber.org/zap"
 )
@@ -243,13 +245,25 @@ func (c *Checker) Init(ctx context.Context) (err error) {
         c.checkList = append(c.checkList, checker.NewTablesChecker(dbs, checkTablesMap, dumpThreads))
     }
 
-    if checkingShard {
-        for name, shardingSet := range sharding {
-            if shardingCounter[name] <= 1 {
-                continue
+    instance := c.instances[0]
+    // Don't check the sharding tables' schema when the task mode is incremental,
+    // because the table schema obtained from `show create table` is not the schema at the binlog position.
+    if checkingShard && instance.cfg.Mode != config.ModeIncrement {
+        isFresh, err := c.IsFreshTask()
+        if err != nil {
+            return err
+        }
+        if isFresh {
+            for targetTableID, shardingSet := range sharding {
+                if shardingCounter[targetTableID] <= 1 {
+                    continue
+                }
+                if instance.cfg.ShardMode == config.ShardPessimistic {
+                    c.checkList = append(c.checkList, checker.NewShardingTablesChecker(targetTableID, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
+                } else {
+                    c.checkList = append(c.checkList, checker.NewOptimisticShardingTablesChecker(targetTableID, dbs, shardingSet, dumpThreads))
+                }
             }
-
-            c.checkList = append(c.checkList, checker.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
         }
     }
 
@@ -448,7 +462,31 @@ func (c *Checker) Type() pb.UnitType {
 
 // IsFreshTask implements Unit.IsFreshTask.
 func (c *Checker) IsFreshTask() (bool, error) {
-    return true, nil
+    instance := c.instances[0]
+    checkpointSQLs := []string{
+        fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LoaderCheckpoint(instance.cfg.Name))),
+        fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LightningCheckpoint(instance.cfg.Name))),
+        fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.SyncerCheckpoint(instance.cfg.Name))),
+    }
+    var existCheckpoint bool
+    for _, sql := range checkpointSQLs {
+        c.tctx.Logger.Info("exec query", zap.String("sql", sql))
+        rows, err := instance.targetDB.DB.QueryContext(c.tctx.Ctx, sql)
+        if err != nil {
+            if utils.IsMySQLError(err, mysql.ErrNoSuchTable) {
+                continue
+            }
+            return false, err
+        }
+        defer rows.Close()
+        if rows.Err() != nil {
+            return false, rows.Err()
+        }
+        existCheckpoint = true
+        c.tctx.Logger.Info("exist checkpoint, so don't check sharding tables")
+        break
+    }
+    return !existCheckpoint, nil
 }
 
 // Status implements Unit interface.
diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go
index 078902ad978..84621226eb5 100644
--- a/dm/dm/worker/subtask.go
+++ b/dm/dm/worker/subtask.go
@@ -382,11 +382,11 @@ func (st *SubTask) fetchResultAndUpdateStage(pr chan pb.ProcessResult) {
 }
 
 // setCurrUnit set current dm unit to ut.
-func (st *SubTask) setCurrUnit(ut unit.Unit) {
+func (st *SubTask) setCurrUnit(cu unit.Unit) {
     st.Lock()
     defer st.Unlock()
     pu := st.currUnit
-    st.currUnit = ut
+    st.currUnit = cu
     st.prevUnit = pu
 }
 
diff --git a/dm/pkg/checker/table_structure.go b/dm/pkg/checker/table_structure.go
index e6eb1363ff2..1bc18de99d8 100644
--- a/dm/pkg/checker/table_structure.go
+++ b/dm/pkg/checker/table_structure.go
@@ -30,6 +30,7 @@ import (
     column "github.com/pingcap/tidb-tools/pkg/column-mapping"
     "github.com/pingcap/tidb-tools/pkg/dbutil"
     "github.com/pingcap/tidb-tools/pkg/filter"
+    "github.com/pingcap/tidb-tools/pkg/schemacmp"
     "github.com/pingcap/tidb/parser"
     "github.com/pingcap/tidb/parser/ast"
     "github.com/pingcap/tidb/parser/charset"
@@ -564,6 +565,132 @@ func (c *ShardingTablesChecker) Name() string {
     return fmt.Sprintf("sharding table %s consistency checking", c.targetTableID)
 }
 
+// OptimisticShardingTablesChecker checks the consistency of table structures of one sharding group under the optimistic shard mode.
+// * check whether they have a compatible column list.
+type OptimisticShardingTablesChecker struct {
+    targetTableID string
+    dbs           map[string]*sql.DB
+    tableMap      map[string][]*filter.Table // sourceID => [table1, table2, ...]
+    reMu          sync.Mutex
+    joinedMu      sync.Mutex
+    inCh          chan *checkItem
+    dumpThreads   int
+    joined        *schemacmp.Table
+}
+
+// NewOptimisticShardingTablesChecker returns a RealChecker.
+func NewOptimisticShardingTablesChecker(targetTableID string, dbs map[string]*sql.DB, tableMap map[string][]*filter.Table, dumpThreads int) RealChecker {
+    if dumpThreads == 0 {
+        dumpThreads = 1
+    }
+    c := &OptimisticShardingTablesChecker{
+        targetTableID: targetTableID,
+        dbs:           dbs,
+        tableMap:      tableMap,
+        dumpThreads:   dumpThreads,
+    }
+    c.inCh = make(chan *checkItem, dumpThreads)
+    return c
+}
+
+// Name implements Checker interface.
+func (c *OptimisticShardingTablesChecker) Name() string {
+    return fmt.Sprintf("optimistic sharding table %s consistency checking", c.targetTableID)
+}
+
+// Check implements RealChecker interface.
+func (c *OptimisticShardingTablesChecker) Check(ctx context.Context) *Result {
+    r := &Result{
+        Name:  c.Name(),
+        Desc:  "check consistency of sharding table structures for Optimistic Sharding Merge",
+        State: StateSuccess,
+        Extra: fmt.Sprintf("sharding %s", c.targetTableID),
+    }
+
+    startTime := time.Now()
+    concurrency, err := getConcurrency(ctx, c.tableMap, c.dbs, c.dumpThreads)
+    if err != nil {
+        markCheckError(r, err)
+        return r
+    }
+    eg, checkCtx := errgroup.WithContext(ctx)
+    for i := 0; i < concurrency; i++ {
+        eg.Go(func() error {
+            return c.checkTable(checkCtx, r)
+        })
+    }
+
+    dispatchTableItem(checkCtx, c.tableMap, c.inCh)
+    if err := eg.Wait(); err != nil {
+        markCheckError(r, err)
+    }
+
+    log.L().Logger.Info("check optimistic sharding table structure over", zap.Duration("spend time", time.Since(startTime)))
+    return r
+}
+
+func (c *OptimisticShardingTablesChecker) checkTable(ctx context.Context, r *Result) error {
+    var (
+        sourceID string
+        p        *parser.Parser
+        err      error
+    )
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        case checkItem, ok := <-c.inCh:
+            if !ok {
+                return nil
+            }
+            table := checkItem.table
+            if len(sourceID) == 0 || sourceID != checkItem.sourceID {
+                sourceID = checkItem.sourceID
+                p, err = dbutil.GetParserForDB(ctx, c.dbs[sourceID])
+                if err != nil {
+                    c.reMu.Lock()
+                    r.Extra = fmt.Sprintf("fail to get parser for sourceID %s on sharding %s", sourceID, c.targetTableID)
+                    c.reMu.Unlock()
+                    return err
+                }
+            }
+
+            statement, err := dbutil.GetCreateTableSQL(ctx, c.dbs[sourceID], table.Schema, table.Name)
+            if err != nil {
+                // continue if table was deleted when checking
+                if isMySQLError(err, mysql.ErrNoSuchTable) {
+                    continue
+                }
+                return err
+            }
+
+            ti, err := dbutil.GetTableInfoBySQL(statement, p)
+            if err != nil {
+                return err
+            }
+            encodeTi := schemacmp.Encode(ti)
+            c.joinedMu.Lock()
+            log.L().Logger.Debug("get schemacmp", zap.Stringer("ti", encodeTi), zap.Stringer("joined", c.joined), zap.Bool("pk is handle", ti.PKIsHandle))
+            if c.joined == nil {
+                c.joined = &encodeTi
+                c.joinedMu.Unlock()
+                continue
+            }
+            newJoined, err2 := c.joined.Join(encodeTi)
+            if err2 != nil {
+                // NOTE: conflict detected.
+                c.reMu.Lock()
+                r.Extra = fmt.Sprintf("fail to join table info %s with %s", c.joined, encodeTi)
+                c.reMu.Unlock()
+                c.joinedMu.Unlock()
+                return err2
+            }
+            c.joined = &newJoined
+            c.joinedMu.Unlock()
+        }
+    }
+}
+
 func dispatchTableItem(ctx context.Context, tableMap map[string][]*filter.Table, inCh chan *checkItem) {
     for sourceID, tables := range tableMap {
         for _, table := range tables {
diff --git a/dm/pkg/checker/table_structure_test.go b/dm/pkg/checker/table_structure_test.go
index bb67ab1a6a8..cc68f232421 100644
--- a/dm/pkg/checker/table_structure_test.go
+++ b/dm/pkg/checker/table_structure_test.go
@@ -16,6 +16,8 @@ package checker
 import (
     "context"
     "database/sql"
+    "encoding/json"
+    "fmt"
 
     "github.com/DATA-DOG/go-sqlmock"
     tc "github.com/pingcap/check"
@@ -177,6 +179,49 @@ func (t *testCheckSuite) TestTablesChecker(c *tc.C) {
     c.Assert(mock.ExpectationsWereMet(), tc.IsNil)
 }
 
+func (t *testCheckSuite) TestOptimisticShardingTablesChecker(c *tc.C) {
+    db, mock, err := sqlmock.New()
+    c.Assert(err, tc.IsNil)
+    ctx := context.Background()
+
+    printJSON := func(r *Result) {
+        rawResult, _ := json.MarshalIndent(r, "", "\t")
+        fmt.Println("\n" + string(rawResult))
+    }
+
+    // optimistic check different column number
+    maxConnectionsRow := sqlmock.NewRows([]string{"Variable_name", "Value"}).
+        AddRow("max_connections", "2")
+    mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(maxConnectionsRow)
+    sqlModeRow := sqlmock.NewRows([]string{"Variable_name", "Value"}).
+        AddRow("sql_mode", "ANSI_QUOTES")
+    mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlModeRow)
+    createTableRow := sqlmock.NewRows([]string{"Table", "Create Table"}).
+        AddRow("test-table-1", `CREATE TABLE "test-table-1" (
+"c" int(11) NOT NULL,
+PRIMARY KEY ("c")
+) ENGINE=InnoDB DEFAULT CHARSET=latin1`)
+    mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-1`").WillReturnRows(createTableRow)
+    createTableRow2 := sqlmock.NewRows([]string{"Table", "Create Table"}).
+        AddRow("test-table-2", `CREATE TABLE "test-table-2" (
+"c" int(11) NOT NULL,
+"d" int(11) NOT NULL,
+PRIMARY KEY ("c")
+) ENGINE=InnoDB DEFAULT CHARSET=latin1`)
+    mock.ExpectQuery("SHOW CREATE TABLE `test-db`.`test-table-2`").WillReturnRows(createTableRow2)
+    checker := NewOptimisticShardingTablesChecker("test-name",
+        map[string]*sql.DB{"test-source": db},
+        map[string][]*filter.Table{"test-source": {
+            &filter.Table{Schema: "test-db", Name: "test-table-1"},
+            &filter.Table{Schema: "test-db", Name: "test-table-2"},
+        }},
+        0)
+    result := checker.Check(ctx)
+    printJSON(result)
+    c.Assert(result.State, tc.Equals, StateSuccess)
+    c.Assert(mock.ExpectationsWereMet(), tc.IsNil)
+}
+
 func initShardingMock(mock sqlmock.Sqlmock) sqlmock.Sqlmock {
     sqlModeRow := sqlmock.NewRows([]string{"Variable_name", "Value"}).
         AddRow("sql_mode", "ANSI_QUOTES")
diff --git a/dm/tests/dmctl_basic/conf/diff_config2.toml b/dm/tests/dmctl_basic/conf/diff_config2.toml
new file mode 100644
index 00000000000..6b523b50678
--- /dev/null
+++ b/dm/tests/dmctl_basic/conf/diff_config2.toml
@@ -0,0 +1,50 @@
+# diff Configuration.
+
+check-thread-count = 4
+
+export-fix-sql = true
+
+check-struct-only = false
+
+[task]
+    output-dir = "/tmp/ticdc_dm_test/output"
+
+    source-instances = ["mysql1", "mysql2"]
+
+    target-instance = "tidb0"
+
+    target-check-tables = ["dmctl.precheck_optimistic_tb"]
+
+    target-configs= ["config1"]
+
+[table-configs]
+[table-configs.config1]
+target-tables = ["dmctl.precheck_optimistic_tb"]
+
+[routes.rule1]
+schema-pattern = "dmctl"
+table-pattern = "precheck_optimistic_tb_?*"
+target-schema = "dmctl"
+target-table = "precheck_optimistic_tb"
+
+
+[data-sources]
+[data-sources.mysql1]
+host = "127.0.0.1"
+port = 3306
+user = "root"
+password = "123456"
+route-rules = ["rule1"]
+
+[data-sources.mysql2]
+host = "127.0.0.1"
+port = 3307
+user = "root"
+password = "123456"
+route-rules = ["rule1"]
+
+[data-sources.tidb0]
+host = "127.0.0.1"
+port = 4000
+user = "test"
+password = "123456"
diff --git a/dm/tests/dmctl_basic/conf/dm-task4.yaml b/dm/tests/dmctl_basic/conf/dm-task4.yaml
new file mode 100644
index 00000000000..ae9355ea391
--- /dev/null
+++ b/dm/tests/dmctl_basic/conf/dm-task4.yaml
@@ -0,0 +1,63 @@
+---
+name: pre_check_optimistic
+task-mode: all
+shard-mode: "optimistic"
+meta-schema: "dm_meta"
+enable-heartbeat: false
+
+target-database:
+  host: "127.0.0.1"
+  port: 4000
+  user: "root"
+  password: ""
+
+mysql-instances:
+  - source-id: "mysql-replica-01"
+    block-allow-list: "instance"
+    route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+  - source-id: "mysql-replica-02"
+    block-allow-list: "instance"
+    route-rules: ["sharding-route-rules-table", "sharding-route-rules-schema"]
+    mydumper-config-name: "global"
+    loader-config-name: "global"
+    syncer-config-name: "global"
+
+block-allow-list:
+  instance:
+    do-dbs: ["dmctl"]
+    do-tables:
+      - db-name: "dmctl"
+        tbl-name: "~^precheck_optimistic_tb_[\\d]+"
+
+routes:
+  sharding-route-rules-table:
+    schema-pattern: dmctl
+    table-pattern: precheck_optimistic_tb_*
+    target-schema: dmctl
+    target-table: precheck_optimistic_tb
+
+  sharding-route-rules-schema:
+    schema-pattern: dmctl
+    target-schema: dmctl
+
+mydumpers:
+  global:
+    threads: 4
+    chunk-filesize: 64
+    skip-tz-utc: true
+    extra-args: ""
+
+loaders:
+  global:
+    pool-size: 16
+    dir: "./dumped_data"
+
+syncers:
+  global:
+    worker-count: 16
+    batch: 100
+    checkpoint-flush-interval: 1
diff --git a/dm/tests/dmctl_basic/data/db1.prepare.sql b/dm/tests/dmctl_basic/data/db1.prepare.sql
index 289b61d977d..54b469d4aee 100644
--- a/dm/tests/dmctl_basic/data/db1.prepare.sql
+++ b/dm/tests/dmctl_basic/data/db1.prepare.sql
@@ -26,4 +26,7 @@ INSERT INTO `dmctl`.`t_1` (`b`,`c`,`d`,`id`) VALUES (800180420,'JuUIxUacksp','sX
 create table tb_1(a INT, b INT);
 create table tb_2(a INT, c INT);
 
+create table precheck_optimistic_tb_1(a INT, b INT, primary key a(a));
+create table precheck_optimistic_tb_2(a INT, c INT, primary key a(a));
+
 CREATE TABLE only_warning (id bigint, b int, primary key id(id), FOREIGN KEY (b) REFERENCES t_1(b));
\ No newline at end of file
diff --git a/dm/tests/dmctl_basic/run.sh b/dm/tests/dmctl_basic/run.sh
index 858713ee737..5437d4f6aa5 100755
--- a/dm/tests/dmctl_basic/run.sh
+++ b/dm/tests/dmctl_basic/run.sh
@@ -283,6 +283,9 @@ function run() {
     check_task_not_pass $cur/conf/dm-task2.yaml
     check_task_error_count $cur/conf/dm-task3.yaml
 
+    echo "check_task_optimistic"
+    check_task_pass $cur/conf/dm-task4.yaml
+
     echo "check_task_only_warning"
     check_task_only_warning $cur/conf/only_warning.yaml
     run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
diff --git a/go.mod b/go.mod
index c713e718dac..64416e4701d 100644
--- a/go.mod
+++ b/go.mod
@@ -81,6 +81,7 @@ require (
     github.com/uber-go/atomic v1.4.0
     github.com/vmihailenco/msgpack/v5 v5.3.5
     github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
+    github.com/xitongsys/parquet-go v1.6.0 // indirect
     go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
     go.uber.org/atomic v1.9.0
     go.uber.org/goleak v1.1.12
diff --git a/go.sum b/go.sum
index 1f031ae2e48..232d25cd6d2 100644
--- a/go.sum
+++ b/go.sum
@@ -1109,8 +1109,9 @@ github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQ
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx0K/GyB0o2bww=
-github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 h1:tBbuFCtyJNKT+BFAv6qjvTFpVdy97IYNaBwGUXifIUs=
 github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8=
+github.com/xitongsys/parquet-go v1.6.0 h1:j6YrTVZdQx5yywJLIOklZcKVsCoSD1tqOVRXyTBFSjs=
+github.com/xitongsys/parquet-go v1.6.0/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8=
 github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA=
 github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k=
 github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE=