Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc(sink): remove useless AppendResolvedEvent #4596

Merged
merged 5 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
return EncoderNeedAsyncWrite, nil
}

// AppendResolvedEvent is no-op for Avro
func (a *AvroEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent is no-op for now
func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return nil, nil
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,12 +396,6 @@ type CanalEventBatchEncoder struct {
entryBuilder *canalEntryBuilder
}

// AppendResolvedEvent appends a resolved event to the encoder
// TODO TXN support
func (d *CanalEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
// For canal now, there is no such a corresponding type to ResolvedEvent so far.
Expand Down
36 changes: 8 additions & 28 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ const tidbWaterMarkType = "TIDB_WATERMARK"

// CanalFlatEventBatchEncoder encodes Canal flat messages in JSON format
type CanalFlatEventBatchEncoder struct {
builder *canalEntryBuilder
unresolvedBuf []canalFlatMessageInterface
resolvedBuf []canalFlatMessageInterface
builder *canalEntryBuilder
messageBuf []canalFlatMessageInterface
// When it is true, canal-json would generate TiDB extension information
// which, at the moment, only includes `tidbWaterMarkType` and `_tidb` fields.
enableTiDBExtension bool
Expand All @@ -47,8 +46,7 @@ type CanalFlatEventBatchEncoder struct {
func NewCanalFlatEventBatchEncoder() EventBatchEncoder {
return &CanalFlatEventBatchEncoder{
builder: NewCanalEntryBuilder(),
unresolvedBuf: make([]canalFlatMessageInterface, 0),
resolvedBuf: make([]canalFlatMessageInterface, 0),
messageBuf: make([]canalFlatMessageInterface, 0),
enableTiDBExtension: false,
}
}
Expand Down Expand Up @@ -322,25 +320,7 @@ func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEv
if err != nil {
return EncoderNoOperation, errors.Trace(err)
}
c.unresolvedBuf = append(c.unresolvedBuf, message)
return EncoderNoOperation, nil
}

// AppendResolvedEvent receives the latest resolvedTs
func (c *CanalFlatEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
nextIdx := 0
for _, msg := range c.unresolvedBuf {
if msg.getTikvTs() <= ts {
c.resolvedBuf = append(c.resolvedBuf, msg)
} else {
break
}
nextIdx++
}
c.unresolvedBuf = c.unresolvedBuf[nextIdx:]
if len(c.resolvedBuf) > 0 {
return EncoderNeedAsyncWrite, nil
}
c.messageBuf = append(c.messageBuf, message)
return EncoderNoOperation, nil
}

Expand All @@ -356,11 +336,11 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa

// Build implements the EventBatchEncoder interface
func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
if len(c.resolvedBuf) == 0 {
if len(c.messageBuf) == 0 {
return nil
}
ret := make([]*MQMessage, len(c.resolvedBuf))
for i, msg := range c.resolvedBuf {
ret := make([]*MQMessage, len(c.messageBuf))
for i, msg := range c.messageBuf {
value, err := json.Marshal(msg)
if err != nil {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
Expand All @@ -370,7 +350,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
c.messageBuf = make([]canalFlatMessageInterface, 0)
return ret
}

Expand Down
12 changes: 1 addition & 11 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNoOperation)

result, err = encoder.AppendResolvedEvent(417318403368288260)
c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

mqMessages := encoder.Build()
c.Assert(len(mqMessages), check.Equals, 1)

Expand Down Expand Up @@ -308,11 +304,6 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
if i == 999 {
resolvedTs = 999
}
result, err := encoder.AppendResolvedEvent(resolvedTs)

c.Assert(err, check.IsNil)
c.Assert(result, check.Equals, EncoderNeedAsyncWrite)

msgs := encoder.Build()
c.Assert(msgs, check.NotNil)
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))
Expand All @@ -331,8 +322,7 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
}
}

c.Assert(encoder.unresolvedBuf, check.HasLen, 0)
c.Assert(encoder.resolvedBuf, check.HasLen, 0)
c.Assert(encoder.messageBuf, check.HasLen, 0)
}

func (s *canalFlatSuite) TestEncodeCheckpointEvent(c *check.C) {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent
return EncoderNoOperation, nil
}

