diff --git a/cdc/sink/codec/csv/csv_message.go b/cdc/sink/codec/csv/csv_message.go index e372f0c2fd0..6f7d78a3b8f 100644 --- a/cdc/sink/codec/csv/csv_message.go +++ b/cdc/sink/codec/csv/csv_message.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/charset" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" @@ -226,8 +227,47 @@ func (c *csvMessage) formatValue(value any, strBuilder *strings.Builder) { } } -// convertToCSVType converts column from TiDB type to csv type. -func convertToCSVType(col *model.Column, ft *types.FieldType) (any, error) { +func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) { + str, ok := csvVal.(string) + if !ok { + return csvVal, nil + } + + switch ft.GetType() { + case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, + mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: + if ft.GetCharset() == charset.CharsetBin { + blob, err := base64.StdEncoding.DecodeString(str) + return blob, err + } + return []byte(str), nil + case mysql.TypeFloat: + val, err := strconv.ParseFloat(str, 32) + return val, err + case mysql.TypeDouble: + val, err := strconv.ParseFloat(str, 64) + return val, err + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong: + if mysql.HasUnsignedFlag(ft.GetFlag()) { + val, err := strconv.ParseUint(str, 10, 64) + return val, err + } + val, err := strconv.ParseInt(str, 10, 64) + return val, err + case mysql.TypeBit: + val, err := strconv.ParseUint(str, 10, 64) + return val, err + default: + return str, nil + } +} + +// fromColValToCsvVal converts column from TiDB type to csv type. +func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) { + if col.Value == nil { + return nil, nil + } + switch col.Type { case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: @@ -297,6 +337,7 @@ func rowChangedEvent2CSVMsg(csvConfig *config.CSVConfig, e *model.RowChangedEven } func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) { + var err error if len(csvMsg.columns) != len(ticols) { return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, fmt.Errorf("the column length of csv message %d doesn't equal to that of tableInfo %d", @@ -310,9 +351,13 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (* Table: csvMsg.tableName, } if csvMsg.opType == operationDelete { - e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns, ticols) + e.PreColumns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols) } else { - e.Columns = csvColumns2RowChangeColumns(csvMsg.columns, ticols) + e.Columns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols) + } + + if err != nil { + return nil, err } return e, nil @@ -327,7 +372,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn continue } - converted, err := convertToCSVType(column, colInfos[i].Ft) + converted, err := fromColValToCsvVal(column, colInfos[i].Ft) if err != nil { return nil, errors.Trace(err) } @@ -337,19 +382,27 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn return csvColumns, nil } -func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) []*model.Column { +func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) { cols := make([]*model.Column, 0, len(csvCols)) for idx, csvCol := range csvCols { col := new(model.Column) - col.Charset = mysql.DefaultCharset - col.Value = csvCol ticol := ticols[idx] col.Type = ticol.GetType() + col.Charset = ticol.GetCharset() col.Name = ticol.Name.O + if mysql.HasPriKeyFlag(ticol.GetFlag()) { + col.Flag.SetIsHandleKey() + col.Flag.SetIsPrimaryKey() + } + val, err := fromCsvValToColValue(csvCol, ticol.FieldType) + if err != nil { + return cols, err + } + col.Value = val cols = append(cols, col) } - return cols + return cols, nil } diff --git a/cdc/sink/codec/csv/csv_message_test.go b/cdc/sink/codec/csv/csv_message_test.go index 32775238a5a..e2aaa7f4380 100644 --- a/cdc/sink/codec/csv/csv_message_test.go +++ b/cdc/sink/codec/csv/csv_message_test.go @@ -18,6 +18,7 @@ import ( "strings" "testing" + "github.com/pingcap/tidb/parser/charset" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" @@ -760,7 +761,7 @@ func TestCSVMessageEncode(t *testing.T) { func TestConvertToCSVType(t *testing.T) { for _, group := range csvTestColumnsGroup { for _, c := range group { - val, _ := convertToCSVType(&c.col, c.colInfo.Ft) + val, _ := fromColValToCsvVal(&c.col, c.colInfo.Ft) require.Equal(t, c.want, val, c.col.Name) } } @@ -801,10 +802,16 @@ func TestRowChangeEventConversion(t *testing.T) { ticols := make([]*timodel.ColumnInfo, 0) for _, col := range cols { - ticols = append(ticols, &timodel.ColumnInfo{ + ticol := &timodel.ColumnInfo{ Name: timodel.NewCIStr(col.Name), FieldType: *types.NewFieldType(col.Type), - }) + } + if col.Flag.IsBinary() { + ticol.SetCharset(charset.CharsetBin) + } else { + ticol.SetCharset(mysql.DefaultCharset) + } + ticols = append(ticols, ticol) } row2, err := csvMsg2RowChangedEvent(csvMsg, ticols) diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index ceca5c432cc..dc8dc0d47d7 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -192,8 +192,8 @@ func (d *dmlWorker) backgroundDispatchMsgs(ctx context.Context, ch *chann.Chann[ tableSet = make(map[versionedTable]struct{}) default: } - case frag := <-ch.Out(): - if atomic.LoadUint64(&d.isClosed) == 1 { + case frag, ok := <-ch.Out(): + if !ok || atomic.LoadUint64(&d.isClosed) == 1 { return } table := frag.versionedTable diff --git a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go index 9ee41bb548d..deab4d5b50d 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go @@ -63,8 +63,8 @@ func (w *encodingWorker) run(ctx context.Context, msgChan *chann.Chann[eventFrag select { case <-ctx.Done(): return - case frag := <-msgChan.Out(): - if atomic.LoadUint64(&w.isClosed) == 1 { + case frag, ok := <-msgChan.Out(): + if !ok || atomic.LoadUint64(&w.isClosed) == 1 { return } err := w.encodeEvents(ctx, frag) diff --git a/pkg/sink/cloudstorage/table_definition.go b/pkg/sink/cloudstorage/table_definition.go index e3f0ea16d43..5cac26c154e 100644 --- a/pkg/sink/cloudstorage/table_definition.go +++ b/pkg/sink/cloudstorage/table_definition.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/charset" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" @@ -53,7 +54,6 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) { if mysql.HasUnsignedFlag(col.GetFlag()) { t.Tp += " UNSIGNED" } - if mysql.HasPriKeyFlag(col.GetFlag()) { t.IsPK = "true" } @@ -91,13 +91,18 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) { tp := types.StrToType(strings.ToLower(strings.TrimSuffix(t.Tp, " UNSIGNED"))) col.FieldType = *types.NewFieldType(tp) if strings.Contains(t.Tp, "UNSIGNED") { - col.SetFlag(mysql.UnsignedFlag) + col.AddFlag(mysql.UnsignedFlag) } if t.IsPK == "true" { - col.SetFlag(mysql.PriKeyFlag) + col.AddFlag(mysql.PriKeyFlag) } if t.Nullable == "false" { - col.SetFlag(mysql.NotNullFlag) + col.AddFlag(mysql.NotNullFlag) + } + if strings.Contains(t.Tp, "BLOB") || strings.Contains(t.Tp, "BINARY") { + col.SetCharset(charset.CharsetBin) + } else { + col.SetCharset(charset.CharsetUTF8MB4) } setFlen := func(precision string) error { if len(precision) > 0 { diff --git a/pkg/sink/cloudstorage/table_definition_test.go b/pkg/sink/cloudstorage/table_definition_test.go index b13c11ef904..24f2c408bf4 100644 --- a/pkg/sink/cloudstorage/table_definition_test.go +++ b/pkg/sink/cloudstorage/table_definition_test.go @@ -18,6 +18,7 @@ import ( "math" "testing" + "github.com/pingcap/tidb/parser/charset" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" @@ -32,6 +33,7 @@ func TestTableCol(t *testing.T) { flen int decimal int flag uint + charset string expected string }{ { @@ -194,6 +196,15 @@ func TestTableCol(t *testing.T) { flag: 0, expected: `{"ColumnName":"","ColumnType":"BLOB","ColumnPrecision":"100"}`, }, + { + name: "text", + filedType: mysql.TypeBlob, + flen: 100, + decimal: math.MinInt, + flag: 0, + charset: charset.CharsetUTF8MB4, + expected: `{"ColumnName":"","ColumnType":"TEXT","ColumnPrecision":"100"}`, + }, { name: "tinyblob", filedType: mysql.TypeTinyBlob, @@ -297,6 +308,9 @@ func TestTableCol(t *testing.T) { if tc.flag != 0 { ft.SetFlag(tc.flag) } + if len(tc.charset) != 0 { + ft.SetCharset(tc.charset) + } col := &timodel.ColumnInfo{FieldType: *ft} var tableCol TableCol tableCol.FromTiColumnInfo(col)