Skip to content

Commit

Permalink
sink(ticdc): Use TableInfo instead of SimpleTableInfo to hold mor…
Browse files Browse the repository at this point in the history
…e information in `DDLEvent` (#7221)

ref #6797
  • Loading branch information
zhaoxinyu authored Oct 10, 2022
1 parent 68ac37f commit eb03362
Show file tree
Hide file tree
Showing 36 changed files with 592 additions and 1,372 deletions.
4 changes: 2 additions & 2 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestTable(t *testing.T) {
}
preTableInfo, err := snap.PreTableInfo(job)
require.Nil(t, err)
require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T"})
require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T", TableID: 2})
require.Equal(t, preTableInfo.ID, int64(2))

err = snap.HandleDDL(job)
Expand All @@ -262,7 +262,7 @@ func TestTable(t *testing.T) {
}
preTableInfo, err = snap.PreTableInfo(job)
require.Nil(t, err)
require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T"})
require.Equal(t, preTableInfo.TableName, model.TableName{Schema: "Test", Table: "T", TableID: 9})
require.Equal(t, preTableInfo.ID, int64(9))

err = snap.HandleDDL(job)
Expand Down
10 changes: 7 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ type TableInfo struct {
// WrapTableInfo creates a TableInfo from a timodel.TableInfo
func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *model.TableInfo) *TableInfo {
ti := &TableInfo{
TableInfo: info,
SchemaID: schemaID,
TableName: TableName{Schema: schemaName, Table: info.Name.O},
TableInfo: info,
SchemaID: schemaID,
TableName: TableName{
Schema: schemaName,
Table: info.Name.O,
TableID: info.ID,
},
TableInfoVersion: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
indicesOffset: make(map[int64]int, len(info.Indices)),
Expand Down
86 changes: 10 additions & 76 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,36 +466,13 @@ func ColumnValueString(c interface{}) string {
return data
}

// ColumnInfo represents the name and type information passed to the sink
type ColumnInfo struct {
Name string `msg:"name"`
Type byte `msg:"type"`
}

// FromTiColumnInfo populates cdc's ColumnInfo from TiDB's model.ColumnInfo
func (c *ColumnInfo) FromTiColumnInfo(tiColumnInfo *model.ColumnInfo) {
c.Type = tiColumnInfo.GetType()
c.Name = tiColumnInfo.Name.O
}

// SimpleTableInfo is the simplified table info passed to the sink
type SimpleTableInfo struct {
// db name
Schema string `msg:"schema"`
// table name
Table string `msg:"table"`
// table ID
TableID int64 `msg:"table-id"`
ColumnInfo []*ColumnInfo `msg:"column-info"`
}

// DDLEvent stores DDL event
type DDLEvent struct {
StartTs uint64 `msg:"start-ts"`
CommitTs uint64 `msg:"commit-ts"`
TableInfo *SimpleTableInfo `msg:"table-info"`
PreTableInfo *SimpleTableInfo `msg:"pre-table-info"`
Query string `msg:"query"`
TableInfo *TableInfo `msg:"-"`
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
}
Expand All @@ -507,7 +484,7 @@ type RedoDDLEvent struct {
}

// FromJob fills the values with DDLEvent from DDL job
func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo, tableInfo *TableInfo) {
// populating DDLEvent of an `rename tables` job is handled in `FromRenameTablesJob()`
if d.Type == model.ActionRenameTables {
return
Expand All @@ -521,9 +498,9 @@ func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
rebuildQuery := func() {
switch d.Type {
case model.ActionDropTable:
d.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", d.TableInfo.Schema, d.TableInfo.Table)
d.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", d.TableInfo.TableName.Schema, d.TableInfo.TableName.Table)
case model.ActionDropView:
d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.Schema, d.TableInfo.Table)
d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.TableName.Schema, d.TableInfo.TableName.Table)
default:
d.Query = job.Query
}
Expand All @@ -532,18 +509,16 @@ func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
d.Type = job.Type
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)
// fill TableInfo for the event.
d.fillTableInfo(job.BinlogInfo.TableInfo, job.SchemaName)
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo
// rebuild the query if necessary
rebuildQuery()
}

// FromRenameTablesJob fills the values of DDLEvent from a rename tables DDL job
func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
oldSchemaName, newSchemaName string,
preTableInfo *TableInfo, tableInfo *model.TableInfo,
preTableInfo *TableInfo, tableInfo *TableInfo,
) {
if job.Type != model.ActionRenameTables {
return
Expand All @@ -556,49 +531,8 @@ func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
oldSchemaName, oldTableName, newSchemaName, newTableName)
d.Type = model.ActionRenameTable
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)
// fill TableInfo for the event.
d.fillTableInfo(tableInfo, newSchemaName)
}

// fillTableInfo populates the TableInfo of an DDLEvent
func (d *DDLEvent) fillTableInfo(tableInfo *model.TableInfo,
schemaName string,
) {
// `TableInfo` field of `DDLEvent` should always not be nil
d.TableInfo = new(SimpleTableInfo)
d.TableInfo.Schema = schemaName

if tableInfo == nil {
return
}

d.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))
for i, colInfo := range tableInfo.Columns {
d.TableInfo.ColumnInfo[i] = new(ColumnInfo)
d.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}

d.TableInfo.Table = tableInfo.Name.O
d.TableInfo.TableID = tableInfo.ID
}

// fillPreTableInfo populates the PreTableInfo of an event
func (d *DDLEvent) fillPreTableInfo(preTableInfo *TableInfo) {
if preTableInfo == nil {
return
}
d.PreTableInfo = new(SimpleTableInfo)
d.PreTableInfo.Schema = preTableInfo.TableName.Schema
d.PreTableInfo.Table = preTableInfo.TableName.Table
d.PreTableInfo.TableID = preTableInfo.ID

d.PreTableInfo.ColumnInfo = make([]*ColumnInfo, len(preTableInfo.Columns))
for i, colInfo := range preTableInfo.Columns {
d.PreTableInfo.ColumnInfo[i] = new(ColumnInfo)
d.PreTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo
}

// SingleTableTxn represents a transaction which includes many row events in a single table
Expand Down
Loading

0 comments on commit eb03362

Please sign in to comment.