Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc(sink): remove useless AppendResolvedEvent #4596

Merged
merged 5 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
return EncoderNeedAsyncWrite, nil
}

// AppendResolvedEvent is no-op for Avro
func (a *AvroEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent is no-op for now
func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return nil, nil
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,6 @@ type CanalEventBatchEncoder struct {
entryBuilder *canalEntryBuilder
}

// AppendResolvedEvent appends a resolved event to the encoder
// TODO TXN support
func (d *CanalEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
// For canal now, there is no such a corresponding type to ResolvedEvent so far.
Expand Down
36 changes: 8 additions & 28 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ const tidbWaterMarkType = "TIDB_WATERMARK"

// CanalFlatEventBatchEncoder encodes Canal flat messages in JSON format
type CanalFlatEventBatchEncoder struct {
builder *canalEntryBuilder
unresolvedBuf []canalFlatMessageInterface
resolvedBuf []canalFlatMessageInterface
builder *canalEntryBuilder
messageBuf []canalFlatMessageInterface
// When it is true, canal-json would generate TiDB extension information
// which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields.
enableTiDBExtension bool
Expand All @@ -47,8 +46,7 @@ type CanalFlatEventBatchEncoder struct {
func NewCanalFlatEventBatchEncoder() EventBatchEncoder {
return &CanalFlatEventBatchEncoder{
builder: NewCanalEntryBuilder(),
unresolvedBuf: make([]canalFlatMessageInterface, 0),
resolvedBuf: make([]canalFlatMessageInterface, 0),
messageBuf: make([]canalFlatMessageInterface, 0),
enableTiDBExtension: false,
}
}
Expand Down Expand Up @@ -322,25 +320,7 @@ func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEv
if err != nil {
return EncoderNoOperation, errors.Trace(err)
}
c.unresolvedBuf = append(c.unresolvedBuf, message)
return EncoderNoOperation, nil
}

// AppendResolvedEvent receives the latest resolvedTs
func (c *CanalFlatEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
nextIdx := 0
for _, msg := range c.unresolvedBuf {
if msg.getTikvTs() <= ts {
c.resolvedBuf = append(c.resolvedBuf, msg)
} else {
break
}
nextIdx++
}
c.unresolvedBuf = c.unresolvedBuf[nextIdx:]
if len(c.resolvedBuf) > 0 {
return EncoderNeedAsyncWrite, nil
}
c.messageBuf = append(c.messageBuf, message)
return EncoderNoOperation, nil
}

Expand All @@ -356,11 +336,11 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa

// Build implements the EventBatchEncoder interface
func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
if len(c.resolvedBuf) == 0 {
if len(c.messageBuf) == 0 {
return nil
}
ret := make([]*MQMessage, len(c.resolvedBuf))
for i, msg := range c.resolvedBuf {
ret := make([]*MQMessage, len(c.messageBuf))
for i, msg := range c.messageBuf {
value, err := json.Marshal(msg)
if err != nil {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
Expand All @@ -370,7 +350,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
c.messageBuf = make([]canalFlatMessageInterface, 0)
return ret
}

Expand Down
26 changes: 4 additions & 22 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

result, err = encoder.AppendResolvedEvent(417318403368288260)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

mqMessages := encoder.Build()
c.Assert(len(mqMessages), check.Equals, 1)

Expand Down Expand Up @@ -295,27 +291,17 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
c.Assert(encoder, check.NotNil)

updateCase := *testCaseUpdate
lastResolved := uint64(0)
for i := 1; i < 1000; i++ {
for i := 1; i <= 1000; i++ {
ts := uint64(i)
updateCase.CommitTs = ts
result, err := encoder.AppendRowChangedEvent(&updateCase)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

if i >= 100 && (i%100 == 0 || i == 999) {
resolvedTs := uint64(i - 50)
if i == 999 {
resolvedTs = 999
}
result, err := encoder.AppendResolvedEvent(resolvedTs)

c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

if i%100 == 0 {
msgs := encoder.Build()
c.Assert(msgs, check.NotNil)
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))
c.Assert(msgs, check.HasLen, 100)

for j := range msgs {
c.Assert(msgs[j].GetRowsCount(), check.Equals, 1)
Expand All @@ -324,15 +310,11 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
err := json.Unmarshal(msgs[j].Value, &msg)
c.Assert(err, check.IsNil)
c.Assert(msg.EventType, check.Equals, "UPDATE")
c.Assert(msg.ExecutionTime, check.Equals, convertToCanalTs(lastResolved+uint64(i)))
}

lastResolved = resolvedTs
}
}

c.Assert(encoder.unresolvedBuf, check.HasLen, 0)
c.Assert(encoder.resolvedBuf, check.HasLen, 0)
c.Assert(encoder.messageBuf, check.HasLen, 0)
}

func (s *canalFlatSuite) TestEncodeCheckpointEvent(c *check.C) {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent
return EncoderNoOperation, nil
}

// AppendResolvedEvent is no-op
func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type EventBatchEncoder interface {
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends a row changed event into the batch
AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error)
// AppendResolvedEvent appends a resolved event into the batch.
// This event is used to tell the encoder that no event prior to ts will be sent.
AppendResolvedEvent(ts uint64) (EncoderResult, error)
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Expand All @@ -47,7 +44,6 @@ type EventBatchEncoder interface {
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
// Deprecated: Size is deprecated
Size() int
// Reset reset the kv buffer
Reset()
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,6 @@ func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) {
d.supportMixedBuild = enabled
}

// AppendResolvedEvent is no-op
func (d *JSONEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
keyMsg := newResolvedMessage(ts)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ func (d *MaxwellEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage,
return nil, nil
}

// AppendResolvedEvent implements the EventBatchEncoder interface
func (d *MaxwellEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) {
var partition *int64
if e.Table.IsPartition {
Expand Down
11 changes: 4 additions & 7 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,12 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
continue
case e = <-input:
}
// flush resolvedTs event
if e.row == nil {
// When receiving resolved ts events, we need to write all events to the producer.
// We don't need to flush it immediately, we wait until all partitions have received
// this event before we flush it uniformly.
if e.resolvedTs != 0 {
op, err := encoder.AppendResolvedEvent(e.resolvedTs)
if err != nil {
return errors.Trace(err)
}

if err := flushToProducer(op); err != nil {
if err := flushToProducer(codec.EncoderNoOperation); err != nil {
return errors.Trace(err)
}

Expand Down