From 8d8354082966a524275539dfc2c31e2c2a2c47bc Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 3 Aug 2022 18:57:31 -0700 Subject: [PATCH] feat: Add watermark for sink vertex (#124) * feat: Add watermark for sink vertex Signed-off-by: Sidhant Kohli --- pkg/daemon/server/daemon_server.go | 16 ++++--- .../server/service/pipeline_metrics_query.go | 10 +++-- .../service/pipeline_metrics_query_test.go | 9 ++-- .../service/pipeline_watermark_query.go | 45 +++++++++++++------ pkg/isb/forward/forward.go | 1 + pkg/sinks/kafka/kafka.go | 13 ++++-- pkg/sinks/logger/log.go | 10 ++++- pkg/sinks/logger/log_test.go | 6 +-- pkg/sinks/sink.go | 25 ++++++++--- pkg/sinks/udsink/sink.go | 9 ++-- pkg/watermark/generic/generic.go | 2 +- 11 files changed, 102 insertions(+), 44 deletions(-) diff --git a/pkg/daemon/server/daemon_server.go b/pkg/daemon/server/daemon_server.go index 55c5f15a3a..0e7ae72f77 100644 --- a/pkg/daemon/server/daemon_server.go +++ b/pkg/daemon/server/daemon_server.go @@ -71,8 +71,10 @@ func (ds *daemonServer) Run(ctx context.Context) error { } tlsConfig := &tls.Config{Certificates: []tls.Certificate{*cer}, MinVersion: tls.VersionTLS12} - - grpcServer := ds.newGRPCServer(isbSvcClient) + grpcServer, err := ds.newGRPCServer(isbSvcClient) + if err != nil { + return fmt.Errorf("failed to create grpc server: %w", err) + } httpServer := ds.newHTTPServer(ctx, v1alpha1.DaemonServicePort, tlsConfig) conn = tls.NewListener(conn, tlsConfig) @@ -90,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error { return nil } -func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) *grpc.Server { +func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) (*grpc.Server, error) { // "Prometheus histograms are a great way to measure latency distributions of your RPCs. // However, since it is bad practice to have metrics of high cardinality the latency monitoring metrics are disabled by default. // To enable them please call the following in your server initialization code:" @@ -104,8 +106,12 @@ func (ds *daemonServer) newGRPCServer(isbSvcClient isbsvc.ISBService) *grpc.Serv } grpcServer := grpc.NewServer(sOpts...) grpc_prometheus.Register(grpcServer) - daemon.RegisterDaemonServiceServer(grpcServer, service.NewPipelineMetadataQuery(isbSvcClient, ds.pipeline)) - return grpcServer + pipelineMetadataQuery, err := service.NewPipelineMetadataQuery(isbSvcClient, ds.pipeline) + if err != nil { + return nil, err + } + daemon.RegisterDaemonServiceServer(grpcServer, pipelineMetadataQuery) + return grpcServer, nil } // newHTTPServer returns the HTTP server to serve HTTP/HTTPS requests. This is implemented diff --git a/pkg/daemon/server/service/pipeline_metrics_query.go b/pkg/daemon/server/service/pipeline_metrics_query.go index 0e09887047..6fbdea7bce 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query.go +++ b/pkg/daemon/server/service/pipeline_metrics_query.go @@ -34,7 +34,8 @@ type pipelineMetadataQuery struct { } // NewPipelineMetadataQuery returns a new instance of pipelineMetadataQuery -func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) *pipelineMetadataQuery { +func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1.Pipeline) (*pipelineMetadataQuery, error) { + var err error ps := pipelineMetadataQuery{ isbSvcClient: isbSvcClient, pipeline: pipeline, @@ -45,8 +46,11 @@ func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1 Timeout: time.Second * 3, }, } - ps.vertexWatermark = newVertexWatermarkFetcher(pipeline) - return &ps + ps.vertexWatermark, err = newVertexWatermarkFetcher(pipeline) + if err != nil { + return nil, err + } + return &ps, nil } // ListBuffers is used to obtain the all the edge buffers information of a pipeline diff --git a/pkg/daemon/server/service/pipeline_metrics_query_test.go b/pkg/daemon/server/service/pipeline_metrics_query_test.go index f4f2440f98..2a676a087b 100644 --- a/pkg/daemon/server/service/pipeline_metrics_query_test.go +++ b/pkg/daemon/server/service/pipeline_metrics_query_test.go @@ -56,7 +56,8 @@ func TestGetVertexMetrics(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: pipelineName}, } client, _ := isbsvc.NewISBJetStreamSvc(pipelineName) - pipelineMetricsQueryService := NewPipelineMetadataQuery(client, pipeline) + pipelineMetricsQueryService, err := NewPipelineMetadataQuery(client, pipeline) + assert.NoError(t, err) metricsResponse := `# HELP vertex_processing_rate Message processing rate in the last period of seconds, tps. It represents the rate of a vertex instead of a pod. # TYPE vertex_processing_rate gauge @@ -125,7 +126,8 @@ func TestGetBuffer(t *testing.T) { } ms := &mockIsbSvcClient{} - pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline) + pipelineMetricsQueryService, err := NewPipelineMetadataQuery(ms, pipeline) + assert.NoError(t, err) bufferName := "numaflow-system-simple-pipeline-in-cat" @@ -160,7 +162,8 @@ func TestListBuffers(t *testing.T) { } ms := &mockIsbSvcClient{} - pipelineMetricsQueryService := NewPipelineMetadataQuery(ms, pipeline) + pipelineMetricsQueryService, err := NewPipelineMetadataQuery(ms, pipeline) + assert.NoError(t, err) req := &daemon.ListBuffersRequest{Pipeline: &pipelineName} diff --git a/pkg/daemon/server/service/pipeline_watermark_query.go b/pkg/daemon/server/service/pipeline_watermark_query.go index fd6f42b6ad..62b4ee0553 100644 --- a/pkg/daemon/server/service/pipeline_watermark_query.go +++ b/pkg/daemon/server/service/pipeline_watermark_query.go @@ -14,7 +14,6 @@ import ( "github.com/numaproj/numaflow/pkg/watermark/fetch" "github.com/numaproj/numaflow/pkg/watermark/generic" "github.com/numaproj/numaflow/pkg/watermark/store/jetstream" - "go.uber.org/zap" ) // watermarkFetchers used to store watermark metadata for propagation @@ -27,17 +26,16 @@ type watermarkFetchers struct { // corresponding fetchers. These fetchers are tied to the incoming edge buffer of the current vertex (Vn), and read the // watermark propagated by the vertex (Vn-1). As each vertex has one incoming edge, for the input vertex we read the source // data buffer. -func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) *watermarkFetchers { +func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers, error) { // TODO: Return err instead of logging (https://github.com/numaproj/numaflow/pull/120#discussion_r927271677) ctx := context.Background() - log := logging.FromContext(ctx) var wmFetcher = new(watermarkFetchers) var fromBufferName string wmFetcher.isWatermarkEnabled = pipeline.Spec.Watermark.Propagate if !wmFetcher.isWatermarkEnabled { - return wmFetcher + return wmFetcher, nil } vertexWmMap := make(map[string]fetch.Fetcher) @@ -53,22 +51,41 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) *watermarkFetchers { edge := pipeline.GetFromEdges(vertex.Name)[0] fromBufferName = v1alpha1.GenerateEdgeBufferName(pipeline.Namespace, pipelineName, edge.From, edge.To) } - hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName) - hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient()) - if err != nil { - log.Fatalw("JetStreamKVWatch failed", zap.String("HeartbeatBucket", hbBucket), zap.Error(err)) + // Adding an extra entry for the sink vertex to check the watermark value progressed out of the vertex. + // Can be checked by querying sinkName_SINK in the service + if vertex.Sink != nil { + toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipelineName, vertex.Name) + fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, toBufferName, vertex.Name) + if err != nil { + return nil, fmt.Errorf("failed to create watermark fetcher %w", err) + } + sinkVertex := vertex.Name + "_SINK" + vertexWmMap[sinkVertex] = fetchWatermark } - otBucket := isbsvc.JetStreamOTBucket(pipelineName, fromBufferName) - otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucket, clients.NewInClusterJetStreamClient()) + fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, fromBufferName, vertex.Name) if err != nil { - log.Fatalw("JetStreamKVWatch failed", zap.String("OTBucket", otBucket), zap.Error(err)) + return nil, fmt.Errorf("failed to create watermark fetcher %w", err) } - var fetchWmWatchers = generic.BuildFetchWMWatchers(hbWatch, otWatch) - fetchWatermark := generic.NewGenericFetch(ctx, vertex.Name, fetchWmWatchers) vertexWmMap[vertex.Name] = fetchWatermark } wmFetcher.fetchMap = vertexWmMap - return wmFetcher + return wmFetcher, nil +} + +func createWatermarkFetcher(ctx context.Context, pipelineName string, fromBufferName string, vertexName string) (*generic.GenericFetch, error) { + hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, fromBufferName) + hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient()) + if err != nil { + return nil, err + } + otBucket := isbsvc.JetStreamOTBucket(pipelineName, fromBufferName) + otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucket, clients.NewInClusterJetStreamClient()) + if err != nil { + return nil, err + } + var fetchWmWatchers = generic.BuildFetchWMWatchers(hbWatch, otWatch) + fetchWatermark := generic.NewGenericFetch(ctx, vertexName, fetchWmWatchers) + return fetchWatermark, nil } // GetVertexWatermark is used to return the head watermark for a given vertex. diff --git a/pkg/isb/forward/forward.go b/pkg/isb/forward/forward.go index 9a2f69ba37..2c6f7b7fa1 100644 --- a/pkg/isb/forward/forward.go +++ b/pkg/isb/forward/forward.go @@ -302,6 +302,7 @@ func (isdf *InterStepDataForward) ackFromBuffer(ctx context.Context, offsets []i // has been initiated while we are stuck looping on an InternalError. func (isdf *InterStepDataForward) writeToBuffers(ctx context.Context, messageToStep map[string][]isb.Message) (writeOffsetsEdge map[string][]isb.Offset, err error) { writeOffsetsEdge = make(map[string][]isb.Offset, len(messageToStep)) + // TODO: rename key to edgeName for key, toBuffer := range isdf.toBuffers { writeOffsetsEdge[key], err = isdf.writeToBuffer(ctx, toBuffer, messageToStep[key]) if err != nil { diff --git a/pkg/sinks/kafka/kafka.go b/pkg/sinks/kafka/kafka.go index e851bbe99a..2112a021f2 100644 --- a/pkg/sinks/kafka/kafka.go +++ b/pkg/sinks/kafka/kafka.go @@ -3,10 +3,11 @@ package kafka import ( "context" "fmt" - "time" - "github.com/Shopify/sarama" + "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" + "time" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" @@ -39,7 +40,7 @@ func WithLogger(log *zap.SugaredLogger) Option { } // NewToKafka returns ToKafka type. -func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*ToKafka, error) { +func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*ToKafka, error) { kafkaSink := vertex.Spec.Sink.Kafka toKafka := new(ToKafka) //apply options for kafka sink @@ -65,7 +66,11 @@ func NewToKafka(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize))) } } - f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{vertex.Name: toKafka}, forward.All, applier.Terminal, nil, nil, forwardOpts...) + bufferKey := "" + if len(vertex.GetToBuffers()) > 0 { + bufferKey = vertex.GetToBuffers()[0].Name + } + f, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{bufferKey: toKafka}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/logger/log.go b/pkg/sinks/logger/log.go index 166dcf7232..4dd7e79a64 100644 --- a/pkg/sinks/logger/log.go +++ b/pkg/sinks/logger/log.go @@ -2,6 +2,8 @@ package logger import ( "context" + "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/publish" "log" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -33,7 +35,7 @@ func WithLogger(log *zap.SugaredLogger) Option { } // NewToLog returns ToLog type. -func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*ToLog, error) { +func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*ToLog, error) { toLog := new(ToLog) name := vertex.Spec.Name toLog.name = name @@ -54,7 +56,11 @@ func NewToLog(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) forwardOpts = append(forwardOpts, forward.WithReadBatchSize(int64(*x.ReadBatchSize))) } } - isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: toLog}, forward.All, applier.Terminal, nil, nil, forwardOpts...) + bufferKey := "" + if len(vertex.GetToBuffers()) > 0 { + bufferKey = vertex.GetToBuffers()[0].Name + } + isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{bufferKey: toLog}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/sinks/logger/log_test.go b/pkg/sinks/logger/log_test.go index 1db2560047..2e1f79d2e6 100644 --- a/pkg/sinks/logger/log_test.go +++ b/pkg/sinks/logger/log_test.go @@ -32,7 +32,7 @@ func TestToLog_Start(t *testing.T) { Name: "sinks.logger", }} - s, err := NewToLog(vertex, fromStep) + s, err := NewToLog(vertex, fromStep, nil, nil) assert.NoError(t, err) stopped := s.Start() @@ -73,8 +73,8 @@ func TestToLog_ForwardToTwoVertex(t *testing.T) { vertex2 := &dfv1.Vertex{ObjectMeta: v1.ObjectMeta{ Name: "sinks.logger2", }} - logger1, _ := NewToLog(vertex1, to1) - logger2, _ := NewToLog(vertex2, to2) + logger1, _ := NewToLog(vertex1, to1, nil, nil) + logger2, _ := NewToLog(vertex2, to2, nil, nil) logger1Stopped := logger1.Start() logger2Stopped := logger2.Start() diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index 12269d9917..2cf5c264d7 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -3,6 +3,9 @@ package sinks import ( "context" "fmt" + "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/generic" + "github.com/numaproj/numaflow/pkg/watermark/publish" "sync" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -32,6 +35,15 @@ func (u *SinkProcessor) Start(ctx context.Context) error { var reader isb.BufferReader var err error fromBufferName := u.VertexInstance.Vertex.GetFromBuffers()[0].Name + + // watermark variables no-op initialization + var fetchWatermark fetch.Fetcher = generic.NewNoOpWMProgressor() + // publishWatermark is a map representing a progressor per edge, we are initializing them to a no-op progressor + publishWatermark := make(map[string]publish.Publisher) + for _, buffer := range u.VertexInstance.Vertex.GetToBuffers() { + publishWatermark[buffer.Name] = generic.NewNoOpWMProgressor() + } + switch u.ISBSvcType { case dfv1.ISBSvcTypeRedis: redisClient := clients.NewInClusterRedisClient() @@ -50,6 +62,9 @@ func (u *SinkProcessor) Start(ctx context.Context) error { if x := u.VertexInstance.Vertex.Spec.Limits; x != nil && x.ReadTimeout != nil { readOptions = append(readOptions, jetstreamisb.WithReadTimeOut(x.ReadTimeout.Duration)) } + // build watermark progressors + fetchWatermark, publishWatermark = generic.BuildJetStreamWatermarkProgressors(ctx, u.VertexInstance) + jetStreamClient := clients.NewInClusterJetStreamClient() reader, err = jetstreamisb.NewJetStreamBufferReader(ctx, jetStreamClient, fromBufferName, streamName, streamName, readOptions...) if err != nil { @@ -59,7 +74,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { return fmt.Errorf("unrecognized isbs type %q", u.ISBSvcType) } - sinker, err := u.getSinker(reader, log) + sinker, err := u.getSinker(reader, log, fetchWatermark, publishWatermark) if err != nil { return fmt.Errorf("failed to find a sink, errpr: %w", err) } @@ -100,15 +115,15 @@ func (u *SinkProcessor) Start(ctx context.Context) error { } // getSinker takes in the logger from the parent context -func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger) (Sinker, error) { +func (u *SinkProcessor) getSinker(reader isb.BufferReader, logger *zap.SugaredLogger, fetchWM fetch.Fetcher, publishWM map[string]publish.Publisher) (Sinker, error) { sink := u.VertexInstance.Vertex.Spec.Sink // TODO: add watermark if x := sink.Log; x != nil { - return logsink.NewToLog(u.VertexInstance.Vertex, reader, logsink.WithLogger(logger)) + return logsink.NewToLog(u.VertexInstance.Vertex, reader, fetchWM, publishWM, logsink.WithLogger(logger)) } else if x := sink.Kafka; x != nil { - return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, kafkasink.WithLogger(logger)) + return kafkasink.NewToKafka(u.VertexInstance.Vertex, reader, fetchWM, publishWM, kafkasink.WithLogger(logger)) } else if x := sink.UDSink; x != nil { - return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, udsink.WithLogger(logger)) + return udsink.NewUserDefinedSink(u.VertexInstance.Vertex, reader, fetchWM, publishWM, udsink.WithLogger(logger)) } return nil, fmt.Errorf("invalid sink spec") } diff --git a/pkg/sinks/udsink/sink.go b/pkg/sinks/udsink/sink.go index c3687eb818..86ff79ceb9 100644 --- a/pkg/sinks/udsink/sink.go +++ b/pkg/sinks/udsink/sink.go @@ -2,8 +2,6 @@ package udsink import ( "context" - "time" - sinksdk "github.com/numaproj/numaflow-go/sink" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" @@ -11,7 +9,10 @@ import ( "github.com/numaproj/numaflow/pkg/shared/logging" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" "github.com/numaproj/numaflow/pkg/udf/applier" + "github.com/numaproj/numaflow/pkg/watermark/fetch" + "github.com/numaproj/numaflow/pkg/watermark/publish" "go.uber.org/zap" + "time" ) type userDefinedSink struct { @@ -32,7 +33,7 @@ func WithLogger(log *zap.SugaredLogger) Option { } // NewUserDefinedSink returns genericSink type. -func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts ...Option) (*userDefinedSink, error) { +func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, fetchWatermark fetch.Fetcher, publishWatermark map[string]publish.Publisher, opts ...Option) (*userDefinedSink, error) { s := new(userDefinedSink) name := vertex.Spec.Name s.name = name @@ -54,7 +55,7 @@ func NewUserDefinedSink(vertex *dfv1.Vertex, fromBuffer isb.BufferReader, opts . } contentType := sharedutil.LookupEnvStringOr(dfv1.EnvUDSinkContentType, string(dfv1.MsgPackType)) s.udsink = NewUDSHTTPBasedUDSink(dfv1.PathVarRun+"/udsink.sock", withTimeout(20*time.Second), withContentType(dfv1.ContentType(contentType))) - isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, nil, nil, forwardOpts...) + isdf, err := forward.NewInterStepDataForward(vertex, fromBuffer, map[string]isb.BufferWriter{name: s}, forward.All, applier.Terminal, fetchWatermark, publishWatermark, forwardOpts...) if err != nil { return nil, err } diff --git a/pkg/watermark/generic/generic.go b/pkg/watermark/generic/generic.go index 1e7d2fe6f8..274ff032d3 100644 --- a/pkg/watermark/generic/generic.go +++ b/pkg/watermark/generic/generic.go @@ -117,7 +117,7 @@ func BuildJetStreamWatermarkProgressorsForSource(ctx context.Context, vertexInst pipelineName := vertexInstance.Vertex.Spec.PipelineName sourceBufferName := vertexInstance.Vertex.GetFromBuffers()[0].Name - // hearbeat + // heartbeat hbBucket := isbsvc.JetStreamProcessorBucket(pipelineName, sourceBufferName) hbKVStore, err := jetstream.NewKVJetStreamKVStore(ctx, pipelineName, hbBucket, clients.NewInClusterJetStreamClient()) if err != nil {