Skip to content

Commit

Permalink
Add an e2e test for source data transformer (#505)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Jan 24, 2023
1 parent 2af9193 commit 58b12ec
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ jobs:
timeout-minutes: 20
strategy:
fail-fast: false
max-parallel: 6
max-parallel: 7
matrix:
driver: [jetstream]
case: [e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e]
case: [e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, transformer-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nats.go v1.21.0
github.com/numaproj/numaflow-go v0.3.0
github.com/numaproj/numaflow-go v0.3.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/soheilhy/cmux v0.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.3.0 h1:ry2C67eMcZ6MAgWn8BOPbRjAWIwMg5T5F7mdsvfJAYA=
github.com/numaproj/numaflow-go v0.3.0/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo=
github.com/numaproj/numaflow-go v0.3.1 h1:WRaTKNaaQE4oXvfD826BhUYQecy5j0nYc7Z5wdPxUko=
github.com/numaproj/numaflow-go v0.3.1/go.mod h1:TOawJdyf1C4V98zKnjjFhbHLBtg/TDyzZM+1MsfZuPo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down
2 changes: 1 addition & 1 deletion pkg/sinks/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (t *ToLog) Write(_ context.Context, messages []isb.Message) ([]isb.Offset,
prefix := "(" + t.GetName() + ")"
for _, message := range messages {
logSinkWriteCount.With(map[string]string{metrics.LabelVertex: t.name, metrics.LabelPipeline: t.pipelineName}).Inc()
log.Println(prefix, " Payload - ", string(message.Payload), " Key - ", message.Key, " Start - ", message.StartTime.UnixMilli(), " End - ", message.EndTime.UnixMilli())
log.Println(prefix, " Payload - ", string(message.Payload), " Key - ", message.Key, " Start - ", message.StartTime.UnixMilli(), " End - ", message.EndTime.UnixMilli(), " EventTime - ", message.EventTime.UnixMilli())
}
return nil, make([]error, len(messages))
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/client"

"github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/udf/function"
Expand Down Expand Up @@ -87,12 +88,11 @@ func (u *gRPCBasedTransformer) ApplyMap(ctx context.Context, readMessage *isb.Re
}

ctx = metadata.NewOutgoingContext(ctx, metadata.New(map[string]string{functionsdk.DatumKey: key}))
// TODO - change to MapTFn once numaflow-go sdk changes merge in.
datumList, err := u.client.MapFn(ctx, d)
datumList, err := u.client.MapTFn(ctx, d)
if err != nil {
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
Message: fmt.Sprintf("gRPC client.MapTFn failed, %s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
Expand Down
12 changes: 4 additions & 8 deletions pkg/sources/transformer/grpc_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
// TODO - change to MapTFn once numaflow-go sdk changes merge in.
mockClient.EXPECT().MapFn(gomock.Any(), &rpcMsg{msg: req}).Return(&functionpb.DatumList{
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(&functionpb.DatumList{
Elements: []*functionpb.Datum{
{
Key: "test_success_key",
Expand Down Expand Up @@ -144,8 +143,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169660, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
// TODO - change to MapTFn once numaflow-go sdk changes merge in.
mockClient.EXPECT().MapFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, fmt.Errorf("mock error"))
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, fmt.Errorf("mock error"))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -196,8 +194,7 @@ func TestGRPCBasedTransformer_ApplyWithMockClient_ChangePayload(t *testing.T) {
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
// TODO - change to MapTFn once numaflow-go sdk changes merge in.
mockClient.EXPECT().MapFn(gomock.Any(), gomock.Any()).DoAndReturn(
mockClient.EXPECT().MapTFn(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, datum *functionpb.Datum, opts ...grpc.CallOption) (*functionpb.DatumList, error) {
var originalValue testutils.PayloadForTest
_ = json.Unmarshal(datum.GetValue(), &originalValue)
Expand Down Expand Up @@ -268,8 +265,7 @@ func TestGRPCBasedTransformer_ApplyWithMockClient_ChangeEventTime(t *testing.T)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
// TODO - change to MapTFn once numaflow-go sdk changes merge in.
mockClient.EXPECT().MapFn(gomock.Any(), gomock.Any()).DoAndReturn(
mockClient.EXPECT().MapTFn(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, datum *functionpb.Datum, opts ...grpc.CallOption) (*functionpb.DatumList, error) {
var elements []*functionpb.Datum
elements = append(elements, &functionpb.Datum{
Expand Down
19 changes: 0 additions & 19 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,6 @@ func (s *FunctionalSuite) TestConditionalForwarding() {
w.Expect().SinkNotContains("number-sink", "not an integer")
}

func (s *FunctionalSuite) TestSourceDataTransform() {
w := s.Given().Pipeline("@testdata/source-data-transform.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "source-data-transform"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("88"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("89"))).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("not an integer")))

w.Expect().SinkContains("even-sink", "88")
w.Expect().SinkNotContains("even-sink", "89")
w.Expect().SinkNotContains("even-sink", "not an integer")
}

func (s *FunctionalSuite) TestWatermarkEnabled() {
w := s.Given().Pipeline("@testdata/watermark.yaml").
When().
Expand Down
25 changes: 0 additions & 25 deletions test/e2e/testdata/source-data-transform.yaml

This file was deleted.

40 changes: 40 additions & 0 deletions test/transformer-e2e/testdata/event-time-filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: event-time-filter
spec:
vertices:
- name: in
source:
http: {}
transformer:
container:
image: quay.io/numaio/numaflow-go/mapt-event-time-filter:latest
- name: sink-within-2022
scale:
min: 1
sink:
log: {}
- name: sink-after-2022
scale:
min: 1
sink:
log: {}
- name: sink-all
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: sink-within-2022
conditions:
keyIn:
- within_year_2022
- from: in
to: sink-after-2022
conditions:
keyIn:
- after_year_2022
- from: in
to: sink-all
79 changes: 79 additions & 0 deletions test/transformer-e2e/transformer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package transformer_e2e

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/suite"

. "github.com/numaproj/numaflow/test/fixtures"
)

type TransformerSuite struct {
E2ESuite
}

func (s *TransformerSuite) TestEventTimeFilter() {
w := s.Given().Pipeline("@testdata/event-time-filter.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "event-time-filter"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

eventTimeBefore2022_1 := strconv.FormatInt(time.Date(2021, 4, 2, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeBefore2022_2 := strconv.FormatInt(time.Date(1998, 4, 2, 8, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeBefore2022_3 := strconv.FormatInt(time.Date(2013, 4, 4, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)

eventTimeAfter2022_1 := strconv.FormatInt(time.Date(2023, 4, 2, 7, 4, 5, 2, time.UTC).UnixMilli(), 10)
eventTimeAfter2022_2 := strconv.FormatInt(time.Date(2026, 4, 2, 3, 4, 5, 2, time.UTC).UnixMilli(), 10)

eventTimeWithin2022_1 := strconv.FormatInt(time.Date(2022, 4, 2, 3, 4, 5, 2, time.UTC).UnixMilli(), 10)

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_1)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_2)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Before2022")).WithHeader("X-Numaflow-Event-Time", eventTimeBefore2022_3)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("After2022")).WithHeader("X-Numaflow-Event-Time", eventTimeAfter2022_1)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("After2022")).WithHeader("X-Numaflow-Event-Time", eventTimeAfter2022_2)).
SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("Within2022")).WithHeader("X-Numaflow-Event-Time", eventTimeWithin2022_1))

janFirst2022 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
janFirst2023 := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)

w.Expect().
VertexPodLogContains("sink-within-2022", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli()), PodLogCheckOptionWithCount(1)).
VertexPodLogContains("sink-after-2022", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli()), PodLogCheckOptionWithCount(2)).
VertexPodLogContains("sink-all", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli()), PodLogCheckOptionWithCount(1)).
VertexPodLogContains("sink-all", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli()), PodLogCheckOptionWithCount(2)).
VertexPodLogNotContains("sink-within-2022", fmt.Sprintf("EventTime - %d", janFirst2023.UnixMilli())).
VertexPodLogNotContains("sink-after-2022", fmt.Sprintf("EventTime - %d", janFirst2022.UnixMilli())).
VertexPodLogNotContains("sink-all", "Before2022").
VertexPodLogNotContains("sink-within-2022", "Before2022").
VertexPodLogNotContains("sink-after-2022", "Before2022")
}

func TestTransformerSuite(t *testing.T) {
suite.Run(t, new(TransformerSuite))
}

0 comments on commit 58b12ec

Please sign in to comment.