From 7971f221173f8e107628ea9d023329873518cd03 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 9 Aug 2024 13:01:00 -0700 Subject: [PATCH] feat: source to sink with an optional transformer without ISB (#1904) Signed-off-by: Vigith Maurice Signed-off-by: Yashash H L Signed-off-by: Sidhant Kohli Co-authored-by: Yashash H L Co-authored-by: Sidhant Kohli --- Dockerfile | 20 +- Makefile | 2 +- examples/21-simple-mono-vertex.yaml | 2 +- .../numaflow/v1alpha1/mono_vertex_types.go | 3 +- pkg/apis/numaflow/v1alpha1/vertex_types.go | 1 + pkg/metrics/metrics_server.go | 1 - pkg/udf/map_udf.go | 5 - serving/Cargo.lock | 271 +++++++++++- serving/Cargo.toml | 2 +- serving/Dockerfile | 20 +- serving/source-sink/Cargo.toml | 39 ++ serving/source-sink/Dockerfile | 19 + serving/source-sink/build.rs | 13 + serving/source-sink/proto/sink.proto | 57 +++ serving/source-sink/proto/source.proto | 153 +++++++ .../source-sink/proto/sourcetransform.proto | 47 ++ serving/source-sink/src/config.rs | 153 +++++++ serving/source-sink/src/error.rs | 36 ++ serving/source-sink/src/forwarder.rs | 415 ++++++++++++++++++ serving/source-sink/src/lib.rs | 306 +++++++++++++ serving/source-sink/src/main.rs | 64 +++ serving/source-sink/src/message.rs | 85 ++++ serving/source-sink/src/metrics.rs | 332 ++++++++++++++ serving/source-sink/src/shared.rs | 38 ++ serving/source-sink/src/sink.rs | 174 ++++++++ serving/source-sink/src/source.rs | 249 +++++++++++ serving/source-sink/src/transformer.rs | 159 +++++++ 27 files changed, 2646 insertions(+), 20 deletions(-) create mode 100644 serving/source-sink/Cargo.toml create mode 100644 serving/source-sink/Dockerfile create mode 100644 serving/source-sink/build.rs create mode 100644 serving/source-sink/proto/sink.proto create mode 100644 serving/source-sink/proto/source.proto create mode 100644 serving/source-sink/proto/sourcetransform.proto create mode 100644 serving/source-sink/src/config.rs create mode 100644 serving/source-sink/src/error.rs create mode 100644 serving/source-sink/src/forwarder.rs create mode 100644 serving/source-sink/src/lib.rs create mode 100644 serving/source-sink/src/main.rs create mode 100644 serving/source-sink/src/message.rs create mode 100644 serving/source-sink/src/metrics.rs create mode 100644 serving/source-sink/src/shared.rs create mode 100644 serving/source-sink/src/sink.rs create mode 100644 serving/source-sink/src/source.rs create mode 100644 serving/source-sink/src/transformer.rs diff --git a/Dockerfile b/Dockerfile index 4ea993836f..350a9fe4e0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,9 @@ FROM rust:1.79-bookworm as extension-base RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash +RUN apt-get update +RUN apt-get install protobuf-compiler -y + RUN cargo new serve # Create a new empty shell project WORKDIR /serve @@ -32,6 +35,9 @@ COPY ./serving/backoff/Cargo.toml ./backoff/ RUN cargo new numaflow-models COPY ./serving/numaflow-models/Cargo.toml ./numaflow-models/ +RUN cargo new source-sink +COPY ./serving/source-sink/Cargo.toml ./source-sink/ + # Copy all Cargo.toml and Cargo.lock files for caching dependencies COPY ./serving/Cargo.toml ./serving/Cargo.lock ./ @@ -44,21 +50,29 @@ COPY ./serving/servesink/src ./servesink/src COPY ./serving/extras/upstreams/src ./extras/upstreams/src COPY ./serving/backoff/src ./backoff/src COPY ./serving/numaflow-models/src ./numaflow-models/src +COPY ./serving/source-sink/src ./source-sink/src +COPY 
./serving/source-sink/build.rs ./source-sink/build.rs +COPY ./serving/source-sink/proto ./source-sink/proto # Build the real binaries -RUN touch src/main.rs servesink/main.rs extras/upstreams/main.rs numaflow-models/main.rs && \ - cargo build --release +RUN touch src/main.rs servesink/src/main.rs numaflow-models/src/main.rs source-sink/src/main.rs && \ + cargo build --workspace --all --release #################################################################################################### # numaflow #################################################################################################### ARG BASE_IMAGE -FROM ${BASE_IMAGE} as numaflow +FROM debian:bookworm as numaflow + +# Install necessary libraries +RUN apt-get update && apt-get install -y libssl3 COPY --from=base /bin/numaflow /bin/numaflow COPY ui/build /ui/build COPY --from=extension-base /serve/target/release/serve /bin/serve +COPY --from=extension-base /serve/target/release/sourcer-sinker /bin/sourcer-sinker + COPY ./serving/config config ENTRYPOINT [ "/bin/numaflow" ] diff --git a/Makefile b/Makefile index 4e1ee98fa4..1c11b01583 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ DIST_DIR=${CURRENT_DIR}/dist BINARY_NAME:=numaflow DOCKERFILE:=Dockerfile DEV_BASE_IMAGE:=debian:bookworm -RELEASE_BASE_IMAGE:=gcr.io/distroless/cc-debian12 +RELEASE_BASE_IMAGE:=debian:bookworm BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ') GIT_COMMIT=$(shell git rev-parse HEAD) diff --git a/examples/21-simple-mono-vertex.yaml b/examples/21-simple-mono-vertex.yaml index b1f7dbd1fd..be625c41d2 100644 --- a/examples/21-simple-mono-vertex.yaml +++ b/examples/21-simple-mono-vertex.yaml @@ -10,4 +10,4 @@ spec: sink: udsink: container: - image: quay.io/numaio/numaflow-java/simple-sink:stable + image: quay.io/numaio/numaflow-java/simple-sink:stable \ No newline at end of file diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 359056c14c..7dd39fd2e8 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -403,7 +403,8 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec { func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container { mainContainer := containerBuilder{}. - init(req).command("/bin/serve").build() // TODO: command + init(req).command(MonoVertexBinary).build() + containers := []corev1.Container{mainContainer} if mvspec.Source.UDSource != nil { // Only support UDSource for now. 
containers = append(containers, mvspec.Source.getUDSourceContainer(req)) diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index c93af36a98..71cd9c7171 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -51,6 +51,7 @@ const ( ) const ServingBinary = "/bin/serve" +const MonoVertexBinary = "/bin/sourcer-sinker" // +genclient // +kubebuilder:object:root=true diff --git a/pkg/metrics/metrics_server.go b/pkg/metrics/metrics_server.go index bb7626aa78..7cafaa9070 100644 --- a/pkg/metrics/metrics_server.go +++ b/pkg/metrics/metrics_server.go @@ -148,7 +148,6 @@ func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer { if m.lagReaders != nil { for partitionName := range m.lagReaders { m.partitionPendingInfo[partitionName] = sharedqueue.New[timestampedPending](1800) - } } return m diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 3e33d5f8b9..44cb1b5aa7 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -219,11 +219,6 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { opts = append(opts, forward.WithUDFUnaryMap(mapHandler)) } - // We can have the vertex running only of the map modes - if enableMapUdfStream && enableBatchMapUdf { - return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled") - } - for index, bufferPartition := range fromBuffer { // Populate shuffle function map shuffleFuncMap := make(map[string]*shuffle.Shuffle) diff --git a/serving/Cargo.lock b/serving/Cargo.lock index e27de24107..0e70179df8 100644 --- a/serving/Cargo.lock +++ b/serving/Cargo.lock @@ -143,6 +143,33 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "aws-lc-rs" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae74d9bd0a7530e8afd1770739ad34b36838829d6ad61818f9230f683f5ad77" +dependencies = [ + "aws-lc-sys", + "mirai-annotations", + "paste", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", + "libc", + "paste", +] + [[package]] name = "axum" version = "0.7.5" @@ -210,6 +237,30 @@ dependencies = [ "syn", ] +[[package]] +name = "axum-server" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56bac90848f6a9393ac03c63c640925c4b7c8ca21654de40d53f55964667c7d8" +dependencies = [ + "arc-swap", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "rustls", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower", + "tower-service", +] + [[package]] name = "backoff" version = "0.1.0" @@ -251,6 +302,29 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "lazy_static", + "lazycell", + 
"log", + "prettyplease", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", + "which", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -301,6 +375,19 @@ name = "cc" version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +dependencies = [ + "jobserver", + "libc", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -323,6 +410,26 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + +[[package]] +name = "cmake" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +dependencies = [ + "cc", +] + [[package]] name = "combine" version = "4.6.7" @@ -520,6 +627,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "ed25519" version = "2.2.3" @@ -621,6 +734,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.30" @@ -737,6 +856,12 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -1120,6 +1245,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1135,6 +1269,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -1253,12 +1396,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if", + "windows-targets 0.52.6", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -1390,6 +1549,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mirai-annotations" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" + [[package]] name = "multimap" version = "0.10.0" @@ -1644,6 +1809,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pathdiff" version = "0.2.1" @@ -1836,7 +2007,7 @@ checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck 0.5.0", - "itertools", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -1856,7 +2027,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "syn", @@ -1934,6 +2105,19 @@ dependencies = [ "bitflags 2.6.0", ] +[[package]] +name = "rcgen" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54077e1872c46788540de1ea3d7f4ccb1983d12f9aa909b234468676c1a36779" +dependencies = [ + "pem", + "ring", + "rustls-pki-types", + "time", + "yasna", +] + [[package]] name = "redis" version = "0.26.1" @@ -2139,6 +2323,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc_version" version = "0.4.0" @@ -2167,6 +2357,7 @@ version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -2220,6 +2411,7 @@ version = "0.102.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -2473,6 +2665,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2535,6 +2733,40 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "sourcer-sinker" +version = "0.1.0" +dependencies = [ + "axum", + "axum-server", + "base64 0.22.1", + "bytes", + "chrono", + "hyper-util", + "log", + 
"metrics", + "metrics-exporter-prometheus", + "numaflow", + "numaflow-models", + "once_cell", + "prost", + "prost-types", + "rcgen", + "rustls", + "serde_json", + "tempfile", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tonic", + "tonic-build", + "tower", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "spin" version = "0.9.8" @@ -3209,6 +3441,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix", +] + [[package]] name = "winapi" version = "0.3.9" @@ -3417,6 +3661,15 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -3443,3 +3696,17 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/serving/Cargo.toml b/serving/Cargo.toml index 10138b1386..58525ad62b 100644 --- a/serving/Cargo.toml +++ b/serving/Cargo.toml @@ -1,4 +1,4 @@ -workspace = { members = ["backoff", "extras/upstreams", "numaflow-models", "servesink"] } +workspace = { members = ["backoff", "extras/upstreams", "numaflow-models", "servesink", "source-sink"] } [package] name = "serve" version = "0.1.0" diff --git a/serving/Dockerfile b/serving/Dockerfile index 863b999a81..697cfd6f27 100644 --- a/serving/Dockerfile +++ b/serving/Dockerfile @@ -4,6 +4,9 @@ FROM rust:1.79-bookworm as builder RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash +RUN apt-get update +RUN apt-get install protobuf-compiler -y + RUN cargo new serve # Create a new empty shell project WORKDIR /serve @@ -19,6 +22,9 @@ COPY ./backoff/Cargo.toml ./backoff/Cargo.toml RUN cargo new numaflow-models COPY ./numaflow-models/Cargo.toml ./numaflow-models/ +RUN cargo new source-sink +COPY ./source-sink/Cargo.toml ./source-sink/Cargo.toml + # Copy all Cargo.toml and Cargo.lock files for caching dependencies COPY ./Cargo.toml ./Cargo.lock ./ @@ -31,16 +37,20 @@ COPY ./servesink/src ./servesink/src COPY ./extras/upstreams/src ./extras/upstreams/src COPY ./backoff/src ./backoff/src COPY ./numaflow-models/src ./numaflow-models/src +COPY ./source-sink/src ./source-sink/src +COPY ./source-sink/build.rs ./source-sink/build.rs +COPY ./source-sink/proto ./source-sink/proto # Build the real binaries -RUN touch src/main.rs servesink/main.rs extras/upstreams/main.rs numaflow-models/main.rs && \ - cargo build --release +RUN touch src/main.rs servesink/src/main.rs numaflow-models/src/main.rs source-sink/src/main.rs && \ + cargo build --workspace --all --release # Use a lightweight image for the runtime -FROM gcr.io/distroless/cc-debian12 as numaflow-ext +FROM debian:bookworm as numaflow-ext -COPY --from=builder 
/serve/target/release/serve . -COPY ./config config +RUN apt-get update && apt-get install -y libssl3 +COPY --from=builder /serve/target/release/ . +COPY ./config config ENTRYPOINT ["./serve"] \ No newline at end of file diff --git a/serving/source-sink/Cargo.toml b/serving/source-sink/Cargo.toml new file mode 100644 index 0000000000..9db2bbff35 --- /dev/null +++ b/serving/source-sink/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "sourcer-sinker" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum = "0.7.5" +axum-server = { version = "0.7.1", features = ["tls-rustls"] } +tonic = "0.12.0" +bytes = "1.7.1" +thiserror = "1.0.63" +tokio = { version = "1.39.2", features = ["full"] } +tracing = "0.1.40" +tokio-util = "0.7.11" +tokio-stream = "0.1.15" +prost = "0.13.1" +prost-types = "0.13.1" +chrono = "0.4.31" +base64 = "0.22.1" +metrics = { version = "0.23.0", default-features = false } +metrics-exporter-prometheus = { version = "0.15.3", default-features = false } +log = "0.4.22" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +hyper-util = "0.1.6" +tower = "0.4.13" +uuid = { version = "1.10.0", features = ["v4"] } +once_cell = "1.19.0" +serde_json = "1.0.122" +numaflow-models = { path = "../numaflow-models"} +rcgen = "0.13.1" +rustls = { version = "0.23.12", features = ["aws_lc_rs"] } + +[dev-dependencies] +tower = "0.4.13" +tempfile = "3.11.0" +numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="main" } + +[build-dependencies] +tonic-build = "0.12.1" diff --git a/serving/source-sink/Dockerfile b/serving/source-sink/Dockerfile new file mode 100644 index 0000000000..4ed8bb62f7 --- /dev/null +++ b/serving/source-sink/Dockerfile @@ -0,0 +1,19 @@ +FROM rust:1.76-bookworm AS build + +RUN apt-get update +RUN apt-get install protobuf-compiler -y + +WORKDIR /source-sink +COPY ./ ./ + +# build for release +RUN cargo build --release + +# our final base +FROM debian:bookworm AS simple-source + +# copy the build artifact from the build stage +COPY --from=build /source-sink/target/release/source-sink /bin/serve + +# set the startup command to run your binary +CMD ["/bin/serve"] diff --git a/serving/source-sink/build.rs b/serving/source-sink/build.rs new file mode 100644 index 0000000000..fc30e6b678 --- /dev/null +++ b/serving/source-sink/build.rs @@ -0,0 +1,13 @@ +fn main() { + tonic_build::configure() + .build_server(true) + .compile( + &[ + "proto/source.proto", + "proto/sourcetransform.proto", + "proto/sink.proto", + ], + &["proto"], + ) + .unwrap_or_else(|e| panic!("failed to compile the proto, {:?}", e)) +} diff --git a/serving/source-sink/proto/sink.proto b/serving/source-sink/proto/sink.proto new file mode 100644 index 0000000000..c413ea863b --- /dev/null +++ b/serving/source-sink/proto/sink.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +package sink.v1; + +service Sink { + // SinkFn writes the request to a user defined sink. + rpc SinkFn(stream SinkRequest) returns (SinkResponse); + + // IsReady is the heartbeat endpoint for gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); +} + +/** + * SinkRequest represents a request element. + */ +message SinkRequest { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + string id = 5; + map headers = 6; +} + +/** + * ReadyResponse is the health check result. 
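+ * The client (the numa forwarder) is expected to poll IsReady until the sink reports
+ * ready before it starts sending SinkFn requests (see wait_until_ready in src/lib.rs).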
+ */ +message ReadyResponse { + bool ready = 1; +} + +/** + * SinkResponse is the individual response of each message written to the sink. + */ +message SinkResponse { + message Result { + // id is the ID of the message, can be used to uniquely identify the message. + string id = 1; + // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. + Status status = 2; + // err_msg is the error message, set it if success is set to false. + string err_msg = 3; + } + repeated Result results = 1; +} + +/* + * Status is the status of the response. + */ +enum Status { + SUCCESS = 0; + FAILURE = 1; + FALLBACK = 2; +} \ No newline at end of file diff --git a/serving/source-sink/proto/source.proto b/serving/source-sink/proto/source.proto new file mode 100644 index 0000000000..131cc36d30 --- /dev/null +++ b/serving/source-sink/proto/source.proto @@ -0,0 +1,153 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; + +package source.v1; + +service Source { + // Read returns a stream of datum responses. + // The size of the returned ReadResponse is less than or equal to the num_records specified in ReadRequest. + // If the request timeout is reached on server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list). + rpc ReadFn(ReadRequest) returns (stream ReadResponse); + + // AckFn acknowledges a list of datum offsets. + // When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex. + // The caller (numa) expects the AckFn to be successful, and it does not expect any errors. + // If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request, + // then it is best to crash because there are no other retry mechanisms possible. + rpc AckFn(AckRequest) returns (AckResponse); + + // PendingFn returns the number of pending records at the user defined source. + rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); + + // PartitionsFn returns the list of partitions for the user defined source. + rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse); + + // IsReady is the heartbeat endpoint for user defined source gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); +} + +/* + * ReadRequest is the request for reading datum stream from user defined source. + */ +message ReadRequest { + message Request { + // Required field indicating the number of records to read. + uint64 num_records = 1; + // Required field indicating the request timeout in milliseconds. + // uint32 can represent 2^32 milliseconds, which is about 49 days. + // We don't use uint64 because time.Duration takes int64 as nano seconds. Using uint64 for milli will cause overflow. + uint32 timeout_in_ms = 2; + } + // Required field indicating the request. + Request request = 1; +} + +/* + * ReadResponse is the response for reading datum stream from user defined source. + */ +message ReadResponse { + message Result { + // Required field holding the payload of the datum. + bytes payload = 1; + // Required field indicating the offset information of the datum. + Offset offset = 2; + // Required field representing the time associated with each datum. It is used for watermarking. + google.protobuf.Timestamp event_time = 3; + // Optional list of keys associated with the datum. + // Key is the "key" attribute in (key,value) as in the map-reduce paradigm. 
+ // We add this optional field to support the use case where the user defined source can provide keys for the datum. + // e.g. Kafka and Redis Stream message usually include information about the keys. + repeated string keys = 4; + // Optional list of headers associated with the datum. + // Headers are the metadata associated with the datum. + // e.g. Kafka and Redis Stream message usually include information about the headers. + map headers = 5; + } + // Required field holding the result. + Result result = 1; +} + +/* + * AckRequest is the request for acknowledging datum. + * It takes a list of offsets to be acknowledged. + */ +message AckRequest { + message Request { + // Required field holding a list of offsets to be acknowledged. + // The offsets must be strictly corresponding to the previously read batch, + // meaning the offsets must be in the same order as the datum responses in the ReadResponse. + // By enforcing ordering, we can save deserialization effort on the server side, assuming the server keeps a local copy of the raw/un-serialized offsets. + repeated Offset offsets = 1; + } + // Required field holding the request. The list will be ordered and will have the same order as the original Read response. + Request request = 1; +} + +/* + * AckResponse is the response for acknowledging datum. It contains one empty field confirming + * the batch of offsets that have been successfully acknowledged. The contract between client and server + * is that the server will only return the AckResponse if the ack request is successful. + * If the server hangs during the ack request, the client can decide to timeout and error out the data forwarder. + * The reason why we define such contract is that we always expect the server to be able to process the ack request. + * Client is expected to send the AckRequest to the server with offsets that are strictly + * corresponding to the previously read batch. If the client sends the AckRequest with offsets that are not, + * it is considered as a client error and the server will not return the AckResponse. + */ +message AckResponse { + message Result { + // Required field indicating the ack request is successful. + google.protobuf.Empty success = 1; + } + // Required field holding the result. + Result result = 1; +} + +/* + * ReadyResponse is the health check result for user defined source. + */ +message ReadyResponse { + // Required field holding the health check result. + bool ready = 1; +} + +/* + * PendingResponse is the response for the pending request. + */ +message PendingResponse { + message Result { + // Required field holding the number of pending records at the user defined source. + // A negative count indicates that the pending information is not available. + int64 count = 1; + } + // Required field holding the result. + Result result = 1; +} + +/* + * PartitionsResponse is the response for the partitions request. + */ +message PartitionsResponse { + message Result { + // Required field holding the list of partitions. + repeated int32 partitions = 1; + } + // Required field holding the result. + Result result = 1; +} + +/* + * Offset is the offset of the datum. + */ +message Offset { + // offset is the offset of the datum. This field is required. + // We define Offset as a byte array because different input data sources can have different representations for Offset. + // The only way to generalize it is to define it as a byte array, + // Such that we can let the UDSource to de-serialize the offset using its own interpretation logics. 
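+  // For example, a Kafka UDSource might encode "topic:partition:offset" into these bytes,
+  // while a Redis Streams source might use the raw stream entry ID (illustrative encodings,
+  // not prescribed by this contract).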
+  bytes offset = 1;
+  // Optional partition_id indicates which partition of the source the datum belongs to.
+  // It is useful for sources that have multiple partitions, e.g. Kafka.
+  // If the partition_id is not specified, it is assumed that the source has a single partition.
+  int32 partition_id = 2;
+}
\ No newline at end of file
diff --git a/serving/source-sink/proto/sourcetransform.proto b/serving/source-sink/proto/sourcetransform.proto
new file mode 100644
index 0000000000..18e045c323
--- /dev/null
+++ b/serving/source-sink/proto/sourcetransform.proto
@@ -0,0 +1,47 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";
+
+package sourcetransformer.v1;
+
+service SourceTransform {
+  // SourceTransformFn applies a function to each request element.
+  // In addition to the map function, SourceTransformFn also supports assigning a new event time to the response.
+  // SourceTransformFn can be used only at a source vertex, by the source data transformer.
+  rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
+
+  // IsReady is the heartbeat endpoint for gRPC.
+  rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
+}
+
+/**
+ * SourceTransformRequest represents a request element.
+ */
+message SourceTransformRequest {
+  repeated string keys = 1;
+  bytes value = 2;
+  google.protobuf.Timestamp event_time = 3;
+  google.protobuf.Timestamp watermark = 4;
+  map<string, string> headers = 5;
+}
+
+/**
+ * SourceTransformResponse represents a response element.
+ */
+message SourceTransformResponse {
+  message Result {
+    repeated string keys = 1;
+    bytes value = 2;
+    google.protobuf.Timestamp event_time = 3;
+    repeated string tags = 4;
+  }
+  repeated Result results = 1;
+}
+
+/**
+ * ReadyResponse is the health check result.
+ */
+message ReadyResponse {
+  bool ready = 1;
+}
\ No newline at end of file
diff --git a/serving/source-sink/src/config.rs b/serving/source-sink/src/config.rs
new file mode 100644
index 0000000000..9ac27a3413
--- /dev/null
+++ b/serving/source-sink/src/config.rs
@@ -0,0 +1,153 @@
+use crate::error::Error;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use log::LevelFilter;
+use numaflow_models::models::MonoVertex;
+use std::env;
+use std::sync::OnceLock;
+
+const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
+const ENV_GRPC_MAX_MESSAGE_SIZE: &str = "NUMAFLOW_GRPC_MAX_MESSAGE_SIZE";
+const ENV_POD_REPLICA: &str = "NUMAFLOW_REPLICA";
+const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
+const DEFAULT_METRICS_PORT: u16 = 2469;
+const ENV_LOG_LEVEL: &str = "NUMAFLOW_DEBUG";
+const DEFAULT_LAG_CHECK_INTERVAL_IN_SECS: u16 = 5;
+const DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS: u16 = 3;
+const DEFAULT_BATCH_SIZE: u64 = 500;
+const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
+
+pub fn config() -> &'static Settings {
+    static CONF: OnceLock<Settings> = OnceLock::new();
+    CONF.get_or_init(|| match Settings::load() {
+        Ok(v) => v,
+        Err(e) => {
+            panic!("Failed to load configuration: {:?}", e);
+        }
+    })
+}
+
+pub struct Settings {
+    pub mono_vertex_name: String,
+    pub replica: u32,
+    pub batch_size: u64,
+    pub timeout_in_ms: u32,
+    pub metrics_server_listen_port: u16,
+    pub log_level: String,
+    pub grpc_max_message_size: usize,
+    pub is_transformer_enabled: bool,
+    pub lag_check_interval_in_secs: u16,
+    pub lag_refresh_interval_in_secs: u16,
+}
+
+impl Default for Settings {
+    fn default() -> Self {
+        Self {
+            mono_vertex_name: "default".to_string(),
+            replica: 0,
+            batch_size: DEFAULT_BATCH_SIZE,
+            timeout_in_ms: DEFAULT_TIMEOUT_IN_MS,
+            metrics_server_listen_port: DEFAULT_METRICS_PORT,
+            log_level: "info".to_string(),
+            grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
+            is_transformer_enabled: false,
+            lag_check_interval_in_secs: DEFAULT_LAG_CHECK_INTERVAL_IN_SECS,
+            lag_refresh_interval_in_secs: DEFAULT_LAG_REFRESH_INTERVAL_IN_SECS,
+        }
+    }
+}
+
+impl Settings {
+    fn load() -> Result<Self, Error> {
+        let mut settings = Settings::default();
+        if let Ok(mono_vertex_spec) = env::var(ENV_MONO_VERTEX_OBJ) {
+            // the spec is base64 encoded, decode it before parsing
+            let mono_vertex_spec = BASE64_STANDARD
+                .decode(mono_vertex_spec.as_bytes())
+                .map_err(|e| {
+                    Error::ConfigError(format!("Failed to decode mono vertex spec: {:?}", e))
+                })?;
+
+            let mono_vertex_obj: MonoVertex =
+                serde_json::from_slice(&mono_vertex_spec).map_err(|e| {
+                    Error::ConfigError(format!("Failed to parse mono vertex spec: {:?}", e))
+                })?;
+
+            settings.batch_size = mono_vertex_obj
+                .spec
+                .limits
+                .clone()
+                .unwrap()
+                .read_batch_size
+                .map(|x| x as u64)
+                .unwrap_or(DEFAULT_BATCH_SIZE);
+
+            settings.timeout_in_ms = mono_vertex_obj
+                .spec
+                .limits
+                .clone()
+                .unwrap()
+                .read_timeout
+                .map(|x| std::time::Duration::from(x).as_millis() as u32)
+                .unwrap_or(DEFAULT_TIMEOUT_IN_MS);
+
+            settings.mono_vertex_name = mono_vertex_obj
+                .metadata
+                .and_then(|metadata| metadata.name)
+                .ok_or_else(|| Error::ConfigError("Mono vertex name not found".to_string()))?;
+
+            settings.is_transformer_enabled = mono_vertex_obj
+                .spec
+                .source
+                .ok_or(Error::ConfigError("Source not found".to_string()))?
+ .transformer + .is_some(); + } + + settings.log_level = + env::var(ENV_LOG_LEVEL).unwrap_or_else(|_| LevelFilter::Info.to_string()); + + settings.grpc_max_message_size = env::var(ENV_GRPC_MAX_MESSAGE_SIZE) + .unwrap_or_else(|_| DEFAULT_GRPC_MAX_MESSAGE_SIZE.to_string()) + .parse() + .map_err(|e| { + Error::ConfigError(format!("Failed to parse grpc max message size: {:?}", e)) + })?; + + settings.replica = env::var(ENV_POD_REPLICA) + .unwrap_or_else(|_| "0".to_string()) + .parse() + .map_err(|e| Error::ConfigError(format!("Failed to parse pod replica: {:?}", e)))?; + + Ok(settings) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + + #[test] + fn test_settings_load() { + // Set up environment variables + env::set_var(ENV_MONO_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLW1vbm8tdmVydGV4IiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJyZXBsaWNhcyI6MCwic291cmNlIjp7InRyYW5zZm9ybWVyIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6InF1YXkuaW8vbnVtYWlvL251bWFmbG93LXJzL21hcHQtZXZlbnQtdGltZS1maWx0ZXI6c3RhYmxlIiwicmVzb3VyY2VzIjp7fX0sImJ1aWx0aW4iOm51bGx9LCJ1ZHNvdXJjZSI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJkb2NrZXIuaW50dWl0LmNvbS9wZXJzb25hbC95aGwwMS9zaW1wbGUtc291cmNlOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sInNpbmsiOnsidWRzaW5rIjp7ImNvbnRhaW5lciI6eyJpbWFnZSI6ImRvY2tlci5pbnR1aXQuY29tL3BlcnNvbmFsL3lobDAxL2JsYWNraG9sZS1zaW5rOnN0YWJsZSIsInJlc291cmNlcyI6e319fX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9fSwic3RhdHVzIjp7InJlcGxpY2FzIjowLCJsYXN0VXBkYXRlZCI6bnVsbCwibGFzdFNjYWxlZEF0IjpudWxsfX0="); + env::set_var(ENV_LOG_LEVEL, "debug"); + env::set_var(ENV_GRPC_MAX_MESSAGE_SIZE, "128000000"); + + // Load settings + let settings = Settings::load().unwrap(); + + // Verify settings + assert_eq!(settings.mono_vertex_name, "simple-mono-vertex"); + assert_eq!(settings.batch_size, 500); + assert_eq!(settings.timeout_in_ms, 1000); + assert_eq!(settings.log_level, "debug"); + assert_eq!(settings.grpc_max_message_size, 128000000); + + // Clean up environment variables + env::remove_var(ENV_MONO_VERTEX_OBJ); + env::remove_var(ENV_LOG_LEVEL); + env::remove_var(ENV_GRPC_MAX_MESSAGE_SIZE); + } +} diff --git a/serving/source-sink/src/error.rs b/serving/source-sink/src/error.rs new file mode 100644 index 0000000000..76ae1ce590 --- /dev/null +++ b/serving/source-sink/src/error.rs @@ -0,0 +1,36 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Error, Debug, Clone)] +pub enum Error { + #[error("Metrics Error - {0}")] + MetricsError(String), + + #[error("Source Error - {0}")] + SourceError(String), + + #[error("Sink Error - {0}")] + SinkError(String), + + #[error("Transformer Error - {0}")] + TransformerError(String), + + #[error("Forwarder Error - {0}")] + ForwarderError(String), + + #[error("Connection Error - {0}")] + ConnectionError(String), + + #[error("gRPC Error - {0}")] + GRPCError(String), + + #[error("Config Error - {0}")] + ConfigError(String), +} + +impl From for Error { + fn from(status: tonic::Status) -> Self { + Error::GRPCError(status.to_string()) + } +} diff --git a/serving/source-sink/src/forwarder.rs b/serving/source-sink/src/forwarder.rs new file mode 100644 index 0000000000..cd39038a7c --- /dev/null +++ b/serving/source-sink/src/forwarder.rs @@ -0,0 +1,415 @@ +use crate::config::config; +use crate::error::{Error, Result}; +use crate::metrics::{ + FORWARDER_ACK_TOTAL, FORWARDER_READ_BYTES_TOTAL, FORWARDER_READ_TOTAL, FORWARDER_WRITE_TOTAL, + MONO_VERTEX_NAME, PARTITION_LABEL, REPLICA_LABEL, VERTEX_TYPE_LABEL, +}; +use 
crate::sink::SinkClient;
+use crate::source::SourceClient;
+use crate::transformer::TransformerClient;
+use chrono::Utc;
+use metrics::counter;
+use tokio::sync::oneshot;
+use tokio::task::JoinSet;
+use tracing::{info, trace};
+
+const MONO_VERTEX_TYPE: &str = "mono_vertex";
+
+/// Forwarder is responsible for reading messages from the source, applying transformation if a
+/// transformer is present, writing the messages to the sink, and then acknowledging the messages
+/// back to the source.
+pub(crate) struct Forwarder {
+    source_client: SourceClient,
+    sink_client: SinkClient,
+    transformer_client: Option<TransformerClient>,
+    shutdown_rx: oneshot::Receiver<()>,
+    common_labels: Vec<(String, String)>,
+}
+
+impl Forwarder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) async fn new(
+        source_client: SourceClient,
+        sink_client: SinkClient,
+        transformer_client: Option<TransformerClient>,
+        shutdown_rx: oneshot::Receiver<()>,
+    ) -> Result<Self> {
+        let common_labels = vec![
+            (
+                MONO_VERTEX_NAME.to_string(),
+                config().mono_vertex_name.clone(),
+            ),
+            (VERTEX_TYPE_LABEL.to_string(), MONO_VERTEX_TYPE.to_string()),
+            (REPLICA_LABEL.to_string(), config().replica.to_string()),
+            (PARTITION_LABEL.to_string(), "0".to_string()),
+        ];
+
+        Ok(Self {
+            source_client,
+            sink_client,
+            transformer_client,
+            shutdown_rx,
+            common_labels,
+        })
+    }
+
+    /// run starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed.
+    /// This means that, in the happy-path scenario, a block is always completely processed.
+    /// This function returns on any error, which results in a non-zero exit code.
+    pub(crate) async fn run(&mut self) -> Result<()> {
+        let mut messages_count: u64 = 0;
+        let mut last_forwarded_at = std::time::Instant::now();
+        loop {
+            // TODO: emit latency metrics, metrics-rs histograms has memory leak issues.
+            let start_time = tokio::time::Instant::now();
+            // two arms, either shutdown or forward-a-chunk
+            tokio::select! {
+                _ = &mut self.shutdown_rx => {
+                    info!("Shutdown signal received, stopping forwarder...");
+                    break;
+                }
+                result = self.source_client.read_fn(config().batch_size, config().timeout_in_ms) => {
+                    // Read messages from the source
+                    let messages = result?;
+                    info!("Read batch size: {} and latency - {}ms", messages.len(), start_time.elapsed().as_millis());
+
+                    // increment the counters by the size of this batch; messages_count is the
+                    // running total used only for the periodic log line below
+                    let batch_size = messages.len() as u64;
+                    messages_count += batch_size;
+                    let bytes_count = messages.iter().map(|msg| msg.value.len() as u64).sum::<u64>();
+                    counter!(FORWARDER_READ_TOTAL, &self.common_labels).increment(batch_size);
+                    counter!(FORWARDER_READ_BYTES_TOTAL, &self.common_labels).increment(bytes_count);
+
+                    // Extract offsets from the messages
+                    let offsets = messages.iter().map(|message| message.offset.clone()).collect();
+                    // Apply transformation if transformer is present
+                    let transformed_messages = if let Some(transformer_client) = &self.transformer_client {
+                        let start_time = tokio::time::Instant::now();
+                        let mut jh = JoinSet::new();
+                        for message in messages {
+                            let mut transformer_client = transformer_client.clone();
+                            jh.spawn(async move { transformer_client.transform_fn(message).await });
+                        }
+
+                        let mut results = Vec::new();
+                        while let Some(task) = jh.join_next().await {
+                            let result = task.map_err(|e| Error::TransformerError(format!("{:?}", e)))?;
+                            let result = result?;
+                            results.extend(result);
+                        }
+                        info!("Transformer latency - {}ms", start_time.elapsed().as_millis());
+                        results
+                    } else {
+                        messages
+                    };
+
+                    // Write messages to the sink
+                    // TODO: should we retry writing? what if the error is transient?
+ // we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit. + // we need to confirm this via FMEA tests. + let start_time = tokio::time::Instant::now(); + self.sink_client.sink_fn(transformed_messages).await?; + info!("Sink latency - {}ms", start_time.elapsed().as_millis()); + counter!(FORWARDER_WRITE_TOTAL, &self.common_labels).increment(messages_count); + + // Acknowledge the messages + // TODO: should we retry acking? what if the error is transient? + // we could rely on gRPC retries and say that any error that is bubbled up is worthy of non-0 exit. + // we need to confirm this via FMEA tests. + let start_time = tokio::time::Instant::now(); + self.source_client.ack_fn(offsets).await?; + info!("Ack latency - {}ms", start_time.elapsed().as_millis()); + + counter!(FORWARDER_ACK_TOTAL, &self.common_labels).increment(messages_count); + trace!("Forwarded {} messages", messages_count); + } + } + // if the last forward was more than 1 second ago, forward a chunk print the number of messages forwarded + if last_forwarded_at.elapsed().as_millis() >= 1000 { + info!( + "Forwarded {} messages at time {}", + messages_count, + Utc::now() + ); + messages_count = 0; + last_forwarded_at = std::time::Instant::now(); + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use chrono::Utc; + use numaflow::source::{Message, Offset, SourceReadRequest}; + use numaflow::{sink, source, sourcetransform}; + use tokio::sync::mpsc::Sender; + + use crate::forwarder::Forwarder; + use crate::sink::{SinkClient, SinkConfig}; + use crate::source::{SourceClient, SourceConfig}; + use crate::transformer::{TransformerClient, TransformerConfig}; + + struct SimpleSource { + yet_to_be_acked: std::sync::RwLock>, + } + + impl SimpleSource { + fn new() -> Self { + Self { + yet_to_be_acked: std::sync::RwLock::new(HashSet::new()), + } + } + } + + #[tonic::async_trait] + impl source::Sourcer for SimpleSource { + async fn read(&self, request: SourceReadRequest, transmitter: Sender) { + let event_time = Utc::now(); + let mut message_offsets = Vec::with_capacity(request.count); + for i in 0..2 { + let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i); + transmitter + .send(Message { + value: "test-message".as_bytes().to_vec(), + event_time, + offset: Offset { + offset: offset.clone().into_bytes(), + partition_id: 0, + }, + keys: vec!["test-key".to_string()], + headers: Default::default(), + }) + .await + .unwrap(); + message_offsets.push(offset) + } + self.yet_to_be_acked + .write() + .unwrap() + .extend(message_offsets) + } + + async fn ack(&self, offsets: Vec) { + for offset in offsets { + self.yet_to_be_acked + .write() + .unwrap() + .remove(&String::from_utf8(offset.offset).unwrap()); + } + } + + async fn pending(&self) -> usize { + self.yet_to_be_acked.read().unwrap().len() + } + + async fn partitions(&self) -> Option> { + Some(vec![0]) + } + } + + struct SimpleTransformer; + #[tonic::async_trait] + impl sourcetransform::SourceTransformer for SimpleTransformer { + async fn transform( + &self, + input: sourcetransform::SourceTransformRequest, + ) -> Vec { + let keys = input + .keys + .iter() + .map(|k| k.clone() + "-transformed") + .collect(); + let message = sourcetransform::Message::new(input.value, Utc::now()) + .keys(keys) + .tags(vec![]); + vec![message] + } + } + + struct InMemorySink { + sender: Sender, + } + + impl InMemorySink { + fn new(sender: Sender) -> Self { + Self { sender } + } + } + + #[tonic::async_trait] + impl 
sink::Sinker for InMemorySink { + async fn sink( + &self, + mut input: tokio::sync::mpsc::Receiver, + ) -> Vec { + let mut responses: Vec = Vec::new(); + while let Some(datum) = input.recv().await { + let response = match std::str::from_utf8(&datum.value) { + Ok(_) => { + self.sender + .send(Message { + value: datum.value.clone(), + event_time: datum.event_time, + offset: Offset { + offset: "test-offset".to_string().into_bytes(), + partition_id: 0, + }, + keys: datum.keys.clone(), + headers: Default::default(), + }) + .await + .unwrap(); + sink::Response::ok(datum.id) + } + Err(e) => { + sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)) + } + }; + responses.push(response); + } + responses + } + } + + #[tokio::test] + async fn test_forwarder_source_sink() { + // Create channels for communication + let (sink_tx, mut sink_rx) = tokio::sync::mpsc::channel(10); + + // Start the source server + let (source_shutdown_tx, source_shutdown_rx) = tokio::sync::oneshot::channel(); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let source_sock_file = tmp_dir.path().join("source.sock"); + let server_info_file = tmp_dir.path().join("source-server-info"); + + let server_info = server_info_file.clone(); + let source_socket = source_sock_file.clone(); + let source_server_handle = tokio::spawn(async move { + source::Server::new(SimpleSource::new()) + .with_socket_file(source_socket) + .with_server_info_file(server_info) + .start_with_shutdown(source_shutdown_rx) + .await + .unwrap(); + }); + let source_config = SourceConfig { + socket_path: source_sock_file.to_str().unwrap().to_string(), + server_info_file: server_info_file.to_str().unwrap().to_string(), + max_message_size: 4 * 1024 * 1024, + }; + + // Start the sink server + let (sink_shutdown_tx, sink_shutdown_rx) = tokio::sync::oneshot::channel(); + let sink_tmp_dir = tempfile::TempDir::new().unwrap(); + let sink_sock_file = sink_tmp_dir.path().join("sink.sock"); + let server_info_file = sink_tmp_dir.path().join("sink-server-info"); + + let server_info = server_info_file.clone(); + let sink_socket = sink_sock_file.clone(); + let sink_server_handle = tokio::spawn(async move { + sink::Server::new(InMemorySink::new(sink_tx)) + .with_socket_file(sink_socket) + .with_server_info_file(server_info) + .start_with_shutdown(sink_shutdown_rx) + .await + .unwrap(); + }); + let sink_config = SinkConfig { + socket_path: sink_sock_file.to_str().unwrap().to_string(), + server_info_file: server_info_file.to_str().unwrap().to_string(), + max_message_size: 4 * 1024 * 1024, + }; + + // Start the transformer server + let (transformer_shutdown_tx, transformer_shutdown_rx) = tokio::sync::oneshot::channel(); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let transformer_sock_file = tmp_dir.path().join("transformer.sock"); + let server_info_file = tmp_dir.path().join("transformer-server-info"); + + let server_info = server_info_file.clone(); + let transformer_socket = transformer_sock_file.clone(); + let transformer_server_handle = tokio::spawn(async move { + sourcetransform::Server::new(SimpleTransformer) + .with_socket_file(transformer_socket) + .with_server_info_file(server_info) + .start_with_shutdown(transformer_shutdown_rx) + .await + .unwrap(); + }); + let transformer_config = TransformerConfig { + socket_path: transformer_sock_file.to_str().unwrap().to_string(), + server_info_file: server_info_file.to_str().unwrap().to_string(), + max_message_size: 4 * 1024 * 1024, + }; + + // Wait for the servers to start + 
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let (forwarder_shutdown_tx, forwarder_shutdown_rx) = tokio::sync::oneshot::channel(); + + let source_client = SourceClient::connect(source_config) + .await + .expect("failed to connect to source server"); + + let sink_client = SinkClient::connect(sink_config) + .await + .expect("failed to connect to sink server"); + + let transformer_client = TransformerClient::connect(transformer_config) + .await + .expect("failed to connect to transformer server"); + + let mut forwarder = Forwarder::new( + source_client, + sink_client, + Some(transformer_client), + forwarder_shutdown_rx, + ) + .await + .expect("failed to create forwarder"); + + let forwarder_handle = tokio::spawn(async move { + forwarder.run().await.unwrap(); + }); + + // Receive messages from the sink + let received_message = sink_rx.recv().await.unwrap(); + assert_eq!(received_message.value, "test-message".as_bytes()); + assert_eq!( + received_message.keys, + vec!["test-key-transformed".to_string()] + ); + + // stop the forwarder + forwarder_shutdown_tx + .send(()) + .expect("failed to send shutdown signal"); + forwarder_handle + .await + .expect("failed to join forwarder task"); + + // stop the servers + source_shutdown_tx + .send(()) + .expect("failed to send shutdown signal"); + source_server_handle + .await + .expect("failed to join source server task"); + + transformer_shutdown_tx + .send(()) + .expect("failed to send shutdown signal"); + transformer_server_handle + .await + .expect("failed to join transformer server task"); + + sink_shutdown_tx + .send(()) + .expect("failed to send shutdown signal"); + sink_server_handle + .await + .expect("failed to join sink server task"); + } +} diff --git a/serving/source-sink/src/lib.rs b/serving/source-sink/src/lib.rs new file mode 100644 index 0000000000..2099bc63b9 --- /dev/null +++ b/serving/source-sink/src/lib.rs @@ -0,0 +1,306 @@ +use std::fs; +use std::time::Duration; + +use tokio::signal; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; +use tokio::time::sleep; +use tracing::{error, info}; + +pub(crate) use crate::error::Error; +use crate::forwarder::Forwarder; +use crate::sink::{SinkClient, SinkConfig}; +use crate::source::{SourceClient, SourceConfig}; +use crate::transformer::{TransformerClient, TransformerConfig}; + +pub(crate) use self::error::Result; + +/// SourcerSinker orchestrates data movement from the Source to the Sink via the optional SourceTransformer. +/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received: +/// - Read X messages from the source +/// - Invokes the SourceTransformer concurrently +/// - Calls the Sinker to write the batch to the Sink +/// - Send Acknowledgement back to the Source +pub mod error; + +pub mod metrics; + +pub mod source; + +pub mod sink; + +pub mod transformer; + +pub mod forwarder; + +pub mod config; + +pub mod message; +pub(crate) mod shared; + +/// forwards a chunk of data from the source to the sink via an optional transformer. +/// It takes an optional custom_shutdown_rx for shutting down the forwarder, useful for testing. 
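+///
+/// A minimal wiring looks like this (illustrative sketch; it assumes the user-defined
+/// containers are already listening on the default socket paths carried by
+/// SourceConfig::default() and SinkConfig::default()):
+/// ```ignore
+/// run_forwarder(SourceConfig::default(), SinkConfig::default(), None, None).await?;
+/// ```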
+pub async fn run_forwarder( + source_config: SourceConfig, + sink_config: SinkConfig, + transformer_config: Option, + custom_shutdown_rx: Option>, +) -> Result<()> { + wait_for_server_info(&source_config.server_info_file).await?; + let mut source_client = SourceClient::connect(source_config).await?; + + // start the lag reader to publish lag metrics + let mut lag_reader = metrics::LagReader::new(source_client.clone(), None, None); + lag_reader.start().await; + + wait_for_server_info(&sink_config.server_info_file).await?; + let mut sink_client = SinkClient::connect(sink_config).await?; + + let mut transformer_client = if let Some(config) = transformer_config { + wait_for_server_info(&config.server_info_file).await?; + Some(TransformerClient::connect(config).await?) + } else { + None + }; + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + // readiness check for all the ud containers + wait_until_ready( + &mut source_client, + &mut sink_client, + &mut transformer_client, + ) + .await?; + + // TODO: use builder pattern of options like TIMEOUT, BATCH_SIZE, etc? + let mut forwarder = + Forwarder::new(source_client, sink_client, transformer_client, shutdown_rx).await?; + + let forwarder_handle: JoinHandle> = tokio::spawn(async move { + forwarder.run().await?; + Ok(()) + }); + + let shutdown_handle: JoinHandle> = tokio::spawn(async move { + shutdown_signal(custom_shutdown_rx).await; + shutdown_tx + .send(()) + .map_err(|_| Error::ForwarderError("Failed to send shutdown signal".to_string()))?; + Ok(()) + }); + + forwarder_handle + .await + .unwrap_or_else(|e| { + error!("Forwarder task panicked: {:?}", e); + Err(Error::ForwarderError("Forwarder task panicked".to_string())) + }) + .unwrap_or_else(|e| { + error!("Forwarder failed: {:?}", e); + }); + + if !shutdown_handle.is_finished() { + shutdown_handle.abort(); + } + + lag_reader.shutdown().await; + info!("Forwarder stopped gracefully"); + Ok(()) +} + +async fn wait_for_server_info(file_path: &str) -> Result<()> { + loop { + if let Ok(metadata) = fs::metadata(file_path) { + if metadata.len() > 0 { + return Ok(()); + } + } + info!("Server info file {} is not ready, waiting...", file_path); + sleep(Duration::from_secs(1)).await; + } +} + +async fn wait_until_ready( + source_client: &mut SourceClient, + sink_client: &mut SinkClient, + transformer_client: &mut Option, +) -> Result<()> { + loop { + let source_ready = source_client.is_ready().await.is_ok(); + if !source_ready { + info!("UDSource is not ready, waiting..."); + } + + let sink_ready = sink_client.is_ready().await.is_ok(); + if !sink_ready { + info!("UDSink is not ready, waiting..."); + } + + let transformer_ready = if let Some(client) = transformer_client { + let ready = client.is_ready().await.is_ok(); + if !ready { + info!("UDTransformer is not ready, waiting..."); + } + ready + } else { + true + }; + + if source_ready && sink_ready && transformer_ready { + break; + } + + sleep(Duration::from_secs(1)).await; + } + + Ok(()) +} + +async fn shutdown_signal(shutdown_rx: Option>) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + info!("Received Ctrl+C signal"); + }; + + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + info!("Received terminate signal"); + }; + + let custom_shutdown = async { + if let Some(rx) = shutdown_rx { + rx.await.ok(); + } else { + // Create a watch channel that never sends + let (_tx, mut rx) = 
tokio::sync::watch::channel(()); + rx.changed().await.ok(); + } + info!("Received custom shutdown signal"); + }; + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + _ = custom_shutdown => {}, + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use numaflow::source::{Message, Offset, SourceReadRequest}; + use numaflow::{sink, source}; + use tokio::sync::mpsc::Sender; + + use crate::sink::SinkConfig; + use crate::source::SourceConfig; + + struct SimpleSource; + #[tonic::async_trait] + impl source::Sourcer for SimpleSource { + async fn read(&self, _: SourceReadRequest, _: Sender) {} + + async fn ack(&self, _: Vec) {} + + async fn pending(&self) -> usize { + 0 + } + + async fn partitions(&self) -> Option> { + None + } + } + + struct SimpleSink; + + #[tonic::async_trait] + impl sink::Sinker for SimpleSink { + async fn sink( + &self, + _input: tokio::sync::mpsc::Receiver, + ) -> Vec { + vec![] + } + } + #[tokio::test] + async fn run_forwarder() { + let (src_shutdown_tx, src_shutdown_rx) = tokio::sync::oneshot::channel(); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let src_sock_file = tmp_dir.path().join("source.sock"); + let src_info_file = tmp_dir.path().join("source-server-info"); + + let server_info = src_info_file.clone(); + let server_socket = src_sock_file.clone(); + let src_server_handle = tokio::spawn(async move { + source::Server::new(SimpleSource) + .with_socket_file(server_socket) + .with_server_info_file(server_info) + .start_with_shutdown(src_shutdown_rx) + .await + .unwrap(); + }); + let source_config = SourceConfig { + socket_path: src_sock_file.to_str().unwrap().to_string(), + server_info_file: src_info_file.to_str().unwrap().to_string(), + max_message_size: 100, + }; + + let (sink_shutdown_tx, sink_shutdown_rx) = tokio::sync::oneshot::channel(); + let tmp_dir = tempfile::TempDir::new().unwrap(); + let sink_sock_file = tmp_dir.path().join("sink.sock"); + let sink_server_info = tmp_dir.path().join("sink-server-info"); + + let server_socket = sink_sock_file.clone(); + let server_info = sink_server_info.clone(); + let sink_server_handle = tokio::spawn(async move { + sink::Server::new(SimpleSink) + .with_socket_file(server_socket) + .with_server_info_file(server_info) + .start_with_shutdown(sink_shutdown_rx) + .await + .unwrap(); + }); + let sink_config = SinkConfig { + socket_path: sink_sock_file.to_str().unwrap().to_string(), + server_info_file: sink_server_info.to_str().unwrap().to_string(), + max_message_size: 100, + }; + + // wait for the servers to start + // FIXME: we need to have a better way, this is flaky + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + env::set_var("SOURCE_SOCKET", src_sock_file.to_str().unwrap()); + env::set_var("SINK_SOCKET", sink_sock_file.to_str().unwrap()); + + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + + let forwarder_handle = tokio::spawn(async move { + let result = + super::run_forwarder(source_config, sink_config, None, Some(shutdown_rx)).await; + assert!(result.is_ok()); + }); + + // wait for the forwarder to start + // FIXME: we need to have a better way, this is flaky + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // stop the forwarder + shutdown_tx.send(()).unwrap(); + forwarder_handle.await.unwrap(); + + // stop the source and sink servers + src_shutdown_tx.send(()).unwrap(); + sink_shutdown_tx.send(()).unwrap(); + + src_server_handle.await.unwrap(); + sink_server_handle.await.unwrap(); + } +} diff --git a/serving/source-sink/src/main.rs 
diff --git a/serving/source-sink/src/main.rs b/serving/source-sink/src/main.rs
new file mode 100644
index 0000000000..aa1d8c0605
--- /dev/null
+++ b/serving/source-sink/src/main.rs
@@ -0,0 +1,64 @@
+use log::Level::Info;
+use sourcer_sinker::config::config;
+use sourcer_sinker::sink::SinkConfig;
+use sourcer_sinker::source::SourceConfig;
+use sourcer_sinker::transformer::TransformerConfig;
+use sourcer_sinker::run_forwarder;
+use std::env;
+use std::net::SocketAddr;
+use tracing::error;
+use tracing::level_filters::LevelFilter;
+use tracing_subscriber::EnvFilter;
+use sourcer_sinker::metrics::start_metrics_https_server;
+
+#[tokio::main]
+async fn main() {
+    let log_level = env::var("NUMAFLOW_DEBUG").unwrap_or_else(|_| Info.to_string());
+    // Initialize the logger
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            EnvFilter::builder()
+                .with_default_directive(LevelFilter::INFO.into())
+                .parse_lossy(log_level),
+        )
+        .with_target(false)
+        .init();
+
+    // Start the metrics server, which serves the prometheus metrics.
+    // TODO: make the port configurable.
+    let metrics_addr: SocketAddr = "0.0.0.0:2469".parse().expect("Invalid address");
+
+    // Start the metrics server in a separate background async spawn.
+    // This should be running throughout the lifetime of the application, hence the handle is not
+    // joined.
+    tokio::spawn(async move {
+        if let Err(e) = start_metrics_https_server(metrics_addr).await {
+            error!("Metrics server error: {:?}", e);
+        }
+    });
+
+    // Initialize the source, sink and transformer configurations.
+    // We are using the default configurations for now.
+    let source_config = SourceConfig {
+        max_message_size: config().grpc_max_message_size,
+        ..Default::default()
+    };
+
+    let sink_config = SinkConfig {
+        max_message_size: config().grpc_max_message_size,
+        ..Default::default()
+    };
+    let transformer_config = if config().is_transformer_enabled {
+        Some(TransformerConfig {
+            max_message_size: config().grpc_max_message_size,
+            ..Default::default()
+        })
+    } else {
+        None
+    };
+
+    // Run the forwarder
+    if let Err(e) = run_forwarder(source_config, sink_config, transformer_config, None).await {
+        error!("Application error: {:?}", e);
+    }
+}
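For runs against sockets outside `/var/run/numaflow`, the config structs can also be built explicitly instead of via `..Default::default()`; a sketch with hypothetical `/tmp` paths (the same shape the tests below use):

```rust
use sourcer_sinker::sink::SinkConfig;
use sourcer_sinker::source::SourceConfig;

// Hypothetical local-run overrides: point the forwarder at sockets under /tmp
// instead of the /var/run/numaflow defaults baked into the Default impls.
fn local_configs() -> (SourceConfig, SinkConfig) {
    let source = SourceConfig {
        socket_path: "/tmp/source.sock".to_string(),
        server_info_file: "/tmp/source-server-info".to_string(),
        max_message_size: 4 * 1024 * 1024, // 4 MB is plenty for local testing
    };
    let sink = SinkConfig {
        socket_path: "/tmp/sink.sock".to_string(),
        server_info_file: "/tmp/sink-server-info".to_string(),
        max_message_size: 4 * 1024 * 1024,
    };
    (source, sink)
}
```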
diff --git a/serving/source-sink/src/message.rs b/serving/source-sink/src/message.rs
new file mode 100644
index 0000000000..1ca69e9878
--- /dev/null
+++ b/serving/source-sink/src/message.rs
@@ -0,0 +1,85 @@
+use std::collections::HashMap;
+
+use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
+use base64::Engine;
+use chrono::{DateTime, Utc};
+
+use crate::error::Error;
+use crate::shared::{prost_timestamp_from_utc, utc_from_timestamp};
+use crate::sink::proto;
+use crate::source::proto::read_response;
+use crate::transformer::proto::SourceTransformRequest;
+
+/// A message that is sent from the source to the sink.
+#[derive(Debug, Clone)]
+pub(crate) struct Message {
+    /// keys of the message
+    pub(crate) keys: Vec<String>,
+    /// actual payload of the message
+    pub(crate) value: Vec<u8>,
+    /// offset of the message
+    pub(crate) offset: Offset,
+    /// event time of the message
+    pub(crate) event_time: DateTime<Utc>,
+    /// headers of the message
+    pub(crate) headers: HashMap<String, String>,
+}
+
+/// Offset of the message which will be used to acknowledge the message.
+#[derive(Debug, Clone)]
+pub(crate) struct Offset {
+    /// unique identifier of the message
+    pub(crate) offset: String,
+    /// partition id of the message
+    pub(crate) partition_id: i32,
+}
+
+/// Convert the [`Message`] to [`SourceTransformRequest`]
+impl From<Message> for SourceTransformRequest {
+    fn from(message: Message) -> Self {
+        Self {
+            keys: message.keys,
+            value: message.value,
+            event_time: prost_timestamp_from_utc(message.event_time),
+            watermark: None,
+            headers: message.headers,
+        }
+    }
+}
+
+/// Convert [`read_response::Result`] to [`Message`]
+impl TryFrom<read_response::Result> for Message {
+    type Error = crate::Error;
+
+    fn try_from(result: read_response::Result) -> Result<Self, Self::Error> {
+        let source_offset = match result.offset {
+            Some(o) => Offset {
+                offset: BASE64_STANDARD.encode(o.offset),
+                partition_id: o.partition_id,
+            },
+            None => return Err(Error::SourceError("Offset not found".to_string())),
+        };
+
+        Ok(Message {
+            keys: result.keys,
+            value: result.payload,
+            offset: source_offset,
+            event_time: utc_from_timestamp(result.event_time),
+            headers: result.headers,
+        })
+    }
+}
+
+/// Convert [`Message`] to [`proto::SinkRequest`]
+impl From<Message> for proto::SinkRequest {
+    fn from(message: Message) -> Self {
+        Self {
+            keys: message.keys,
+            value: message.value,
+            event_time: prost_timestamp_from_utc(message.event_time),
+            watermark: None,
+            id: format!("{}-{}", message.offset.partition_id, message.offset.offset),
+            headers: message.headers,
+        }
+    }
+}
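The last conversion above is what gives every sink request its identifier: `id` is `"{partition_id}-{offset}"`, which is how a sink response can later be correlated with the message that produced it. A sketch of that round trip (these types are crate-private, so this only compiles inside the crate):

```rust
use std::collections::HashMap;

use chrono::Utc;

// Demonstrates the From<Message> for proto::SinkRequest conversion above; the
// offset string is already base64 (see TryFrom<read_response::Result>).
fn sink_request_id_example() {
    let message = Message {
        keys: vec!["key-1".to_string()],
        value: b"payload".to_vec(),
        offset: Offset {
            offset: "b2Zmc2V0LTQy".to_string(), // hypothetical base64 offset
            partition_id: 3,
        },
        event_time: Utc::now(),
        headers: HashMap::new(),
    };

    let request: proto::SinkRequest = message.into();
    assert_eq!(request.id, "3-b2Zmc2V0LTQy");
}
```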
diff --git a/serving/source-sink/src/metrics.rs b/serving/source-sink/src/metrics.rs
new file mode 100644
index 0000000000..d257609c76
--- /dev/null
+++ b/serving/source-sink/src/metrics.rs
@@ -0,0 +1,332 @@
+use std::future::ready;
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use axum::http::StatusCode;
+use axum::response::IntoResponse;
+use axum::{routing::get, Router};
+use axum_server::tls_rustls::RustlsConfig;
+use log::info;
+use metrics::describe_counter;
+use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
+use rcgen::{CertifiedKey, generate_simple_self_signed};
+use tokio::net::{TcpListener, ToSocketAddrs};
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+use tokio::time;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error};
+
+use crate::error::Error;
+use crate::source::SourceClient;
+
+// Define the labels for the metrics
+pub const MONO_VERTEX_NAME: &str = "vertex";
+pub const REPLICA_LABEL: &str = "replica";
+pub const PARTITION_LABEL: &str = "partition_name";
+pub const VERTEX_TYPE_LABEL: &str = "vertex_type";
+
+// Define the metrics
+pub const FORWARDER_READ_TOTAL: &str = "forwarder_read_total";
+pub const FORWARDER_READ_BYTES_TOTAL: &str = "forwarder_read_bytes_total";
+
+pub const FORWARDER_ACK_TOTAL: &str = "forwarder_ack_total";
+pub const FORWARDER_WRITE_TOTAL: &str = "forwarder_write_total";
+
+/// Collect and emit prometheus metrics.
+/// Metrics router and server
+pub async fn start_metrics_http_server<A>(addr: A) -> crate::Result<()>
+where
+    A: ToSocketAddrs + std::fmt::Debug,
+{
+    // setup_metrics_recorder should only be invoked once
+    let recorder_handle = setup_metrics_recorder()?;
+
+    let metrics_app = Router::new()
+        .route("/metrics", get(move || ready(recorder_handle.render())))
+        .route("/livez", get(livez))
+        .route("/readyz", get(readyz))
+        .route("/sidecar-livez", get(sidecar_livez));
+
+    let listener = TcpListener::bind(&addr)
+        .await
+        .map_err(|e| Error::MetricsError(format!("Creating listener on {:?}: {}", addr, e)))?;
+
+    debug!("metrics server started at addr: {:?}", addr);
+
+    axum::serve(listener, metrics_app)
+        .await
+        .map_err(|e| Error::MetricsError(format!("Starting web server for metrics: {}", e)))?;
+    Ok(())
+}
+
+pub async fn start_metrics_https_server(addr: SocketAddr) -> crate::Result<()> {
+    let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
+
+    // Generate a self-signed certificate
+    let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
+        .map_err(|e| Error::MetricsError(format!("Generating self-signed certificate: {}", e)))?;
+
+    let tls_config = RustlsConfig::from_pem(cert.pem().into(), key_pair.serialize_pem().into())
+        .await
+        .map_err(|e| Error::MetricsError(format!("Creating tlsConfig from pem: {}", e)))?;
+
+    // setup_metrics_recorder should only be invoked once
+    let recorder_handle = setup_metrics_recorder()?;
+
+    let metrics_app = Router::new()
+        .route("/metrics", get(move || ready(recorder_handle.render())))
+        .route("/livez", get(livez))
+        .route("/readyz", get(readyz))
+        .route("/sidecar-livez", get(sidecar_livez));
+
+    axum_server::bind_rustls(addr, tls_config)
+        .serve(metrics_app.into_make_service())
+        .await
+        .map_err(|e| Error::MetricsError(format!("Starting web server for metrics: {}", e)))?;
+
+    Ok(())
+}
+
+async fn livez() -> impl IntoResponse {
+    StatusCode::NO_CONTENT
+}
+
+async fn readyz() -> impl IntoResponse {
+    StatusCode::NO_CONTENT
+}
+
+async fn sidecar_livez() -> impl IntoResponse {
+    StatusCode::NO_CONTENT
+}
+
+/// setup the Prometheus metrics recorder.
+fn setup_metrics_recorder() -> crate::Result<PrometheusHandle> {
+    // 1 micro-sec < t < 1000 seconds
+    let log_to_power_of_sqrt2_bins: [f64; 62] = (0..62)
+        .map(|i| 2_f64.sqrt().powf(i as f64))
+        .collect::<Vec<f64>>()
+        .try_into()
+        .unwrap();
+
+    let prometheus_handle = PrometheusBuilder::new()
+        .set_buckets_for_metric(
+            Matcher::Full("fac_total_duration_micros".to_string()), // fac == forward-a-chunk
+            &log_to_power_of_sqrt2_bins,
+        )
+        .map_err(|e| Error::MetricsError(format!("Prometheus install_recorder: {}", e)))?
+        .install_recorder()
+        .map_err(|e| Error::MetricsError(format!("Prometheus install_recorder: {}", e)))?;
+
+    // Define forwarder metrics
+    describe_counter!(
+        FORWARDER_READ_TOTAL,
+        "Total number of Data Messages Read in the forwarder"
+    );
+    describe_counter!(
+        FORWARDER_READ_BYTES_TOTAL,
+        "Total number of bytes read in the forwarder"
+    );
+    describe_counter!(
+        FORWARDER_ACK_TOTAL,
+        "Total number of acknowledgments by the forwarder"
+    );
+    describe_counter!(
+        FORWARDER_WRITE_TOTAL,
+        "Total number of Data Messages written by the forwarder"
+    );
+    Ok(prometheus_handle)
+}
+
+const MAX_PENDING_STATS: usize = 1800;
+
+// Pending info with timestamp
+struct TimestampedPending {
+    pending: i64,
+    timestamp: std::time::Instant,
+}
+
+/// `LagReader` is responsible for periodically checking the lag of the source client
+/// and exposing the metrics. It maintains a list of pending stats and ensures that
+/// only the most recent entries are kept.
+pub(crate) struct LagReader {
+    source_client: SourceClient,
+    lag_checking_interval: Duration,
+    refresh_interval: Duration,
+    cancellation_token: CancellationToken,
+    buildup_handle: Option<JoinHandle<()>>,
+    expose_handle: Option<JoinHandle<()>>,
+    pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
+}
+
+impl LagReader {
+    /// Creates a new `LagReader` instance.
+    pub(crate) fn new(
+        source_client: SourceClient,
+        lag_checking_interval: Option<Duration>,
+        refresh_interval: Option<Duration>,
+    ) -> Self {
+        Self {
+            source_client,
+            lag_checking_interval: lag_checking_interval.unwrap_or_else(|| Duration::from_secs(3)),
+            refresh_interval: refresh_interval.unwrap_or_else(|| Duration::from_secs(5)),
+            cancellation_token: CancellationToken::new(),
+            buildup_handle: None,
+            expose_handle: None,
+            pending_stats: Arc::new(Mutex::new(Vec::new())),
+        }
+    }
+
+    /// Starts the lag reader by spawning tasks to build up pending info and expose pending metrics.
+    ///
+    /// This method spawns two asynchronous tasks:
+    /// - One to periodically check the lag and update the pending stats.
+    /// - Another to periodically expose the pending metrics.
+    pub async fn start(&mut self) {
+        let token = self.cancellation_token.clone();
+        let source_client = self.source_client.clone();
+        let lag_checking_interval = self.lag_checking_interval;
+        let refresh_interval = self.refresh_interval;
+        let pending_stats = self.pending_stats.clone();
+
+        self.buildup_handle = Some(tokio::spawn(async move {
+            buildup_pending_info(source_client, token, lag_checking_interval, pending_stats).await;
+        }));
+
+        let token = self.cancellation_token.clone();
+        let pending_stats = self.pending_stats.clone();
+        self.expose_handle = Some(tokio::spawn(async move {
+            expose_pending_metrics(token, refresh_interval, pending_stats).await;
+        }));
+    }
+
+    /// Shuts down the lag reader by cancelling the tasks and waiting for them to complete.
+    pub(crate) async fn shutdown(self) {
+        self.cancellation_token.cancel();
+        if let Some(handle) = self.buildup_handle {
+            let _ = handle.await;
+        }
+        if let Some(handle) = self.expose_handle {
+            let _ = handle.await;
+        }
+    }
+}
+
+// Periodically checks the pending messages from the source client and updates the pending stats.
+async fn buildup_pending_info(
+    mut source_client: SourceClient,
+    cancellation_token: CancellationToken,
+    lag_checking_interval: Duration,
+    pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
+) {
+    let mut ticker = time::interval(lag_checking_interval);
+    loop {
+        tokio::select! {
+            _ = cancellation_token.cancelled() => {
+                return;
+            }
+            _ = ticker.tick() => {
+                match source_client.pending_fn().await {
+                    Ok(pending) => {
+                        if pending != -1 {
+                            let mut stats = pending_stats.lock().await;
+                            stats.push(TimestampedPending {
+                                pending,
+                                timestamp: std::time::Instant::now(),
+                            });
+                            let n = stats.len();
+                            // Ensure only the most recent MAX_PENDING_STATS entries are kept
+                            if n > MAX_PENDING_STATS {
+                                stats.drain(0..(n - MAX_PENDING_STATS));
+                            }
+                        }
+                    }
+                    Err(err) => {
+                        error!("Failed to get pending messages: {:?}", err);
+                    }
+                }
+            }
+        }
+    }
+}
+
+// Periodically exposes the pending metrics by calculating the average pending messages over different intervals.
+async fn expose_pending_metrics(
+    cancellation_token: CancellationToken,
+    refresh_interval: Duration,
+    pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
+) {
+    let mut ticker = time::interval(refresh_interval);
+    let lookback_seconds_map = vec![("1m", 60), ("5m", 300), ("15m", 900)];
+    loop {
+        tokio::select! {
+            _ = cancellation_token.cancelled() => {
+                return;
+            }
+            _ = ticker.tick() => {
+                for (label, seconds) in &lookback_seconds_map {
+                    let pending = calculate_pending(*seconds, &pending_stats).await;
+                    if pending != -1 {
+                        // TODO: emit it as a metric
+                        info!("Pending messages ({}): {}", label, pending);
+                    }
+                }
+            }
+        }
+    }
+}
+
+// Calculate the average pending messages over the last `seconds` seconds.
+async fn calculate_pending(
+    seconds: i64,
+    pending_stats: &Arc<Mutex<Vec<TimestampedPending>>>,
+) -> i64 {
+    let mut result = -1;
+    let mut total = 0;
+    let mut num = 0;
+    let now = std::time::Instant::now();
+
+    let stats = pending_stats.lock().await;
+    for item in stats.iter().rev() {
+        if now.duration_since(item.timestamp).as_secs() < seconds as u64 {
+            total += item.pending;
+            num += 1;
+        } else {
+            break;
+        }
+    }
+
+    if num > 0 {
+        result = total / num;
+    }
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use std::net::SocketAddr;
+    use std::time::Duration;
+
+    use tokio::time::sleep;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_start_metrics_server() {
+        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
+        let server = tokio::spawn(async move {
+            let result = start_metrics_http_server(addr).await;
+            assert!(result.is_ok())
+        });
+
+        // Give the server a little bit of time to start
+        sleep(Duration::from_millis(100)).await;
+
+        // Stop the server
+        server.abort();
+    }
+}
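`calculate_pending` walks the samples newest-to-oldest and averages only those younger than the lookback window, stopping at the first sample that is too old (entries are appended in time order, so everything earlier is older still). A self-contained sketch of the same arithmetic on plain `(age_in_seconds, pending)` pairs:

```rust
// Sketch of the averaging in `calculate_pending`: given (age, pending)
// samples ordered oldest -> newest, average only the samples younger than
// the lookback window, or return -1 if none qualify.
fn average_pending(samples: &[(u64, i64)], lookback_secs: u64) -> i64 {
    let (mut total, mut num) = (0i64, 0i64);
    for &(age, pending) in samples.iter().rev() {
        if age < lookback_secs {
            total += pending;
            num += 1;
        } else {
            break; // older entries only get older; stop early like the original
        }
    }
    if num > 0 {
        total / num
    } else {
        -1
    }
}

fn main() {
    let samples = [(120, 40), (90, 10), (30, 7), (5, 9)];
    assert_eq!(average_pending(&samples, 60), 8); // (7 + 9) / 2
    assert_eq!(average_pending(&samples, 1), -1); // nothing recent enough
}
```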
diff --git a/serving/source-sink/src/shared.rs b/serving/source-sink/src/shared.rs
new file mode 100644
index 0000000000..2c63244647
--- /dev/null
+++ b/serving/source-sink/src/shared.rs
@@ -0,0 +1,38 @@
+use std::path::PathBuf;
+
+use chrono::{DateTime, TimeZone, Timelike, Utc};
+use prost_types::Timestamp;
+use tokio::net::UnixStream;
+use tonic::transport::{Channel, Endpoint, Uri};
+use tower::service_fn;
+
+use crate::error::Error;
+
+pub(crate) fn utc_from_timestamp(t: Option<Timestamp>) -> DateTime<Utc> {
+    t.map_or(Utc.timestamp_nanos(-1), |t| {
+        DateTime::from_timestamp(t.seconds, t.nanos as u32).unwrap_or(Utc.timestamp_nanos(-1))
+    })
+}
+
+pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {
+    Some(Timestamp {
+        seconds: t.timestamp(),
+        nanos: t.nanosecond() as i32,
+    })
+}
+
+pub(crate) async fn connect_with_uds(uds_path: PathBuf) -> Result<Channel, Error> {
+    let channel = Endpoint::try_from("http://[::]:50051")
+        .map_err(|e| Error::ConnectionError(format!("Failed to create endpoint: {:?}", e)))?
+        .connect_with_connector(service_fn(move |_: Uri| {
+            let uds_socket = uds_path.clone();
+            async move {
+                Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
+                    UnixStream::connect(uds_socket).await?,
+                ))
+            }
+        }))
+        .await
+        .map_err(|e| Error::ConnectionError(format!("Failed to connect: {:?}", e)))?;
+    Ok(channel)
+}
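tonic insists on a URI even when the transport is a Unix domain socket, so `connect_with_uds` hands `Endpoint` the placeholder `http://[::]:50051` and supplies the real connection through `connect_with_connector`; the address is never dialed. A hypothetical call site, assuming the default sink socket path:

```rust
use std::path::PathBuf;

use tonic::transport::Channel;

// Hypothetical call site for `connect_with_uds`: the UnixStream produced by
// the connector is the actual transport, so no TCP port is ever opened.
async fn sink_channel() -> Result<Channel, crate::error::Error> {
    let sock: PathBuf = "/var/run/numaflow/sink.sock".into();
    connect_with_uds(sock).await
}
```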
diff --git a/serving/source-sink/src/sink.rs b/serving/source-sink/src/sink.rs
new file mode 100644
index 0000000000..e2801873df
--- /dev/null
+++ b/serving/source-sink/src/sink.rs
@@ -0,0 +1,174 @@
+use tonic::transport::Channel;
+use tonic::Request;
+
+use crate::error::Result;
+use crate::message::Message;
+use crate::shared::connect_with_uds;
+
+pub mod proto {
+    tonic::include_proto!("sink.v1");
+}
+
+const SINK_SOCKET: &str = "/var/run/numaflow/sink.sock";
+const SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info";
+
+/// SinkConfig is the configuration for the sink server.
+#[derive(Debug, Clone)]
+pub struct SinkConfig {
+    pub socket_path: String,
+    pub server_info_file: String,
+    pub max_message_size: usize,
+}
+
+impl Default for SinkConfig {
+    fn default() -> Self {
+        SinkConfig {
+            socket_path: SINK_SOCKET.to_string(),
+            server_info_file: SINK_SERVER_INFO_FILE.to_string(),
+            max_message_size: 64 * 1024 * 1024, // 64 MB
+        }
+    }
+}
+
+/// SinkClient is a client to interact with the sink server.
+pub struct SinkClient {
+    client: proto::sink_client::SinkClient<Channel>,
+}
+
+impl SinkClient {
+    pub(crate) async fn connect(config: SinkConfig) -> Result<Self> {
+        let channel = connect_with_uds(config.socket_path.into()).await?;
+        let client = proto::sink_client::SinkClient::new(channel)
+            .max_decoding_message_size(config.max_message_size)
+            .max_encoding_message_size(config.max_message_size);
+        Ok(Self { client })
+    }
+
+    pub(crate) async fn sink_fn(&mut self, messages: Vec<Message>) -> Result<proto::SinkResponse> {
+        let requests: Vec<proto::SinkRequest> =
+            messages.into_iter().map(|message| message.into()).collect();
+
+        let (tx, rx) = tokio::sync::mpsc::channel(1);
+
+        tokio::spawn(async move {
+            for request in requests {
+                if tx.send(request).await.is_err() {
+                    break;
+                }
+            }
+        });
+
+        // TODO: retry for response with failure status
+        let response = self
+            .client
+            .sink_fn(tokio_stream::wrappers::ReceiverStream::new(rx))
+            .await?
+            .into_inner();
+        Ok(response)
+    }
+
+    pub(crate) async fn is_ready(&mut self) -> Result<proto::ReadyResponse> {
+        let request = Request::new(());
+        let response = self.client.is_ready(request).await?.into_inner();
+        Ok(response)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use chrono::offset::Utc;
+    use log::info;
+    use numaflow::sink;
+
+    use crate::message::Offset;
+
+    use super::*;
+
+    struct Logger;
+    #[tonic::async_trait]
+    impl sink::Sinker for Logger {
+        async fn sink(
+            &self,
+            mut input: tokio::sync::mpsc::Receiver<sink::SinkRequest>,
+        ) -> Vec<sink::Response> {
+            let mut responses: Vec<sink::Response> = Vec::new();
+            while let Some(datum) = input.recv().await {
+                let response = match std::str::from_utf8(&datum.value) {
+                    Ok(v) => {
+                        info!("{}", v);
+                        sink::Response::ok(datum.id)
+                    }
+                    Err(e) => {
+                        sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e))
+                    }
+                };
+                responses.push(response);
+            }
+            responses
+        }
+    }
+    #[tokio::test]
+    async fn sink_operations() {
+        // start the server
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let sock_file = tmp_dir.path().join("sink.sock");
+        let server_info_file = tmp_dir.path().join("sink-server-info");
+
+        let server_info = server_info_file.clone();
+        let server_socket = sock_file.clone();
+        let server_handle = tokio::spawn(async move {
+            sink::Server::new(Logger)
+                .with_socket_file(server_socket)
+                .with_server_info_file(server_info)
+                .start_with_shutdown(shutdown_rx)
+                .await
+                .unwrap();
+        });
+
+        // wait for the server to start
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+        let mut sink_client = SinkClient::connect(SinkConfig {
+            socket_path: sock_file.to_str().unwrap().to_string(),
+            server_info_file: server_info_file.to_str().unwrap().to_string(),
+            max_message_size: 4 * 1024 * 1024,
+        })
+        .await
+        .expect("failed to connect to sink server");
+
+        let messages = vec![
+            Message {
+                keys: vec![],
+                value: b"Hello, World!".to_vec(),
+                offset: Offset {
+                    offset: "1".to_string(),
+                    partition_id: 0,
+                },
+                event_time: Utc::now(),
+                headers: Default::default(),
+            },
+            Message {
+                keys: vec![],
+                value: b"Hello, World!".to_vec(),
+                offset: Offset {
+                    offset: "2".to_string(),
+                    partition_id: 0,
+                },
+                event_time: Utc::now(),
+                headers: Default::default(),
+            },
+        ];
+
+        let ready_response = sink_client.is_ready().await.unwrap();
+        assert_eq!(ready_response.ready, true);
+
+        let response = sink_client.sink_fn(messages).await.unwrap();
+        assert_eq!(response.results.len(), 2);
+
+        shutdown_tx
+            .send(())
+            .expect("failed to send shutdown signal");
+        server_handle.await.expect("failed to join server task");
+    }
+}
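`sink_fn` above is a client-streaming RPC: tonic wants a `Stream` of requests, so the messages are pushed through a bounded mpsc channel from a spawned task and the receiving half is handed to the client as a `ReceiverStream`. The generic shape of that pattern, as a standalone sketch:

```rust
use tokio_stream::wrappers::ReceiverStream;

// Generic shape of the client-streaming call used by `sink_fn`: feed requests
// through a bounded channel and hand the receiving end to the gRPC client.
fn stream_requests<T: Send + 'static>(requests: Vec<T>) -> ReceiverStream<T> {
    let (tx, rx) = tokio::sync::mpsc::channel(1);
    tokio::spawn(async move {
        for request in requests {
            // A send error means the receiver was dropped; stop feeding.
            if tx.send(request).await.is_err() {
                break;
            }
        }
    });
    ReceiverStream::new(rx)
}
```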
diff --git a/serving/source-sink/src/source.rs b/serving/source-sink/src/source.rs
new file mode 100644
index 0000000000..3c164bb5e2
--- /dev/null
+++ b/serving/source-sink/src/source.rs
@@ -0,0 +1,249 @@
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use tokio_stream::StreamExt;
+use tonic::transport::Channel;
+use tonic::Request;
+
+use crate::error::{Error, Result};
+use crate::message::{Message, Offset};
+use crate::shared::connect_with_uds;
+
+pub mod proto {
+    tonic::include_proto!("source.v1");
+}
+
+const SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock";
+const SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info";
+
+/// SourceConfig is the configuration for the source server.
+#[derive(Debug, Clone)]
+pub struct SourceConfig {
+    pub socket_path: String,
+    pub server_info_file: String,
+    pub max_message_size: usize,
+}
+
+impl Default for SourceConfig {
+    fn default() -> Self {
+        SourceConfig {
+            socket_path: SOURCE_SOCKET.to_string(),
+            server_info_file: SOURCE_SERVER_INFO_FILE.to_string(),
+            max_message_size: 64 * 1024 * 1024, // 64 MB
+        }
+    }
+}
+
+/// SourceClient is a client to interact with the source server.
+#[derive(Debug, Clone)]
+pub(crate) struct SourceClient {
+    client: proto::source_client::SourceClient<Channel>,
+}
+
+impl SourceClient {
+    pub(crate) async fn connect(config: SourceConfig) -> Result<Self> {
+        let channel = connect_with_uds(config.socket_path.into()).await?;
+        let client = proto::source_client::SourceClient::new(channel)
+            .max_encoding_message_size(config.max_message_size)
+            .max_decoding_message_size(config.max_message_size);
+
+        Ok(Self { client })
+    }
+
+    pub(crate) async fn read_fn(
+        &mut self,
+        num_records: u64,
+        timeout_in_ms: u32,
+    ) -> Result<Vec<Message>> {
+        let request = Request::new(proto::ReadRequest {
+            request: Some(proto::read_request::Request {
+                num_records,
+                timeout_in_ms,
+            }),
+        });
+
+        let mut stream = self.client.read_fn(request).await?.into_inner();
+        let mut messages = Vec::with_capacity(num_records as usize);
+
+        while let Some(response) = stream.next().await {
+            let result = response?
+                .result
+                .ok_or_else(|| Error::SourceError("Empty message".to_string()))?;
+
+            messages.push(result.try_into()?);
+        }
+
+        Ok(messages)
+    }
+
+    pub(crate) async fn ack_fn(&mut self, offsets: Vec<Offset>) -> Result<proto::AckResponse> {
+        let offsets = offsets
+            .into_iter()
+            .map(|offset| proto::Offset {
+                offset: BASE64_STANDARD
+                    .decode(offset.offset)
+                    .expect("we control the encoding, so this should never fail"),
+                partition_id: offset.partition_id,
+            })
+            .collect();
+
+        let request = Request::new(proto::AckRequest {
+            request: Some(proto::ack_request::Request { offsets }),
+        });
+
+        Ok(self.client.ack_fn(request).await?.into_inner())
+    }
+
+    #[allow(dead_code)]
+    // TODO: remove dead_code
+    pub(crate) async fn pending_fn(&mut self) -> Result<i64> {
+        let request = Request::new(());
+        let response = self
+            .client
+            .pending_fn(request)
+            .await?
+            .into_inner()
+            .result
+            .map_or(0, |r| r.count);
+        Ok(response)
+    }
+
+    #[allow(dead_code)]
+    // TODO: remove dead_code
+    pub(crate) async fn partitions_fn(&mut self) -> Result<Vec<i32>> {
+        let request = Request::new(());
+        let response = self.client.partitions_fn(request).await?.into_inner();
+        Ok(response.result.map_or(vec![], |r| r.partitions))
+    }
+
+    pub(crate) async fn is_ready(&mut self) -> Result<proto::ReadyResponse> {
+        let request = Request::new(());
+        let response = self.client.is_ready(request).await?.into_inner();
+        Ok(response)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+    use std::error::Error;
+
+    use chrono::Utc;
+    use numaflow::source;
+    use numaflow::source::{Message, Offset, SourceReadRequest};
+    use tokio::sync::mpsc::Sender;
+
+    use crate::source::{SourceClient, SourceConfig};
+
+    struct SimpleSource {
+        num: usize,
+        yet_to_ack: std::sync::RwLock<HashSet<String>>,
+    }
+
+    impl SimpleSource {
+        fn new(num: usize) -> Self {
+            Self {
+                num,
+                yet_to_ack: std::sync::RwLock::new(HashSet::new()),
+            }
+        }
+    }
+
+    #[tonic::async_trait]
+    impl source::Sourcer for SimpleSource {
+        async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>) {
+            let event_time = Utc::now();
+            let mut message_offsets = Vec::with_capacity(request.count);
+            for i in 0..request.count {
+                let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
+                transmitter
+                    .send(Message {
+                        value: self.num.to_le_bytes().to_vec(),
+                        event_time,
+                        offset: Offset {
+                            offset: offset.clone().into_bytes(),
+                            partition_id: 0,
+                        },
+                        keys: vec![],
+                        headers: Default::default(),
+                    })
+                    .await
+                    .unwrap();
+                message_offsets.push(offset)
+            }
+            self.yet_to_ack.write().unwrap().extend(message_offsets)
+        }
+
+        async fn ack(&self, offsets: Vec<Offset>) {
+            for offset in offsets {
+                self.yet_to_ack
+                    .write()
+                    .unwrap()
+                    .remove(&String::from_utf8(offset.offset).unwrap());
+            }
+        }
+
+        async fn pending(&self) -> usize {
+            self.yet_to_ack.read().unwrap().len()
+        }
+
+        async fn partitions(&self) -> Option<Vec<i32>> {
+            Some(vec![2])
+        }
+    }
+
+    #[tokio::test]
+    async fn source_operations() -> Result<(), Box<dyn Error>> {
+        // start the server
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let sock_file = tmp_dir.path().join("source.sock");
+        let server_info_file = tmp_dir.path().join("source-server-info");
+
+        let server_info = server_info_file.clone();
+        let server_socket = sock_file.clone();
+        let server_handle = tokio::spawn(async move {
+            source::Server::new(SimpleSource::new(10))
+                .with_socket_file(server_socket)
+                .with_server_info_file(server_info)
+                .start_with_shutdown(shutdown_rx)
+                .await
+                .unwrap();
+        });
+
+        // wait for the server to start
+        // TODO: flaky
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+        let mut source_client = SourceClient::connect(SourceConfig {
+            socket_path: sock_file.to_str().unwrap().to_string(),
+            server_info_file: server_info_file.to_str().unwrap().to_string(),
+            max_message_size: 4 * 1024 * 1024,
+        })
+        .await
+        .expect("failed to connect to source server");
+
+        let response = source_client.is_ready().await.unwrap();
+        assert!(response.ready);
+
+        let messages = source_client.read_fn(5, 1000).await.unwrap();
+        assert_eq!(messages.len(), 5);
+
+        let response = source_client
+            .ack_fn(messages.iter().map(|m| m.offset.clone()).collect())
+            .await
+            .unwrap();
+        assert!(response.result.unwrap().success.is_some());
+
+        let pending = source_client.pending_fn().await.unwrap();
+        assert_eq!(pending, 0);
+
+        let partitions = source_client.partitions_fn().await.unwrap();
+        assert_eq!(partitions, vec![2]);
+
+        shutdown_tx
+            .send(())
+            .expect("failed to send shutdown signal");
+        server_handle.await.expect("failed to join server task");
+        Ok(())
+    }
+}
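Offsets cross the gRPC boundary as raw bytes (`proto::Offset.offset`), but travel inside the forwarder as the `String`-typed `Offset` from message.rs, so `read_fn` base64-encodes them and `ack_fn` decodes them back; the `expect` is safe because both sides use the same engine. The round trip in isolation:

```rust
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::Engine;

// Round-trip sketch of the offset handling: raw bytes from a ReadResponse are
// base64-encoded into the String-typed Offset, then decoded again in ack_fn.
fn main() {
    let raw_offset: Vec<u8> = b"1724198400-0".to_vec(); // hypothetical offset bytes
    let encoded = BASE64_STANDARD.encode(&raw_offset);
    let decoded = BASE64_STANDARD
        .decode(&encoded)
        .expect("we control the encoding, so this should never fail");
    assert_eq!(decoded, raw_offset);
}
```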
diff --git a/serving/source-sink/src/transformer.rs b/serving/source-sink/src/transformer.rs
new file mode 100644
index 0000000000..2bbca45bce
--- /dev/null
+++ b/serving/source-sink/src/transformer.rs
@@ -0,0 +1,159 @@
+use tonic::transport::Channel;
+use tonic::Request;
+
+use crate::error::Result;
+use crate::message::Message;
+use crate::shared::{connect_with_uds, utc_from_timestamp};
+use crate::transformer::proto::SourceTransformRequest;
+
+pub mod proto {
+    tonic::include_proto!("sourcetransformer.v1");
+}
+
+const TRANSFORMER_SOCKET: &str = "/var/run/numaflow/sourcetransform.sock";
+const TRANSFORMER_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info";
+
+/// TransformerConfig is the configuration for the transformer server.
+#[derive(Debug, Clone)]
+pub struct TransformerConfig {
+    pub socket_path: String,
+    pub server_info_file: String,
+    pub max_message_size: usize,
+}
+
+impl Default for TransformerConfig {
+    fn default() -> Self {
+        TransformerConfig {
+            socket_path: TRANSFORMER_SOCKET.to_string(),
+            server_info_file: TRANSFORMER_SERVER_INFO_FILE.to_string(),
+            max_message_size: 64 * 1024 * 1024, // 64 MB
+        }
+    }
+}
+
+/// TransformerClient is a client to interact with the transformer server.
+#[derive(Clone)]
+pub struct TransformerClient {
+    client: proto::source_transform_client::SourceTransformClient<Channel>,
+}
+
+impl TransformerClient {
+    pub(crate) async fn connect(config: TransformerConfig) -> Result<Self> {
+        let channel = connect_with_uds(config.socket_path.into()).await?;
+        let client = proto::source_transform_client::SourceTransformClient::new(channel)
+            .max_decoding_message_size(config.max_message_size)
+            .max_encoding_message_size(config.max_message_size);
+        Ok(Self { client })
+    }
+
+    pub(crate) async fn transform_fn(&mut self, message: Message) -> Result<Vec<Message>> {
+        // fields which will not be changed
+        let offset = message.offset.clone();
+        let headers = message.headers.clone();
+
+        // TODO: is this complex? the reason to do this is, tomorrow when we have the normal
+        // Pipeline CRD, we can require the Into trait.
+        let response = self
+            .client
+            .source_transform_fn(<Message as Into<SourceTransformRequest>>::into(message))
+            .await?
+            .into_inner();
+
+        let mut messages = Vec::new();
+        for result in response.results {
+            let message = Message {
+                keys: result.keys,
+                value: result.value,
+                offset: offset.clone(),
+                event_time: utc_from_timestamp(result.event_time),
+                headers: headers.clone(),
+            };
+            messages.push(message);
+        }
+
+        Ok(messages)
+    }
+
+    pub(crate) async fn is_ready(&mut self) -> Result<proto::ReadyResponse> {
+        let request = Request::new(());
+        let response = self.client.is_ready(request).await?.into_inner();
+        Ok(response)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::error::Error;
+
+    use numaflow::sourcetransform;
+    use tempfile::TempDir;
+
+    use crate::transformer::{TransformerClient, TransformerConfig};
+
+    struct NowCat;
+
+    #[tonic::async_trait]
+    impl sourcetransform::SourceTransformer for NowCat {
+        async fn transform(
+            &self,
+            input: sourcetransform::SourceTransformRequest,
+        ) -> Vec<sourcetransform::Message> {
+            let message = sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
+                .keys(input.keys)
+                .tags(vec![]);
+            vec![message]
+        }
+    }
+
+    #[tokio::test]
+    async fn transformer_operations() -> Result<(), Box<dyn Error>> {
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
+        let tmp_dir = TempDir::new()?;
+        let sock_file = tmp_dir.path().join("sourcetransform.sock");
+        let server_info_file = tmp_dir.path().join("sourcetransformer-server-info");
+
+        let server_info = server_info_file.clone();
+        let server_socket = sock_file.clone();
+        let handle = tokio::spawn(async move {
+            sourcetransform::Server::new(NowCat)
+                .with_socket_file(server_socket)
+                .with_server_info_file(server_info)
+                .start_with_shutdown(shutdown_rx)
+                .await
+                .expect("server failed");
+        });
+
+        // wait for the server to start
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+        let mut client = TransformerClient::connect(TransformerConfig {
+            socket_path: sock_file.to_str().unwrap().to_string(),
+            server_info_file: server_info_file.to_str().unwrap().to_string(),
+            max_message_size: 4 * 1024 * 1024,
+        })
+        .await?;
+
+        let message = crate::message::Message {
+            keys: vec!["first".into(), "second".into()],
+            value: "hello".into(),
+            offset: crate::message::Offset {
+                partition_id: 0,
+                offset: "0".into(),
+            },
+            event_time: chrono::Utc::now(),
+            headers: Default::default(),
+        };
+
+        let resp = client.is_ready().await?;
+        assert_eq!(resp.ready, true);
+
+        let resp = client.transform_fn(message).await?;
+        assert_eq!(resp.len(), 1);
+
+        shutdown_tx
+            .send(())
+            .expect("failed to send shutdown signal");
+        handle.await.expect("failed to join server task");
+        Ok(())
+    }
+}
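`transform_fn` maps one input message to zero or more outputs, and every output inherits the parent's offset and headers, so a single source ack covers the whole fan-out. A hypothetical helper showing that usage, assuming the crate-private types above (the sink write is elided):

```rust
// Sketch of consuming a transformer fan-out: every returned message shares
// the parent's offset, so the source is acked once per *input* message.
async fn transform_and_ack(
    transformer: &mut TransformerClient,
    source: &mut SourceClient,
    message: Message,
) -> crate::error::Result<()> {
    let parent_offset = message.offset.clone();
    let transformed = transformer.transform_fn(message).await?; // zero or more outputs
    println!("transformer produced {} message(s)", transformed.len());
    // ... write `transformed` to the sink here ...
    source.ack_fn(vec![parent_offset]).await?;
    Ok(())
}
```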