Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-flush-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Mar 14, 2022
2 parents ee1567e + 461b98f commit d6c544e
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 211 deletions.
1 change: 1 addition & 0 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (st *SubTask) StartValidator(expect pb.Stage) {
}
st.Lock()
defer st.Unlock()

if st.cfg.ValidatorCfg.Mode != config.ValidationFast && st.cfg.ValidatorCfg.Mode != config.ValidationFull {
return
}
Expand Down
112 changes: 110 additions & 2 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
// user give correct session config

_, err = NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
err = t.Close()
c.Assert(err, IsNil)

// user give wrong session session, will return error
Expand All @@ -114,6 +116,8 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
c.Assert(err, IsNil)
err = tracker.Close()
c.Assert(err, IsNil)

// found session config in downstream
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
Expand All @@ -132,6 +136,8 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
// Now create the table with ZERO_DATE
err = tracker.Exec(ctx, "testdb", "create table foo (a varchar(255) primary key, b DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00')")
c.Assert(err, NotNil)
err = tracker.Close()
c.Assert(err, IsNil)

// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
Expand Down Expand Up @@ -168,8 +174,15 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES",
"tidb_enable_clustered_index": "ON",
}
err = tracker.Close()
c.Assert(err, IsNil)

tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = tracker.Exec(ctx, "", "create database testdb;")
Expand All @@ -190,6 +203,10 @@ func (s *trackerSuite) TestDDL(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

// Table shouldn't exist before initialization.
_, err = tracker.GetTableInfo(table)
Expand Down Expand Up @@ -260,6 +277,10 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

ctx := context.Background()
err = tracker.Exec(ctx, "", "create database testdb;")
Expand Down Expand Up @@ -299,6 +320,10 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

// We cannot create a table without a database.
ctx := context.Background()
Expand Down Expand Up @@ -327,6 +352,10 @@ func (s *trackerSuite) TestMultiDrop(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

ctx := context.Background()
err = tracker.CreateSchemaIfNotExists("testdb")
Expand Down Expand Up @@ -375,6 +404,10 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

// Create some sort of complicated table.
err = tracker.CreateSchemaIfNotExists("testdb")
Expand Down Expand Up @@ -453,6 +486,10 @@ func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
log.SetLevel(zapcore.ErrorLevel)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()
err = tracker.CreateSchemaIfNotExists("testdb")
c.Assert(err, IsNil)
err = tracker.CreateSchemaIfNotExists("testdb2")
Expand Down Expand Up @@ -567,6 +604,10 @@ func (s *trackerSuite) TestAllSchemas(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

// nothing should exist...
c.Assert(tracker.AllSchemas(), HasLen, 0)
Expand Down Expand Up @@ -657,8 +698,12 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) {
oldSessionVar := map[string]string{
"tidb_enable_change_column_type": "ON",
}
_, err = NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn)
tracker, err := NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()
}

func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {
Expand All @@ -675,6 +720,10 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down Expand Up @@ -708,6 +757,10 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down Expand Up @@ -877,6 +930,10 @@ func (s *trackerSuite) TestGetAvailableDownStreamUKIIndexInfo(c *C) {
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down Expand Up @@ -977,6 +1034,10 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down Expand Up @@ -1065,6 +1126,10 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
Expand All @@ -1080,3 +1145,46 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
_, ok := tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
}

func (s *trackerSuite) TestPlacementRule(c *C) {
log.SetLevel(zapcore.ErrorLevel)

// origin table info
p := parser.New()
node, err := p.ParseOneStmt("create table t(c int) charset=utf8mb4", "", "")
c.Assert(err, IsNil)
oriTi, err := ddl.BuildTableInfoFromAST(node.(*ast.CreateTableStmt))
c.Assert(err, IsNil)

// tracker and sqlmock
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
defer db.Close()
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
c.Assert(err, IsNil)
}()

mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)).WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()

tableID := "`test`.`test`"

mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", ""+
"CREATE TABLE `t` ("+
" `c` int(11) DEFAULT NULL"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`acdc` */;"))
_, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok := tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
}
16 changes: 9 additions & 7 deletions dm/syncer/validate_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,15 @@ func (vw *validateWorker) validateInsertAndUpdateRows(rows []*rowChange, cond *C
failedRows[key] = &validateFailedRow{tp: rowNotExist}
continue
}

eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
if vw.cfg.Mode == config.ValidationFull {
// only compare the whole row in full mode
eq, err := vw.compareData(sourceRow, targetRow, tableInfo.Columns[:cond.ColumnCnt])
if err != nil {
return nil, err
}
if !eq {
failedRows[key] = &validateFailedRow{tp: rowDifferent, dstData: targetRow}
}
}
}
return failedRows, nil
Expand Down
Loading

0 comments on commit d6c544e

Please sign in to comment.