Skip to content

Commit

Permalink
sync-diff: add table match check (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Jan 18, 2022
1 parent 50e7dfb commit c5e06ff
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 65 deletions.
24 changes: 15 additions & 9 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -280,15 +281,18 @@ func (ms *MultiSourceRowsIterator) Close() {
}
}

func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int) (Source, error) {
func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int, f tableFilter.Filter) (Source, error) {
sourceTablesMap := make(map[string][]*common.TableShardSource)
// we should get the real table name
// and real table row query from sourceDB.
uniqueMap := make(map[string]struct{})
targetUniqueTableMap := make(map[string]struct{})
for _, tableDiff := range tableDiffs {
uniqueMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = struct{}{}
targetUniqueTableMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = struct{}{}
}

// only used for check
sourceTablesAfterRoute := make(map[string]struct{})

for i, sourceDB := range ds {
sourceSchemas, err := dbutil.GetSchemas(ctx, sourceDB.Conn)
if err != nil {
Expand All @@ -315,7 +319,12 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
}
}
uniqueId := utils.UniqueID(targetSchema, targetTable)
if _, ok := uniqueMap[uniqueId]; !ok {
// get all tables from all source db instance
if f.MatchTable(targetSchema, targetTable) {
// if match the filter, we should respect it and check target has this table later.
sourceTablesAfterRoute[uniqueId] = struct{}{}
}
if _, ok := targetUniqueTableMap[uniqueId]; !ok {
continue
}
maxSourceRouteTableCount[uniqueId]++
Expand Down Expand Up @@ -345,11 +354,8 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*

}

// check tablesMap
for _, tableDiff := range tableDiffs {
if _, ok := sourceTablesMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)]; !ok {
return nil, errors.Errorf("the source has no table to be compared. target-table is `%s`.`%s`", tableDiff.Schema, tableDiff.Table)
}
if err := checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil {
return nil, errors.Annotatef(err, "please make sure the filter is correct.")
}

mss := &MySQLSources{
Expand Down
29 changes: 24 additions & 5 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,18 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups
tj := utils.UniqueID(tableDiffs[j].Schema, tableDiffs[j].Table)
return strings.Compare(ti, tj) > 0
})
upstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.SourceInstances...)
upstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.TargetCheckTables, cfg.Task.SourceInstances...)
if err != nil {
return nil, nil, errors.Annotate(err, "from upstream")
}
downstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.TargetInstance)
downstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.TargetCheckTables, cfg.Task.TargetInstance)
if err != nil {
return nil, nil, errors.Annotate(err, "from downstream")
}
return downstream, upstream, nil
}

func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, checkThreadCount int, dbs ...*config.DataSource) (Source, error) {
func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, checkThreadCount int, f tableFilter.Filter, dbs ...*config.DataSource) (Source, error) {
if len(dbs) < 1 {
return nil, errors.Errorf("no db config detected")
}
Expand All @@ -178,12 +178,12 @@ func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, che

if ok {
if len(dbs) == 1 {
return NewTiDBSource(ctx, tableDiffs, dbs[0], checkThreadCount)
return NewTiDBSource(ctx, tableDiffs, dbs[0], checkThreadCount, f)
} else {
log.Fatal("Don't support check table in multiple tidb instance, please specify one tidb instance.")
}
}
return NewMySQLSources(ctx, tableDiffs, dbs, checkThreadCount)
return NewMySQLSources(ctx, tableDiffs, dbs, checkThreadCount, f)
}

func initDBConn(ctx context.Context, cfg *config.Config) error {
Expand Down Expand Up @@ -293,3 +293,22 @@ type RangeIterator interface {

Close()
}

func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string]struct{}) error {
// check target exists but source not found
for tableDiff := range targetMap {
// target table have all passed in tableFilter
if _, ok := sourceMap[tableDiff]; !ok {
return errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff)
}
}
// check source exists but target not found
for tableDiff := range sourceMap {
// need check source table have passd in tableFilter here
if _, ok := targetMap[tableDiff]; !ok {
return errors.Errorf("the target has no table to be compared. source-table is `%s`", tableDiff)
}
}
log.Info("table match check passed!!")
return nil
}
40 changes: 36 additions & 4 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,12 @@ func TestTiDBSource(t *testing.T) {

tableDiffs := prepareTiDBTables(t, tableCases)

tidb, err := NewTiDBSource(ctx, tableDiffs, &config.DataSource{Conn: conn}, 1)
mock.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"Database"}).AddRow("mysql").AddRow("source_test"))
mock.ExpectQuery("SHOW FULL TABLES*").WillReturnRows(sqlmock.NewRows([]string{"Table", "type"}).AddRow("test1", "base").AddRow("test2", "base"))

