Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker(dm): support optimistic checker #4329

Merged
merged 28 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
54fbc98
save work: don't check sharding tables if not exist checkpoint
okJiang Jan 12, 2022
35bc0df
save work
okJiang Jan 14, 2022
205dae7
support to check optimistic compitible
okJiang Jan 14, 2022
1f96128
save work
okJiang Jan 14, 2022
432ac7f
save work
okJiang Jan 14, 2022
4223f3d
add IT
okJiang Jan 14, 2022
7dc823f
add ut
okJiang Jan 14, 2022
3f763ae
save work
okJiang Jan 26, 2022
2a5507a
save work
okJiang Jan 26, 2022
688b57c
save work
okJiang Jan 26, 2022
ccf85a4
fix it
okJiang Feb 7, 2022
5a10533
Update dm/pkg/checker/table_structure.go
okJiang Feb 7, 2022
f830512
fix ut
okJiang Feb 7, 2022
3036e49
Merge branch 'master' of github.com:pingcap/tiflow into pre-check-opt…
okJiang Feb 7, 2022
258f9a8
save work
okJiang Feb 7, 2022
9e38b2a
Merge branch 'master' into pre-check-optimistic-shard
Ehco1996 Feb 11, 2022
1309ff8
Apply suggestions from code review
okJiang Feb 11, 2022
36c56c0
save work
okJiang Feb 11, 2022
78f2bc4
add exist checkpoint UT
okJiang Feb 11, 2022
203dd2b
imp IsFreshTask()
okJiang Feb 11, 2022
b5fed04
fix lint
okJiang Feb 11, 2022
5ba89bf
Merge branch 'pre-check-optimistic-shard' of github.com:okJiang/ticdc…
okJiang Feb 14, 2022
fd9d9d2
add multi-thread
okJiang Feb 14, 2022
6a31f3a
fix fmt
okJiang Feb 14, 2022
d496900
Merge branch 'master' into pre-check-optimistic-shard
ti-chi-bot Feb 14, 2022
5eb6613
Merge branch 'master' into pre-check-optimistic-shard
ti-chi-bot Feb 14, 2022
61a824d
Merge branch 'master' into pre-check-optimistic-shard
ti-chi-bot Feb 14, 2022
6c6730a
Merge branch 'master' into pre-check-optimistic-shard
ti-chi-bot Feb 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions dm/checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"

gmysql "github.com/go-sql-driver/mysql"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/ctl/common"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/cputil"

tc "github.com/pingcap/check"
)
Expand All @@ -37,9 +40,11 @@ type testCheckerSuite struct{}
var _ = tc.Suite(&testCheckerSuite{})

var (
schema = "db_1"
tb1 = "t_1"
tb2 = "t_2"
schema = "db_1"
tb1 = "t_1"
tb2 = "t_2"
metaSchema = "dm_meta"
taskName = "test"
)

func ignoreExcept(itemMap map[string]struct{}) []string {
Expand Down Expand Up @@ -287,6 +292,9 @@ func (s *testCheckerSuite) TestTableSchemaChecking(c *tc.C) {
func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
cfgs := []*config.SubTaskConfig{
{
MetaSchema: metaSchema,
Name: taskName,
ShardMode: config.ShardPessimistic,
RouteRules: []*router.TableRule{
{
SchemaPattern: schema,
Expand All @@ -304,11 +312,16 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
b int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
createTable2 := `CREATE TABLE %s (
id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

id int(11) DEFAULT NULL,
c int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1`
errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a test to cover checkpoint existing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it isn't. These code mock

		checkpointSQLs := []string{
			fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LoaderCheckpoint(instance.cfg.Name))),
			fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LightningCheckpoint(instance.cfg.Name))),
			fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.SyncerCheckpoint(instance.cfg.Name))),
		}
		var existCheckpoint bool
		for _, sql := range checkpointSQLs {
			c.tctx.Logger.Info("exec query", zap.String("sql", sql))
			_, err := instance.targetDB.DB.QueryContext(c.tctx.Ctx, sql)
			if err != nil && !utils.IsMySQLError(err, mysql.ErrNoSuchTable) {
				return err
			}
			if err == nil {
				existCheckpoint = true
				c.tctx.Logger.Info("exist checkpoint, so don't check sharding tables")
				break
			}
		}

createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
// test different column definition
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
Expand All @@ -320,7 +333,11 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
c.Assert(err, tc.ErrorMatches, "(.|\n)*different column definition(.|\n)*")
c.Assert(len(msg), tc.Equals, 0)

