Skip to content

Commit

Permalink
feat: serving and tracking endpoint for Numaflow (#1765)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigith authored Jun 21, 2024
1 parent 8da7c22 commit 1fc41e9
Show file tree
Hide file tree
Showing 30 changed files with 3,673 additions and 0 deletions.
4 changes: 4 additions & 0 deletions serving/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Dockerfile
coverage.out
target/
debug/
2 changes: 2 additions & 0 deletions serving/.rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
edition = "2021"
indent_style = "Block"
31 changes: 31 additions & 0 deletions serving/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
workspace = { members = ["extras/upstreams", "servesink"] }
[package]
name = "serve"
version = "0.1.0"
edition = "2021"

[features]
redis-tests = []
nats-tests = []
all-tests = ["redis-tests", "nats-tests"]

[dependencies]
async-nats = "0.35.1"
axum = "0.7.5"
axum-macros = "0.4.1"
hyper-util = { version = "0.1.3", features = ["client-legacy"] }
metrics = { version = "0.23.0", default-features = false }
metrics-exporter-prometheus = { version = "0.15.0", default-features = false }
serde = { version = "1.0.201", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.36.0", features = ["full"] }
tower = "0.4.13"
tower-http = { version = "0.5.2", features = ["trace", "timeout"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.8.0", features = ["v4"] }
tempfile = "3.10.1"
redis = { version = "0.25.3", features = ["tokio-comp", "aio", "connection-manager"] }
config = "0.14.0"
trait-variant = "0.1.2"
chrono = { version = "0.4", features = ["serde"] }
39 changes: 39 additions & 0 deletions serving/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
FROM rust:1.79-bookworm as build

# For faster/easier installation of Rust binaries
RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash

RUN apt-get update \
&& apt-get install -y protobuf-compiler

# Compile and cache our dependencies in a layer
RUN cargo new /serve
WORKDIR /serve
COPY Cargo.toml .
RUN cargo new extras/upstreams
COPY extras/upstreams/Cargo.toml extras/upstreams/Cargo.toml
RUN cargo new servesink
COPY servesink/Cargo.toml servesink/Cargo.toml

RUN --mount=type=cache,target=/usr/local/cargo/registry cargo build --release

COPY ./ /serve
# update timestamps to force a new build
RUN touch src/main.rs servesink/src/main.rs extras/upstreams/src/main.rs

RUN --mount=type=cache,target=/usr/local/cargo/registry cargo build --release

### Final

FROM debian:bookworm

USER root

RUN apt-get update \
&& apt-get install -y openssl

COPY --from=build /serve/target/release/serve .
COPY ./config config


ENTRYPOINT ["./serve"]
17 changes: 17 additions & 0 deletions serving/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Numaserve

Numaserve is the serving/HTTP endpoint for [Numaflow](https://numaflow.numaproj.io/) which is a distributed scalable general-purpose
async processing platform.

## Problem

[Numaflow](https://numaflow.numaproj.io/) being a DCG (Directed Compute Graph), there is no direct way to interact via request/response
protocol (in other words, the Source of Numaflow is decoupled with the Sink). There are a couple of use-cases where request/response semantics
are required. E.g.,

* Response with the status of completion (ability to say whether the processing is complete)
* Response with output of a complex computation over the DCG directly to the callee (say, something like [kserve](https://kserve.github.io/website/latest/)).

## Demo

Please try out [our example](./example/README.md)
15 changes: 15 additions & 0 deletions serving/config/default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
tid_header = "ID"
app_listen_port = 3000
metrics_server_listen_port = 3001
upstream_addr = "localhost:8888"
drain_timeout_secs = 10
host_ip = "localhost"
pipeline_spec_path = "./config/pipeline_spec.json"

[jetstream]
stream = "default"
addr = "localhost:4222"

[redis]
addr = "redis://127.0.0.1/"
max_tasks = 50
4 changes: 4 additions & 0 deletions serving/config/jetstream.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
jetstream: {
max_mem_store: 1MiB,
max_file_store: 1GiB
}
17 changes: 17 additions & 0 deletions serving/config/pipeline_spec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"vertices": [
"in",
"cat",
"out"
],
"edges": [
{
"from": "in",
"to": "cat"
},
{
"from": "cat",
"to": "out"
}
]
}
20 changes: 20 additions & 0 deletions serving/extras/upstreams/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "upstreams"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = "0.7.5"
axum-macros = "0.4.1"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.36.0", features = ["full"] }
tower-http = { version = "0.5.2", features = ["trace"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dev-dependencies]
http-body-util = "0.1.1"
tower = "0.4.13"
94 changes: 94 additions & 0 deletions serving/extras/upstreams/src/bin/simple_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::time::Duration;

use axum::{response::IntoResponse, routing::get, Router};
use axum_macros::debug_handler;
use tokio::{net::TcpListener, time::sleep};
use tower_http::trace::TraceLayer;
use tracing::{debug, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "simple_proxy=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

let router = app();
let router = router.layer(TraceLayer::new_for_http());

let listener = TcpListener::bind("localhost:8888").await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, router).await.unwrap();
}

fn app() -> Router {
Router::new()
.route("/fast", get(|| async {}))
.route(
"/slow",
get(|| async {
debug!("sleeping");
sleep(Duration::from_secs(1)).await
}),
)
.route("/", get(root_handler))
}

#[debug_handler]
async fn root_handler() -> impl IntoResponse {
"ok"
}

#[cfg(test)]
mod tests {

// inspired from: https://github.com/tokio-rs/axum/blob/main/examples/testing/src/main.rs

use axum::{
body::Body,
http::{Request, StatusCode},
};
use http_body_util::BodyExt;
use tower::ServiceExt;

use super::*;

#[tokio::test]
async fn fast() {
let app = app();

let request = Request::builder().uri("/fast").body(Body::empty()).unwrap();

let response = app.oneshot(request).await.unwrap();

assert_eq!(response.status(), StatusCode::OK);
}

#[tokio::test]
async fn slow() {
let app = app();

let request = Request::builder().uri("/slow").body(Body::empty()).unwrap();

let response = app.oneshot(request).await.unwrap();

assert_eq!(response.status(), StatusCode::OK);
}

#[tokio::test]
async fn root() {
let app = app();

let request = Request::builder().uri("/").body(Body::empty()).unwrap();

let response = app.oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);

let body = response.into_body().collect().await.unwrap().to_bytes();
assert_eq!(&body[..], b"ok");
}
}
1 change: 1 addition & 0 deletions serving/servesink/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
16 changes: 16 additions & 0 deletions serving/servesink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "servesink"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "servesink"
path = "src/main.rs"

[dependencies]
tonic = "0.11"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
reqwest = "0.12.4"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
27 changes: 27 additions & 0 deletions serving/servesink/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
FROM rust:1.79 as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y

# create a new empty shell project
WORKDIR /servesink

# copy your source tree
COPY src ./src

COPY ./Cargo.toml ./Cargo.toml

# build for release
RUN cargo build --release

# our final base
FROM debian:bookworm

RUN apt-get update \
&& apt-get install -y openssl

# copy the build artifact from the build stage
COPY --from=build /servesink/target/release/servesink .

# set the startup command to run your binary
CMD ["./servesink"]
73 changes: 73 additions & 0 deletions serving/servesink/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::error::Error;

use numaflow::sink::{self, Response, SinkRequest};
use reqwest::Client;
use tracing::{error, warn};
use tracing_subscriber::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "servesink=debug".into()),
)
.with(tracing_subscriber::fmt::layer().with_ansi(false))
.init();

sink::Server::new(Logger::new()).start().await
}

struct Logger {
client: Client,
}

impl Logger {
fn new() -> Self {
Self {
client: Client::new(),
}
}
}

#[tonic::async_trait]
impl sink::Sinker for Logger {
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();

while let Some(datum) = input.recv().await {
// do something better, but for now let's just log it.
// please note that `from_utf8` is working because the input in this
// example uses utf-8 data.
let response = match std::str::from_utf8(&datum.value) {
Ok(_v) => {
// record the response
Response::ok(datum.id)
}
Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
};
// return the responses
responses.push(response);
let Some(url) = datum.headers.get("X-Numaflow-Callback-Url") else {
warn!("X-Numaflow-Callback-Url header is not found in the payload");
continue;
};
let Some(numaflow_id) = datum.headers.get("X-Numaflow-Id") else {
warn!("X-Numaflow-Id header is not found in the payload");
continue;
};
let resp = self
.client
.post(format!("{}_{}", url, "save"))
.header("X-Numaflow-Id", numaflow_id)
.header("id", numaflow_id)
.body(datum.value)
.send()
.await;
if let Err(e) = resp {
error!(error=?e, url=url, "Sending result to numaserve")
}
}
responses
}
}
Loading

0 comments on commit 1fc41e9

Please sign in to comment.