From 653af8ed8dc5e05dbae73cf240c8cd79dbe7403f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B7=A5=E4=B8=9A=E5=BA=9F=E6=B0=B4?= Date: Tue, 15 Feb 2022 18:51:38 +0800 Subject: [PATCH] ticdc(sink): remove useless AppendResolvedEvent (#4596) ref pingcap/tiflow#4423 --- cdc/sink/codec/avro.go | 5 ----- cdc/sink/codec/canal.go | 6 ------ cdc/sink/codec/canal_flat.go | 36 +++++++------------------------ cdc/sink/codec/canal_flat_test.go | 26 ++++------------------ cdc/sink/codec/craft.go | 5 ----- cdc/sink/codec/interface.go | 4 ---- cdc/sink/codec/json.go | 5 ----- cdc/sink/codec/maxwell.go | 5 ----- cdc/sink/mq.go | 11 ++++------ 9 files changed, 16 insertions(+), 87 deletions(-) diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 49e4b45090f..f9cab6aabc0 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -129,11 +129,6 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) return EncoderNeedAsyncWrite, nil } -// AppendResolvedEvent is no-op for Avro -func (a *AvroEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - return EncoderNoOperation, nil -} - // EncodeCheckpointEvent is no-op for now func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { return nil, nil diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 20fdfa363ae..51e25f6d144 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -396,12 +396,6 @@ type CanalEventBatchEncoder struct { entryBuilder *canalEntryBuilder } -// AppendResolvedEvent appends a resolved event to the encoder -// TODO TXN support -func (d *CanalEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - return EncoderNoOperation, nil -} - // EncodeCheckpointEvent implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { // For canal now, there is no such a corresponding type to ResolvedEvent so far. diff --git a/cdc/sink/codec/canal_flat.go b/cdc/sink/codec/canal_flat.go index 98bcff8fa55..6feec68d120 100644 --- a/cdc/sink/codec/canal_flat.go +++ b/cdc/sink/codec/canal_flat.go @@ -35,9 +35,8 @@ const tidbWaterMarkType = "TIDB_WATERMARK" // CanalFlatEventBatchEncoder encodes Canal flat messages in JSON format type CanalFlatEventBatchEncoder struct { - builder *canalEntryBuilder - unresolvedBuf []canalFlatMessageInterface - resolvedBuf []canalFlatMessageInterface + builder *canalEntryBuilder + messageBuf []canalFlatMessageInterface // When it is true, canal-json would generate TiDB extension information // which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields. enableTiDBExtension bool @@ -47,8 +46,7 @@ type CanalFlatEventBatchEncoder struct { func NewCanalFlatEventBatchEncoder() EventBatchEncoder { return &CanalFlatEventBatchEncoder{ builder: NewCanalEntryBuilder(), - unresolvedBuf: make([]canalFlatMessageInterface, 0), - resolvedBuf: make([]canalFlatMessageInterface, 0), + messageBuf: make([]canalFlatMessageInterface, 0), enableTiDBExtension: false, } } @@ -322,25 +320,7 @@ func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEv if err != nil { return EncoderNoOperation, errors.Trace(err) } - c.unresolvedBuf = append(c.unresolvedBuf, message) - return EncoderNoOperation, nil -} - -// AppendResolvedEvent receives the latest resolvedTs -func (c *CanalFlatEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - nextIdx := 0 - for _, msg := range c.unresolvedBuf { - if msg.getTikvTs() <= ts { - c.resolvedBuf = append(c.resolvedBuf, msg) - } else { - break - } - nextIdx++ - } - c.unresolvedBuf = c.unresolvedBuf[nextIdx:] - if len(c.resolvedBuf) > 0 { - return EncoderNeedAsyncWrite, nil - } + c.messageBuf = append(c.messageBuf, message) return EncoderNoOperation, nil } @@ -356,11 +336,11 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa // Build implements the EventBatchEncoder interface func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { - if len(c.resolvedBuf) == 0 { + if len(c.messageBuf) == 0 { return nil } - ret := make([]*MQMessage, len(c.resolvedBuf)) - for i, msg := range c.resolvedBuf { + ret := make([]*MQMessage, len(c.messageBuf)) + for i, msg := range c.messageBuf { value, err := json.Marshal(msg) if err != nil { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) @@ -370,7 +350,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { m.IncRowsCount() ret[i] = m } - c.resolvedBuf = c.resolvedBuf[0:0] + c.messageBuf = make([]canalFlatMessageInterface, 0) return ret } diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index fb968f2b2ac..567560a435e 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -163,10 +163,6 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C c.Assert(err, check.IsNil) c.Assert(result, check.Equals, EncoderNoOperation) - result, err = encoder.AppendResolvedEvent(417318403368288260) - c.Assert(err, check.IsNil) - c.Assert(result, check.Equals, EncoderNeedAsyncWrite) - mqMessages := encoder.Build() c.Assert(len(mqMessages), check.Equals, 1) @@ -295,27 +291,17 @@ func (s *canalFlatSuite) TestBatching(c *check.C) { c.Assert(encoder, check.NotNil) updateCase := *testCaseUpdate - lastResolved := uint64(0) - for i := 1; i < 1000; i++ { + for i := 1; i <= 1000; i++ { ts := uint64(i) updateCase.CommitTs = ts result, err := encoder.AppendRowChangedEvent(&updateCase) c.Assert(err, check.IsNil) c.Assert(result, check.Equals, EncoderNoOperation) - if i >= 100 && (i%100 == 0 || i == 999) { - resolvedTs := uint64(i - 50) - if i == 999 { - resolvedTs = 999 - } - result, err := encoder.AppendResolvedEvent(resolvedTs) - - c.Assert(err, check.IsNil) - c.Assert(result, check.Equals, EncoderNeedAsyncWrite) - + if i%100 == 0 { msgs := encoder.Build() c.Assert(msgs, check.NotNil) - c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved)) + c.Assert(msgs, check.HasLen, 100) for j := range msgs { c.Assert(msgs[j].GetRowsCount(), check.Equals, 1) @@ -324,15 +310,11 @@ func (s *canalFlatSuite) TestBatching(c *check.C) { err := json.Unmarshal(msgs[j].Value, &msg) c.Assert(err, check.IsNil) c.Assert(msg.EventType, check.Equals, "UPDATE") - c.Assert(msg.ExecutionTime, check.Equals, convertToCanalTs(lastResolved+uint64(i))) } - - lastResolved = resolvedTs } } - c.Assert(encoder.unresolvedBuf, check.HasLen, 0) - c.Assert(encoder.resolvedBuf, check.HasLen, 0) + c.Assert(encoder.messageBuf, check.HasLen, 0) } func (s *canalFlatSuite) TestEncodeCheckpointEvent(c *check.C) { diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go index b3f35187a29..b390595b761 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/codec/craft.go @@ -62,11 +62,6 @@ func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent return EncoderNoOperation, nil } -// AppendResolvedEvent is no-op -func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - return EncoderNoOperation, nil -} - // EncodeDDLEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) { return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index 0dafac1d16b..847e447c8df 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -33,9 +33,6 @@ type EventBatchEncoder interface { EncodeCheckpointEvent(ts uint64) (*MQMessage, error) // AppendRowChangedEvent appends a row changed event into the batch AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) - // AppendResolvedEvent appends a resolved event into the batch. - // This event is used to tell the encoder that no event prior to ts will be sent. - AppendResolvedEvent(ts uint64) (EncoderResult, error) // EncodeDDLEvent appends a DDL event into the batch EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) // Build builds the batch and returns the bytes of key and value. @@ -47,7 +44,6 @@ type EventBatchEncoder interface { // TODO decouple it out MixedBuild(withVersion bool) []byte // Size returns the size of the batch(bytes) - // Deprecated: Size is deprecated Size() int // Reset reset the kv buffer Reset() diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 979dacbfa37..b96da852c92 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -400,11 +400,6 @@ func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) { d.supportMixedBuild = enabled } -// AppendResolvedEvent is no-op -func (d *JSONEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - return EncoderNoOperation, nil -} - // EncodeCheckpointEvent implements the EventBatchEncoder interface func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { keyMsg := newResolvedMessage(ts) diff --git a/cdc/sink/codec/maxwell.go b/cdc/sink/codec/maxwell.go index 42f54aafc10..03ab4eda75e 100644 --- a/cdc/sink/codec/maxwell.go +++ b/cdc/sink/codec/maxwell.go @@ -85,11 +85,6 @@ func (d *MaxwellEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, return nil, nil } -// AppendResolvedEvent implements the EventBatchEncoder interface -func (d *MaxwellEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) { - return EncoderNoOperation, nil -} - func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) { var partition *int64 if e.Table.IsPartition { diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index cf69bdf70e4..2071097ad1a 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -399,15 +399,12 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error { continue case e = <-input: } - // flush resolvedTs event if e.row == nil { + // When receiving resolved ts events, we need to write all events to the producer. + // We don't need to flush it immediately, we wait until all partitions have received + // this event before we flush it uniformly. if e.resolvedTs != 0 { - op, err := encoder.AppendResolvedEvent(e.resolvedTs) - if err != nil { - return errors.Trace(err) - } - - if err := flushToProducer(op); err != nil { + if err := flushToProducer(codec.EncoderNoOperation); err != nil { return errors.Trace(err) }