diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go index e473dbac34..6483f68fc2 100644 --- a/pkg/isb/forward/forward.go +++ b/pkg/isb/forward/forward.go @@ -26,11 +26,12 @@ import ( "sync" "time" - "github.com/numaproj/numaflow/pkg/watermark/fetch" - "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" + "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/publish" + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" metricspkg "github.com/numaproj/numaflow/pkg/metrics" @@ -194,8 +195,10 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { // fetch watermark if available // TODO: make it async (concurrent and wait later) - // let's track only the last element's watermark - processorWM := isdf.fetchWatermark.GetWatermark(readMessages[len(readMessages)-1].ReadOffset) + // let's track only the first element's watermark. This is important because we reassign the watermark we fetch + // to all the elements in the batch. If we were to assign last element's watermark, we will wronly mark on-time data + // as Ο€late date. + processorWM := isdf.fetchWatermark.GetWatermark(readMessages[0].ReadOffset) for _, m := range readMessages { readBytesCount.With(map[string]string{metricspkg.LabelVertex: isdf.vertexName, metricspkg.LabelPipeline: isdf.pipelineName, "buffer": isdf.fromBuffer.GetName()}).Add(float64(len(m.Payload))) m.Watermark = time.Time(processorWM) diff --git a/pkg/sources/generator/tickgen.go b/pkg/sources/generator/tickgen.go index f8a22b45d0..5810a50f57 100644 --- a/pkg/sources/generator/tickgen.go +++ b/pkg/sources/generator/tickgen.go @@ -244,7 +244,8 @@ loop: // into the offset timeline store. // Please note that we are inserting the watermark before the data has been persisted into ISB by the forwarder. o := msgs[len(msgs)-1].ReadOffset - nanos, _ := o.Sequence() + // use the first eventime as watermark to make it conservative + nanos, _ := msgs[0].ReadOffset.Sequence() // remove the nanosecond precision mg.sourcePublishWM.PublishWatermark(processor.Watermark(time.Unix(0, nanos)), o) } diff --git a/pkg/sources/http/http.go b/pkg/sources/http/http.go index aaa8a059c4..4ecea9dd50 100644 --- a/pkg/sources/http/http.go +++ b/pkg/sources/http/http.go @@ -217,14 +217,14 @@ func (h *httpSource) GetName() string { func (h *httpSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) { msgs := []*isb.ReadMessage{} - var latest time.Time + var oldest time.Time timeout := time.After(h.readTimeout) loop: for i := int64(0); i < count; i++ { select { case m := <-h.messages: - if latest.IsZero() || m.EventTime.After(latest) { - latest = m.EventTime + if oldest.IsZero() || m.EventTime.Before(oldest) { + oldest = m.EventTime } msgs = append(msgs, m) httpSourceReadCount.With(map[string]string{metricspkg.LabelVertex: h.name, metricspkg.LabelPipeline: h.pipelineName}).Inc() @@ -234,8 +234,8 @@ loop: } } h.logger.Debugf("Read %d messages.", len(msgs)) - if len(msgs) > 0 && !latest.IsZero() { - h.sourcePublishWM.PublishWatermark(processor.Watermark(latest), msgs[len(msgs)-1].ReadOffset) + if len(msgs) > 0 && !oldest.IsZero() { + h.sourcePublishWM.PublishWatermark(processor.Watermark(oldest), msgs[len(msgs)-1].ReadOffset) } return msgs, nil } diff --git a/pkg/sources/kafka/reader.go b/pkg/sources/kafka/reader.go index 1deec3ba28..5fa31dd3a3 100644 --- a/pkg/sources/kafka/reader.go +++ b/pkg/sources/kafka/reader.go @@ -128,7 +128,7 @@ func (r *KafkaSource) GetName() string { // at-least-once semantics for reading, during restart we will have to reprocess all unacknowledged messages. func (r *KafkaSource) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) { // It stores latest timestamps for different partitions - latestTimestamps := make(map[int32]time.Time) + oldestTimestamps := make(map[int32]time.Time) msgs := make([]*isb.ReadMessage, 0, count) timeout := time.After(r.readTimeout) loop: @@ -139,8 +139,8 @@ loop: _m := toReadMessage(m) msgs = append(msgs, _m) // Get latest timestamps for different partitions - if t, ok := latestTimestamps[m.Partition]; !ok || m.Timestamp.After(t) { - latestTimestamps[m.Partition] = m.Timestamp + if t, ok := oldestTimestamps[m.Partition]; !ok || m.Timestamp.Before(t) { + oldestTimestamps[m.Partition] = m.Timestamp } case <-timeout: // log that timeout has happened and don't return an error @@ -148,7 +148,7 @@ loop: break loop } } - for p, t := range latestTimestamps { + for p, t := range oldestTimestamps { publisher := r.loadSourceWartermarkPublisher(p) publisher.PublishWatermark(processor.Watermark(t), nil) // Source publisher does not care about the offset }