From a9c5dca5767aab1b8da3fd4fd2d67fe9ea7200a1 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Mon, 31 Oct 2022 18:45:59 +0800 Subject: [PATCH] sink/(ticdc): Support building `TableInfo` from schema file in cloud storage (#7415) ref pingcap/tiflow#6797 --- cdc/sink/codec/csv/csv_decoder.go | 19 +++- cdc/sink/codec/csv/csv_decoder_test.go | 36 +++++++- cdc/sink/codec/csv/csv_message.go | 40 ++++----- cdc/sink/codec/csv/csv_message_test.go | 12 ++- pkg/sink/cloudstorage/table_definition.go | 89 ++++++++++++++++++- .../cloudstorage/table_definition_test.go | 22 +++-- 6 files changed, 183 insertions(+), 35 deletions(-) diff --git a/cdc/sink/codec/csv/csv_decoder.go b/cdc/sink/codec/csv/csv_decoder.go index 0f20b061fc5..5e2c024fbab 100644 --- a/cdc/sink/codec/csv/csv_decoder.go +++ b/cdc/sink/codec/csv/csv_decoder.go @@ -15,6 +15,7 @@ package csv import ( "context" + "io" "github.com/pingcap/errors" lconfig "github.com/pingcap/tidb/br/pkg/lightning/config" @@ -33,11 +34,16 @@ type batchDecoder struct { parser *mydump.CSVParser data []byte msg *csvMessage + tableInfo *model.TableInfo closed bool } // NewBatchDecoder creates a new BatchDecoder -func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []byte) (codec.EventBatchDecoder, error) { +func NewBatchDecoder(ctx context.Context, + csvConfig *config.CSVConfig, + tableInfo *model.TableInfo, + value []byte, +) (codec.EventBatchDecoder, error) { var backslashEscape bool // if quote is not set in config, we should unespace backslash @@ -61,6 +67,7 @@ func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []b } return &batchDecoder{ csvConfig: csvConfig, + tableInfo: tableInfo, data: value, msg: newCSVMessage(csvConfig), parser: csvParser, @@ -72,6 +79,9 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { err := b.parser.ReadRow() if err != nil { b.closed = true + if errors.Cause(err) == io.EOF { + return model.MessageTypeUnknown, false, nil + } return model.MessageTypeUnknown, false, err } @@ -93,7 +103,12 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { if b.closed { return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found")) } - return csvMsg2RowChangedEvent(b.msg), nil + + e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns) + if err != nil { + return nil, errors.Trace(err) + } + return e, nil } // NextDDLEvent implements the EventBatchDecoder interface. diff --git a/cdc/sink/codec/csv/csv_decoder_test.go b/cdc/sink/codec/csv/csv_decoder_test.go index da14a4df4c0..0ee51a3f254 100644 --- a/cdc/sink/codec/csv/csv_decoder_test.go +++ b/cdc/sink/codec/csv/csv_decoder_test.go @@ -16,6 +16,9 @@ import ( "context" "testing" + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" @@ -29,13 +32,44 @@ func TestCSVBatchDecoder(t *testing.T) { "U","employee","hr",433305438660591630,102,"Alex","Alice","2018-06-15","Beijing" ` ctx := context.Background() + tableInfo := &model.TableInfo{ + TableName: model.TableName{ + Schema: "hr", + Table: "employee", + }, + TableInfo: &timodel.TableInfo{ + Name: timodel.NewCIStr("employee"), + Columns: []*timodel.ColumnInfo{ + { + Name: timodel.NewCIStr("Id"), + FieldType: *types.NewFieldType(mysql.TypeInt24), + }, + { + Name: timodel.NewCIStr("LastName"), + FieldType: *types.NewFieldType(mysql.TypeVarchar), + }, + { + Name: timodel.NewCIStr("FirstName"), + FieldType: *types.NewFieldType(mysql.TypeVarchar), + }, + { + Name: timodel.NewCIStr("HireDate"), + FieldType: *types.NewFieldType(mysql.TypeDate), + }, + { + Name: timodel.NewCIStr("OfficeLocation"), + FieldType: *types.NewFieldType(mysql.TypeVarchar), + }, + }, + }, + } decoder, err := NewBatchDecoder(ctx, &config.CSVConfig{ Delimiter: ",", Quote: "\"", Terminator: "\n", NullString: "\\N", IncludeCommitTs: true, - }, []byte(csvData)) + }, tableInfo, []byte(csvData)) require.Nil(t, err) for i := 0; i < 5; i++ { diff --git a/cdc/sink/codec/csv/csv_message.go b/cdc/sink/codec/csv/csv_message.go index c64ad00f0eb..e372f0c2fd0 100644 --- a/cdc/sink/codec/csv/csv_message.go +++ b/cdc/sink/codec/csv/csv_message.go @@ -20,7 +20,7 @@ import ( "strings" "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/charset" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/rowcodec" @@ -130,7 +130,10 @@ func (c *csvMessage) decode(datums []types.Datum) error { fmt.Errorf("the 4th column(%s) of csv row should be a valid commit-ts", datums[3].GetString())) } c.commitTs = commitTs + } else { + c.commitTs = 0 } + c.columns = c.columns[:0] for i := 4; i < len(datums); i++ { if datums[i].IsNull() { @@ -293,7 +296,13 @@ func rowChangedEvent2CSVMsg(csvConfig *config.CSVConfig, e *model.RowChangedEven return csvMsg, nil } -func csvMsg2RowChangedEvent(csvMsg *csvMessage) *model.RowChangedEvent { +func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) { + if len(csvMsg.columns) != len(ticols) { + return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, + fmt.Errorf("the column length of csv message %d doesn't equal to that of tableInfo %d", + len(csvMsg.columns), len(ticols))) + } + e := new(model.RowChangedEvent) e.CommitTs = csvMsg.commitTs e.Table = &model.TableName{ @@ -301,12 +310,12 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage) *model.RowChangedEvent { Table: csvMsg.tableName, } if csvMsg.opType == operationDelete { - e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns) + e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns, ticols) } else { - e.Columns = csvColumns2RowChangeColumns(csvMsg.columns) + e.Columns = csvColumns2RowChangeColumns(csvMsg.columns, ticols) } - return e + return e, nil } func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) { @@ -328,26 +337,17 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn return csvColumns, nil } -func csvColumns2RowChangeColumns(csvCols []any) []*model.Column { +func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) []*model.Column { cols := make([]*model.Column, 0, len(csvCols)) - for _, csvCol := range csvCols { + for idx, csvCol := range csvCols { col := new(model.Column) col.Charset = mysql.DefaultCharset + col.Value = csvCol - if str, ok := csvCol.(string); ok { - if blob, err := base64.StdEncoding.DecodeString(str); err == nil { - col.Value = blob - col.Charset = charset.CharsetBin - } else { - col.Value = csvCol - } - } else { - col.Value = csvCol - } + ticol := ticols[idx] + col.Type = ticol.GetType() + col.Name = ticol.Name.O - tp := new(types.FieldType) - types.DefaultTypeForValue(csvCol, tp, mysql.DefaultCharset, mysql.DefaultCollationName) - col.Type = tp.GetType() cols = append(cols, col) } diff --git a/cdc/sink/codec/csv/csv_message_test.go b/cdc/sink/codec/csv/csv_message_test.go index 36fe4bffd4a..32775238a5a 100644 --- a/cdc/sink/codec/csv/csv_message_test.go +++ b/cdc/sink/codec/csv/csv_message_test.go @@ -18,6 +18,7 @@ import ( "strings" "testing" + timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/rowcodec" @@ -798,7 +799,16 @@ func TestRowChangeEventConversion(t *testing.T) { require.NotNil(t, csvMsg) require.Nil(t, err) - row2 := csvMsg2RowChangedEvent(csvMsg) + ticols := make([]*timodel.ColumnInfo, 0) + for _, col := range cols { + ticols = append(ticols, &timodel.ColumnInfo{ + Name: timodel.NewCIStr(col.Name), + FieldType: *types.NewFieldType(col.Type), + }) + } + + row2, err := csvMsg2RowChangedEvent(csvMsg, ticols) + require.Nil(t, err) require.NotNil(t, row2) } } diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index e6c1e9a17b4..e3f0ea16d43 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -16,9 +16,10 @@ import ( "strconv" "strings" + "github.com/pingcap/errors" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tiflow/cdc/model" ) @@ -26,7 +27,6 @@ import ( type TableCol struct { Name string `json:"ColumnName" ` Tp string `json:"ColumnType"` - Length string `json:"ColumnLength,omitempty"` Precision string `json:"ColumnPrecision,omitempty"` Scale string `json:"ColumnScale,omitempty"` Nullable string `json:"ColumnNullable,omitempty"` @@ -83,6 +83,69 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) { } } +// ToTiColumnInfo converts from TableCol to TiDB ColumnInfo. +func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) { + col := new(timodel.ColumnInfo) + + col.Name = timodel.NewCIStr(t.Name) + tp := types.StrToType(strings.ToLower(strings.TrimSuffix(t.Tp, " UNSIGNED"))) + col.FieldType = *types.NewFieldType(tp) + if strings.Contains(t.Tp, "UNSIGNED") { + col.SetFlag(mysql.UnsignedFlag) + } + if t.IsPK == "true" { + col.SetFlag(mysql.PriKeyFlag) + } + if t.Nullable == "false" { + col.SetFlag(mysql.NotNullFlag) + } + setFlen := func(precision string) error { + if len(precision) > 0 { + flen, err := strconv.Atoi(precision) + if err != nil { + return errors.Trace(err) + } + col.SetFlen(flen) + } + return nil + } + setDecimal := func(scale string) error { + if len(scale) > 0 { + decimal, err := strconv.Atoi(scale) + if err != nil { + return errors.Trace(err) + } + col.SetDecimal(decimal) + } + return nil + } + switch col.GetType() { + case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration: + err := setDecimal(t.Scale) + if err != nil { + return nil, errors.Trace(err) + } + case mysql.TypeDouble, mysql.TypeFloat, mysql.TypeNewDecimal: + err := setFlen(t.Precision) + if err != nil { + return nil, errors.Trace(err) + } + err = setDecimal(t.Scale) + if err != nil { + return nil, errors.Trace(err) + } + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong, + mysql.TypeBit, mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, + mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeYear: + err := setFlen(t.Precision) + if err != nil { + return nil, errors.Trace(err) + } + } + + return col, nil +} + // TableDetail is the detailed table definition used for cloud storage sink. type TableDetail struct { Table string `json:"Table"` @@ -104,3 +167,25 @@ func (t *TableDetail) FromTableInfo(info *model.TableInfo) { t.Columns = append(t.Columns, tableCol) } } + +// ToTableInfo converts from TableDetail to TableInfo. +func (t *TableDetail) ToTableInfo() (*model.TableInfo, error) { + info := &model.TableInfo{ + TableName: model.TableName{ + Schema: t.Schema, + Table: t.Table, + }, + TableInfo: &timodel.TableInfo{ + Name: timodel.NewCIStr(t.Table), + }, + } + for _, col := range t.Columns { + tiCol, err := col.ToTiColumnInfo() + if err != nil { + return nil, err + } + info.Columns = append(info.Columns, tiCol) + } + + return info, nil +} diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index 573385c99a1..b13c11ef904 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -25,14 +25,13 @@ import ( "github.com/stretchr/testify/require" ) -func TestBuildTableColFromTiColumnInfo(t *testing.T) { +func TestTableCol(t *testing.T) { testCases := []struct { name string filedType byte flen int decimal int flag uint - elems []string expected string }{ { @@ -44,11 +43,11 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) { expected: `{"ColumnName":"","ColumnType":"TIME","ColumnScale":"5"}`, }, { - name: "int(5) UNSIGNED ZEROFILL", + name: "int(5) UNSIGNED", filedType: mysql.TypeLong, flen: 5, decimal: math.MinInt, - flag: mysql.UnsignedFlag | mysql.ZerofillFlag, + flag: mysql.UnsignedFlag, expected: `{"ColumnName":"","ColumnType":"INT UNSIGNED","ColumnPrecision":"5"}`, }, { @@ -144,8 +143,8 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) { filedType: mysql.TypeLong, flen: math.MinInt, decimal: math.MinInt, - flag: 0, - expected: `{"ColumnName":"","ColumnType":"INT","ColumnPrecision":"11"}`, + flag: mysql.PriKeyFlag, + expected: `{"ColumnIsPk":"true", "ColumnName":"", "ColumnPrecision":"11", "ColumnType":"INT"}`, }, { name: "bigint(20)", @@ -222,13 +221,11 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) { { name: "enum", filedType: mysql.TypeEnum, - elems: []string{"a", "b"}, expected: `{"ColumnName":"","ColumnType":"ENUM"}`, }, { name: "set", filedType: mysql.TypeSet, - elems: []string{"a", "b"}, expected: `{"ColumnName":"","ColumnType":"SET"}`, }, { @@ -306,10 +303,13 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) { encodedCol, err := json.Marshal(tableCol) require.Nil(t, err, tc.name) require.JSONEq(t, tc.expected, string(encodedCol), tc.name) + + _, err = tableCol.ToTiColumnInfo() + require.Nil(t, err) } } -func TestBuildTableDefFromTableInfo(t *testing.T) { +func TestTableDetail(t *testing.T) { var columns []*timodel.ColumnInfo var def TableDetail @@ -376,4 +376,8 @@ func TestBuildTableDefFromTableInfo(t *testing.T) { ], "TableColumnsTotal": 4 }`, string(encodedDef)) + + tableInfo, err = def.ToTableInfo() + require.Nil(t, err) + require.Len(t, tableInfo.Columns, 4) }