diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go
index 96480796b5f..746bd59648a 100644
--- a/cdc/model/schema_storage.go
+++ b/cdc/model/schema_storage.go
@@ -66,9 +66,10 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *model.TableInfo) *TableInfo {
TableInfo: info,
SchemaID: schemaID,
TableName: TableName{
- Schema: schemaName,
- Table: info.Name.O,
- TableID: info.ID,
+ Schema: schemaName,
+ Table: info.Name.O,
+ TableID: info.ID,
+ IsPartition: info.GetPartitionInfo() != nil,
},
Version: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
index d5c340249d3..bdb6f61a37d 100644
--- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
+++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -68,24 +68,24 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
return d, nil
}
-func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDetail) string {
- return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version)
+func generateSchemaPath(def cloudstorage.TableDefinition) string {
+ return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}
func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
- var def cloudstorage.TableDetail
+ var def cloudstorage.TableDefinition
if ddl.TableInfo.TableInfo == nil {
return nil
}
- def.FromTableInfo(ddl.TableInfo)
+ def.FromDDLEvent(ddl)
encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
}
- path := d.generateSchemaPath(def)
+ path := generateSchemaPath(def)
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
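
For reference, the resulting schema-file layout can be sketched standalone. This mirrors generateSchemaPath above; tableDefinition here is a hypothetical stand-in for cloudstorage.TableDefinition, carrying only the fields the path needs:

```go
package main

import "fmt"

// tableDefinition is a hypothetical stand-in for cloudstorage.TableDefinition,
// reduced to the fields that the schema path is built from.
type tableDefinition struct {
	Schema       string
	Table        string
	TableVersion uint64
}

func generateSchemaPath(def tableDefinition) string {
	return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}

func main() {
	def := tableDefinition{Schema: "test", Table: "table1", TableVersion: 100}
	// Prints: test/table1/100/schema.json
	fmt.Println(generateSchemaPath(def))
}
```
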
diff --git a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
index 5f46fd57b9e..7e83aa5f700 100644
--- a/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
+++ b/cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink_test.go
@@ -39,6 +39,8 @@ func TestWriteDDLEvent(t *testing.T) {
require.Nil(t, err)
ddlEvent := &model.DDLEvent{
+ Type: timodel.ActionAddColumn,
+ Query: "alter table test.table1 add col2 varchar(64)",
TableInfo: &model.TableInfo{
Version: 100,
TableName: model.TableName{
@@ -70,7 +72,10 @@ func TestWriteDDLEvent(t *testing.T) {
require.JSONEq(t, `{
"Table": "table1",
"Schema": "test",
- "Version": 100,
+ "Version": 1,
+ "TableVersion": 100,
+ "Query": "alter table test.table1 add col2 varchar(64)",
+ "Type": 5,
"TableColumns": [
{
"ColumnName": "col1",
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
index de71190bfe3..f3bd341d4c0 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
+ "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
@@ -45,8 +46,8 @@ type dmlWorker struct {
flushNotifyCh chan flushTask
 // tableEvents maintains a mapping of <table, events slice>.
 tableEvents *tableEventsMap
- // fileIndex maintains a mapping of <table, file index>.
- fileIndex map[versionedTable]uint64
+ // fileIndex maintains a mapping of <table, index with date>.
+ fileIndex map[versionedTable]*indexWithDate
 // fileSize maintains a mapping of <table, file size>.
 fileSize map[versionedTable]uint64
wg sync.WaitGroup
@@ -54,6 +55,8 @@ type dmlWorker struct {
errCh chan<- error
extension string
statistics *metrics.Statistics
+ clock clock.Clock
+ bufferPool sync.Pool
metricWriteBytes prometheus.Gauge
metricFileCount prometheus.Gauge
}
@@ -69,9 +72,19 @@ func newTableEventsMap() *tableEventsMap {
}
}
+type wrappedTable struct {
+ tableName model.TableName
+ tableInfo *model.TableInfo
+}
+
// flushTask defines a task containing the tables to be flushed.
type flushTask struct {
- targetTables []*model.TableInfo
+ targetTables []wrappedTable
+}
+
+type indexWithDate struct {
+ index uint64
+ currDate, prevDate string
}
func newDMLWorker(
@@ -84,17 +97,23 @@ func newDMLWorker(
errCh chan<- error,
) *dmlWorker {
d := &dmlWorker{
- id: id,
- changeFeedID: changefeedID,
- storage: storage,
- config: config,
- tableEvents: newTableEventsMap(),
- flushNotifyCh: make(chan flushTask, 1),
- fileIndex: make(map[versionedTable]uint64),
- fileSize: make(map[versionedTable]uint64),
- extension: extension,
- errCh: errCh,
- statistics: statistics,
+ id: id,
+ changeFeedID: changefeedID,
+ storage: storage,
+ config: config,
+ tableEvents: newTableEventsMap(),
+ flushNotifyCh: make(chan flushTask, 1),
+ fileIndex: make(map[versionedTable]*indexWithDate),
+ fileSize: make(map[versionedTable]uint64),
+ extension: extension,
+ errCh: errCh,
+ statistics: statistics,
+ clock: clock.New(),
+ bufferPool: sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+ },
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}
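
The flush buffer is now drawn from a sync.Pool so that concurrent flushes reuse allocations instead of growing a fresh bytes.Buffer every time. A minimal standalone sketch of the Get/Reset/Put discipline that writeDataFile (below) follows; everything outside the standard library here is illustrative:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

func main() {
	pool := sync.Pool{
		New: func() interface{} { return new(bytes.Buffer) },
	}

	write := func(payload []byte) {
		buf := pool.Get().(*bytes.Buffer)
		defer pool.Put(buf)
		// Reset before use: Put does not clear the buffer, so a pooled
		// buffer may still hold bytes from the previous flush.
		buf.Reset()
		buf.Write(payload)
		fmt.Println(buf.Len())
	}

	write([]byte("row-1"))
	write([]byte("row-2"))
}
```

The Reset on the borrowed buffer is what keeps reuse safe: without it, stale bytes from an earlier flush would leak into the next data file.
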
@@ -115,7 +134,6 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
go func() {
defer d.wg.Done()
- var buf bytes.Buffer
for {
select {
case <-ctx.Done():
@@ -124,12 +142,10 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
if atomic.LoadUint64(&d.isClosed) == 1 {
return
}
- for _, tableInfo := range task.targetTables {
- buf.Reset()
- var callbacks []func()
+ for _, tbl := range task.targetTables {
table := versionedTable{
- TableName: tableInfo.TableName,
- version: tableInfo.Version,
+ TableName: tbl.tableName,
+ version: tbl.tableInfo.Version,
}
d.tableEvents.mu.Lock()
events := make([]eventFragment, len(d.tableEvents.fragments[table]))
@@ -140,54 +156,20 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
continue
}
- rowsCnt := 0
- for _, frag := range events {
- msgs := frag.encodedMsgs
- d.statistics.ObserveRows(frag.event.Event.Rows...)
- for _, msg := range msgs {
- d.metricWriteBytes.Add(float64(len(msg.Value)))
- rowsCnt += msg.GetRowsCount()
- buf.Write(msg.Value)
- callbacks = append(callbacks, msg.Callback)
- }
- }
-
- // mandatorily generate scheme.json file before generating the first data file
- if d.fileIndex[table] == 0 {
- var tableDetail cloudstorage.TableDetail
- tableDetail.FromTableInfo(tableInfo)
- path := d.generateSchemaFilePath(tableDetail)
- encodedDetail, err := json.MarshalIndent(tableDetail, "", " ")
- if err != nil {
- d.errCh <- err
- return
- }
-
- err = d.storage.WriteFile(ctx, path, encodedDetail)
- if err != nil {
- d.errCh <- err
- return
- }
+ // generate the schema.json file before generating the first data file if necessary
+ err := d.writeSchemaFile(ctx, table, tbl.tableInfo)
+ if err != nil {
+ d.errCh <- err
+ return
}
path := d.generateDataFilePath(table)
- if err := d.statistics.RecordBatchExecution(func() (int, error) {
- err := d.storage.WriteFile(ctx, path, buf.Bytes())
- if err != nil {
- return 0, err
- }
- return rowsCnt, nil
- }); err != nil {
+ err = d.writeDataFile(ctx, path, events)
+ if err != nil {
d.errCh <- err
return
}
- d.metricFileCount.Add(1)
- for _, cb := range callbacks {
- if cb != nil {
- cb()
- }
- }
log.Debug("write file to storage success", zap.Int("workerID", d.id),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID),
@@ -201,11 +183,85 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
}()
}
+// To avoid spending too much time looking up the directory and locating the last write
+// point (i.e. which dir and which file) when the changefeed is restarted, we'd rather
+// switch to a new dir and start writing. In this case, the schema file should be created
+// in the new dir if it wasn't already created when a DDL event was executed.
+func (d *dmlWorker) writeSchemaFile(
+ ctx context.Context,
+ table versionedTable,
+ tableInfo *model.TableInfo,
+) error {
+ if _, ok := d.fileIndex[table]; !ok {
+ var tableDetail cloudstorage.TableDefinition
+ tableDetail.FromTableInfo(tableInfo)
+ path := generateSchemaFilePath(tableDetail)
+ // the file may have been created when a DDL event was executed.
+ exist, err := d.storage.FileExists(ctx, path)
+ if err != nil {
+ return err
+ }
+ if exist {
+ return nil
+ }
+
+ encodedDetail, err := json.MarshalIndent(tableDetail, "", " ")
+ if err != nil {
+ return err
+ }
+
+ err = d.storage.WriteFile(ctx, path, encodedDetail)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (d *dmlWorker) writeDataFile(ctx context.Context, path string, events []eventFragment) error {
+ var callbacks []func()
+
+ rowsCnt := 0
+ buf := d.bufferPool.Get().(*bytes.Buffer)
+ defer d.bufferPool.Put(buf)
+ buf.Reset()
+
+ for _, frag := range events {
+ msgs := frag.encodedMsgs
+ d.statistics.ObserveRows(frag.event.Event.Rows...)
+ for _, msg := range msgs {
+ d.metricWriteBytes.Add(float64(len(msg.Value)))
+ rowsCnt += msg.GetRowsCount()
+ buf.Write(msg.Value)
+ callbacks = append(callbacks, msg.Callback)
+ }
+ }
+ if err := d.statistics.RecordBatchExecution(func() (int, error) {
+ err := d.storage.WriteFile(ctx, path, buf.Bytes())
+ if err != nil {
+ return 0, err
+ }
+ return rowsCnt, nil
+ }); err != nil {
+ return err
+ }
+ d.metricFileCount.Add(1)
+
+ for _, cb := range callbacks {
+ if cb != nil {
+ cb()
+ }
+ }
+
+ return nil
+}
+
// backgroundDispatchTasks dispatches flush tasks in two conditions:
// 1. the flush interval exceeds the upper limit.
// 2. the file size exceeds the upper limit.
func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
- tableSet := make(map[*model.TableInfo]struct{})
+ tableSet := make(map[wrappedTable]struct{})
ticker := time.NewTicker(d.config.FlushInterval)
d.wg.Add(1)
@@ -222,7 +278,7 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
if atomic.LoadUint64(&d.isClosed) == 1 {
return
}
- var readyTables []*model.TableInfo
+ var readyTables []wrappedTable
for tbl := range tableSet {
readyTables = append(readyTables, tbl)
}
@@ -238,14 +294,19 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
case d.flushNotifyCh <- task:
log.Debug("flush task is emitted successfully when flush interval exceeds",
zap.Any("tables", task.targetTables))
- for tableInfo := range tableSet {
+ for elem := range tableSet {
+ // We should get TableName from elem.tableName rather than
+ // elem.tableInfo.TableName, because the former carries the
+ // physical table ID (useful for partitioned tables) recorded
+ // by the mounter, while the latter does not.
+ // TODO: handle TableID of model.TableInfo.TableName properly.
tbl := versionedTable{
- TableName: tableInfo.TableName,
- version: tableInfo.Version,
+ TableName: elem.tableName,
+ version: elem.tableInfo.Version,
}
d.fileSize[tbl] = 0
}
- tableSet = make(map[*model.TableInfo]struct{})
+ tableSet = make(map[wrappedTable]struct{})
default:
}
case frag, ok := <-ch.Out():
@@ -257,7 +318,12 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
d.tableEvents.fragments[table] = append(d.tableEvents.fragments[table], frag)
d.tableEvents.mu.Unlock()
- tableSet[frag.event.Event.TableInfo] = struct{}{}
+ key := wrappedTable{
+ tableName: frag.TableName,
+ tableInfo: frag.event.Event.TableInfo,
+ }
+
+ tableSet[key] = struct{}{}
for _, msg := range frag.encodedMsgs {
if msg.Value != nil {
d.fileSize[table] += uint64(len(msg.Value))
@@ -267,7 +333,7 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
// as soon as possible.
if d.fileSize[table] > uint64(d.config.FileSize) {
task := flushTask{
- targetTables: []*model.TableInfo{frag.event.Event.TableInfo},
+ targetTables: []wrappedTable{key},
}
select {
case <-ctx.Done():
@@ -284,14 +350,14 @@ func (d *dmlWorker) backgroundDispatchTasks(ctx context.Context, ch *chann.Chann[eventFragment]) {
}()
}
-func (d *dmlWorker) generateSchemaFilePath(def cloudstorage.TableDetail) string {
- return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version)
+func generateSchemaFilePath(def cloudstorage.TableDefinition) string {
+ return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.TableVersion)
}
func (d *dmlWorker) generateDataFilePath(tbl versionedTable) string {
var elems []string
+ var dateStr string
- d.fileIndex[tbl]++
elems = append(elems, tbl.Schema)
elems = append(elems, tbl.Table)
elems = append(elems, fmt.Sprintf("%d", tbl.version))
@@ -299,16 +365,35 @@ func (d *dmlWorker) generateDataFilePath(tbl versionedTable) string {
if d.config.EnablePartitionSeparator && tbl.TableName.IsPartition {
elems = append(elems, fmt.Sprintf("%d", tbl.TableID))
}
- currTime := time.Now()
+ currTime := d.clock.Now()
switch d.config.DateSeparator {
case config.DateSeparatorYear.String():
- elems = append(elems, currTime.Format("2006"))
+ dateStr = currTime.Format("2006")
+ elems = append(elems, dateStr)
case config.DateSeparatorMonth.String():
- elems = append(elems, currTime.Format("2006-01"))
+ dateStr = currTime.Format("2006-01")
+ elems = append(elems, dateStr)
case config.DateSeparatorDay.String():
- elems = append(elems, currTime.Format("2006-01-02"))
+ dateStr = currTime.Format("2006-01-02")
+ elems = append(elems, dateStr)
+ default:
+ }
+
+ if idx, ok := d.fileIndex[tbl]; !ok {
+ d.fileIndex[tbl] = &indexWithDate{
+ currDate: dateStr,
+ }
+ } else {
+ idx.currDate = dateStr
+ }
+
+ // if date changed, reset the counter
+ if d.fileIndex[tbl].prevDate != d.fileIndex[tbl].currDate {
+ d.fileIndex[tbl].prevDate = d.fileIndex[tbl].currDate
+ d.fileIndex[tbl].index = 0
}
- elems = append(elems, fmt.Sprintf("CDC%06d%s", d.fileIndex[tbl], d.extension))
+ d.fileIndex[tbl].index++
+ elems = append(elems, fmt.Sprintf("CDC%06d%s", d.fileIndex[tbl].index, d.extension))
return strings.Join(elems, "/")
}
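
The net effect of the fileIndex bookkeeping above is that the per-table counter restarts at 1 whenever the date component of the path changes. A standalone sketch of that rule, with simplified types and no storage involved:

```go
package main

import "fmt"

// indexWithDate mirrors the bookkeeping type added above: a running file
// index plus the date strings of the current and previous path components.
type indexWithDate struct {
	index              uint64
	currDate, prevDate string
}

func nextFileName(idx *indexWithDate, dateStr string) string {
	idx.currDate = dateStr
	// If the date changed, reset the counter so names restart at CDC000001.
	if idx.prevDate != idx.currDate {
		idx.prevDate = idx.currDate
		idx.index = 0
	}
	idx.index++
	return fmt.Sprintf("CDC%06d.json", idx.index)
}

func main() {
	idx := &indexWithDate{}
	fmt.Println(nextFileName(idx, "2022-12-31")) // CDC000001.json
	fmt.Println(nextFileName(idx, "2022-12-31")) // CDC000002.json
	fmt.Println(nextFileName(idx, "2023-01-01")) // CDC000001.json (counter reset)
}
```

This matches the test expectations below, where crossing a year, month, or day boundary restarts file names at CDC000001.
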
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
index 7db7f43281f..25421d9ca21 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
+ "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
@@ -55,10 +56,11 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
return d
}
-func TestGenerateCloudStoragePath(t *testing.T) {
+func TestGenerateDataFilePath(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- w := testDMLWorker(ctx, t, t.TempDir())
+ dir := t.TempDir()
+ w := testDMLWorker(ctx, t, dir)
table := versionedTable{
TableName: model.TableName{
Schema: "test",
@@ -66,10 +68,64 @@ func TestGenerateCloudStoragePath(t *testing.T) {
},
version: 5,
}
+
+ // date-separator: none
path := w.generateDataFilePath(table)
require.Equal(t, "test/table1/5/CDC000001.json", path)
path = w.generateDataFilePath(table)
require.Equal(t, "test/table1/5/CDC000002.json", path)
+
+ // date-separator: year
+ mockClock := clock.NewMock()
+ w = testDMLWorker(ctx, t, dir)
+ w.config.DateSeparator = config.DateSeparatorYear.String()
+ w.clock = mockClock
+ mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022/CDC000002.json", path)
+ // year changed
+ mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023/CDC000002.json", path)
+
+ // date-separator: month
+ mockClock = clock.NewMock()
+ w = testDMLWorker(ctx, t, dir)
+ w.config.DateSeparator = config.DateSeparatorMonth.String()
+ w.clock = mockClock
+ mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022-12/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022-12/CDC000002.json", path)
+ // month changed
+ mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023-01/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023-01/CDC000002.json", path)
+
+ // date-separator: day
+ mockClock = clock.NewMock()
+ w = testDMLWorker(ctx, t, dir)
+ w.config.DateSeparator = config.DateSeparatorDay.String()
+ w.clock = mockClock
+ mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022-12-31/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2022-12-31/CDC000002.json", path)
+ // day changed
+ mockClock.Set(time.Date(2023, 1, 1, 0, 0, 20, 0, time.UTC))
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023-01-01/CDC000001.json", path)
+ path = w.generateDataFilePath(table)
+ require.Equal(t, "test/table1/5/2023-01-01/CDC000002.json", path)
+
w.close()
}
@@ -80,7 +136,6 @@ func TestDMLWorkerRun(t *testing.T) {
d := testDMLWorker(ctx, t, parentDir)
fragCh := chann.New[eventFragment]()
table1Dir := path.Join(parentDir, "test/table1/99")
- os.MkdirAll(table1Dir, 0o755)
d.run(ctx, fragCh)
// assume table1 and table2 are dispatched to the same DML worker
table1 := model.TableName{
diff --git a/cmd/storage-consumer/main.go b/cmd/storage-consumer/main.go
index ce480e5ee0c..d5aa4a665ce 100644
--- a/cmd/storage-consumer/main.go
+++ b/cmd/storage-consumer/main.go
@@ -387,7 +387,7 @@ func (c *consumer) getNewFiles(ctx context.Context) (map[dmlPathKey]fileIndexRange, error) {
func (c *consumer) emitDMLEvents(ctx context.Context, tableID int64, pathKey dmlPathKey, content []byte) error {
var (
events []*model.RowChangedEvent
- tableDetail cloudstorage.TableDetail
+ tableDetail cloudstorage.TableDefinition
decoder codec.EventBatchDecoder
err error
)
diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go
index 60c78012cfa..b8ff5981952 100644
--- a/pkg/sink/cloudstorage/table_definition.go
+++ b/pkg/sink/cloudstorage/table_definition.go
@@ -24,6 +24,8 @@ import (
"github.com/pingcap/tiflow/cdc/model"
)
+const defaultTableDefinitionVersion = 1
+
// TableCol denotes the column info for a table definition.
type TableCol struct {
Name string `json:"ColumnName" `
@@ -151,20 +153,46 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) {
return col, nil
}
-// TableDetail is the detailed table definition used for cloud storage sink.
-type TableDetail struct {
- Table string `json:"Table"`
- Schema string `json:"Schema"`
- Version uint64 `json:"Version"`
- Columns []TableCol `json:"TableColumns"`
- TotalColumns int `json:"TableColumnsTotal"`
+// TableDefinition is the detailed table definition used for cloud storage sink.
+type TableDefinition struct {
+ Table string `json:"Table"`
+ Schema string `json:"Schema"`
+ Version uint64 `json:"Version"`
+ TableVersion uint64 `json:"TableVersion"`
+ Query string `json:"Query"`
+ Type timodel.ActionType `json:"Type"`
+ Columns []TableCol `json:"TableColumns"`
+ TotalColumns int `json:"TableColumnsTotal"`
+}
+
+// FromDDLEvent converts from DDLEvent to TableDefinition.
+func (t *TableDefinition) FromDDLEvent(event *model.DDLEvent) {
+ t.FromTableInfo(event.TableInfo)
+ t.Query = event.Query
+ t.Type = event.Type
+}
+
+// ToDDLEvent converts from TableDefinition to DDLEvent.
+func (t *TableDefinition) ToDDLEvent() (*model.DDLEvent, error) {
+ tableInfo, err := t.ToTableInfo()
+ if err != nil {
+ return nil, err
+ }
+
+ return &model.DDLEvent{
+ TableInfo: tableInfo,
+ CommitTs: t.TableVersion,
+ Type: t.Type,
+ Query: t.Query,
+ }, nil
}
-// FromTableInfo converts from TableInfo to TableDetail.
-func (t *TableDetail) FromTableInfo(info *model.TableInfo) {
+// FromTableInfo converts from TableInfo to TableDefinition.
+func (t *TableDefinition) FromTableInfo(info *model.TableInfo) {
t.Table = info.TableName.Table
t.Schema = info.TableName.Schema
- t.Version = info.Version
+ t.Version = defaultTableDefinitionVersion
+ t.TableVersion = info.Version
t.TotalColumns = len(info.Columns)
for _, col := range info.Columns {
var tableCol TableCol
@@ -173,8 +201,8 @@ func (t *TableDetail) FromTableInfo(info *model.TableInfo) {
}
}
-// ToTableInfo converts from TableDetail to TableInfo.
-func (t *TableDetail) ToTableInfo() (*model.TableInfo, error) {
+// ToTableInfo converts from TableDefinition to TableInfo.
+func (t *TableDefinition) ToTableInfo() (*model.TableInfo, error) {
info := &model.TableInfo{
TableName: model.TableName{
Schema: t.Schema,
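
To make the Version/TableVersion split concrete, here is a hedged sketch of the JSON a hand-populated TableDefinition serializes to; the import paths assume this repository's layout:

```go
package main

import (
	"encoding/json"
	"fmt"

	timodel "github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
)

func main() {
	def := cloudstorage.TableDefinition{
		Table:        "table1",
		Schema:       "test",
		Version:      1,   // version of the definition file format itself
		TableVersion: 100, // TableInfo.Version, i.e. the DDL's commit ts
		Query:        "alter table test.table1 add col2 varchar(64)",
		Type:         timodel.ActionAddColumn,
	}
	out, err := json.MarshalIndent(def, "", "    ")
	if err != nil {
		panic(err)
	}
	// Emits "Version": 1 and "TableVersion": 100 as separate fields,
	// matching the expectations in the tests below.
	fmt.Println(string(out))
}
```
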
diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go
index 4486979421a..a4cc005ec44 100644
--- a/pkg/sink/cloudstorage/table_definition_test.go
+++ b/pkg/sink/cloudstorage/table_definition_test.go
@@ -14,7 +14,6 @@ package cloudstorage
import (
"encoding/json"
- "fmt"
"math"
"testing"
@@ -319,13 +318,13 @@ func TestTableCol(t *testing.T) {
require.JSONEq(t, tc.expected, string(encodedCol), tc.name)
_, err = tableCol.ToTiColumnInfo()
- require.Nil(t, err)
+ require.NoError(t, err)
}
}
-func TestTableDetail(t *testing.T) {
+func TestTableDefinition(t *testing.T) {
var columns []*timodel.ColumnInfo
- var def TableDetail
+ var def TableDefinition
tableInfo := &model.TableInfo{
Version: 100,
@@ -358,12 +357,57 @@ func TestTableDetail(t *testing.T) {
tableInfo.TableInfo = &timodel.TableInfo{Columns: columns}
def.FromTableInfo(tableInfo)
encodedDef, err := json.MarshalIndent(def, "", " ")
- require.Nil(t, err)
- fmt.Println(string(encodedDef))
+ require.NoError(t, err)
require.JSONEq(t, `{
"Table": "table1",
"Schema": "test",
- "Version": 100,
+ "Version": 1,
+ "TableVersion": 100,
+ "Query": "",
+ "Type": 0,
+ "TableColumns": [
+ {
+ "ColumnName": "Id",
+ "ColumnType": "INT",
+ "ColumnPrecision": "11",
+ "ColumnNullable": "false",
+ "ColumnIsPk": "true"
+ },
+ {
+ "ColumnName": "LastName",
+ "ColumnType": "VARCHAR",
+ "ColumnPrecision": "128",
+ "ColumnNullable": "false"
+ },
+ {
+ "ColumnName": "FirstName",
+ "ColumnType": "VARCHAR",
+ "ColumnPrecision": "64"
+ },
+ {
+ "ColumnName": "Birthday",
+ "ColumnType": "DATETIME"
+ }
+ ],
+ "TableColumnsTotal": 4
+ }`, string(encodedDef))
+
+ def = TableDefinition{}
+ event := &model.DDLEvent{
+ Type: timodel.ActionAddColumn,
+ Query: "alter table test.table1 add Birthday date",
+ TableInfo: tableInfo,
+ }
+ def.FromDDLEvent(event)
+ encodedDef, err = json.MarshalIndent(def, "", " ")
+ require.NoError(t, err)
+ require.JSONEq(t, `{
+ "Table": "table1",
+ "Schema": "test",
+ "Version": 1,
+ "TableVersion": 100,
+ "Query": "alter table test.table1 add Birthday date",
+ "Type": 5,
"TableColumns": [
{
"ColumnName": "Id",
@@ -392,6 +436,11 @@ func TestTableDetail(t *testing.T) {
}`, string(encodedDef))
tableInfo, err = def.ToTableInfo()
- require.Nil(t, err)
+ require.NoError(t, err)
require.Len(t, tableInfo.Columns, 4)
+
+ event, err = def.ToDDLEvent()
+ require.NoError(t, err)
+ require.Equal(t, timodel.ActionAddColumn, event.Type)
+ require.Equal(t, uint64(100), event.CommitTs)
}