Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoxinyu committed Oct 13, 2022
1 parent 23717cb commit 04638f6
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
11 changes: 7 additions & 4 deletions cdc/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const defaultIOConcurrency = 1

type batchDecoder struct {
csvConfig *config.CSVConfig
parser *mydump.CSVParser
Expand All @@ -38,6 +40,8 @@ type batchDecoder struct {
func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []byte) (codec.EventBatchDecoder, error) {
var backslashEscape bool

// if quote is not set in config, we should unespace backslash
// when parsing csv columns.
if len(csvConfig.Quote) == 0 {
backslashEscape = true
}
Expand All @@ -51,13 +55,14 @@ func NewBatchDecoder(ctx context.Context, csvConfig *config.CSVConfig, value []b
csvParser, err := mydump.NewCSVParser(ctx, cfg,
mydump.NewStringReader(string(value)),
int64(lconfig.ReadBlockSize),
worker.NewPool(ctx, 1, "io"), false, nil)
worker.NewPool(ctx, defaultIOConcurrency, "io"), false, nil)
if err != nil {
return nil, err
}
return &batchDecoder{
csvConfig: csvConfig,
data: value,
msg: newCSVMessage(csvConfig),
parser: csvParser,
}, nil
}
Expand All @@ -71,11 +76,9 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
}

row := b.parser.LastRow()
csvMsg := newCSVMessage(b.csvConfig)
if err = csvMsg.decode(row.Row); err != nil {
if err = b.msg.decode(row.Row); err != nil {
return model.MessageTypeUnknown, false, errors.Trace(err)
}
b.msg = csvMsg

return model.MessageTypeRow, true, nil
}
Expand Down
5 changes: 4 additions & 1 deletion cdc/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// a csv row should at least contain operation-type, table-name, schema-name and one table column
const minimumColsCnt = 4

// operation specifies the operation type
type operation int

Expand Down Expand Up @@ -109,7 +112,7 @@ func (c *csvMessage) encode() []byte {
}

func (c *csvMessage) decode(datums []types.Datum) error {
if len(datums) < 4 {
if len(datums) < minimumColsCnt {
return cerror.WrapError(cerror.ErrCSVDecodeFailed,
errors.New("the csv row should have at least four columns"+
"(operation-type, table-name, schema-name, commit-ts)"))
Expand Down

0 comments on commit 04638f6

Please sign in to comment.