From 6eb25c251263eb0d452100968ff9f8cf9b52382b Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 4 Dec 2023 20:46:12 -0500 Subject: [PATCH] fix: include dropped messages in source watermark calculation (#1404) --- .../sources/transformer/overview.md | 19 +++++++++++-------- go.mod | 2 +- go.sum | 4 ++-- pkg/sources/forward/data_forward.go | 7 +------ .../transformer/builtin/filter/filter.go | 4 ++-- .../time_extraction_filter.go | 4 ++-- 6 files changed, 19 insertions(+), 21 deletions(-) diff --git a/docs/user-guide/sources/transformer/overview.md b/docs/user-guide/sources/transformer/overview.md index 70ea8886e5..3b8aa62752 100644 --- a/docs/user-guide/sources/transformer/overview.md +++ b/docs/user-guide/sources/transformer/overview.md @@ -25,17 +25,17 @@ package main import ( "context" "encoding/json" + "log" "time" - functionsdk "github.com/numaproj/numaflow-go/pkg/function" - "github.com/numaproj/numaflow-go/pkg/function/server" + "github.com/numaproj/numaflow-go/pkg/sourcetransformer" ) -func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsdk.MessageTs { +func transform(_ context.Context, keys []string, data sourcetransformer.Datum) sourcetransformer.Messages { /* Input messages are in JSON format. Sample: {"timestamp": "1673239888", "filterOut": "true"}. - Field "timestamp" shows the real event time of the message, in format of epoch. - Field "filterOut" indicates whether the message should be filtered out, in format of boolean. + Field "timestamp" shows the real event time of the message, in the format of epoch. + Field "filterOut" indicates whether the message should be filtered out, in the format of boolean. */ var jsonObject map[string]interface{} json.Unmarshal(data.Value(), &jsonObject) @@ -53,14 +53,17 @@ func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsd filterOut = f.(bool) } if filterOut { - return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop()) + return sourcetransformer.MessagesBuilder().Append(sourcetransformer.MessageToDrop(eventTime)) } else { - return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(data.Value(), eventTime).WithKeys(keys)) + return sourcetransformer.MessagesBuilder().Append(sourcetransformer.NewMessage(data.Value(), eventTime).WithKeys(keys)) } } func main() { - server.New().RegisterMapperT(functionsdk.MapTFunc(Handle)).Start(context.Background()) + err := sourcetransformer.NewServer(sourcetransformer.SourceTransformFunc(transform)).Start(context.Background()) + if err != nil { + log.Panic("Failed to start source transform server: ", err) + } } ``` diff --git a/go.mod b/go.mod index 0a82939beb..12f6a9bbc0 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe github.com/nats-io/nats-server/v2 v2.10.4 github.com/nats-io/nats.go v1.31.0 - github.com/numaproj/numaflow-go v0.5.2 + github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/common v0.37.0 github.com/redis/go-redis/v9 v9.0.3 diff --git a/go.sum b/go.sum index 6a44db10fa..02a2a5a686 100644 --- a/go.sum +++ b/go.sum @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/numaproj/numaflow-go v0.5.2 h1:U/57eDqodVVpzLQqMR/iql8eQf6HsgFMbnWCUU69HZA= -github.com/numaproj/numaflow-go v0.5.2/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= +github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I= +github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 1abcf31e5b..010dfd3b4e 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -278,12 +278,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) { // since we use message event time instead of the watermark to determine and publish source watermarks, // time.UnixMilli(-1) is assigned to the message watermark. transformedReadMessages are immediately // used below for publishing source watermarks. - - // if message.EventTime is -1, it means that the message was dropped by the transformer - // so we should exclude it from watermark computation - if message.EventTime != time.UnixMilli(-1) { - transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1))) - } + transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1))) } } // publish source watermark diff --git a/pkg/sources/transformer/builtin/filter/filter.go b/pkg/sources/transformer/builtin/filter/filter.go index 5dedb05c8f..08f47795f2 100644 --- a/pkg/sources/transformer/builtin/filter/filter.go +++ b/pkg/sources/transformer/builtin/filter/filter.go @@ -53,10 +53,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) { result, err := expr.EvalBool(f.expression, msg) if err != nil { - return sourcetransformer.MessageToDrop(), err + return sourcetransformer.MessageToDrop(et), err } if result { return sourcetransformer.NewMessage(msg, et), nil } - return sourcetransformer.MessageToDrop(), nil + return sourcetransformer.MessageToDrop(et), nil } diff --git a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go index e0d4fb51ce..112c5de25c 100644 --- a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go +++ b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go @@ -71,7 +71,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) { result, err := expr.EvalBool(e.filterExpr, payload) if err != nil { - return sourcetransformer.MessageToDrop(), err + return sourcetransformer.MessageToDrop(et), err } if result { timeStr, err := expr.EvalStr(e.eventTimeExpr, payload) @@ -91,5 +91,5 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess return sourcetransformer.NewMessage(payload, newEventTime), nil } } - return sourcetransformer.MessageToDrop(), nil + return sourcetransformer.MessageToDrop(et), nil }