Skip to content

Commit

Permalink
support multi-table DDLs for mq sink
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoxinyu committed May 5, 2022
1 parent a9d5d5a commit df94013
Show file tree
Hide file tree
Showing 22 changed files with 1,010 additions and 116 deletions.
5 changes: 3 additions & 2 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
func (s *schemaSnapshot) renameTables(job *timodel.Job) error {
var oldSchemaIDs, newSchemaIDs, oldTableIDs []int64
var newTableNames, oldSchemaNames []*timodel.CIStr
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &newTableNames, &oldTableIDs, &oldSchemaNames)
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs,
&newTableNames, &oldTableIDs, &oldSchemaNames)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -713,7 +714,7 @@ type SchemaStorage interface {
GetLastSnapshot() *schemaSnapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
HandleDDLJob(job *timodel.Job) error
// AdvanceResolvedTs advances the resolved
// AdvanceResolvedTs advances the resolved ts
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
Expand Down
12 changes: 12 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
return jobs[0]
}

// DDL2Jobs executes the DDL statement and return the corresponding DDL jobs.
// It is mainly used for "DROP TABLE" and "DROP VIEW" statement because
// multiple jobs will be generated after executing these two types of
// DDL statements.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
s.tk.MustExec(ddl)
jobs, err := s.GetCurrentMeta().GetLastNHistoryDDLJobs(jobCnt)
require.Nil(s.t, err)
require.Len(s.t, jobs, jobCnt)
return jobs
}

// Storage return the tikv storage
func (s *SchemaTestHelper) Storage() kv.Storage {
return s.storage
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

// GetStartTs returns StartTs if it's specified or using the CreateTime of changefeed.
// GetStartTs returns StartTs if it's specified or using the
// CreateTime of changefeed.
func (info *ChangeFeedInfo) GetStartTs() uint64 {
if info.StartTs > 0 {
return info.StartTs
Expand Down
83 changes: 65 additions & 18 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,38 +478,85 @@ type RedoDDLEvent struct {

// FromJob fills the values of DDLEvent from DDL job
func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
if d.Type == model.ActionRenameTables {
return
}

// The the query for "DROP TABLE" and "DROP VIEW" statements need
// to be rebuilt. The reason is elaborated as follows:
// for a DDL statement like "DROP TABLE test1.table1, test2.table2",
// two DDL jobs will be generated. These two jobs can be differentiated
// from job.BinlogInfo.TableInfo whereas the job.Query are identical.
rebuildQuery := func() {
switch d.Type {
case model.ActionDropTable:
d.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`",
d.TableInfo.Schema, d.TableInfo.Table)
case model.ActionDropView:
d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`",
d.TableInfo.Schema, d.TableInfo.Table)
default:
d.Query = job.Query
}
}

d.TableInfo = new(SimpleTableInfo)
d.TableInfo.Schema = job.SchemaName
d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
d.Query = job.Query
d.Type = job.Type
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)

switch d.Type {
case model.ActionRenameTables:
// DDLs update multiple target tables, in which case `TableInfo` isn't meaningful.
// So we can skip to fill TableInfo for the event.
// fill TableInfo for the event.
d.fillTableInfo(job.BinlogInfo.TableInfo, job.SchemaName)
// rebuild the query if necessary
rebuildQuery()
}

// FromRenameTablesJob fills the values of DDLEvent from a rename tables DDL job
func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
oldSchemaName, newSchemaName string,
preTableInfo *TableInfo, tableInfo *model.TableInfo,
) {
if job.Type != model.ActionRenameTables {
return
default:
}

// Fill TableInfo for the event.
if job.BinlogInfo.TableInfo != nil {
tableName := job.BinlogInfo.TableInfo.Name.O
tableInfo := job.BinlogInfo.TableInfo
d.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))
d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
oldTableName := preTableInfo.Name.O
newTableName := tableInfo.Name.O
d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
oldSchemaName, oldTableName, newSchemaName, newTableName)
d.Type = model.ActionRenameTable
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)
// fill TableInfo for the event.
d.fillTableInfo(tableInfo, newSchemaName)
}

