Skip to content

Commit

Permalink
sink/(ticdc): Support building TableInfo from schema file in cloud …
Browse files Browse the repository at this point in the history
…storage (#7415)

ref #6797
  • Loading branch information
zhaoxinyu committed Oct 31, 2022
1 parent 243f2a5 commit a9c5dca
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 35 deletions.
19 changes: 17 additions & 2 deletions cdc/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package csv

import (
"context"
"io"

"github.com/pingcap/errors"
lconfig "github.com/pingcap/tidb/br/pkg/lightning/config"
Expand All @@ -33,11 +34,16 @@ type batchDecoder struct {
parser *mydump.CSVParser
data []byte
msg *csvMessage
tableInfo *model.TableInfo
closed bool
}

// NewBatchDecoder creates a new BatchDecoder
func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []byte) (codec.EventBatchDecoder, error) {
func NewBatchDecoder(ctx context.Context,
csvConfig *config.CSVConfig,
tableInfo *model.TableInfo,
value []byte,
) (codec.EventBatchDecoder, error) {
var backslashEscape bool

// if quote is not set in config, we should unespace backslash
Expand All @@ -61,6 +67,7 @@ func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []b
}
return &batchDecoder{
csvConfig: csvConfig,
tableInfo: tableInfo,
data: value,
msg: newCSVMessage(csvConfig),
parser: csvParser,
Expand All @@ -72,6 +79,9 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
err := b.parser.ReadRow()
if err != nil {
b.closed = true
if errors.Cause(err) == io.EOF {
return model.MessageTypeUnknown, false, nil
}
return model.MessageTypeUnknown, false, err
}

Expand All @@ -93,7 +103,12 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if b.closed {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}
return csvMsg2RowChangedEvent(b.msg), nil

e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns)
if err != nil {
return nil, errors.Trace(err)
}
return e, nil
}

// NextDDLEvent implements the EventBatchDecoder interface.
Expand Down
36 changes: 35 additions & 1 deletion cdc/sink/codec/csv/csv_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"context"
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
Expand All @@ -29,13 +32,44 @@ func TestCSVBatchDecoder(t *testing.T) {
"U","employee","hr",433305438660591630,102,"Alex","Alice","2018-06-15","Beijing"
`
ctx := context.Background()
tableInfo := &model.TableInfo{
TableName: model.TableName{
Schema: "hr",
Table: "employee",
},
TableInfo: &timodel.TableInfo{
Name: timodel.NewCIStr("employee"),
Columns: []*timodel.ColumnInfo{
{
Name: timodel.NewCIStr("Id"),
FieldType: *types.NewFieldType(mysql.TypeInt24),
},
{
Name: timodel.NewCIStr("LastName"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
{
Name: timodel.NewCIStr("FirstName"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
{
Name: timodel.NewCIStr("HireDate"),
FieldType: *types.NewFieldType(mysql.TypeDate),
},
{
Name: timodel.NewCIStr("OfficeLocation"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
},
}
decoder, err := NewBatchDecoder(ctx, &config.CSVConfig{
Delimiter: ",",
Quote: "\"",
Terminator: "\n",
NullString: "\\N",
IncludeCommitTs: true,
}, []byte(csvData))
}, tableInfo, []byte(csvData))
require.Nil(t, err)

for i := 0; i < 5; i++ {
Expand Down
40 changes: 20 additions & 20 deletions cdc/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -130,7 +130,10 @@ func (c *csvMessage) decode(datums []types.Datum) error {
fmt.Errorf("the 4th column(%s) of csv row should be a valid commit-ts", datums[3].GetString()))
}
c.commitTs = commitTs
} else {
c.commitTs = 0
}
c.columns = c.columns[:0]

for i := 4; i < len(datums); i++ {
if datums[i].IsNull() {
Expand Down Expand Up @@ -293,20 +296,26 @@ func rowChangedEvent2CSVMsg(csvConfig *config.CSVConfig, e *model.RowChangedEven
return csvMsg, nil
}

func csvMsg2RowChangedEvent(csvMsg *csvMessage) *model.RowChangedEvent {
func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) {
if len(csvMsg.columns) != len(ticols) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed,
fmt.Errorf("the column length of csv message %d doesn't equal to that of tableInfo %d",
len(csvMsg.columns), len(ticols)))
}

e := new(model.RowChangedEvent)
e.CommitTs = csvMsg.commitTs
e.Table = &model.TableName{
Schema: csvMsg.schemaName,
Table: csvMsg.tableName,
}
if csvMsg.opType == operationDelete {
e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns)
e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
} else {
e.Columns = csvColumns2RowChangeColumns(csvMsg.columns)
e.Columns = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
}

return e
return e, nil
}

func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) {
Expand All @@ -328,26 +337,17 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn
return csvColumns, nil
}

func csvColumns2RowChangeColumns(csvCols []any) []*model.Column {
func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) []*model.Column {
cols := make([]*model.Column, 0, len(csvCols))
for _, csvCol := range csvCols {
for idx, csvCol := range csvCols {
col := new(model.Column)
col.Charset = mysql.DefaultCharset
col.Value = csvCol

if str, ok := csvCol.(string); ok {
if blob, err := base64.StdEncoding.DecodeString(str); err == nil {
col.Value = blob
col.Charset = charset.CharsetBin
} else {
col.Value = csvCol
}
} else {
col.Value = csvCol
}
ticol := ticols[idx]
col.Type = ticol.GetType()
col.Name = ticol.Name.O

tp := new(types.FieldType)
types.DefaultTypeForValue(csvCol, tp, mysql.DefaultCharset, mysql.DefaultCollationName)
col.Type = tp.GetType()
cols = append(cols, col)
}

Expand Down
12 changes: 11 additions & 1 deletion cdc/sink/codec/csv/csv_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
Expand Down Expand Up @@ -798,7 +799,16 @@ func TestRowChangeEventConversion(t *testing.T) {
require.NotNil(t, csvMsg)
require.Nil(t, err)

row2 := csvMsg2RowChangedEvent(csvMsg)
ticols := make([]*timodel.ColumnInfo, 0)
for _, col := range cols {
ticols = append(ticols, &timodel.ColumnInfo{
Name: timodel.NewCIStr(col.Name),
FieldType: *types.NewFieldType(col.Type),
})
}

row2, err := csvMsg2RowChangedEvent(csvMsg, ticols)
require.Nil(t, err)
require.NotNil(t, row2)
}
}
Expand Down
89 changes: 87 additions & 2 deletions pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ import (
"strconv"
"strings"

"github.com/pingcap/errors"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
)

// TableCol denotes the column info for a table definition.
type TableCol struct {
Name string `json:"ColumnName" `
Tp string `json:"ColumnType"`
Length string `json:"ColumnLength,omitempty"`
Precision string `json:"ColumnPrecision,omitempty"`
Scale string `json:"ColumnScale,omitempty"`
Nullable string `json:"ColumnNullable,omitempty"`
Expand Down Expand Up @@ -83,6 +83,69 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) {
}
}

// ToTiColumnInfo converts from TableCol to TiDB ColumnInfo.
func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) {
col := new(timodel.ColumnInfo)

col.Name = timodel.NewCIStr(t.Name)
tp := types.StrToType(strings.ToLower(strings.TrimSuffix(t.Tp, " UNSIGNED")))
col.FieldType = *types.NewFieldType(tp)
if strings.Contains(t.Tp, "UNSIGNED") {
col.SetFlag(mysql.UnsignedFlag)
}
if t.IsPK == "true" {
col.SetFlag(mysql.PriKeyFlag)
}
if t.Nullable == "false" {
col.SetFlag(mysql.NotNullFlag)
}
setFlen := func(precision string) error {
if len(precision) > 0 {
flen, err := strconv.Atoi(precision)
if err != nil {
return errors.Trace(err)
}
col.SetFlen(flen)
}
return nil
}
setDecimal := func(scale string) error {
if len(scale) > 0 {
decimal, err := strconv.Atoi(scale)
if err != nil {
return errors.Trace(err)
}
col.SetDecimal(decimal)
}
return nil
}
switch col.GetType() {
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:
err := setDecimal(t.Scale)
if err != nil {
return nil, errors.Trace(err)
}
case mysql.TypeDouble, mysql.TypeFloat, mysql.TypeNewDecimal:
err := setFlen(t.Precision)
if err != nil {
return nil, errors.Trace(err)
}
err = setDecimal(t.Scale)
if err != nil {
return nil, errors.Trace(err)
}
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong,
mysql.TypeBit, mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob,
mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeYear:
err := setFlen(t.Precision)
if err != nil {
return nil, errors.Trace(err)
}
}

return col, nil
}

// TableDetail is the detailed table definition used for cloud storage sink.
type TableDetail struct {
Table string `json:"Table"`
Expand All @@ -104,3 +167,25 @@ func (t *TableDetail) FromTableInfo(info *model.TableInfo) {
t.Columns = append(t.Columns, tableCol)
}
}

// ToTableInfo converts from TableDetail to TableInfo.
func (t *TableDetail) ToTableInfo() (*model.TableInfo, error) {
info := &model.TableInfo{
TableName: model.TableName{
Schema: t.Schema,
Table: t.Table,
},
TableInfo: &timodel.TableInfo{
Name: timodel.NewCIStr(t.Table),
},
}
for _, col := range t.Columns {
tiCol, err := col.ToTiColumnInfo()
if err != nil {
return nil, err
}
info.Columns = append(info.Columns, tiCol)
}

return info, nil
}
Loading

0 comments on commit a9c5dca

Please sign in to comment.