Skip to content

Commit

Permalink
sink(ticdc): Fix several issues in cloud storage sink (#7517)
Browse files Browse the repository at this point in the history
ref #6797
  • Loading branch information
zhaoxinyu committed Nov 2, 2022
1 parent 4c7161b commit 915ad30
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 20 deletions.
71 changes: 62 additions & 9 deletions cdc/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -226,8 +227,47 @@ func (c *csvMessage) formatValue(value any, strBuilder *strings.Builder) {
}
}

// convertToCSVType converts column from TiDB type to csv type.
func convertToCSVType(col *model.Column, ft *types.FieldType) (any, error) {
func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) {
str, ok := csvVal.(string)
if !ok {
return csvVal, nil
}

switch ft.GetType() {
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob,
mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
if ft.GetCharset() == charset.CharsetBin {
blob, err := base64.StdEncoding.DecodeString(str)
return blob, err
}
return []byte(str), nil
case mysql.TypeFloat:
val, err := strconv.ParseFloat(str, 32)
return val, err
case mysql.TypeDouble:
val, err := strconv.ParseFloat(str, 64)
return val, err
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
if mysql.HasUnsignedFlag(ft.GetFlag()) {
val, err := strconv.ParseUint(str, 10, 64)
return val, err
}
val, err := strconv.ParseInt(str, 10, 64)
return val, err
case mysql.TypeBit:
val, err := strconv.ParseUint(str, 10, 64)
return val, err
default:
return str, nil
}
}

// fromColValToCsvVal converts column from TiDB type to csv type.
func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) {
if col.Value == nil {
return nil, nil
}

switch col.Type {
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob,
mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
Expand Down Expand Up @@ -297,6 +337,7 @@ func rowChangedEvent2CSVMsg(csvConfig *config.CSVConfig, e *model.RowChangedEven
}

func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) {
var err error
if len(csvMsg.columns) != len(ticols) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed,
fmt.Errorf("the column length of csv message %d doesn't equal to that of tableInfo %d",
Expand All @@ -310,9 +351,13 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*
Table: csvMsg.tableName,
}
if csvMsg.opType == operationDelete {
e.PreColumns = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
e.PreColumns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
} else {
e.Columns = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
e.Columns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
}

if err != nil {
return nil, err
}

return e, nil
Expand All @@ -327,7 +372,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn
continue
}

converted, err := convertToCSVType(column, colInfos[i].Ft)
converted, err := fromColValToCsvVal(column, colInfos[i].Ft)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -337,19 +382,27 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn
return csvColumns, nil
}

func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) []*model.Column {
func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) {
cols := make([]*model.Column, 0, len(csvCols))
for idx, csvCol := range csvCols {
col := new(model.Column)
col.Charset = mysql.DefaultCharset
col.Value = csvCol

ticol := ticols[idx]
col.Type = ticol.GetType()
col.Charset = ticol.GetCharset()
col.Name = ticol.Name.O
if mysql.HasPriKeyFlag(ticol.GetFlag()) {
col.Flag.SetIsHandleKey()
col.Flag.SetIsPrimaryKey()
}

val, err := fromCsvValToColValue(csvCol, ticol.FieldType)
if err != nil {
return cols, err
}
col.Value = val
cols = append(cols, col)
}

return cols
return cols, nil
}
13 changes: 10 additions & 3 deletions cdc/sink/codec/csv/csv_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"testing"

