Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: change bit_xor to sum #671

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sync_diff_inspector/source/common/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type TableDiff struct {
// the table has column timestamp, which need to reset time_zone.
NeedUnifiedTimeZone bool `json:"-"`

// the table has unique column, so it's ok to use bit_xor function.
HasUniqueColumn bool `json:"-"`

Collation string `json:"collation"`

ChunkSize int64 `json:"chunk-size"`
Expand Down
10 changes: 7 additions & 3 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte

for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, table.HasUniqueColumn, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Expand All @@ -126,7 +126,11 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte
err = info.Err
}
totalCount += info.Count
totalChecksum ^= info.Checksum
if table.HasUniqueColumn {
totalChecksum ^= info.Checksum
} else {
totalChecksum += info.Checksum
}
}

cost := time.Since(beginTime)
Expand Down Expand Up @@ -231,7 +235,7 @@ func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int)
if err != nil {
return nil, errors.Trace(err)
}
sourceTableInfo, _ = utils.ResetColumns(sourceTableInfo, tableDiff.IgnoreColumns)
sourceTableInfo, _, _ = utils.ResetColumns(sourceTableInfo, tableDiff.IgnoreColumns)
sourceTableInfos[i] = sourceTableInfo
}
return sourceTableInfos, nil
Expand Down
10 changes: 5 additions & 5 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups

tableDiffs := make([]*common.TableDiff, 0, len(tablesToBeCheck))
for _, tableConfig := range tablesToBeCheck {
newInfo, needUnifiedTimeZone := utils.ResetColumns(tableConfig.TargetTableInfo, tableConfig.IgnoreColumns)
newInfo, needUnifiedTimeZone, hasUniqueColumn := utils.ResetColumns(tableConfig.TargetTableInfo, tableConfig.IgnoreColumns)
tableDiffs = append(tableDiffs, &common.TableDiff{
Schema: tableConfig.Schema,
Table: tableConfig.Table,
Info: newInfo,
// TODO: field `IgnoreColumns` can be deleted.
Schema: tableConfig.Schema,
Table: tableConfig.Table,
Info: newInfo,
IgnoreColumns: tableConfig.IgnoreColumns,
Fields: strings.Join(tableConfig.Fields, ","),
Range: tableConfig.Range,
NeedUnifiedTimeZone: needUnifiedTimeZone,
HasUniqueColumn: hasUniqueColumn,
Collation: tableConfig.Collation,
ChunkSize: tableConfig.ChunkSize,
})
Expand Down
4 changes: 2 additions & 2 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.
chunk := tableRange.GetChunk()

matchSource := getMatchSource(s.sourceTableMap, table)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, table.HasUniqueColumn, chunk.Where, chunk.Args)

