When dealing with streaming functions, one creates a processor
in riff like so:
riff streaming processor create example --function-ref some-existing-fn \
--input input-stream \
--input i2:maybe-another-input \
--output some-output-stream \
--output out3:maybe-another \
--output even-a-third
This creates a pod made of two containers: the function container and the processor described here. The processor runs in the function pod, alongside the function container, and will:
- continuously pump data from any of the function input streams (input-stream and maybe-another-input in the example above) out of the stream gateway(s),
- extract the message payloads from the serialized form used in the broker(s),
- decide how to craft windows of function invocation (currently hardcoded to one minute of wallclock time),
- invoke the function over RPC, multiplexing the many input streams over the single stream allowed by the RPC spec (see riff-rpc.proto),
- upon reception of result frames, de-mux messages and serialize them back to the appropriate output streams (some-output-stream, maybe-another and even-a-third in the example above).
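The multiplexing and de-multiplexing steps above can be pictured with a small, self-contained sketch. It is not the processor's actual code: the `Frame` class, the `mux` and `demux` helpers, and the idea of tagging each message with a plain integer index are assumptions made for illustration, and the real wire format is the one defined in riff-rpc.proto. The sketch also works on in-memory lists, whereas the processor handles messages as they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MuxDemuxSketch {

    /** Hypothetical frame: a payload tagged with the index of the stream it belongs to. */
    static final class Frame {
        final int index;
        final String payload;

        Frame(int index, String payload) {
            this.index = index;
            this.payload = payload;
        }
    }

    /** Multiplex: tag every payload with its input index so all inputs can share one stream. */
    static List<Frame> mux(List<List<String>> inputs) {
        List<Frame> frames = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            for (String payload : inputs.get(i)) {
                frames.add(new Frame(i, payload));
            }
        }
        return frames;
    }

    /** De-multiplex: route each result frame to the output stream named at its index. */
    static Map<String, List<String>> demux(List<Frame> results, List<String> outputStreams) {
        Map<String, List<String>> routed = new LinkedHashMap<>();
        for (String name : outputStreams) {
            routed.put(name, new ArrayList<>());
        }
        for (Frame frame : results) {
            routed.get(outputStreams.get(frame.index)).add(frame.payload);
        }
        return routed;
    }

    public static void main(String[] args) {
        // Two inputs and three outputs, mirroring the stream names of the CLI example.
        List<Frame> muxed = mux(Arrays.asList(
                Arrays.asList("a", "b"),   // from input-stream
                Arrays.asList("c")));      // from maybe-another-input
        System.out.println("frames sent to the function: " + muxed.size());

        // Pretend the function answered with results for outputs 0 and 2 only.
        List<Frame> results = Arrays.asList(new Frame(0, "x"), new Frame(2, "y"));
        System.out.println(demux(results,
                Arrays.asList("some-output-stream", "maybe-another", "even-a-third")));
    }
}
```

Tagging by index keeps the function side independent of stream names: the function only needs to know which position a message came from or goes to, and the processor owns the mapping to actual streams.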
Notice that the number of input and output streams can be different and depends entirely on how the actual function is implemented. Likewise, the rate at which messages flow in and out is entirely dictated by the function implementation (one piece of input may trigger the emission of N output messages, in quick succession or over time).
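The one-minute wallclock windowing mentioned among the processor's steps can be illustrated as follows. This is only a sketch under assumptions: the `WindowSketch` class and its methods are hypothetical, and aligning windows on the epoch is a choice made here for simplicity. The processor may instead start its windows when it subscribes.

```java
import java.time.Duration;
import java.time.Instant;

class WindowSketch {

    /** Window length; one minute matches the value the text describes as hardcoded. */
    static final Duration WINDOW = Duration.ofMinutes(1);

    /** Index of the window containing the given arrival time (windows aligned on the epoch). */
    static long windowIndex(Instant arrival) {
        return Math.floorDiv(arrival.toEpochMilli(), WINDOW.toMillis());
    }

    /** Two messages belong to the same function invocation if they share a window index. */
    static boolean sameWindow(Instant first, Instant second) {
        return windowIndex(first) == windowIndex(second);
    }

    public static void main(String[] args) {
        Instant a = Instant.parse("2020-01-01T00:00:10Z");
        Instant b = Instant.parse("2020-01-01T00:00:50Z");
        Instant c = Instant.parse("2020-01-01T00:01:05Z");
        System.out.println(sameWindow(a, b)); // both fall in the first minute
        System.out.println(sameWindow(a, c)); // c falls in the next minute
    }
}
```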
This component is written in Java and leverages the following technologies:
- Spring Boot as the overall framework, in particular for crafting an executable jar,
- Liiklus as the stream gateway,
- gRPC as both the RPC protocol to interact with liiklus and the riff RPC invocation protocol,
- Reactor and reactor-grpc as the main event loop mechanism, preserving back-pressure from output to input.
To build the project, assuming a fresh checkout of this repository as the current directory:
mvn package
To build and create (and push) a docker image:
mvn package com.google.cloud.tools:jib-maven-plugin:1.3.0:build -Dimage=<MY IMAGE>
To create (and push) a docker image containing a native version:
mvn package && docker build . -t <MY NATIVE IMAGE> && docker push <MY NATIVE IMAGE>
When run, the processor expects the following environment variables to be set:
- INPUT_NAMES: a comma separated list of N input parameter logical names,
- OUTPUT_NAMES: a comma separated list of M output result logical names,
- GROUP: a string identifier that will be used as the consumer group for the processor,
- FUNCTION: location of the function RPC server, in the form host:port,
- INPUT_START_OFFSETS: a comma separated list of N strings (each earliest or latest) to drive where to start subscribing from for each input,
- CNB_BINDINGS: directory location of stream bindings information. The processor will read N bindings at $CNB_BINDINGS/input_xxx and M bindings at $CNB_BINDINGS/output_xxx.
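As an illustration of how these variables fit together, here is a minimal sketch that reads and validates them from a map. It is not the processor's own configuration code: the `ProcessorEnvSketch` class, its field names, and the validation rules (one start offset per input, a colon required in FUNCTION) are assumptions derived from the descriptions above, and the sample values in `main` are hypothetical. CNB_BINDINGS is left out because the layout of the binding directories is not detailed here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ProcessorEnvSketch {
    final List<String> inputNames;
    final List<String> outputNames;
    final List<String> startOffsets;
    final String group;
    final String functionHost;
    final int functionPort;

    ProcessorEnvSketch(Map<String, String> env) {
        inputNames = split(require(env, "INPUT_NAMES"));
        outputNames = split(require(env, "OUTPUT_NAMES"));
        startOffsets = split(require(env, "INPUT_START_OFFSETS"));
        group = require(env, "GROUP");

        // FUNCTION is documented as host:port; split on the last colon.
        String function = require(env, "FUNCTION");
        int colon = function.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("FUNCTION must be host:port, got " + function);
        }
        functionHost = function.substring(0, colon);
        functionPort = Integer.parseInt(function.substring(colon + 1));

        // Assumed rule: exactly one start offset per input, each earliest or latest.
        if (startOffsets.size() != inputNames.size()) {
            throw new IllegalArgumentException("INPUT_START_OFFSETS needs one entry per input");
        }
        for (String offset : startOffsets) {
            if (!offset.equals("earliest") && !offset.equals("latest")) {
                throw new IllegalArgumentException("Unknown start offset: " + offset);
            }
        }
    }

    static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + key);
        }
        return value;
    }

    static List<String> split(String csv) {
        return Arrays.asList(csv.trim().split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        // Hypothetical sample values; a real run would pass System.getenv() instead.
        Map<String, String> env = new HashMap<>();
        env.put("INPUT_NAMES", "in1,in2");
        env.put("OUTPUT_NAMES", "out1,out2,out3");
        env.put("INPUT_START_OFFSETS", "earliest,latest");
        env.put("GROUP", "example-group");
        env.put("FUNCTION", "localhost:8081");

        ProcessorEnvSketch cfg = new ProcessorEnvSketch(env);
        System.out.println(cfg.inputNames + " -> " + cfg.functionHost + ":" + cfg.functionPort);
    }
}
```

Failing fast on a missing or malformed variable makes a misconfigured pod visible at startup rather than at the first message.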