Skip to content

Commit

Permalink
fix: move watermark based on the head of the read batch (#332)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Nov 8, 2022
1 parent b2b975f commit 10f355c
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
11 changes: 7 additions & 4 deletions pkg/isb/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"sync"
"time"

"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
metricspkg "github.com/numaproj/numaflow/pkg/metrics"
Expand Down Expand Up @@ -194,8 +195,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) {

// fetch watermark if available
// TODO: make it async (concurrent and wait later)
// let's track only the last element's watermark
processorWM := isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset)
// let's track only the first element's watermark. This is important because we reassign the watermark we fetch
// to all the elements in the batch. If we were to assign the last element's watermark, we will wrongly mark on-time data
// as late data.
processorWM := isdf.fetchWatermark.GetWatermark(readMessages[0].ReadOffset)
for _, m := range readMessages {
readBytesCount.With(map[string]string{metricspkg.LabelVertex: isdf.vertexName, metricspkg.LabelPipeline: isdf.pipelineName, "buffer": isdf.fromBuffer.GetName()}).Add(float64(len(m.Payload)))
m.Watermark = time.Time(processorWM)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ loop:
// into the offset timeline store.
// Please note that we are inserting the watermark before the data has been persisted into ISB by the forwarder.
o := msgs[len(msgs)-1].ReadOffset
nanos, _ := o.Sequence()
// use the first event time as watermark to make it conservative
nanos, _ := msgs[0].ReadOffset.Sequence()
// remove the nanosecond precision
mg.sourcePublishWM.PublishWatermark(processor.Watermark(time.Unix(0, nanos)), o)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ func (h *httpSource) GetName() string {

func (h *httpSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
msgs := []*isb.ReadMessage{}
var latest time.Time
var oldest time.Time
timeout := time.After(h.readTimeout)
loop:
for i := int64(0); i < count; i++ {
select {
case m := <-h.messages:
if latest.IsZero() || m.EventTime.After(latest) {
latest = m.EventTime
if oldest.IsZero() || m.EventTime.Before(oldest) {
oldest = m.EventTime
}
msgs = append(msgs, m)
httpSourceReadCount.With(map[string]string{metricspkg.LabelVertex: h.name, metricspkg.LabelPipeline: h.pipelineName}).Inc()
Expand All @@ -234,8 +234,8 @@ loop:
}
}
h.logger.Debugf("Read %d messages.", len(msgs))
if len(msgs) > 0 && !latest.IsZero() {
h.sourcePublishWM.PublishWatermark(processor.Watermark(latest), msgs[len(msgs)-1].ReadOffset)
if len(msgs) > 0 && !oldest.IsZero() {
h.sourcePublishWM.PublishWatermark(processor.Watermark(oldest), msgs[len(msgs)-1].ReadOffset)
}
return msgs, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *KafkaSource) GetName() string {
// at-least-once semantics for reading, during restart we will have to reprocess all unacknowledged messages.
func (r *KafkaSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
// It stores the oldest timestamps for different partitions
latestTimestamps := make(map[int32]time.Time)
oldestTimestamps := make(map[int32]time.Time)
msgs := make([]*isb.ReadMessage, 0, count)
timeout := time.After(r.readTimeout)
loop:
Expand All @@ -139,16 +139,16 @@ loop:
_m := toReadMessage(m)
msgs = append(msgs, _m)
// Track the oldest timestamp for each partition
if t, ok := latestTimestamps[m.Partition]; !ok || m.Timestamp.After(t) {
latestTimestamps[m.Partition] = m.Timestamp
if t, ok := oldestTimestamps[m.Partition]; !ok || m.Timestamp.Before(t) {
oldestTimestamps[m.Partition] = m.Timestamp
}
case <-timeout:
// log that timeout has happened and don't return an error
r.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", r.readTimeout))
break loop
}
}
for p, t := range latestTimestamps {
for p, t := range oldestTimestamps {
publisher := r.loadSourceWartermarkPublisher(p)
publisher.PublishWatermark(processor.Watermark(t), nil) // Source publisher does not care about the offset
}
Expand Down

0 comments on commit 10f355c

Please sign in to comment.