Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/(cdc): Support multi-table DDL dispatching for MQ sink #5329

Merged
merged 26 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f7235bc
Merge branch 'master' into support_rename_table
zhaoxinyu May 5, 2022
bd66939
fix some lint problems
zhaoxinyu May 5, 2022
8898780
fix a integration test
zhaoxinyu May 5, 2022
0c1565b
fix some comments problem
zhaoxinyu May 5, 2022
46fe490
address comments
zhaoxinyu May 5, 2022
94825ef
Merge branch 'master' into support_rename_table
zhaoxinyu May 5, 2022
e56c848
fix a merge conflict problem
zhaoxinyu May 6, 2022
962691b
Merge branch 'master' into support_rename_table
zhaoxinyu May 6, 2022
4c5f53e
address comments and fix a consumer bug
zhaoxinyu May 6, 2022
eb629fd
Merge branch 'master' into support_rename_table
zhaoxinyu May 6, 2022
a7b87f4
address comments
zhaoxinyu May 6, 2022
cb10971
fix a map initializing problem
zhaoxinyu May 6, 2022
9213b6b
fix an integration test failure
zhaoxinyu May 6, 2022
bf02648
fix an integration test sql
zhaoxinyu May 7, 2022
832e8d5
remove two redundant lines
zhaoxinyu May 8, 2022
ac706ef
Merge branch 'master' into support_rename_table
asddongmen May 9, 2022
2d37762
modify an integration test
zhaoxinyu May 9, 2022
8cfbc02
modify an integration test
zhaoxinyu May 9, 2022
a3a01d8
modify an integration test sql
zhaoxinyu May 9, 2022
b647558
fix a field init problem
zhaoxinyu May 10, 2022
b35140f
Merge branch 'master' into support_rename_table
zhaoxinyu May 10, 2022
1788599
Merge branch 'master' into support_rename_table
ti-chi-bot May 10, 2022
dae115c
Merge branch 'master' into support_rename_table
ti-chi-bot May 10, 2022
d23ee5b
Merge branch 'master' into support_rename_table
ti-chi-bot May 10, 2022
a5d87b9
Merge branch 'master' into support_rename_table
ti-chi-bot May 10, 2022
a90db0a
fix a flaky kafka integration test
zhaoxinyu May 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type SchemaStorage interface {
GetLastSnapshot() *schema.Snapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
HandleDDLJob(job *timodel.Job) error
// AdvanceResolvedTs advances the resolved
// AdvanceResolvedTs advances the resolved ts
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
Expand Down
12 changes: 12 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
return jobs[0]
}

// DDL2Jobs runs the given DDL statement and returns the jobCnt most recent
// history DDL jobs recorded for it.
// It exists mainly for "DROP TABLE" and "DROP VIEW" statements, since
// executing either of them generates multiple jobs.
// The test fails immediately if the history lookup errors or if the number
// of jobs returned differs from jobCnt.
func (s *SchemaTestHelper) DDL2Jobs(ddl string, jobCnt int) []*timodel.Job {
	s.tk.MustExec(ddl)

	meta := s.GetCurrentMeta()
	historyJobs, err := meta.GetLastNHistoryDDLJobs(jobCnt)
	require.Nil(s.t, err)
	require.Len(s.t, historyJobs, jobCnt)

	return historyJobs
}

