feat: Add watermark for sink vertex (#124)
* feat: Add watermark for sink vertex

Signed-off-by: Sidhant Kohli <[email protected]>
kohlisid authored and whynowy committed Aug 6, 2022
1 parent 0eb0b0c commit 8d83540
Showing 11 changed files with 102 additions and 44 deletions.
16 changes: 11 additions & 5 deletions pkg/daemon/server/daemon_server.go
@@ -71,8 +71,10 @@ func (ds *daemonServer) Run(ctx context.Context) error {
}

tlsConfig := &tls.Config{Certificates: []tls.Certificate{*cer}, MinVersion: tls.VersionTLS12}

grpcServer := ds.newGRPCServer(isbSvcClient)
grpcServer, err := ds.newGRPCServer(isbSvcClient)
if err != nil {
return fmt.Errorf("failed to create grpc server: %w", err)
}
httpServer := ds.newHTTPServer(ctx, v1alpha1.DaemonServicePort, tlsConfig)

conn = tls.NewListener(conn, tlsConfig)
@@ -90,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error {
return nil
}

func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) *grpc.Server {
func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) (*grpc.Server, error) {
// "Prometheus histograms are a great way to measure latency distributions of your RPCs.
// However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default.
// To enable them please call the following in your server initialization code:"
@@ -104,8 +106,12 @@ func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) *grpc.Serv
}
grpcServer := grpc.NewServer(sOpts...)
grpc_prometheus.Register(grpcServer)
daemon.RegisterDaemonServiceServer(grpcServer, service.NewPipelineMetadataQuery(isbSvcClient, ds.pipeline))
return grpcServer
pipelineMetadataQuery, err := service.NewPipelineMetadataQuery(isbSvcClient, ds.pipeline)
if err != nil {
return nil, err
}
daemon.RegisterDaemonServiceServer(grpcServer, pipelineMetadataQuery)
return grpcServer, nil
}

// newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented
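
One note on the hunk above: the quoted go-grpc-prometheus comment refers to a histogram-enabling call that sits outside the shown context. A minimal sketch of that initialization, assuming the stock go-grpc-prometheus API (not code from this commit):

```go
// Enable per-RPC latency histograms (off by default due to their cardinality),
// then create and register the gRPC server as the hunk above already does.
grpc_prometheus.EnableHandlingTimeHistogram()
grpcServer := grpc.NewServer(sOpts...)
grpc_prometheus.Register(grpcServer)
```
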
10 changes: 7 additions & 3 deletions pkg/daemon/server/service/pipeline_metrics_query.go
@@ -34,7 +34,8 @@ type pipelineMetadataQuery struct {
}

// NewPipelineMetadataQuery returns a new instance of pipelineMetadataQuery
func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) *pipelineMetadataQuery {
func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) (*pipelineMetadataQuery, error) {
var err error
ps := pipelineMetadataQuery{
isbSvcClient: isbSvcClient,
pipeline: pipeline,
@@ -45,8 +46,11 @@ func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1
Timeout: time.Second * 3,
},
}
ps.vertexWatermark = newVertexWatermarkFetcher(pipeline)
return &ps
ps.vertexWatermark, err = newVertexWatermarkFetcher(pipeline)
if err != nil {
return nil, err
}
return &ps, nil
}

// ListBuffers is used to obtain the all the edge buffers information of a pipeline
9 changes: 6 additions & 3 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
@@ -56,7 +56,8 @@ func TestGetVertexMetrics(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: pipelineName},
}
client, _ := isbsvc.NewISBJetStreamSvc(pipelineName)
pipelineMetricsQueryService := NewPipelineMetadataQuery(client, pipeline)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline)
assert.NoError(t, err)

metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod.
# TYPE vertex_processing_rate gauge
@@ -125,7 +126,8 @@ func TestGetBuffer(t *testing.T) {
}

ms := &mockIsbSvcClient{}
pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(ms, pipeline)
assert.NoError(t, err)

bufferName := "numaflow-system-simple-pipeline-in-cat"

@@ -160,7 +162,8 @@ func TestListBuffers(t *testing.T) {
}

ms := &mockIsbSvcClient{}
pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline)
pipelineMetricsQueryService, err := NewPipelineMetadataQuery(ms, pipeline)
assert.NoError(t, err)

req := &daemon.ListBuffersRequest{Pipeline: &pipelineName}

45 changes: 31 additions & 14 deletions pkg/daemon/server/service/pipeline_watermark_query.go
@@ -14,7 +14,6 @@ import (
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
"go.uber.org/zap"
)

// watermarkFetchers used to store watermark metadata for propagation
@@ -27,17 +26,16 @@ type watermarkFetchers struct {
// corresponding fetchers. These fetchers are tied to the incoming edge buffer of the current vertex (Vn), and read the
// watermark propagated by the vertex (Vn-1). As each vertex has one incoming edge, for the input vertex we read the source
// data buffer.
func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) *watermarkFetchers {
func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers, error) {

// TODO: Return err instead of logging (https://github.com/numaproj/numaflow/pull/120#discussion_r927271677)
ctx := context.Background()
log := logging.FromContext(ctx)
var wmFetcher = new(watermarkFetchers)
var fromBufferName string

wmFetcher.isWatermarkEnabled = pipeline.Spec.Watermark.Propagate
if !wmFetcher.isWatermarkEnabled {
return wmFetcher
return wmFetcher, nil
}

vertexWmMap := make(map[string]fetch.Fetcher)
@@ -53,22 +51,41 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) *watermarkFetchers {
edge := pipeline.GetFromEdges(vertex.Name)[0]
fromBufferName = v1alpha1.GenerateEdgeBufferName(pipeline.Namespace, pipelineName, edge.From, edge.To)
}
hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient())
if err != nil {
log.Fatalw("JetStreamKVWatch failed", zap.String("HeartbeatBucket", hbBucket), zap.Error(err))
// Adding an extra entry for the sink vertex to check the watermark value progressed out of the vertex.
// Can be checked by querying sinkName_SINK in the service
if vertex.Sink != nil {
toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipelineName, vertex.Name)
fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, toBufferName, vertex.Name)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
}
sinkVertex := vertex.Name + "_SINK"
vertexWmMap[sinkVertex] = fetchWatermark
}
otBucket := isbsvc.JetStreamOTBucket(pipelineName, fromBufferName)
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucket, clients.NewInClusterJetStreamClient())
fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, fromBufferName, vertex.Name)
if err != nil {
log.Fatalw("JetStreamKVWatch failed", zap.String("OTBucket", otBucket), zap.Error(err))
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
}
var fetchWmWatchers = generic.BuildFetchWMWatchers(hbWatch, otWatch)
fetchWatermark := generic.NewGenericFetch(ctx, vertex.Name, fetchWmWatchers)
vertexWmMap[vertex.Name] = fetchWatermark
}
wmFetcher.fetchMap = vertexWmMap
return wmFetcher
return wmFetcher, nil
}

func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (*generic.GenericFetch, error) {
hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName)
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient())
if err != nil {
return nil, err
}
otBucket := isbsvc.JetStreamOTBucket(pipelineName, fromBufferName)
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucket, clients.NewInClusterJetStreamClient())
if err != nil {
return nil, err
}
var fetchWmWatchers = generic.BuildFetchWMWatchers(hbWatch, otWatch)
fetchWatermark := generic.NewGenericFetch(ctx, vertexName, fetchWmWatchers)
return fetchWatermark, nil
}

// GetVertexWatermark is used to return the head watermark for a given vertex.
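
With the extra sinkName_SINK entry added above, a sink vertex appears twice in the daemon's fetcher map: once under its own name (the watermark entering via its incoming edge buffer) and once under <vertexName>_SINK (the watermark that has progressed out through the sink buffer). A small sketch of that key convention; the helper name is hypothetical and not part of this change:

```go
// sinkWatermarkKey returns the map key for a sink vertex's outgoing watermark,
// e.g. "out_SINK" for a sink vertex named "out".
func sinkWatermarkKey(vertexName string) string {
	return vertexName + "_SINK"
}
```
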
1 change: 1 addition & 0 deletions pkg/isb/forward/forward.go
@@ -302,6 +302,7 @@ func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []i
// has been initiated while we are stuck looping on an InternalError.
func (isdf *InterStepDataForward) writeToBuffers(ctx context.Context, messageToStep map[string][]isb.Message) (writeOffsetsEdge map[string][]isb.Offset, err error) {
writeOffsetsEdge = make(map[string][]isb.Offset, len(messageToStep))
// TODO: rename key to edgeName
for key, toBuffer := range isdf.toBuffers {
writeOffsetsEdge[key], err = isdf.writeToBuffer(ctx, toBuffer, messageToStep[key])
if err != nil {
13 changes: 9 additions & 4 deletions pkg/sinks/kafka/kafka.go
@@ -3,10 +3,11 @@ package kafka
import (
"context"
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
@@ -39,7 +40,7 @@ func WithLogger(log *zap.SugaredLogger) Option {
}

// NewToKafka returns ToKafka type.
func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*ToKafka, error) {
func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*ToKafka, error) {
kafkaSink := vertex.Spec.Sink.Kafka
toKafka := new(ToKafka)
//apply options for kafka sink
@@ -65,7 +66,11 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.Name: toKafka}, forward.All, applier.Terminal, nil, nil, forwardOpts...)
bufferKey := ""
if len(vertex.GetToBuffers()) > 0 {
bufferKey = vertex.GetToBuffers()[0].Name
}
f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{bufferKey: toKafka}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
10 changes: 8 additions & 2 deletions pkg/sinks/logger/log.go
@@ -2,6 +2,8 @@ package logger

import (
"context"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"log"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
@@ -33,7 +35,7 @@ func WithLogger(log *zap.SugaredLogger) Option {
}

// NewToLog returns ToLog type.
func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*ToLog, error) {
func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*ToLog, error) {
toLog := new(ToLog)
name := vertex.Spec.Name
toLog.name = name
@@ -54,7 +56,11 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option)
forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: toLog}, forward.All, applier.Terminal, nil, nil, forwardOpts...)
bufferKey := ""
if len(vertex.GetToBuffers()) > 0 {
bufferKey = vertex.GetToBuffers()[0].Name
}
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{bufferKey: toLog}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
6 changes: 3 additions & 3 deletions pkg/sinks/logger/log_test.go
@@ -32,7 +32,7 @@ func TestToLog_Start(t *testing.T) {
Name: "sinks.logger",
}}

s, err := NewToLog(vertex, fromStep)
s, err := NewToLog(vertex, fromStep, nil, nil)
assert.NoError(t, err)

stopped := s.Start()
@@ -73,8 +73,8 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) {
vertex2 := &dfv1.Vertex{ObjectMeta: v1.ObjectMeta{
Name: "sinks.logger2",
}}
logger1, _ := NewToLog(vertex1, to1)
logger2, _ := NewToLog(vertex2, to2)
logger1, _ := NewToLog(vertex1, to1, nil, nil)
logger2, _ := NewToLog(vertex2, to2, nil, nil)
logger1Stopped := logger1.Start()
logger2Stopped := logger2.Start()

25 changes: 20 additions & 5 deletions pkg/sinks/sink.go
@@ -3,6 +3,9 @@ package sinks
import (
"context"
"fmt"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"sync"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
@@ -32,6 +35,15 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
var reader isb.BufferReader
var err error
fromBufferName := u.VertexInstance.Vertex.GetFromBuffers()[0].Name

// watermark variables no-op initialization
var fetchWatermark fetch.Fetcher = generic.NewNoOpWMProgressor()
// publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor
publishWatermark := make(map[string]publish.Publisher)
for _, buffer := range u.VertexInstance.Vertex.GetToBuffers() {
publishWatermark[buffer.Name] = generic.NewNoOpWMProgressor()
}

switch u.ISBSvcType {
case dfv1.ISBSvcTypeRedis:
redisClient := clients.NewInClusterRedisClient()
@@ -50,6 +62,9 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
if x := u.VertexInstance.Vertex.Spec.Limits; x != nil && x.ReadTimeout != nil {
readOptions = append(readOptions, jetstreamisb.WithReadTimeOut(x.ReadTimeout.Duration))
}
// build watermark progressors
fetchWatermark, publishWatermark = generic.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance)

jetStreamClient := clients.NewInClusterJetStreamClient()
reader, err = jetstreamisb.NewJetStreamBufferReader(ctx, jetStreamClient, fromBufferName, streamName, streamName, readOptions...)
if err != nil {
@@ -59,7 +74,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
return fmt.Errorf("unrecognized isbs type %q", u.ISBSvcType)
}

sinker, err := u.getSinker(reader, log)
sinker, err := u.getSinker(reader, log, fetchWatermark, publishWatermark)
if err != nil {
return fmt.Errorf("failed to find a sink, errpr: %w", err)
}
@@ -100,15 +115,15 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
}

// getSinker takes in the logger from the parent context
func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger) (Sinker, error) {
func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM map[string]publish.Publisher) (Sinker, error) {
sink := u.VertexInstance.Vertex.Spec.Sink
// TODO: add watermark
if x := sink.Log; x != nil {
return logsink.NewToLog(u.VertexInstance.Vertex, reader, logsink.WithLogger(logger))
return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger))
} else if x := sink.Kafka; x != nil {
return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, kafkasink.WithLogger(logger))
return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, kafkasink.WithLogger(logger))
} else if x := sink.UDSink; x != nil {
return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, udsink.WithLogger(logger))
return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, udsink.WithLogger(logger))
}
return nil, fmt.Errorf("invalid sink spec")
}
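
Every sink constructor now receives explicit watermark handles; when watermark propagation is not enabled (or on the Redis ISB path), the caller supplies no-op progressors, exactly as Start does above. A rough caller-side sketch of that wiring, assuming vertex, reader, and logger already exist (illustrative, not code from this commit):

```go
// Wire a log sink with no-op watermark handles, mirroring SinkProcessor.Start.
var fetchWM fetch.Fetcher = generic.NewNoOpWMProgressor()
publishWM := make(map[string]publish.Publisher)
for _, b := range vertex.GetToBuffers() {
	publishWM[b.Name] = generic.NewNoOpWMProgressor() // one no-op publisher per outgoing buffer
}
sinker, err := logsink.NewToLog(vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger))
if err != nil {
	return fmt.Errorf("failed to create log sink: %w", err)
}
_ = sinker
```
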
9 changes: 5 additions & 4 deletions pkg/sinks/udsink/sink.go
@@ -2,16 +2,17 @@ package udsink

import (
"context"
"time"

sinksdk "github.com/numaproj/numaflow-go/sink"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/forward"
"github.com/numaproj/numaflow/pkg/shared/logging"
sharedutil "github.com/numaproj/numaflow/pkg/shared/util"
"github.com/numaproj/numaflow/pkg/udf/applier"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"go.uber.org/zap"
"time"
)

type userDefinedSink struct {
@@ -32,7 +33,7 @@ func WithLogger(log *zap.SugaredLogger) Option {
}

// NewUserDefinedSink returns genericSink type.
func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*userDefinedSink, error) {
func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*userDefinedSink, error) {
s := new(userDefinedSink)
name := vertex.Spec.Name
s.name = name
@@ -54,7 +55,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts .
}
contentType := sharedutil.LookupEnvStringOr(dfv1.EnvUDSinkContentType, string(dfv1.MsgPackType))
s.udsink = NewUDSHTTPBasedUDSink(dfv1.PathVarRun+"/udsink.sock", withTimeout(20*time.Second), withContentType(dfv1.ContentType(contentType)))
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, nil, nil, forwardOpts...)
isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/watermark/generic/generic.go
@@ -117,7 +117,7 @@ func BuildJetStreamWatermarkProgressorsForSource(ctx context.Context, vertexInst
pipelineName := vertexInstance.Vertex.Spec.PipelineName

sourceBufferName := vertexInstance.Vertex.GetFromBuffers()[0].Name
// hearbeat
// heartbeat
hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, sourceBufferName)
hbKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient())
if err != nil {
