Skip to content

Commit

Permalink
feat: source to sink with an optional transformer without ISB (numapr…
Browse files Browse the repository at this point in the history
…oj#1904)

Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
Co-authored-by: Sidhant Kohli <[email protected]>
  • Loading branch information
3 people authored and Saniya Kalamkar committed Aug 14, 2024
1 parent 0df21eb commit 7971f22
Show file tree
Hide file tree
Showing 27 changed files with 2,646 additions and 20 deletions.
20 changes: 17 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ FROM rust:1.79-bookworm as extension-base

RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash

RUN apt-get update
RUN apt-get install protobuf-compiler -y

RUN cargo new serve
# Create a new empty shell project
WORKDIR /serve
Expand All @@ -32,6 +35,9 @@ COPY ./serving/backoff/Cargo.toml ./backoff/
RUN cargo new numaflow-models
COPY ./serving/numaflow-models/Cargo.toml ./numaflow-models/

RUN cargo new source-sink
COPY ./serving/source-sink/Cargo.toml ./source-sink/

# Copy all Cargo.toml and Cargo.lock files for caching dependencies
COPY ./serving/Cargo.toml ./serving/Cargo.lock ./

Expand All @@ -44,21 +50,29 @@ COPY ./serving/servesink/src ./servesink/src
COPY ./serving/extras/upstreams/src ./extras/upstreams/src
COPY ./serving/backoff/src ./backoff/src
COPY ./serving/numaflow-models/src ./numaflow-models/src
COPY ./serving/source-sink/src ./source-sink/src
COPY ./serving/source-sink/build.rs ./source-sink/build.rs
COPY ./serving/source-sink/proto ./source-sink/proto

# Build the real binaries
RUN touch src/main.rs servesink/main.rs extras/upstreams/main.rs numaflow-models/main.rs && \
cargo build --release
RUN touch src/main.rs servesink/src/main.rs numaflow-models/src/main.rs source-sink/src/main.rs && \
cargo build --workspace --all --release

####################################################################################################
# numaflow
####################################################################################################
ARG BASE_IMAGE
FROM ${BASE_IMAGE} as numaflow
FROM debian:bookworm as numaflow

# Install necessary libraries
RUN apt-get update && apt-get install -y libssl3

COPY --from=base /bin/numaflow /bin/numaflow
COPY ui/build /ui/build

COPY --from=extension-base /serve/target/release/serve /bin/serve
COPY --from=extension-base /serve/target/release/sourcer-sinker /bin/sourcer-sinker

COPY ./serving/config config

ENTRYPOINT [ "/bin/numaflow" ]
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ DIST_DIR=${CURRENT_DIR}/dist
BINARY_NAME:=numaflow
DOCKERFILE:=Dockerfile
DEV_BASE_IMAGE:=debian:bookworm
RELEASE_BASE_IMAGE:=gcr.io/distroless/cc-debian12
RELEASE_BASE_IMAGE:=debian:bookworm

BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ')
GIT_COMMIT=$(shell git rev-parse HEAD)
Expand Down
2 changes: 1 addition & 1 deletion examples/21-simple-mono-vertex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ spec:
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:stable
image: quay.io/numaio/numaflow-java/simple-sink:stable
3 changes: 2 additions & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec {

func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container {
mainContainer := containerBuilder{}.
init(req).command("/bin/serve").build() // TODO: command
init(req).command(MonoVertexBinary).build()

containers := []corev1.Container{mainContainer}
if mvspec.Source.UDSource != nil { // Only support UDSource for now.
containers = append(containers, mvspec.Source.getUDSourceContainer(req))
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
)

const ServingBinary = "/bin/serve"
const MonoVertexBinary = "/bin/sourcer-sinker"

// +genclient
// +kubebuilder:object:root=true
Expand Down
1 change: 0 additions & 1 deletion pkg/metrics/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func NewMetricsServer(vertex *dfv1.Vertex, opts ...Option) *metricsServer {
if m.lagReaders != nil {
for partitionName := range m.lagReaders {
m.partitionPendingInfo[partitionName] = sharedqueue.New[timestampedPending](1800)

}
}
return m
Expand Down
5 changes: 0 additions & 5 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,6 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
opts = append(opts, forward.WithUDFUnaryMap(mapHandler))
}

// We can have the vertex running only of the map modes
if enableMapUdfStream && enableBatchMapUdf {
return fmt.Errorf("vertex cannot have both map stream and batch map modes enabled")
}

for index, bufferPartition := range fromBuffer {
// Populate shuffle function map
shuffleFuncMap := make(map[string]*shuffle.Shuffle)
Expand Down
Loading

0 comments on commit 7971f22

Please sign in to comment.