checker(dm): support optimistic checker (pingcap#4329)
okJiang authored and zhaoxinyu committed Feb 16, 2022
1 parent 4974552 commit 56e8f4a
Showing 11 changed files with 395 additions and 19 deletions.
63 changes: 54 additions & 9 deletions dm/checker/check_test.go
@@ -19,11 +19,14 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"

gmysql "github.com/go-sql-driver/mysql"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/ctl/common"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/cputil"

tc "github.com/pingcap/check"
)
@@ -37,9 +40,11 @@ type testCheckerSuite struct{}
var _ = tc.Suite(&testCheckerSuite{})

var (
schema = "db_1"
tb1 = "t_1"
tb2 = "t_2"
schema = "db_1"
tb1 = "t_1"
tb2 = "t_2"
metaSchema = "dm_meta"
taskName = "test"
)

func ignoreExcept(itemMap map[string]struct{}) []string {
@@ -287,6 +292,9 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) {
func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
cfgs := []*config.SubTaskConfig{
{
MetaSchema: metaSchema,
Name: taskName,
ShardMode: config.ShardPessimistic,
RouteRules: []*router.TableRule{
{
SchemaPattern: schema,
@@ -304,11 +312,16 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
createTable2 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
// test different column definition
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -320,7 +333,11 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
c.Assert(err, tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*")
c.Assert(len(msg), tc.Equals, 0)

// test successful check
mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -331,11 +348,24 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(err, tc.IsNil)
c.Assert(msg, tc.Equals, CheckTaskSuccess)

// test existing checkpoint
mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LoaderCheckpoint(taskName), ""))
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LightningCheckpoint(taskName), ""))
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.SyncerCheckpoint(taskName), ""))
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
c.Assert(err, tc.IsNil)
}

func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
cfgs := []*config.SubTaskConfig{
{
ShardMode: config.ShardPessimistic,
RouteRules: []*router.TableRule{
{
SchemaPattern: schema,
@@ -362,7 +392,12 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -375,6 +410,9 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
c.Assert(err, tc.IsNil)

mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
@@ -413,8 +451,15 @@ func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) {
PRIMARY KEY (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
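The checkpoint expectations added to these tests follow a common go-sqlmock pattern: return MySQL error 1146 (ER_NO_SUCH_TABLE) when a checkpoint table should look absent, and a `SHOW CREATE TABLE` row when it should look present. A minimal, self-contained sketch of that pattern is below; the schema name `dm_meta` and the table names are illustrative stand-ins for what the cputil helpers generate, not values taken from this commit.

```go
package main

import (
	"fmt"
	"log"

	"github.com/DATA-DOG/go-sqlmock"
	gmysql "github.com/go-sql-driver/mysql"
)

func main() {
	db, mock, err := sqlmock.New()
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A fresh task: the loader checkpoint table does not exist yet,
	// so SHOW CREATE TABLE fails with MySQL error 1146 (ER_NO_SUCH_TABLE).
	noSuchTable := &gmysql.MySQLError{Number: 1146, Message: "table doesn't exist"}
	mock.ExpectQuery("SHOW CREATE TABLE `dm_meta`.`test_loader_checkpoint`").
		WillReturnError(noSuchTable)

	// A resumed task: the syncer checkpoint table already exists,
	// so SHOW CREATE TABLE returns one row.
	mock.ExpectQuery("SHOW CREATE TABLE `dm_meta`.`test_syncer_checkpoint`").
		WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).
			AddRow("test_syncer_checkpoint", "CREATE TABLE ..."))

	if _, err := db.Query("SHOW CREATE TABLE `dm_meta`.`test_loader_checkpoint`"); err != nil {
		fmt.Println("no loader checkpoint:", err)
	}
	rows, err := db.Query("SHOW CREATE TABLE `dm_meta`.`test_syncer_checkpoint`")
	if err == nil {
		rows.Close()
		fmt.Println("syncer checkpoint table exists")
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		log.Fatal(err)
	}
}
```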
52 changes: 45 additions & 7 deletions dm/checker/checker.go
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/checker"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
@@ -44,6 +45,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -243,13 +245,25 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewTablesChecker(dbs, checkTablesMap, dumpThreads))
}

if checkingShard {
for name, shardingSet := range sharding {
if shardingCounter[name] <= 1 {
continue
instance := c.instances[0]
// Don't check the sharding tables' schema when the mode is incremental,
// because the table schema obtained from `SHOW CREATE TABLE` may not match the schema at the binlog position.
if checkingShard && instance.cfg.Mode != config.ModeIncrement {
isFresh, err := c.IsFreshTask()
if err != nil {
return err
}
if isFresh {
for targetTableID, shardingSet := range sharding {
if shardingCounter[targetTableID] <= 1 {
continue
}
if instance.cfg.ShardMode == config.ShardPessimistic {
c.checkList = append(c.checkList, checker.NewShardingTablesChecker(targetTableID, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
} else {
c.checkList = append(c.checkList, checker.NewOptimisticShardingTablesChecker(targetTableID, dbs, shardingSet, dumpThreads))
}
}

c.checkList = append(c.checkList, checker.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
}
}

@@ -448,7 +462,31 @@ func (c *Checker) Type() pb.UnitType {

// IsFreshTask implements Unit.IsFreshTask.
func (c *Checker) IsFreshTask() (bool, error) {
return true, nil
instance := c.instances[0]
checkpointSQLs := []string{
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LoaderCheckpoint(instance.cfg.Name))),
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LightningCheckpoint(instance.cfg.Name))),
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.SyncerCheckpoint(instance.cfg.Name))),
}
var existCheckpoint bool
for _, sql := range checkpointSQLs {
c.tctx.Logger.Info("exec query", zap.String("sql", sql))
rows, err := instance.targetDB.DB.QueryContext(c.tctx.Ctx, sql)
if err != nil {
if utils.IsMySQLError(err, mysql.ErrNoSuchTable) {
continue
}
return false, err
}
defer rows.Close()
if rows.Err() != nil {
return false, rows.Err()
}
existCheckpoint = true
c.tctx.Logger.Info("exist checkpoint, so don't check sharding tables")
break
}
return !existCheckpoint, nil
}

// Status implements Unit interface.
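The new IsFreshTask probes the downstream for the loader, lightning, and syncer checkpoint tables and treats MySQL error 1146 (ER_NO_SUCH_TABLE) as "not created yet". A simplified sketch of that probe, written outside DM's types, might look like the following; hasAnyCheckpointTable is a hypothetical helper, and the caller is assumed to pass fully qualified, quoted table names.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"

	gmysql "github.com/go-sql-driver/mysql"
)

// hasAnyCheckpointTable reports whether any of the given checkpoint tables
// already exists downstream. MySQL error 1146 (ER_NO_SUCH_TABLE) means the
// table is absent and the probe moves on; any other error is returned.
func hasAnyCheckpointTable(ctx context.Context, db *sql.DB, tables []string) (bool, error) {
	for _, tbl := range tables {
		rows, err := db.QueryContext(ctx, fmt.Sprintf("SHOW CREATE TABLE %s", tbl))
		if err != nil {
			var myErr *gmysql.MySQLError
			if errors.As(err, &myErr) && myErr.Number == 1146 {
				continue // checkpoint table not created yet
			}
			return false, err
		}
		rows.Close()
		return true, nil // at least one checkpoint table exists
	}
	return false, nil
}
```

IsFreshTask then reduces to returning the negation of that probe's result, which is what the `return !existCheckpoint, nil` above does.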
4 changes: 2 additions & 2 deletions dm/dm/worker/subtask.go
@@ -382,11 +382,11 @@ func (st *SubTask) fetchResultAndUpdateStage(pr chan pb.ProcessResult) {
}

// setCurrUnit set current dm unit to ut.
func (st *SubTask) setCurrUnit(ut unit.Unit) {
func (st *SubTask) setCurrUnit(cu unit.Unit) {
st.Lock()
defer st.Unlock()
pu := st.currUnit
st.currUnit = ut
st.currUnit = cu
st.prevUnit = pu
}

127 changes: 127 additions & 0 deletions dm/pkg/checker/table_structure.go
@@ -30,6 +30,7 @@ import (
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
@@ -564,6 +565,132 @@ func (c *ShardingTablesChecker) Name() string {
return fmt.Sprintf("sharding table %s consistency checking", c.targetTableID)
}

// OptimisticShardingTablesChecker checks the consistency of the table structures in one sharding group under optimistic shard mode.
// * check whether they have compatible column lists.
type OptimisticShardingTablesChecker struct {
targetTableID string
dbs map[string]*sql.DB
tableMap map[string][]*filter.Table // sourceID => [table1, table2, ...]
reMu sync.Mutex
joinedMu sync.Mutex
inCh chan *checkItem
dumpThreads int
joined *schemacmp.Table
}

// NewOptimisticShardingTablesChecker returns a RealChecker.
func NewOptimisticShardingTablesChecker(targetTableID string, dbs map[string]*sql.DB, tableMap map[string][]*filter.Table, dumpThreads int) RealChecker {
if dumpThreads == 0 {
dumpThreads = 1
}
c := &OptimisticShardingTablesChecker{
targetTableID: targetTableID,
dbs: dbs,
tableMap: tableMap,
dumpThreads: dumpThreads,
}
c.inCh = make(chan *checkItem, dumpThreads)
return c
}

// Name implements Checker interface.
func (c *OptimisticShardingTablesChecker) Name() string {
return fmt.Sprintf("optimistic sharding table %s consistency checking", c.targetTableID)
}

// Check implements RealChecker interface.
func (c *OptimisticShardingTablesChecker) Check(ctx context.Context) *Result {
r := &Result{
Name: c.Name(),
Desc: "check consistency of sharding table structures for Optimistic Sharding Merge",
State: StateSuccess,
Extra: fmt.Sprintf("sharding %s", c.targetTableID),
}

startTime := time.Now()
concurrency, err := getConcurrency(ctx, c.tableMap, c.dbs, c.dumpThreads)
if err != nil {
markCheckError(r, err)
return r
}
eg, checkCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
return c.checkTable(checkCtx, r)
})
}

dispatchTableItem(checkCtx, c.tableMap, c.inCh)
if err := eg.Wait(); err != nil {
markCheckError(r, err)
}

log.L().Logger.Info("check optimistic sharding table structure over", zap.Duration("spend time", time.Since(startTime)))
return r
}

func (c *OptimisticShardingTablesChecker) checkTable(ctx context.Context, r *Result) error {
var (
sourceID string
p *parser.Parser
err error
)
for {
select {
case <-ctx.Done():
return nil
case checkItem, ok := <-c.inCh:
if !ok {
return nil
}
table := checkItem.table
if len(sourceID) == 0 || sourceID != checkItem.sourceID {
sourceID = checkItem.sourceID
p, err = dbutil.GetParserForDB(ctx, c.dbs[sourceID])
if err != nil {
c.reMu.Lock()
r.Extra = fmt.Sprintf("fail to get parser for sourceID %s on sharding %s", sourceID, c.targetTableID)
c.reMu.Unlock()
return err
}
}

statement, err := dbutil.GetCreateTableSQL(ctx, c.dbs[sourceID], table.Schema, table.Name)
if err != nil {
// continue if table was deleted when checking
if isMySQLError(err, mysql.ErrNoSuchTable) {
continue
}
return err
}

ti, err := dbutil.GetTableInfoBySQL(statement, p)
if err != nil {
return err
}
encodeTi := schemacmp.Encode(ti)
c.joinedMu.Lock()
log.L().Logger.Debug("get schemacmp", zap.Stringer("ti", encodeTi), zap.Stringer("joined", c.joined), zap.Bool("pk is handle", ti.PKIsHandle))
if c.joined == nil {
c.joined = &encodeTi
c.joinedMu.Unlock()
continue
}
newJoined, err2 := c.joined.Join(encodeTi)
if err2 != nil {
// NOTE: conflict detected.
c.reMu.Lock()
r.Extra = fmt.Sprintf("fail to join table info %s with %s", c.joined, encodeTi)
c.reMu.Unlock()
c.joinedMu.Unlock()
return err2
}
c.joined = &newJoined
c.joinedMu.Unlock()
}
}
}

func dispatchTableItem(ctx context.Context, tableMap map[string][]*filter.Table, inCh chan *checkItem) {
for sourceID, tables := range tableMap {
for _, table := range tables {
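The checkTable loop above encodes each shard table's schema with schemacmp.Encode and folds it into a running joined schema; a failed Join is what the optimistic checker reports as a conflict. Below is a standalone sketch of that mechanism with made-up CREATE TABLE statements and simplified error handling; the blank parser_driver import registers TiDB's value-expression driver, which the parser needs for literals such as DEFAULT NULL.

```go
package main

import (
	"fmt"
	"log"

	"github.com/pingcap/tidb-tools/pkg/dbutil"
	"github.com/pingcap/tidb-tools/pkg/schemacmp"
	"github.com/pingcap/tidb/parser"
	_ "github.com/pingcap/tidb/types/parser_driver" // register the parser's value-expression driver
)

func main() {
	p := parser.New()

	// Two shard tables whose column sets differ but do not conflict; joining
	// them yields the superset schema that optimistic shard mode converges to.
	ti1, err := dbutil.GetTableInfoBySQL(
		"CREATE TABLE t (id int(11) DEFAULT NULL, b int(11) DEFAULT NULL) ENGINE=InnoDB", p)
	if err != nil {
		log.Fatal(err)
	}
	ti2, err := dbutil.GetTableInfoBySQL(
		"CREATE TABLE t (id int(11) DEFAULT NULL, c int(11) DEFAULT NULL) ENGINE=InnoDB", p)
	if err != nil {
		log.Fatal(err)
	}

	joined, err := schemacmp.Encode(ti1).Join(schemacmp.Encode(ti2))
	if err != nil {
		log.Fatalf("schemas conflict, optimistic merge is not possible: %v", err)
	}
	fmt.Println("joined schema:", joined) // contains id, b and c
}
```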