Skip to content

Commit

Permalink
fix: e2e testing for PBQ WAL with reduce pipeline (#393)
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Hao <[email protected]>
  • Loading branch information
xdevxy authored and whynowy committed Dec 7, 2022
1 parent f760b1b commit e6e24ee
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 3 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 4
max-parallel: 5
matrix:
driver: [jetstream]
case: [e2e, kafka-e2e, http-e2e, sdks-e2e]
case: [e2e, kafka-e2e, http-e2e, sdks-e2e, reduce-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ test-e2e:
test-kafka-e2e:
test-http-e2e:
test-sdks-e2e:
test-reduce-e2e:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) image e2eapi-image
Expand Down
7 changes: 6 additions & 1 deletion test/fixtures/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ var OutputRegexp = func(rx string) func(t *testing.T, output string, err error)
}
}

var CheckPodKillSucceeded = func(t *testing.T, output string, err error) {
assert.Contains(t, output, "deleted")
assert.NoError(t, err)
}

func Exec(name string, args ...string) (string, error) {
cmd := exec.Command(name, args...)
cmd.Env = os.Environ()
Expand Down Expand Up @@ -237,7 +242,7 @@ func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowp
if err != nil {
return fmt.Errorf("error getting vertex pod name: %w", err)
}
ok = ok && len(podList.Items) > 0 && len(podList.Items) == int(*vertexList.Items[0].Spec.Replicas) // pod number should equal to desired replicas
ok = ok && len(podList.Items) > 0 && len(podList.Items) == vertexList.Items[0].GetReplicas() // pod number should equal to desired replicas
for _, p := range podList.Items {
ok = ok && p.Status.Phase == corev1.PodRunning
}
Expand Down
97 changes: 97 additions & 0 deletions test/reduce-e2e/reduce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reduce_e2e

import (
"strconv"
"testing"
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
. "github.com/numaproj/numaflow/test/fixtures"
"github.com/stretchr/testify/suite"
)

type ReduceSuite struct {
E2ESuite
}

func (s *ReduceSuite) TestSimpleReducePipelineFailOver() {
w := s.Given().Pipeline("@testdata/simple-reduce-pipeline.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()

pipelineName := "even-odd-sum"

w.Expect().VertexPodsRunning()

w.Expect().
VertexPodLogContains("in", LogSourceVertexStarted).
VertexPodLogContains("atoi", LogUDFVertexStarted, PodLogCheckOptionWithContainer("numa")).
VertexPodLogContains("sink", LogSinkVertexStarted).
DaemonPodLogContains(pipelineName, LogDaemonStarted)

defer w.VertexPodPortForward("in", 8443, dfv1.VertexHTTPSPort).
TerminateAllPodPortForwards()

args := "kubectl delete po -n numaflow-system -l " +
"numaflow.numaproj.io/pipeline-name=even-odd-sum,numaflow.numaproj.io/vertex-name=compute-sum"

// Kill the reducer pods before processing to trigger failover.
w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded)

startTime := int(time.Unix(1000, 0).UnixMilli())
for i := 1; i <= 100; i++ {
eventTime := startTime + (i * 1000)
if i == 5 {
// Kill the reducer pods during processing to trigger failover.
w.Expect().VertexPodsRunning()
w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded)
}

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithHeader("X-Numaflow-Event-Time", strconv.Itoa(eventTime)).WithBytes([]byte("1")).
Expect().
Status(204)

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithHeader("X-Numaflow-Event-Time", strconv.Itoa(eventTime)).WithBytes([]byte("2")).
Expect().
Status(204)

HTTPExpect(s.T(), "https://localhost:8443").POST("/vertices/in").WithHeader("X-Numaflow-Event-Time", strconv.Itoa(eventTime)).WithBytes([]byte("3")).
Expect().
Status(204)
}

w.Expect().VertexPodLogContains("sink", "Payload - 38", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 76", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 120", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 240", PodLogCheckOptionWithCount(1))

// Kill the reducer pods after processing to trigger failover.
w.Exec("/bin/sh", []string{"-c", args}, CheckPodKillSucceeded)
w.Expect().VertexPodsRunning()
w.Expect().VertexPodLogContains("sink", "Payload - 38", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 76", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 120", PodLogCheckOptionWithCount(1))
w.Expect().VertexPodLogContains("sink", "Payload - 240", PodLogCheckOptionWithCount(1))
}

func TestReduceSuite(t *testing.T) {
suite.Run(t, new(ReduceSuite))
}
45 changes: 45 additions & 0 deletions test/reduce-e2e/testdata/simple-reduce-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: even-odd-sum
spec:
limits:
readBatchSize: 1
vertices:
- name: in
source:
http: {}
- name: atoi
scale:
min: 1
udf:
container:
# Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/function/examples/evenodd
image: quay.io/numaio/go-even-odd-example
- name: compute-sum
udf:
container:
# compute the sum
image: quay.io/numaio/go-integer-sum-example
groupBy:
window:
fixed:
length: 60s
keyed: true
storage:
persistentVolumeClaim:
volumeSize: 10Gi
accessMode: ReadWriteOnce
- name: sink
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: atoi
- from: atoi
to: compute-sum
parallelism: 2
- from: compute-sum
to: sink

0 comments on commit e6e24ee

Please sign in to comment.