Skip to content

Commit

Permalink
Merge branch 'main' into pipeline-watermark
Browse files Browse the repository at this point in the history
Signed-off-by: veds-g <[email protected]>
  • Loading branch information
veds-g committed Jan 23, 2023
2 parents 45bdcae + afa5c76 commit a7ea0a5
Show file tree
Hide file tree
Showing 82 changed files with 1,193 additions and 677 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
# Changelog

## v0.7.0 (2023-01-13)

* [734e5d3](https://github.com/numaproj/numaflow/commit/734e5d3b44dee2ef690c9a1fe4d9d1ecb092a16c) Update manifests to v0.7.0
* [5d6c533](https://github.com/numaproj/numaflow/commit/5d6c53369a6ea8da5f9f5036fce9aa81f6308fbf) fix: JetStream context KV store/watch fix (#460)
* [d6152e7](https://github.com/numaproj/numaflow/commit/d6152e772a03e646f2841a34497d498e4c2234c3) doc: reduce persistent store (#458)
* [ac77656](https://github.com/numaproj/numaflow/commit/ac77656dec8164fc162b8339bfff8138d17f73b0) doc: reduce documentation (#448)
* [257356a](https://github.com/numaproj/numaflow/commit/257356af0c932a9c7e84573c9f163e37a3e06dc4) chore(deps): bump json5 from 1.0.1 to 1.0.2 in /ui (#454)
* [7752db4](https://github.com/numaproj/numaflow/commit/7752db4b7d4233e2a691c0d1cc9ef2348dc75ab5) refactor: simplify http request construction in test cases (#444)
* [1a10af4](https://github.com/numaproj/numaflow/commit/1a10af4c20f051e46c063f9d946a39c208b6ec60) refactor: use exact matching instead of regex to perform e2e data validation. (#443)
* [2777e27](https://github.com/numaproj/numaflow/commit/2777e27ac0cdfcc954bf5e453b92b7f4e8a5c201) doc: windowing fixed and sliding (#439)
* [70fc008](https://github.com/numaproj/numaflow/commit/70fc008ffb0016a7310612d7cac191920207d0a6) refactor: move redis sink resources creation to E2ESuite (#437)
* [6c078b4](https://github.com/numaproj/numaflow/commit/6c078b42046b4733f702b3fbb585578d6304dafb) refactor: a prototype for enhancing E2E test framework (#424)
* [e7021c9](https://github.com/numaproj/numaflow/commit/e7021c9ae668724c11ac81fb49527ae8ce0f9240) feat: pipeline watermark (#416)

### Contributors

* Derek Wang
* Juanlu Yu
* Keran Yang
* Vedant Gupta
* Vigith Maurice
* dependabot[bot]

## v0.7.0-rc1 (2022-12-16)

* [71887db](https://github.com/numaproj/numaflow/commit/71887db5cce231b9b0a236f8f00ddeb0d40ac01a) Update manifests to v0.7.0-rc1
Expand Down
14 changes: 8 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
ARG BASE_IMAGE=scratch
ARG ARCH=$TARGETARCH
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
FROM alpine:3.17 as base
ARG ARCH
RUN apk update && apk upgrade && \
apk add ca-certificates && \
Expand All @@ -15,8 +16,8 @@ RUN chmod +x /bin/numaflow
####################################################################################################
# numaflow
####################################################################################################
FROM scratch as numaflow
ARG ARCH
ARG BASE_IMAGE
FROM ${BASE_IMAGE} as numaflow
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/numaflow /bin/numaflow
Expand All @@ -26,16 +27,17 @@ ENTRYPOINT [ "/bin/numaflow" ]
####################################################################################################
# testbase
####################################################################################################
FROM alpine:3.12.3 as testbase
ARG ARCH
FROM alpine:3.17 as testbase
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/e2eapi /bin/e2eapi
RUN chmod +x /bin/e2eapi

####################################################################################################
# testapi
####################################################################################################
FROM scratch AS e2eapi
ARG ARCH
COPY --from=testbase /bin/e2eapi .
ENTRYPOINT ["/e2eapi"]
12 changes: 4 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ CURRENT_DIR=$(shell pwd)
DIST_DIR=${CURRENT_DIR}/dist
BINARY_NAME:=numaflow
DOCKERFILE:=Dockerfile
DEV_BASE_IMAGE:=alpine:3.17
RELEASE_BASE_IMAGE:=scratch

BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ')
GIT_COMMIT=$(shell git rev-parse HEAD)
Expand Down Expand Up @@ -145,20 +147,14 @@ ui-test: ui-build

.PHONY: image
image: clean ui-build dist/$(BINARY_NAME)-linux-amd64
DOCKER_BUILDKIT=1 $(DOCKER) build --build-arg "ARCH=amd64" -t $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --target $(BINARY_NAME) -f $(DOCKERFILE) .
DOCKER_BUILDKIT=1 $(DOCKER) build --build-arg "ARCH=amd64" --build-arg "BASE_IMAGE=$(DEV_BASE_IMAGE)" -t $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --target $(BINARY_NAME) -f $(DOCKERFILE) .
@if [ "$(DOCKER_PUSH)" = "true" ]; then $(DOCKER) push $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION); fi
ifdef IMAGE_IMPORT_CMD
$(IMAGE_IMPORT_CMD) $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION)
endif

image-linux-%: dist/$(BINARY_NAME)-linux-$*
DOCKER_BUILDKIT=1 $(DOCKER) build --build-arg "ARCH=$*" -t $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION)-linux-$* --platform "linux/$*" --target $(BINARY_NAME) -f $(DOCKERFILE) .
@if [ "$(DOCKER_PUSH)" = "true" ]; then $(DOCKER) push $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION)-linux-$*; fi


image-multi: ui-build set-qemu dist/$(BINARY_NAME)-linux-arm64.gz dist/$(BINARY_NAME)-linux-amd64.gz
$(DOCKER) buildx build --tag $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --target $(BINARY_NAME) --platform linux/amd64,linux/arm64 --file ./Dockerfile ${PUSH_OPTION} .

$(DOCKER) buildx build --sbom=false --provenance=false --build-arg "BASE_IMAGE=$(RELEASE_BASE_IMAGE)" --tag $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --target $(BINARY_NAME) --platform linux/amd64,linux/arm64 --file ./Dockerfile ${PUSH_OPTION} .

set-qemu:
$(DOCKER) pull tonistiigi/binfmt:latest
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ full-featured stream processing platforms.

## Roadmap

- Data aggregation (e.g. group-by)
- User Defined Transformer at Source for better deserialization and filter for cost reduction (v0.8)
- Multi partitioned edges for higher throughput (v0.9)

## Resources

Expand Down
3 changes: 2 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ stream processing platforms.

## Roadmap

- Data aggregation (e.g. group-by)
- User Defined Transformer at Source for better deserialization and filter for cost reduction (v0.8)
- Multi partitioned edges for higher throughput (v0.9)

## Getting Started

Expand Down
9 changes: 8 additions & 1 deletion docs/development/debugging.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ spec:
## Profiling
Setting `NUMAFLOW_DEBUG` to `true` also enables `pprof` in the Vertex Pod.
If your pipeline is running with `NUMAFLOW_DEBUG` then `pprof` is enabled in the Vertex Pod. You
can also enable just `pprof` by setting `NUMAFLOW_PPROF` to `true`.

For example, run the commands like below to profile memory usage for a Vertex Pod, a web page displaying the memory information will be automatically opened.

Expand All @@ -44,3 +45,9 @@ kubectl port-forward simple-pipeline-p1-0-7jzbn 2469
go tool pprof -http localhost:8081 https+insecure://localhost:2469/debug/pprof/heap
```

## Debug Inside the Container

When doing local [development](development.md) using command lines such as `make start`, or `make image`, the built `numaflow` docker image is based on `alpine`, which allows you to execute into the container for debugging with `kubectl exec -it {pod-name} -c {container-name} -- sh`.

This is not allowed when running pipelines with official released images, as they are based on `scratch`.
8 changes: 7 additions & 1 deletion docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ The pipeline can be deleted by
kubectl delete -f https://raw.githubusercontent.com/numaproj/numaflow/stable/test/e2e/testdata/even-odd.yaml
```

## A pipeline with reduce (aggregation)

[Reduce Examples](user-guide/user-defined-functions/reduce/examples.md)

## What's Next

Try more examples in the [`examples`](https://github.com/numaproj/numaflow/tree/main/examples) directory.

After exploring how Numaflow pipeline run, you can check what data [Sources](./user-guide/sources/generator.md) and [Sinks](./user-guide/sinks/kafka.md) Numaflow supports out of the box, or learn how to write [User Defined Functions](user-guide/user-defined-functions/map/map.md).
After exploring how Numaflow pipeline run, you can check what data [Sources](./user-guide/sources/generator.md)
and [Sinks](./user-guide/sinks/kafka.md) Numaflow supports out of the box, or learn how to write
[User Defined Functions](user-guide/user-defined-functions/user-defined-functions.md).
144 changes: 144 additions & 0 deletions docs/user-guide/user-defined-functions/reduce/examples.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# Reduce Examples

Please read [reduce](./reduce.md) to get the best out of these examples.

## Prerequisites

Install the ISB

```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/stable/examples/0-isbsvc-jetstream.yaml
```

## sum pipeline using fixed window
This is a simple reduce pipeline that just does summation (sum of numbers) but uses fixed window.
The snippet for the reduce vertex is as follows.

![plot](../../../assets/simple-reduce.png)

```yaml
- name: compute-sum
udf:
container:
# compute the sum
image: quay.io/numaio/numaflow-go/reduce-sum
groupBy:
window:
fixed:
length: 60s
keyed: true
```
[6-reduce-fixed-window.yaml](https://github.com/numaproj/numaflow/blob/main/examples/6-reduce-fixed-window.yaml)
has the complete pipeline definition.
In this example we use a `parallelism` of `2`. We are setting a parallelism > 1 because it is a
keyed window.

```yaml
- from: atoi
to: compute-sum
parallelism: 2
```

```shell
kubectl apply -f https://github.com/numaproj/numaflow/blob/main/examples/examples/6-reduce-fixed-window.yaml
```

Output :
```text
2023/01/05 11:54:41 (sink) Payload - 300 Key - odd Start - 60000 End - 120000
2023/01/05 11:54:41 (sink) Payload - 600 Key - even Start - 60000 End - 120000
2023/01/05 11:54:41 (sink) Payload - 300 Key - odd Start - 120000 End - 180000
2023/01/05 11:54:41 (sink) Payload - 600 Key - even Start - 120000 End - 180000
2023/01/05 11:54:42 (sink) Payload - 600 Key - even Start - 180000 End - 240000
2023/01/05 11:54:42 (sink) Payload - 300 Key - odd Start - 180000 End - 240000
```

In our example, input is an HTTP source producing 2 messages each second with values 5 and 10,
and the event time starts from 60000. Since we have considered a fixed window of length 60s,
and also we are producing two messages with different keys "even" and "odd". Numaflow will create
two different windows with a start time of 60000 and an end time of 120000. So the output will be
300(5 * 60) and 600(10 * 60).

If we had used a non keyed window (`keyed: false`), we would have seen one single output with value
of 900(300 of odd + 600 of even) for each window.

## sum pipeline using sliding window
This is a simple reduce pipeline that just does summation (sum of numbers) but uses sliding window.
The snippet for the reduce vertex is as follows.

![plot](../../../assets/simple-reduce.png)

```yaml
- name: compute-sum
udf:
container:
# compute the sum
image: quay.io/numaio/numaflow-go/reduce-sum
groupBy:
window:
sliding:
length: 10s
slide: 5s
keyed: true
```

[7-reduce-sliding-window.yaml](https://github.com/numaproj/numaflow/blob/main/examples/examples/7-reduce-sliding-window.yaml)
has the complete pipeline definition

```shell
kubectl apply -f https://github.com/numaproj/numaflow/blob/main/examples/examples/7-reduce-sliding-window.yaml
```
Output:
```text
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 60000 End - 120000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 60000 End - 120000
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 70000 End - 130000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 700000 End - 1300000
2023/01/05 15:13:16 (sink) Payload - 300 Key - odd Start - 80000 End - 140000
2023/01/05 15:13:16 (sink) Payload - 600 Key - even Start - 80000 End - 140000
```

In our example, input is an HTTP source producing 2 messages each second with values 5 and 10,
and the event time starts from 60000. Since we have considered a sliding window of length 60s
and slide 10s, and also we are producing two messages with different keys "even" and "odd".
Numaflow will create two different windows with a start time of 60000 and an end time of 120000,
and because the slide duration is 10s, a next set of windows will be created with start time of
70000 and an end time of 130000. Since its a sum operation the output will be 300(5 * 60) and 600(10 * 60).

`Payload - 50 Key - odd Start - 10000 End - 70000`, we see 50 here for odd because the
first window has only 10 elements

## complex reduce pipeline

In the complex reduce example, we will
* chain of reduce functions
* use both fixed and sliding windows
* use keyed and non-keyed windowing

![plot](../../../assets/complex-reduce.png)

[8-reduce-complex-pipeline.yaml](https://github.com/numaproj/numaflow/blob/main/examples/examples/8-reduce-complex-pipeline.yaml)
has the complete pipeline definition

```shell
kubectl apply -f https://github.com/numaproj/numaflow/blob/main/examples/examples/8-reduce-complex-pipeline.yaml
```

Output:
```text
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 80000 End - 140000
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 90000 End - 150000
2023/01/05 15:33:55 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 100000 End - 160000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 110000 End - 170000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 120000 End - 180000
2023/01/05 15:33:56 (sink) Payload - 900 Key - NON_KEYED_STREAM Start - 130000 End - 190000
```

In our example, first we have the reduce vertex with a fixed window of duration 5s. Since the input is 5
and 10, the output from the first reduce vertex will be 25 (5 * 5) and 50 (5 * 10). This will be passed
to the next non-keyed reduce vertex with the fixed window duration of 10s. This being a non-keyed, it will
combine the inputs and produce the output of 150(25 * 2 + 50 * 2), which will be passed to the reduce
vertex with a sliding window of duration 60s and with the slide duration of 10s. Hence the final output
will be 900(150 * 6).
Loading

0 comments on commit a7ea0a5

Please sign in to comment.