// AppendResolvedEvent is no-op
func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ type EventBatchEncoder interface {
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendRowChangedEvent appends a row changed event into the batch
AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error)
// AppendResolvedEvent appends a resolved event into the batch.
// This event is used to tell the encoder that no event prior to ts will be sent.
AppendResolvedEvent(ts uint64) (EncoderResult, error)
// EncodeDDLEvent appends a DDL event into the batch
EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error)
// Build builds the batch and returns the bytes of key and value.
Expand All @@ -47,7 +44,6 @@ type EventBatchEncoder interface {
// TODO decouple it out
MixedBuild(withVersion bool) []byte
// Size returns the size of the batch(bytes)
// Deprecated: Size is deprecated
Size() int
// Reset reset the kv buffer
Reset()
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,6 @@ func (d *JSONEventBatchEncoder) SetMixedBuildSupport(enabled bool) {
d.supportMixedBuild = enabled
}

// AppendResolvedEvent is no-op
func (d *JSONEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
keyMsg := newResolvedMessage(ts)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ func (d *MaxwellEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage,
return nil, nil
}

// AppendResolvedEvent implements the EventBatchEncoder interface
func (d *MaxwellEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwellMessage) {
var partition *int64
if e.Table.IsPartition {
Expand Down
89 changes: 10 additions & 79 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -32,18 +31,12 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type mqEvent struct {
row *model.RowChangedEvent
resolvedTs model.Ts
}

type resolvedTsEvent struct {
tableID model.TableID
resolvedTs model.Ts
Expand All @@ -68,12 +61,10 @@ type mqSink struct {
protocol config.Protocol

partitionNum int32
partitionInput []chan mqEvent
partitionInput []chan *model.RowChangedEvent
partitionResolvedTs []uint64
tableCheckpointTsMap sync.Map
resolvedBuffer chan resolvedTsEvent
resolvedNotifier *notify.Notifier
resolvedReceiver *notify.Receiver

statistics *Statistics

Expand Down Expand Up @@ -106,15 +97,9 @@ func newMqSink(
return nil, errors.Trace(err)
}

partitionInput := make([]chan mqEvent, partitionNum)
partitionInput := make([]chan *model.RowChangedEvent, partitionNum)
for i := 0; i < int(partitionNum); i++ {
partitionInput[i] = make(chan mqEvent, defaultPartitionInputChSize)
}

notifier := new(notify.Notifier)
resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
partitionInput[i] = make(chan *model.RowChangedEvent, defaultPartitionInputChSize)
}

changefeedID := util.ChangefeedIDFromCtx(ctx)
Expand All @@ -131,9 +116,7 @@ func newMqSink(
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),

resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,
resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),

statistics: NewStatistics(ctx, "MQ", opts),

Expand Down Expand Up @@ -181,10 +164,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
select {
case <-ctx.Done():
return ctx.Err()
case k.partitionInput[partition] <- struct {
row *model.RowChangedEvent
resolvedTs uint64
}{row: row}:
case k.partitionInput[partition] <- row:
}
rowsCount++
}
Expand Down Expand Up @@ -222,11 +202,7 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error {
return errors.Trace(ctx.Err())
case msg := <-k.resolvedBuffer:
resolvedTs := msg.resolvedTs
err := k.flushTsToWorker(ctx, resolvedTs)
if err != nil {
return errors.Trace(err)
}
err = k.mqProducer.Flush(ctx)
err := k.mqProducer.Flush(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -238,33 +214,6 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error {
}
}

func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error {
// flush resolvedTs to all partition workers
for i := 0; i < int(k.partitionNum); i++ {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case k.partitionInput[i] <- mqEvent{resolvedTs: resolvedTs}:
}
}

// waiting for all row events are sent to mq producer
flushLoop:
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-k.resolvedReceiver.C:
for i := 0; i < int(k.partitionNum); i++ {
if resolvedTs > atomic.LoadUint64(&k.partitionResolvedTs[i]) {
continue flushLoop
}
}
return nil
}
}
}

func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
encoder, err := k.encoderBuilder.Build(ctx)
if err != nil {
Expand Down Expand Up @@ -328,14 +277,13 @@ func (k *mqSink) Close(ctx context.Context) error {
return errors.Trace(err)
}

func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
func (k *mqSink) Barrier(_ context.Context, _ model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in mq sink has flushed
// all buffered events by force.
return nil
}

func (k *mqSink) run(ctx context.Context) error {
defer k.resolvedReceiver.Stop()
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
return k.bgFlushTs(ctx)
Expand Down Expand Up @@ -388,7 +336,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
})
}
for {
var e mqEvent
var row *model.RowChangedEvent
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -397,26 +345,9 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
return errors.Trace(err)
}
continue
case e = <-input:
}
// flush resolvedTs event
if e.row == nil {
if e.resolvedTs != 0 {
op, err := encoder.AppendResolvedEvent(e.resolvedTs)
if err != nil {
return errors.Trace(err)
}

if err := flushToProducer(op); err != nil {
return errors.Trace(err)
}

atomic.StoreUint64(&k.partitionResolvedTs[partition], e.resolvedTs)
k.resolvedNotifier.Notify()
}
continue
case row = <-input:
}
op, err := encoder.AppendRowChangedEvent(e.row)
op, err := encoder.AppendRowChangedEvent(row)
if err != nil {
return errors.Trace(err)
}
Expand Down