From 5e38c2854c81e3fd882e004ca25b3b5f1ef58622 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 27 Jun 2022 15:50:30 +0800 Subject: [PATCH 1/4] init schemaTracker when syncer run --- dm/pkg/schema/tracker.go | 68 +++++++++++++++++++++++++++------------- dm/syncer/syncer.go | 10 +++++- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index a0fa117fa14..72a908b4834 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -99,16 +99,25 @@ type DownstreamTableInfo struct { WhereHandle *sqlmodel.WhereHandle } -// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve +// NewDumpTracker simply returns an empty Tracker, +// which should be followed by a subsequent initialization. +func NewDumpTracker() *Tracker { + return &Tracker{} +} + +// Init initializes the Tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve // some variable from downstream using `downstreamConn`. // NOTE **sessionCfg is a reference to caller**. -func NewTracker( +func (tr *Tracker) Init( ctx context.Context, task string, sessionCfg map[string]string, downstreamConn *dbconn.DBConn, logger log.Logger, -) (*Tracker, error) { +) error { + if tr == nil { + return nil + } var ( err error storePath string @@ -148,28 +157,28 @@ func NewTracker( var ignoredColumn interface{} rows, err2 := downstreamConn.QuerySQL(tctx, nil, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k)) if err2 != nil { - return nil, err2 + return err2 } if rows.Next() { var value string if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil { - return nil, err3 + return err3 } sessionCfg[k] = value } // nolint:sqlclosecheck if err2 = rows.Close(); err2 != nil { - return nil, err2 + return err2 } if err2 = rows.Err(); err2 != nil { - return nil, err2 + return err2 } } } storePath, err = newTmpFolderForTracker(task) if err != nil { - return nil, err + return err } rollbackHolder.Add(fr.FuncRollback{Name: "DeleteStorePath", Fn: func() { _ = os.RemoveAll(storePath) @@ -179,7 +188,7 @@ func NewTracker( mockstore.WithStoreType(mockstore.EmbedUnistore), mockstore.WithPath(storePath)) if err != nil { - return nil, err + return err } rollbackHolder.Add(fr.FuncRollback{Name: "CloseStore", Fn: func() { _ = store.Close() @@ -190,13 +199,13 @@ func NewTracker( dom, err = session.BootstrapSession(store) if err != nil { - return nil, err + return err } rollbackHolder.Add(fr.FuncRollback{Name: "CloseDomain", Fn: dom.Close}) se, err = session.CreateSession(store) if err != nil { - return nil, err + return err } rollbackHolder.Add(fr.FuncRollback{Name: "CloseSession", Fn: se.Close}) @@ -216,13 +225,13 @@ func NewTracker( log.L().Warn("can not set this variable", zap.Error(err)) continue } - return nil, err + return err } } for k, v := range globalVarsToSet { err = se.GetSessionVars().SetSystemVarWithRelaxedValidation(k, v) if err != nil { - return nil, err + return err } } // skip DDL test https://github.com/pingcap/tidb/pull/33079 @@ -233,7 +242,7 @@ func NewTracker( // exist by default. So we need to drop it first. err = dom.DDL().DropSchema(se, model.NewCIStr("test")) if err != nil { - return nil, err + return err } // init downstreamTracker @@ -241,14 +250,31 @@ func NewTracker( downstreamConn: downstreamConn, tableInfos: make(map[string]*DownstreamTableInfo), } + tr.Lock() + defer tr.Unlock() + tr.storePath = storePath + tr.store = store + tr.dom = dom + tr.se = se + tr.dsTracker = dsTracker + tr.closed.Store(false) + return nil +} - return &Tracker{ - storePath: storePath, - store: store, - dom: dom, - se: se, - dsTracker: dsTracker, - }, nil +// NewTracker creates a new tracker. It's preserved for test. +func NewTracker( + ctx context.Context, + task string, + sessionCfg map[string]string, + downstreamConn *dbconn.DBConn, + logger log.Logger, +) (*Tracker, error) { + tr := NewDumpTracker() + err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger) + if err != nil { + return nil, err + } + return tr, nil } func newTmpFolderForTracker(task string) (string, error) { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 2eae19f5110..e059a2a15db 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -391,6 +391,8 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } s.sessCtx = utils.NewSessionCtx(vars) s.exprFilterGroup = NewExprFilterGroup(s.tctx, s.sessCtx, s.cfg.ExprFilter) + // create an empty Tracker and will be initialized in `Run` + s.schemaTracker = schema.NewDumpTracker() if len(s.cfg.ColumnMappingRules) > 0 { s.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) @@ -1696,7 +1698,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } - s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) + if s.schemaTracker == nil { + s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) + } else { + // prevent creating new Tracker on `Run` in order to avoid + // two different Trackers are invoked in the validator and the syncer. + err = s.schemaTracker.Init(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) + } if err != nil { return terror.ErrSchemaTrackerInit.Delegate(err) } From ac6c120c2ee68351536acf9892a1f5212a5d59a2 Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Mon, 27 Jun 2022 16:46:44 +0800 Subject: [PATCH 2/4] fix: nil ptr exception --- dm/pkg/schema/tracker.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 72a908b4834..efce5130924 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -418,10 +418,18 @@ func (tr *Tracker) Close() error { if !tr.closed.CAS(false, true) { return nil } - tr.se.Close() - tr.dom.Close() - if err := tr.store.Close(); err != nil { - return err + // Build of the Tracker and the initialization is divided. + // these fields can possibly be nil if the Tracker is closed before the initialization. + if tr.se != nil { + tr.se.Close() + } + if tr.dom != nil { + tr.dom.Close() + } + if tr.store != nil { + if err := tr.store.Close(); err != nil { + return err + } } return os.RemoveAll(tr.storePath) } From 8dceb8105caeebc556294bab4344fa002c3d7dea Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Wed, 29 Jun 2022 11:55:58 +0800 Subject: [PATCH 3/4] fix comment --- dm/pkg/schema/tracker.go | 13 +++++---- dm/pkg/schema/tracker_test.go | 40 +++++++++++++------------- dm/syncer/checkpoint_test.go | 4 +-- dm/syncer/data_validator_test.go | 2 +- dm/syncer/expr_filter_group_test.go | 6 ++-- dm/syncer/filter_test.go | 2 +- dm/syncer/opt_sharding_group_test.go | 2 +- dm/syncer/syncer.go | 12 ++++---- dm/syncer/syncer_test.go | 14 ++++----- dm/syncer/validator_checkpoint_test.go | 2 +- 10 files changed, 49 insertions(+), 48 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index efce5130924..26377dc748b 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -99,9 +99,9 @@ type DownstreamTableInfo struct { WhereHandle *sqlmodel.WhereHandle } -// NewDumpTracker simply returns an empty Tracker, -// which should be followed by a subsequent initialization. -func NewDumpTracker() *Tracker { +// NewTracker simply returns an empty Tracker, +// which should be followed by an initialization before used. +func NewTracker() *Tracker { return &Tracker{} } @@ -261,15 +261,16 @@ func (tr *Tracker) Init( return nil } -// NewTracker creates a new tracker. It's preserved for test. -func NewTracker( +// NewTestTracker creates an empty Tracker and initializes it subsequently. +// It is useful for test. +func NewTestTracker( ctx context.Context, task string, sessionCfg map[string]string, downstreamConn *dbconn.DBConn, logger log.Logger, ) (*Tracker, error) { - tr := NewDumpTracker() + tr := NewTracker() err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger) if err != nil { return nil, err diff --git a/dm/pkg/schema/tracker_test.go b/dm/pkg/schema/tracker_test.go index 81d23112203..0e01dd20f4d 100644 --- a/dm/pkg/schema/tracker_test.go +++ b/dm/pkg/schema/tracker_test.go @@ -92,28 +92,28 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { dbConn := dbconn.NewDBConn(s.cfg, baseConn) // user give correct session config - t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + t, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) err = t.Close() c.Assert(err, IsNil) // user give wrong session session, will return error sessionCfg := map[string]string{"sql_mode": "HaHa"} - _, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) + _, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) c.Assert(err, NotNil) // discover session config failed, will return error mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "HaHa")) - _, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) + _, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) c.Assert(err, NotNil) // empty or default config in downstream mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", defaultTestSessionCfg["sql_mode"])) - tracker, err := NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) err = tracker.Exec(context.Background(), "", "create database testdb;") @@ -125,7 +125,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE")) - tracker, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) + tracker, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L()) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue) @@ -144,7 +144,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { // user set session config, get tracker config from downstream // no `STRICT_TRANS_TABLES`, no error now sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"} - tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) + tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -179,7 +179,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { err = tracker.Close() c.Assert(err, IsNil) - tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) + tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -203,7 +203,7 @@ func (s *trackerSuite) TestDDL(c *C) { Name: "foo", } - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -277,7 +277,7 @@ func (s *trackerSuite) TestDDL(c *C) { func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -320,7 +320,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) { func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -352,7 +352,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) { func (s *trackerSuite) TestMultiDrop(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -404,7 +404,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) { Name: "foo", } - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -486,7 +486,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) { func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) { log.SetLevel(zapcore.ErrorLevel) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -604,7 +604,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) { log.SetLevel(zapcore.ErrorLevel) ctx := context.Background() - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -700,7 +700,7 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) { oldSessionVar := map[string]string{ "tidb_enable_change_column_type": "ON", } - tracker, err := NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -720,7 +720,7 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) { baseConn := conn.NewBaseConn(con, nil) dbConn := dbconn.NewDBConn(s.cfg, baseConn) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -757,7 +757,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) { c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) dbConn := dbconn.NewDBConn(s.cfg, baseConn) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -799,7 +799,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) dbConn := dbconn.NewDBConn(s.cfg, baseConn) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -891,7 +891,7 @@ func (s *trackerSuite) TestVarchar20000(c *C) { c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) dbConn := dbconn.NewDBConn(s.cfg, baseConn) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() @@ -931,7 +931,7 @@ func (s *trackerSuite) TestPlacementRule(c *C) { c.Assert(err, IsNil) baseConn := conn.NewBaseConn(con, nil) dbConn := dbconn.NewDBConn(s.cfg, baseConn) - tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) + tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L()) c.Assert(err, IsNil) defer func() { err = tracker.Close() diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go index 567d2ab975a..9ccb9deb423 100644 --- a/dm/syncer/checkpoint_test.go +++ b/dm/syncer/checkpoint_test.go @@ -83,7 +83,7 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) { } ) - s.tracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L()) + s.tracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L()) c.Assert(err, IsNil) } @@ -499,7 +499,7 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) { dbConn, err := db.Conn(ctx) require.NoError(t, err) downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L()) + schemaTracker, err := schema.NewTestTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L()) require.NoError(t, err) defer schemaTracker.Close() //nolint diff --git a/dm/syncer/data_validator_test.go b/dm/syncer/data_validator_test.go index 5f3f28c3144..2de25b94973 100644 --- a/dm/syncer/data_validator_test.go +++ b/dm/syncer/data_validator_test.go @@ -357,7 +357,7 @@ func TestValidatorDoValidate(t *testing.T) { dbConn, err := db.Conn(context.Background()) require.NoError(t, err) syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L()) + syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L()) defer syncerObj.schemaTracker.Close() require.NoError(t, err) require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName)) diff --git a/dm/syncer/expr_filter_group_test.go b/dm/syncer/expr_filter_group_test.go index 130e977f125..9efe5570ce0 100644 --- a/dm/syncer/expr_filter_group_test.go +++ b/dm/syncer/expr_filter_group_test.go @@ -96,7 +96,7 @@ create table t ( dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn) for _, ca := range cases { - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) + schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, ca.tableStr), IsNil) @@ -359,7 +359,7 @@ create table t ( dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn) for _, ca := range cases { c.Log(ca.tableStr) - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) + schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, ca.tableStr), IsNil) @@ -413,7 +413,7 @@ create table t ( ) dbConn := dbconn.NewDBConn(&config.SubTaskConfig{}, s.baseConn) - schemaTracker, err := schema.NewTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) + schemaTracker, err := schema.NewTestTracker(ctx, "unit-test", defaultTestSessionCfg, dbConn, log.L()) c.Assert(err, IsNil) c.Assert(schemaTracker.CreateSchemaIfNotExists(dbName), IsNil) c.Assert(schemaTracker.Exec(ctx, dbName, tableStr), IsNil) diff --git a/dm/syncer/filter_test.go b/dm/syncer/filter_test.go index 9636fdf98b0..dfaa02194d6 100644 --- a/dm/syncer/filter_test.go +++ b/dm/syncer/filter_test.go @@ -71,7 +71,7 @@ func (s *testFilterSuite) TestSkipQueryEvent(c *C) { c.Assert(err, IsNil) syncer.ddlDBConn = dbconn.NewDBConn(syncer.cfg, s.baseConn) - syncer.schemaTracker, err = schema.NewTracker(context.Background(), syncer.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), syncer.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) c.Assert(err, IsNil) defer syncer.schemaTracker.Close() syncer.exprFilterGroup = NewExprFilterGroup(tcontext.Background(), utils.NewSessionCtx(nil), nil) diff --git a/dm/syncer/opt_sharding_group_test.go b/dm/syncer/opt_sharding_group_test.go index 9df42942793..9357e4bfabc 100644 --- a/dm/syncer/opt_sharding_group_test.go +++ b/dm/syncer/opt_sharding_group_test.go @@ -102,7 +102,7 @@ func (s *optShardingGroupSuite) TestSync() { optimist: shardddl.NewOptimist(&logger, nil, "", ""), checkpoint: &mockCheckpoint{}, } - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) require.NoError(s.T(), err) // case 1: mock receive resolved stage from dm-master when syncing other tables diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index e059a2a15db..75c149304f4 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -392,7 +392,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { s.sessCtx = utils.NewSessionCtx(vars) s.exprFilterGroup = NewExprFilterGroup(s.tctx, s.sessCtx, s.cfg.ExprFilter) // create an empty Tracker and will be initialized in `Run` - s.schemaTracker = schema.NewDumpTracker() + s.schemaTracker = schema.NewTracker() if len(s.cfg.ColumnMappingRules) > 0 { s.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) @@ -1699,12 +1699,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } if s.schemaTracker == nil { - s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) - } else { - // prevent creating new Tracker on `Run` in order to avoid - // two different Trackers are invoked in the validator and the syncer. - err = s.schemaTracker.Init(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) + // some test will set their own schemaTracker and skip the syncer.Init + s.schemaTracker = schema.NewTracker() } + // prevent creating new Tracker on `Run` in order to avoid + // two different Trackers are invoked in the validator and the syncer. + err = s.schemaTracker.Init(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) if err != nil { return terror.ErrSchemaTrackerInit.Delegate(err) } diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 7289ec16638..09cc19dc36f 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -787,7 +787,7 @@ func (s *testSyncerSuite) TestRun(c *C) { } syncer.ddlDBConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) syncer.downstreamTrackConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) c.Assert(err, IsNil) syncer.metricsProxies = metrics.DefaultMetricsProxies.CacheForOneTask("task", "worker", "source") @@ -1134,7 +1134,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24))")) - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) c.Assert(err, IsNil) c.Assert(syncer.Type(), Equals, pb.UnitType_Sync) @@ -1340,7 +1340,7 @@ func (s *testSyncerSuite) TestTrackDDL(c *C) { } syncer.ddlDBConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) syncer.checkpoint.(*RemoteCheckPoint).dbConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})) - syncer.schemaTracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, syncer.ddlDBConn, log.L()) c.Assert(err, IsNil) defer syncer.schemaTracker.Close() syncer.exprFilterGroup = NewExprFilterGroup(tcontext.Background(), utils.NewSessionCtx(nil), nil) @@ -1658,7 +1658,7 @@ func (s *testSyncerSuite) TestTrackDownstreamTableWontOverwrite(c *C) { baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}) syncer.ddlDBConn = dbconn.NewDBConn(s.cfg, baseConn) syncer.downstreamTrackConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) c.Assert(err, IsNil) defer syncer.schemaTracker.Close() @@ -1702,7 +1702,7 @@ func (s *testSyncerSuite) TestDownstreamTableHasAutoRandom(c *C) { baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}) syncer.ddlDBConn = dbconn.NewDBConn(s.cfg, baseConn) syncer.downstreamTrackConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(ctx, s.cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) c.Assert(err, IsNil) schemaName := "test" @@ -1743,7 +1743,7 @@ func (s *testSyncerSuite) TestDownstreamTableHasAutoRandom(c *C) { schema.TiDBClusteredIndex: "ON", } c.Assert(syncer.schemaTracker.Close(), IsNil) - syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, sessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(ctx, s.cfg.Name, sessionCfg, syncer.downstreamTrackConn, log.L()) c.Assert(err, IsNil) defer syncer.schemaTracker.Close() v, ok := syncer.schemaTracker.GetSystemVar(schema.TiDBClusteredIndex) @@ -1925,7 +1925,7 @@ func TestSyncerGetTableInfo(t *testing.T) { baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}) syncer.ddlDBConn = dbconn.NewDBConn(cfg, baseConn) syncer.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncer.schemaTracker, err = schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) + syncer.schemaTracker, err = schema.NewTestTracker(ctx, cfg.Name, defaultTestSessionCfg, syncer.downstreamTrackConn, log.L()) require.NoError(t, err) defer syncer.schemaTracker.Close() diff --git a/dm/syncer/validator_checkpoint_test.go b/dm/syncer/validator_checkpoint_test.go index e775cc9abc4..8ab0e6e3e9f 100644 --- a/dm/syncer/validator_checkpoint_test.go +++ b/dm/syncer/validator_checkpoint_test.go @@ -92,7 +92,7 @@ func TestValidatorCheckpointPersist(t *testing.T) { dbConn, err := db.Conn(context.Background()) require.NoError(t, err) syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})) - syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L()) + syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L()) defer syncerObj.schemaTracker.Close() require.NoError(t, err) require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName)) From 77cfbcab957523dfd1092a03463b27183f28436f Mon Sep 17 00:00:00 2001 From: buchuitoudegou <756541536@qq.com> Date: Wed, 29 Jun 2022 14:38:00 +0800 Subject: [PATCH 4/4] update tracker comment --- dm/pkg/schema/tracker.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 26377dc748b..f2ee3c79039 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -73,9 +73,14 @@ func init() { // Tracker is used to track schema locally. type Tracker struct { - // we're using an embedded tidb, there's no need to sync operations on it, but we may recreate(drop and create) - // a table such as when checkpoint rollback, we need to make sure others(validator for now) can't see the table - // is deleted. so we add an extra layer of synchronization for GetTableInfo/RecreateTables for now. + // The Tracker is an embedded tidb in essence, where there was basically no parallel operation at the beginning. + // However, since the validator is introduced and heavily dependent on the Tracker, we need to make sure + // the synchronization between the reading from the validator and the modification from the syncer (e.g. + // when the checkpoint is being rolled back, we have to make sure the validator can still vision the original tables) + // From this point, we add an extra layer of the synchronization for the following operations: + // 1. GetTableInfo: the validator reads table infos. + // 2. Init: when the syncer restarts, it may re-initialize the Tracker while the validator may read the Tracker at the same time. + // 3. Close: Being similar as above, the validator can read the Tracker while the syncer is closing the Tracker. sync.RWMutex storePath string store kv.Storage