Skip to content

Commit

Permalink
Merge branch 'main' into pipeline-watermark
Browse files Browse the repository at this point in the history
  • Loading branch information
veds-g authored Jan 30, 2023
2 parents c5aee7c + 4cf74f9 commit b82730a
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 85 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 7
max-parallel: 6
matrix:
driver: [jetstream]
case: [e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, transformer-e2e]
case: [e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
4 changes: 2 additions & 2 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ In this example, there are five vertices in a pipeline. An [HTTP](./user-guide/s
Create the `even-odd` pipeline.

```shell
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/stable/test/e2e/testdata/even-odd.yaml
kubectl apply -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/2-even-odd-pipeline.yaml

# Wait for pods to be ready
kubectl get pods
Expand Down Expand Up @@ -119,7 +119,7 @@ The source code of the `even-odd` [User Defined Function](user-guide/user-define
The pipeline can be deleted by

```shell
kubectl delete -f https://raw.githubusercontent.com/numaproj/numaflow/stable/test/e2e/testdata/even-odd.yaml
kubectl delete -f https://raw.githubusercontent.com/numaproj/numaflow/main/examples/2-even-odd-pipeline.yaml
```

## A pipeline with reduce (aggregation)
Expand Down
43 changes: 43 additions & 0 deletions examples/2-even-odd-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: even-odd
spec:
vertices:
- name: in
source:
http: {}
- name: even-or-odd
udf:
container:
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/even_odd
image: quay.io/numaio/numaflow-go/map-even-odd
- name: even-sink
sink:
# A simple log printing sink
log: {}
- name: odd-sink
sink:
log: {}
- name: number-sink
sink:
log: {}
edges:
- from: in
to: even-or-odd
- from: even-or-odd
to: even-sink
conditions:
keyIn:
- even
- from: even-or-odd
to: odd-sink
conditions:
keyIn:
- odd
- from: even-or-odd
to: number-sink
conditions:
keyIn:
- even
- odd
25 changes: 24 additions & 1 deletion test/fixtures/e2eapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"log"
"net/http"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/wait"
)

func InvokeE2EAPI(format string, args ...interface{}) string {
Expand Down Expand Up @@ -51,7 +54,27 @@ func InvokeE2EAPI(format string, args ...interface{}) string {

func InvokeE2EAPIPOST(format string, body string, args ...interface{}) string {
url := "http://127.0.0.1:8378" + fmt.Sprintf(format, args...)
resp, err := http.Post(url, "application/json", strings.NewReader(body))

var err error
var resp *http.Response
// Invoking POST can fail due to "500 Internal Server Error". It's because the server is still booting up and not ready to serve requests.
// To prevent such issue, we apply retry strategy.
// 3 attempts with 5 second fixed wait time are tested sufficient for it.
var retryBackOff = wait.Backoff{
Factor: 1,
Jitter: 0,
Steps: 3,
Duration: time.Second * 5,
}
_ = wait.ExponentialBackoff(retryBackOff, func() (done bool, err error) {
resp, err = http.Post(url, "application/json", strings.NewReader(body))
if err == nil && resp.StatusCode < 300 {
return true, nil
}
fmt.Printf("Got error %v, response %v, retrying.\n", err, *resp)
return false, nil
})

if err != nil {
panic(err)
}
Expand Down
44 changes: 43 additions & 1 deletion test/sdks-e2e/sdks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package sdks_e2e

import (
"context"
"fmt"
"strconv"
"testing"
"time"

. "github.com/numaproj/numaflow/test/fixtures"
"github.com/stretchr/testify/suite"

. "github.com/numaproj/numaflow/test/fixtures"
)

type SDKsSuite struct {
Expand Down Expand Up @@ -92,6 +94,46 @@ func (s *SDKsSuite) TestReduceSDK() {
done <- struct{}{}
}

func (s *SDKsSuite) TestSourceTransformer() {
w := s.Given().Pipeline("@testdata/event-time-filter.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "event-time-filter"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

eventTimeBefore2022_1 := strconv.FormatInt(time.Date(2021, 4, 2, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeBefore2022_2 := strconv.FormatInt(time.Date(1998, 4, 2, 8, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeBefore2022_3 := strconv.FormatInt(time.Date(2013, 4, 4, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)

eventTimeAfter2022_1 := strconv.FormatInt(time.Date(2023, 4, 2, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeAfter2022_2 := strconv.FormatInt(time.Date(2026, 4, 2, 3, 4, 5, 2, time.UTC).UnixMilli(), 10)

eventTimeWithin2022_1 := strconv.FormatInt(time.Date(2022, 4, 2, 3, 4, 5, 2, time.UTC).UnixMilli(), 10)

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_1)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_2)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_3)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("After2022")).WithHeader("X-Numaflow-Event-Time", eventTimeAfter2022_1)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("After2022")).WithHeader("X-Numaflow-Event-Time", eventTimeAfter2022_2)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Within2022")).WithHeader("X-Numaflow-Event-Time", eventTimeWithin2022_1))

janFirst2022 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
janFirst2023 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)

w.Expect().VertexPodLogContains("sink-within-2022", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli()), PodLogCheckOptionWithCount(1)).
VertexPodLogContains("sink-after-2022", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli()), PodLogCheckOptionWithCount(2)).
VertexPodLogContains("sink-all", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli()), PodLogCheckOptionWithCount(1)).
VertexPodLogContains("sink-all", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli()), PodLogCheckOptionWithCount(2)).
VertexPodLogNotContains("sink-within-2022", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli()), PodLogCheckOptionWithTimeout(1*time.Second)).
VertexPodLogNotContains("sink-after-2022", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli()), PodLogCheckOptionWithTimeout(1*time.Second)).
VertexPodLogNotContains("sink-all", "Before2022", PodLogCheckOptionWithTimeout(1*time.Second)).
VertexPodLogNotContains("sink-within-2022", "Before2022", PodLogCheckOptionWithTimeout(1*time.Second)).
VertexPodLogNotContains("sink-after-2022", "Before2022", PodLogCheckOptionWithTimeout(1*time.Second))
}

func TestHTTPSuite(t *testing.T) {
suite.Run(t, new(SDKsSuite))
}
79 changes: 0 additions & 79 deletions test/transformer-e2e/transformer_test.go

This file was deleted.

0 comments on commit b82730a

Please sign in to comment.