diff --git a/docs/specifications/edges-and-buffers.md b/docs/specifications/edges-and-buffers.md index 9a9033d9ae..b9e2bb083b 100644 --- a/docs/specifications/edges-and-buffers.md +++ b/docs/specifications/edges-and-buffers.md @@ -48,7 +48,7 @@ spec: `Buffer` is a concept different from `edge` but has lots of connections. Usually, each `edge` has one or more corresponding buffers, depending on the to vertex type (map or reduce). If the to vertex is a reduce and the edge parallelism > 1, there will be multiple buffers defined for that edge. -`Buffers` are not only defined for `edges`, but also for `Source` and `Sink` vertices - each `Source` and `Sink` vertex has a coresponding `buffer`. +`Buffers` are not only defined for `edges`, but also for `Source` and `Sink` vertices - each `Source` and `Sink` vertex has a corresponding `buffer`. In summary, there are 3 types of `buffers` in a pipeline: @@ -56,6 +56,6 @@ In summary, there are 3 types of `buffers` in a pipeline: - Source Buffer - Sink Buffer -Each buffer has a name, the naming convertion for different type of buffers can be found in the [source code](https://github.com/numaproj/numaflow/blob/main/pkg/apis/numaflow/v1alpha1/vertex_types.go). +Each buffer has a name; the naming conventions for the different types of buffers can be found in the [source code](https://github.com/numaproj/numaflow/blob/main/pkg/apis/numaflow/v1alpha1/vertex_types.go). `Buffer` is only used internally; it's transparent to the users. Each Inter-Step Buffer implementation should have something physical to map to the `buffers`. For example, in the JetStream ISB implementation, a K/V bucket will be created for a `Source Buffer` or a `Sink Buffer`, and a K/V bucket plus a Stream will be created for an `Edge Buffer`. These buffer management operations are done by K8s jobs spawned by the controllers during pipeline creation and deletion. 
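The buffer-count rule described above (one buffer per edge, except for a reduce "to" vertex with parallelism > 1) can be sketched in Go. This is an illustrative sketch only, not the actual Numaflow controller code; `bufferCount` is a hypothetical helper name.

```go
package main

import "fmt"

// bufferCount returns how many inter-step buffers an edge needs,
// following the rule in the text above: an edge to a map vertex gets
// one buffer, while an edge to a reduce vertex with parallelism > 1
// gets one buffer per partition. Hypothetical helper for illustration.
func bufferCount(toVertexIsReduce bool, parallelism int) int {
	if toVertexIsReduce && parallelism > 1 {
		return parallelism
	}
	return 1
}

func main() {
	fmt.Println(bufferCount(false, 1)) // edge to a map vertex: 1 buffer
	fmt.Println(bufferCount(true, 3))  // edge to a reduce vertex, parallelism 3: 3 buffers
}
```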
diff --git a/docs/user-guide/inter-step-buffer-service.md b/docs/user-guide/inter-step-buffer-service.md index 526c408ee0..829094c399 100644 --- a/docs/user-guide/inter-step-buffer-service.md +++ b/docs/user-guide/inter-step-buffer-service.md @@ -2,7 +2,7 @@ Inter-Step Buffer Service is the service to provide [Inter-Step Buffers](./inter-step-buffer.md). -An Inter-Step Buffer Service is describe by a [Custom Resource](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/), it is required to be existing in a namespace before Pipeline objects are created. A sample `InterStepBufferService` with JetStream implementation looks like below. +An Inter-Step Buffer Service is described by a [Custom Resource](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/), and it must exist in a namespace before Pipeline objects are created. A sample `InterStepBufferService` with the JetStream implementation looks like below. ```yaml apiVersion: numaflow.numaproj.io/v1alpha1 diff --git a/docs/user-guide/sources/transformer/builtin-transformers/README.md b/docs/user-guide/sources/transformer/builtin-transformers/README.md new file mode 100644 index 0000000000..ff4ec8ff75 --- /dev/null +++ b/docs/user-guide/sources/transformer/builtin-transformers/README.md @@ -0,0 +1,42 @@ +# Built-in Functions + +Numaflow provides some built-in source data transformers that can be used directly. + +**Filter** + +A `filter` built-in transformer filters messages based on an expression. The `payload` keyword represents the message object. +See the documentation for the filter expression [here](filter.md#expression). + +```yaml +spec: + vertices: + - name: in + source: + http: {} + transformer: + builtin: + name: filter + kwargs: + expression: int(json(payload).id) < 100 +``` + +**Event Time Extractor** + +An `eventTimeExtractor` built-in transformer extracts the event time from the message payload, based on an expression and a user-specified format. 
The `payload` keyword represents the message object. +See the documentation for the event time extractor expression [here](event-time-extractor.md#expression). + +If you want to handle event times in epoch format, you can find a helpful resource [here](event-time-extractor.md#epoch-format). + +```yaml +spec: + vertices: + - name: in + source: + http: {} + transformer: + builtin: + name: eventTimeExtractor + kwargs: + expression: json(payload).item[0].time + format: 2006-01-02T15:04:05Z07:00 +``` diff --git a/docs/user-guide/sources/transformer/builtin-transformers/event-time-extractor.md b/docs/user-guide/sources/transformer/builtin-transformers/event-time-extractor.md new file mode 100644 index 0000000000..8e0fad96e9 --- /dev/null +++ b/docs/user-guide/sources/transformer/builtin-transformers/event-time-extractor.md @@ -0,0 +1,71 @@ +# Event Time Extractor + +An `eventTimeExtractor` built-in transformer extracts the event time from the message payload, based on a user-provided `expression` and an optional `format` specification. + +`expression` is used to compile the payload to a string representation of the event time. + +`format` is used to convert the event time in string format to a `time.Time` object. + +## Expression (required) + +The Event Time Extractor expression is implemented with the `expr` and `sprig` libraries. + +### Data conversion functions + +These functions can be accessed directly in expressions. The `payload` keyword represents the message object and serves as the root element in an expression. + +- `json` - Convert the payload into a JSON object. e.g: `json(payload)` +- `int` - Convert an element/payload into an `int` value. e.g: `int(json(payload).id)` +- `string` - Convert an element/payload into a `string` value. e.g: `string(json(payload).amount)` + +### Sprig functions + +The `Sprig` library has 70+ functions. The `sprig` prefix needs to be added to access the sprig functions. 
+ +[sprig functions](http://masterminds.github.io/sprig/) + +E.g: + +- `sprig.trim(string(json(payload).timestamp))` # Remove spaces from either side of the value of field `timestamp` + +## Format (optional) + +Depending on whether a `format` is specified, Event Time Extractor uses different approaches to convert the event time string to a `time.Time` object. + +### When specified +When `format` is specified, the native [time.Parse(layout, value string)](https://pkg.go.dev/time#Parse) function is used to make the conversion. In this process, the `format` parameter is passed as the layout input to the time.Parse() function, while the event time string is passed as the value parameter. + +### When not specified +When `format` is not specified, the extractor uses [dateparse](https://github.com/araddon/dateparse) to parse the event time string without knowing the format in advance. + +### How to specify format +Please refer to the [golang format library](https://cs.opensource.google/go/go/+/refs/tags/go1.19.5:src/time/format.go). + +### Error Scenarios +When encountering parsing errors, the event time extractor skips the extraction and passes on the message without modifying the original input message event time. Errors can occur for a variety of reasons, including: + +1. `format` is specified but the event time string can't be parsed with the specified format. +1. `format` is not specified but dateparse can't convert the event time string to a `time.Time` object. + +### Ambiguous event time strings +Event time strings can be ambiguous when it comes to date format, such as MM/DD/YYYY versus DD/MM/YYYY. When using such formats, you're required to explicitly specify `format` to avoid confusion. +If no format is provided, the event time extractor treats ambiguous event time strings as an error scenario. + +### Epoch format +If the event time string in your message payload is in epoch format, you can skip specifying a `format`. 
You can rely on `dateparse` to recognize a wide range of epoch timestamp formats, including Unix seconds, milliseconds, microseconds, and nanoseconds. + +## Event Time Extractor Spec + +```yaml +spec: + vertices: + - name: in + source: + http: {} + transformer: + builtin: + name: eventTimeExtractor + kwargs: + expression: sprig.trim(string(json(payload).timestamp)) + format: 2006-01-02T15:04:05Z07:00 +``` \ No newline at end of file diff --git a/docs/user-guide/sources/transformer/builtin-transformers/filter.md b/docs/user-guide/sources/transformer/builtin-transformers/filter.md new file mode 100644 index 0000000000..67e4da8a3e --- /dev/null +++ b/docs/user-guide/sources/transformer/builtin-transformers/filter.md @@ -0,0 +1,46 @@ +# Filter + +A `filter` is a special-purpose built-in function. It is used to evaluate each message in a pipeline and +is often used to filter the messages that are passed to the next vertices. + +The filter function supports a comprehensive expression language, which offers the flexibility to write complex expressions. + +`payload` is the root element representing the message object in an expression. + +## Expression + +The filter expression is implemented with the `expr` and `sprig` libraries. + +### Data conversion functions + +These functions can be accessed directly in expressions. + +- `json` - Convert the payload into a JSON object. e.g: `json(payload)` +- `int` - Convert an element/payload into an `int` value. e.g: `int(json(payload).id)` +- `string` - Convert an element/payload into a `string` value. e.g: `string(json(payload).amount)` + +### Sprig functions + +The `Sprig` library has 70+ functions. The `sprig` prefix needs to be added to access the sprig functions. + +[sprig functions](http://masterminds.github.io/sprig/) + +E.g: + +- `sprig.contains('James', json(payload).name)` # checks whether `James` is contained in the value of `name`. 
+- `int(json(sprig.b64dec(payload)).id) < 100` + +## Filter Spec + +```yaml +spec: + vertices: + - name: in + source: + http: {} + transformer: + builtin: + name: filter + kwargs: + expression: int(json(payload).id) < 100 +``` diff --git a/docs/user-guide/sources/transformer/overview.md b/docs/user-guide/sources/transformer/overview.md new file mode 100644 index 0000000000..11b6c27a27 --- /dev/null +++ b/docs/user-guide/sources/transformer/overview.md @@ -0,0 +1,104 @@ +# Source Data Transformer + +The Source Data Transformer is a feature that allows users to execute custom code to transform their data at source. + +This functionality offers two primary advantages to users: + +1. Event Time Assignment - It enables users to extract the event time from the message payload, providing a more precise and accurate event time than default mechanisms such as Kafka's LOG_APPEND_TIME for the Kafka source or a custom HTTP header for the HTTP source. +1. Early Data Filtering - It filters out unwanted data at the source vertex, saving the cost of creating the filtering UDF vertex and the inter-step buffer between the source and the filtering UDF. + +The Source Data Transformer runs as a sidecar container in a Source Vertex Pod. Data processing in the transformer is supposed to be idempotent. +The communication between the main container (platform code) and the sidecar container (user code) is through gRPC over Unix Domain Socket. + +## Built-in Transformers + +There are some [Built-in Transformers](builtin-transformers/README.md) that can be used directly. + +## Build Your Own Transformer + +You can build your own transformer in multiple languages. A User Defined Transformer could be as simple as the example below in Golang. +In the example, the transformer extracts the event time from the `timestamp` field of the JSON payload and assigns it to each message as the new event time. It also filters out unwanted messages based on the `filterOut` field of the payload. 
+ +```golang +package main + +import ( + "context" + "encoding/json" + "time" + + functionsdk "github.com/numaproj/numaflow-go/pkg/function" + "github.com/numaproj/numaflow-go/pkg/function/server" +) + +func Handle(_ context.Context, key string, data functionsdk.Datum) functionsdk.MessageTs { + /* + Input messages are in JSON format. Sample: {"timestamp": "1673239888", "filterOut": "true"}. + Field "timestamp" shows the real event time of the message, in format of epoch. + Field "filterOut" indicates whether the message should be filtered out, in format of boolean. + */ + var jsonObject map[string]interface{} + json.Unmarshal(data.Value(), &jsonObject) + + // event time assignment + eventTime := data.EventTime() + // if timestamp field exists, extract event time from payload. + if ts, ok := jsonObject["timestamp"]; ok { + eventTime = time.Unix(int64(ts.(float64)), 0) + } + + // data filtering + var shouldFilter bool + if f, ok := jsonObject["filterOut"]; ok { + shouldFilter = f.(bool) + } + if shouldFilter { + return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop()) + } else { + return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTTo(eventTime, key, data.Value())) + } +} + +func main() { + server.New().RegisterMapperT(functionsdk.MapTFunc(Handle)).Start(context.Background()) +} +``` + +Check the links below to see transformer examples in various programming languages, where we apply conditional forwarding based on the input event time. + +- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/function/event_time_filter) +- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/event_time_filter) +- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter) + +After building a Docker image for the written transformer, specify the image as below in the source vertex spec. 
+ +```yaml +spec: + vertices: + - name: my-vertex + source: + http: {} + transformer: + container: + image: my-python-transformer-example:latest +``` + +### Available Environment Variables + +Some environment variables are available in the source vertex Pods; they might be useful in your own source data transformer implementation. + +- `NUMAFLOW_NAMESPACE` - Namespace. +- `NUMAFLOW_POD` - Pod name. +- `NUMAFLOW_REPLICA` - Replica index. +- `NUMAFLOW_PIPELINE_NAME` - Name of the pipeline. +- `NUMAFLOW_VERTEX_NAME` - Name of the vertex. + +### Configuration + +Configuration data can be provided to the transformer container at runtime in multiple ways. + +* [`environment variables`](../../environment-variables.md) +* `args` +* `command` +* [`volumes`](../../volumes.md) +* [`init containers`](../../init-containers.md) diff --git a/mkdocs.yml b/mkdocs.yml index 201bf384c2..c134c4bd4f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,12 @@ nav: - user-guide/sources/http.md - user-guide/sources/kafka.md - user-guide/sources/nats.md + - Data Transformer: + - Overview: "user-guide/sources/transformer/overview.md" + - Built-in Transformers: + - Overview: "user-guide/sources/transformer/builtin-transformers/README.md" + - Filter: "user-guide/sources/transformer/builtin-transformers/filter.md" + - Event Time Extractor: "user-guide/sources/transformer/builtin-transformers/event-time-extractor.md" - Sinks: - user-guide/sinks/kafka.md - user-guide/sinks/log.md diff --git a/pkg/shared/expr/eval_string.go b/pkg/shared/expr/eval_string.go index be581e82d8..78a45aa8a8 100644 --- a/pkg/shared/expr/eval_string.go +++ b/pkg/shared/expr/eval_string.go @@ -23,7 +23,7 @@ import ( ) // EvalStr uses the given input expression to evaluate input message and compile it to a string. -// See examples in compile_test.go +// See examples in eval_string_test.go func EvalStr(expression string, msg []byte) (string, error) { msgMap := map[string]interface{}{ root: string(msg),