docs: add docs for source data transformer (#605)
Signed-off-by: Keran Yang <[email protected]>
KeranYang authored Mar 14, 2023
1 parent 84aea2a commit 746984f
Showing 8 changed files with 273 additions and 4 deletions.
4 changes: 2 additions & 2 deletions docs/specifications/edges-and-buffers.md
Original file line number Diff line number Diff line change
@@ -48,14 +48,14 @@ spec:
`Buffer` is a concept different from `edge`, but the two are closely connected. Usually, each `edge` has one or more corresponding buffers, depending on the type of the `to` vertex (map or reduce). If the `to` vertex is a reduce vertex and the edge parallelism is > 1, multiple buffers are defined for that edge.

`Buffers` are not only defined for `edges`, but also for `Source` and `Sink` vertices - each `Source` and `Sink` vertex has a corresponding `buffer`.

In summary, there are 3 types of `buffers` in a pipeline:

- Edge Buffer
- Source Buffer
- Sink Buffer

Each buffer has a name; the naming conventions for the different types of buffers can be found in the [source code](https://github.com/numaproj/numaflow/blob/main/pkg/apis/numaflow/v1alpha1/vertex_types.go).

`Buffer` is only used internally; it is transparent to the users. Each Inter-Step Buffer implementation should have something physical to map to the `buffers`. For example, in the JetStream Inter-Step Buffer implementation, a K/V bucket will be created for a `Source Buffer` or a `Sink Buffer`, and a K/V bucket plus a Stream will be created for an `Edge Buffer`. These buffer management operations are done by K8s jobs spawned by the controllers during pipeline creation and deletion.
2 changes: 1 addition & 1 deletion docs/user-guide/inter-step-buffer-service.md
@@ -2,7 +2,7 @@

Inter-Step Buffer Service is the service to provide [Inter-Step Buffers](./inter-step-buffer.md).

An Inter-Step Buffer Service is described by a [Custom Resource](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/). It is required to exist in a namespace before Pipeline objects are created. A sample `InterStepBufferService` with a JetStream implementation looks like below.

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
42 changes: 42 additions & 0 deletions docs/user-guide/sources/transformer/builtin-transformers/README.md
@@ -0,0 +1,42 @@
# Built-in Transformers

Numaflow provides some built-in source data transformers that can be used directly.

**Filter**

A `filter` built-in transformer filters messages based on an expression. The `payload` keyword represents the message object.
See the documentation for the filter expression [here](filter.md#expression).

```yaml
spec:
vertices:
- name: in
source:
http: {}
transformer:
builtin:
name: filter
kwargs:
expression: int(json(payload).id) < 100
```

**Event Time Extractor**

An `eventTimeExtractor` built-in transformer extracts the event time from the payload of the message, based on an expression and a user-specified format. The `payload` keyword represents the message object.
See the documentation for the event time extractor expression [here](event-time-extractor.md#expression).

If you want to handle event times in epoch format, you can find helpful resource [here](event-time-extractor.md#epoch-format).

```yaml
spec:
vertices:
- name: in
source:
http: {}
transformer:
builtin:
name: eventTimeExtractor
kwargs:
expression: json(payload).item[0].time
format: 2006-01-02T15:04:05Z07:00
```
@@ -0,0 +1,71 @@
# Event Time Extractor

An `eventTimeExtractor` built-in transformer extracts the event time from the payload of the message, based on a user-provided `expression` and an optional `format` specification.

`expression` is used to compile the payload to a string representation of the event time.

`format` is used to convert the event time in string format to a `time.Time` object.

## Expression (required)

The Event Time Extractor expression is implemented with the `expr` and `sprig` libraries.

### Data conversion functions

These functions can be accessed directly in expressions. The `payload` keyword represents the message object and serves as the root element of the expression.

- `json` - Convert the payload into a JSON object. e.g: `json(payload)`
- `int` - Convert an element/payload into an `int` value. e.g: `int(json(payload).id)`
- `string` - Convert an element/payload into a `string` value. e.g: `string(json(payload).amount)`

### Sprig functions

The `Sprig` library has 70+ functions. The `sprig` prefix needs to be added to access Sprig functions.

[sprig functions](http://masterminds.github.io/sprig/)

E.g:

- `sprig.trim(string(json(payload).timestamp))` # Remove spaces from either side of the value of field `timestamp`

## Format (optional)

Depending on whether a `format` is specified, Event Time Extractor uses different approaches to convert the event time string to a `time.Time` object.

### When specified
When `format` is specified, the native [time.Parse(layout, value string)](https://pkg.go.dev/time#Parse) function is used to make the conversion. In this process, the `format` parameter is passed as the `layout` input to the `time.Parse()` function, while the event time string is passed as the `value` parameter.

### When not specified
When `format` is not specified, the extractor uses [dateparse](https://github.com/araddon/dateparse) to parse the event time string without knowing the format in advance.

### How to specify format
Please refer to [golang format library](https://cs.opensource.google/go/go/+/refs/tags/go1.19.5:src/time/format.go).

### Error Scenarios
When encountering parsing errors, event time extractor skips the extraction and passes on the message without modifying the original input message event time. Errors can occur for a variety of reasons, including:

1. `format` is specified, but the event time string cannot be parsed with the specified format.
1. `format` is not specified, but `dateparse` cannot convert the event time string to a `time.Time` object.

### Ambiguous event time strings
Event time strings can be ambiguous when it comes to date format, such as MM/DD/YYYY versus DD/MM/YYYY. When using such formats, you are required to explicitly specify `format` to avoid confusion.
If no format is provided, the event time extractor treats ambiguous event time strings as an error scenario.

### Epoch format
If the event time string in your message payload is in epoch format, you can skip specifying a `format`. You can rely on `dateparse` to recognize a wide range of epoch timestamp formats, including Unix seconds, milliseconds, microseconds, and nanoseconds.
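For illustration, here is a rough sketch in Go of how an epoch value's precision can be inferred from its magnitude. This is only an assumption about the general technique, not dateparse's actual implementation:

```golang
package main

import (
	"fmt"
	"time"
)

// fromEpoch guesses the unit of an epoch value from its magnitude and
// converts it to a time.Time. A rough heuristic for illustration only.
func fromEpoch(v int64) time.Time {
	switch {
	case v >= 1e18: // nanoseconds
		return time.Unix(0, v)
	case v >= 1e15: // microseconds
		return time.Unix(0, v*1e3)
	case v >= 1e12: // milliseconds
		return time.Unix(0, v*1e6)
	default: // seconds
		return time.Unix(v, 0)
	}
}

func main() {
	fmt.Println(fromEpoch(1673239888).UTC())    // Unix seconds
	fmt.Println(fromEpoch(1673239888000).UTC()) // the same instant, in milliseconds
}
```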

## Event Time Extractor Spec

```yaml
spec:
vertices:
- name: in
source:
http: {}
transformer:
builtin:
name: eventTimeExtractor
kwargs:
expression: sprig.trim(string(json(payload).timestamp))
format: 2006-01-02T15:04:05Z07:00
```
46 changes: 46 additions & 0 deletions docs/user-guide/sources/transformer/builtin-transformers/filter.md
@@ -0,0 +1,46 @@
# Filter

A `filter` is a special-purpose built-in function. It is used to evaluate each message in a pipeline and
is often used to filter out messages before they are passed to the next vertices.

The filter function supports a comprehensive expression language, which provides the flexibility to write complex expressions.

`payload` is the root element that represents the message object in the expression.

## Expression

The filter expression is implemented with the `expr` and `sprig` libraries.

### Data conversion functions

These functions can be accessed directly in expressions.

- `json` - Convert the payload into a JSON object. e.g: `json(payload)`
- `int` - Convert an element/payload into an `int` value. e.g: `int(json(payload).id)`
- `string` - Convert an element/payload into a `string` value. e.g: `string(json(payload).amount)`

### Sprig functions

The `Sprig` library has 70+ functions. The `sprig` prefix needs to be added to access Sprig functions.

[sprig functions](http://masterminds.github.io/sprig/)

E.g:

- `sprig.contains('James', json(payload).name)` # `James` is contained in the value of `name`.
- `int(json(sprig.b64dec(payload)).id) < 100`

## Filter Spec

```yaml
spec:
vertices:
- name: in
source:
http: {}
transformer:
builtin:
name: filter
kwargs:
expression: int(json(payload).id) < 100
```
104 changes: 104 additions & 0 deletions docs/user-guide/sources/transformer/overview.md
@@ -0,0 +1,104 @@
# Source Data Transformer

The Source Data Transformer is a feature that allows users to execute custom code to transform their data at source.

This functionality offers two primary advantages to users:

1. Event Time Assignment - It enables users to extract the event time from the message payload, providing a more precise and accurate event time than default mechanisms such as Kafka's LOG_APPEND_TIME for the Kafka source or a custom HTTP header for the HTTP source.
1. Early Data Filtering - It filters out unwanted data at the source vertex, saving the cost of creating a separate filtering UDF vertex and the inter-step buffer between the source and the filtering UDF.

Source Data Transformer runs as a sidecar container in a Source Vertex Pod. Data processing in the transformer is supposed to be idempotent.
The communication between the main container (platform code) and the sidecar container (user code) is through gRPC over Unix Domain Socket.

## Built-in Transformers

There are some [Built-in Transformers](builtin-transformers/README.md) that can be used directly.

## Build Your Own Transformer

You can build your own transformer in multiple languages. A User Defined Transformer could be as simple as the example below in Golang.
In the example, the transformer extracts event times from `timestamp` of the JSON payload and assigns them to messages as new event times. It also filters out unwanted messages based on `filterOut` of the payload.

```golang
package main

import (
	"context"
	"encoding/json"
	"time"

	functionsdk "github.com/numaproj/numaflow-go/pkg/function"
	"github.com/numaproj/numaflow-go/pkg/function/server"
)

func Handle(_ context.Context, key string, data functionsdk.Datum) functionsdk.MessageTs {
	/*
		Input messages are in JSON format. Sample: {"timestamp": 1673239888, "filterOut": true}.
		Field "timestamp" holds the real event time of the message, as an epoch value.
		Field "filterOut" indicates whether the message should be filtered out, as a boolean.
	*/
	var jsonObject map[string]interface{}
	// A parsing error leaves jsonObject empty, so the defaults below apply.
	_ = json.Unmarshal(data.Value(), &jsonObject)

	// event time assignment
	eventTime := data.EventTime()
	// if the timestamp field exists, extract the event time from the payload.
	if ts, ok := jsonObject["timestamp"]; ok {
		eventTime = time.Unix(int64(ts.(float64)), 0)
	}

	// data filtering
	var shouldFilter bool
	if f, ok := jsonObject["filterOut"]; ok {
		shouldFilter = f.(bool)
	}
	if shouldFilter {
		return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTToDrop())
	}
	return functionsdk.MessageTsBuilder().Append(functionsdk.MessageTTo(eventTime, key, data.Value()))
}

func main() {
	server.New().RegisterMapperT(functionsdk.MapTFunc(Handle)).Start(context.Background())
}
```

Check the links below to see more transformer examples in various programming languages, where we apply conditional forwarding based on the input event time.

- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/function/event_time_filter)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/event_time_filter)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/function/map/eventtimefilter)

After building a docker image for the written transformer, specify the image as below in the source vertex spec.

```yaml
spec:
vertices:
- name: my-vertex
source:
http: {}
transformer:
container:
image: my-python-transformer-example:latest
```

### Available Environment Variables

Some environment variables are available in the source vertex Pods; they might be useful in your own source data transformer implementation.

- `NUMAFLOW_NAMESPACE` - Namespace.
- `NUMAFLOW_POD` - Pod name.
- `NUMAFLOW_REPLICA` - Replica index.
- `NUMAFLOW_PIPELINE_NAME` - Name of the pipeline.
- `NUMAFLOW_VERTEX_NAME` - Name of the vertex.

### Configuration

Configuration data can be provided to the transformer container at runtime in multiple ways.

* [`environment variables`](../../environment-variables.md)
* `args`
* `command`
* [`volumes`](../../volumes.md)
* [`init containers`](../../init-containers.md)
6 changes: 6 additions & 0 deletions mkdocs.yml
@@ -49,6 +49,12 @@ nav:
- user-guide/sources/http.md
- user-guide/sources/kafka.md
- user-guide/sources/nats.md
- Data Transformer:
- Overview: "user-guide/sources/transformer/overview.md"
- Built-in Transformers:
- Overview: "user-guide/sources/transformer/builtin-transformers/README.md"
- Filter: "user-guide/sources/transformer/builtin-transformers/filter.md"
- Event Time Extractor: "user-guide/sources/transformer/builtin-transformers/event-time-extractor.md"
- Sinks:
- user-guide/sinks/kafka.md
- user-guide/sinks/log.md
2 changes: 1 addition & 1 deletion pkg/shared/expr/eval_string.go
@@ -23,7 +23,7 @@ import (
)

// EvalStr uses the given input expression to evaluate input message and compile it to a string.
// See examples in eval_string_test.go
func EvalStr(expression string, msg []byte) (string, error) {
msgMap := map[string]interface{}{
root: string(msg),
