sink(ticdc): add flush worker to flush row changed events (#4633)
ref #4423
Rustin170506 committed Feb 24, 2022
1 parent 79be937 commit ed2ac96
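
This change removes the per-partition worker machinery in mq.go (the partitionInput channels, the partitionResolvedTs slice guarded with sync/atomic, the notify-based resolved-ts handshake, and the per-partition runWorker goroutines) and instead routes both row changed events and resolved-ts events through a single flushWorker, created with newFlushWorker(encoder, mqProducer, statistics) and driven by flushWorker.run(ctx). The worker itself lives in one of the changed files that is not expanded in this view. Below is a minimal, self-contained sketch of the pattern as implied by the calls visible in this diff; every name beyond mqEvent, msgChan, newFlushWorker, and run is an assumption, and the real worker encodes the batched rows and hands them to the MQ producer instead of printing them.

// Hypothetical, simplified sketch of the flush-worker pattern introduced by
// this commit; the real implementation is in one of the files not shown here.
package main

import (
	"context"
	"fmt"
	"time"
)

// rowEvent stands in for *model.RowChangedEvent.
type rowEvent struct {
	commitTs uint64
}

// mqEvent carries either a row bound for a partition or a resolved ts,
// mirroring the mqEvent{row, partition} and mqEvent{resolvedTs} values
// sent in the diff below.
type mqEvent struct {
	row        *rowEvent
	partition  int32
	resolvedTs uint64
}

// flushWorker drains msgChan, batches rows per partition, and flushes
// whenever a resolved-ts event or a timer tick arrives. The real constructor
// takes the encoder, the MQ producer, and a Statistics value.
type flushWorker struct {
	msgChan chan mqEvent
	ticker  *time.Ticker
	batch   map[int32][]*rowEvent
}

func newFlushWorker() *flushWorker {
	return &flushWorker{
		msgChan: make(chan mqEvent, 1024),
		ticker:  time.NewTicker(500 * time.Millisecond),
		batch:   make(map[int32][]*rowEvent),
	}
}

// flush would encode the batched rows and hand them to the producer; here it
// only reports the batch sizes.
func (w *flushWorker) flush() {
	for partition, rows := range w.batch {
		fmt.Printf("flush %d rows to partition %d\n", len(rows), partition)
	}
	w.batch = make(map[int32][]*rowEvent)
}

// run is the single background loop that replaces the old per-partition
// runWorker goroutines.
func (w *flushWorker) run(ctx context.Context) error {
	defer w.ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-w.ticker.C:
			w.flush()
		case e := <-w.msgChan:
			if e.row == nil {
				// A resolved-ts event: everything received so far is
				// complete up to e.resolvedTs, so push it out now.
				w.flush()
				continue
			}
			w.batch[e.partition] = append(w.batch[e.partition], e.row)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	w := newFlushWorker()
	go func() {
		w.msgChan <- mqEvent{row: &rowEvent{commitTs: 1}, partition: 0}
		w.msgChan <- mqEvent{resolvedTs: 2}
	}()
	_ = w.run(ctx)
}

Keeping batching and the periodic flush in one goroutine means EmitRowChangedEvents and flushTsToWorker only need a channel send, so the per-partition atomics and the notifier can be deleted from mq.go, which is what the diff below does.
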
Showing 3 changed files with 427 additions and 157 deletions.
186 changes: 29 additions & 157 deletions cdc/sink/mq.go
@@ -18,8 +18,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -32,18 +30,12 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type mqEvent struct {
row *model.RowChangedEvent
resolvedTs model.Ts
}

type resolvedTsEvent struct {
tableID model.TableID
resolvedTs model.Ts
@@ -67,13 +59,9 @@ type mqSink struct {
filter *filter.Filter
protocol config.Protocol

partitionNum int32
partitionInput []chan mqEvent
partitionResolvedTs []uint64
flushWorker *flushWorker
tableCheckpointTsMap sync.Map
resolvedBuffer chan resolvedTsEvent
resolvedNotifier *notify.Notifier
resolvedReceiver *notify.Receiver

statistics *Statistics

@@ -111,34 +99,27 @@ func newMqSink(
partitionInput[i] = make(chan mqEvent, defaultPartitionInputChSize)
}

notifier := new(notify.Notifier)
resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)

encoder, err := encoderBuilder.Build(ctx)
if err != nil {
return nil, errors.Trace(err)
}

changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)
statistics := NewStatistics(ctx, "MQ")
flushWorker := newFlushWorker(encoder, mqProducer, statistics)

s := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
encoderBuilder: encoderBuilder,
filter: filter,
protocol: protocol,

partitionNum: partitionNum,
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),

resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
flushWorker: flushWorker,
resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),
statistics: statistics,
role: role,
id: changefeedID,
}

go func() {
@@ -181,10 +162,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
select {
case <-ctx.Done():
return ctx.Err()
case k.partitionInput[partition] <- struct {
row *model.RowChangedEvent
resolvedTs uint64
}{row: row}:
case k.flushWorker.msgChan <- mqEvent{row: row, partition: partition}:
}
rowsCount++
}
@@ -239,30 +217,13 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error {
}

func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error {
// flush resolvedTs to all partition workers
for i := 0; i < int(k.partitionNum); i++ {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case k.partitionInput[i] <- mqEvent{resolvedTs: resolvedTs}:
}
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case k.flushWorker.msgChan <- mqEvent{resolvedTs: resolvedTs}:
}

// wait until all row events have been sent to the mq producer
flushLoop:
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-k.resolvedReceiver.C:
for i := 0; i < int(k.partitionNum); i++ {
if resolvedTs > atomic.LoadUint64(&k.partitionResolvedTs[i]) {
continue flushLoop
}
}
return nil
}
}
return nil
}

func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
@@ -277,7 +238,7 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
if msg == nil {
return nil
}
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
err = k.writeToProducer(ctx, msg, -1)
return errors.Trace(err)
}

@@ -319,7 +280,7 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
log.Debug("emit ddl event", zap.String("query", ddl.Query),
zap.Uint64("commitTs", ddl.CommitTs), zap.Int32("partition", partition),
zap.String("changefeed", k.id), zap.Any("role", k.role))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, partition)
err = k.writeToProducer(ctx, msg, partition)
return errors.Trace(err)
}

@@ -335,115 +296,26 @@ func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
}

func (k *mqSink) run(ctx context.Context) error {
defer k.resolvedReceiver.Stop()
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
return k.bgFlushTs(ctx)
})
for i := int32(0); i < k.partitionNum; i++ {
partition := i
wg.Go(func() error {
return k.runWorker(ctx, partition)
})
}
wg.Go(func() error {
return k.flushWorker.run(ctx)
})
return wg.Wait()
}

const batchSizeLimit = 4 * 1024 * 1024 // 4MB

func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
input := k.partitionInput[partition]
encoder, err := k.encoderBuilder.Build(ctx)
if err != nil {
return errors.Trace(err)
}
tick := time.NewTicker(500 * time.Millisecond)
defer tick.Stop()

flushToProducer := func() error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
}

for _, msg := range messages {
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
thisBatchSize += msg.GetRowsCount()
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
})
}
for {
var e mqEvent
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
continue
case e = <-input:
}
if e.row == nil {
// When receiving resolved ts events, we need to write all events to the producer.
// We don't need to flush it immediately; we wait until all partitions have received
// this event before flushing it uniformly.
if e.resolvedTs != 0 {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}

atomic.StoreUint64(&k.partitionResolvedTs[partition], e.resolvedTs)
k.resolvedNotifier.Notify()
}
continue
}
err := encoder.AppendRowChangedEvent(e.row)
func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, partition int32) error {
if partition >= 0 {
err := k.mqProducer.AsyncSendMessage(ctx, message, partition)
if err != nil {
return errors.Trace(err)
}

if encoder.Size() >= batchSizeLimit {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
return err
}
return k.mqProducer.Flush(ctx)
}
}

func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, op codec.EncoderResult, partition int32) error {
switch op {
case codec.EncoderNeedAsyncWrite:
if partition >= 0 {
return k.mqProducer.AsyncSendMessage(ctx, message, partition)
}
return cerror.ErrAsyncBroadcastNotSupport.GenWithStackByArgs()
case codec.EncoderNeedSyncWrite:
if partition >= 0 {
err := k.mqProducer.AsyncSendMessage(ctx, message, partition)
if err != nil {
return err
}
return k.mqProducer.Flush(ctx)
}
return k.mqProducer.SyncBroadcastMessage(ctx, message)
}

log.Warn("writeToProducer called with no-op",
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
return k.mqProducer.SyncBroadcastMessage(ctx, message)
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
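
The remaining two changed files are not expanded in this view. One other consequence visible in mq.go is the simplified write path: writeToProducer drops its codec.EncoderResult parameter, always sends asynchronously and then flushes when given a concrete partition, and falls back to a synchronous broadcast for partition -1, as used by EmitCheckpointTs above. A minimal, runnable sketch of that branch follows; the producer interface and fakeProducer are assumptions standing in for the real mqProducer.

// Hypothetical sketch of the simplified write path after this commit.
package main

import (
	"context"
	"fmt"
)

type message struct {
	key, value []byte
}

// producer is an assumed stand-in for the mqProducer used in mq.go.
type producer interface {
	AsyncSendMessage(ctx context.Context, msg *message, partition int32) error
	SyncBroadcastMessage(ctx context.Context, msg *message) error
	Flush(ctx context.Context) error
}

// writeToProducer mirrors the branch structure of the new mq.go code:
// a concrete partition gets an async send followed by a flush, while
// partition -1 means "broadcast synchronously to every partition".
func writeToProducer(ctx context.Context, p producer, msg *message, partition int32) error {
	if partition >= 0 {
		if err := p.AsyncSendMessage(ctx, msg, partition); err != nil {
			return err
		}
		return p.Flush(ctx)
	}
	return p.SyncBroadcastMessage(ctx, msg)
}

// fakeProducer logs calls so the example can run without a real MQ.
type fakeProducer struct{}

func (fakeProducer) AsyncSendMessage(ctx context.Context, msg *message, partition int32) error {
	fmt.Printf("async send %q to partition %d\n", msg.value, partition)
	return nil
}

func (fakeProducer) SyncBroadcastMessage(ctx context.Context, msg *message) error {
	fmt.Printf("broadcast %q\n", msg.value)
	return nil
}

func (fakeProducer) Flush(ctx context.Context) error {
	fmt.Println("flush")
	return nil
}

func main() {
	ctx := context.Background()
	p := fakeProducer{}
	_ = writeToProducer(ctx, p, &message{value: []byte("ddl")}, 0)   // async send + flush
	_ = writeToProducer(ctx, p, &message{value: []byte("ckpt")}, -1) // sync broadcast
}
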
