-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
syncer(dm): init schemaTracker when syncer run #6052
Changes from 2 commits
5e38c28
ac6c120
8dceb81
77cfbca
5a82679
a713b5f
85a9541
73ddbc2
fe59d73
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,16 +99,25 @@ type DownstreamTableInfo struct { | |
WhereHandle *sqlmodel.WhereHandle | ||
} | ||
|
||
// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve | ||
// NewDumpTracker simply returns an empty Tracker, | ||
// which should be followed by a subsequent initialization. | ||
func NewDumpTracker() *Tracker { | ||
return &Tracker{} | ||
} | ||
|
||
// Init initializes the Tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve | ||
// some variable from downstream using `downstreamConn`. | ||
// NOTE **sessionCfg is a reference to caller**. | ||
func NewTracker( | ||
func (tr *Tracker) Init( | ||
ctx context.Context, | ||
task string, | ||
sessionCfg map[string]string, | ||
downstreamConn *dbconn.DBConn, | ||
logger log.Logger, | ||
) (*Tracker, error) { | ||
) error { | ||
if tr == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when will tr be nil? if it's for test, we can change test instead There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is copied from the old version. I remember it will let the |
||
return nil | ||
} | ||
var ( | ||
err error | ||
storePath string | ||
|
@@ -148,28 +157,28 @@ func NewTracker( | |
var ignoredColumn interface{} | ||
rows, err2 := downstreamConn.QuerySQL(tctx, nil, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k)) | ||
if err2 != nil { | ||
return nil, err2 | ||
return err2 | ||
} | ||
if rows.Next() { | ||
var value string | ||
if err3 := rows.Scan(&ignoredColumn, &value); err3 != nil { | ||
return nil, err3 | ||
return err3 | ||
} | ||
sessionCfg[k] = value | ||
} | ||
// nolint:sqlclosecheck | ||
if err2 = rows.Close(); err2 != nil { | ||
return nil, err2 | ||
return err2 | ||
} | ||
if err2 = rows.Err(); err2 != nil { | ||
return nil, err2 | ||
return err2 | ||
} | ||
} | ||
} | ||
|
||
storePath, err = newTmpFolderForTracker(task) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
rollbackHolder.Add(fr.FuncRollback{Name: "DeleteStorePath", Fn: func() { | ||
_ = os.RemoveAll(storePath) | ||
|
@@ -179,7 +188,7 @@ func NewTracker( | |
mockstore.WithStoreType(mockstore.EmbedUnistore), | ||
mockstore.WithPath(storePath)) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
rollbackHolder.Add(fr.FuncRollback{Name: "CloseStore", Fn: func() { | ||
_ = store.Close() | ||
|
@@ -190,13 +199,13 @@ func NewTracker( | |
|
||
dom, err = session.BootstrapSession(store) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
rollbackHolder.Add(fr.FuncRollback{Name: "CloseDomain", Fn: dom.Close}) | ||
|
||
se, err = session.CreateSession(store) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
rollbackHolder.Add(fr.FuncRollback{Name: "CloseSession", Fn: se.Close}) | ||
|
||
|
@@ -216,13 +225,13 @@ func NewTracker( | |
log.L().Warn("can not set this variable", zap.Error(err)) | ||
continue | ||
} | ||
return nil, err | ||
return err | ||
} | ||
} | ||
for k, v := range globalVarsToSet { | ||
err = se.GetSessionVars().SetSystemVarWithRelaxedValidation(k, v) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
} | ||
// skip DDL test https://github.com/pingcap/tidb/pull/33079 | ||
|
@@ -233,22 +242,39 @@ func NewTracker( | |
// exist by default. So we need to drop it first. | ||
err = dom.DDL().DropSchema(se, model.NewCIStr("test")) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
|
||
// init downstreamTracker | ||
dsTracker := &downstreamTracker{ | ||
downstreamConn: downstreamConn, | ||
tableInfos: make(map[string]*DownstreamTableInfo), | ||
} | ||
tr.Lock() | ||
defer tr.Unlock() | ||
Comment on lines
+258
to
+259
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now this lock is used in 3 places, please refine comments in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
tr.storePath = storePath | ||
tr.store = store | ||
tr.dom = dom | ||
tr.se = se | ||
tr.dsTracker = dsTracker | ||
tr.closed.Store(false) | ||
return nil | ||
} | ||
|
||
return &Tracker{ | ||
storePath: storePath, | ||
store: store, | ||
dom: dom, | ||
se: se, | ||
dsTracker: dsTracker, | ||
}, nil | ||
// NewTracker creates a new tracker. It's preserved for test. | ||
func NewTracker( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prefer to name for test, we change them to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe leaving a function invoking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then name it explicitly as something like NewTrackerForTest. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IDE should be able to easily rename them all There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
ctx context.Context, | ||
task string, | ||
sessionCfg map[string]string, | ||
downstreamConn *dbconn.DBConn, | ||
logger log.Logger, | ||
) (*Tracker, error) { | ||
tr := NewDumpTracker() | ||
err := tr.Init(ctx, task, sessionCfg, downstreamConn, logger) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return tr, nil | ||
} | ||
|
||
func newTmpFolderForTracker(task string) (string, error) { | ||
|
@@ -392,10 +418,18 @@ func (tr *Tracker) Close() error { | |
if !tr.closed.CAS(false, true) { | ||
return nil | ||
} | ||
tr.se.Close() | ||
tr.dom.Close() | ||
if err := tr.store.Close(); err != nil { | ||
return err | ||
// Build of the Tracker and the initialization is divided. | ||
// these fields can possibly be nil if the Tracker is closed before the initialization. | ||
if tr.se != nil { | ||
tr.se.Close() | ||
} | ||
if tr.dom != nil { | ||
tr.dom.Close() | ||
} | ||
if tr.store != nil { | ||
if err := tr.store.Close(); err != nil { | ||
return err | ||
} | ||
} | ||
return os.RemoveAll(tr.storePath) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -391,6 +391,8 @@ func (s *Syncer) Init(ctx context.Context) (err error) { | |
} | ||
s.sessCtx = utils.NewSessionCtx(vars) | ||
s.exprFilterGroup = NewExprFilterGroup(s.tctx, s.sessCtx, s.cfg.ExprFilter) | ||
// create an empty Tracker and will be initialized in `Run` | ||
s.schemaTracker = schema.NewDumpTracker() | ||
|
||
if len(s.cfg.ColumnMappingRules) > 0 { | ||
s.columnMapping, err = cm.NewMapping(s.cfg.CaseSensitive, s.cfg.ColumnMappingRules) | ||
|
@@ -1696,7 +1698,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { | |
} | ||
} | ||
|
||
s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) | ||
if s.schemaTracker == nil { | ||
s.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is only to make test happy? we can left a comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. some tests set the schemaTracker themselves. Will leave a comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} else { | ||
// prevent creating new Tracker on `Run` in order to avoid | ||
// two different Trackers are invoked in the validator and the syncer. | ||
err = s.schemaTracker.Init(ctx, s.cfg.Name, s.cfg.To.Session, s.downstreamTrackConn, s.tctx.L()) | ||
} | ||
if err != nil { | ||
return terror.ErrSchemaTrackerInit.Delegate(err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the meaning of Dump? Syncer can run without dump unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8dceb81