sink/kafka(ticdc): remove useless partition flush logic (#4598)
ref #4423
Rustin170506 authored Feb 25, 2022
1 parent f3dacea commit 4ef1aed
Showing 5 changed files with 148 additions and 114 deletions.
4 changes: 0 additions & 4 deletions cdc/sink/mq.go
@@ -204,10 +204,6 @@ func (k *mqSink) bgFlushTs(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
err = k.mqProducer.Flush(ctx)
if err != nil {
return errors.Trace(err)
}
// Since CDC does not guarantee exactly once semantic, it won't cause any problem
// here even if the table was moved or removed.
// ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134
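The hunk above drops the per-tick producer flush from bgFlushTs: the background goroutine now only forwards the resolved ts to the flush worker, and the worker decides when a synchronous flush is actually needed. Below is a minimal, self-contained sketch of that division of labour; resolvedEvent, worker and forwardResolvedTs are illustrative stand-ins, not the real tiflow identifiers.

package main

import (
	"context"
	"fmt"
)

// resolvedEvent is a stand-in for an mqEvent that carries only a resolved ts.
type resolvedEvent struct {
	resolvedTs uint64
}

// worker is a stand-in for flushWorker; it owns the producer and the flush.
type worker struct {
	msgChan chan resolvedEvent
}

// forwardResolvedTs is a rough stand-in for what the background goroutine
// keeps doing after this change: hand the resolved ts to the worker and
// return, without calling producer.Flush on every tick.
func forwardResolvedTs(ctx context.Context, w *worker, ts uint64) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case w.msgChan <- resolvedEvent{resolvedTs: ts}:
		return nil
	}
}

func main() {
	w := &worker{msgChan: make(chan resolvedEvent, 1)}
	if err := forwardResolvedTs(context.Background(), w, 42); err != nil {
		fmt.Println("forward failed:", err)
		return
	}
	fmt.Println("resolved ts queued:", (<-w.msgChan).resolvedTs)
}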
26 changes: 20 additions & 6 deletions cdc/sink/mq_flush_worker.go
@@ -43,21 +43,23 @@ type mqEvent struct {

// flushWorker is responsible for sending messages to the Kafka producer on a batch basis.
type flushWorker struct {
msgChan chan mqEvent
msgChan chan mqEvent
ticker *time.Ticker
needSyncFlush bool

encoder codec.EventBatchEncoder
producer producer.Producer
statistics *Statistics
ticker *time.Ticker
}

// newFlushWorker creates a new flush worker.
func newFlushWorker(encoder codec.EventBatchEncoder, producer producer.Producer, statistics *Statistics) *flushWorker {
w := &flushWorker{
msgChan: make(chan mqEvent),
ticker: time.NewTicker(flushInterval),
encoder: encoder,
producer: producer,
statistics: statistics,
ticker: time.NewTicker(flushInterval),
}
return w
}
@@ -74,6 +76,7 @@ func (w *flushWorker) batch(ctx context.Context) ([]mqEvent, error) {
// When the resolved ts is received,
// we need to write the previous data to the producer as soon as possible.
if msg.resolvedTs != 0 {
w.needSyncFlush = true
return events, nil
}

@@ -90,6 +93,7 @@ func (w *flushWorker) batch(ctx context.Context) ([]mqEvent, error) {
return nil, ctx.Err()
case msg := <-w.msgChan:
if msg.resolvedTs != 0 {
w.needSyncFlush = true
return events, nil
}

@@ -118,8 +122,8 @@ func (w *flushWorker) group(events []mqEvent) map[int32][]mqEvent {
return paritionedEvents
}

// flush is responsible for sending messages to the Kafka producer.
func (w *flushWorker) flush(ctx context.Context, paritionedEvents map[int32][]mqEvent) error {
// asyncSend is responsible for sending messages to the Kafka producer.
func (w *flushWorker) asyncSend(ctx context.Context, paritionedEvents map[int32][]mqEvent) error {
for partition, events := range paritionedEvents {
for _, event := range events {
err := w.encoder.AppendRowChangedEvent(event.row)
@@ -145,6 +149,16 @@ func (w *flushWorker) flush(ctx context.Context, paritionedEvents map[int32][]mq
}
}

if w.needSyncFlush {
start := time.Now()
err := w.producer.Flush(ctx)
if err != nil {
return err
}
w.needSyncFlush = false
log.Debug("flush worker flushed", zap.Duration("duration", time.Since(start)))
}

return nil
}

@@ -158,7 +172,7 @@ func (w *flushWorker) run(ctx context.Context) error {
return errors.Trace(err)
}
paritionedEvents := w.group(events)
err = w.flush(ctx, paritionedEvents)
err = w.asyncSend(ctx, paritionedEvents)
if err != nil {
return errors.Trace(err)
}
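The core of this file's change is the needSyncFlush handshake: batch sets the flag when it sees a resolved-ts event, and the renamed asyncSend pushes the batched rows asynchronously and only pays for a synchronous producer.Flush when the flag is set, clearing it afterwards. The sketch below condenses that flow into a runnable example; event, sender and printSender are simplified stand-ins, not the actual tiflow types, batch-size limits, or error handling.

package main

import (
	"context"
	"fmt"
)

// event is a simplified mqEvent: either a row payload or a resolved ts.
type event struct {
	payload    string
	resolvedTs uint64
}

// sender is a stripped-down stand-in for the Kafka producer interface.
type sender interface {
	AsyncSend(ctx context.Context, payload string) error
	Flush(ctx context.Context) error
}

type flushWorker struct {
	msgChan       chan event
	needSyncFlush bool
	producer      sender
}

// batch collects row events until a resolved ts arrives, then requests a
// synchronous flush by setting needSyncFlush.
func (w *flushWorker) batch(ctx context.Context) ([]event, error) {
	var events []event
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case msg := <-w.msgChan:
			if msg.resolvedTs != 0 {
				w.needSyncFlush = true
				return events, nil
			}
			events = append(events, msg)
		}
	}
}

// asyncSend hands the batch to the producer and flushes synchronously only
// when the batch ended on a resolved ts.
func (w *flushWorker) asyncSend(ctx context.Context, events []event) error {
	for _, e := range events {
		if err := w.producer.AsyncSend(ctx, e.payload); err != nil {
			return err
		}
	}
	if w.needSyncFlush {
		if err := w.producer.Flush(ctx); err != nil {
			return err
		}
		w.needSyncFlush = false
	}
	return nil
}

// printSender just logs what the worker asks of it.
type printSender struct{}

func (printSender) AsyncSend(_ context.Context, payload string) error {
	fmt.Println("async send:", payload)
	return nil
}

func (printSender) Flush(_ context.Context) error {
	fmt.Println("sync flush")
	return nil
}

func main() {
	w := &flushWorker{msgChan: make(chan event, 4), producer: printSender{}}
	w.msgChan <- event{payload: "row-a"}
	w.msgChan <- event{payload: "row-b"}
	w.msgChan <- event{resolvedTs: 1}

	ctx := context.Background()
	events, err := w.batch(ctx)
	if err != nil {
		panic(err)
	}
	if err := w.asyncSend(ctx, events); err != nil {
		panic(err)
	}
}

The net effect is that a synchronous flush happens at most once per resolved ts instead of once per background tick, which is what the duration logged by log.Debug("flush worker flushed", ...) measures.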
73 changes: 65 additions & 8 deletions cdc/sink/mq_flush_worker_test.go
@@ -25,29 +25,31 @@ import (

type mockProducer struct {
mqEvent map[int32][]*codec.MQMessage
flushed bool
}

func (m mockProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error {
func (m *mockProducer) AsyncSendMessage(ctx context.Context, message *codec.MQMessage, partition int32) error {
if _, ok := m.mqEvent[partition]; !ok {
m.mqEvent[partition] = make([]*codec.MQMessage, 0)
}
m.mqEvent[partition] = append(m.mqEvent[partition], message)
return nil
}

func (m mockProducer) SyncBroadcastMessage(ctx context.Context, message *codec.MQMessage) error {
func (m *mockProducer) SyncBroadcastMessage(ctx context.Context, message *codec.MQMessage) error {
panic("Not used")
}

func (m mockProducer) Flush(ctx context.Context) error {
panic("Not used")
func (m *mockProducer) Flush(ctx context.Context) error {
m.flushed = true
return nil
}

func (m mockProducer) GetPartitionNum() int32 {
func (m *mockProducer) GetPartitionNum() int32 {
panic("Not used")
}

func (m mockProducer) Close() error {
func (m *mockProducer) Close() error {
panic("Not used")
}

@@ -153,7 +155,7 @@ func TestGroup(t *testing.T) {
require.Len(t, paritionedEvents[2], 1)
}

func TestFlush(t *testing.T) {
func TestAsyncSend(t *testing.T) {
worker, producer := newTestWorker()
events := []mqEvent{
{
@@ -207,14 +209,69 @@ }
}

paritionedEvents := worker.group(events)
err := worker.flush(context.Background(), paritionedEvents)
err := worker.asyncSend(context.Background(), paritionedEvents)
require.NoError(t, err)
require.Len(t, producer.mqEvent, 3)
require.Len(t, producer.mqEvent[1], 3)
require.Len(t, producer.mqEvent[2], 1)
require.Len(t, producer.mqEvent[3], 2)
}

func TestFlush(t *testing.T) {
worker, producer := newTestWorker()
events := []mqEvent{
{
row: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
},
partition: 1,
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
partition: 1,
},
{
row: &model.RowChangedEvent{
CommitTs: 3,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}},
},
partition: 1,
},
{
resolvedTs: 1,
},
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
batch, err := worker.batch(ctx)
require.NoError(t, err)
require.Len(t, batch, 3)
require.True(t, worker.needSyncFlush)
paritionedEvents := worker.group(batch)
err = worker.asyncSend(ctx, paritionedEvents)
require.NoError(t, err)
require.True(t, producer.flushed)
require.False(t, worker.needSyncFlush)
}()

for _, event := range events {
worker.msgChan <- event
}

wg.Wait()
}

func TestAbort(t *testing.T) {
worker, _ := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
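Two details in the test changes above are worth calling out: mockProducer now records a flushed flag, and its methods move from value to pointer receivers, because a value receiver would mutate a copy and TestFlush could never observe flushed == true. The snippet below is a generic illustration of that Go pitfall, not the tiflow mock itself; to exercise just these cases, something like go test ./cdc/sink/ -run 'TestAsyncSend|TestFlush' from the repository root should work, assuming the package path shown in the file headers.

package main

import "fmt"

// valueMock mutates a copy of itself, so callers never see the change.
type valueMock struct{ flushed bool }

func (m valueMock) Flush() { m.flushed = true }

// pointerMock mutates the shared value through the pointer, so a test
// assertion such as require.True(t, producer.flushed) can observe it.
type pointerMock struct{ flushed bool }

func (m *pointerMock) Flush() { m.flushed = true }

func main() {
	v := valueMock{}
	v.Flush()
	fmt.Println("value receiver observed:", v.flushed) // false

	p := &pointerMock{}
	p.Flush()
	fmt.Println("pointer receiver observed:", p.flushed) // true
}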