fix: non-ack failed offsets (#1370)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Nov 16, 2023
1 parent a5c0805 commit 118c309
Showing 1 changed file with 74 additions and 52 deletions.
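
In outline, the fix makes writeMessagesToWindows return the failed messages alongside the successfully written ones; the caller acks the written messages and immediately no-acks the failed ones, so they are redelivered right away instead of after the ackWait timeout. A minimal sketch of that control flow, assuming simplified stand-in types (Message, Buffer, and memBuffer below are hypothetical placeholders, not the real isb interfaces):

```go
package main

import (
	"context"
	"fmt"
)

// Message and Buffer are simplified stand-ins for isb.ReadMessage and the
// buffer partition; only the ack/no-ack surface matters for this sketch.
type Message struct{ Offset string }

type Buffer interface {
	Ack(ctx context.Context, offsets []string)
	NoAck(ctx context.Context, offsets []string) // redeliver immediately, no ackWait
}

type memBuffer struct{}

func (memBuffer) Ack(_ context.Context, offsets []string)   { fmt.Println("acked:", offsets) }
func (memBuffer) NoAck(_ context.Context, offsets []string) { fmt.Println("no-acked:", offsets) }

// process writes each message to its window; on the first write failure the
// rest of the batch is treated as failed so read order is preserved on retry.
func process(ctx context.Context, buf Buffer, msgs []Message, write func(Message) error) {
	var written, failed []string
	for i, m := range msgs {
		if err := write(m); err != nil {
			for _, f := range msgs[i:] {
				failed = append(failed, f.Offset)
			}
			break
		}
		written = append(written, m.Offset)
	}
	buf.Ack(ctx, written)  // successfully written messages are acked
	buf.NoAck(ctx, failed) // failed messages are retried right away
}

func main() {
	msgs := []Message{{"o1"}, {"o2"}, {"o3"}}
	write := func(m Message) error {
		if m.Offset == "o2" {
			return fmt.Errorf("pbq write failed")
		}
		return nil
	}
	process(context.Background(), memBuffer{}, msgs, write) // acked: [o1], no-acked: [o2 o3]
}
```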
126 changes: 74 additions & 52 deletions pkg/reduce/data_forward.go
@@ -335,7 +335,7 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
}).Add(float64(len(messages)))

// write messages to windows backed by PBQs.
successfullyWrittenMessages, err := df.writeMessagesToWindows(ctx, dataMessages)
successfullyWrittenMessages, failedMessages, err := df.writeMessagesToWindows(ctx, dataMessages)
if err != nil {
df.log.Errorw("Failed to write messages", zap.Int("totalMessages", len(messages)), zap.Int("writtenMessage", len(successfullyWrittenMessages)))
}
@@ -348,9 +348,17 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
if len(successfullyWrittenMessages) == 0 {
return
}

// ack successful messages
df.ackMessages(ctx, successfullyWrittenMessages)

// no-ack the failed messages so that they are retried in the next iteration
// if we don't do this, the failed messages will only be retried after the ackWait time,
// which would cause correctness issues. We want these messages to be retried immediately.
// When a message is retried, its offset remains the same, so an old message might jump out of the offset-timeline and cause the watermark to be -1.
// Correctness is violated because the queue offset and message order are no longer monotonically increasing for those failed messages.
df.noAckMessages(ctx, failedMessages)

// close any windows that need to be closed.
// since the watermark will be the same for all the messages in the batch
// we can invoke remove windows only once per batch
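
To make the watermark concern above concrete, here is a toy offset timeline (hypothetical types, not the real watermark package). It only stays consistent if offsets are appended in increasing order, and a lookup for an offset older than every entry returns -1 — which is what can happen when a failed message is redelivered only after ackWait, still carrying its original offset:

```go
package main

import "fmt"

// entry pairs a buffer offset with the watermark published for it.
type entry struct {
	offset    int64
	watermark int64
}

// timeline is a toy offset timeline; it is only valid when offsets arrive
// in strictly increasing order.
type timeline struct{ entries []entry }

func (t *timeline) put(offset, wm int64) error {
	if n := len(t.entries); n > 0 && offset <= t.entries[n-1].offset {
		return fmt.Errorf("offset %d is older than head offset %d: ordering violated", offset, t.entries[n-1].offset)
	}
	t.entries = append(t.entries, entry{offset, wm})
	return nil
}

// getWatermark returns the watermark of the newest entry at or below offset,
// or -1 if the offset is older than everything on the timeline.
func (t *timeline) getWatermark(offset int64) int64 {
	wm := int64(-1)
	for _, e := range t.entries {
		if e.offset <= offset {
			wm = e.watermark
		}
	}
	return wm
}

func main() {
	var tl timeline
	_ = tl.put(10, 100)
	_ = tl.put(20, 200)
	// A message that failed at offset 5 and is redelivered only after ackWait
	// re-enters behind everything already on the timeline.
	fmt.Println(tl.getWatermark(5)) // -1
	fmt.Println(tl.put(5, 50))      // error: ordering violated
}
```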
@@ -387,56 +395,15 @@ func (df *DataForward) Process(ctx context.Context, messages []*isb.ReadMessage)
}

// writeMessagesToWindows writes the messages to each window that the message belongs to. Each window is backed by a PBQ.
func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*isb.ReadMessage) ([]*isb.ReadMessage, error) {
func (df *DataForward) writeMessagesToWindows(ctx context.Context, messages []*isb.ReadMessage) ([]*isb.ReadMessage, []*isb.ReadMessage, error) {
var err error
var writtenMessages = make([]*isb.ReadMessage, 0, len(messages))
var failedMessages = make([]*isb.ReadMessage, 0)

messagesLoop:
for _, message := range messages {
// drop the late messages only if there is no window open
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
writtenMessages = append(writtenMessages, message)
continue
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed drop it.
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "late"}).Inc()

// mark it as a successfully written message as the message will be acked to avoid subsequent retries
writtenMessages = append(writtenMessages, message)
continue
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
}

// We will accept data as long as window is open. If a straggler (late data) makes in before the window is closed,
// it is accepted.

// NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message.
// This could be due to a couple of problem, eg. ack was not registered, etc.
// Please do not confuse this with late data! This is a platform related problem causing the watermark inequality
// to be violated.
if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
readLoop:
for i, message := range messages {
if df.shouldDropMessage(message) {
writtenMessages = append(writtenMessages, message)
// let's not continue processing this message, most likely the window has already been closed and the message
// won't be processed anyways.
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "watermark_issue"}).Inc()
continue
}

@@ -446,23 +413,69 @@ messagesLoop:
// for each window we will have a PBQ. A message could belong to multiple windows (e.g., sliding).
// We need to write the messages to these PBQs
for _, kw := range windows {

for _, partitionID := range kw.Partitions() {

err := df.writeToPBQ(ctx, message, partitionID, kw)
err = df.writeToPBQ(ctx, message, partitionID, kw)
// there is no point continuing because we are seeing an error.
// this error will ONLY BE set if we are in an erroring loop and ctx.Done() has been invoked.
if err != nil {
df.log.Errorw("Failed to write message, asked to stop trying", zap.Any("msgOffSet", message.ReadOffset.String()), zap.String("partitionID", partitionID.String()), zap.Error(err))
break messagesLoop
failedMessages = append(failedMessages, messages[i:]...)
break readLoop
}
}
}

writtenMessages = append(writtenMessages, message)
}

return writtenMessages, err
return writtenMessages, failedMessages, err
}
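
As the comment inside the loop above notes, a single message can belong to multiple windows (e.g., sliding windows), and each window is backed by its own PBQ, so one message may fan out into several PBQ writes. A rough sketch of that fan-out with a toy sliding-window assignment (the window math here is illustrative, not the real windower package):

```go
package main

import (
	"fmt"
	"time"
)

type window struct{ start, end time.Time }

// slidingWindows returns every window of the given length and slide that
// contains eventTime; with length=60s and slide=20s each event falls into
// three overlapping windows.
func slidingWindows(eventTime time.Time, length, slide time.Duration) []window {
	var windows []window
	// earliest slide-aligned window that still contains eventTime
	start := eventTime.Truncate(slide).Add(slide - length)
	for ; !start.After(eventTime); start = start.Add(slide) {
		windows = append(windows, window{start: start, end: start.Add(length)})
	}
	return windows
}

func main() {
	et := time.Date(2023, 11, 16, 10, 0, 35, 0, time.UTC)
	for _, w := range slidingWindows(et, time.Minute, 20*time.Second) {
		// in the real code, each of these windows maps to a partition and a PBQ write
		fmt.Printf("write to PBQ for window [%s, %s)\n", w.start.Format("15:04:05"), w.end.Format("15:04:05"))
	}
}
```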

func (df *DataForward) shouldDropMessage(message *isb.ReadMessage) bool {
if message.IsLate {
// we should be able to get the late message in as long as there is an open window
nextWinAsSeenByWriter := df.windower.NextWindowToBeClosed()
// if there is no window open, drop the message
if nextWinAsSeenByWriter == nil {
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark))
return true
} else if message.EventTime.Before(nextWinAsSeenByWriter.StartTime()) { // if the message doesn't fall in the next window that is about to be closed, drop it.
df.log.Warnw("Dropping the late message", zap.Time("eventTime", message.EventTime), zap.Time("watermark", message.Watermark), zap.Time("nextWindowToBeClosed", nextWinAsSeenByWriter.StartTime()))
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "late"}).Inc()
return true

// mark it as a successfully written message as the message will be acked to avoid subsequent retries
} else { // if the message falls in the next window that is about to be closed, keep it
df.log.Debugw("Keeping the late message for next condition check because COB has not happened yet", zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Int64("nextWindowToBeClosed.startTime", nextWinAsSeenByWriter.StartTime().UnixMilli()))
}
}

// We will accept data as long as the window is open. If a straggler (late data) makes it in before the window is closed,
// it is accepted.

// NOTE(potential bug): if we get a message where the event-time is < (watermark-allowedLateness), skip processing the message.
// This could be due to a couple of problems, e.g., the ack was not registered, etc.
// Please do not confuse this with late data! This is a platform-related problem causing the watermark inequality
// to be violated.
if !message.IsLate && message.EventTime.Before(message.Watermark.Add(-1*df.opts.allowedLateness)) {
// TODO: track as a counter metric
df.log.Errorw("An old message just popped up", zap.Any("msgOffSet", message.ReadOffset.String()), zap.Int64("eventTime", message.EventTime.UnixMilli()), zap.Int64("watermark", message.Watermark.UnixMilli()), zap.Any("message", message.Message))
// mark it as a successfully written message as the message will be acked to avoid subsequent retries
// let's not continue processing this message; most likely the window has already been closed and the message
// won't be processed anyway.
metrics.ReduceDroppedMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
metrics.LabelPipeline: df.pipelineName,
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
metrics.LabelReason: "watermark_issue"}).Inc()
return true
}

return false
}
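
For readers skimming the new helper, a condensed, hypothetical restatement of its three drop conditions with concrete values (msg and the parameters below are simplified stand-ins, not the real isb.ReadMessage or windower types):

```go
package main

import (
	"fmt"
	"time"
)

type msg struct {
	eventTime time.Time
	watermark time.Time
	isLate    bool
}

// shouldDrop mirrors the decision order in shouldDropMessage above:
//  1. late message and no window is open                     -> drop
//  2. late message older than the next window to be closed   -> drop
//  3. on-time message older than watermark - allowedLateness -> drop
func shouldDrop(m msg, nextWindowStart *time.Time, allowedLateness time.Duration) bool {
	if m.isLate {
		if nextWindowStart == nil {
			return true
		}
		if m.eventTime.Before(*nextWindowStart) {
			return true
		}
	}
	if !m.isLate && m.eventTime.Before(m.watermark.Add(-allowedLateness)) {
		return true
	}
	return false
}

func main() {
	now := time.Date(2023, 11, 16, 12, 0, 0, 0, time.UTC)
	nextWinStart := now.Add(-time.Minute)

	tooLate := msg{eventTime: now.Add(-2 * time.Minute), watermark: now, isLate: true}
	straggler := msg{eventTime: now.Add(-30 * time.Second), watermark: now, isLate: true}

	fmt.Println(shouldDrop(tooLate, &nextWinStart, 0))   // true: older than the next window to be closed
	fmt.Println(shouldDrop(straggler, &nextWinStart, 0)) // false: still falls into a window that is open
}
```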

// writeToPBQ writes to the PBQ. It will return error only if it is not failing to write to PBQ and is in a continuous
Expand Down Expand Up @@ -579,6 +592,15 @@ func (df *DataForward) ackMessages(ctx context.Context, messages []*isb.ReadMess
wg.Wait()
}

// noAckMessages no-acks all the read offsets of failed messages.
func (df *DataForward) noAckMessages(ctx context.Context, failedMessages []*isb.ReadMessage) {
var readOffsets []isb.Offset
for _, m := range failedMessages {
readOffsets = append(readOffsets, m.ReadOffset)
}
df.fromBufferPartition.NoAck(ctx, readOffsets)
}

// ShutDown shuts down the read-loop.
func (df *DataForward) ShutDown(ctx context.Context) {

