Skip to content

Commit

Permalink
refactor: generalize watermark fetching as an interface of ISB service.
Browse files Browse the repository at this point in the history
Fixes #252 (#263)

### Description
Fixes #252. This change stops hardcoding JetStream as the ISB service when fetching watermarks. When the ISB service is Redis, we use noop KV watchers (for both heartbeat and offset timeline) for fetching watermarks.

Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Oct 25, 2022
1 parent 8e038d1 commit 8ff9e28
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/daemon/server/service/pipeline_metrics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewPipelineMetadataQuery(isbSvcClient isbsvc.ISBService, pipeline *v1alpha1
Timeout: time.Second * 3,
},
}
ps.vertexWatermark, err = newVertexWatermarkFetcher(pipeline)
ps.vertexWatermark, err = newVertexWatermarkFetcher(pipeline, isbSvcClient)
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/daemon/server/service/pipeline_metrics_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)

type mockGetType func(url string) (*http.Response, error)
Expand Down Expand Up @@ -50,6 +51,10 @@ func (ms *mockIsbSvcClient) ValidateBuffers(ctx context.Context, buffers []v1alp
return nil
}

// CreateWatermarkFetcher is the mock implementation of the ISB service method of
// the same name. It ignores its arguments and hands back a nil fetcher together
// with a nil error.
func (ms *mockIsbSvcClient) CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error) {
	// The zero value of the interface is nil, which is what the mock returns.
	var noFetcher fetch.Fetcher
	return noFetcher, nil
}

func TestGetVertexMetrics(t *testing.T) {
pipelineName := "simple-pipeline"
pipeline := &v1alpha1.Pipeline{
Expand Down
26 changes: 3 additions & 23 deletions pkg/daemon/server/service/pipeline_watermark_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/apis/proto/daemon"
"github.com/numaproj/numaflow/pkg/isbsvc"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
)

// TODO - Write Unit Tests for this file
Expand All @@ -29,7 +25,7 @@ type watermarkFetchers struct {

// newVertexWatermarkFetcher creates a new instance of watermarkFetchers. This is used to populate a map of vertices to
// corresponding fetchers. The fetchers are to retrieve vertex level watermarks.
func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers, error) {
func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline, isbSvcClient isbsvc.ISBService) (*watermarkFetchers, error) {
ctx := context.Background()
var wmFetcher = new(watermarkFetchers)
var toBufferName string
Expand All @@ -45,7 +41,7 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers,
for _, vertex := range pipeline.Spec.Vertices {
if vertex.Sink != nil {
toBufferName := v1alpha1.GenerateSinkBufferName(pipeline.Namespace, pipelineName, vertex.Name)
fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, toBufferName)
fetchWatermark, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
}
Expand All @@ -55,7 +51,7 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers,
var wmFetcherList []fetch.Fetcher
for _, edge := range pipeline.GetToEdges(vertex.Name) {
toBufferName = v1alpha1.GenerateEdgeBufferName(pipeline.Namespace, pipelineName, edge.From, edge.To)
fetchWatermark, err := createWatermarkFetcher(ctx, pipelineName, toBufferName)
fetchWatermark, err := isbSvcClient.CreateWatermarkFetcher(ctx, toBufferName)
if err != nil {
return nil, fmt.Errorf("failed to create watermark fetcher %w", err)
}
Expand All @@ -68,22 +64,6 @@ func newVertexWatermarkFetcher(pipeline *v1alpha1.Pipeline) (*watermarkFetchers,
return wmFetcher, nil
}

// createWatermarkFetcher builds a fetcher that reads the watermark of the given
// buffer. It watches two JetStream KV buckets derived from the pipeline and buffer
// names: the processor (heartbeat) bucket and the offset-timeline bucket. Each
// watcher is given its own in-cluster JetStream client.
//
// The first error encountered while creating a watcher is returned unchanged.
func createWatermarkFetcher(ctx context.Context, pipelineName string, bufferName string) (fetch.Fetcher, error) {
	// Watcher on the processor (heartbeat) bucket.
	heartbeatBucket := isbsvc.JetStreamProcessorBucket(pipelineName, bufferName)
	heartbeatWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, heartbeatBucket, jsclient.NewInClusterJetStreamClient())
	if err != nil {
		return nil, err
	}

	// Watcher on the offset-timeline bucket.
	offsetTimelineBucket := isbsvc.JetStreamOTBucket(pipelineName, bufferName)
	offsetTimelineWatcher, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, offsetTimelineBucket, jsclient.NewInClusterJetStreamClient())
	if err != nil {
		return nil, err
	}

	storeWatcher := store.BuildWatermarkStoreWatcher(heartbeatWatcher, offsetTimelineWatcher)
	return generic.NewGenericEdgeFetch(ctx, bufferName, storeWatcher), nil
}

// GetVertexWatermark is used to return the head watermark for a given vertex.
func (ps *pipelineMetadataQuery) GetVertexWatermark(ctx context.Context, request *daemon.GetVertexWatermarkRequest) (*daemon.GetVertexWatermarkResponse, error) {
log := logging.FromContext(ctx)
Expand Down
4 changes: 3 additions & 1 deletion pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)

// ISBService is an interface used to do the operations on ISBSvc
Expand All @@ -12,11 +13,12 @@ type ISBService interface {
DeleteBuffers(ctx context.Context, buffers []dfv1.Buffer) error
ValidateBuffers(ctx context.Context, buffers []dfv1.Buffer) error
GetBufferInfo(ctx context.Context, buffer dfv1.Buffer) (*BufferInfo, error)
CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error)
}

