doc: reduce streaming (#1689)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Apr 19, 2024
1 parent aea4a32 commit 6da2796
Showing 4 changed files with 80 additions and 2 deletions.
36 changes: 35 additions & 1 deletion docs/user-guide/user-defined-functions/reduce/windowing/fixed.md
@@ -58,10 +58,44 @@ event time characteristic) of `2031-09-29T18:47:00Z` will belong to the window w
`[2031-09-29T18:47:00Z, 2031-09-29T18:48:00Z)`

It is important to note that because of this property, for a constant throughput, the first window
may contain fewer elements than other windows.
may contain fewer elements than other windows.
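
The window assignment described above can be sketched in a few lines. This is only an illustration, not Numaflow's implementation: it assumes window starts are aligned to multiples of the window length counted from the Unix epoch, and `fixed_window` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def fixed_window(event_time: datetime, length: timedelta) -> Tuple[datetime, datetime]:
    """Return the [start, end) fixed window that contains event_time."""
    # Time elapsed since the start of the aligned window the event falls into.
    offset = (event_time - EPOCH) % length
    start = event_time - offset
    return start, start + length


event = datetime(2031, 9, 29, 18, 47, 0, tzinfo=timezone.utc)
start, end = fixed_window(event, timedelta(seconds=60))
print(start.isoformat(), end.isoformat())
# 2031-09-29T18:47:00+00:00 2031-09-29T18:48:00+00:00

# If the first element arrives mid-window, only the remainder of that
# window is left to collect data, so the first window may hold fewer elements.
first_seen = datetime(2031, 9, 29, 18, 47, 45, tzinfo=timezone.utc)
_, first_end = fixed_window(first_seen, timedelta(seconds=60))
print(first_end - first_seen)
# 0:00:15
```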

Check the links below to see the UDF examples for different languages.

- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducer/examples)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reduce)



## Streaming Mode

Reduce can run in streaming mode, in which the UDF streams messages or forwards partial responses to the next vertex.
This is useful for custom triggering, where responses should reach the next vertex quickly,
even before the fixed window closes. The close-of-book and final triggering still happen even if
partial results have already been emitted.
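
The idea of partial results followed by a final result can be illustrated with a plain Python generator. This sketch does not use the Numaflow SDK or its ReduceStreamer interface. `streaming_sum` and the `emit_every` trigger are hypothetical names chosen for illustration, and exhausting the input stands in for the window closing.

```python
from typing import Iterable, Iterator, Tuple


def streaming_sum(values: Iterable[int], emit_every: int) -> Iterator[Tuple[str, int]]:
    """Yield a partial running sum every `emit_every` values, then a final sum."""
    total = 0
    for count, value in enumerate(values, start=1):
        total += value
        # Custom trigger (an assumption for this sketch): forward a partial
        # result downstream without waiting for the window to close.
        if count % emit_every == 0:
            yield ("partial", total)
    # The input being exhausted stands in for the window closing; the final
    # result is emitted even though partial results were already sent.
    yield ("final", total)


results = list(streaming_sum([1, 2, 3, 4, 5], emit_every=2))
print(results)
# [('partial', 3), ('partial', 10), ('final', 15)]
```

A generator is used here because it lets the consumer receive each partial result as soon as it is produced, which is the property streaming mode is meant to provide.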


To enable reduce streaming, set the `streaming` flag to `true` in the fixed window configuration.

```yaml
vertices:
  - name: my-udf
    udf:
      groupBy:
        window:
          fixed:
            length: duration
            streaming: true # set streaming to true to enable reduce streamer
```

Note: UDFs should use the ReduceStreamer functionality in the SDKs to use this feature.

Check the links below to see the UDF examples in streaming mode for different languages.

- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reducestream)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducestreamer/examples)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum)



@@ -68,6 +68,13 @@ later, meaning it's beyond the session gap of the previous window, initiating a
`Event-4` and `Event-5`, and it closes 30 seconds after `Event-5` at `2031-09-29T18:47:40Z`, if no further events arrive
for the key until the timeout.
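
The gap-based closing rule can be sketched as follows. This is a simplified illustration for a single key, not Numaflow's implementation: `session_windows` is a hypothetical helper, and all event times except the `18:47:40Z` close (30 seconds after `Event-5`) are made up for the example.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def session_windows(event_times: List[datetime], gap: timedelta) -> List[Tuple[datetime, datetime]]:
    """Group event times for one key into [start, end) session windows.

    A window ends `gap` after its most recent event.
    """
    windows: List[Tuple[datetime, datetime]] = []
    for t in sorted(event_times):
        if windows and t < windows[-1][1]:
            # The event falls inside the open session: push the end out.
            start, _ = windows[-1]
            windows[-1] = (start, t + gap)
        else:
            # The event is beyond the session gap: start a new window.
            windows.append((t, t + gap))
    return windows


def at(minute: int, second: int) -> datetime:
    """Build a timestamp on 2031-09-29 at 18:MM:SS UTC."""
    return datetime(2031, 9, 29, 18, minute, second, tzinfo=timezone.utc)


# Hypothetical event times; the last two play the role of Event-4 and Event-5.
events = [at(46, 0), at(46, 20), at(47, 0), at(47, 10)]
for start, end in session_windows(events, timedelta(seconds=30)):
    print(start.time(), end.time())
# 18:46:00 18:46:50
# 18:47:00 18:47:40
```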

Note: Streaming mode is enabled by default for session windows.

Check the links below to see the UDF examples for different languages. Currently, SDK support is available for Golang and Java.

- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/sessionreducer)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducesession/counter)




37 changes: 37 additions & 0 deletions docs/user-guide/user-defined-functions/reduce/windowing/sliding.md
@@ -78,6 +78,43 @@ window `present + sliding`)
From the point above, it follows then that immediately upon startup, for the first window, fewer elements may get
aggregated depending on the current _lateness_ of the data stream.
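
To make the overlap between sliding windows concrete, the sketch below lists every window that a single element falls into. It is an illustration under stated assumptions rather than Numaflow's implementation: window starts are assumed to be aligned to multiples of `slide` counted from the Unix epoch, and `sliding_windows` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Assumption: window starts are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(
    event_time: datetime, length: timedelta, slide: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Return every [start, end) sliding window that contains event_time."""
    # Latest aligned window start that is not after the event.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start > event_time - length:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


event = datetime(2031, 9, 29, 18, 47, 5, tzinfo=timezone.utc)
windows = sliding_windows(event, timedelta(seconds=60), timedelta(seconds=10))
print(len(windows))
# 6
print(windows[0][0].time(), windows[-1][0].time())
# 18:46:10 18:47:00
```

With a length of 60 seconds and a slide of 10 seconds, each element lands in six overlapping windows, so the earliest windows after startup only see the tail end of their interval.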

Check the links below to see the UDF examples for different languages.

- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reduce)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducer/examples)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reduce)


## Streaming Mode

Reduce can run in streaming mode, in which the UDF streams messages or forwards partial responses to the next vertex.
This is useful for custom triggering, where responses should reach the next vertex quickly,
even before the sliding window closes. The close-of-book and final triggering still happen even if
partial results have already been emitted.


To enable reduce streaming, set the `streaming` flag to `true` in the sliding window configuration.

```yaml
vertices:
  - name: my-udf
    udf:
      groupBy:
        window:
          sliding:
            length: duration
            slide: duration
            streaming: true # set streaming to true to enable reduce streamer
```

Note: UDFs should use the ReduceStreamer functionality in the SDKs to use this feature.

Check the links below to see the UDF examples in streaming mode for different languages.

- [Python](https://github.com/numaproj/numaflow-python/tree/main/examples/reducestream)
- [Golang](https://github.com/numaproj/numaflow-go/tree/main/pkg/reducestreamer/examples)
- [Java](https://github.com/numaproj/numaflow-java/tree/main/examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer/sum)




2 changes: 1 addition & 1 deletion pkg/udf/forward/forward_test.go
@@ -1671,7 +1671,7 @@ func validateMetrics(t *testing.T, batchSize int64) {

err := testutil.CollectAndCompare(metrics.ReadDataMessagesCount, strings.NewReader(metadata+expected), "forwarder_data_read_total")
if err != nil {
t.Errorf("unexpected collecting result:\n%s", err)
t.Errorf("unexpected collecting result: %v", err)
}

writeMetadata := `