Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer(dm): init schemaTracker when syncer run #6052

Merged
merged 9 commits into from
Jun 30, 2022
Merged
96 changes: 68 additions & 28 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ func init() {

// Tracker is used to track schema locally.
type Tracker struct {
// we're using an embedded tidb, there's no need to sync operations on it, but we may recreate(drop and create)
// a table such as when checkpoint rollback, we need to make sure others(validator for now) can't see the table
// is deleted. so we add an extra layer of synchronization for GetTableInfo/RecreateTables for now.
// The Tracker is an embedded tidb in essence, where there was basically no parallel operation at the beginning.
// However, since the validator is introduced and heavily dependent on the Tracker, we need to make sure
// the synchronization between the reading from the validator and the modification from the syncer (e.g.
// when the checkpoint is being rolled back, we have to make sure the validator can still vision the original tables)
// From this point, we add an extra layer of the synchronization for the following operations:
// 1. GetTableInfo: the validator reads table infos.
// 2. Init: when the syncer restarts, it may re-initialize the Tracker while the validator may read the Tracker at the same time.
// 3. Close: Being similar as above, the validator can read the Tracker while the syncer is closing the Tracker.
sync.RWMutex
storePath string
store kv.Storage
Expand All @@ -99,16 +104,25 @@ type DownstreamTableInfo struct {
WhereHandle *sqlmodel.WhereHandle
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// NewTracker simply returns an empty Tracker,
// which should be followed by an initialization before used.
func NewTracker() *Tracker {
return &Tracker{}
}

// Init initializes the Tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream using `downstreamConn`.
// NOTE **sessionCfg is a reference to caller**.
func NewTracker(
func (tr *Tracker) Init(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
) error {
if tr == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when will tr be nil?

if it's for test, we can change test instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is copied from the old version. I remember it will let the load_task IT fail if deleted (probably this test case tries to pause-task before initializing the syncer). will take a look tomorrow.

return nil
}
var (
err error
storePath string
Expand Down Expand Up @@ -148,28 +162,28 @@ func NewTracker(
var ignoredColumn interface{}
rows, err2 := downstreamConn.QuerySQL(tctx, nil, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
return err2
}
if rows.Next() {
var value string
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil {
return nil, err3
return err3
}
sessionCfg[k] = value
}
// nolint:sqlclosecheck
if err2 = rows.Close(); err2 != nil {
return nil, err2
return err2
}
if err2 = rows.Err(); err2 != nil {
return nil, err2
return err2
}
}
}

storePath, err = newTmpFolderForTracker(task)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "DeleteStorePath", Fn: func() {
_ = os.RemoveAll(storePath)
Expand All @@ -179,7 +193,7 @@ func NewTracker(
mockstore.WithStoreType(mockstore.EmbedUnistore),
mockstore.WithPath(storePath))
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseStore", Fn: func() {
_ = store.Close()
Expand All @@ -190,13 +204,13 @@ func NewTracker(

dom, err = session.BootstrapSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseDomain", Fn: dom.Close})

se, err = session.CreateSession(store)
if err != nil {
return nil, err
return err
}
rollbackHolder.Add(fr.FuncRollback{Name: "CloseSession", Fn: se.Close})

Expand All @@ -216,13 +230,13 @@ func NewTracker(
log.L().Warn("can not set this variable", zap.Error(err))
continue
}
return nil, err
return err
}
}
for k, v := range globalVarsToSet {
err = se.GetSessionVars().SetSystemVarWithRelaxedValidation(k, v)
if err != nil {
return nil, err
return err
}
}
// skip DDL test https://github.com/pingcap/tidb/pull/33079
Expand All @@ -233,22 +247,40 @@ func NewTracker(
// exist by default. So we need to drop it first.
err = dom.DDL().DropSchema(se, model.NewCIStr("test"))
if err != nil {
return nil, err
return err
}

// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*DownstreamTableInfo),
}
tr.Lock()
defer tr.Unlock()
Comment on lines +258 to +259
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now this lock is used in 3 places, please refine comments in Tracker definition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tr.storePath = storePath
tr.store = store
tr.dom = dom
tr.se = se
tr.dsTracker = dsTracker
tr.closed.Store(false)
return nil
}

return &Tracker{
storePath: storePath,
store: store,
dom: dom,
se: se,
dsTracker: dsTracker,
}, nil
// NewTestTracker creates an empty Tracker and initializes it subsequently.
// It is useful for test.
func NewTestTracker(
ctx context.Context,
task string,
sessionCfg map[string]string,
downstreamConn *dbconn.DBConn,
logger log.Logger,
) (*Tracker, error) {
tr := NewTracker()
err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger)
if err != nil {
return nil, err
}
return tr, nil
}

func newTmpFolderForTracker(task string) (string, error) {
Expand Down Expand Up @@ -392,10 +424,18 @@ func (tr *Tracker) Close() error {
if !tr.closed.CAS(false, true) {
return nil
}
tr.se.Close()
tr.dom.Close()
if err := tr.store.Close(); err != nil {
return err
// Build of the Tracker and the initialization is divided.
// these fields can possibly be nil if the Tracker is closed before the initialization.
if tr.se != nil {
tr.se.Close()
}
if tr.dom != nil {
tr.dom.Close()
}
if tr.store != nil {
if err := tr.store.Close(); err != nil {
return err
}
}
return os.RemoveAll(tr.storePath)
}
Expand Down
40 changes: 20 additions & 20 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,28 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
// user give correct session config

t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
t, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
err = t.Close()
c.Assert(err, IsNil)

// user give wrong session session, will return error
sessionCfg := map[string]string{"sql_mode": "HaHa"}
_, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, NotNil)

// discover session config failed, will return error
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "HaHa"))
_, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
_, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, NotNil)

