Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable a binding.Message to be consumed more times #282

Closed
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/nats-io/nats.go v1.9.1
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/valyala/bytebufferpool v1.0.0
go.opencensus.io v0.22.0
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand Down
23 changes: 23 additions & 0 deletions pkg/binding/acks_before_finish_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package binding

import "sync/atomic"

type acksMessage struct {
Message
requiredAcks int32
}

func (m *acksMessage) Finish(err error) error {
remainingAcks := atomic.AddInt32(&m.requiredAcks, -1)
if remainingAcks == 0 {
return m.Message.Finish(err)
}
return nil
}

// WithAcksBeforeFinish returns a wrapper for m that calls m.Finish()
// only after the specified number of acks are received.
// Use it when you need to route a Message to more Sender instances
func WithAcksBeforeFinish(m Message, requiredAcks int) Message {
return &acksMessage{Message: m, requiredAcks: int32(requiredAcks)}
}
46 changes: 46 additions & 0 deletions pkg/binding/acks_before_finish_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package binding

import (
"context"
"net/url"
"sync"
"testing"

"github.com/stretchr/testify/assert"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
)

func TestWithAcksBeforeFinish(t *testing.T) {
var testEvent = cloudevents.Event{
Data: []byte(`"data"`),
DataEncoded: true,
Context: cloudevents.EventContextV1{
DataContentType: cloudevents.StringOfApplicationJSON(),
Source: types.URIRef{URL: url.URL{Path: "source"}},
ID: "id",
Type: "type"}.AsV1(),
}

finishCalled := false
finishMessage := WithFinish(EventMessage(testEvent), func(err error) {
finishCalled = true
})

wg := sync.WaitGroup{}

messageToTest := WithAcksBeforeFinish(finishMessage, 1000)
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(m Message) {
ch := make(chan Message, 1)
assert.NoError(t, ChanSender(ch).Send(context.Background(), m))
<-ch
wg.Done()
}(messageToTest)
}

wg.Wait()
assert.True(t, finishCalled)
}
27 changes: 19 additions & 8 deletions pkg/binding/example_implementing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"io"
"io/ioutil"

