-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement nats_jetstream using new jetstream package #1083
Implement nats_jetstream using new jetstream package #1083
Conversation
ca8466a
to
a4a3019
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still torn between adding a new V2 API vs introducing a breaking change and remove the old code.
) | ||
|
||
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber") | ||
var ErrNoConsumerConfig = errors.New("no consumer config was given") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: do we need all those typed errors? They're now part of the API. Also, in those cases a more simple pattern is to use an error struct, for example https://github.com/aws-controllers-k8s/eventbridge-controller/blob/67a9375a6ac7f95f1a3eb8b1b49f329755305c24/pkg/resource/rule/hooks.go#L29-L43
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, with errors.As
, using simple exported errors works rather than needing to do something more complicated unless you want additional methods.
@@ -67,6 +86,37 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st | |||
return p, nil | |||
} | |||
|
|||
func NewProtocolFromConnV2(conn *nats.Conn, stream, sendSubject string, jsOpts []jetstream.JetStreamOpt, opts ...ProtocolOption) (*Protocol, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@evankanderson what do you think about the V2 change? We have to trade backwards-compatibility with maintenance and simple APIs.
@stephen-totty-hpe
If we go down this route, we need to mark the other method(s) deprecated so linters can catch it. Also, should we use this as opportunity to also add context.Context
as first parameter to the API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I have other PRs to modify behavior in the existing implementation that are not needed using the implementation I have here with the jetstream package. So however it is done, the new jetstream library would be a welcome change. For instance, the new jetstream consumers are pull consumers. Also, an array of filter subjects are exposed. An there are error handlers that can be passed through the "config" objects.
var err error | ||
var js jetstream.JetStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to define those? Is it because of the err
in L96 and L100?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is because of the combined "if"+"assignment" on line 100. The js
would be undefined later on line 105.
if js, err = jetstream.New(conn, jsOpts...); err != nil { | ||
return nil, err | ||
} | ||
ctx := context.Background() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, see earlier comment on ctx
:)
// NewConsumerV2 consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig | ||
// embedded in ConsumerOption. | ||
// See: WithConsumerConfig(...) and WithOrderedConsumerConfig(...) | ||
func NewConsumerV2(url string, natsOpts []nats.Option, jsOpts []jetstream.JetStreamOpt, opts ...ConsumerOption) (*Consumer, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: do those need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume you mean the options? Maybe possible to drop jetstream.JetStreamOpt
nats.Option:
About 40 to 50 options
jetstream.JetStreamOpt:
- WithClientTrace(ct *ClientTrace) JetStreamOpt
- WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt
- WithPublishAsyncMaxPending(max int) JetStreamOpt
The ConsumerOption options are needed because there are variations of how the NATS consumer is created.
There are two variations, "normal" and "ordered". Each has mutiple fields on how to configure.
} else if c.JetStream != nil { | ||
err = c.createConsumer(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
consumeContext, err = c.JetstreamConsumer.Consume(c.MsgHandlerV2, c.PullConsumeOpt...) | ||
if err != nil { | ||
return err | ||
} | ||
} else { | ||
return ErrNoJetstream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be simplified to:
- just
if c.JetStream != nil {...}
- remove final
else
and return error directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(avoids else logic)
return sub.Drain() | ||
if sub != nil { | ||
return sub.Drain() | ||
} else if consumeContext != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
if c.OrderedConsumerConfig != nil { | ||
consumer, err = c.JetStream.OrderedConsumer(ctx, stream, *c.OrderedConsumerConfig) | ||
} else if c.ConsumerConfig != nil { | ||
consumer, err = c.JetStream.CreateOrUpdateConsumer(ctx, stream, *c.ConsumerConfig) | ||
} else { | ||
return ErrNoConsumerConfig | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make this switch/case style?
// getStreamFromSubjects finds the unique stream for the set of filter subjects | ||
// If more than one stream is found, returns ErrMoreThanOneStream | ||
func (c *Consumer) getStreamFromSubjects() (string, error) { | ||
ctx := context.Background() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allow passing ctx as parameter?
@embano1 |
a4a3019
to
a64334f
Compare
Signed-off-by: stephen-totty-hpe <[email protected]>
a64334f
to
4a199db
Compare
@duglin @lionelvillard what are your thoughts on changing the NATS JetStream implementation to the latest NATS SDK which provides several advantages for users, and is the recommended way to use JetStream. However, this means:
What are your preferences? |
) | ||
|
||
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber") | ||
var ErrNoConsumerConfig = errors.New("no consumer config was given") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, with errors.As
, using simple exported errors works rather than needing to do something more complicated unless you want additional methods.
natsMessage := &nats.Msg{ | ||
Subject: msg.Subject(), | ||
Reply: msg.Reply(), | ||
Header: msg.Headers(), | ||
Data: msg.Data(), | ||
} | ||
r.incoming <- msgErr{msg: NewMessage(natsMessage)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option would be to define an interface that both nats.Msg
and jetstream.Msg
could be wrapped to implement, e.g.
type natsData interface {
Subject() string
Reply() string
Headers() nats.Header
Data() []byte
}
But this seems likely to be just as effective.
@@ -54,12 +69,20 @@ func (r *Receiver) Receive(ctx context.Context) (binding.Message, error) { | |||
type Consumer struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we've exposed this as a struct with named v1 fields, it feels like maybe introducing a new v3
version of nats_jetstream
might be the way to go. I don't have a really strong feeling about it, though.
@embano1 @evankanderson |
@duglin @lionelvillard any thoughts on #1083 (comment) ? |
I will close this PR if #1095 gets merged. |
Implement nats_jetstream using new jetstream package.
Because many of the options were incompatible, I was forced to create new constructors for Consumer, Sender, and Protocol.
Also, *nats.Msg does not implement jetstream.Msg. Because of this, there is a MsgHandlerV2.
Integration tests are modified to test the new code.
@embano1, let me know if you would rather create a V2 implementation elsewhere