// empty or default config in downstream
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", defaultTestSessionCfg["sql_mode"]))
tracker, err := NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
err = tracker.Exec(context.Background(), "", "create database testdb;")
Expand All @@ -125,7 +125,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE"))
tracker, err = NewTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", nil, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)
c.Assert(tracker.se.GetSessionVars().SQLMode.HasOnlyFullGroupBy(), IsTrue)
Expand All @@ -144,7 +144,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
// user set session config, get tracker config from downstream
// no `STRICT_TRANS_TABLES`, no error now
sessionCfg = map[string]string{"sql_mode": "NO_ZERO_DATE,NO_ZERO_IN_DATE,ANSI_QUOTES"}
tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
err = tracker.Close()
c.Assert(err, IsNil)

tracker, err = NewTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
tracker, err = NewTestTracker(context.Background(), "test-tracker", sessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -203,7 +203,7 @@ func (s *trackerSuite) TestDDL(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *trackerSuite) TestDDL(c *C) {
func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *trackerSuite) TestGetSingleColumnIndices(c *C) {
func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *trackerSuite) TestCreateSchemaIfNotExists(c *C) {
func (s *trackerSuite) TestMultiDrop(c *C) {
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {
Name: "foo",
}

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -486,7 +486,7 @@ func (s *trackerSuite) TestCreateTableIfNotExists(c *C) {

func (s *trackerSuite) TestBatchCreateTableIfNotExist(c *C) {
log.SetLevel(zapcore.ErrorLevel)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -604,7 +604,7 @@ func (s *trackerSuite) TestAllSchemas(c *C) {
log.SetLevel(zapcore.ErrorLevel)
ctx := context.Background()

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, s.dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -700,7 +700,7 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) {
oldSessionVar := map[string]string{
"tidb_enable_change_column_type": "ON",
}
tracker, err := NewTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", oldSessionVar, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand All @@ -720,7 +720,7 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -757,7 +757,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -799,7 +799,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -891,7 +891,7 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down Expand Up @@ -931,7 +931,7 @@ func (s *trackerSuite) TestPlacementRule(c *C) {
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
tracker, err := NewTestTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn, dlog.L())
c.Assert(err, IsNil)
defer func() {
err = tracker.Close()
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) {
}
)

s.tracker, err = schema.NewTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
s.tracker, err = schema.NewTestTracker(context.Background(), s.cfg.Name, defaultTestSessionCfg, nil, dlog.L())
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -499,7 +499,7 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
dbConn, err := db.Conn(ctx)
require.NoError(t, err)
downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
schemaTracker, err := schema.NewTestTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn, dlog.L())
require.NoError(t, err)
defer schemaTracker.Close() //nolint

Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestValidatorDoValidate(t *testing.T) {
dbConn, err := db.Conn(context.Background())
require.NoError(t, err)
syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
syncerObj.schemaTracker, err = schema.NewTestTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn, log.L())
defer syncerObj.schemaTracker.Close()
require.NoError(t, err)
require.NoError(t, syncerObj.schemaTracker.CreateSchemaIfNotExists(schemaName))
Expand Down
Loading