for i, colInfo := range tableInfo.Columns {
d.TableInfo.ColumnInfo[i] = new(ColumnInfo)
d.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
// fillTableInfo populate the TableInfo of an DDLEvent
func (d *DDLEvent) fillTableInfo(tableInfo *model.TableInfo,
schemaName string,
) {
if tableInfo == nil {
return
}

d.TableInfo.Table = tableName
d.TableInfo.TableID = job.TableID
d.TableInfo = new(SimpleTableInfo)
d.TableInfo.Schema = schemaName
d.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))

for i, colInfo := range tableInfo.Columns {
d.TableInfo.ColumnInfo[i] = new(ColumnInfo)
d.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}

d.TableInfo.Table = tableInfo.Name.O
d.TableInfo.TableID = tableInfo.ID
}

//
func (d *DDLEvent) fillPreTableInfo(preTableInfo *TableInfo) {
if preTableInfo == nil {
return
Expand Down
138 changes: 138 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,141 @@ func TestDDLEventFromJob(t *testing.T) {
event.FromJob(job, nil)
require.Nil(t, event.PreTableInfo)
}

func TestDDLEventFromRenameTablesJob(t *testing.T) {
job := &timodel.Job{
ID: 71,
TableID: 69,
SchemaName: "test1",
Type: timodel.ActionRenameTables,
StartTS: 432853521879007233,
Query: "rename table test1.t1 to test1.t10, test1.t2 to test1.t20",
BinlogInfo: &timodel.HistoryInfo{
FinishedTS: 432853521879007238,
MultipleTableInfos: []*timodel.TableInfo{
{
ID: 67,
Name: timodel.CIStr{O: "t10"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
},
{
ID: 69,
Name: timodel.CIStr{O: "t20"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
},
},
},
}

preTableInfo := &TableInfo{
TableName: TableName{
Schema: "test1",
Table: "t1",
TableID: 67,
},
TableInfo: &timodel.TableInfo{
ID: 67,
Name: timodel.CIStr{O: "t1"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
},
}

tableInfo := &timodel.TableInfo{
ID: 67,
Name: timodel.CIStr{O: "t10"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
}

event := &DDLEvent{}
event.FromRenameTablesJob(job, "test1", "test1", preTableInfo, tableInfo)
require.Equal(t, event.PreTableInfo.TableID, int64(67))
require.Equal(t, event.PreTableInfo.Table, "t1")
require.Len(t, event.PreTableInfo.ColumnInfo, 1)
require.Equal(t, event.TableInfo.TableID, int64(67))
require.Equal(t, event.TableInfo.Table, "t10")
require.Len(t, event.TableInfo.ColumnInfo, 1)
require.Equal(t, event.Query, "RENAME TABLE `test1`.`t1` TO `test1`.`t10`")

preTableInfo = &TableInfo{
TableName: TableName{
Schema: "test1",
Table: "t2",
TableID: 69,
},
TableInfo: &timodel.TableInfo{
ID: 69,
Name: timodel.CIStr{O: "t2"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
},
}

tableInfo = &timodel.TableInfo{
ID: 69,
Name: timodel.CIStr{O: "t20"},
Columns: []*timodel.ColumnInfo{
{
ID: 1,
Name: timodel.CIStr{O: "id"},
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
},
State: timodel.StatePublic,
},
},
}

event = &DDLEvent{}
event.FromRenameTablesJob(job, "test1", "test1", preTableInfo, tableInfo)
require.Equal(t, event.PreTableInfo.TableID, int64(69))
require.Equal(t, event.PreTableInfo.Table, "t2")
require.Len(t, event.PreTableInfo.ColumnInfo, 1)
require.Equal(t, event.TableInfo.TableID, int64(69))
require.Equal(t, event.TableInfo.Table, "t20")
require.Len(t, event.TableInfo.ColumnInfo, 1)
require.Equal(t, event.Query, "RENAME TABLE `test1`.`t2` TO `test1`.`t20`")
}
Loading

0 comments on commit df94013

Please sign in to comment.