Skip to content

Commit

Permalink
fix: dropped messages should not be considered for watermark propagat…
Browse files Browse the repository at this point in the history
…ion (#1386)

Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Nov 27, 2023
1 parent 5bf6307 commit 96cfa55
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,12 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
// since we use message event time instead of the watermark to determine and publish source watermarks,
// time.UnixMilli(-1) is assigned to the message watermark. transformedReadMessages are immediately
// used below for publishing source watermarks.
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))

// if message.EventTime is -1, it means that the message was dropped by the transformer
// so we should exclude it from watermark computation
if message.EventTime != time.UnixMilli(-1) {
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))
}
}
}
// publish source watermark
Expand Down

0 comments on commit 96cfa55

Please sign in to comment.