Skip to content

Commit

Permalink
sink(ticdc): remove useless op return value of AppendRowChangedEvent (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Feb 22, 2022
1 parent 97c5305 commit ec2f775
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 67 deletions.
12 changes: 6 additions & 6 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ func (a *AvroEventBatchEncoder) SetTimeZone(tz *time.Location) {

// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
}

evlp, err := res.toEnvelope()
if err != nil {
log.Warn("AppendRowChangedEvent: could not construct Avro envelope", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not construct Avro envelope")
return errors.Annotate(err, "AppendRowChangedEvent could not construct Avro envelope")
}

mqMessage.Value = evlp
Expand All @@ -113,20 +113,20 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
}

evlp, err := res.toEnvelope()
if err != nil {
log.Warn("AppendRowChangedEvent: could not construct Avro envelope", zap.String("table", e.Table.String()))
return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not construct Avro envelope")
return errors.Annotate(err, "AppendRowChangedEvent could not construct Avro envelope")
}

mqMessage.Key = evlp
mqMessage.IncRowsCount()
a.resultBuf = append(a.resultBuf, mqMessage)

return EncoderNeedAsyncWrite, nil
return nil
}

// EncodeCheckpointEvent is no-op for now
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,6 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) {
_, err := s.encoder.EncodeDDLEvent(testCaseDdl)
c.Check(err, check.IsNil)

_, err = s.encoder.AppendRowChangedEvent(testCaseUpdate)
err = s.encoder.AppendRowChangedEvent(testCaseUpdate)
c.Check(err, check.IsNil)
}
8 changes: 4 additions & 4 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,17 +404,17 @@ func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, e
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
func (d *CanalEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
entry, err := d.entryBuilder.FromRowEvent(e)
if err != nil {
return EncoderNoOperation, errors.Trace(err)
return errors.Trace(err)
}
b, err := proto.Marshal(entry)
if err != nil {
return EncoderNoOperation, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}
d.messages.Messages = append(d.messages.Messages, b)
return EncoderNoOperation, nil
return nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,13 @@ func (c *CanalFlatEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessag
}

// AppendRowChangedEvent implements the interface EventBatchEncoder
func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
message, err := c.newFlatMessageForDML(e)
if err != nil {
return EncoderNoOperation, errors.Trace(err)
return errors.Trace(err)
}
c.messageBuf = append(c.messageBuf, message)
return EncoderNoOperation, nil
return nil
}

// EncodeDDLEvent encodes DDL events
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,8 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
encoder := &CanalFlatEventBatchEncoder{builder: NewCanalEntryBuilder(), enableTiDBExtension: encodeEnable}
c.Assert(encoder, check.NotNil)

result, err := encoder.AppendRowChangedEvent(testCaseInsert)
err := encoder.AppendRowChangedEvent(testCaseInsert)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

mqMessages := encoder.Build()
c.Assert(len(mqMessages), check.Equals, 1)
Expand Down Expand Up @@ -294,9 +293,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
for i := 1; i <= 1000; i++ {
ts := uint64(i)
updateCase.CommitTs = ts
result, err := encoder.AppendRowChangedEvent(&updateCase)
err := encoder.AppendRowChangedEvent(&updateCase)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

if i%100 == 0 {
msgs := encoder.Build()
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
for _, cs := range s.rowCases {
encoder := NewCanalEventBatchEncoder()
for _, row := range cs {
_, err := encoder.AppendRowChangedEvent(row)
err := encoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)
}
size := encoder.Size()
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func codecEncodeRowCase(encoder EventBatchEncoder, events []*model.RowChangedEve
}

for _, event := range events {
_, err := encoder.AppendRowChangedEvent(event)
err := encoder.AppendRowChangedEvent(event)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func (e *CraftEventBatchEncoder) flush() {
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) error {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if size > e.maxMessageBytes || rows >= e.maxBatchSize {
e.flush()
}
return EncoderNoOperation, nil
return nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
Expand Down
9 changes: 3 additions & 6 deletions cdc/sink/codec/craft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ func (s *craftBatchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatc

events := 0
for _, row := range cs {
op, err := encoder.AppendRowChangedEvent(row)
err := encoder.AppendRowChangedEvent(row)
events++
c.Assert(err, check.IsNil)
c.Assert(op, check.Equals, EncoderNoOperation)
}
// test normal decode
if len(cs) > 0 {
Expand Down Expand Up @@ -189,8 +188,7 @@ func (s *craftBatchSuite) TestMaxMessageBytes(c *check.C) {
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
err := encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.IsNil)
}

Expand All @@ -213,8 +211,7 @@ func (s *craftBatchSuite) TestMaxBatchSize(c *check.C) {
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
err := encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.IsNil)
}

Expand Down
5 changes: 2 additions & 3 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type EventBatchEncoder interface {
// This event will be broadcast to all partitions to signal a global checkpoint.
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends a row changed event into the batch
AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error)
AppendRowChangedEvent(e *model.RowChangedEvent) error
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Expand Down Expand Up @@ -151,8 +151,7 @@ type EncoderResult uint8

// Enum types of EncoderResult
const (
EncoderNoOperation EncoderResult = iota
EncoderNeedAsyncWrite
EncoderNeedAsyncWrite EncoderResult = iota
EncoderNeedSyncWrite
)

Expand Down
10 changes: 5 additions & 5 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,15 @@ func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, er
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
keyMsg, valueMsg := rowEventToMqMessage(e)
key, err := keyMsg.Encode()
if err != nil {
return EncoderNoOperation, errors.Trace(err)
return errors.Trace(err)
}
value, err := valueMsg.Encode()
if err != nil {
return EncoderNoOperation, errors.Trace(err)
return errors.Trace(err)
}

var keyLenByte [8]byte
Expand All @@ -464,7 +464,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
if length > d.maxMessageBytes {
log.Warn("Single message too large",
zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
return EncoderNoOperation, cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs()
}

if len(d.messageBuf) == 0 ||
Expand Down Expand Up @@ -495,7 +495,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
}
d.curBatchSize++
}
return EncoderNoOperation, nil
return nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
Expand Down
17 changes: 6 additions & 11 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,10 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
mixedEncoder := newEncoder()
mixedEncoder.(*JSONEventBatchEncoder).SetMixedBuildSupport(true)
for _, row := range cs {
_, err := encoder.AppendRowChangedEvent(row)
err := encoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)

op, err := mixedEncoder.AppendRowChangedEvent(row)
c.Assert(op, check.Equals, EncoderNoOperation)
err = mixedEncoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)
}
// test mixed decode
Expand Down Expand Up @@ -283,24 +282,21 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
a := strconv.Itoa(87 + 44)
err := encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Check(err, check.IsNil)
r, err := encoder.AppendRowChangedEvent(testEvent)
err = encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.IsNil)
c.Check(r, check.Equals, EncoderNoOperation)

