Skip to content

Commit

Permalink
sink: fix avro timezone (#1615) (#1712)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored May 26, 2021
1 parent c8e5331 commit fa8979d
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 12 deletions.
51 changes: 40 additions & 11 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type AvroEventBatchEncoder struct {
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
resultBuf []*MQMessage

tz *time.Location
}

type avroEncodeResult struct {
Expand Down Expand Up @@ -75,13 +77,19 @@ func (a *AvroEventBatchEncoder) GetKeySchemaManager() *AvroSchemaManager {
return a.keySchemaManager
}

// SetTimeZone sets the time-zone that is used to serialize Avro date-time types
func (a *AvroEventBatchEncoder) SetTimeZone(tz *time.Location) {
log.Debug("Setting Avro serializer timezone", zap.String("tz", tz.String()))
a.tz = tz
}

// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
mqMessage := NewMQMessage(ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns)
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand All @@ -100,7 +108,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

pkeyCols := e.HandleKeyColumns()

res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols)
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand Down Expand Up @@ -169,7 +177,7 @@ func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error {
return nil
}

func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) {
func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, tz *time.Location) (*avroEncodeResult, error) {
schemaGen := func() (string, error) {
schema, err := ColumnInfoToAvroSchema(table.Table, cols)
if err != nil {
Expand All @@ -184,7 +192,7 @@ func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion
return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed")
}

native, err := rowToAvroNativeData(cols)
native, err := rowToAvroNativeData(cols, tz)
if err != nil {
return nil, errors.Annotate(err, "AvroEventBatchEncoder: converting to native failed")
}
Expand Down Expand Up @@ -255,13 +263,13 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er
return string(str), nil
}

func rowToAvroNativeData(cols []*model.Column) (interface{}, error) {
func rowToAvroNativeData(cols []*model.Column, tz *time.Location) (interface{}, error) {
ret := make(map[string]interface{}, len(cols))
for _, col := range cols {
if col == nil {
continue
}
data, str, err := columnToAvroNativeData(col)
data, str, err := columnToAvroNativeData(col, tz)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,7 +369,12 @@ func getAvroDataTypeFromColumn(col *model.Column) (interface{}, error) {
}
}

func columnToAvroNativeData(col *model.Column) (interface{}, string, error) {
var (
zeroTimeStr = types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, 0).String()
zeroDateStr = types.NewTime(types.ZeroCoreTime, mysql.TypeDate, 0).String()
)

func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, string, error) {
if col.Value == nil {
return nil, "null", nil
}
Expand All @@ -379,19 +392,35 @@ func columnToAvroNativeData(col *model.Column) (interface{}, string, error) {

switch col.Type {
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp:
str := col.Value.(string)
t, err := time.Parse(types.DateFormat, str)
// Refer to `unflatten` in cdc/entry/codec.go for why this piece of code is like this.
const fullType = "long." + timestampMillis
str := col.Value.(string)

if (col.Type == mysql.TypeDate && str == zeroDateStr) ||
(col.Type != mysql.TypeDate && str == zeroTimeStr) {

return time.Time{}, string(fullType), nil
}

var actualTz *time.Location
if col.Type != mysql.TypeTimestamp {
actualTz = time.UTC
} else {
actualTz = tz
}

t, err := time.ParseInLocation(types.DateFormat, str, actualTz)

if err == nil {
return t, string(fullType), nil
}

t, err = time.Parse(types.TimeFormat, str)
t, err = time.ParseInLocation(types.TimeFormat, str, actualTz)
if err == nil {
return t, string(fullType), nil
}

t, err = time.Parse(types.TimeFSPFormat, str)
t, err = time.ParseInLocation(types.TimeFSPFormat, str, actualTz)
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
Expand Down
46 changes: 45 additions & 1 deletion cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat},
{Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob},
{Name: "ts", Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
})
}, time.Local)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
Expand All @@ -100,6 +100,50 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
log.Info("TestAvroEncodeOnly", zap.ByteString("result", txt))
}

func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
defer testleak.AfterTest(c)()
avroCodec, err := goavro.NewCodec(`
{
"type": "record",
"name": "test1",
"fields" : [
{"name": "id", "type": ["null", "int"], "default": null},
{"name": "myint", "type": ["null", "int"], "default": null},
{"name": "mybool", "type": ["null", "int"], "default": null},
{"name": "myfloat", "type": ["null", "float"], "default": null},
{"name": "mybytes", "type": ["null", "bytes"], "default": null},
{"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
]
}`)

c.Assert(err, check.IsNil)

table := model.TableName{
Schema: "testdb",
Table: "test1",
}

location, err := time.LoadLocation("UTC")
c.Check(err, check.IsNil)

timestamp := time.Now()
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{
{Name: "id", Value: int64(1), Type: mysql.TypeLong},
{Name: "myint", Value: int64(2), Type: mysql.TypeLong},
{Name: "mybool", Value: int64(1), Type: mysql.TypeTiny},
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat},
{Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob},
{Name: "ts", Value: timestamp.In(location).Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
}, location)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
c.Check(err, check.IsNil)
c.Check(res, check.NotNil)
actual := (res.(map[string]interface{}))["ts"].(map[string]interface{})["long.timestamp-millis"].(time.Time)
c.Check(actual.Local().Sub(timestamp), check.LessEqual, time.Millisecond)
}

