Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm/syncer: multiple rows use downstream schema (#3308) #3953

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8128f38
This is an automated cherry-pick of #3168
WizardXiao Nov 3, 2021
cab561d
commit-message: merge conflict
WizardXiao Nov 4, 2021
0c86101
This is an automated cherry-pick of #3308
WizardXiao Nov 9, 2021
f57b50f
commit-message: fix conflict
WizardXiao Dec 17, 2021
5751cf1
commit-message: fix compile error
WizardXiao Dec 17, 2021
6b78b5f
commit-message: fix conflict
WizardXiao Dec 20, 2021
195a4da
commit-message: add comment
WizardXiao Dec 20, 2021
2597e7e
commit-message: fix error code in release
WizardXiao Dec 20, 2021
9a50336
commit-message: fix error code order
WizardXiao Dec 20, 2021
c4d1ce8
commit-message: fix import tiflow conflict
WizardXiao Dec 20, 2021
118be7a
commit-message: fix format
WizardXiao Dec 20, 2021
e6d3f2c
commit-message: fix format
WizardXiao Dec 20, 2021
44f4671
commit-message: merge with 3168
WizardXiao Dec 20, 2021
860b16e
commit-message: fix error in make check
WizardXiao Dec 20, 2021
704f88d
commit-message: fix integration test and make check
WizardXiao Dec 20, 2021
3d211b2
Merge branch 'cherry-pick-3168-to-release-5.3' of https://github.com/…
WizardXiao Dec 20, 2021
a675c70
Merge branch 'release-5.3' of https://github.com/pingcap/tiflow into …
WizardXiao Dec 20, 2021
1af3b9d
commit-message: fix conflict with 3256
WizardXiao Dec 21, 2021
3eb4921
Merge branch 'cherry-pick-3308-to-release-5.3' of https://github.com/…
WizardXiao Dec 21, 2021
5257d59
Merge branch 'release-5.3' into cherry-pick-3308-to-release-5.3
ti-chi-bot Dec 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 38 additions & 35 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ type Tracker struct {
type downstreamTracker struct {
downstreamConn *dbconn.DBConn // downstream connection
stmtParser *parser.Parser // statement parser
tableInfos map[string]*downstreamTableInfo // downstream table infos
tableInfos map[string]*DownstreamTableInfo // downstream table infos
}

// downstreamTableInfo contains tableinfo and index cache.
type downstreamTableInfo struct {
tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
indexCache *model.IndexInfo // index cache include pk/uk(not null)
availableUKCache []*model.IndexInfo // index cache include uks(data not null)
// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -181,7 +181,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*downstreamTableInfo),
tableInfos: make(map[string]*DownstreamTableInfo),
}

return &Tracker{
Expand Down Expand Up @@ -375,9 +375,9 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {
return tr.se.GetSessionVars().GetSystemVar(name)
}

// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index.
// GetDownStreamTableInfo gets downstream table info.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) {
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
Expand All @@ -387,32 +387,28 @@ func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = getDownStreamTi(ti, originTi)
dti = GetDownStreamTi(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti.indexCache, nil
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]

if !ok || len(dti.availableUKCache) == 0 {
if !ok || len(dti.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for i, uk := range dti.availableUKCache {
for _, uk := range dti.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
if i != 0 {
// exchange available uk to the first of the array to reduce judgements for next row
dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
}
return uk
}
}
Expand Down Expand Up @@ -487,37 +483,37 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// getDownStreamTi constructs downstreamTable index cache by tableinfo.
func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo {
// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
indexCache *model.IndexInfo
availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices))
hasPk = false
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
}

for _, idx := range ti.Indices {
for i, idx := range ti.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
indexCache = indexRedirect
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if idx.Unique {
} else if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
// second check not null unique key
if !hasPk && isSpecifiedIndexColumn(idx, fn) {
indexCache = indexRedirect
} else {
availableUKCache = append(availableUKCache, indexRedirect)
}
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}

Expand All @@ -526,14 +522,21 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
if exPk != nil {
indexCache = exPk
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

return &downstreamTableInfo{
tableInfo: ti,
indexCache: indexCache,
availableUKCache: availableUKCache,
// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: ti,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

Expand Down
Loading