Skip to content

Commit

Permalink
sink(ticdc): Add DDL query into schema.json file (#7834) (#7848)
Browse files Browse the repository at this point in the history
ref #6797, close #7847
  • Loading branch information
ti-chi-bot committed Dec 8, 2022
1 parent 075dab2 commit 3693dc1
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 110 deletions.
7 changes: 4 additions & 3 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
TableInfo: info,
SchemaID: schemaID,
TableName: TableName{
Schema: schemaName,
Table: info.Name.O,
TableID: info.ID,
Schema: schemaName,
Table: info.Name.O,
TableID: info.ID,
IsPartition: info.GetPartitionInfo() != nil,
},
Version: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
Expand Down
10 changes: 5 additions & 5 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er
return d, nil
}

func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDetail) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version)
func generateSchemaPath(def cloudstorage.TableDefinition) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}

func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
var def cloudstorage.TableDetail
var def cloudstorage.TableDefinition

if ddl.TableInfo.TableInfo == nil {
return nil
}

def.FromTableInfo(ddl.TableInfo)
def.FromDDLEvent(ddl)
encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
}

path := d.generateSchemaPath(def)
path := generateSchemaPath(def)
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func TestWriteDDLEvent(t *testing.T) {
require.Nil(t, err)

ddlEvent := &model.DDLEvent{
Type: timodel.ActionAddColumn,
Query: "alter table test.table1 add col2 varchar(64)",
TableInfo: &model.TableInfo{
Version: 100,
TableName: model.TableName{
Expand Down Expand Up @@ -70,7 +72,10 @@ func TestWriteDDLEvent(t *testing.T) {
require.JSONEq(t, `{
"Table": "table1",
"Schema": "test",
"Version": 100,
"Version": 1,
"TableVersion": 100,
"Query": "alter table test.table1 add col2 varchar(64)",
"Type": 5,
"TableColumns": [
{
"ColumnName": "col1",
Expand Down
Loading

0 comments on commit 3693dc1

Please sign in to comment.