Skip to content

Commit

Permalink
ticdc(sink): remove useless AppendResolvedEvent (#4596)
Browse files Browse the repository at this point in the history
ref #4423
  • Loading branch information
Rustin170506 committed Feb 15, 2022
1 parent 87cfd44 commit 2293dc2
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 87 deletions.
5 changes: 0 additions & 5 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
return EncoderNeedAsyncWrite, nil
}

// AppendResolvedEvent is no-op for Avro
func (a *AvroEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent is no-op for now
func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return nil, nil
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,6 @@ type CanalEventBatchEncoder struct {
entryBuilder *canalEntryBuilder
}

// AppendResolvedEvent appends a resolved event to the encoder
// TODO TXN support
func (d *CanalEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
// For canal now, there is no such a corresponding type to ResolvedEvent so far.
Expand Down
36 changes: 8 additions & 28 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ const tidbWaterMarkType = "TIDB_WATERMARK"

// CanalFlatEventBatchEncoder encodes Canal flat messages in JSON format
type CanalFlatEventBatchEncoder struct {
builder *canalEntryBuilder
unresolvedBuf []canalFlatMessageInterface
resolvedBuf []canalFlatMessageInterface
builder *canalEntryBuilder
messageBuf []canalFlatMessageInterface
// When it is true, canal-json would generate TiDB extension information
// which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields.
enableTiDBExtension bool
Expand All @@ -47,8 +46,7 @@ type CanalFlatEventBatchEncoder struct {
func NewCanalFlatEventBatchEncoder() EventBatchEncoder {
return &CanalFlatEventBatchEncoder{
builder: NewCanalEntryBuilder(),
unresolvedBuf: make([]canalFlatMessageInterface, 0),
resolvedBuf: make([]canalFlatMessageInterface, 0),
messageBuf: make([]canalFlatMessageInterface, 0),
enableTiDBExtension: false,
}
}
Expand Down Expand Up @@ -322,25 +320,7 @@ func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEv
if err != nil {
return EncoderNoOperation, errors.Trace(err)
}
c.unresolvedBuf = append(c.unresolvedBuf, message)
return EncoderNoOperation, nil
}

// AppendResolvedEvent receives the latest resolvedTs
func (c *CanalFlatEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
nextIdx := 0
for _, msg := range c.unresolvedBuf {
if msg.getTikvTs() <= ts {
c.resolvedBuf = append(c.resolvedBuf, msg)
} else {
break
}
nextIdx++
}
c.unresolvedBuf = c.unresolvedBuf[nextIdx:]
if len(c.resolvedBuf) > 0 {
return EncoderNeedAsyncWrite, nil
}
c.messageBuf = append(c.messageBuf, message)
return EncoderNoOperation, nil
}

Expand All @@ -356,11 +336,11 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa

// Build implements the EventBatchEncoder interface
func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
if len(c.resolvedBuf) == 0 {
if len(c.messageBuf) == 0 {
return nil
}
ret := make([]*MQMessage, len(c.resolvedBuf))
for i, msg := range c.resolvedBuf {
ret := make([]*MQMessage, len(c.messageBuf))
for i, msg := range c.messageBuf {
value, err := json.Marshal(msg)
if err != nil {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
Expand All @@ -370,7 +350,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
c.messageBuf = make([]canalFlatMessageInterface, 0)
return ret
}

Expand Down
26 changes: 4 additions & 22 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

result, err = encoder.AppendResolvedEvent(417318403368288260)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

mqMessages := encoder.Build()
c.Assert(len(mqMessages), check.Equals, 1)

Expand Down Expand Up @@ -295,27 +291,17 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
c.Assert(encoder, check.NotNil)

updateCase := *testCaseUpdate
lastResolved := uint64(0)
for i := 1; i < 1000; i++ {
for i := 1; i <= 1000; i++ {
ts := uint64(i)
updateCase.CommitTs = ts
result, err := encoder.AppendRowChangedEvent(&updateCase)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

if i >= 100 && (i%100 == 0 || i == 999) {
resolvedTs := uint64(i - 50)
if i == 999 {
resolvedTs = 999
}
result, err := encoder.AppendResolvedEvent(resolvedTs)

c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

if i%100 == 0 {
msgs := encoder.Build()
c.Assert(msgs, check.NotNil)
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))
c.Assert(msgs, check.HasLen, 100)

for j := range msgs {
c.Assert(msgs[j].GetRowsCount(), check.Equals, 1)
Expand All @@ -324,15 +310,11 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
err := json.Unmarshal(msgs[j].Value, &msg)
c.Assert(err, check.IsNil)
c.Assert(msg.EventType, check.Equals, "UPDATE")
c.Assert(msg.ExecutionTime, check.Equals, convertToCanalTs(lastResolved+uint64(i)))
}

lastResolved = resolvedTs
}
}

c.Assert(encoder.unresolvedBuf, check.HasLen, 0)
c.Assert(encoder.resolvedBuf, check.HasLen, 0)
c.Assert(encoder.messageBuf, check.HasLen, 0)
}

func (s *canalFlatSuite) TestEncodeCheckpointEvent(c *check.C) {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent
return EncoderNoOperation, nil
}

// AppendResolvedEvent is no-op
func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type EventBatchEncoder interface {
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends a row changed event into the batch
AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error)
// AppendResolvedEvent appends a resolved event into the batch.
// This event is used to tell the encoder that no event prior to ts will be sent.
AppendResolvedEvent(ts uint64) (EncoderResult, error)
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Expand All @@ -47,7 +44,6 @@ type EventBatchEncoder interface {
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
// Deprecated: Size is deprecated
Size() int
// Reset reset the kv buffer
Reset()
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,6 @@ func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) {
d.supportMixedBuild = enabled
}

// AppendResolvedEvent is no-op
func (d *JSONEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
keyMsg := newResolvedMessage(ts)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ func (d *MaxwellEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage,
return nil, nil
}

// AppendResolvedEvent implements the EventBatchEncoder interface
func (d *MaxwellEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) {
var partition *int64
if e.Table.IsPartition {
Expand Down
11 changes: 4 additions & 7 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,12 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
continue
case e = <-input:
}
// flush resolvedTs event
if e.row == nil {
// When receiving resolved ts events, we need to write all events to the producer.
// We don't need to flush it immediately, we wait until all partitions have received
// this event before we flush it uniformly.
if e.resolvedTs != 0 {
op, err := encoder.AppendResolvedEvent(e.resolvedTs)
if err != nil {
return errors.Trace(err)
}

if err := flushToProducer(op); err != nil {
if err := flushToProducer(codec.EncoderNoOperation); err != nil {
return errors.Trace(err)
}

Expand Down

0 comments on commit 2293dc2

Please sign in to comment.