f, err := filter.Parse([]string{"source_test.*"})
require.NoError(t, err)
tidb, err := NewTiDBSource(ctx, tableDiffs, &config.DataSource{Conn: conn}, 1, f)
require.NoError(t, err)

for n, tableCase := range tableCases {
Expand Down Expand Up @@ -305,7 +310,9 @@ func TestMysqlShardSources(t *testing.T) {
cs[i] = &config.DataSource{Conn: conn}
}

shard, err := NewMySQLSources(ctx, tableDiffs, cs, 4)
f, err := filter.Parse([]string{"source_test.*"})
require.NoError(t, err)
shard, err := NewMySQLSources(ctx, tableDiffs, cs, 4, f)
require.NoError(t, err)

for i := 0; i < len(dbs); i++ {
Expand Down Expand Up @@ -429,7 +436,10 @@ func TestMysqlRouter(t *testing.T) {
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
tablesRows = sqlmock.NewRows([]string{"Tables_in_test", "Table_type"}).AddRow("test_t", "BASE TABLE")
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
mysql, err := NewMySQLSources(ctx, tableDiffs, []*config.DataSource{ds}, 4)

f, err := filter.Parse([]string{"*.*"})
require.NoError(t, err)
mysql, err := NewMySQLSources(ctx, tableDiffs, []*config.DataSource{ds}, 4, f)
require.NoError(t, err)

// random splitter
Expand Down Expand Up @@ -534,7 +544,10 @@ func TestTiDBRouter(t *testing.T) {
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
tablesRows = sqlmock.NewRows([]string{"Tables_in_test", "Table_type"}).AddRow("test2", "BASE TABLE")
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
tidb, err := NewTiDBSource(ctx, tableDiffs, ds, 1)

f, err := filter.Parse([]string{"*.*"})
require.NoError(t, err)
tidb, err := NewTiDBSource(ctx, tableDiffs, ds, 1, f)
require.NoError(t, err)
infoRows := sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow("test_t", "CREATE TABLE `source_test`.`test1` (`a` int, `b` varchar(24), `c` float, primary key(`a`, `b`))")
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(infoRows)
Expand Down Expand Up @@ -724,3 +737,22 @@ func TestInitTables(t *testing.T) {
require.Contains(t, err.Error(), "different config matched to same target table")
require.NoError(t, mock.ExpectationsWereMet())
}

func TestCheckTableMatched(t *testing.T) {
tmap := make(map[string]struct{})
smap := make(map[string]struct{})

tmap["1"] = struct{}{}
tmap["2"] = struct{}{}

smap["1"] = struct{}{}
smap["2"] = struct{}{}
require.NoError(t, checkTableMatched(tmap, smap))

delete(smap, "1")
require.Contains(t, checkTableMatched(tmap, smap).Error(), "the source has no table to be compared. target-table")

delete(tmap, "1")
smap["1"] = struct{}{}
require.Contains(t, checkTableMatched(tmap, smap).Error(), "the target has no table to be compared. source-table")
}
96 changes: 51 additions & 45 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -189,67 +190,72 @@ func (s *TiDBSource) GetSnapshot() string {
return s.snapshot
}

func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource) (map[string]*common.TableSource, error) {
func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, f tableFilter.Filter) (map[string]*common.TableSource, error) {
sourceTableMap := make(map[string]*common.TableSource)
if ds.Router != nil {
log.Info("find router for tidb source")
// we should get the real table name
// and real table row query from source.
uniqueMap := make(map[string]struct{})
for _, tableDiff := range tableDiffs {
uniqueMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = struct{}{}
}
log.Info("find router for tidb source")
// we should get the real table name
// and real table row query from source.
targetUniqueTableMap := make(map[string]struct{})
for _, tableDiff := range tableDiffs {
targetUniqueTableMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = struct{}{}
}
sourceTablesAfterRoute := make(map[string]struct{})

// instance -> db -> table
allTablesMap := make(map[string]map[string]interface{})
sourceSchemas, err := dbutil.GetSchemas(ctx, ds.Conn)
if err != nil {
return nil, errors.Annotatef(err, "get schemas from database")
}
// instance -> db -> table
allTablesMap := make(map[string]map[string]interface{})
sourceSchemas, err := dbutil.GetSchemas(ctx, ds.Conn)

for _, schema := range sourceSchemas {
if filter.IsSystemSchema(schema) {
// ignore system schema
continue
}
allTables, err := dbutil.GetTables(ctx, ds.Conn, schema)
if err != nil {
return nil, errors.Annotatef(err, "get tables from %s", schema)
}
allTablesMap[schema] = utils.SliceToMap(allTables)
if err != nil {
return nil, errors.Annotatef(err, "get schemas from database")
}

for _, schema := range sourceSchemas {
if filter.IsSystemSchema(schema) {
// ignore system schema
continue
}
allTables, err := dbutil.GetTables(ctx, ds.Conn, schema)
if err != nil {
return nil, errors.Annotatef(err, "get tables from %s", schema)
}
allTablesMap[schema] = utils.SliceToMap(allTables)
}

for schema, allTables := range allTablesMap {
for table := range allTables {
targetSchema, targetTable, err := ds.Router.Route(schema, table)
for schema, allTables := range allTablesMap {
for table := range allTables {
targetSchema, targetTable := schema, table
if ds.Router != nil {
targetSchema, targetTable, err = ds.Router.Route(schema, table)
if err != nil {
return nil, errors.Errorf("get route result for %s.%s failed, error %v", schema, table, err)
}
uniqueId := utils.UniqueID(targetSchema, targetTable)
if _, ok := uniqueMap[uniqueId]; ok {
if _, ok := sourceTableMap[uniqueId]; ok {
log.Fatal("TiDB source don't merge multiple tables into one table")
}
sourceTableMap[uniqueId] = &common.TableSource{
OriginSchema: schema,
OriginTable: table,
}
}
}
}

// check tablesMap
for _, tableDiff := range tableDiffs {
if _, ok := sourceTableMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)]; !ok {
return nil, errors.Errorf("the source has no table to be compared. target-table is `%s`.`%s`", tableDiff.Schema, tableDiff.Table)
uniqueId := utils.UniqueID(targetSchema, targetTable)
if f.MatchTable(targetSchema, targetTable) {
// if match the filter, we should respect it and check target has this table later.
sourceTablesAfterRoute[uniqueId] = struct{}{}
}
if _, ok := targetUniqueTableMap[uniqueId]; ok {
if _, ok := sourceTableMap[uniqueId]; ok {
log.Fatal("TiDB source don't merge multiple tables into one table")
}
sourceTableMap[uniqueId] = &common.TableSource{
OriginSchema: schema,
OriginTable: table,
}
}
}
}

if err = checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil {
return nil, errors.Annotatef(err, "please make sure the filter is correct.")
}
return sourceTableMap, nil
}

func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, checkThreadCount int) (Source, error) {
sourceTableMap, err := getSourceTableMap(ctx, tableDiffs, ds)
func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, checkThreadCount int, f tableFilter.Filter) (Source, error) {
sourceTableMap, err := getSourceTableMap(ctx, tableDiffs, ds, f)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
8 changes: 7 additions & 1 deletion sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,13 @@ func ResetColumns(tableInfo *model.TableInfo, columns []string) (*model.TableInf

// UniqueID returns `schema:table`
func UniqueID(schema string, table string) string {
return schema + ":" + table
// QuoteSchema quotes a full table name
return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table))
}

// EscapeName replaces all "`" in name with "``"
func EscapeName(name string) string {
return strings.Replace(name, "`", "``", -1)
}

// GetBetterIndex returns the index more dinstict.
Expand Down
2 changes: 1 addition & 1 deletion sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestStringsToInterface(t *testing.T) {
}
require.Equal(t, len(sliceMap), len(expectSlice))

require.Equal(t, UniqueID("123", "456"), "123:456")
require.Equal(t, UniqueID("123", "456"), "`123`.`456`")

}

Expand Down

0 comments on commit c5e06ff

Please sign in to comment.