Skip to content

Commit

Permalink
fix: JetStream context KV store/watch fix (#460)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored Jan 11, 2023
1 parent 3fcee65 commit 2298615
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 125 deletions.
14 changes: 7 additions & 7 deletions pkg/isb/stores/redis/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"testing"
"time"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/watermark/generic"

"github.com/go-redis/redis/v8"
Expand All @@ -42,7 +42,7 @@ import (

var (
redisOptions = &redis.UniversalOptions{
//Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}
// Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"}
Addrs: []string{":6379"},
}

Expand Down Expand Up @@ -132,7 +132,7 @@ func TestRedisCheckBacklog(t *testing.T) {
"to1": rqw,
}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward2.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, forward2.WithReadBatchSize(10))
f, err := forward.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark, forward.WithReadBatchSize(10))

stopped := f.Start()
// validate the length of the toStep stream.
Expand Down Expand Up @@ -283,7 +283,7 @@ type ReadWritePerformance struct {
rclient *redisclient.RedisClient
rqr *BufferRead
rqw *BufferWrite
isdf *forward2.InterStepDataForward
isdf *forward.InterStepDataForward
count int64
withPipelining bool
cancel context.CancelFunc
Expand Down Expand Up @@ -324,7 +324,7 @@ func (suite *ReadWritePerformance) SetupSuite() {
}}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
isdf, _ := forward2.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark)
isdf, _ := forward.NewInterStepDataForward(vertex, rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark)

suite.ctx = ctx
suite.rclient = client
Expand Down Expand Up @@ -412,7 +412,7 @@ func (suite *ReadWritePerformance) TestReadWriteLatencyPipelining() {
"to1": suite.rqw,
}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
suite.isdf, _ = forward2.NewInterStepDataForward(vertex, suite.rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark)
suite.isdf, _ = forward.NewInterStepDataForward(vertex, suite.rqr, toSteps, forwardReadWritePerformance{}, forwardReadWritePerformance{}, fetchWatermark, publishWatermark)

suite.False(suite.rqw.IsFull())
var writeMessages = make([]isb.Message, 0, suite.count)
Expand Down Expand Up @@ -491,7 +491,7 @@ func generateLatencySlice(xMessages []redis.XMessage, suite *ReadWritePerformanc
suite.NoError(err)

// We store a difference of the id and the offset in the to stream.
//This gives us a difference between the time it was read that is stored in ID of the Header and the time it was written as stored in the ID.
// This gives us a difference between the time it was read that is stored in ID of the Header and the time it was written as stored in the ID.
latency[idx] = float64(id - offset)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/clients/nats/in_cluster_jetstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (isc *inClusterJetStreamClient) connect(ctx context.Context) (*nats.Conn, e
return natsJetStreamConnection(ctx, url, natsOpts)
}

// Connect is used to establish an incluster NATS jetstream connection
// Connect is used to establish an inCluster NATS JetStream connection
func (isc *inClusterJetStreamClient) Connect(ctx context.Context, opts ...JetStreamClientOption) (*NatsConn, error) {
options := defaultJetStreamClientOptions()
for _, o := range opts {
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/clients/nats/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package nats

import "context"

// JetStreamClient is used to provide a jetstream client
// JetStreamClient is used to provide a JetStream client
type JetStreamClient interface {
Connect(ctx context.Context, opts ...JetStreamClientOption) (*NatsConn, error)
}
10 changes: 5 additions & 5 deletions pkg/sinks/blackhole/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package blackhole
import (
"context"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
Expand All @@ -37,7 +37,7 @@ import (
type Blackhole struct {
name string
pipelineName string
isdf *forward2.InterStepDataForward
isdf *forward.InterStepDataForward
logger *zap.SugaredLogger
}

Expand Down Expand Up @@ -66,14 +66,14 @@ func NewBlackhole(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWaterma
bh.logger = logging.NewLogger()
}

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSink), forward2.WithLogger(bh.logger)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(bh.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

isdf, err := forward2.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: bh}, forward2.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: bh}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sinks/blackhole/blackhole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/udf/applier"

"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestBlackhole_ForwardToTwoVertex(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward2.NewInterStepDataForward(vertex, fromStep, toSteps, forward2.All, applier.Terminal, fetchWatermark, publishWatermark)
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, forward.All, applier.Terminal, fetchWatermark, publishWatermark)
assert.NoError(t, err)

stopped := f.Start()
Expand Down
14 changes: 7 additions & 7 deletions pkg/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
Expand All @@ -43,7 +43,7 @@ type ToKafka struct {
producer sarama.AsyncProducer
connected bool
topic string
isdf *forward2.InterStepDataForward
isdf *forward.InterStepDataForward
kafkaSink *dfv1.KafkaSink
log *zap.SugaredLogger
}
Expand All @@ -61,14 +61,14 @@ func WithLogger(log *zap.SugaredLogger) Option {
func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*ToKafka, error) {
kafkaSink := vertex.Spec.Sink.Kafka
toKafka := new(ToKafka)
//apply options for kafka sink
// apply options for kafka sink
for _, o := range opts {
if err := o(toKafka); err != nil {
return nil, err
}
}

//set default logger
// set default logger
if toKafka.log == nil {
toKafka.log = logging.NewLogger()
}
Expand All @@ -78,14 +78,14 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark
toKafka.topic = kafkaSink.Topic
toKafka.kafkaSink = kafkaSink

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSink), forward2.WithLogger(toKafka.log)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toKafka.log)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

f, err := forward2.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: toKafka}, forward2.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: toKafka}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"testing"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/udf/applier"