// Storage return the tikv storage
func (s *SchemaTestHelper) Storage() kv.Storage {
return s.storage
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

// GetStartTs returns StartTs if it's specified or using the CreateTime of changefeed.
// GetStartTs returns StartTs if it's specified or using the
// CreateTime of changefeed.
func (info *ChangeFeedInfo) GetStartTs() uint64 {
if info.StartTs > 0 {
return info.StartTs
Expand Down
85 changes: 65 additions & 20 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,38 +478,83 @@ type RedoDDLEvent struct {

// FromJob fills the values of DDLEvent from DDL job
func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo) {
d.TableInfo = new(SimpleTableInfo)
d.TableInfo.Schema = job.SchemaName
// populating DDLEvent of a rename tables job is handled in `FromRenameTablesJob()`
if d.Type == model.ActionRenameTables {
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
return
}

// The query for "DROP TABLE" and "DROP VIEW" statements need
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
// to be rebuilt. The reason is elaborated as follows:
// for a DDL statement like "DROP TABLE test1.table1, test2.table2",
// two DDL jobs will be generated. These two jobs can be differentiated
// from job.BinlogInfo.TableInfo whereas the job.Query are identical.
rebuildQuery := func() {
switch d.Type {
case model.ActionDropTable:
d.Query = fmt.Sprintf("DROP TABLE `%s`.`%s`", d.TableInfo.Schema, d.TableInfo.Table)
case model.ActionDropView:
d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.Schema, d.TableInfo.Table)
default:
d.Query = job.Query
}
}

d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
d.Query = job.Query
d.Type = job.Type
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)

switch d.Type {
case model.ActionRenameTables:
// DDLs update multiple target tables, in which case `TableInfo` isn't meaningful.
// So we can skip to fill TableInfo for the event.
// fill TableInfo for the event.
d.fillTableInfo(job.BinlogInfo.TableInfo, job.SchemaName)
// rebuild the query if necessary
rebuildQuery()
}

// FromRenameTablesJob fills the values of DDLEvent from a rename tables DDL job
func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
oldSchemaName, newSchemaName string,
preTableInfo *TableInfo, tableInfo *model.TableInfo,
) {
if job.Type != model.ActionRenameTables {
return
default:
}

// Fill TableInfo for the event.
if job.BinlogInfo.TableInfo != nil {
tableName := job.BinlogInfo.TableInfo.Name.O
tableInfo := job.BinlogInfo.TableInfo
d.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))
d.StartTs = job.StartTS
d.CommitTs = job.BinlogInfo.FinishedTS
oldTableName := preTableInfo.Name.O
newTableName := tableInfo.Name.O
d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`",
oldSchemaName, oldTableName, newSchemaName, newTableName)
d.Type = model.ActionRenameTable
// fill PreTableInfo for the event.
d.fillPreTableInfo(preTableInfo)
// fill TableInfo for the event.
d.fillTableInfo(tableInfo, newSchemaName)
}

for i, colInfo := range tableInfo.Columns {
d.TableInfo.ColumnInfo[i] = new(ColumnInfo)
d.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
// fillTableInfo populates the TableInfo of a DDLEvent.
//
// d.TableInfo is always replaced with a fresh SimpleTableInfo whose Schema
// is schemaName, so it is never nil after this call. When tableInfo is nil
// only the schema is set; otherwise the column list, table name and table ID
// are copied from tableInfo.
func (d *DDLEvent) fillTableInfo(tableInfo *model.TableInfo,
	schemaName string,
) {
	// The `TableInfo` field of a `DDLEvent` must never be nil.
	d.TableInfo = new(SimpleTableInfo)
	d.TableInfo.Schema = schemaName

	if tableInfo == nil {
		return
	}

	d.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))
	for i, colInfo := range tableInfo.Columns {
		d.TableInfo.ColumnInfo[i] = new(ColumnInfo)
		d.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
	}

	d.TableInfo.Table = tableInfo.Name.O
	d.TableInfo.TableID = tableInfo.ID
}

// fillPreTableInfo populates the PreTableInfo of an event
func (d *DDLEvent) fillPreTableInfo(preTableInfo *TableInfo) {
if preTableInfo == nil {
return
Expand Down
138 changes: 138 additions & 0 deletions cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,141 @@ func TestDDLEventFromJob(t *testing.T) {
event.FromJob(job, nil)
require.Nil(t, event.PreTableInfo)
}

// TestDDLEventFromRenameTablesJob checks that one rename tables job can be
// turned into one single-table rename event per renamed table, each with its
// own PreTableInfo, TableInfo and rebuilt query.
func TestDDLEventFromRenameTablesJob(t *testing.T) {
	// newTableInfo builds a table definition with a single primary-key
	// column named "id"; every fixture in this test has that shape.
	newTableInfo := func(id int64, name string) *timodel.TableInfo {
		return &timodel.TableInfo{
			ID:   id,
			Name: timodel.CIStr{O: name},
			Columns: []*timodel.ColumnInfo{
				{
					ID:   1,
					Name: timodel.CIStr{O: "id"},
					FieldType: types.FieldType{
						Flag: mysql.PriKeyFlag | mysql.UniqueFlag,
					},
					State: timodel.StatePublic,
				},
			},
		}
	}

	job := &timodel.Job{
		ID:         71,
		TableID:    69,
		SchemaName: "test1",
		Type:       timodel.ActionRenameTables,
		StartTS:    432853521879007233,
		Query:      "rename table test1.t1 to test1.t10, test1.t2 to test1.t20",
		BinlogInfo: &timodel.HistoryInfo{
			FinishedTS: 432853521879007238,
			MultipleTableInfos: []*timodel.TableInfo{
				newTableInfo(67, "t10"),
				newTableInfo(69, "t20"),
			},
		},
	}

	cases := []struct {
		tableID int64
		oldName string
		newName string
		query   string
	}{
		{
			tableID: 67,
			oldName: "t1",
			newName: "t10",
			query:   "RENAME TABLE `test1`.`t1` TO `test1`.`t10`",
		},
		{
			tableID: 69,
			oldName: "t2",
			newName: "t20",
			query:   "RENAME TABLE `test1`.`t2` TO `test1`.`t20`",
		},
	}

	for _, tc := range cases {
		preTableInfo := &TableInfo{
			TableName: TableName{
				Schema:  "test1",
				Table:   tc.oldName,
				TableID: tc.tableID,
			},
			TableInfo: newTableInfo(tc.tableID, tc.oldName),
		}
		tableInfo := newTableInfo(tc.tableID, tc.newName)

		event := &DDLEvent{}
		event.FromRenameTablesJob(job, "test1", "test1", preTableInfo, tableInfo)

		// require.Equal takes (expected, actual); keeping that order makes
		// failure output label the two values correctly.
		require.Equal(t, tc.tableID, event.PreTableInfo.TableID)
		require.Equal(t, tc.oldName, event.PreTableInfo.Table)
		require.Len(t, event.PreTableInfo.ColumnInfo, 1)
		require.Equal(t, tc.tableID, event.TableInfo.TableID)
		require.Equal(t, tc.newName, event.TableInfo.Table)
		require.Len(t, event.TableInfo.ColumnInfo, 1)
		require.Equal(t, tc.query, event.Query)

		// The multi-table job is reported as a single-table rename that
		// keeps the timestamps of the original job.
		require.Equal(t, timodel.ActionRenameTable, event.Type)
		require.Equal(t, job.StartTS, event.StartTs)
		require.Equal(t, job.BinlogInfo.FinishedTS, event.CommitTs)
	}
}
Loading