cost := time.Since(beginTime)
return &ChecksumInfo{
Expand All @@ -147,7 +147,7 @@ func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([
if err != nil {
return nil, errors.Trace(err)
}
tableInfos[0], _ = utils.ResetColumns(tableInfos[0], tableDiff.IgnoreColumns)
tableInfos[0], _, _ = utils.ResetColumns(tableInfos[0], tableDiff.IgnoreColumns)
return tableInfos, nil
}

Expand Down
3 changes: 2 additions & 1 deletion sync_diff_inspector/splitter/splitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,14 @@ func TestRandomSpliter(t *testing.T) {
tableInfo, err := dbutil.GetTableInfoBySQL(testCase.createTableSQL, parser.New())
require.NoError(t, err)

info, needUnifiedTimeStamp := utils.ResetColumns(tableInfo, testCase.IgnoreColumns)
info, needUnifiedTimeStamp, hasUniqueColumn := utils.ResetColumns(tableInfo, testCase.IgnoreColumns)
tableDiff := &common.TableDiff{
Schema: "test",
Table: "test",
Info: info,
IgnoreColumns: testCase.IgnoreColumns,
NeedUnifiedTimeZone: needUnifiedTimeStamp,
HasUniqueColumn: hasUniqueColumn,
Fields: testCase.fields,
ChunkSize: 5,
}
Expand Down
29 changes: 22 additions & 7 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,16 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
return dataSize.Int64, nil
}

const (
BIT_XOR_QUERY = "SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;"
SUM_QUERY = "SELECT COUNT(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;"
)

// GetCountAndCRC32Checksum returns checksum code and count of some data by given condition
func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) {
func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, hasUniqueColumn bool, limitRange string, args []interface{}) (int64, int64, error) {
/*
calculate CRC32 checksum and count example:
mysql> select count(*) as CNT, SUM(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0;
mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0;
+--------+------------+
| CNT | CHECKSUM |
Expand All @@ -732,9 +738,14 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table
columnNames = append(columnNames, name)
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name))
}

query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
var query string
if hasUniqueColumn {
query = fmt.Sprintf(BIT_XOR_QUERY,
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
} else {
query = fmt.Sprintf(SUM_QUERY,
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
}
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args))

var count sql.NullInt64
Expand Down Expand Up @@ -821,11 +832,14 @@ NEXTROW:
// And removes column from `tableInfo.Columns`, which appears in `columns`.
// And initializes the offset of the column of each index to new `tableInfo.Columns`.
//
// Return the new tableInfo and the flag whether the columns have timestamp type.
func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInfo, bool) {
// Return the new tableInfo and the flag shows whether the columns have timestamp type
// and another flag shows whether the table has a unique column.
func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInfo, bool, bool) {
// Although columns is empty, need to initialize indices' offset mapping to column.

hasTimeStampType := false
hasUniqueColumn := uint(0)
uniqueFlag := mysql.UniqueFlag | mysql.UniqueKeyFlag | mysql.PriKeyFlag
// Remove all index from `tableInfo.Indices`, whose columns are involved of any column in `columns`.
removeColMap := SliceToMap(columns)
for i := 0; i < len(tableInfo.Indices); i++ {
Expand Down Expand Up @@ -855,6 +869,7 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf
col.Offset = i
colMap[col.Name.O] = i
hasTimeStampType = hasTimeStampType || (col.FieldType.GetType() == mysql.TypeTimestamp)
hasUniqueColumn = hasUniqueColumn | (col.FieldType.GetFlag() & uniqueFlag)
}

// Initialize the offset of the column of each index to new `tableInfo.Columns`.
Expand All @@ -869,7 +884,7 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf
}
}

return tableInfo, hasTimeStampType
return tableInfo, hasTimeStampType, hasUniqueColumn > 0
}

// UniqueID returns `schema:table`
Expand Down
33 changes: 27 additions & 6 deletions sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,25 @@ func TestBasicTableUtilOperation(t *testing.T) {
require.Equal(t, len(tableInfo.Columns), 5)
require.Equal(t, tableInfo.Indices[0].Columns[1].Name.O, "b")
require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 2)
info, hasTimeStampType := ResetColumns(tableInfo, []string{"c"})
info, hasTimeStampType, hasUniqueColumn := ResetColumns(tableInfo, []string{"c"})
require.True(t, hasTimeStampType)
require.True(t, hasUniqueColumn)
require.Equal(t, len(info.Indices), 1)
require.Equal(t, len(info.Columns), 4)
require.Equal(t, tableInfo.Indices[0].Columns[1].Name.O, "b")
require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 1)

createTableSQL = "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime)"
tableInfo, err = dbutil.GetTableInfoBySQL(createTableSQL, parser.New())
require.NoError(t, err)

require.Equal(t, len(tableInfo.Indices), 0)
require.Equal(t, len(tableInfo.Columns), 4)
info, hasTimeStampType, hasUniqueColumn = ResetColumns(tableInfo, []string{"c"})
require.False(t, hasTimeStampType)
require.False(t, hasUniqueColumn)
require.Equal(t, len(info.Indices), 0)
require.Equal(t, len(info.Columns), 3)
}

func TestGetCountAndCRC32Checksum(t *testing.T) {
Expand All @@ -241,12 +254,19 @@ func TestGetCountAndCRC32Checksum(t *testing.T) {
tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New())
require.NoError(t, err)

mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) as CNT, BIT_XOR.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456))

count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, true, "[23 45]", []interface{}{"123", "234"})
require.NoError(t, err)
require.Equal(t, count, int64(123))
require.Equal(t, checksum, int64(456))

mock.ExpectQuery("SELECT COUNT\\(\\*\\) as CNT, SUM.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(456, 123))

count, checksum, err = GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, false, "[23 45]", []interface{}{"123", "234"})
require.NoError(t, err)
require.Equal(t, count, int64(456))
require.Equal(t, checksum, int64(123))
}

func TestGetApproximateMid(t *testing.T) {
Expand Down Expand Up @@ -331,23 +351,24 @@ func TestResetColumns(t *testing.T) {
createTableSQL1 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`))"
tableInfo1, err := dbutil.GetTableInfoBySQL(createTableSQL1, parser.New())
require.NoError(t, err)
tbInfo, hasTimeStampType := ResetColumns(tableInfo1, []string{"a"})
tbInfo, hasTimeStampType, hasUniqueColumn := ResetColumns(tableInfo1, []string{"a"})
require.Equal(t, len(tbInfo.Columns), 3)
require.Equal(t, len(tbInfo.Indices), 0)
require.Equal(t, tbInfo.Columns[2].Offset, 2)
require.False(t, hasTimeStampType)
require.False(t, hasUniqueColumn)

createTableSQL2 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))"
tableInfo2, err := dbutil.GetTableInfoBySQL(createTableSQL2, parser.New())
require.NoError(t, err)
tbInfo, _ = ResetColumns(tableInfo2, []string{"a", "b"})
tbInfo, _, _ = ResetColumns(tableInfo2, []string{"a", "b"})
require.Equal(t, len(tbInfo.Columns), 2)
require.Equal(t, len(tbInfo.Indices), 0)

createTableSQL3 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))"
tableInfo3, err := dbutil.GetTableInfoBySQL(createTableSQL3, parser.New())
require.NoError(t, err)
tbInfo, _ = ResetColumns(tableInfo3, []string{"b", "c"})
tbInfo, _, _ = ResetColumns(tableInfo3, []string{"b", "c"})
require.Equal(t, len(tbInfo.Columns), 2)
require.Equal(t, len(tbInfo.Indices), 1)
}
Expand Down