Skip to content

Commit

Permalink
*(dm): move downstream table info logic into sqlmodel (#4885)
Browse files Browse the repository at this point in the history
close #4287
  • Loading branch information
lance6716 authored Mar 23, 2022
1 parent 171a8bf commit 14d01a8
Show file tree
Hide file tree
Showing 11 changed files with 414 additions and 425 deletions.
157 changes: 8 additions & 149 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -44,6 +43,7 @@ import (
dmterror "github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/sqlmodel"
)

const (
Expand Down Expand Up @@ -78,9 +78,8 @@ type downstreamTracker struct {

// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
WhereHandle *sqlmodel.WhereHandle
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -430,7 +429,7 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {

// GetDownStreamTableInfo gets downstream table info.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTI *model.TableInfo) (*DownstreamTableInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
Expand All @@ -440,39 +439,15 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = GetDownStreamTI(downstreamTI, originTi)
dti = &DownstreamTableInfo{
TableInfo: downstreamTI,
WhereHandle: sqlmodel.GetWhereHandle(originTI, downstreamTI),
}
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
}
}
return nil
}

// RemoveDownstreamSchema just remove schema or table in downstreamTrack.
func (tr *Tracker) RemoveDownstreamSchema(tctx *tcontext.Context, targetTables []*filter.Table) {
if len(targetTables) == 0 {
Expand Down Expand Up @@ -541,119 +516,3 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
tr.dsTracker.stmtParser = stmtParser
return nil
}

// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
// second check not null unique key
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}

// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

// redirectIndexKeys redirect index's columns offset in origin tableinfo.
func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo {
if index == nil || originTi == nil {
return nil
}

columns := make([]*model.IndexColumn, 0, len(index.Columns))
for _, key := range index.Columns {
originColumn := model.FindColumnInfo(originTi.Columns, key.Name.L)
if originColumn == nil {
return nil
}
column := &model.IndexColumn{
Name: key.Name,
Offset: originColumn.Offset,
Length: key.Length,
}
columns = append(columns, column)
}
return &model.IndexInfo{
Table: index.Table,
Unique: index.Unique,
Primary: index.Primary,
State: index.State,
Tp: index.Tp,
Columns: columns,
}
}

// handlePkExCase is handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
func handlePkExCase(ti *model.TableInfo) *model.IndexInfo {
if pk := ti.GetPkColInfo(); pk != nil {
return &model.IndexInfo{
Table: ti.Name,
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: pk.Name,
Offset: pk.Offset,
Length: types.UnspecifiedLength,
}},
}
}
return nil
}

// isSpecifiedIndexColumn checks all of index's columns are matching 'fn'.
func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool {
for _, col := range index.Columns {
if !fn(col.Offset) {
return false
}
}
return true
}
Loading

0 comments on commit 14d01a8

Please sign in to comment.