Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): init schemaTracker when syncer run #6052

Merged
merged 9 commits into from
Jun 30, 2022
Merged
85 changes: 60 additions & 25 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,25 @@ type DownstreamTableInfo struct {
WhereHandle *sqlmodel.WhereHandle
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// NewTracker simply returns an empty Tracker,
// which should be followed by an initialization before used.
func NewTracker() *Tracker {
return &Tracker{}
}

// Init initializes the Tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream using `downstreamConn`.
// NOTE **sessionCfg is a reference to caller**.
func NewTracker(
func (tr *Tracker) Init(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
) error {
if tr == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will tr be nil?

if it's for test, we can change test instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is copied from the old version. I remember it will let the load_task IT fail if deleted (probably this test case tries to pause-task before initializing the syncer). will take a look tomorrow.

return nil
}
var (
err error
storePath string
Expand Down Expand Up @@ -148,28 +157,28 @@ func NewTracker(
var ignoredColumn interface{}
rows, err2 := downstreamConn.QuerySQL(tctx, nil, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
return err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
return err3
}
sessionCfg[k] = value
}
// nolint:sqlclosecheck
if err2 = rows.Close(); err2 != nil {
return nil, err2
return err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
return err2
}
}
}

storePath, err = newTmpFolderForTracker(task)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "DeleteStorePath", Fn: func() {
_ = os.RemoveAll(storePath)
Expand All @@ -179,7 +188,7 @@ func NewTracker(
mockstore.WithStoreType(mockstore.EmbedUnistore),
mockstore.WithPath(storePath))
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseStore", Fn: func() {
_ = store.Close()
Expand All @@ -190,13 +199,13 @@ func NewTracker(

dom, err = session.BootstrapSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseDomain", Fn: dom.Close})

se, err = session.CreateSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseSession", Fn: se.Close})

Expand All @@ -216,13 +225,13 @@ func NewTracker(
log.L().Warn("can not set this variable", zap.Error(err))
continue
}
return nil, err
return err
}
}
for k, v := range globalVarsToSet {
err = se.GetSessionVars().SetSystemVarWithRelaxedValidation(k, v)
if err != nil {
return nil, err
return err
}
}
// skip DDL test https://github.com/pingcap/tidb/pull/33079
Expand All @@ -233,22 +242,40 @@ func NewTracker(
// exist by default. So we need to drop it first.
err = dom.DDL().DropSchema(se, model.NewCIStr("test"))
if err != nil {
return nil, err
return err
}

// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*DownstreamTableInfo),
}
tr.Lock()
defer tr.Unlock()
Comment on lines +258 to +259
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now this lock is used in 3 places, please refine comments in Tracker definition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tr.storePath = storePath
tr.store = store
tr.dom = dom
tr.se = se
tr.dsTracker = dsTracker
tr.closed.Store(false)
return nil
}

return &Tracker{
storePath: storePath,
store: store,
dom: dom,
se: se,
dsTracker: dsTracker,
}, nil
// NewTestTracker creates an empty Tracker and initializes it subsequently.
// It is useful for test.
func NewTestTracker(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
tr := NewTracker()
err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger)
if err != nil {
return nil, err
}
return tr, nil
}

func newTmpFolderForTracker(task string) (string, error) {
Expand Down Expand Up @@ -392,10 +419,18 @@ func (tr *Tracker) Close() error {
if !tr.closed.CAS(false, true) {
return nil
}
tr.se.Close()
tr.dom.Close()
if err := tr.store.Close(); err != nil {
return err
// Build of the Tracker and the initialization is divided.
// these fields can possibly be nil if the Tracker is closed before the initialization.
if tr.se != nil {
tr.se.Close()
}
if tr.dom != nil {
tr.dom.Close()
}
if tr.store != nil {
if err := tr.store.Close(); err != nil {
return err
}
}
return os.RemoveAll(tr.storePath)
}
Expand Down
40 changes: 20 additions & 20 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,28 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
// user give correct session config

t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
t, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
err = t.Close()
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
_, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, NotNil)

// discover session config failed, will return error
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", defaultTestSessionCfg["sql_mode"]))
tracker, err := NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
Expand All @@ -125,7 +125,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
Expand All @@ -144,7 +144,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
err = tracker.Close()
c.Assert(err, IsNil)

tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -203,7 +203,7 @@ func (s *trackerSuite) TestDDL(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -486,7 +486,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {

func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
log.SetLevel(zapcore.ErrorLevel)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -700,7 +700,7 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) {
oldSessionVar := map[string]string{
"tidb_enable_change_column_type": "ON",
}
tracker, err := NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -720,7 +720,7 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -757,7 +757,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -799,7 +799,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -891,7 +891,7 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -931,7 +931,7 @@ func (s *trackerSuite) TestPlacementRule(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) {
}
)

s.tracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
s.tracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -499,7 +499,7 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
dbConn, err := db.Conn(ctx)
require.NoError(t, err)
downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
schemaTracker, err := schema.NewTestTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
require.NoError(t, err)
defer schemaTracker.Close() //nolint

Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestValidatorDoValidate(t *testing.T) {
dbConn, err := db.Conn(context.Background())
require.NoError(t, err)
syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
defer syncerObj.schemaTracker.Close()
require.NoError(t, err)
require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName))
Expand Down
Loading