From 3693dc12dbbac0441e3d542b6d349f6afc5998be Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 8 Dec 2022 14:10:04 +0800 Subject: [PATCH] sink(ticdc): Add DDL query into `schema.json` file (#7834) (#7848) ref pingcap/tiflow#6797, close pingcap/tiflow#7847 --- cdc/model/schema_storage.go | 7 +- .../cloudstorage/cloud_storage_ddl_sink.go | 10 +- .../cloud_storage_ddl_sink_test.go | 7 +- .../eventsink/cloudstorage/dml_worker.go | 239 ++++++++++++------ .../eventsink/cloudstorage/dml_worker_test.go | 61 ++++- cmd/storage-consumer/main.go | 2 +- pkg/sink/cloudstorage/table_definition.go | 52 +++- .../cloudstorage/table_definition_test.go | 65 ++++- 8 files changed, 333 insertions(+), 110 deletions(-) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 96480796b5f..746bd59648a 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -66,9 +66,10 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode TableInfo: info, SchemaID: schemaID, TableName: TableName{ - Schema: schemaName, - Table: info.Name.O, - TableID: info.ID, + Schema: schemaName, + Table: info.Name.O, + TableID: info.ID, + IsPartition: info.GetPartitionInfo() != nil, }, Version: version, columnsOffset: make(map[int64]int, len(info.Columns)), diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go index d5c340249d3..bdb6f61a37d 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go @@ -68,24 +68,24 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er return d, nil } -func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDetail) string { - return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version) +func generateSchemaPath(def cloudstorage.TableDefinition) string { + return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) } func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - var def cloudstorage.TableDetail + var def cloudstorage.TableDefinition if ddl.TableInfo.TableInfo == nil { return nil } - def.FromTableInfo(ddl.TableInfo) + def.FromDDLEvent(ddl) encodedDef, err := json.MarshalIndent(def, "", " ") if err != nil { return errors.Trace(err) } - path := d.generateSchemaPath(def) + path := generateSchemaPath(def) err = d.statistics.RecordDDLExecution(func() error { err1 := d.storage.WriteFile(ctx, path, encodedDef) if err1 != nil { diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go index 5f46fd57b9e..7e83aa5f700 100644 --- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go +++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go @@ -39,6 +39,8 @@ func TestWriteDDLEvent(t *testing.T) { require.Nil(t, err) ddlEvent := &model.DDLEvent{ + Type: timodel.ActionAddColumn, + Query: "alter table test.table1 add col2 varchar(64)", TableInfo: &model.TableInfo{ Version: 100, TableName: model.TableName{ @@ -70,7 +72,10 @@ func TestWriteDDLEvent(t *testing.T) { require.JSONEq(t, `{ "Table": "table1", "Schema": "test", - "Version": 100, + "Version": 1, + "TableVersion": 100, + "Query": "alter table test.table1 add col2 varchar(64)", + "Type": 5, "TableColumns": [ { "ColumnName": "col1", diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go 
b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index de71190bfe3..f3bd341d4c0 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/metrics" mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage" + "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" @@ -45,8 +46,8 @@ type dmlWorker struct { flushNotifyCh chan flushTask // tableEvents maintains a mapping of . tableEvents *tableEventsMap - // fileIndex maintains a mapping of . - fileIndex map[versionedTable]uint64 + // fileIndex maintains a mapping of . + fileIndex map[versionedTable]*indexWithDate // fileSize maintains a mapping of . fileSize map[versionedTable]uint64 wg sync.WaitGroup @@ -54,6 +55,8 @@ type dmlWorker struct { errCh chan<- error extension string statistics *metrics.Statistics + clock clock.Clock + bufferPool sync.Pool metricWriteBytes prometheus.Gauge metricFileCount prometheus.Gauge } @@ -69,9 +72,19 @@ func newTableEventsMap() *tableEventsMap { } } +type wrappedTable struct { + tableName model.TableName + tableInfo *model.TableInfo +} + // flushTask defines a task containing the tables to be flushed. type flushTask struct { - targetTables []*model.TableInfo + targetTables []wrappedTable +} + +type indexWithDate struct { + index uint64 + currDate, prevDate string } func newDMLWorker( @@ -84,17 +97,23 @@ func newDMLWorker( errCh chan<- error, ) *dmlWorker { d := &dmlWorker{ - id: id, - changeFeedID: changefeedID, - storage: storage, - config: config, - tableEvents: newTableEventsMap(), - flushNotifyCh: make(chan flushTask, 1), - fileIndex: make(map[versionedTable]uint64), - fileSize: make(map[versionedTable]uint64), - extension: extension, - errCh: errCh, - statistics: statistics, + id: id, + changeFeedID: changefeedID, + storage: storage, + config: config, + tableEvents: newTableEventsMap(), + flushNotifyCh: make(chan flushTask, 1), + fileIndex: make(map[versionedTable]*indexWithDate), + fileSize: make(map[versionedTable]uint64), + extension: extension, + errCh: errCh, + statistics: statistics, + clock: clock.New(), + bufferPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID), metricFileCount: mcloudstorage.CloudStorageFileCountGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID), } @@ -115,7 +134,6 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) { go func() { defer d.wg.Done() - var buf bytes.Buffer for { select { case <-ctx.Done(): @@ -124,12 +142,10 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) { if atomic.LoadUint64(&d.isClosed) == 1 { return } - for _, tableInfo := range task.targetTables { - buf.Reset() - var callbacks []func() + for _, tbl := range task.targetTables { table := versionedTable{ - TableName: tableInfo.TableName, - version: tableInfo.Version, + TableName: tbl.tableName, + version: tbl.tableInfo.Version, } d.tableEvents.mu.Lock() events := make([]eventFragment, len(d.tableEvents.fragments[table])) @@ -140,54 +156,20 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) { continue } - rowsCnt := 0 - for _, frag := range events { - msgs := frag.encodedMsgs - d.statistics.ObserveRows(frag.event.Event.Rows...) 
- for _, msg := range msgs { - d.metricWriteBytes.Add(float64(len(msg.Value))) - rowsCnt += msg.GetRowsCount() - buf.Write(msg.Value) - callbacks = append(callbacks, msg.Callback) - } - } - - // mandatorily generate scheme.json file before generating the first data file - if d.fileIndex[table] == 0 { - var tableDetail cloudstorage.TableDetail - tableDetail.FromTableInfo(tableInfo) - path := d.generateSchemaFilePath(tableDetail) - encodedDetail, err := json.MarshalIndent(tableDetail, "", " ") - if err != nil { - d.errCh <- err - return - } - - err = d.storage.WriteFile(ctx, path, encodedDetail) - if err != nil { - d.errCh <- err - return - } + // generate scheme.json file before generating the first data file if necessary + err := d.writeSchemaFile(ctx, table, tbl.tableInfo) + if err != nil { + d.errCh <- err + return } path := d.generateDataFilePath(table) - if err := d.statistics.RecordBatchExecution(func() (int, error) { - err := d.storage.WriteFile(ctx, path, buf.Bytes()) - if err != nil { - return 0, err - } - return rowsCnt, nil - }); err != nil { + err = d.writeDataFile(ctx, path, events) + if err != nil { d.errCh <- err return } - d.metricFileCount.Add(1) - for _, cb := range callbacks { - if cb != nil { - cb() - } - } log.Debug("write file to storage success", zap.Int("workerID", d.id), zap.String("namespace", d.changeFeedID.Namespace), zap.String("changefeed", d.changeFeedID.ID), @@ -201,11 +183,85 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) { }() } +// In order to avoid spending so much time lookuping directory and getting last write point +// (i.e. which dir and which file) when the changefeed is restarted, we'd rather switch to +// a new dir and start writing. In this case, schema file should be created in the new dir +// if it hasn't been created when a DDL event was executed. +func (d *dmlWorker) writeSchemaFile( + ctx context.Context, + table versionedTable, + tableInfo *model.TableInfo, +) error { + if _, ok := d.fileIndex[table]; !ok { + var tableDetail cloudstorage.TableDefinition + tableDetail.FromTableInfo(tableInfo) + path := generateSchemaFilePath(tableDetail) + // the file may have been created when a DDL event was executed. + exist, err := d.storage.FileExists(ctx, path) + if err != nil { + return err + } + if exist { + return nil + } + + encodedDetail, err := json.MarshalIndent(tableDetail, "", " ") + if err != nil { + return err + } + + err = d.storage.WriteFile(ctx, path, encodedDetail) + if err != nil { + return err + } + } + + return nil +} + +func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eventFragment) error { + var callbacks []func() + + rowsCnt := 0 + buf := d.bufferPool.Get().(*bytes.Buffer) + defer d.bufferPool.Put(buf) + buf.Reset() + + for _, frag := range events { + msgs := frag.encodedMsgs + d.statistics.ObserveRows(frag.event.Event.Rows...) + for _, msg := range msgs { + d.metricWriteBytes.Add(float64(len(msg.Value))) + rowsCnt += msg.GetRowsCount() + buf.Write(msg.Value) + callbacks = append(callbacks, msg.Callback) + } + } + if err := d.statistics.RecordBatchExecution(func() (int, error) { + err := d.storage.WriteFile(ctx, path, buf.Bytes()) + if err != nil { + return 0, err + } + return rowsCnt, nil + }); err != nil { + return err + } + d.metricFileCount.Add(1) + + for _, cb := range callbacks { + if cb != nil { + cb() + } + } + + return nil +} + // backgroundDispatchTasks dispatches flush tasks in two conditions: // 1. the flush interval exceeds the upper limit. // 2. 
the file size exceeds the upper limit. func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) { - tableSet := make(map[*model.TableInfo]struct{}) + tableSet := make(map[wrappedTable]struct{}) ticker := time.NewTicker(d.config.FlushInterval) d.wg.Add(1) @@ -222,7 +278,7 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann if atomic.LoadUint64(&d.isClosed) == 1 { return } - var readyTables []*model.TableInfo + var readyTables []wrappedTable for tbl := range tableSet { readyTables = append(readyTables, tbl) } @@ -238,14 +294,19 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann case d.flushNotifyCh <- task: log.Debug("flush task is emitted successfully when flush interval exceeds", zap.Any("tables", task.targetTables)) - for tableInfo := range tableSet { + for elem := range tableSet { + // we should get TableName using elem.tableName instead of + // elem.tableInfo.TableName because the former one contains + // the physical table id (useful for partition table) + // recorded in mounter while the later one does not. + // TODO: handle TableID of model.TableInfo.TableName properly. tbl := versionedTable{ - TableName: tableInfo.TableName, - version: tableInfo.Version, + TableName: elem.tableName, + version: elem.tableInfo.Version, } d.fileSize[tbl] = 0 } - tableSet = make(map[*model.TableInfo]struct{}) + tableSet = make(map[wrappedTable]struct{}) default: } case frag, ok := <-ch.Out(): @@ -257,7 +318,12 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann d.tableEvents.fragments[table] = append(d.tableEvents.fragments[table], frag) d.tableEvents.mu.Unlock() - tableSet[frag.event.Event.TableInfo] = struct{}{} + key := wrappedTable{ + tableName: frag.TableName, + tableInfo: frag.event.Event.TableInfo, + } + + tableSet[key] = struct{}{} for _, msg := range frag.encodedMsgs { if msg.Value != nil { d.fileSize[table] += uint64(len(msg.Value)) @@ -267,7 +333,7 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann // as soon as possible. 
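// Note: d.fileSize accumulates the encoded message bytes per versioned table;
// once it exceeds config.FileSize, a flush task containing only this table is
// emitted immediately instead of waiting for the next FlushInterval tick.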
if d.fileSize[table] > uint64(d.config.FileSize) { task := flushTask{ - targetTables: []*model.TableInfo{frag.event.Event.TableInfo}, + targetTables: []wrappedTable{key}, } select { case <-ctx.Done(): @@ -284,14 +350,14 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann }() } -func (d *dmlWorker) generateSchemaFilePath(def cloudstorage.TableDetail) string { - return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version) +func generateSchemaFilePath(def cloudstorage.TableDefinition) string { + return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion) } func (d *dmlWorker) generateDataFilePath(tbl versionedTable) string { var elems []string + var dateStr string - d.fileIndex[tbl]++ elems = append(elems, tbl.Schema) elems = append(elems, tbl.Table) elems = append(elems, fmt.Sprintf("%d", tbl.version)) @@ -299,16 +365,35 @@ func (d *dmlWorker) generateDataFilePath(tbl versionedTable) string { if d.config.EnablePartitionSeparator && tbl.TableName.IsPartition { elems = append(elems, fmt.Sprintf("%d", tbl.TableID)) } - currTime := time.Now() + currTime := d.clock.Now() switch d.config.DateSeparator { case config.DateSeparatorYear.String(): - elems = append(elems, currTime.Format("2006")) + dateStr = currTime.Format("2006") + elems = append(elems, dateStr) case config.DateSeparatorMonth.String(): - elems = append(elems, currTime.Format("2006-01")) + dateStr = currTime.Format("2006-01") + elems = append(elems, dateStr) case config.DateSeparatorDay.String(): - elems = append(elems, currTime.Format("2006-01-02")) + dateStr = currTime.Format("2006-01-02") + elems = append(elems, dateStr) + default: + } + + if idx, ok := d.fileIndex[tbl]; !ok { + d.fileIndex[tbl] = &indexWithDate{ + currDate: dateStr, + } + } else { + idx.currDate = dateStr + } + + // if date changed, reset the counter + if d.fileIndex[tbl].prevDate != d.fileIndex[tbl].currDate { + d.fileIndex[tbl].prevDate = d.fileIndex[tbl].currDate + d.fileIndex[tbl].index = 0 } - elems = append(elems, fmt.Sprintf("CDC%06d%s", d.fileIndex[tbl], d.extension)) + d.fileIndex[tbl].index++ + elems = append(elems, fmt.Sprintf("CDC%06d%s", d.fileIndex[tbl].index, d.extension)) return strings.Join(elems, "/") } diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go index 7db7f43281f..25421d9ca21 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/codec/common" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/metrics" + "github.com/pingcap/tiflow/engine/pkg/clock" "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink" @@ -55,10 +56,11 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { return d } -func TestGenerateCloudStoragePath(t *testing.T) { +func TestGenerateDataFilePath(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := testDMLWorker(ctx, t, t.TempDir()) + dir := t.TempDir() + w := testDMLWorker(ctx, t, dir) table := versionedTable{ TableName: model.TableName{ Schema: "test", @@ -66,10 +68,64 @@ func TestGenerateCloudStoragePath(t *testing.T) { }, version: 5, } + + // date-separator: none path := w.generateDataFilePath(table) require.Equal(t, "test/table1/5/CDC000001.json", path) path = 
w.generateDataFilePath(table) require.Equal(t, "test/table1/5/CDC000002.json", path) + + // date-separator: year + mockClock := clock.NewMock() + w = testDMLWorker(ctx, t, dir) + w.config.DateSeparator = config.DateSeparatorYear.String() + w.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022/CDC000002.json", path) + // year changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023/CDC000002.json", path) + + // date-separator: month + mockClock = clock.NewMock() + w = testDMLWorker(ctx, t, dir) + w.config.DateSeparator = config.DateSeparatorMonth.String() + w.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022-12/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022-12/CDC000002.json", path) + // month changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023-01/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023-01/CDC000002.json", path) + + // date-separator: day + mockClock = clock.NewMock() + w = testDMLWorker(ctx, t, dir) + w.config.DateSeparator = config.DateSeparatorDay.String() + w.clock = mockClock + mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022-12-31/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2022-12-31/CDC000002.json", path) + // day changed + mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC)) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023-01-01/CDC000001.json", path) + path = w.generateDataFilePath(table) + require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path) + w.close() } @@ -80,7 +136,6 @@ func TestDMLWorkerRun(t *testing.T) { d := testDMLWorker(ctx, t, parentDir) fragCh := chann.New[eventFragment]() table1Dir := path.Join(parentDir, "test/table1/99") - os.MkdirAll(table1Dir, 0o755) d.run(ctx, fragCh) // assume table1 and table2 are dispatched to the same DML worker table1 := model.TableName{ diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go index ce480e5ee0c..d5aa4a665ce 100644 --- a/cmd/storage-consumer/main.go +++ b/cmd/storage-consumer/main.go @@ -387,7 +387,7 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRan func (c *consumer) emitDMLEvents(ctx context.Context, tableID int64, pathKey dmlPathKey, content []byte) error { var ( events []*model.RowChangedEvent - tableDetail cloudstorage.TableDetail + tableDetail cloudstorage.TableDefinition decoder codec.EventBatchDecoder err error ) diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index 60c78012cfa..b8ff5981952 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -24,6 +24,8 @@ import ( "github.com/pingcap/tiflow/cdc/model" ) +const defaultTableDefinitionVersion = 1 + // TableCol 
denotes the column info for a table definition. type TableCol struct { Name string `json:"ColumnName" ` @@ -151,20 +153,46 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) { return col, nil } -// TableDetail is the detailed table definition used for cloud storage sink. -type TableDetail struct { - Table string `json:"Table"` - Schema string `json:"Schema"` - Version uint64 `json:"Version"` - Columns []TableCol `json:"TableColumns"` - TotalColumns int `json:"TableColumnsTotal"` +// TableDefinition is the detailed table definition used for cloud storage sink. +type TableDefinition struct { + Table string `json:"Table"` + Schema string `json:"Schema"` + Version uint64 `json:"Version"` + TableVersion uint64 `json:"TableVersion"` + Query string `json:"Query"` + Type timodel.ActionType `json:"Type"` + Columns []TableCol `json:"TableColumns"` + TotalColumns int `json:"TableColumnsTotal"` +} + +// FromDDLEvent converts from DDLEvent to TableDefinition. +func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent) { + t.FromTableInfo(event.TableInfo) + t.Query = event.Query + t.Type = event.Type +} + +// ToDDLEvent converts from TableDefinition to DDLEvent. +func (t *TableDefinition) ToDDLEvent() (*model.DDLEvent, error) { + tableInfo, err := t.ToTableInfo() + if err != nil { + return nil, err + } + + return &model.DDLEvent{ + TableInfo: tableInfo, + CommitTs: t.TableVersion, + Type: t.Type, + Query: t.Query, + }, nil } -// FromTableInfo converts from TableInfo to TableDetail. -func (t *TableDetail) FromTableInfo(info *model.TableInfo) { +// FromTableInfo converts from TableInfo to TableDefinition. +func (t *TableDefinition) FromTableInfo(info *model.TableInfo) { t.Table = info.TableName.Table t.Schema = info.TableName.Schema - t.Version = info.Version + t.Version = defaultTableDefinitionVersion + t.TableVersion = info.Version t.TotalColumns = len(info.Columns) for _, col := range info.Columns { var tableCol TableCol @@ -173,8 +201,8 @@ func (t *TableDetail) FromTableInfo(info *model.TableInfo) { } } -// ToTableInfo converts from TableDetail to TableInfo. -func (t *TableDetail) ToTableInfo() (*model.TableInfo, error) { +// ToTableInfo converts from TableDefinition to DDLEvent. 
+func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error) { info := &model.TableInfo{ TableName: model.TableName{ Schema: t.Schema, diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index 4486979421a..a4cc005ec44 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -14,7 +14,6 @@ package cloudstorage import ( "encoding/json" - "fmt" "math" "testing" @@ -319,13 +318,13 @@ func TestTableCol(t *testing.T) { require.JSONEq(t, tc.expected, string(encodedCol), tc.name) _, err = tableCol.ToTiColumnInfo() - require.Nil(t, err) + require.NoError(t, err) } } -func TestTableDetail(t *testing.T) { +func TestTableDefinition(t *testing.T) { var columns []*timodel.ColumnInfo - var def TableDetail + var def TableDefinition tableInfo := &model.TableInfo{ Version: 100, @@ -358,12 +357,57 @@ func TestTableDetail(t *testing.T) { tableInfo.TableInfo = &timodel.TableInfo{Columns: columns} def.FromTableInfo(tableInfo) encodedDef, err := json.MarshalIndent(def, "", " ") - require.Nil(t, err) - fmt.Println(string(encodedDef)) + require.NoError(t, err) require.JSONEq(t, `{ "Table": "table1", "Schema": "test", - "Version": 100, + "Version": 1, + "TableVersion": 100, + "Query": "", + "Type": 0, + "TableColumns": [ + { + "ColumnName": "Id", + "ColumnType": "INT", + "ColumnPrecision": "11", + "ColumnNullable": "false", + "ColumnIsPk": "true" + }, + { + "ColumnName": "LastName", + "ColumnType": "VARCHAR", + "ColumnPrecision": "128", + "ColumnNullable": "false" + }, + { + "ColumnName": "FirstName", + "ColumnType": "VARCHAR", + "ColumnPrecision": "64" + }, + { + "ColumnName": "Birthday", + "ColumnType": "DATETIME" + } + ], + "TableColumnsTotal": 4 + }`, string(encodedDef)) + + def = TableDefinition{} + event := &model.DDLEvent{ + Type: timodel.ActionAddColumn, + Query: "alter table test.table1 add Birthday date", + TableInfo: tableInfo, + } + def.FromDDLEvent(event) + encodedDef, err = json.MarshalIndent(def, "", " ") + require.NoError(t, err) + require.JSONEq(t, `{ + "Table": "table1", + "Schema": "test", + "Version": 1, + "TableVersion": 100, + "Query": "alter table test.table1 add Birthday date", + "Type": 5, "TableColumns": [ { "ColumnName": "Id", @@ -392,6 +436,11 @@ func TestTableDetail(t *testing.T) { }`, string(encodedDef)) tableInfo, err = def.ToTableInfo() - require.Nil(t, err) + require.NoError(t, err) require.Len(t, tableInfo.Columns, 4) + + event, err = def.ToDDLEvent() + require.NoError(t, err) + require.Equal(t, timodel.ActionAddColumn, event.Type) + require.Equal(t, uint64(100), event.CommitTs) }
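For reference, below is a minimal standalone sketch of how a downstream reader could locate and decode the schema.json file this patch writes under <schema>/<table>/<table-version>/schema.json. The tableDefinition and tableCol structs are hypothetical local mirrors of the JSON layout shown in the tests above (Type is kept as a plain int rather than timodel.ActionType), and the sample payload reuses values from the DDL sink test; it is illustrative only and not part of the patch.

package main

import (
	"encoding/json"
	"fmt"
)

// tableCol mirrors one entry of "TableColumns" in schema.json.
type tableCol struct {
	Name      string `json:"ColumnName"`
	Tp        string `json:"ColumnType"`
	Precision string `json:"ColumnPrecision"`
	Nullable  string `json:"ColumnNullable"`
	IsPK      string `json:"ColumnIsPk"`
}

// tableDefinition mirrors the top-level layout of schema.json after this patch.
type tableDefinition struct {
	Table        string     `json:"Table"`
	Schema       string     `json:"Schema"`
	Version      uint64     `json:"Version"`      // format version of schema.json itself, 1 for now
	TableVersion uint64     `json:"TableVersion"` // schema version of the table, also used in the path
	Query        string     `json:"Query"`        // DDL query that produced this schema
	Type         int        `json:"Type"`         // timodel.ActionType as an integer, e.g. 5 = ActionAddColumn
	Columns      []tableCol `json:"TableColumns"`
	TotalColumns int        `json:"TableColumnsTotal"`
}

// schemaPath rebuilds the <schema>/<table>/<table-version>/schema.json layout
// used by generateSchemaPath and generateSchemaFilePath in this patch.
func schemaPath(def tableDefinition) string {
	return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}

func main() {
	raw := []byte(`{
		"Table": "table1",
		"Schema": "test",
		"Version": 1,
		"TableVersion": 100,
		"Query": "alter table test.table1 add col2 varchar(64)",
		"Type": 5,
		"TableColumns": [{"ColumnName": "col1", "ColumnType": "INT"}],
		"TableColumnsTotal": 1
	}`)

	var def tableDefinition
	if err := json.Unmarshal(raw, &def); err != nil {
		panic(err)
	}
	fmt.Println(schemaPath(def))                        // test/table1/100/schema.json
	fmt.Printf("%s (action %d)\n", def.Query, def.Type) // alter table test.table1 add col2 varchar(64) (action 5)
}

Keeping Version (the schema.json format version, fixed at 1 by defaultTableDefinitionVersion) separate from TableVersion (the table's schema version used in the path) presumably lets the file format evolve independently of table schema changes, while the added Query and Type fields are what allow ToDDLEvent and the storage consumer to reconstruct the original DDL.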