diff --git a/protocol/nats_jetstream/v3/go.mod b/protocol/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..2b20a3062 --- /dev/null +++ b/protocol/nats_jetstream/v3/go.mod @@ -0,0 +1,27 @@ +module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.5.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.8.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/protocol/nats_jetstream/v3/go.sum b/protocol/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..a6562f7ad --- /dev/null +++ b/protocol/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/nats_jetstream/v3/message.go b/protocol/nats_jetstream/v3/message.go new file mode 100644 index 000000000..ff29c7e56 --- /dev/null +++ b/protocol/nats_jetstream/v3/message.go @@ -0,0 +1,153 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" +) + +const ( + // see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md + prefix = "ce-" + contentTypeHeader = "content-type" +) + +var ( + specs = spec.WithPrefix(prefix) + + // ErrNoVersion returned when no version header is found in the protocol header. + ErrNoVersion = errors.New("message does not contain version header") +) + +// Message implements binding.Message by wrapping an jetstream.Msg. +// This message *can* be read several times safely +type Message struct { + Msg jetstream.Msg + encoding binding.Encoding +} + +// NewMessage wraps an *nats.Msg in a binding.Message. +// The returned message *can* be read several times safely +// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. +func NewMessage(msg jetstream.Msg) *Message { + encoding := binding.EncodingStructured + if msg.Headers() != nil { + if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" { + encoding = binding.EncodingBinary + } + } + return &Message{Msg: msg, encoding: encoding} +} + +var _ binding.Message = (*Message)(nil) + +// ReadEncoding return the type of the message Encoding. +func (m *Message) ReadEncoding() binding.Encoding { + return m.encoding +} + +// ReadStructured transfers a structured-mode event to a StructuredWriter. +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.encoding != binding.EncodingStructured { + return binding.ErrNotStructured + } + return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data())) +} + +// ReadBinary transfers a binary-mode event to an BinaryWriter. +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.encoding != binding.EncodingBinary { + return binding.ErrNotBinary + } + + version := m.GetVersion() + if version == nil { + return ErrNoVersion + } + + var err error + for k, v := range m.Msg.Headers() { + headerValue := v[0] + if strings.HasPrefix(k, prefix) { + attr := version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, headerValue) + } else { + err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) + } + } else if k == contentTypeHeader { + err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) + } + if err != nil { + return err + } + } + + if m.Msg.Data() != nil { + err = encoder.SetData(bytes.NewBuffer(m.Msg.Data())) + } + + return err +} + +// Finish *must* be called when message from a Receiver can be forgotten by the receiver. +func (m *Message) Finish(err error) error { + return nil +} + +// GetAttribute implements binding.MessageMetadataReader +func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { + key := withPrefix(attributeKind.String()) + if m.Msg.Headers() != nil { + version := m.GetVersion() + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + return version.Attribute(key), headerValue + } + return version.Attribute(key), nil + } + // if the headers are nil, the version is also nil. Therefore return nil. + return nil, nil +} + +// GetExtension implements binding.MessageMetadataReader +func (m *Message) GetExtension(name string) interface{} { + key := withPrefix(name) + if m.Msg.Headers() != nil { + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + return headerValue + } + } + return nil +} + +// GetVersion looks for specVersion header and returns a Version object +func (m *Message) GetVersion() spec.Version { + if m.Msg.Headers() == nil { + return nil + } + versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName()) + if versionValue == "" { + return nil + } + return specs.Version(versionValue) +} + +// withPrefix prepends the prefix to the attribute name +func withPrefix(attributeName string) string { + return fmt.Sprintf("%s%s", prefix, attributeName) +} diff --git a/protocol/nats_jetstream/v3/message_test.go b/protocol/nats_jetstream/v3/message_test.go new file mode 100644 index 000000000..140901a0f --- /dev/null +++ b/protocol/nats_jetstream/v3/message_test.go @@ -0,0 +1,116 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "encoding/json" + "testing" + + "github.com/cloudevents/sdk-go/v2/binding/spec" + bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type jetStreamMsg struct { + jetstream.Msg + msg *nats.Msg +} + +func (j *jetStreamMsg) Data() []byte { return j.msg.Data } +func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } + +var ( + outBinaryMessage = bindingtest.MockBinaryMessage{ + Metadata: map[spec.Attribute]interface{}{}, + Extensions: map[string]interface{}{}, + } + outStructMessage = bindingtest.MockStructuredMessage{} + + testEvent = test.FullEvent() + binaryData, _ = json.Marshal(map[string]string{ + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", + "ce_exta": "someext", + }) + structuredReceiverMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: binaryData, + }, + } + binaryReceiverMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: testEvent.Data(), + Header: nats.Header{ + "ce-type": {testEvent.Type()}, + "ce-source": {testEvent.Source()}, + "ce-id": {testEvent.ID()}, + "ce-time": {test.Timestamp.String()}, + "ce-specversion": {"1.0"}, + "ce-dataschema": {test.Schema.String()}, + "ce-datacontenttype": {"text/json"}, + "ce-subject": {"receiverTopic"}, + "ce-exta": {"someext"}, + }, + }, + } +) + +func TestNewMessage(t *testing.T) { + tests := []struct { + name string + receiverMessage jetstream.Msg + expectedEncoding binding.Encoding + expectedStructuredError error + expectedBinaryError error + }{ + { + name: "Structured encoding", + receiverMessage: structuredReceiverMessage, + expectedEncoding: binding.EncodingStructured, + expectedStructuredError: nil, + expectedBinaryError: binding.ErrNotBinary, + }, + { + name: "Binary encoding", + receiverMessage: binaryReceiverMessage, + expectedEncoding: binding.EncodingBinary, + expectedStructuredError: binding.ErrNotStructured, + expectedBinaryError: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewMessage(tt.receiverMessage) + if got == nil { + t.Errorf("Error in NewMessage!") + } + err := got.ReadBinary(context.TODO(), &outBinaryMessage) + if err != tt.expectedBinaryError { + t.Errorf("ReadBinary err:%s", err.Error()) + } + err = got.ReadStructured(context.TODO(), &outStructMessage) + if err != tt.expectedStructuredError { + t.Errorf("ReadStructured err:%s", err.Error()) + } + if got.ReadEncoding() != tt.expectedEncoding { + t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/options.go b/protocol/nats_jetstream/v3/options.go new file mode 100644 index 000000000..f535f34af --- /dev/null +++ b/protocol/nats_jetstream/v3/options.go @@ -0,0 +1,123 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "errors" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var ( + ErrNoConnection = errors.New("URL or nats connection must be given") + ErrNoFilterSubjects = errors.New("no filter subjects were given") + ErrMoreThanOneStream = errors.New("more than one stream for given filter subjects") + ErrNoConsumerConfig = errors.New("no consumer config was given") + ErrMoreThanOneConsumerConfig = errors.New("more than one consumer config given") +) + +// ProtocolOption is the function signature required to be considered an nats.ProtocolOption. +type ProtocolOption func(*Protocol) error + +// WithURL configures the Sender and/or Receiver +func WithURL(url string, natsOpts ...nats.Option) ProtocolOption { + return func(p *Protocol) error { + opts := []nats.Option{} + opts = append(opts, natsOpts...) + p.url = url + p.natsOpts = opts + p.connOwned = true + if p.sender != nil { + p.sender.url = url + p.sender.natsOpts = opts + p.sender.connOwned = true + } + if p.receiver != nil { + p.receiver.url = url + p.receiver.natsOpts = opts + p.receiver.connOwned = true + } + return nil + } +} + +// WithConnection configures the Sender and/or Receiver +func WithConnection(conn *nats.Conn) ProtocolOption { + return func(p *Protocol) error { + p.conn = conn + if p.sender != nil { + p.sender.conn = conn + } + if p.receiver != nil { + p.receiver.conn = conn + } + return nil + } +} + +// WithJetStreamOptions configures the Sender and/or Receiver +func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption { + return func(p *Protocol) error { + if p.sender != nil { + p.sender.jetSteamOpts = jetStreamOpts + } + if p.receiver != nil { + p.receiver.jetSteamOpts = jetStreamOpts + } + return nil + } +} + +// WithPublishOptions configures the Sender +func WithPublishOptions(publishOpts []jetstream.PublishOpt) ProtocolOption { + return func(p *Protocol) error { + if p.sender == nil { + return nil + } + p.sender.publishOpts = publishOpts + return nil + } +} + +// WithConsumerConfig configures the Receiver with the given config +func WithConsumerConfig(consumerConfig *jetstream.ConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + if p.receiver == nil { + return nil + } + if p.receiver.orderedConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + p.receiver.consumerConfig = consumerConfig + return nil + } +} + +// WithOrderedConsumerConfig configures the Receiver with the given config +func WithOrderedConsumerConfig(orderedConsumerConfig *jetstream.OrderedConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + if p.receiver == nil { + return nil + } + if p.receiver.consumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + p.receiver.orderedConsumerConfig = orderedConsumerConfig + return nil + } +} + +// WithPullConsumerOptions configures the Receiver with the given pullConsumeOpts +func WithPullConsumerOptions(pullConsumeOpts []jetstream.PullConsumeOpt) ProtocolOption { + return func(p *Protocol) error { + if p.receiver == nil { + return nil + } + p.receiver.pullConsumeOpts = pullConsumeOpts + return nil + } +} diff --git a/protocol/nats_jetstream/v3/options_test.go b/protocol/nats_jetstream/v3/options_test.go new file mode 100644 index 000000000..503f3b206 --- /dev/null +++ b/protocol/nats_jetstream/v3/options_test.go @@ -0,0 +1,176 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "reflect" + "testing" + + "github.com/nats-io/nats.go/jetstream" +) + +func TestWithConsumerConfig(t *testing.T) { + filterSubjects := []string{"normal"} + type args struct { + receiver *receiver + config *jetstream.ConsumerConfig + } + type wants struct { + err error + receiver *receiver + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid case", + args: args{ + receiver: &receiver{}, + config: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + receiver: &receiver{ + consumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + { + name: "too many consumer options", + args: args{ + receiver: &receiver{orderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}}, + config: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: ErrMoreThanOneConsumerConfig, + receiver: &receiver{orderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.receiver.applyOptions(WithConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.receiver, tt.wants.receiver) { + t.Errorf("p = %v, want %v", tt.args.receiver, tt.wants.receiver) + } + }) + } +} + +func TestOrderedConsumerConfig(t *testing.T) { + filterSubjects := []string{"ordered"} + type args struct { + receiver *receiver + config *jetstream.OrderedConsumerConfig + } + type wants struct { + err error + receiver *receiver + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid case", + args: args{ + receiver: &receiver{}, + config: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + receiver: &receiver{ + orderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + { + name: "too many consumer options", + args: args{ + receiver: &receiver{consumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}}, + config: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: ErrMoreThanOneConsumerConfig, + receiver: &receiver{consumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.receiver.applyOptions(WithOrderedConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithOrderedConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.receiver, tt.wants.receiver) { + t.Errorf("p = %v, want %v", tt.args.receiver, tt.wants.receiver) + } + }) + } +} + +func TestWithPullConsumeOptions(t *testing.T) { + maxMessages := jetstream.PullMaxMessages(1) + maxBytes := jetstream.PullMaxBytes(0) + type args struct { + receiver *receiver + config []jetstream.PullConsumeOpt + } + type wants struct { + err error + receiver *receiver + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "pull consumer option given", + args: args{ + receiver: &receiver{}, + config: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + wants: wants{ + err: nil, + receiver: &receiver{ + pullConsumeOpts: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + }, + }, + { + name: "no pull consumer option given", + args: args{ + receiver: &receiver{}, + config: nil, + }, + wants: wants{ + err: nil, + receiver: &receiver{pullConsumeOpts: nil}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.receiver.applyOptions(WithPullConsumerOptions(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithPullConsumerOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.receiver, tt.wants.receiver) { + t.Errorf("p = %v, want %v", tt.args.receiver, tt.wants.receiver) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/protocol.go b/protocol/nats_jetstream/v3/protocol.go new file mode 100644 index 000000000..78eb00560 --- /dev/null +++ b/protocol/nats_jetstream/v3/protocol.go @@ -0,0 +1,131 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + + "github.com/nats-io/nats.go" +) + +// Protocol is a reference implementation for using the CloudEvents binding +// integration. Protocol acts as both a NATS client and a NATS handler. +type Protocol struct { + + // nats connection + conn *nats.Conn + url string + natsOpts []nats.Option + connOwned bool + + receiver *receiver + + sender *sender +} + +// NewProtocol creates a new NATS protocol. +// Use WithURL() OR WithConnection() +// Use WithConsumerConfig() OR WithOrderedConsumerConfig() +func NewProtocol(ctx context.Context, sendSubject string, opts ...ProtocolOption) (*Protocol, error) { + p := &Protocol{} + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + if err := p.validateOptions(); err != nil { + return nil, err + } + + var errConnection error + defer func() { + // close connection if an error occurs and we created the nats connection + if p.connOwned && p.conn != nil && errConnection != nil { + p.conn.Close() + } + }() + + optsWithConnection := []ProtocolOption{} + optsWithConnection = append(optsWithConnection, opts...) + + // if a URL was given create the nats connection + if p.conn == nil { + p.conn, errConnection = nats.Connect(p.url, p.natsOpts...) + if errConnection != nil { + return nil, errConnection + } + // This will add a WithConnection to the already existing WithURL in optsWithConnection. + optsWithConnection = append(optsWithConnection, WithConnection(p.conn)) + } + + if p.receiver, errConnection = NewReceiver(ctx, optsWithConnection...); errConnection != nil { + return nil, errConnection + } + + if p.sender, errConnection = NewSender(ctx, sendSubject, optsWithConnection...); errConnection != nil { + return nil, errConnection + } + + return p, nil +} + +// Send implements Sender.Send +func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error { + return p.sender.Send(ctx, in, transformers...) +} + +// OpenInbound implements Opener.OpenInbound +func (p *Protocol) OpenInbound(ctx context.Context) error { + return p.receiver.OpenInbound(ctx) +} + +// Receive implements Receiver.Receive +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + return p.receiver.Receive(ctx) +} + +// Close implements Closer.Close +func (p *Protocol) Close(ctx context.Context) error { + if p.connOwned { + defer p.conn.Close() + } + + if err := p.receiver.Close(ctx); err != nil { + return err + } + + if err := p.sender.Close(ctx); err != nil { + return err + } + + return nil +} + +// applyOptions at the protocol layer should run before the sender and receiver are created. +// This allows the protocol to create a nats connection that can be shared for both the sender and receiver. +func (p *Protocol) applyOptions(opts ...ProtocolOption) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// validateOptions runs after all options have been applied and makes sure needed options were set correctly. +func (p *Protocol) validateOptions() error { + if p.url == "" && p.conn == nil { + return ErrNoConnection + } + return nil +} + +var _ protocol.Receiver = (*Protocol)(nil) +var _ protocol.Sender = (*Protocol)(nil) +var _ protocol.Opener = (*Protocol)(nil) +var _ protocol.Closer = (*Protocol)(nil) diff --git a/protocol/nats_jetstream/v3/receiver.go b/protocol/nats_jetstream/v3/receiver.go new file mode 100644 index 000000000..757979628 --- /dev/null +++ b/protocol/nats_jetstream/v3/receiver.go @@ -0,0 +1,229 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "io" + "sync" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type msgErr struct { + msg binding.Message +} + +type receiver struct { + incoming chan msgErr + subMtx sync.Mutex + internalClose chan struct{} + + // nats connection + conn *nats.Conn + url string + natsOpts []nats.Option + connOwned bool + + // jetstream options and configuration + jetSteamOpts []jetstream.JetStreamOpt + jetStream jetstream.JetStream + consumerConfig *jetstream.ConsumerConfig + orderedConsumerConfig *jetstream.OrderedConsumerConfig + pullConsumeOpts []jetstream.PullConsumeOpt + jetstreamConsumer jetstream.Consumer +} + +// NewReceiver consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig +// embedded in ConsumerOption. +// Use WithURL() OR WithConnection() +// Use WithConsumerConfig() OR WithOrderedConsumerConfig() +func NewReceiver(ctx context.Context, opts ...ProtocolOption) (*receiver, error) { + c := &receiver{ + incoming: make(chan msgErr), + internalClose: make(chan struct{}, 1), + } + + if err := c.applyOptions(opts...); err != nil { + return nil, err + } + + if err := c.validateOptions(); err != nil { + return nil, err + } + + var errConnection error + defer func() { + // close connection if an error occurs and we created the nats connection + if c.connOwned && c.conn != nil && errConnection != nil { + c.conn.Close() + } + }() + + // create a connection based on the URL if the connection was not given as an option + if c.conn == nil { + c.conn, errConnection = nats.Connect(c.url, c.natsOpts...) + if errConnection != nil { + return nil, errConnection + } + } + + if c.jetStream, errConnection = jetstream.New(c.conn, c.jetSteamOpts...); errConnection != nil { + return nil, errConnection + } + + return c, nil +} + +// MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered +// via r.Receive(ctx) +func (r *receiver) MsgHandler(msg jetstream.Msg) { + r.incoming <- msgErr{msg: NewMessage(msg)} +} + +// Receive implements Receiver.Receive. +func (r *receiver) Receive(ctx context.Context) (binding.Message, error) { + select { + case msgErr, ok := <-r.incoming: + if !ok { + return nil, io.EOF + } + return msgErr.msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +// OpenInbound implements Opener.OpenInbound +func (r *receiver) OpenInbound(ctx context.Context) error { + r.subMtx.Lock() + defer r.subMtx.Unlock() + + var consumeContext jetstream.ConsumeContext + var err error + if err = r.createJetstreamConsumer(ctx); err != nil { + return err + } + if consumeContext, err = r.jetstreamConsumer.Consume(r.MsgHandler, r.pullConsumeOpts...); err != nil { + return err + } + + // Wait until external or internal context done + select { + case <-ctx.Done(): + case <-r.internalClose: + } + + // Finish to consume messages in the queue and close the subscription + if consumeContext != nil { + consumeContext.Drain() + } + return nil +} + +// Close implements Closer.Close +func (r *receiver) Close(ctx context.Context) error { + // Before closing, let's be sure OpenInbound completes + // We send a signal to close and then we lock on subMtx in order + // to wait OpenInbound to finish draining the queue + r.internalClose <- struct{}{} + r.subMtx.Lock() + defer r.subMtx.Unlock() + + if r.connOwned && r.conn != nil { + r.conn.Close() + } + + close(r.internalClose) + + return nil +} + +// applyOptions sets fields on the receiver +func (r *receiver) applyOptions(opts ...ProtocolOption) error { + p := &Protocol{receiver: r} + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// validateOptions runs after all options have been applied and makes sure needed options were set correctly. +func (r *receiver) validateOptions() error { + // Fail if neither WithURL() or WithConnection() is given + // Allow both to be given. This will occur when WithURL() is used in NewProtocol + if r.url == "" && r.conn == nil { + return ErrNoConnection + } + if r.consumerConfig == nil && r.orderedConsumerConfig == nil { + return ErrNoConsumerConfig + } + if r.consumerConfig != nil && r.orderedConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + return nil +} + +// createJetstreamConsumer creates a consumer based on the configured consumer config +func (r *receiver) createJetstreamConsumer(ctx context.Context) error { + var err error + var stream string + if stream, err = r.getStreamFromSubjects(ctx); err != nil { + return err + } + var consumerErr error + if r.consumerConfig != nil { + r.jetstreamConsumer, consumerErr = r.jetStream.CreateOrUpdateConsumer(ctx, stream, *r.consumerConfig) + } else if r.orderedConsumerConfig != nil { + r.jetstreamConsumer, consumerErr = r.jetStream.OrderedConsumer(ctx, stream, *r.orderedConsumerConfig) + } else { + return ErrNoConsumerConfig + } + return consumerErr +} + +// getStreamFromSubjects finds the unique stream for the set of filter subjects +// If more than one stream is found, returns ErrMoreThanOneStream +func (r *receiver) getStreamFromSubjects(ctx context.Context) (string, error) { + var subjects []string + if r.consumerConfig != nil && r.consumerConfig.FilterSubject != "" { + subjects = []string{r.consumerConfig.FilterSubject} + } + if r.consumerConfig != nil && len(r.consumerConfig.FilterSubjects) > 0 { + subjects = r.consumerConfig.FilterSubjects + } + if r.orderedConsumerConfig != nil && len(r.orderedConsumerConfig.FilterSubjects) > 0 { + subjects = r.orderedConsumerConfig.FilterSubjects + } + if len(subjects) == 0 { + return "", ErrNoFilterSubjects + } + var finalStream string + for i, subject := range subjects { + currentStream, err := r.jetStream.StreamNameBySubject(ctx, subject) + if err != nil { + return "", err + } + if i == 0 { + finalStream = currentStream + continue + } + if finalStream != currentStream { + return "", ErrMoreThanOneStream + } + } + return finalStream, nil +} + +var _ protocol.Opener = (*receiver)(nil) +var _ protocol.Receiver = (*receiver)(nil) +var _ protocol.Closer = (*receiver)(nil) diff --git a/protocol/nats_jetstream/v3/sender.go b/protocol/nats_jetstream/v3/sender.go new file mode 100644 index 000000000..169616c65 --- /dev/null +++ b/protocol/nats_jetstream/v3/sender.go @@ -0,0 +1,139 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type sender struct { + // nats connection + conn *nats.Conn + url string + natsOpts []nats.Option + connOwned bool + + // jetstream options and configuration + jetSteamOpts []jetstream.JetStreamOpt + jetStream jetstream.JetStream + publishOpts []jetstream.PublishOpt + + subject string +} + +// NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection +// Use WithURL() OR WithConnection() +func NewSender(ctx context.Context, subject string, opts ...ProtocolOption) (*sender, error) { + s := &sender{ + subject: subject, + } + + if err := s.applyOptions(opts...); err != nil { + return nil, err + } + + if err := s.validateOptions(); err != nil { + return nil, err + } + + var errConnection error + defer func() { + // close connection if an error occurs and we created the nats connection + if s.connOwned && s.conn != nil && errConnection != nil { + s.conn.Close() + } + }() + + // create a connection based on the URL if the connection was not given as an option + if s.conn == nil { + s.conn, errConnection = nats.Connect(s.url, s.natsOpts...) + if errConnection != nil { + return nil, errConnection + } + } + + var err error + if s.jetStream, err = jetstream.New(s.conn, s.jetSteamOpts...); err != nil { + return nil, err + } + + return s, nil +} + +// Close implements Sender.Sender +// Sender sends messages. +func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { + defer func() { + if err2 := in.Finish(err); err2 != nil { + if err == nil { + err = err2 + } else { + err = fmt.Errorf("failed to call in.Finish() when error already occurred: %s: %w", err2.Error(), err) + } + } + }() + + if _, err = s.jetStream.StreamNameBySubject(ctx, s.subject); err != nil { + return err + } + + writer := new(bytes.Buffer) + header, err := WriteMsg(ctx, in, writer, transformers...) + if err != nil { + return err + } + + natsMsg := &nats.Msg{ + Subject: s.subject, + Data: writer.Bytes(), + Header: header, + } + + _, err = s.jetStream.PublishMsg(ctx, natsMsg, s.publishOpts...) + + return err +} + +// Close implements Closer.Close +// This method only closes the connection if the Sender opened it +func (s *sender) Close(_ context.Context) error { + if s.connOwned { + s.conn.Close() + } + + return nil +} + +func (s *sender) applyOptions(opts ...ProtocolOption) error { + p := &Protocol{sender: s} + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// validateOptions runs after all options have been applied and makes sure needed options were set correctly. +func (s *sender) validateOptions() error { + // Fail if neither WithURL() or WithConnection() is given + // Allow both to be given. This will occur when WithURL() is used in NewProtocol + if s.url == "" && s.conn == nil { + return ErrNoConnection + } + return nil +} + +var _ protocol.Sender = (*sender)(nil) +var _ protocol.Closer = (*Protocol)(nil) diff --git a/protocol/nats_jetstream/v3/write_message.go b/protocol/nats_jetstream/v3/write_message.go new file mode 100644 index 000000000..c5f587b3e --- /dev/null +++ b/protocol/nats_jetstream/v3/write_message.go @@ -0,0 +1,98 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/nats-io/nats.go" +) + +// WriteMsg fills the provided writer with the bindings.Message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +// The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object. +func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error) { + structuredWriter := &natsMessageWriter{writer} + binaryWriter := &natsBinaryMessageWriter{ReaderFrom: writer} + + _, err := binding.Write( + ctx, + m, + structuredWriter, + binaryWriter, + transformers..., + ) + natsHeader := binaryWriter.header + + return natsHeader, err +} + +type natsMessageWriter struct { + io.ReaderFrom +} + +// StructuredWriter implements StructuredWriter.SetStructuredEvent +func (w *natsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + if _, err := w.ReadFrom(event); err != nil { + return err + } + + return nil +} + +var _ binding.StructuredWriter = (*natsMessageWriter)(nil) // Test it conforms to the interface + +type natsBinaryMessageWriter struct { + io.ReaderFrom + header nats.Header +} + +// SetAttribute implements MessageMetadataWriter.SetAttribute +func (w *natsBinaryMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + prefixedName := withPrefix(attribute.Name()) + convertedValue := fmt.Sprint(value) + switch attribute.Kind().String() { + case spec.Time.String(): + timeValue := value.(time.Time) + convertedValue = timeValue.Format(time.RFC3339Nano) + } + w.header.Set(prefixedName, convertedValue) + return nil +} + +// SetExtension implements MessageMetadataWriter.SetExtension +func (w *natsBinaryMessageWriter) SetExtension(name string, value interface{}) error { + prefixedName := withPrefix(name) + convertedValue := fmt.Sprint(value) + w.header.Set(prefixedName, convertedValue) + return nil +} + +// Start implements BinaryWriter.Start +func (w *natsBinaryMessageWriter) Start(ctx context.Context) error { + w.header = nats.Header{} + return nil +} + +// SetData implements BinaryWriter.SetData +func (w *natsBinaryMessageWriter) SetData(data io.Reader) error { + if _, err := w.ReadFrom(data); err != nil { + return err + } + + return nil +} + +// End implements BinaryWriter.End +func (w *natsBinaryMessageWriter) End(ctx context.Context) error { + return nil +} diff --git a/samples/nats_jetstream/v3/go.mod b/samples/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..9c8e89cec --- /dev/null +++ b/samples/nats_jetstream/v3/go.mod @@ -0,0 +1,31 @@ +module github.com/cloudevents/sdk-go/samples/nats_jetstream/v3 + +go 1.18 + +require ( + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0 + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/google/uuid v1.1.1 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect +) + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ./../../../protocol/nats_jetstream/v3 diff --git a/samples/nats_jetstream/v3/go.sum b/samples/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..986929c03 --- /dev/null +++ b/samples/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/samples/nats_jetstream/v3/receiver/main.go b/samples/nats_jetstream/v3/receiver/main.go new file mode 100644 index 000000000..95b336c64 --- /dev/null +++ b/samples/nats_jetstream/v3/receiver/main.go @@ -0,0 +1,62 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/nats-io/nats.go/jetstream" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := context.Background() + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + + consumerOpt := cejsm.WithConsumerConfig(&jetstream.ConsumerConfig{FilterSubjects: []string{natsSubject}}) + urlOpt := cejsm.WithURL(natsURL) + receiver, err := cejsm.NewReceiver(ctx, consumerOpt, urlOpt) + if err != nil { + log.Fatalf("failed to create nats protocol, %s", err.Error()) + } + + defer receiver.Close(ctx) + + c, err := cloudevents.NewClient(receiver) + if err != nil { + log.Fatalf("failed to create client, %s", err.Error()) + } + + for { + if err := c.StartReceiver(ctx, receive); err != nil { + log.Printf("failed to start nats receiver, %s", err.Error()) + } + } +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event) error { + fmt.Printf("Got Event Context: %+v\n", event.Context) + + data := &Example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Got Data: %+v\n", data) + + fmt.Printf("----------------------------\n") + return nil +} diff --git a/samples/nats_jetstream/v3/sender/main.go b/samples/nats_jetstream/v3/sender/main.go new file mode 100644 index 000000000..9b77bc10b --- /dev/null +++ b/samples/nats_jetstream/v3/sender/main.go @@ -0,0 +1,81 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/google/uuid" + "github.com/kelseyhightower/envconfig" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + count = 10 +) + +type envConfig struct { + // NATSServer URL to connect to the nats server. + NATSServer string `envconfig:"NATS_SERVER" default:"http://localhost:4222" required:"true"` + + // Subject is the nats subject to publish cloudevents on. + Subject string `envconfig:"SUBJECT" default:"sample" required:"true"` +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatalf("Failed to process env var: %s", err) + } + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + + ctx := context.Background() + urlOpt := cejsm.WithURL(natsURL) + sender, err := cejsm.NewSender(ctx, natsSubject, urlOpt) + if err != nil { + log.Fatalf("Failed to create nats protocol, %s", err.Error()) + } + + defer sender.Close(context.Background()) + + c, err := cloudevents.NewClient(sender) + if err != nil { + log.Fatalf("Failed to create client, %s", err.Error()) + } + + for _, contentType := range []string{"application/json", "application/xml"} { + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetID(uuid.New().String()) + e.SetType("com.cloudevents.sample.sent") + e.SetTime(time.Now()) + e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/sender") + _ = e.SetData(contentType, &Example{ + Sequence: i, + Message: fmt.Sprintf("Hello, %s!", contentType), + }) + + if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) { + log.Printf("failed to send: %v", err) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/test/integration/go.mod b/test/integration/go.mod index 58b84daa7..27ae15519 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -14,6 +14,8 @@ replace github.com/cloudevents/sdk-go/protocol/nats/v2 => ../../protocol/nats/v2 replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 => ../../protocol/nats_jetstream/v2 +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ../../protocol/nats_jetstream/v3 + replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol/kafka_sarama/v2 replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2 @@ -29,17 +31,18 @@ require ( github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/eclipse/paho.golang v0.21.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.3.0 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.37.0 github.com/nats-io/stan.go v0.10.4 github.com/stretchr/testify v1.8.4 go.uber.org/atomic v1.4.0 - golang.org/x/sync v0.4.0 + golang.org/x/sync v0.8.0 ) require ( @@ -62,14 +65,14 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -81,6 +84,7 @@ require ( golang.org/x/crypto v0.27.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 3ffcbaab0..0bb5f6f03 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -103,8 +103,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -141,11 +141,11 @@ github.com/nats-io/nats-server/v2 v2.9.23/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq github.com/nats-io/nats-streaming-server v0.24.6 h1:iIZXuPSznnYkiy0P3L0AP9zEN9Etp+tITbbX1KKeq4Q= github.com/nats-io/nats-streaming-server v0.24.6/go.mod h1:tdKXltY3XLeBJ21sHiZiaPl+j8sK3vcCKBWVyxeQs10= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= @@ -234,8 +234,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -260,6 +260,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/test/integration/nats_jetstream/v3/nats_test.go b/test/integration/nats_jetstream/v3/nats_test.go new file mode 100644 index 000000000..07741719c --- /dev/null +++ b/test/integration/nats_jetstream/v3/nats_test.go @@ -0,0 +1,163 @@ +/* + Copyright 2024 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "os" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + ce_nats "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" + bindings "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/test" + . "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + . "github.com/cloudevents/sdk-go/v2/binding/test" +) + +func TestSendReceiveStructuredAndBinary(t *testing.T) { + conn := testConn(t) + defer conn.Close() + + type args struct { + opts []ce_nats.ProtocolOption + bindingEncoding binding.Encoding + consumerConfig any + } + tests := []struct { + name string + args args + }{ + { + name: "regular consumer - structured", + args: args{ + consumerConfig: &jetstream.ConsumerConfig{}, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "ordered consumer - structured", + args: args{ + consumerConfig: &jetstream.OrderedConsumerConfig{}, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "regular consumer - binary", + args: args{ + consumerConfig: &jetstream.ConsumerConfig{}, + bindingEncoding: binding.EncodingBinary, + }, + }, { + name: "ordered consumer - binary", + args: args{ + consumerConfig: &jetstream.OrderedConsumerConfig{}, + bindingEncoding: binding.EncodingBinary, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + cleanup, s, r := testProtocol(ctx, t, conn, tt.args.consumerConfig, tt.args.opts...) + defer cleanup() + EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) { + eventIn = ConvertEventExtensionsToString(t, eventIn) + + var in binding.Message + switch tt.args.bindingEncoding { + case binding.EncodingStructured: + in = MustCreateMockStructuredMessage(t, eventIn) + case binding.EncodingBinary: + in = MustCreateMockBinaryMessage(eventIn) + } + + test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), tt.args.bindingEncoding), in, s, r, func(out binding.Message) { + eventOut := MustToEvent(t, context.Background(), out) + assert.Equal(t, tt.args.bindingEncoding, out.ReadEncoding()) + AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut)) + }) + }) + }) + } +} + +func testConn(t testing.TB) *nats.Conn { + t.Helper() + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4223" + } + + conn, err := nats.Connect(s) + if err != nil { + t.Skipf("Cannot create STAN client to NATS server [%s]: %v", s, err) + } + + return conn +} + +func testProtocol(ctx context.Context, t testing.TB, natsConn *nats.Conn, consumerConfig any, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, + bindings.Receiver) { + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4223" + } + + stream := "test-ce-client-" + uuid.New().String() + subject := stream + ".test" + + var js jetstream.JetStream + var err error + js, err = jetstream.New(natsConn) + require.NoError(t, err) + + streamConfig := jetstream.StreamConfig{Name: stream, Subjects: []string{subject}} + _, err = js.CreateOrUpdateStream(ctx, streamConfig) + require.NoError(t, err) + + if normalConsumerConfig, ok := consumerConfig.(*jetstream.ConsumerConfig); ok { + normalConsumerConfig.FilterSubjects = []string{subject} + opts = append(opts, ce_nats.WithConsumerConfig(normalConsumerConfig)) + } + if orderedConsumerConfig, ok := consumerConfig.(*jetstream.OrderedConsumerConfig); ok { + orderedConsumerConfig.FilterSubjects = []string{subject} + opts = append(opts, ce_nats.WithOrderedConsumerConfig(orderedConsumerConfig)) + } + + opts = append(opts, ce_nats.WithURL(s)) + // use NewProtocol rather than individual Consumer and Sender since this gives us more coverage + p, err := ce_nats.NewProtocol(ctx, subject, opts...) + require.NoError(t, err) + + go func() { + require.NoError(t, p.OpenInbound(context.TODO())) + }() + + return func() { + err = p.Close(context.TODO()) + require.NoError(t, err) + }, p, p +} + +func BenchmarkSendReceive(b *testing.B) { + ctx := context.Background() + conn := testConn(b) + defer conn.Close() + c, s, r := testProtocol(ctx, b, conn, &jetstream.ConsumerConfig{}) + defer c() // Cleanup + test.BenchmarkSendReceive(b, s, r) +}