From 346ba7a62c460484aed4b80ed922abce16a12b96 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 16 Aug 2022 15:49:35 +0800 Subject: [PATCH 1/4] change bit_xor to sum --- sync_diff_inspector/utils/utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index cce227c87..74161c585 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -710,7 +710,7 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) { /* calculate CRC32 checksum and count example: - mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; + mysql> select count(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; +--------+------------+ | CNT | CHECKSUM | +--------+------------+ @@ -733,7 +733,7 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", + query := fmt.Sprintf("SELECT COUNT(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) From 555e8226663471f8c86445d303d1fe8a1985a609 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 16 Aug 2022 17:30:54 +0800 Subject: [PATCH 2/4] use `sum` function when the table has no unique column --- .../source/common/table_diff.go | 3 ++ sync_diff_inspector/source/mysql_shard.go | 10 +++++-- sync_diff_inspector/source/source.go | 10 +++---- sync_diff_inspector/source/tidb.go | 4 +-- sync_diff_inspector/utils/utils.go | 29 ++++++++++++++----- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/sync_diff_inspector/source/common/table_diff.go b/sync_diff_inspector/source/common/table_diff.go index d6d1b9157..7c12214b9 100644 --- a/sync_diff_inspector/source/common/table_diff.go +++ b/sync_diff_inspector/source/common/table_diff.go @@ -62,6 +62,9 @@ type TableDiff struct { // the table has column timestamp, which need to reset time_zone. NeedUnifiedTimeZone bool `json:"-"` + // the table has unique column, so it's ok to use bit_xor function. + HasUniqueColumn bool `json:"-"` + Collation string `json:"collation"` ChunkSize int64 `json:"chunk-size"` diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 004b8195e..d74bd658e 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -103,7 +103,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte for _, ms := range matchSources { go func(ms *common.TableShardSource) { - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, table.HasUniqueColumn, chunk.Where, chunk.Args) infoCh <- &ChecksumInfo{ Checksum: checksum, Count: count, @@ -126,7 +126,11 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte err = info.Err } totalCount += info.Count - totalChecksum ^= info.Checksum + if table.HasUniqueColumn { + totalChecksum ^= info.Checksum + } else { + totalChecksum += info.Checksum + } } cost := time.Since(beginTime) @@ -231,7 +235,7 @@ func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) if err != nil { return nil, errors.Trace(err) } - sourceTableInfo, _ = utils.ResetColumns(sourceTableInfo, tableDiff.IgnoreColumns) + sourceTableInfo, _, _ = utils.ResetColumns(sourceTableInfo, tableDiff.IgnoreColumns) sourceTableInfos[i] = sourceTableInfo } return sourceTableInfos, nil diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index 3bc1db0da..60688b650 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -123,16 +123,16 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups tableDiffs := make([]*common.TableDiff, 0, len(tablesToBeCheck)) for _, tableConfig := range tablesToBeCheck { - newInfo, needUnifiedTimeZone := utils.ResetColumns(tableConfig.TargetTableInfo, tableConfig.IgnoreColumns) + newInfo, needUnifiedTimeZone, hasUniqueColumn := utils.ResetColumns(tableConfig.TargetTableInfo, tableConfig.IgnoreColumns) tableDiffs = append(tableDiffs, &common.TableDiff{ - Schema: tableConfig.Schema, - Table: tableConfig.Table, - Info: newInfo, - // TODO: field `IgnoreColumns` can be deleted. + Schema: tableConfig.Schema, + Table: tableConfig.Table, + Info: newInfo, IgnoreColumns: tableConfig.IgnoreColumns, Fields: strings.Join(tableConfig.Fields, ","), Range: tableConfig.Range, NeedUnifiedTimeZone: needUnifiedTimeZone, + HasUniqueColumn: hasUniqueColumn, Collation: tableConfig.Collation, ChunkSize: tableConfig.ChunkSize, }) diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 842cb6d6a..bbc6a674b 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -123,7 +123,7 @@ func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter. chunk := tableRange.GetChunk() matchSource := getMatchSource(s.sourceTableMap, table) - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, table.HasUniqueColumn, chunk.Where, chunk.Args) cost := time.Since(beginTime) return &ChecksumInfo{ @@ -147,7 +147,7 @@ func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([ if err != nil { return nil, errors.Trace(err) } - tableInfos[0], _ = utils.ResetColumns(tableInfos[0], tableDiff.IgnoreColumns) + tableInfos[0], _, _ = utils.ResetColumns(tableInfos[0], tableDiff.IgnoreColumns) return tableInfos, nil } diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 74161c585..2e6a3a6bb 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -706,11 +706,17 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) return dataSize.Int64, nil } +const ( + BIT_XOR_QUERY = "SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;" + SUM_QUERY = "SELECT COUNT(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;" +) + // GetCountAndCRC32Checksum returns checksum code and count of some data by given condition -func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) { +func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, hasUniqueColumn bool, limitRange string, args []interface{}) (int64, int64, error) { /* calculate CRC32 checksum and count example: mysql> select count(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; + mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; +--------+------------+ | CNT | CHECKSUM | +--------+------------+ @@ -732,9 +738,14 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table columnNames = append(columnNames, name) columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - - query := fmt.Sprintf("SELECT COUNT(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", - strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + var query string + if hasUniqueColumn { + query = fmt.Sprintf(BIT_XOR_QUERY, + strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + } else { + query = fmt.Sprintf(SUM_QUERY, + strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + } log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) var count sql.NullInt64 @@ -821,11 +832,14 @@ NEXTROW: // And removes column from `tableInfo.Columns`, which appears in `columns`. // And initializes the offset of the column of each index to new `tableInfo.Columns`. // -// Return the new tableInfo and the flag whether the columns have timestamp type. -func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInfo, bool) { +// Return the new tableInfo and the flag shows whether the columns have timestamp type +// and another flag shows whether the table has a unique column. +func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInfo, bool, bool) { // Although columns is empty, need to initialize indices' offset mapping to column. hasTimeStampType := false + hasUniqueColumn := uint(0) + uniqueFlag := mysql.UniqueFlag | mysql.UniqueKeyFlag | mysql.PriKeyFlag // Remove all index from `tableInfo.Indices`, whose columns are involved of any column in `columns`. removeColMap := SliceToMap(columns) for i := 0; i < len(tableInfo.Indices); i++ { @@ -855,6 +869,7 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf col.Offset = i colMap[col.Name.O] = i hasTimeStampType = hasTimeStampType || (col.FieldType.GetType() == mysql.TypeTimestamp) + hasUniqueColumn = hasUniqueColumn | (col.FieldType.GetFlag() & uniqueFlag) } // Initialize the offset of the column of each index to new `tableInfo.Columns`. @@ -869,7 +884,7 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf } } - return tableInfo, hasTimeStampType + return tableInfo, hasTimeStampType, hasUniqueColumn > 0 } // UniqueID returns `schema:table` From 8e3f74f035c391a7a9cd6982b720cdcf0f04065b Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 16 Aug 2022 17:48:45 +0800 Subject: [PATCH 3/4] add unit test --- sync_diff_inspector/splitter/splitter_test.go | 3 +- sync_diff_inspector/utils/utils_test.go | 33 +++++++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index d977da6f4..057f0030b 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -322,13 +322,14 @@ func TestRandomSpliter(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) require.NoError(t, err) - info, needUnifiedTimeStamp := utils.ResetColumns(tableInfo, testCase.IgnoreColumns) + info, needUnifiedTimeStamp, hasUniqueColumn := utils.ResetColumns(tableInfo, testCase.IgnoreColumns) tableDiff := &common.TableDiff{ Schema: "test", Table: "test", Info: info, IgnoreColumns: testCase.IgnoreColumns, NeedUnifiedTimeZone: needUnifiedTimeStamp, + HasUniqueColumn: hasUniqueColumn, Fields: testCase.fields, ChunkSize: 5, } diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index cc06f73da..59acfd42c 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -221,12 +221,25 @@ func TestBasicTableUtilOperation(t *testing.T) { require.Equal(t, len(tableInfo.Columns), 5) require.Equal(t, tableInfo.Indices[0].Columns[1].Name.O, "b") require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 2) - info, hasTimeStampType := ResetColumns(tableInfo, []string{"c"}) + info, hasTimeStampType, hasUniqueColumn := ResetColumns(tableInfo, []string{"c"}) require.True(t, hasTimeStampType) + require.True(t, hasUniqueColumn) require.Equal(t, len(info.Indices), 1) require.Equal(t, len(info.Columns), 4) require.Equal(t, tableInfo.Indices[0].Columns[1].Name.O, "b") require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 1) + + createTableSQL = "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime)" + tableInfo, err = dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) + require.NoError(t, err) + + require.Equal(t, len(tableInfo.Indices), 0) + require.Equal(t, len(tableInfo.Columns), 4) + info, hasTimeStampType, hasUniqueColumn = ResetColumns(tableInfo, []string{"c"}) + require.False(t, hasTimeStampType) + require.False(t, hasUniqueColumn) + require.Equal(t, len(info.Indices), 0) + require.Equal(t, len(info.Columns), 3) } func TestGetCountAndCRC32Checksum(t *testing.T) { @@ -241,12 +254,19 @@ func TestGetCountAndCRC32Checksum(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) - mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) + mock.ExpectQuery("SELECT COUNT(*) as CNT, BIT_XOR.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) - count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"}) + count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, true, "[23 45]", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) require.Equal(t, checksum, int64(456)) + + mock.ExpectQuery("SELECT COUNT(*) as CNT, SUM.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(456, 123)) + + count, checksum, err = GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, true, "[23 45]", []interface{}{"123", "234"}) + require.NoError(t, err) + require.Equal(t, count, int64(456)) + require.Equal(t, checksum, int64(123)) } func TestGetApproximateMid(t *testing.T) { @@ -331,23 +351,24 @@ func TestResetColumns(t *testing.T) { createTableSQL1 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`))" tableInfo1, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) - tbInfo, hasTimeStampType := ResetColumns(tableInfo1, []string{"a"}) + tbInfo, hasTimeStampType, hasUniqueColumn := ResetColumns(tableInfo1, []string{"a"}) require.Equal(t, len(tbInfo.Columns), 3) require.Equal(t, len(tbInfo.Indices), 0) require.Equal(t, tbInfo.Columns[2].Offset, 2) require.False(t, hasTimeStampType) + require.False(t, hasUniqueColumn) createTableSQL2 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))" tableInfo2, err := dbutil.GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) - tbInfo, _ = ResetColumns(tableInfo2, []string{"a", "b"}) + tbInfo, _, _ = ResetColumns(tableInfo2, []string{"a", "b"}) require.Equal(t, len(tbInfo.Columns), 2) require.Equal(t, len(tbInfo.Indices), 0) createTableSQL3 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))" tableInfo3, err := dbutil.GetTableInfoBySQL(createTableSQL3, parser.New()) require.NoError(t, err) - tbInfo, _ = ResetColumns(tableInfo3, []string{"b", "c"}) + tbInfo, _, _ = ResetColumns(tableInfo3, []string{"b", "c"}) require.Equal(t, len(tbInfo.Columns), 2) require.Equal(t, len(tbInfo.Indices), 1) } From 1dd7d8a438c36bb440c351e3f11ef7081899d26b Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 16 Aug 2022 18:09:02 +0800 Subject: [PATCH 4/4] fix unit test --- sync_diff_inspector/utils/utils_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index 59acfd42c..9194680a0 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -254,16 +254,16 @@ func TestGetCountAndCRC32Checksum(t *testing.T) { tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) - mock.ExpectQuery("SELECT COUNT(*) as CNT, BIT_XOR.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) as CNT, BIT_XOR.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, true, "[23 45]", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) require.Equal(t, checksum, int64(456)) - mock.ExpectQuery("SELECT COUNT(*) as CNT, SUM.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(456, 123)) + mock.ExpectQuery("SELECT COUNT\\(\\*\\) as CNT, SUM.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(456, 123)) - count, checksum, err = GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, true, "[23 45]", []interface{}{"123", "234"}) + count, checksum, err = GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, false, "[23 45]", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(456)) require.Equal(t, checksum, int64(123))