// bufferCreateOptions describes the options for creating buffers
type bufferCreateOptions struct {
// bufferConfig is configuratiion for the to be created buffer
// bufferConfig is configuration for the to be created buffer
bufferConfig string
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
jsclient "github.com/numaproj/numaflow/pkg/shared/clients/jetstream"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/jetstream"
"github.com/spf13/viper"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -258,6 +262,21 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer dfv1.Buffer)
return bufferInfo, nil
}

// CreateWatermarkFetcher builds a watermark fetcher for the given buffer, backed
// by two JetStream KV watchers that share the service's JetStream client: one on
// the processor (heartbeat) bucket and one on the offset-timeline bucket.
//
// If either watcher cannot be created, the underlying error is returned wrapped
// with the name of the bucket involved, so callers can tell which of the two
// buckets failed while still being able to unwrap the original error.
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error) {
	hbBucketName := JetStreamProcessorBucket(jss.pipelineName, bufferName)
	hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jss.jsClient)
	if err != nil {
		return nil, fmt.Errorf("creating heartbeat watcher for bucket %q: %w", hbBucketName, err)
	}

	otBucketName := JetStreamOTBucket(jss.pipelineName, bufferName)
	otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, otBucketName, jss.jsClient)
	if err != nil {
		// NOTE(review): hbWatch is not released on this path — confirm whether the
		// watcher holds resources that need an explicit release here.
		return nil, fmt.Errorf("creating offset timeline watcher for bucket %q: %w", otBucketName, err)
	}

	storeWatcher := store.BuildWatermarkStoreWatcher(hbWatch, otWatch)
	return generic.NewGenericEdgeFetch(ctx, bufferName, storeWatcher), nil
}

// JetStreamName builds a name from the given pipeline name and buffer name by
// joining them with a hyphen, i.e. "<pipelineName>-<bufferName>".
func JetStreamName(pipelineName, bufferName string) string {
	const layout = "%s-%s"
	name := fmt.Sprintf(layout, pipelineName, bufferName)
	return name
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/isbsvc/redis_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package isbsvc
import (
"context"
"fmt"

redis2 "github.com/numaproj/numaflow/pkg/isb/stores/redis"
"github.com/numaproj/numaflow/pkg/watermark/generic"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/store/noop"
"go.uber.org/multierr"
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
redisclient "github.com/numaproj/numaflow/pkg/shared/clients/redis"
"github.com/numaproj/numaflow/pkg/shared/logging"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
)

type isbsRedisSvc struct {
Expand Down Expand Up @@ -122,3 +125,11 @@ func (r *isbsRedisSvc) GetBufferInfo(ctx context.Context, buffer dfv1.Buffer) (*

return bufferInfo, nil
}

// CreateWatermarkFetcher returns a watermark fetcher for the given buffer.
// Watermark fetching is not supported for Redis at the moment, so the fetcher is
// built on top of noop KV watchers for both the heartbeat and the offset timeline.
// The returned error is always nil.
func (r *isbsRedisSvc) CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error) {
	// First argument is the heartbeat watcher, second is the offset-timeline watcher.
	noopStoreWatcher := store.BuildWatermarkStoreWatcher(noop.NewKVOpWatch(), noop.NewKVOpWatch())
	return generic.NewGenericEdgeFetch(ctx, bufferName, noopStoreWatcher), nil
}
2 changes: 1 addition & 1 deletion pkg/shared/clients/jetstream/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
// Package jetstream provides an interface and two implementations to connect to Nats JetStream.
//
// Function NewDefaultJetStreamClient(url string, opts ...nats.Option) returns a client with
// default implementationwhich, which relies on the input url and other nats connection options.
// default implementation, which relies on the input url and other nats connection options.
//
// Function NewInClusterJetStreamClient() assumes the invoker is in a Kubernetes cluster, and
// there are several environment variables available, which are used to connect to the Nats
Expand Down
4 changes: 2 additions & 2 deletions pkg/watermark/fetch/processor_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func TestFetcherWithSameOTBucket(t *testing.T) {
assert.NoError(t, err)
defer hbStore.Close()

// create osStore
// create otStore
otStore, err := jetstream.NewKVJetStreamKVStore(ctx, "testFetch", keyspace+"_OT", defaultJetStreamClient)
assert.NoError(t, err)
defer otStore.Close()

// put values into osStore
// put values into otStore
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, uint64(testOffset))
err = otStore.PutKV(ctx, fmt.Sprintf("%s%s%d", "p1", "_", epoch), b)
Expand Down
2 changes: 1 addition & 1 deletion pkg/watermark/generic/jetstream/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// BuildWatermarkProgressors is used to populate fetchWatermark, and a map of publishWatermark with edge name as the key.
// These are used as watermark progressors in the pipeline, and are attached to each edge of the vertex.
// Fetcher has one-to-one relationship , whereas we have multiple publishers as the vertex can read only from one edge,
// Fetcher has one-to-one relationship, whereas we have multiple publishers as the vertex can read only from one edge,
// and it can write to many.
// The function is used only when watermarking is enabled on the pipeline.
func BuildWatermarkProgressors(ctx context.Context, vertexInstance *v1alpha1.VertexInstance) (fetch.Fetcher, map[string]publish.Publisher, error) {
Expand Down

0 comments on commit 8ff9e28

Please sign in to comment.