Skip to content

Commit

Permalink
sink(ticdc): fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Mar 14, 2022
1 parent cd973f7 commit ee1567e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
5 changes: 4 additions & 1 deletion cdc/sink/mq_flush_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ func (w *flushWorker) group(events []mqEvent) map[int32][]*model.RowChangedEvent
}

// asyncSend is responsible for sending messages to the Kafka producer.
func (w *flushWorker) asyncSend(ctx context.Context, paritionedRows map[int32][]*model.RowChangedEvent) error {
func (w *flushWorker) asyncSend(
ctx context.Context,
paritionedRows map[int32][]*model.RowChangedEvent,
) error {
for partition, events := range paritionedRows {
for _, event := range events {
err := w.encoder.AppendRowChangedEvent(event)
Expand Down
6 changes: 5 additions & 1 deletion cdc/sink/mq_flush_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ func TestGroup(t *testing.T) {
require.Len(t, paritionedRows, 2)
require.Len(t, paritionedRows[1], 3)
// We must ensure that the sequence is not broken.
require.LessOrEqual(t, paritionedRows[1][0].CommitTs, paritionedRows[1][1].CommitTs, paritionedRows[1][2].CommitTs)
require.LessOrEqual(
t,
paritionedRows[1][0].CommitTs, paritionedRows[1][1].CommitTs,
paritionedRows[1][2].CommitTs,
)
require.Len(t, paritionedRows[2], 1)
}

Expand Down

0 comments on commit ee1567e

Please sign in to comment.