
sink(ticdc): add batch buffer for flush worker #4884

Merged: 11 commits, Mar 15, 2022
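The main change in this PR: instead of allocating a fresh slice on every call, `batch` now fills a caller-owned buffer and reports how many events it wrote, so the flush worker can reuse a single buffer across iterations. Below is a minimal, self-contained sketch of that buffer-reuse pattern; the names (`event`, `worker`, `batchSize`) and the channel-draining details are illustrative assumptions, not the actual TiCDC code shown in the diff that follows.

```go
// Sketch of the buffer-reuse pattern: the caller allocates one fixed-size
// slice, batch() fills it in place and returns the count written, so no
// per-batch slice allocation happens. Illustrative only, not the TiCDC API.
package main

import (
	"context"
	"fmt"
)

const batchSize = 4

type event struct{ id int }

type worker struct {
	msgChan chan event
}

// batch blocks for at least one event, then keeps draining the channel into
// buf until buf is full or no more events are immediately available.
// It returns the number of events written into buf.
func (w *worker) batch(ctx context.Context, buf []event) (int, error) {
	n := 0
	select {
	case <-ctx.Done():
		return n, ctx.Err()
	case e := <-w.msgChan:
		buf[n] = e
		n++
	}
	for n < len(buf) {
		select {
		case e := <-w.msgChan:
			buf[n] = e
			n++
		default:
			return n, nil
		}
	}
	return n, nil
}

func main() {
	w := &worker{msgChan: make(chan event, 8)}
	for i := 1; i <= 6; i++ {
		w.msgChan <- event{id: i}
	}

	buf := make([]event, batchSize) // allocated once, reused for every batch
	for i := 0; i < 2; i++ {
		n, err := w.batch(context.Background(), buf)
		if err != nil {
			return
		}
		fmt.Println("batch:", buf[:n])
	}
}
```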
59 changes: 36 additions & 23 deletions cdc/sink/mq_flush_worker.go
@@ -65,23 +65,27 @@ func newFlushWorker(encoder codec.EventBatchEncoder, producer producer.Producer,
}

// batch collects a batch of messages to be sent to the Kafka producer.
func (w *flushWorker) batch(ctx context.Context) ([]mqEvent, error) {
var events []mqEvent
func (w *flushWorker) batch(
ctx context.Context, events []mqEvent,
) (int, error) {
index := 0
max := len(events)
// We need to receive at least one message or be interrupted,
// otherwise it will lead to idling.
select {
case <-ctx.Done():
return nil, ctx.Err()
return index, ctx.Err()
case msg := <-w.msgChan:
// When the resolved ts is received,
// we need to write the previous data to the producer as soon as possible.
if msg.resolvedTs != 0 {
w.needSyncFlush = true
return events, nil
return index, nil
}

if msg.row != nil {
events = append(events, msg)
events[index] = msg
index++
}
}

@@ -90,43 +94,47 @@ func (w *flushWorker) batch(ctx context.Context) ([]mqEvent, error) {
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
return index, ctx.Err()
case msg := <-w.msgChan:
if msg.resolvedTs != 0 {
w.needSyncFlush = true
return events, nil
return index, nil
}

if msg.row != nil {
events = append(events, msg)
events[index] = msg
index++
}

if len(events) >= flushBatchSize {
return events, nil
if index >= max {
return index, nil
}
case <-w.ticker.C:
return events, nil
return index, nil
}
}
}

// group is responsible for grouping messages by the partition.
func (w *flushWorker) group(events []mqEvent) map[int32][]mqEvent {
paritionedEvents := make(map[int32][]mqEvent)
func (w *flushWorker) group(events []mqEvent) map[int32][]*model.RowChangedEvent {
paritionedRows := make(map[int32][]*model.RowChangedEvent)
for _, event := range events {
if _, ok := paritionedEvents[event.partition]; !ok {
paritionedEvents[event.partition] = make([]mqEvent, 0)
if _, ok := paritionedRows[event.partition]; !ok {
paritionedRows[event.partition] = make([]*model.RowChangedEvent, 0)
}
paritionedEvents[event.partition] = append(paritionedEvents[event.partition], event)
paritionedRows[event.partition] = append(paritionedRows[event.partition], event.row)
}
return paritionedEvents
return paritionedRows
}

// asyncSend is responsible for sending messages to the Kafka producer.
func (w *flushWorker) asyncSend(ctx context.Context, paritionedEvents map[int32][]mqEvent) error {
for partition, events := range paritionedEvents {
func (w *flushWorker) asyncSend(
ctx context.Context,
paritionedRows map[int32][]*model.RowChangedEvent,
) error {
for partition, events := range paritionedRows {
for _, event := range events {
err := w.encoder.AppendRowChangedEvent(event.row)
err := w.encoder.AppendRowChangedEvent(event)
if err != nil {
return err
}
@@ -166,13 +174,18 @@ func (w *flushWorker) asyncSend(ctx context.Context, paritionedEvents map[int32]
// until it encounters an error or is interrupted.
func (w *flushWorker) run(ctx context.Context) error {
defer w.ticker.Stop()
eventsBuf := make([]mqEvent, flushBatchSize)
for {
events, err := w.batch(ctx)
endIndex, err := w.batch(ctx, eventsBuf)
if err != nil {
return errors.Trace(err)
}
paritionedEvents := w.group(events)
err = w.asyncSend(ctx, paritionedEvents)
if endIndex == 0 {
continue
}
msgs := eventsBuf[:endIndex]
paritionedRows := w.group(msgs)
err = w.asyncSend(ctx, paritionedRows)
if err != nil {
return errors.Trace(err)
}
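For reference before the test changes below: `group` now returns `*model.RowChangedEvent` slices keyed by partition, and because rows are appended in arrival order, commit-ts order is preserved within each partition. A rough standalone sketch of that grouping step, with simplified stand-in types for `mqEvent` and `model.RowChangedEvent`:

```go
// Sketch of grouping events by partition while keeping arrival order inside
// each partition. Types are simplified stand-ins, not the real TiCDC types.
package main

import "fmt"

// row stands in for model.RowChangedEvent.
type row struct{ commitTs uint64 }

type mqEvent struct {
	partition int32
	row       *row
}

// group buckets rows by partition; appending in slice order preserves the
// commit-ts order within each partition.
func group(events []mqEvent) map[int32][]*row {
	partitioned := make(map[int32][]*row)
	for _, e := range events {
		partitioned[e.partition] = append(partitioned[e.partition], e.row)
	}
	return partitioned
}

func main() {
	events := []mqEvent{
		{partition: 1, row: &row{commitTs: 1}},
		{partition: 2, row: &row{commitTs: 2}},
		{partition: 1, row: &row{commitTs: 3}},
	}
	for p, rows := range group(events) {
		fmt.Println("partition", p, "has", len(rows), "rows")
	}
}
```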
135 changes: 101 additions & 34 deletions cdc/sink/mq_flush_worker_test.go
@@ -79,46 +79,101 @@ func newTestWorker() (*flushWorker, *mockProducer) {
}

func TestBatch(t *testing.T) {
t.Parallel()

worker, _ := newTestWorker()
events := []mqEvent{
tests := []struct {
name string
events []mqEvent
expectedN int
}{
{
resolvedTs: 0,
name: "Normal batching",
events: []mqEvent{
{
resolvedTs: 0,
},
{
row: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
},
partition: 1,
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
partition: 1,
},
},
expectedN: 2,
},
{
row: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
name: "No row change events",
events: []mqEvent{
{
resolvedTs: 1,
},
},
partition: 1,
expectedN: 0,
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
name: "The resolved ts event appears in the middle",
events: []mqEvent{
{
row: &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
},
partition: 1,
},
{
resolvedTs: 1,
},
{
row: &model.RowChangedEvent{
CommitTs: 2,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}},
},
partition: 1,
},
},
partition: 1,
expectedN: 1,
},
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
result, err := worker.batch(context.Background())
require.NoError(t, err)
require.Len(t, result, 2)
}()
ctx := context.Background()
batch := make([]mqEvent, 3)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
endIndex, err := worker.batch(ctx, batch)
require.NoError(t, err)
require.Equal(t, test.expectedN, endIndex)
}()

for _, event := range events {
worker.msgChan <- event
go func() {
for _, event := range test.events {
worker.msgChan <- event
}
}()
wg.Wait()
})
}

wg.Wait()
}

func TestGroup(t *testing.T) {
t.Parallel()

worker, _ := newTestWorker()
events := []mqEvent{
{
@@ -155,15 +210,21 @@ func TestGroup(t *testing.T) {
},
}

paritionedEvents := worker.group(events)
require.Len(t, paritionedEvents, 2)
require.Len(t, paritionedEvents[1], 3)
paritionedRows := worker.group(events)
require.Len(t, paritionedRows, 2)
require.Len(t, paritionedRows[1], 3)
// We must ensure that the sequence is not broken.
require.LessOrEqual(t, paritionedEvents[1][0].row.CommitTs, paritionedEvents[1][1].row.CommitTs, paritionedEvents[1][2].row.CommitTs)
require.Len(t, paritionedEvents[2], 1)
require.LessOrEqual(
t,
paritionedRows[1][0].CommitTs, paritionedRows[1][1].CommitTs,
paritionedRows[1][2].CommitTs,
)
require.Len(t, paritionedRows[2], 1)
}

func TestAsyncSend(t *testing.T) {
t.Parallel()

worker, producer := newTestWorker()
events := []mqEvent{
{
@@ -216,8 +277,8 @@ func TestAsyncSend(t *testing.T) {
},
}

paritionedEvents := worker.group(events)
err := worker.asyncSend(context.Background(), paritionedEvents)
paritionedRows := worker.group(events)
err := worker.asyncSend(context.Background(), paritionedRows)
require.NoError(t, err)
require.Len(t, producer.mqEvent, 3)
require.Len(t, producer.mqEvent[1], 3)
@@ -226,6 +287,8 @@
}

func TestFlush(t *testing.T) {
t.Parallel()

worker, producer := newTestWorker()
events := []mqEvent{
{
@@ -261,13 +324,15 @@ func TestFlush(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
batchBuf := make([]mqEvent, 4)
ctx := context.Background()
batch, err := worker.batch(ctx)
endIndex, err := worker.batch(ctx, batchBuf)
require.NoError(t, err)
require.Len(t, batch, 3)
require.Equal(t, 3, endIndex)
require.True(t, worker.needSyncFlush)
paritionedEvents := worker.group(batch)
err = worker.asyncSend(ctx, paritionedEvents)
msgs := batchBuf[:endIndex]
paritionedRows := worker.group(msgs)
err = worker.asyncSend(ctx, paritionedRows)
require.NoError(t, err)
require.True(t, producer.flushed)
require.False(t, worker.needSyncFlush)
@@ -281,6 +346,8 @@
}

func TestAbort(t *testing.T) {
t.Parallel()

worker, _ := newTestWorker()
ctx, cancel := context.WithCancel(context.Background())