"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestWriteSuccessToKafka(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromEdgeList(generic.GetBufferNameList(vertex.GetToBuffers()))
toKafka.isdf, err = forward2.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward2.All, applier.Terminal, fetchWatermark, publishWatermark)
toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, map[string]isb.BufferWriter{"name": toKafka}, forward.All, applier.Terminal, fetchWatermark, publishWatermark)
assert.NoError(t, err)
toKafka.kafkaSink = vertex.Spec.Sink.Kafka
toKafka.name = "Test"
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestWriteFailureToKafka(t *testing.T) {
}}
toSteps := map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: toKafka}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
toKafka.isdf, err = forward2.NewInterStepDataForward(vertex, fromStep, toSteps, forward2.All, applier.Terminal, fetchWatermark, publishWatermark)
toKafka.isdf, err = forward.NewInterStepDataForward(vertex, fromStep, toSteps, forward.All, applier.Terminal, fetchWatermark, publishWatermark)
assert.NoError(t, err)
toKafka.name = "Test"
toKafka.topic = "topic-1"
Expand Down
10 changes: 5 additions & 5 deletions pkg/sinks/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"log"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
Expand All @@ -38,7 +38,7 @@ import (
type ToLog struct {
name string
pipelineName string
isdf *forward2.InterStepDataForward
isdf *forward.InterStepDataForward
logger *zap.SugaredLogger
}

Expand Down Expand Up @@ -67,14 +67,14 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark f
toLog.logger = logging.NewLogger()
}

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSink), forward2.WithLogger(toLog.logger)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(toLog.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

isdf, err := forward2.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: toLog}, forward2.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: toLog}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sinks/logger/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/udf/applier"

"github.com/numaproj/numaflow/pkg/isb/stores/simplebuffer"
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := forward2.NewInterStepDataForward(vertex, fromStep, toSteps, forward2.All, applier.Terminal, fetchWatermark, publishWatermark)
f, err := forward.NewInterStepDataForward(vertex, fromStep, toSteps, forward.All, applier.Terminal, fetchWatermark, publishWatermark)
assert.NoError(t, err)

stopped := f.Start()
Expand Down
10 changes: 5 additions & 5 deletions pkg/sinks/udsink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/udf/applier"
Expand All @@ -37,7 +37,7 @@ import (
type UserDefinedSink struct {
name string
pipelineName string
isdf *forward2.InterStepDataForward
isdf *forward.InterStepDataForward
logger *zap.SugaredLogger
udsink *udsGRPCBasedUDSink
}
Expand Down Expand Up @@ -66,18 +66,18 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchW
s.logger = logging.NewLogger()
}

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSink), forward2.WithLogger(s.logger)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSink), forward.WithLogger(s.logger)}
if x := vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
udsink, err := NewUDSGRPCBasedUDSink()
if err != nil {
return nil, fmt.Errorf("failed to create gRPC client, %w", err)
}
s.udsink = udsink
isdf, err := forward2.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward2.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.GetToBuffers()[0].Name: s}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/isb"
metricspkg "github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
Expand Down Expand Up @@ -91,7 +91,7 @@ type memgen struct {
// once terminated the source will not generate any more records.
cancel context.CancelFunc
// forwarder to read from the source and write to the interstep buffer.
forwarder *forward2.InterStepDataForward
forwarder *forward.InterStepDataForward
// lifecycleCtx context is used to control the lifecycle of this instance.
lifecycleCtx context.Context
// read timeout for the reader
Expand Down Expand Up @@ -180,18 +180,18 @@ func NewMemGen(vertexInstance *dfv1.VertexInstance,
destinations[w.GetName()] = w
}

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSource), forward2.WithLogger(gensrc.logger)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(gensrc.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

// attach a source publisher so the source can assign the watermarks.
gensrc.sourcePublishWM = gensrc.buildSourceWatermarkPublisher(publishWMStores)

// we pass in the context to forwarder as well so that it can shut down when we cancel the context
forwarder, err := forward2.NewInterStepDataForward(vertexInstance.Vertex, gensrc, destinations, forward2.All, applier.Terminal, fetchWM, publishWM, forwardOpts...)
forwarder, err := forward.NewInterStepDataForward(vertexInstance.Vertex, gensrc, destinations, forward.All, applier.Terminal, fetchWM, publishWM, forwardOpts...)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/sources/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
forward2 "github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/forward"
"github.com/numaproj/numaflow/pkg/isb"
metricspkg "github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
Expand All @@ -51,7 +51,7 @@ type httpSource struct {
messages chan *isb.ReadMessage
logger *zap.SugaredLogger

forwarder *forward2.InterStepDataForward
forwarder *forward.InterStepDataForward
// source watermark publisher
sourcePublishWM publish.Publisher
// context cancel function
Expand Down Expand Up @@ -186,13 +186,13 @@ func New(vertexInstance *dfv1.VertexInstance, writers []isb.BufferWriter, fetchW
destinations[w.GetName()] = w
}

forwardOpts := []forward2.Option{forward2.WithVertexType(dfv1.VertexTypeSource), forward2.WithLogger(h.logger)}
forwardOpts := []forward.Option{forward.WithVertexType(dfv1.VertexTypeSource), forward.WithLogger(h.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, forward2.WithReadBatchSize(int64(*x.ReadBatchSize)))
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
forwarder, err := forward2.NewInterStepDataForward(vertexInstance.Vertex, h, destinations, forward2.All, applier.Terminal, fetchWM, publishWM, forwardOpts...)
forwarder, err := forward.NewInterStepDataForward(vertexInstance.Vertex, h, destinations, forward.All, applier.Terminal, fetchWM, publishWM, forwardOpts...)
if err != nil {
h.logger.Errorw("Error instantiating the forwarder", zap.Error(err))
return nil, err
Expand Down
Loading

0 comments on commit 2298615

Please sign in to comment.