Skip to content

Commit

Permalink
refactor: move udf forwarder to the right dir (#1381)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Nov 21, 2023
1 parent cfa55f0 commit 0c82ee0
Show file tree
Hide file tree
Showing 44 changed files with 160 additions and 135 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
max-parallel: 11
matrix:
driver: [jetstream]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinput-e2e]
case: [e2e, diamond-e2e, transformer-e2e, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e, udsource-e2e, api-e2e, sideinputs-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ test-api-e2e:
test-udsource-e2e:
test-transformer-e2e:
test-diamond-e2e:
test-sideinput-e2e:
test-sideinputs-e2e:
test-%:
$(MAKE) cleanup-e2e
$(MAKE) image e2eapi-image
Expand Down
18 changes: 18 additions & 0 deletions pkg/forwarder/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package forwarder defines the interfaces for data forwarders in different type of vertices.
package forwarder
2 changes: 1 addition & 1 deletion pkg/forward/interfaces.go → pkg/forwarder/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package forward
package forwarder

// VertexBuffer points to the partition of a buffer owned by the vertex.
type VertexBuffer struct {
Expand Down
7 changes: 4 additions & 3 deletions pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ import (
"github.com/stretchr/testify/assert"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
natstest "github.com/numaproj/numaflow/pkg/shared/clients/nats/test"
"github.com/numaproj/numaflow/pkg/udf/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

type myForwardJetStreamTest struct {
}

func (f myForwardJetStreamTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{{
func (f myForwardJetStreamTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
return []forwarder.VertexBuffer{{
ToVertexName: "to1",
ToVertexPartitionIdx: 0,
}}, nil
Expand Down
7 changes: 4 additions & 3 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (
"github.com/stretchr/testify/suite"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/udf/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)
Expand Down Expand Up @@ -299,8 +300,8 @@ type ReadWritePerformance struct {
type forwardReadWritePerformance struct {
}

func (f forwardReadWritePerformance) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{{
func (f forwardReadWritePerformance) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
return []forwarder.VertexBuffer{{
ToVertexName: "to1",
ToVertexPartitionIdx: 0,
}}, nil
Expand Down
7 changes: 4 additions & 3 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"github.com/stretchr/testify/assert"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/udf/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)
Expand Down Expand Up @@ -342,8 +343,8 @@ func Test_GetRefreshFullError(t *testing.T) {
type myForwardRedisTest struct {
}

func (f myForwardRedisTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{{
func (f myForwardRedisTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
return []forwarder.VertexBuffer{{
ToVertexName: "to1",
ToVertexPartitionIdx: 0,
}}, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/reduce/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/reduce/pbq"
Expand Down Expand Up @@ -69,7 +69,7 @@ type DataForward struct {
// wmbChecker checks if the idle watermark is valid when the len(readMessage) is 0.
wmbChecker wmb.WMBChecker
pbqManager *pbq.Manager
whereToDecider forward.ToWhichStepDecider
whereToDecider forwarder.ToWhichStepDecider
udfInvocationTracking map[partition.ID]*pnf.ForwardTask
of *pnf.OrderedProcessor
opts *Options
Expand All @@ -82,7 +82,7 @@ func NewDataForward(ctx context.Context,
fromBuffer isb.BufferReader,
toBuffers map[string][]isb.BufferWriter,
pbqManager *pbq.Manager,
whereToDecider forward.ToWhichStepDecider,
whereToDecider forwarder.ToWhichStepDecider,
fw fetch.Fetcher,
watermarkPublishers map[string]publish.Publisher,
windowingStrategy window.Windower,
Expand Down
10 changes: 5 additions & 5 deletions pkg/reduce/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.uber.org/atomic"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/reduce/pbq"
Expand Down Expand Up @@ -125,8 +125,8 @@ type myForwardTestRoundRobin struct {
count int
}

func (f *myForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
var output = []forward.VertexBuffer{{
func (f *myForwardTestRoundRobin) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
var output = []forwarder.VertexBuffer{{
ToVertexName: "reduce-to-vertex",
ToVertexPartitionIdx: int32(f.count % 2),
}}
Expand Down Expand Up @@ -164,8 +164,8 @@ func (f CounterReduceTest) ApplyReduce(_ context.Context, partitionID *partition
}, nil
}

func (f CounterReduceTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{{
func (f CounterReduceTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
return []forwarder.VertexBuffer{{
ToVertexName: "reduce-to-vertex",
ToVertexPartitionIdx: 0,
}}, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/reduce/pnf/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/watermark/wmb"

"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/reduce/applier"
Expand Down Expand Up @@ -60,7 +60,7 @@ type OrderedProcessor struct {
pbqManager *pbq.Manager
udf applier.ReduceApplier
toBuffers map[string][]isb.BufferWriter
whereToDecider forward.ToWhichStepDecider
whereToDecider forwarder.ToWhichStepDecider
watermarkPublishers map[string]publish.Publisher
idleManager wmb.IdleManager
log *zap.SugaredLogger
Expand All @@ -72,7 +72,7 @@ func NewOrderedProcessor(ctx context.Context,
udf applier.ReduceApplier,
toBuffers map[string][]isb.BufferWriter,
pbqManager *pbq.Manager,
whereToDecider forward.ToWhichStepDecider,
whereToDecider forwarder.ToWhichStepDecider,
watermarkPublishers map[string]publish.Publisher,
idleManager wmb.IdleManager) *OrderedProcessor {

Expand Down
6 changes: 3 additions & 3 deletions pkg/reduce/pnf/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/assert"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/isb/testutils"
Expand All @@ -40,8 +40,8 @@ import (
type myForwardTest struct {
}

func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forward.VertexBuffer, error) {
return []forward.VertexBuffer{}, nil
func (f myForwardTest) WhereTo(_ []string, _ []string) ([]forwarder.VertexBuffer, error) {
return []forwarder.VertexBuffer{}, nil
}

func (f myForwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/reduce/pnf/processandforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/reduce/applier"
Expand All @@ -56,7 +56,7 @@ type processAndForward struct {
pbqReader pbq.Reader
log *zap.SugaredLogger
toBuffers map[string][]isb.BufferWriter
whereToDecider forward.ToWhichStepDecider
whereToDecider forwarder.ToWhichStepDecider
wmPublishers map[string]publish.Publisher
idleManager wmb.IdleManager
}
Expand All @@ -70,7 +70,7 @@ func newProcessAndForward(ctx context.Context,
udf applier.ReduceApplier,
pbqReader pbq.Reader,
toBuffers map[string][]isb.BufferWriter,
whereToDecider forward.ToWhichStepDecider,
whereToDecider forwarder.ToWhichStepDecider,
pw map[string]publish.Publisher,
idleManager wmb.IdleManager) *processAndForward {

Expand Down Expand Up @@ -174,7 +174,7 @@ func (p *processAndForward) whereToStep() map[string][][]isb.Message {
// writer doesn't accept array of pointers
messagesToStep := make(map[string][][]isb.Message)

var to []forward.VertexBuffer
var to []forwarder.VertexBuffer
var err error
for _, msg := range p.writeMessages {
to, err = p.whereToDecider.WhereTo(msg.Keys, msg.Tags)
Expand Down
12 changes: 6 additions & 6 deletions pkg/reduce/pnf/processandforward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1/reducemock"

"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
"github.com/numaproj/numaflow/pkg/reduce/pbq"
Expand Down Expand Up @@ -63,24 +63,24 @@ type forwardTest struct {
buffers []string
}

func (f *forwardTest) WhereTo(keys []string, _ []string) ([]forward.VertexBuffer, error) {
func (f *forwardTest) WhereTo(keys []string, _ []string) ([]forwarder.VertexBuffer, error) {
if strings.Compare(keys[len(keys)-1], "test-forward-one") == 0 {
return []forward.VertexBuffer{{
return []forwarder.VertexBuffer{{
ToVertexName: "buffer1",
ToVertexPartitionIdx: int32(f.count % 2),
}}, nil
} else if strings.Compare(keys[len(keys)-1], "test-forward-all") == 0 {
var steps []forward.VertexBuffer
var steps []forwarder.VertexBuffer
for _, buffer := range f.buffers {
steps = append(steps, forward.VertexBuffer{
steps = append(steps, forwarder.VertexBuffer{
ToVertexName: buffer,
ToVertexPartitionIdx: int32(f.count % 2),
})
}
return steps, nil
}
f.count++
return []forward.VertexBuffer{}, nil
return []forwarder.VertexBuffer{}, nil
}

func (f forwardTest) Apply(ctx context.Context, message *isb.ReadMessage) ([]*isb.WriteMessage, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sinks/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ limitations under the License.
package sinks

import (
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
)

// Sinker interface defines what a Sink should implement.
type Sinker interface {
isb.BufferWriter
forward.StarterStopper
forwarder.StarterStopper
}
6 changes: 3 additions & 3 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/idlehandler"
Expand All @@ -51,7 +51,7 @@ type DataForward struct {
reader isb.BufferReader
// toBuffers store the toVertex name to its owned buffers mapping.
toBuffers map[string][]isb.BufferWriter
toWhichStepDecider forward.ToWhichStepDecider
toWhichStepDecider forwarder.ToWhichStepDecider
transformer applier.SourceTransformApplier
wmFetcher fetch.Fetcher
toVertexWMStores map[string]store.WatermarkStore
Expand All @@ -73,7 +73,7 @@ func NewDataForward(
vertexInstance *dfv1.VertexInstance,
fromStep isb.BufferReader,
toSteps map[string][]isb.BufferWriter,
toWhichStepDecider forward.ToWhichStepDecider,
toWhichStepDecider forwarder.ToWhichStepDecider,
transformer applier.SourceTransformApplier,
fetchWatermark fetch.Fetcher,
srcWMPublisher isb.SourceWatermarkPublisher,
Expand Down
Loading

0 comments on commit 0c82ee0

Please sign in to comment.