Skip to content

Commit

Permalink
fix(watermark): generator should not publish wm for every message (#217)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Oct 27, 2022
1 parent 32f7b1d commit a37cece
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ nav:
- debugging.md
- static-code-analysis.md
- releasing.md
- Numaproj: https://www.numaproj.io
21 changes: 11 additions & 10 deletions pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,29 @@ func (mg *memgen) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, er
msgs := make([]*isb.ReadMessage, 0, count)
// timeout should not be re-triggered for every run of the for loop. it is for the entire Read() call.
timeout := time.After(mg.readTimeout)
loop:
for i := int64(0); i < count; i++ {
// since the Read call is blocking, and runs in an infinite loop
// we implement Read With Wait semantics
select {
case r := <-mg.srcchan:
tickgenSourceReadCount.With(map[string]string{metricspkg.LabelVertex: mg.name, metricspkg.LabelPipeline: mg.pipelineName}).Inc()
msgs = append(msgs, newreadmessage(r.data, r.offset))
// publish the last message's offset with watermark, this is an optimization to avoid too many insert calls
// into the offset timeline store.
// Please note that we are inserting the watermark before the data has been persisted into ISB by the forwarder.
o := msgs[len(msgs)-1].ReadOffset
nanos, _ := o.Sequence()
// remove the nanosecond precision
mg.sourcePublishWM.PublishWatermark(processor.Watermark(time.Unix(0, nanos)), o)

case <-timeout:
mg.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", mg.readTimeout))
return msgs, nil
break loop
}
}
if len(msgs) > 0 {
// publish the last message's offset with watermark, this is an optimization to avoid too many insert calls
// into the offset timeline store.
// Please note that we are inserting the watermark before the data has been persisted into ISB by the forwarder.
o := msgs[len(msgs)-1].ReadOffset
nanos, _ := o.Sequence()
// remove the nanosecond precision
mg.sourcePublishWM.PublishWatermark(processor.Watermark(time.Unix(0, nanos)), o)
}
return msgs, nil

}

// Ack acknowledges an array of offset.
Expand Down
4 changes: 4 additions & 0 deletions pkg/watermark/generic/jetstream/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
)

// BuildWatermarkProgressors is used to populate fetchWatermark, and a map of publishWatermark with edge name as the key.
Expand Down Expand Up @@ -87,6 +88,9 @@ func BuildSourcePublisherStores(ctx context.Context, vertexInstance *v1alpha1.Ve
if !vertexInstance.Vertex.IsASource() {
return nil, fmt.Errorf("not a source vertex")
}
if !sharedutil.IsWatermarkEnabled() {
return store.BuildWatermarkStore(noop.NewKVNoOpStore(), noop.NewKVNoOpStore()), nil
}
pipelineName := vertexInstance.Vertex.Spec.PipelineName
sourceBufferName := vertexInstance.Vertex.GetFromBuffers()[0].Name
// heartbeat
Expand Down

0 comments on commit a37cece

Please sign in to comment.