Skip to content

Commit

Permalink
fix: include dropped messages in source watermark calculation (#1404)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeranYang authored Dec 5, 2023
1 parent 35ad8d9 commit 6eb25c2
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 21 deletions.
19 changes: 11 additions & 8 deletions docs/user-guide/sources/transformer/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ package main
import (
"context"
"encoding/json"
"log"
"time"

functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/server"
"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
)

func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsdk.MessageTs {
func transform(_ context.Context, keys []string, data sourcetransformer.Datum) sourcetransformer.Messages {
/*
Input messages are in JSON format. Sample: {"timestamp": "1673239888", "filterOut": true}.
Field "timestamp" shows the real event time of the message, in format of epoch.
Field "filterOut" indicates whether the message should be filtered out, in format of boolean.
Field "timestamp" shows the real event time of the message, in the format of epoch.
Field "filterOut" indicates whether the message should be filtered out, in the format of boolean.
*/
var jsonObject map[string]interface{}
json.Unmarshal(data.Value(), &jsonObject)
Expand All @@ -53,14 +53,17 @@ func Handle(_ context.Context, keys []string, data functionsdk.Datum) functionsd
filterOut = f.(bool)
}
if filterOut {
return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop())
return sourcetransformer.MessagesBuilder().Append(sourcetransformer.MessageToDrop(eventTime))
} else {
return functionsdk.MessageTsBuilder().Append(functionsdk.NewMessageT(data.Value(), eventTime).WithKeys(keys))
return sourcetransformer.MessagesBuilder().Append(sourcetransformer.NewMessage(data.Value(), eventTime).WithKeys(keys))
}
}

func main() {
server.New().RegisterMapperT(functionsdk.MapTFunc(Handle)).Start(context.Background())
err := sourcetransformer.NewServer(sourcetransformer.SourceTransformFunc(transform)).Start(context.Background())
if err != nil {
log.Panic("Failed to start source transform server: ", err)
}
}
```

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.31.0
github.com/numaproj/numaflow-go v0.5.2
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.37.0
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,8 @@ github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADym
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.5.2 h1:U/57eDqodVVpzLQqMR/iql8eQf6HsgFMbnWCUU69HZA=
github.com/numaproj/numaflow-go v0.5.2/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932 h1:gAURJvmJv7nP8+Y7X+GGHGZ5sg7KatM4dhkWpFCsk+I=
github.com/numaproj/numaflow-go v0.5.3-0.20231204234402-c6d81fd39932/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
7 changes: 1 addition & 6 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,7 @@ func (isdf *DataForward) forwardAChunk(ctx context.Context) {
// since we use message event time instead of the watermark to determine and publish source watermarks,
// time.UnixMilli(-1) is assigned to the message watermark. transformedReadMessages are immediately
// used below for publishing source watermarks.

// if message.EventTime is -1, it means that the message was dropped by the transformer
// so we should exclude it from watermark computation
if message.EventTime != time.UnixMilli(-1) {
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))
}
transformedReadMessages = append(transformedReadMessages, message.ToReadMessage(m.readMessage.ReadOffset, time.UnixMilli(-1)))
}
}
// publish source watermark
Expand Down
4 changes: 2 additions & 2 deletions pkg/sources/transformer/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)
func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(f.expression, msg)
if err != nil {
return sourcetransformer.MessageToDrop(), err
return sourcetransformer.MessageToDrop(et), err
}
if result {
return sourcetransformer.NewMessage(msg, et), nil
}
return sourcetransformer.MessageToDrop(), nil
return sourcetransformer.MessageToDrop(et), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)
func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(e.filterExpr, payload)
if err != nil {
return sourcetransformer.MessageToDrop(), err
return sourcetransformer.MessageToDrop(et), err
}
if result {
timeStr, err := expr.EvalStr(e.eventTimeExpr, payload)
Expand All @@ -91,5 +91,5 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess
return sourcetransformer.NewMessage(payload, newEventTime), nil
}
}
return sourcetransformer.MessageToDrop(), nil
return sourcetransformer.MessageToDrop(et), nil
}

0 comments on commit 6eb25c2

Please sign in to comment.