"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/binding/format"
Expand All @@ -30,7 +29,7 @@ import (
type ExMessage json.RawMessage

func (m ExMessage) Structured(b binding.StructuredEncoder) error {
return b.SetStructuredEvent(format.JSON, bytes.NewReader([]byte(m)))
return b.SetStructuredEvent(format.JSON, &m)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to use *m is not necessary. json.RawMessage is just a type cast for []byte, and in Go slices are semantically pointers (actually pointer pairs) - passing or assigning a slice does not copy the underlying data. You need to use the copy() or append() built-in functions to copy the data. So passing/assigning ExMessage by value doesn't copy any data.

}

func (m ExMessage) Binary(binding.BinaryEncoder) error {
Expand All @@ -46,9 +45,22 @@ func (m ExMessage) Event(b binding.EventEncoder) error {
return b.SetEvent(e)
}

func (m *ExMessage) IsEmpty() bool {
return m == nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you get rid of the pointer (m ExMessage) then this is (len(m) == 0) if you keep the pointer then it should be (m == nil || len(*m) == 0)

}

func (m *ExMessage) Bytes() []byte {
return *m
}

func (m *ExMessage) Reader() io.Reader {
return bytes.NewReader(m.Bytes())
}

func (m ExMessage) Finish(error) error { return nil }

var _ binding.Message = (*ExMessage)(nil)
var _ binding.MessagePayloadReader = (*ExMessage)(nil)

// ExSender sends by writing JSON encoded events to an io.Writer
// ExSender supports transcoding
Expand All @@ -63,6 +75,9 @@ func NewExSender(w io.Writer, factories ...binding.TransformerFactory) binding.S
}

func (s *ExSender) Send(ctx context.Context, m binding.Message) error {
// Invoke m.Finish to notify the receiver that message was processed
defer func() { _ = m.Finish(nil) }()

// Translate tries the various encodings, starting with provided root encoder factories.
// If a sender doesn't support a specific encoding, a null root encoder factory could be provided.
_, _, err := binding.Translate(
Expand All @@ -79,13 +94,9 @@ func (s *ExSender) Send(ctx context.Context, m binding.Message) error {
return err
}

func (s *ExSender) SetStructuredEvent(f format.Format, event io.Reader) error {
func (s *ExSender) SetStructuredEvent(f format.Format, event binding.MessagePayloadReader) error {
if f == format.JSON {
b, err := ioutil.ReadAll(event)
if err != nil {
return err
}
return s.encoder.Encode(json.RawMessage(b))
return s.encoder.Encode(event.Bytes())
} else {
return binding.ErrNotStructured
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/binding/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
// to a Sender, they may not be implemented by all Message instances. A Sender should
// try each method of interest and fall back to Event(EventEncoder) if none are supported.
//
// The message should be "consumable" several times, meaning that its methods to visit it can be called several times.
//
type Message interface {
// Structured transfers a structured-mode event to a StructuredEncoder.
// Returns ErrNotStructured if message is not in structured mode.
Expand Down Expand Up @@ -84,13 +86,26 @@ var ErrNotStructured = errors.New("message is not in structured mode")
// ErrNotBinary returned by Message.Binary for non-binary messages.
var ErrNotBinary = errors.New("message is not in binary mode")

// MessagePayload allows to read a message payload or as a byte array or as a reader
// Message implementers must re
type MessagePayloadReader interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an undue burden on the Message implementer. If they provide an io.Reader then io.Copy to a bytes.Buffer is maximally efficient, and the streaming case is supported. io.Reader is a standard Go idiom (net.http and many other standard libs) so I don't think we should hide it in a non-standard interface.

// Returns true if the message payload is empty
IsEmpty() bool

// Returns the payload as a byte array, nil if IsEmpty() == true
Bytes() []byte

// Returns the payload as an io.Reader, nil if IsEmpty() == true
Reader() io.Reader
}

// StructuredEncoder should generate a new representation of the event starting from a structured message.
//
// Protocols that supports structured encoding should implement this interface to implement direct
// structured -> structured transfer.
type StructuredEncoder interface {
// Event receives an io.Reader for the whole event.
SetStructuredEvent(format format.Format, event io.Reader) error
SetStructuredEvent(format format.Format, event MessagePayloadReader) error
}

// BinaryEncoder should generate a new representation of the event starting from a binary message.
Expand All @@ -100,7 +115,7 @@ type StructuredEncoder interface {
type BinaryEncoder interface {
// SetData receives an io.Reader for the data attribute.
// io.Reader could be empty, meaning that message payload is empty
SetData(data io.Reader) error
SetData(data MessagePayloadReader) error

// Set a standard attribute.
//
Expand Down
15 changes: 14 additions & 1 deletion pkg/binding/mock_binary_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package binding

import (
"bytes"
"io"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/binding/spec"
Expand Down Expand Up @@ -74,7 +75,19 @@ func (bm *MockBinaryMessage) Binary(b BinaryEncoder) error {
if len(bm.Body) == 0 {
return nil
}
return b.SetData(bytes.NewReader(bm.Body))
return b.SetData(bm)
}

func (bm *MockBinaryMessage) IsEmpty() bool {
return bm.Body == nil
}

func (bm *MockBinaryMessage) Bytes() []byte {
return bm.Body
}

func (bm *MockBinaryMessage) Reader() io.Reader {
return bytes.NewReader(bm.Body)
}

func (bm *MockBinaryMessage) Finish(error) error { return nil }
Expand Down
25 changes: 19 additions & 6 deletions pkg/binding/mock_structured_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package binding

import (
"bytes"
"io"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/binding/format"
)

// MockStructuredMessage implements a structured-mode message as a simple struct.
type MockStructuredMessage struct {
Format format.Format
Bytes []byte
format format.Format
body []byte
}

func NewMockStructuredMessage(e cloudevents.Event) *MockStructuredMessage {
Expand All @@ -19,28 +20,40 @@ func NewMockStructuredMessage(e cloudevents.Event) *MockStructuredMessage {
panic(err)
}
return &MockStructuredMessage{
Bytes: testEventSerialized,
Format: format.JSON,
body: testEventSerialized,
format: format.JSON,
}
}

func (s *MockStructuredMessage) Event(b EventEncoder) error {
e := cloudevents.Event{}
err := s.Format.Unmarshal(s.Bytes, &e)
err := s.format.Unmarshal(s.body, &e)
if err != nil {
return err
}
return b.SetEvent(e)
}

func (s *MockStructuredMessage) Structured(b StructuredEncoder) error {
return b.SetStructuredEvent(s.Format, bytes.NewReader(s.Bytes))
return b.SetStructuredEvent(s.format, s)
}

func (s *MockStructuredMessage) Binary(BinaryEncoder) error {
return ErrNotBinary
}

func (s *MockStructuredMessage) IsEmpty() bool {
return s.body == nil
}

func (s *MockStructuredMessage) Bytes() []byte {
return s.body
}

func (s *MockStructuredMessage) Reader() io.Reader {
return bytes.NewReader(s.body)
}

func (s *MockStructuredMessage) Finish(error) error { return nil }

var _ Message = (*MockStructuredMessage)(nil) // Test it conforms to the interface
3 changes: 3 additions & 0 deletions pkg/binding/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func SendReceive(t *testing.T, in binding.Message, s binding.Sender, r binding.R
out, recvErr := r.Receive(ctx)
require.NoError(t, recvErr)
outAssert(out)
// Check if out message can be consumed more than one time
outAssert(out)
outAssert(out)
finishErr := out.Finish(nil)
require.NoError(t, finishErr)
}()
Expand Down
20 changes: 4 additions & 16 deletions pkg/binding/to_event.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package binding

import (
"io"
"io/ioutil"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/binding/format"
"github.com/cloudevents/sdk-go/pkg/binding/spec"
Expand Down Expand Up @@ -55,21 +52,12 @@ func (b *messageToEventBuilder) SetEvent(e ce.Event) error {
return nil
}

func (b *messageToEventBuilder) SetStructuredEvent(format format.Format, event io.Reader) error {
//TODO(slinkydeveloper) can we do pooling for this allocation?
val, err := ioutil.ReadAll(event)
if err != nil {
return err
}
return format.Unmarshal(val, b.event)
func (b *messageToEventBuilder) SetStructuredEvent(format format.Format, event MessagePayloadReader) error {
return format.Unmarshal(event.Bytes(), b.event)
}

func (b *messageToEventBuilder) SetData(data io.Reader) error {
//TODO(slinkydeveloper) can we do pooling for this allocation?
val, err := ioutil.ReadAll(data)
if err != nil {
return err
}
func (b *messageToEventBuilder) SetData(data MessagePayloadReader) error {
val := data.Bytes()
if len(val) != 0 {
return b.event.SetData(val)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/binding/transcoder/version.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package transcoder

import (
"io"

"github.com/cloudevents/sdk-go/pkg/binding"
"github.com/cloudevents/sdk-go/pkg/binding/spec"
ce "github.com/cloudevents/sdk-go/pkg/cloudevents"
Expand Down Expand Up @@ -34,7 +32,7 @@ type binaryVersionTransformer struct {
version spec.Version
}

func (b binaryVersionTransformer) SetData(data io.Reader) error {
func (b binaryVersionTransformer) SetData(data binding.MessagePayloadReader) error {
return b.delegate.SetData(data)
}

Expand Down
11 changes: 4 additions & 7 deletions pkg/bindings/amqp/binary_message_encoder.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package amqp

import (
"io"
"io/ioutil"
"net/url"

"pack.ag/amqp"
Expand All @@ -23,12 +21,11 @@ func newBinaryMessageEncoder(amqpMessage *amqp.Message) *binaryMessageEncoder {
return &binaryMessageEncoder{amqpMessage: amqpMessage}
}

func (b *binaryMessageEncoder) SetData(reader io.Reader) error {
data, err := ioutil.ReadAll(reader)
if err != nil {
return err
func (b *binaryMessageEncoder) SetData(reader binding.MessagePayloadReader) error {
if reader.IsEmpty() {
return nil
}
b.amqpMessage.Data = [][]byte{data}
b.amqpMessage.Data = [][]byte{reader.Bytes()}
return nil
}

Expand Down
Loading