diff --git a/cdc/sink/codec/csv/csv_decoder.go b/cdc/sink/codec/csv/csv_decoder.go index d42affffe47..b311f0d7e1d 100644 --- a/cdc/sink/codec/csv/csv_decoder.go +++ b/cdc/sink/codec/csv/csv_decoder.go @@ -26,6 +26,8 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +const defaultIOConcurrency = 1 + type batchDecoder struct { csvConfig *config.CSVConfig parser *mydump.CSVParser @@ -38,6 +40,8 @@ type batchDecoder struct { func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []byte) (codec.EventBatchDecoder, error) { var backslashEscape bool + // if quote is not set in config, we should unescape backslash + // when parsing csv columns. if len(csvConfig.Quote) == 0 { backslashEscape = true } @@ -51,13 +55,14 @@ func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []b csvParser, err := mydump.NewCSVParser(ctx, cfg, mydump.NewStringReader(string(value)), int64(lconfig.ReadBlockSize), - worker.NewPool(ctx, 1, "io"), false, nil) + worker.NewPool(ctx, defaultIOConcurrency, "io"), false, nil) if err != nil { return nil, err } return &batchDecoder{ csvConfig: csvConfig, data: value, + msg: newCSVMessage(csvConfig), parser: csvParser, }, nil } @@ -71,11 +76,9 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) { } row := b.parser.LastRow() - csvMsg := newCSVMessage(b.csvConfig) - if err = csvMsg.decode(row.Row); err != nil { + if err = b.msg.decode(row.Row); err != nil { return model.MessageTypeUnknown, false, errors.Trace(err) } - b.msg = csvMsg return model.MessageTypeRow, true, nil } diff --git a/cdc/sink/codec/csv/csv_message.go b/cdc/sink/codec/csv/csv_message.go index e46192dd149..c64ad00f0eb 100644 --- a/cdc/sink/codec/csv/csv_message.go +++ b/cdc/sink/codec/csv/csv_message.go @@ -29,6 +29,9 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +// a csv row should at least contain operation-type, table-name, schema-name and one table column +const minimumColsCnt = 4 + // operation 
specifies the operation type type operation int @@ -109,7 +112,7 @@ func (c *csvMessage) encode() []byte { } func (c *csvMessage) decode(datums []types.Datum) error { - if len(datums) < 4 { + if len(datums) < minimumColsCnt { return cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("the csv row should have at least four columns"+ "(operation-type, table-name, schema-name, commit-ts)"))