"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -760,7 +761,7 @@ func TestCSVMessageEncode(t *testing.T) {
func TestConvertToCSVType(t *testing.T) {
for _, group := range csvTestColumnsGroup {
for _, c := range group {
val, _ := convertToCSVType(&c.col, c.colInfo.Ft)
val, _ := fromColValToCsvVal(&c.col, c.colInfo.Ft)
require.Equal(t, c.want, val, c.col.Name)
}
}
Expand Down Expand Up @@ -801,10 +802,16 @@ func TestRowChangeEventConversion(t *testing.T) {

ticols := make([]*timodel.ColumnInfo, 0)
for _, col := range cols {
ticols = append(ticols, &timodel.ColumnInfo{
ticol := &timodel.ColumnInfo{
Name: timodel.NewCIStr(col.Name),
FieldType: *types.NewFieldType(col.Type),
})
}
if col.Flag.IsBinary() {
ticol.SetCharset(charset.CharsetBin)
} else {
ticol.SetCharset(mysql.DefaultCharset)
}
ticols = append(ticols, ticol)
}

row2, err := csvMsg2RowChangedEvent(csvMsg, ticols)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (d *dmlWorker) backgroundDispatchMsgs(ctx context.Context, ch *chann.Chann[
tableSet = make(map[versionedTable]struct{})
default:
}
case frag := <-ch.Out():
if atomic.LoadUint64(&d.isClosed) == 1 {
case frag, ok := <-ch.Out():
if !ok || atomic.LoadUint64(&d.isClosed) == 1 {
return
}
table := frag.versionedTable
Expand Down
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (w *encodingWorker) run(ctx context.Context, msgChan *chann.Chann[eventFrag
select {
case <-ctx.Done():
return
case frag := <-msgChan.Out():
if atomic.LoadUint64(&w.isClosed) == 1 {
case frag, ok := <-msgChan.Out():
if !ok || atomic.LoadUint64(&w.isClosed) == 1 {
return
}
err := w.encodeEvents(ctx, frag)
Expand Down
13 changes: 9 additions & 4 deletions pkg/sink/cloudstorage/table_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
Expand Down Expand Up @@ -53,7 +54,6 @@ func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) {
if mysql.HasUnsignedFlag(col.GetFlag()) {
t.Tp += " UNSIGNED"
}

if mysql.HasPriKeyFlag(col.GetFlag()) {
t.IsPK = "true"
}
Expand Down Expand Up @@ -91,13 +91,18 @@ func (t *TableCol) ToTiColumnInfo() (*timodel.ColumnInfo, error) {
tp := types.StrToType(strings.ToLower(strings.TrimSuffix(t.Tp, " UNSIGNED")))
col.FieldType = *types.NewFieldType(tp)
if strings.Contains(t.Tp, "UNSIGNED") {
col.SetFlag(mysql.UnsignedFlag)
col.AddFlag(mysql.UnsignedFlag)
}
if t.IsPK == "true" {
col.SetFlag(mysql.PriKeyFlag)
col.AddFlag(mysql.PriKeyFlag)
}
if t.Nullable == "false" {
col.SetFlag(mysql.NotNullFlag)
col.AddFlag(mysql.NotNullFlag)
}
if strings.Contains(t.Tp, "BLOB") || strings.Contains(t.Tp, "BINARY") {
col.SetCharset(charset.CharsetBin)
} else {
col.SetCharset(charset.CharsetUTF8MB4)
}
setFlen := func(precision string) error {
if len(precision) > 0 {
Expand Down
14 changes: 14 additions & 0 deletions pkg/sink/cloudstorage/table_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"math"
"testing"

"github.com/pingcap/tidb/parser/charset"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
Expand All @@ -32,6 +33,7 @@ func TestTableCol(t *testing.T) {
flen int
decimal int
flag uint
charset string
expected string
}{
{
Expand Down Expand Up @@ -194,6 +196,15 @@ func TestTableCol(t *testing.T) {
flag: 0,
expected: `{"ColumnName":"","ColumnType":"BLOB","ColumnPrecision":"100"}`,
},
{
name: "text",
filedType: mysql.TypeBlob,
flen: 100,
decimal: math.MinInt,
flag: 0,
charset: charset.CharsetUTF8MB4,
expected: `{"ColumnName":"","ColumnType":"TEXT","ColumnPrecision":"100"}`,
},
{
name: "tinyblob",
filedType: mysql.TypeTinyBlob,
Expand Down Expand Up @@ -297,6 +308,9 @@ func TestTableCol(t *testing.T) {
if tc.flag != 0 {
ft.SetFlag(tc.flag)
}
if len(tc.charset) != 0 {
ft.SetCharset(tc.charset)
}
col := &timodel.ColumnInfo{FieldType: *ft}
var tableCol TableCol
tableCol.FromTiColumnInfo(col)
Expand Down

0 comments on commit 915ad30

Please sign in to comment.