Skip to content

Commit

Permalink
feat: idle watermark v0 (#520)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
Co-authored-by: jyu6 <[email protected]>
  • Loading branch information
2 people authored and ashwinidulams committed Feb 14, 2023
1 parent 2844cfb commit ac33fb0
Show file tree
Hide file tree
Showing 16 changed files with 602 additions and 210 deletions.
15 changes: 15 additions & 0 deletions pkg/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
return
}

// activeWatermarkBuffers records the buffers that the publisher has published
// a watermark in this batch processing cycle.
// it's used to determine which buffers should receive an idle watermark.
var activeWatermarkBuffers = make(map[string]bool)
// forward the highest watermark to all the edges to avoid idle edge problem
// TODO: sort and get the highest value
for bufferName, offsets := range writeOffsets {
Expand All @@ -283,11 +287,22 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {
isdf.opts.vertexType == dfv1.VertexTypeReduceUDF {
if len(offsets) > 0 {
publisher.PublishWatermark(processorWM, offsets[len(offsets)-1])
activeWatermarkBuffers[bufferName] = true
}
// This (len(offsets) == 0) happens at conditional forwarding, there's no data written to the buffer
// TODO: Should also publish to those edges without writing (fall out of conditional forwarding)
} else { // For Sink vertex, and it does not care about the offset during watermark publishing
publisher.PublishWatermark(processorWM, nil)
activeWatermarkBuffers[bufferName] = true
}
}
}
if len(activeWatermarkBuffers) < len(isdf.publishWatermark) {
// if there's any buffers that haven't received any watermark during this
// batch processing cycle, send an idle watermark
for bufferName := range isdf.publishWatermark {
if !activeWatermarkBuffers[bufferName] {
isdf.publishWatermark[bufferName].PublishIdleWatermark()
}
}
}
Expand Down
Loading

0 comments on commit ac33fb0

Please sign in to comment.