a = strconv.Itoa(87 + 43)
err = encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Assert(err, check.IsNil)
r, err = encoder.AppendRowChangedEvent(testEvent)
err = encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.NotNil)
c.Check(r, check.Equals, EncoderNoOperation)

// make sure each batch's `Length` not greater than `max-message-bytes`
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
err := encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.IsNil)
}

Expand All @@ -325,8 +321,7 @@ func (s *batchSuite) TestMaxBatchSize(c *check.C) {
}

for i := 0; i < 10000; i++ {
r, err := encoder.AppendRowChangedEvent(testEvent)
c.Check(r, check.Equals, EncoderNoOperation)
err := encoder.AppendRowChangedEvent(testEvent)
c.Check(err, check.IsNil)
}

Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,15 @@ func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwell
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) {
func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
_, valueMsg := rowEventToMaxwellMessage(e)
value, err := valueMsg.Encode()
if err != nil {
return EncoderNoOperation, errors.Trace(err)
return errors.Trace(err)
}
d.valueBuf.Write(value)
d.batchSize++
return EncoderNeedAsyncWrite, nil
return nil
}

// SetParams is no-op for Maxwell for now
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/maxwell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func()
for _, cs := range s.rowCases {
encoder := newEncoder()
for _, row := range cs {
_, err := encoder.AppendRowChangedEvent(row)
err := encoder.AppendRowChangedEvent(row)
c.Assert(err, check.IsNil)
}
size := encoder.Size()
Expand Down
21 changes: 5 additions & 16 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
tick := time.NewTicker(500 * time.Millisecond)
defer tick.Stop()

flushToProducer := func(op codec.EncoderResult) error {
flushToProducer := func() error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := 0
Expand All @@ -375,13 +375,6 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
}
thisBatchSize += msg.GetRowsCount()
}

if op == codec.EncoderNeedSyncWrite {
err := k.mqProducer.Flush(ctx)
if err != nil {
return 0, err
}
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
Expand All @@ -393,7 +386,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
if err := flushToProducer(codec.EncoderNeedAsyncWrite); err != nil {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
continue
Expand All @@ -404,7 +397,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
// We don't need to flush it immediately, we wait until all partitions have received
// this event before we flush it uniformly.
if e.resolvedTs != 0 {
if err := flushToProducer(codec.EncoderNoOperation); err != nil {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}

Expand All @@ -413,17 +406,13 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
}
continue
}
op, err := encoder.AppendRowChangedEvent(e.row)
err := encoder.AppendRowChangedEvent(e.row)
if err != nil {
return errors.Trace(err)
}

if encoder.Size() >= batchSizeLimit {
op = codec.EncoderNeedAsyncWrite
}

if encoder.Size() >= batchSizeLimit || op != codec.EncoderNoOperation {
if err := flushToProducer(op); err != nil {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
}
Expand Down

0 comments on commit ec2f775

Please sign in to comment.