sink(ticdc): add flush worker to flush row changed events #4633

Merged
merged 16 commits into from
Feb 24, 2022
Changes from 11 commits
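
Note for readers of this excerpt: the flush worker itself is added in a companion file that is not shown here, so mq.go only hints at its shape through the call sites visible below (`newFlushWorker(encoder, mqProducer, statistics)`, `flushWorker.msgChan <- mqEvent{...}`, and `flushWorker.run(ctx)`). The sketch that follows is inferred from those call sites only; `RowChangedEvent` is a stand-in for TiCDC's `model.RowChangedEvent`, and the `sendRow`/`sendResolvedTs` helpers are our own illustrative names for what mq.go does inline.

```go
package mqsinksketch

import "context"

// Illustrative stand-in; the real type is TiCDC's model.RowChangedEvent.
type RowChangedEvent struct{ CommitTs uint64 }

// mqEvent matches the two call sites visible in this diff: a row event tagged
// with its target partition, or a resolved-ts-only event (row == nil).
type mqEvent struct {
	row        *RowChangedEvent
	resolvedTs uint64
	partition  int32
}

// sendRow mirrors the new EmitRowChangedEvents path: the dispatcher picks the
// partition, and the event goes onto the worker's single channel, honouring
// context cancellation. (Helper name is ours; mq.go writes to the channel inline.)
func sendRow(ctx context.Context, msgChan chan<- mqEvent, row *RowChangedEvent, partition int32) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msgChan <- mqEvent{row: row, partition: partition}:
		return nil
	}
}

// sendResolvedTs mirrors the new flushTsToWorker: one event to one worker
// replaces the old broadcast to every per-partition goroutine.
func sendResolvedTs(ctx context.Context, msgChan chan<- mqEvent, ts uint64) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msgChan <- mqEvent{resolvedTs: ts}:
		return nil
	}
}
```

The trade-off visible in this diff: partition awareness moves into the event itself, so a single channel and a single worker can replace the `partitionInput` slice and the per-partition goroutines.
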
186 changes: 29 additions & 157 deletions cdc/sink/mq.go
@@ -18,8 +18,6 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -32,18 +30,12 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type mqEvent struct {
row *model.RowChangedEvent
resolvedTs model.Ts
}

type resolvedTsEvent struct {
tableID model.TableID
resolvedTs model.Ts
@@ -67,13 +59,9 @@ type mqSink struct {
filter *filter.Filter
protocol config.Protocol

partitionNum int32
partitionInput []chan mqEvent
partitionResolvedTs []uint64
flushWorker *flushWorker
tableCheckpointTsMap sync.Map
resolvedBuffer chan resolvedTsEvent
resolvedNotifier *notify.Notifier
resolvedReceiver *notify.Receiver

statistics *Statistics

@@ -111,34 +99,27 @@ func newMqSink(
partitionInput[i] = make(chan mqEvent, defaultPartitionInputChSize)
}

notifier := new(notify.Notifier)
resolvedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)

encoder, err := encoderBuilder.Build(ctx)
if err != nil {
return nil, errors.Trace(err)
}

changefeedID := util.ChangefeedIDFromCtx(ctx)
role := util.RoleFromCtx(ctx)
statistics := NewStatistics(ctx, "MQ")
flushWorker := newFlushWorker(encoder, mqProducer, statistics)

s := &mqSink{
mqProducer: mqProducer,
dispatcher: d,
encoderBuilder: encoderBuilder,
filter: filter,
protocol: protocol,

partitionNum: partitionNum,
partitionInput: partitionInput,
partitionResolvedTs: make([]uint64, partitionNum),

resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
flushWorker: flushWorker,
resolvedBuffer: make(chan resolvedTsEvent, defaultResolvedTsEventBufferSize),
statistics: statistics,
role: role,
id: changefeedID,
}

go func() {
@@ -181,10 +162,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
select {
case <-ctx.Done():
return ctx.Err()
case k.partitionInput[partition] <- struct {
row *model.RowChangedEvent
resolvedTs uint64
}{row: row}:
case k.flushWorker.msgChan <- mqEvent{row: row, partition: partition}:
}
rowsCount++
}
Expand Down Expand Up @@ -239,30 +217,13 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error {
}

func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error {
// flush resolvedTs to all partition workers
for i := 0; i < int(k.partitionNum); i++ {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case k.partitionInput[i] <- mqEvent{resolvedTs: resolvedTs}:
}
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case k.flushWorker.msgChan <- mqEvent{resolvedTs: resolvedTs}:
}

// wait until all row events are sent to the mq producer
flushLoop:
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-k.resolvedReceiver.C:
for i := 0; i < int(k.partitionNum); i++ {
if resolvedTs > atomic.LoadUint64(&k.partitionResolvedTs[i]) {
continue flushLoop
}
}
return nil
}
}
return nil
}

func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
@@ -277,7 +238,7 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
if msg == nil {
return nil
}
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, -1)
err = k.writeToProducer(ctx, msg, -1)
return errors.Trace(err)
}

@@ -319,7 +280,7 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
log.Debug("emit ddl event", zap.String("query", ddl.Query),
zap.Uint64("commitTs", ddl.CommitTs), zap.Int32("partition", partition),
zap.String("changefeed", k.id), zap.Any("role", k.role))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, partition)
err = k.writeToProducer(ctx, msg, partition)
return errors.Trace(err)
}

@@ -335,115 +296,26 @@ func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
}

func (k *mqSink) run(ctx context.Context) error {
defer k.resolvedReceiver.Stop()
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
return k.bgFlushTs(ctx)
})
for i := int32(0); i < k.partitionNum; i++ {
partition := i
wg.Go(func() error {
return k.runWorker(ctx, partition)
})
}
wg.Go(func() error {
return k.flushWorker.run(ctx)
})
return wg.Wait()
}

const batchSizeLimit = 4 * 1024 * 1024 // 4MB

func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
input := k.partitionInput[partition]
encoder, err := k.encoderBuilder.Build(ctx)
if err != nil {
return errors.Trace(err)
}
tick := time.NewTicker(500 * time.Millisecond)
defer tick.Stop()

flushToProducer := func() error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
}

for _, msg := range messages {
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
thisBatchSize += msg.GetRowsCount()
}
log.Debug("MQSink flushed", zap.Int("thisBatchSize", thisBatchSize),
zap.String("changefeed", k.id), zap.Any("role", k.role))
return thisBatchSize, nil
})
}
for {
var e mqEvent
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
continue
case e = <-input:
}
if e.row == nil {
// When receiving resolved ts events, we need to write all events to the producer.
// We don't need to flush it immediately; we wait until all partitions have received
// this event before we flush it uniformly.
if e.resolvedTs != 0 {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}

atomic.StoreUint64(&k.partitionResolvedTs[partition], e.resolvedTs)
k.resolvedNotifier.Notify()
}
continue
}
err := encoder.AppendRowChangedEvent(e.row)
func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, partition int32) error {
if partition >= 0 {
err := k.mqProducer.AsyncSendMessage(ctx, message, partition)
if err != nil {
return errors.Trace(err)
}

if encoder.Size() >= batchSizeLimit {
if err := flushToProducer(); err != nil {
return errors.Trace(err)
}
return err
}
return k.mqProducer.Flush(ctx)
}
}

func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, op codec.EncoderResult, partition int32) error {
switch op {
case codec.EncoderNeedAsyncWrite:
if partition >= 0 {
return k.mqProducer.AsyncSendMessage(ctx, message, partition)
}
return cerror.ErrAsyncBroadcastNotSupport.GenWithStackByArgs()
case codec.EncoderNeedSyncWrite:
if partition >= 0 {
err := k.mqProducer.AsyncSendMessage(ctx, message, partition)
if err != nil {
return err
}
return k.mqProducer.Flush(ctx)
}
return k.mqProducer.SyncBroadcastMessage(ctx, message)
}

log.Warn("writeToProducer called with no-op",
zap.ByteString("key", message.Key),
zap.ByteString("value", message.Value),
zap.Int32("partition", partition),
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
return k.mqProducer.SyncBroadcastMessage(ctx, message)
}

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
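
The large hunk above deletes the per-partition runWorker loop from mq.go — the 500 ms ticker flush, the 4 MB batchSizeLimit, and the flush-and-notify on resolved ts — and strips writeToProducer down to the synchronous cases (async send followed by Flush for a concrete partition, SyncBroadcastMessage otherwise). The batching responsibility now belongs to the flush worker, whose implementation is outside this excerpt; the sketch below is only a rough reconstruction of a single-worker loop that would mirror the removed behaviour, with stand-in `batchEncoder`/`mqProducer` interfaces and assumed details throughout (tick interval carried over from the old code, partition routing elided).

```go
package mqworkersketch

import (
	"context"
	"time"
)

// Stand-in types, repeated from the earlier sketch so this file stands alone.
type RowChangedEvent struct{ CommitTs uint64 }

type mqEvent struct {
	row        *RowChangedEvent
	resolvedTs uint64
	partition  int32
}

// Simplified stand-ins for the batch encoder and MQ producer used by mq.go;
// these are illustrative, not the real tiflow interfaces.
type batchEncoder interface {
	AppendRowChangedEvent(row *RowChangedEvent) error
	Build() [][]byte // encoded messages ready to send
	Size() int       // bytes buffered so far
}

type mqProducer interface {
	AsyncSendMessage(ctx context.Context, msg []byte, partition int32) error
	Flush(ctx context.Context) error
}

// Same 4 MB limit the removed runWorker used.
const batchSizeLimit = 4 * 1024 * 1024

type flushWorker struct {
	msgChan  chan mqEvent
	encoder  batchEncoder
	producer mqProducer
}

// run mirrors the behaviour deleted from mq.go, but as one worker fed by a
// single channel instead of one goroutine per partition: flush on a periodic
// tick, when the encoder exceeds the size limit, and when a resolved-ts-only
// event (row == nil) arrives.
func (w *flushWorker) run(ctx context.Context) error {
	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()

	flush := func(partition int32) error {
		for _, msg := range w.encoder.Build() {
			if err := w.producer.AsyncSendMessage(ctx, msg, partition); err != nil {
				return err
			}
		}
		// Resolved-ts correctness needs the messages on the broker, so force a flush.
		return w.producer.Flush(ctx)
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			if err := flush(0); err != nil { // partition routing is elided in this sketch
				return err
			}
		case ev := <-w.msgChan:
			if ev.row == nil {
				if ev.resolvedTs != 0 {
					if err := flush(ev.partition); err != nil {
						return err
					}
				}
				continue
			}
			if err := w.encoder.AppendRowChangedEvent(ev.row); err != nil {
				return err
			}
			if w.encoder.Size() >= batchSizeLimit {
				if err := flush(ev.partition); err != nil {
					return err
				}
			}
		}
	}
}
```

Because one channel serializes row events and resolved-ts events, the removed notifier/atomic handshake that waited on every partition appears to be unnecessary on the mq.go side, which is what the simplified flushTsToWorker above reflects.
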