// test success check
mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
Expand All @@ -331,11 +348,24 @@ func (s *testCheckerSuite) TestShardTableSchemaChecking(c *tc.C) {
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(err, tc.IsNil)
c.Assert(msg, tc.Equals, CheckTaskSuccess)

// test exist checkpoint
mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LoaderCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LoaderCheckpoint(taskName), ""))
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.LightningCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.LightningCheckpoint(taskName), ""))
mock.ExpectQuery(fmt.Sprintf(createTableSQL, metaSchema, cputil.SyncerCheckpoint(taskName))).WillReturnRows(sqlmock.
NewRows([]string{"Table", "Create Table"}).AddRow(cputil.SyncerCheckpoint(taskName), ""))
msg, err = CheckSyncConfig(context.Background(), cfgs, common.DefaultErrorCnt, common.DefaultWarnCnt)
c.Assert(msg, tc.Equals, CheckTaskSuccess)
c.Assert(err, tc.IsNil)
}

func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
cfgs := []*config.SubTaskConfig{
{
ShardMode: config.ShardPessimistic,
RouteRules: []*router.TableRule{
{
SchemaPattern: schema,
Expand All @@ -362,7 +392,12 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
Expand All @@ -375,6 +410,9 @@ func (s *testCheckerSuite) TestShardAutoIncrementIDChecking(c *tc.C) {
c.Assert(err, tc.IsNil)

mock = initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable2, tb1)))
mock.ExpectQuery("SHOW VARIABLES LIKE 'max_connections'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("max_connections", "2"))
Expand Down Expand Up @@ -413,8 +451,15 @@ func (s *testCheckerSuite) TestSameTargetTableDetection(c *tc.C) {
PRIMARY KEY (id),
UNIQUE KEY u_b(b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1`

errNoSuchTable := &gmysql.MySQLError{Number: mysql.ErrNoSuchTable}
createTableSQL := "SHOW CREATE TABLE `%s`.`%s`"
mock := initMockDB(c)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LoaderCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.LightningCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery(fmt.Sprintf(createTableSQL, "", cputil.SyncerCheckpoint(""))).WillReturnError(errNoSuchTable)
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb1)))
mock.ExpectQuery("SHOW CREATE TABLE .*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tb1, fmt.Sprintf(createTable1, tb2)))
Expand Down
52 changes: 45 additions & 7 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/checker"
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -243,13 +245,25 @@ func (c *Checker) Init(ctx context.Context) (err error) {
c.checkList = append(c.checkList, checker.NewTablesChecker(dbs, checkTablesMap, dumpThreads))
}

if checkingShard {
for name, shardingSet := range sharding {
if shardingCounter[name] <= 1 {
continue
instance := c.instances[0]
// Not check the sharding tables’ schema when the mode is increment.
// Because the table schema obtained from `show create table` is not the schema at the point of binlog.
if checkingShard && instance.cfg.Mode != config.ModeIncrement {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
isFresh, err := c.IsFreshTask()
if err != nil {
return err
}
if isFresh {
for targetTableID, shardingSet := range sharding {
if shardingCounter[targetTableID] <= 1 {
continue
}
if instance.cfg.ShardMode == config.ShardPessimistic {
c.checkList = append(c.checkList, checker.NewShardingTablesChecker(targetTableID, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
} else {
c.checkList = append(c.checkList, checker.NewOptimisticShardingTablesChecker(targetTableID, dbs, shardingSet, dumpThreads))
}
}

c.checkList = append(c.checkList, checker.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID, dumpThreads))
}
}

Expand Down Expand Up @@ -448,7 +462,31 @@ func (c *Checker) Type() pb.UnitType {

// IsFreshTask implements Unit.IsFreshTask.
func (c *Checker) IsFreshTask() (bool, error) {
return true, nil
instance := c.instances[0]
checkpointSQLs := []string{
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LoaderCheckpoint(instance.cfg.Name))),
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.LightningCheckpoint(instance.cfg.Name))),
fmt.Sprintf("SHOW CREATE TABLE %s", dbutil.TableName(instance.cfg.MetaSchema, cputil.SyncerCheckpoint(instance.cfg.Name))),
}
var existCheckpoint bool
for _, sql := range checkpointSQLs {
c.tctx.Logger.Info("exec query", zap.String("sql", sql))
rows, err := instance.targetDB.DB.QueryContext(c.tctx.Ctx, sql)
if err != nil {
if utils.IsMySQLError(err, mysql.ErrNoSuchTable) {
continue
}
return false, err
}
defer rows.Close()
if rows.Err() != nil {
return false, rows.Err()
}
existCheckpoint = true
c.tctx.Logger.Info("exist checkpoint, so don't check sharding tables")
break
}
return !existCheckpoint, nil
}

// Status implements Unit interface.
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,11 +382,11 @@ func (st *SubTask) fetchResultAndUpdateStage(pr chan pb.ProcessResult) {
}

// setCurrUnit set current dm unit to ut.
func (st *SubTask) setCurrUnit(ut unit.Unit) {
func (st *SubTask) setCurrUnit(cu unit.Unit) {
st.Lock()
defer st.Unlock()
pu := st.currUnit
st.currUnit = ut
st.currUnit = cu
st.prevUnit = pu
}

Expand Down
127 changes: 127 additions & 0 deletions dm/pkg/checker/table_structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/schemacmp"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
Expand Down Expand Up @@ -564,6 +565,132 @@ func (c *ShardingTablesChecker) Name() string {
return fmt.Sprintf("sharding table %s consistency checking", c.targetTableID)
}

// OptimisticShardingTablesChecker checks consistency of table structures of one sharding group in optimistic shard.
// * check whether they have compatible column list.
type OptimisticShardingTablesChecker struct {
targetTableID string
dbs map[string]*sql.DB
tableMap map[string][]*filter.Table // sourceID => [table1, table2, ...]
reMu sync.Mutex
joinedMu sync.Mutex
inCh chan *checkItem
dumpThreads int
joined *schemacmp.Table
}

// NewOptimisticShardingTablesChecker returns a RealChecker.
func NewOptimisticShardingTablesChecker(targetTableID string, dbs map[string]*sql.DB, tableMap map[string][]*filter.Table, dumpThreads int) RealChecker {
if dumpThreads == 0 {
dumpThreads = 1
}
c := &OptimisticShardingTablesChecker{
targetTableID: targetTableID,
dbs: dbs,
tableMap: tableMap,
dumpThreads: dumpThreads,
}
c.inCh = make(chan *checkItem, dumpThreads)
return c
}

// Name implements Checker interface.
func (c *OptimisticShardingTablesChecker) Name() string {
return fmt.Sprintf("optimistic sharding table %s consistency checking", c.targetTableID)
}

// Check implements RealChecker interface.
func (c *OptimisticShardingTablesChecker) Check(ctx context.Context) *Result {
r := &Result{
Name: c.Name(),
Desc: "check consistency of sharding table structures for Optimistic Sharding Merge",
State: StateSuccess,
Extra: fmt.Sprintf("sharding %s", c.targetTableID),
}

startTime := time.Now()
concurrency, err := getConcurrency(ctx, c.tableMap, c.dbs, c.dumpThreads)
if err != nil {
markCheckError(r, err)
return r
}
eg, checkCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
return c.checkTable(checkCtx, r)
})
}

dispatchTableItem(checkCtx, c.tableMap, c.inCh)
if err := eg.Wait(); err != nil {
markCheckError(r, err)
}

log.L().Logger.Info("check optimistic sharding table structure over", zap.Duration("spend time", time.Since(startTime)))
return r
}

func (c *OptimisticShardingTablesChecker) checkTable(ctx context.Context, r *Result) error {
var (
sourceID string
p *parser.Parser
err error
)
for {
select {
case <-ctx.Done():
return nil
case checkItem, ok := <-c.inCh:
if !ok {
return nil
}
table := checkItem.table
if len(sourceID) == 0 || sourceID != checkItem.sourceID {
sourceID = checkItem.sourceID
p, err = dbutil.GetParserForDB(ctx, c.dbs[sourceID])
if err != nil {
c.reMu.Lock()
r.Extra = fmt.Sprintf("fail to get parser for sourceID %s on sharding %s", sourceID, c.targetTableID)
c.reMu.Unlock()
return err
}
}

statement, err := dbutil.GetCreateTableSQL(ctx, c.dbs[sourceID], table.Schema, table.Name)
if err != nil {
// continue if table was deleted when checking
if isMySQLError(err, mysql.ErrNoSuchTable) {
continue
}
return err
}

ti, err := dbutil.GetTableInfoBySQL(statement, p)
if err != nil {
return err
}
encodeTi := schemacmp.Encode(ti)
c.joinedMu.Lock()
log.L().Logger.Debug("get schemacmp", zap.Stringer("ti", encodeTi), zap.Stringer("joined", c.joined), zap.Bool("pk is handle", ti.PKIsHandle))
if c.joined == nil {
c.joined = &encodeTi
c.joinedMu.Unlock()
continue
}
newJoined, err2 := c.joined.Join(encodeTi)
if err2 != nil {
// NOTE: conflict detected.
c.reMu.Lock()
r.Extra = fmt.Sprintf("fail to join table info %s with %s", c.joined, encodeTi)
c.reMu.Unlock()
c.joinedMu.Unlock()
return err2
}
c.joined = &newJoined
c.joinedMu.Unlock()
}
}
}

func dispatchTableItem(ctx context.Context, tableMap map[string][]*filter.Table, inCh chan *checkItem) {
for sourceID, tables := range tableMap {
for _, table := range tables {
Expand Down
Loading