Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement nats_jetstream using new jetstream package #1083

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions protocol/nats_jetstream/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/cloudevents/sdk-go/v2 v2.14.0
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.36.0
)

require (
Expand All @@ -16,11 +16,12 @@ require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
18 changes: 10 additions & 8 deletions protocol/nats_jetstream/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
Expand All @@ -30,10 +30,12 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
52 changes: 52 additions & 0 deletions protocol/nats_jetstream/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,23 @@ import (
"errors"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
var ErrNoConsumerConfig = errors.New("no consumer config was given")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: do we need all those typed errors? They're now part of the API. Also, in those cases a more simple pattern is to use an error struct, for example https://github.com/aws-controllers-k8s/eventbridge-controller/blob/67a9375a6ac7f95f1a3eb8b1b49f329755305c24/pkg/resource/rule/hooks.go#L29-L43

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, with errors.As, using simple exported errors works rather than needing to do something more complicated unless you want additional methods.

var ErrNoJetstream = errors.New("no jetstream implementation provided")
var ErrMoreThanOneStream = errors.New("more than one stream for given filter subjects")
var ErrMoreThanOneConsumerConfig = errors.New("more than one consumer config given")

// ConsumerType - consumer types that have configurations defined in jetstream package
type ConsumerType int

const (
ConsumerType_Unknown ConsumerType = iota
ConsumerType_Ordinary
ConsumerType_Ordered
)

// NatsOptions is a helper function to group a variadic nats.ProtocolOption into
// []nats.Option that can be used by either Sender, Consumer or Protocol
Expand All @@ -38,6 +52,14 @@ func WithSenderOptions(opts ...SenderOption) ProtocolOption {

type SenderOption func(*Sender) error

// WithPublishOptions configures the Sender
func WithPublishOptions(publishOpts []jetstream.PublishOpt) SenderOption {
return func(s *Sender) error {
s.PublishOpts = publishOpts
return nil
}
}

type ConsumerOption func(*Consumer) error

// WithQueueSubscriber configures the Consumer to join a queue group when subscribing
Expand All @@ -50,3 +72,33 @@ func WithQueueSubscriber(queue string) ConsumerOption {
return nil
}
}

// WithConsumerConfig configures the Consumer with the given config
func WithConsumerConfig(consumerConfig *jetstream.ConsumerConfig) ConsumerOption {
return func(c *Consumer) error {
if c.OrderedConsumerConfig != nil {
return ErrMoreThanOneConsumerConfig
}
c.ConsumerConfig = consumerConfig
return nil
}
}

// WithOrderedConsumerConfig configures the Consumer with the given config
func WithOrderedConsumerConfig(orderedConsumerConfig *jetstream.OrderedConsumerConfig) ConsumerOption {
return func(c *Consumer) error {
if c.ConsumerConfig != nil {
return ErrMoreThanOneConsumerConfig
}
c.OrderedConsumerConfig = orderedConsumerConfig
return nil
}
}

// WithPullConsumeOptions configures the Consumer with the given pullConsumeOpts
func WithPullConsumeOptions(pullConsumeOpt []jetstream.PullConsumeOpt) ConsumerOption {
return func(c *Consumer) error {
c.PullConsumeOpt = pullConsumeOpt
return nil
}
}
49 changes: 49 additions & 0 deletions protocol/nats_jetstream/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cloudevents/sdk-go/v2/protocol"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

// Protocol is a reference implementation for using the CloudEvents binding
Expand Down Expand Up @@ -46,6 +47,24 @@ func NewProtocol(url, stream, sendSubject, receiveSubject string, natsOpts []nat
return p, nil
}

// NewProtocolV2 creates a new NATS protocol.
func NewProtocolV2(ctx context.Context, url, stream, sendSubject string, natsOpts []nats.Option, jsOpts []jetstream.JetStreamOpt, opts ...ProtocolOption) (*Protocol, error) {
conn, err := nats.Connect(url, natsOpts...)
if err != nil {
return nil, err
}

p, err := NewProtocolFromConnV2(ctx, conn, stream, sendSubject, jsOpts, opts...)
if err != nil {
conn.Close()
return nil, err
}

p.connOwned = true

return p, nil
}

func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject string, jsOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error) {
var err error
p := &Protocol{
Expand All @@ -67,6 +86,36 @@ func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject st
return p, nil
}

func NewProtocolFromConnV2(ctx context.Context, conn *nats.Conn, stream, sendSubject string, jsOpts []jetstream.JetStreamOpt, opts ...ProtocolOption) (*Protocol, error) {
var err error
var js jetstream.JetStream
Comment on lines +90 to +91
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to define those? Is it because of the err in L96 and L100?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is because of the combined "if"+"assignment" on line 100. The js would be undefined later on line 105.

p := &Protocol{
Conn: conn,
}

if err := p.applyOptions(opts...); err != nil {
return nil, err
}

if js, err = jetstream.New(conn, jsOpts...); err != nil {
return nil, err
}
streamConfig := jetstream.StreamConfig{Name: stream, Subjects: []string{sendSubject}}
if _, err := js.CreateOrUpdateStream(ctx, streamConfig); err != nil {
return nil, err
}

if p.Consumer, err = NewConsumerFromConnV2(ctx, conn, jsOpts, p.consumerOptions...); err != nil {
return nil, err
}

if p.Sender, err = NewSenderFromConnV2(ctx, conn, sendSubject, jsOpts, p.senderOptions...); err != nil {
return nil, err
}

return p, nil
}

// Send implements Sender.Send
func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error {
return p.Sender.Send(ctx, in, transformers...)
Expand Down
Loading