func (s *avroBatchEncoderSuite) TestAvroEnvelope(c *check.C) {
defer testleak.AfterTest(c)()
avroCodec, err := goavro.NewCodec(`
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -104,6 +105,7 @@ func newMqSink(
avroEncoder := newEncoder1().(*codec.AvroEventBatchEncoder)
avroEncoder.SetKeySchemaManager(keySchemaManager)
avroEncoder.SetValueSchemaManager(valueSchemaManager)
avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx))
return avroEncoder
}
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue {
Expand Down
3 changes: 3 additions & 0 deletions docker-compose-avro.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ services:
- --log-file=/logs/capturer0.log
- --log-level=debug
- --advertise-addr=capturer0:8300
- --tz=${CDC_TIME_ZONE:-SYSTEM}
depends_on:
- "upstream-tidb"
- "downstream-tidb"
Expand All @@ -64,6 +65,7 @@ services:
- --log-file=/logs/capturer1.log
- --log-level=debug
- --advertise-addr=capturer1:8300
- --tz=${CDC_TIME_ZONE:-SYSTEM}
depends_on:
- "upstream-tidb"
- "downstream-tidb"
Expand All @@ -85,6 +87,7 @@ services:
- --log-file=/logs/capturer2.log
- --log-level=debug
- --advertise-addr=capturer2:8300
- --tz=${CDC_TIME_ZONE:-SYSTEM}
depends_on:
- "upstream-tidb"
- "downstream-tidb"
Expand Down
2 changes: 2 additions & 0 deletions integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ var (

func testAvro() {
env := avro.NewKafkaDockerEnv(*dockerComposeFile)
env.DockerComposeOperator.ExecEnv = []string{"CDC_TIME_ZONE=America/Los_Angeles"}
task := &avro.SingleTableTask{TableName: "test"}
testCases := []framework.Task{
tests.NewDateTimeCase(task),
tests.NewSimpleCase(task),
tests.NewDeleteCase(task),
tests.NewManyTypesCase(task),
Expand Down
110 changes: 110 additions & 0 deletions integration/tests/case_date_time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tests

import (
"errors"
"log"
"time"

"github.com/pingcap/ticdc/integration/framework"
"github.com/pingcap/ticdc/integration/framework/avro"
"github.com/pingcap/ticdc/integration/framework/canal"
"github.com/pingcap/ticdc/integration/framework/mysql"
)

// DateTimeCase is base impl of test case for different types data
type DateTimeCase struct {
framework.Task
}

// NewDateTimeCase create a test case which has many types
func NewDateTimeCase(task framework.Task) *DateTimeCase {
return &DateTimeCase{
Task: task,
}
}

// Name impl framework.Task interface
func (s *DateTimeCase) Name() string {
return "Date Time"
}

// Run impl framework.Task interface
func (s *DateTimeCase) Run(ctx *framework.TaskContext) error {
var createDBQuery string
switch s.Task.(type) {
case *avro.SingleTableTask:
createDBQuery = `create table test (
id INT,
t_date DATE,
t_datetime DATETIME,
t_timestamp TIMESTAMP NULL,
PRIMARY KEY (id)
)`
case *canal.SingleTableTask, *mysql.SingleTableTask:
log.Panic("DateTimeCase does not support downstreams other than Avro")
default:
return errors.New("unknown test case type")
}

_, err := ctx.Upstream.ExecContext(ctx.Ctx, createDBQuery)
if err != nil {
return err
}
if _, ok := s.Task.(*avro.SingleTableTask); ok {
_, err = ctx.Downstream.ExecContext(ctx.Ctx, "drop table if exists test")
if err != nil {
return err
}

_, err = ctx.Downstream.ExecContext(ctx.Ctx, createDBQuery)
if err != nil {
return err
}
}

// Get a handle of an existing table
table := ctx.SQLHelper().GetTable("test")

// Zero value case
zeroValue := time.Unix(0, 0)
data := map[string]interface{}{
"id": 0,
"t_date": zeroValue,
"t_datetime": zeroValue,
"t_timestamp": zeroValue.Add(time.Second),
}
err = table.Insert(data).Send().Wait().Check()
if err != nil {
return err
}

// Ancient date case. We DO NOT support it.
// TODO investigate why and find out a solution
/* ancientTime := time.Date(960, 1, 1, 15, 33, 0, 0, time.UTC)
data = map[string]interface{}{
"id": 1,
"t_date": ancientTime,
"t_datetime": ancientTime,
"t_timestamp": zeroValue.Add(time.Second), // Timestamp does not support the Zero value of `time.Time`, so we test the Unix epoch instead
}
err = table.Insert(data).Send().Wait().Check()
if err != nil {
return err
}
*/

return nil
}

0 comments on commit fa8979d

Please sign in to comment.