-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: generalize watermark fetching as an interface of ISB service. Fixes #252 #263
refactor: generalize watermark fetching as an interface of ISB service. Fixes #252 #263
Conversation
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
On issue #252 , there is also the following request
@whynowy Hey Derek, regarding the request above, is it already resolved by https://github.com/numaproj/numaflow/pull/238/files? |
pkg/isbsvc/jetstream_service.go
Outdated
@@ -21,6 +25,21 @@ type jetStreamSvc struct { | |||
js *jsclient.JetStreamContext | |||
} | |||
|
|||
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, pipelineName string, bufferName string) (fetch.Fetcher, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipelineName
is already a property of the struct, not needed in the function signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Addressed.
pkg/isbsvc/jetstream_service.go
Outdated
@@ -21,6 +25,21 @@ type jetStreamSvc struct { | |||
js *jsclient.JetStreamContext | |||
} | |||
|
|||
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, pipelineName string, bufferName string) (fetch.Fetcher, error) { | |||
hbBucketName := JetStreamProcessorBucket(pipelineName, bufferName) | |||
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucketName, jsclient.NewInClusterJetStreamClient()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the jetstream client in the struct.
pkg/isbsvc/jetstream_service.go
Outdated
return nil, err | ||
} | ||
otBucketName := JetStreamOTBucket(pipelineName, bufferName) | ||
otWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, otBucketName, jsclient.NewInClusterJetStreamClient()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here.
Signed-off-by: Keran Yang <[email protected]>
…aflow into generic-watermark-fetching
pkg/isbsvc/jetstream_service.go
Outdated
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, pipelineName, hbBucketName, jsclient.NewInClusterJetStreamClient()) | ||
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error) { | ||
hbBucketName := JetStreamProcessorBucket(jss.pipelineName, bufferName) | ||
hbWatch, err := jetstream.NewKVJetStreamKVWatch(ctx, jss.pipelineName, hbBucketName, jsclient.NewInClusterJetStreamClient()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need a new client, can't we get it from jss.jsClient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. There are two types of jsClient - defaultJetStreamClient
and inClusterJetStreamClient
. Searching across the code base I see defaultJetStreamClient
is only used in unit test files so I think it's safe to get jsClient from jss.jsClient. Will address.
Signed-off-by: Keran Yang <[email protected]>
pkg/isbsvc/jetstream_service.go
Outdated
@@ -21,6 +25,21 @@ type jetStreamSvc struct { | |||
js *jsclient.JetStreamContext | |||
} | |||
|
|||
func (jss *jetStreamSvc) CreateWatermarkFetcher(ctx context.Context, bufferName string) (fetch.Fetcher, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: move this function to the place after NewISBJetStreamSvc(
, let it sit together with other interface functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, addressed.
Signed-off-by: Keran Yang <[email protected]>
Fixes #252 (#263) ### Description Fixes #252 This change is to NOT hardcode JetStream as ISB service when fetching watermarks. When the ISB service is Redis, we use noop KV watchers(for both heartbeat and offset timeline) for fetching watermarks. Signed-off-by: Keran Yang <[email protected]>
Description
Fixes #252 This change is to NOT hardcode JetStream as ISB service when fetching watermarks. When the ISB service is Redis, we use noop KV watchers(for both heartbeat and offset timeline) for fetching watermarks.
Testing
Before the change, if we use Redis as ISB service, when creating a simple-pipeline, the
simple-pipeline-daemon-xxx
pod crashes with following error:And numaflow UI shows empty page for simple-pipeline.
After the change, when using Redis as ISB, we can start the
simple-pipeline-daemon-xxx
pod successfully. On the UI, we show default watermark -1 across ALL vertices.Signed-off-by: Keran Yang [email protected]
Explain what this PR does.