diff --git a/go.mod b/go.mod index 69da21885..71896ac8d 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/nats-io/nats.go v1.9.1 github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.3.0 + github.com/valyala/bytebufferpool v1.0.0 go.opencensus.io v0.22.0 go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect diff --git a/go.sum b/go.sum index 5715cccc7..787158599 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= diff --git a/pkg/binding/acks_before_finish_message.go b/pkg/binding/acks_before_finish_message.go new file mode 100644 index 000000000..43db14e43 --- /dev/null +++ b/pkg/binding/acks_before_finish_message.go @@ -0,0 +1,23 @@ +package binding + +import "sync/atomic" + +type acksMessage struct { + Message + requiredAcks int32 +} + +func (m *acksMessage) Finish(err error) error { + remainingAcks := atomic.AddInt32(&m.requiredAcks, -1) + if remainingAcks == 0 { + return m.Message.Finish(err) + } + return nil +} + +// WithAcksBeforeFinish returns a wrapper for m that calls m.Finish() +// only after the specified number of acks are received. +// Use it when you need to route a Message to more Sender instances +func WithAcksBeforeFinish(m Message, requiredAcks int) Message { + return &acksMessage{Message: m, requiredAcks: int32(requiredAcks)} +} diff --git a/pkg/binding/acks_before_finish_message_test.go b/pkg/binding/acks_before_finish_message_test.go new file mode 100644 index 000000000..8e9dfb046 --- /dev/null +++ b/pkg/binding/acks_before_finish_message_test.go @@ -0,0 +1,46 @@ +package binding + +import ( + "context" + "net/url" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" +) + +func TestWithAcksBeforeFinish(t *testing.T) { + var testEvent = cloudevents.Event{ + Data: []byte(`"data"`), + DataEncoded: true, + Context: cloudevents.EventContextV1{ + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: types.URIRef{URL: url.URL{Path: "source"}}, + ID: "id", + Type: "type"}.AsV1(), + } + + finishCalled := false + finishMessage := WithFinish(EventMessage(testEvent), func(err error) { + finishCalled = true + }) + + wg := sync.WaitGroup{} + + messageToTest := WithAcksBeforeFinish(finishMessage, 1000) + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(m Message) { + ch := make(chan Message, 1) + assert.NoError(t, ChanSender(ch).Send(context.Background(), m)) + <-ch + wg.Done() + }(messageToTest) + } + + wg.Wait() + assert.True(t, finishCalled) +} diff --git a/pkg/binding/example_implementing_test.go b/pkg/binding/example_implementing_test.go index 5de527683..db1eb5d90 100644 --- a/pkg/binding/example_implementing_test.go +++ b/pkg/binding/example_implementing_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "io" - "io/ioutil" "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/binding/format" @@ -30,7 +29,7 @@ import ( type ExMessage json.RawMessage func (m ExMessage) Structured(b binding.StructuredEncoder) error { - return b.SetStructuredEvent(format.JSON, bytes.NewReader([]byte(m))) + return b.SetStructuredEvent(format.JSON, &m) } func (m ExMessage) Binary(binding.BinaryEncoder) error { @@ -46,9 +45,22 @@ func (m ExMessage) Event(b binding.EventEncoder) error { return b.SetEvent(e) } +func (m *ExMessage) IsEmpty() bool { + return m == nil +} + +func (m *ExMessage) Bytes() []byte { + return *m +} + +func (m *ExMessage) Reader() io.Reader { + return bytes.NewReader(m.Bytes()) +} + func (m ExMessage) Finish(error) error { return nil } var _ binding.Message = (*ExMessage)(nil) +var _ binding.MessagePayloadReader = (*ExMessage)(nil) // ExSender sends by writing JSON encoded events to an io.Writer // ExSender supports transcoding @@ -63,6 +75,9 @@ func NewExSender(w io.Writer, factories ...binding.TransformerFactory) binding.S } func (s *ExSender) Send(ctx context.Context, m binding.Message) error { + // Invoke m.Finish to notify the receiver that message was processed + defer func() { _ = m.Finish(nil) }() + // Translate tries the various encodings, starting with provided root encoder factories. // If a sender doesn't support a specific encoding, a null root encoder factory could be provided. _, _, err := binding.Translate( @@ -79,13 +94,9 @@ func (s *ExSender) Send(ctx context.Context, m binding.Message) error { return err } -func (s *ExSender) SetStructuredEvent(f format.Format, event io.Reader) error { +func (s *ExSender) SetStructuredEvent(f format.Format, event binding.MessagePayloadReader) error { if f == format.JSON { - b, err := ioutil.ReadAll(event) - if err != nil { - return err - } - return s.encoder.Encode(json.RawMessage(b)) + return s.encoder.Encode(event.Bytes()) } else { return binding.ErrNotStructured } diff --git a/pkg/binding/interfaces.go b/pkg/binding/interfaces.go index cd1688ced..84794cdb0 100644 --- a/pkg/binding/interfaces.go +++ b/pkg/binding/interfaces.go @@ -33,6 +33,8 @@ import ( // to a Sender, they may not be implemented by all Message instances. A Sender should // try each method of interest and fall back to Event(EventEncoder) if none are supported. // +// The message should be "consumable" several times, meaning that its methods to visit it can be called several times. +// type Message interface { // Structured transfers a structured-mode event to a StructuredEncoder. // Returns ErrNotStructured if message is not in structured mode. @@ -84,13 +86,26 @@ var ErrNotStructured = errors.New("message is not in structured mode") // ErrNotBinary returned by Message.Binary for non-binary messages. var ErrNotBinary = errors.New("message is not in binary mode") +// MessagePayload allows to read a message payload or as a byte array or as a reader +// Message implementers must re +type MessagePayloadReader interface { + // Returns true if the message payload is empty + IsEmpty() bool + + // Returns the payload as a byte array, nil if IsEmpty() == true + Bytes() []byte + + // Returns the payload as an io.Reader, nil if IsEmpty() == true + Reader() io.Reader +} + // StructuredEncoder should generate a new representation of the event starting from a structured message. // // Protocols that supports structured encoding should implement this interface to implement direct // structured -> structured transfer. type StructuredEncoder interface { // Event receives an io.Reader for the whole event. - SetStructuredEvent(format format.Format, event io.Reader) error + SetStructuredEvent(format format.Format, event MessagePayloadReader) error } // BinaryEncoder should generate a new representation of the event starting from a binary message. @@ -100,7 +115,7 @@ type StructuredEncoder interface { type BinaryEncoder interface { // SetData receives an io.Reader for the data attribute. // io.Reader could be empty, meaning that message payload is empty - SetData(data io.Reader) error + SetData(data MessagePayloadReader) error // Set a standard attribute. // diff --git a/pkg/binding/mock_binary_message.go b/pkg/binding/mock_binary_message.go index 13bf4a1d7..c97759311 100644 --- a/pkg/binding/mock_binary_message.go +++ b/pkg/binding/mock_binary_message.go @@ -2,6 +2,7 @@ package binding import ( "bytes" + "io" cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/binding/spec" @@ -74,7 +75,19 @@ func (bm *MockBinaryMessage) Binary(b BinaryEncoder) error { if len(bm.Body) == 0 { return nil } - return b.SetData(bytes.NewReader(bm.Body)) + return b.SetData(bm) +} + +func (bm *MockBinaryMessage) IsEmpty() bool { + return bm.Body == nil +} + +func (bm *MockBinaryMessage) Bytes() []byte { + return bm.Body +} + +func (bm *MockBinaryMessage) Reader() io.Reader { + return bytes.NewReader(bm.Body) } func (bm *MockBinaryMessage) Finish(error) error { return nil } diff --git a/pkg/binding/mock_structured_message.go b/pkg/binding/mock_structured_message.go index 02d44549e..2f2ecab18 100644 --- a/pkg/binding/mock_structured_message.go +++ b/pkg/binding/mock_structured_message.go @@ -2,6 +2,7 @@ package binding import ( "bytes" + "io" cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/binding/format" @@ -9,8 +10,8 @@ import ( // MockStructuredMessage implements a structured-mode message as a simple struct. type MockStructuredMessage struct { - Format format.Format - Bytes []byte + format format.Format + body []byte } func NewMockStructuredMessage(e cloudevents.Event) *MockStructuredMessage { @@ -19,14 +20,14 @@ func NewMockStructuredMessage(e cloudevents.Event) *MockStructuredMessage { panic(err) } return &MockStructuredMessage{ - Bytes: testEventSerialized, - Format: format.JSON, + body: testEventSerialized, + format: format.JSON, } } func (s *MockStructuredMessage) Event(b EventEncoder) error { e := cloudevents.Event{} - err := s.Format.Unmarshal(s.Bytes, &e) + err := s.format.Unmarshal(s.body, &e) if err != nil { return err } @@ -34,13 +35,25 @@ func (s *MockStructuredMessage) Event(b EventEncoder) error { } func (s *MockStructuredMessage) Structured(b StructuredEncoder) error { - return b.SetStructuredEvent(s.Format, bytes.NewReader(s.Bytes)) + return b.SetStructuredEvent(s.format, s) } func (s *MockStructuredMessage) Binary(BinaryEncoder) error { return ErrNotBinary } +func (s *MockStructuredMessage) IsEmpty() bool { + return s.body == nil +} + +func (s *MockStructuredMessage) Bytes() []byte { + return s.body +} + +func (s *MockStructuredMessage) Reader() io.Reader { + return bytes.NewReader(s.body) +} + func (s *MockStructuredMessage) Finish(error) error { return nil } var _ Message = (*MockStructuredMessage)(nil) // Test it conforms to the interface diff --git a/pkg/binding/test/test.go b/pkg/binding/test/test.go index da6a88d0f..e13671b2c 100644 --- a/pkg/binding/test/test.go +++ b/pkg/binding/test/test.go @@ -73,6 +73,9 @@ func SendReceive(t *testing.T, in binding.Message, s binding.Sender, r binding.R out, recvErr := r.Receive(ctx) require.NoError(t, recvErr) outAssert(out) + // Check if out message can be consumed more than one time + outAssert(out) + outAssert(out) finishErr := out.Finish(nil) require.NoError(t, finishErr) }() diff --git a/pkg/binding/to_event.go b/pkg/binding/to_event.go index c1646bdeb..e6d6bb326 100644 --- a/pkg/binding/to_event.go +++ b/pkg/binding/to_event.go @@ -1,9 +1,6 @@ package binding import ( - "io" - "io/ioutil" - cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/binding/format" "github.com/cloudevents/sdk-go/pkg/binding/spec" @@ -55,21 +52,12 @@ func (b *messageToEventBuilder) SetEvent(e ce.Event) error { return nil } -func (b *messageToEventBuilder) SetStructuredEvent(format format.Format, event io.Reader) error { - //TODO(slinkydeveloper) can we do pooling for this allocation? - val, err := ioutil.ReadAll(event) - if err != nil { - return err - } - return format.Unmarshal(val, b.event) +func (b *messageToEventBuilder) SetStructuredEvent(format format.Format, event MessagePayloadReader) error { + return format.Unmarshal(event.Bytes(), b.event) } -func (b *messageToEventBuilder) SetData(data io.Reader) error { - //TODO(slinkydeveloper) can we do pooling for this allocation? - val, err := ioutil.ReadAll(data) - if err != nil { - return err - } +func (b *messageToEventBuilder) SetData(data MessagePayloadReader) error { + val := data.Bytes() if len(val) != 0 { return b.event.SetData(val) } diff --git a/pkg/binding/transcoder/version.go b/pkg/binding/transcoder/version.go index 0dd0ef75d..c7196f706 100644 --- a/pkg/binding/transcoder/version.go +++ b/pkg/binding/transcoder/version.go @@ -1,8 +1,6 @@ package transcoder import ( - "io" - "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/binding/spec" ce "github.com/cloudevents/sdk-go/pkg/cloudevents" @@ -34,7 +32,7 @@ type binaryVersionTransformer struct { version spec.Version } -func (b binaryVersionTransformer) SetData(data io.Reader) error { +func (b binaryVersionTransformer) SetData(data binding.MessagePayloadReader) error { return b.delegate.SetData(data) } diff --git a/pkg/bindings/amqp/binary_message_encoder.go b/pkg/bindings/amqp/binary_message_encoder.go index 3ffad63ee..838be5dda 100644 --- a/pkg/bindings/amqp/binary_message_encoder.go +++ b/pkg/bindings/amqp/binary_message_encoder.go @@ -1,8 +1,6 @@ package amqp import ( - "io" - "io/ioutil" "net/url" "pack.ag/amqp" @@ -23,12 +21,11 @@ func newBinaryMessageEncoder(amqpMessage *amqp.Message) *binaryMessageEncoder { return &binaryMessageEncoder{amqpMessage: amqpMessage} } -func (b *binaryMessageEncoder) SetData(reader io.Reader) error { - data, err := ioutil.ReadAll(reader) - if err != nil { - return err +func (b *binaryMessageEncoder) SetData(reader binding.MessagePayloadReader) error { + if reader.IsEmpty() { + return nil } - b.amqpMessage.Data = [][]byte{data} + b.amqpMessage.Data = [][]byte{reader.Bytes()} return nil } diff --git a/pkg/bindings/amqp/message.go b/pkg/bindings/amqp/message.go index e3e1285b0..b4d7d4979 100644 --- a/pkg/bindings/amqp/message.go +++ b/pkg/bindings/amqp/message.go @@ -3,6 +3,7 @@ package amqp import ( "bytes" "errors" + "io" "reflect" "strings" @@ -24,12 +25,13 @@ var ( // Message implements binding.Message by wrapping an *amqp.Message. type Message struct{ AMQP *amqp.Message } -// Check if amqp.Message implements binding.Message +// Check if amqp.Message implements binding.Message & binding.MessagePayloadReader var _ binding.Message = (*Message)(nil) +var _ binding.MessagePayloadReader = (*Message)(nil) func (m *Message) Structured(encoder binding.StructuredEncoder) error { if m.AMQP.Properties != nil && format.IsFormat(m.AMQP.Properties.ContentType) { - return encoder.SetStructuredEvent(format.Lookup(m.AMQP.Properties.ContentType), bytes.NewReader(m.AMQP.GetData())) + return encoder.SetStructuredEvent(format.Lookup(m.AMQP.Properties.ContentType), m) } return binding.ErrNotStructured } @@ -66,9 +68,8 @@ func (m *Message) Binary(encoder binding.BinaryEncoder) error { } } - data := m.AMQP.GetData() - if len(data) != 0 { // Some data - return encoder.SetData(bytes.NewReader(data)) + if !m.IsEmpty() { + return encoder.SetData(m) } return nil } @@ -81,6 +82,18 @@ func (m *Message) Event(encoder binding.EventEncoder) error { return encoder.SetEvent(e) } +func (m *Message) IsEmpty() bool { + return m.AMQP.Data == nil || len(m.AMQP.Data) == 0 +} + +func (m *Message) Bytes() []byte { + return m.AMQP.GetData() +} + +func (m *Message) Reader() io.Reader { + return bytes.NewReader(m.AMQP.GetData()) +} + func (m *Message) Finish(err error) error { if err != nil { return m.AMQP.Reject(&amqp.Error{ diff --git a/pkg/bindings/amqp/structured_message_encoder.go b/pkg/bindings/amqp/structured_message_encoder.go index 6e87694d3..f3006bf15 100644 --- a/pkg/bindings/amqp/structured_message_encoder.go +++ b/pkg/bindings/amqp/structured_message_encoder.go @@ -1,9 +1,6 @@ package amqp import ( - "io" - "io/ioutil" - "pack.ag/amqp" "github.com/cloudevents/sdk-go/pkg/binding" @@ -16,12 +13,8 @@ type structuredMessageEncoder struct { var _ binding.StructuredEncoder = (*structuredMessageEncoder)(nil) // Test it conforms to the interface -func (b *structuredMessageEncoder) SetStructuredEvent(format format.Format, event io.Reader) error { - val, err := ioutil.ReadAll(event) - if err != nil { - return err - } - b.amqpMessage.Data = [][]byte{val} +func (b *structuredMessageEncoder) SetStructuredEvent(format format.Format, event binding.MessagePayloadReader) error { + b.amqpMessage.Data = [][]byte{event.Bytes()} b.amqpMessage.Properties = &amqp.MessageProperties{ContentType: format.MediaType()} return nil } diff --git a/pkg/bindings/http/binary_message_encoder.go b/pkg/bindings/http/binary_message_encoder.go index 975e9ff2d..5085cfbc7 100644 --- a/pkg/bindings/http/binary_message_encoder.go +++ b/pkg/bindings/http/binary_message_encoder.go @@ -1,7 +1,6 @@ package http import ( - "io" "io/ioutil" "net/http" @@ -16,8 +15,10 @@ type binaryMessageEncoder struct { var _ binding.BinaryEncoder = (*binaryMessageEncoder)(nil) // Test it conforms to the interface -func (b *binaryMessageEncoder) SetData(reader io.Reader) error { - b.req.Body = ioutil.NopCloser(reader) +func (b *binaryMessageEncoder) SetData(payload binding.MessagePayloadReader) error { + if !payload.IsEmpty() { + b.req.Body = ioutil.NopCloser(payload.Reader()) + } return nil } diff --git a/pkg/bindings/http/message.go b/pkg/bindings/http/message.go index 159bc2c7e..6438909eb 100644 --- a/pkg/bindings/http/message.go +++ b/pkg/bindings/http/message.go @@ -1,10 +1,13 @@ package http import ( + "bytes" "io" nethttp "net/http" "strings" + "github.com/valyala/bytebufferpool" + "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/binding/format" "github.com/cloudevents/sdk-go/pkg/binding/spec" @@ -16,41 +19,51 @@ var specs = spec.WithPrefix(prefix) const ContentType = "Content-Type" -// Message holds the Header and Body of a HTTP Request or Response. +// Message holds the Header and Body of a HTTP Request. type Message struct { - Header nethttp.Header - BodyReader io.ReadCloser - OnFinish func(error) error + header nethttp.Header + body *bytebufferpool.ByteBuffer + pool *bytebufferpool.Pool + onFinish func(error) error } -// Check if http.Message implements binding.Message +// Check if http.Message implements binding.Message & binding.MessagePayloadReader var _ binding.Message = (*Message)(nil) +var _ binding.MessagePayloadReader = (*Message)(nil) // NewMessage returns a Message with header and data from body. // Reads and closes body. -func NewMessage(header nethttp.Header, body io.ReadCloser) (*Message, error) { - m := Message{Header: header} +func NewMessage(pool *bytebufferpool.Pool, header nethttp.Header, body io.ReadCloser) (*Message, error) { + m := Message{header: header, pool: pool} if body != nil { - m.BodyReader = body + m.body = pool.Get() + _, err := m.body.ReadFrom(body) + if err != nil { + return nil, err + } + err = body.Close() + if err != nil { + return nil, err + } } return &m, nil } func (m *Message) Structured(encoder binding.StructuredEncoder) error { - if ft := format.Lookup(m.Header.Get(ContentType)); ft == nil { + if ft := format.Lookup(m.header.Get(ContentType)); ft == nil { return binding.ErrNotStructured } else { - return encoder.SetStructuredEvent(ft, m.BodyReader) + return encoder.SetStructuredEvent(ft, m) } } func (m *Message) Binary(encoder binding.BinaryEncoder) error { - version, err := specs.FindVersion(m.Header.Get) + version, err := specs.FindVersion(m.header.Get) if err != nil { return binding.ErrNotBinary } - for k, v := range m.Header { + for k, v := range m.header { if strings.HasPrefix(k, prefix) { attr := version.Attribute(k) if attr != nil { @@ -66,13 +79,9 @@ func (m *Message) Binary(encoder binding.BinaryEncoder) error { } } - if m.BodyReader != nil { - err = encoder.SetData(m.BodyReader) - if err != nil { - return err - } + if !m.IsEmpty() { + return encoder.SetData(m) } - return nil } @@ -84,12 +93,24 @@ func (m *Message) Event(encoder binding.EventEncoder) error { return encoder.SetEvent(e) } +func (m *Message) IsEmpty() bool { + return m.body == nil +} + +func (m *Message) Bytes() []byte { + return m.body.Bytes() +} + +func (m *Message) Reader() io.Reader { + return bytes.NewReader(m.body.Bytes()) +} + func (m *Message) Finish(err error) error { - if m.BodyReader != nil { - _ = m.BodyReader.Close() + if m.body != nil { + m.pool.Put(m.body) } - if m.OnFinish != nil { - return m.OnFinish(err) + if m.onFinish != nil { + return m.onFinish(err) } return nil } diff --git a/pkg/bindings/http/receiver.go b/pkg/bindings/http/receiver.go index 22cef098f..3df03624b 100644 --- a/pkg/bindings/http/receiver.go +++ b/pkg/bindings/http/receiver.go @@ -7,6 +7,8 @@ import ( "net/http" nethttp "net/http" + "github.com/valyala/bytebufferpool" + "github.com/cloudevents/sdk-go/pkg/binding" ) @@ -18,18 +20,19 @@ type msgErr struct { // Receiver for CloudEvents as HTTP requests. // Implements http.Handler, To receive messages, associate it with a http.Server. type Receiver struct { - incoming chan msgErr + incoming chan msgErr + memoryPool *bytebufferpool.Pool } // ServeHTTP implements http.Handler. // Blocks until Message.Finish is called. func (r *Receiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - m, err := NewMessage(req.Header, req.Body) + m, err := NewMessage(r.memoryPool, req.Header, req.Body) if err != nil { r.incoming <- msgErr{nil, err} } done := make(chan error) - m.OnFinish = func(err error) error { done <- err; return nil } + m.onFinish = func(err error) error { done <- err; return nil } r.incoming <- msgErr{m, err} // Send to Receive() if err = <-done; err != nil { nethttp.Error(rw, fmt.Sprintf("cannot forward CloudEvent: %v", err), http.StatusInternalServerError) @@ -38,7 +41,8 @@ func (r *Receiver) ServeHTTP(rw http.ResponseWriter, req *http.Request) { // NewReceiver creates a receiver func NewReceiver() *Receiver { - return &Receiver{incoming: make(chan msgErr)} + var pool bytebufferpool.Pool + return &Receiver{incoming: make(chan msgErr), memoryPool: &pool} } // Receive the next incoming HTTP request as a CloudEvent. diff --git a/pkg/bindings/http/structured_message_encoder.go b/pkg/bindings/http/structured_message_encoder.go index b5ced9450..594b5a092 100644 --- a/pkg/bindings/http/structured_message_encoder.go +++ b/pkg/bindings/http/structured_message_encoder.go @@ -1,7 +1,6 @@ package http import ( - "io" "io/ioutil" "net/http" @@ -15,8 +14,8 @@ type structuredMessageEncoder struct { var _ binding.StructuredEncoder = (*structuredMessageEncoder)(nil) // Test it conforms to the interface -func (b *structuredMessageEncoder) SetStructuredEvent(format format.Format, event io.Reader) error { +func (b *structuredMessageEncoder) SetStructuredEvent(format format.Format, event binding.MessagePayloadReader) error { b.req.Header.Set(ContentType, format.MediaType()) - b.req.Body = ioutil.NopCloser(event) + b.req.Body = ioutil.NopCloser(event.Reader()) return nil } diff --git a/test/benchmark/benchmark_case.go b/test/benchmark/benchmark_case.go new file mode 100644 index 000000000..fdfade6c9 --- /dev/null +++ b/test/benchmark/benchmark_case.go @@ -0,0 +1,28 @@ +package benchmark + +type BenchmarkCase struct { + PayloadSize int + Parallelism int + OutputSenders int +} + +func GenerateAllBenchmarkCases( + payloadMin int, + payloadMax int, + parallelismMin int, + parallelismMax int, + outputSendersMin int, + outputSendersMax int, +) []BenchmarkCase { + var cases []BenchmarkCase + + for payload := payloadMin; payload <= payloadMax; payload *= 2 { + for parallelism := parallelismMin; parallelism <= parallelismMax; parallelism += 1 { + for outputSenders := outputSendersMin; outputSenders <= outputSendersMax; outputSenders += 1 { + cases = append(cases, BenchmarkCase{payload, parallelism, outputSenders}) + } + } + } + + return cases +} diff --git a/test/benchmark/benchmark_result.go b/test/benchmark/benchmark_result.go new file mode 100644 index 000000000..1ee32424b --- /dev/null +++ b/test/benchmark/benchmark_result.go @@ -0,0 +1,34 @@ +package benchmark + +import ( + "encoding/csv" + "strconv" + "testing" +) + +type BenchmarkResult struct { + BenchmarkCase + testing.BenchmarkResult +} + +func (br *BenchmarkResult) record() []string { + return []string{ + strconv.Itoa(br.Parallelism), + strconv.Itoa(br.PayloadSize), + strconv.Itoa(br.OutputSenders), + strconv.FormatInt(br.NsPerOp(), 10), + strconv.FormatInt(br.AllocedBytesPerOp(), 10), + } +} + +type BenchmarkResults []BenchmarkResult + +func (br BenchmarkResults) WriteToCsv(writer *csv.Writer) error { + for _, i2 := range br { + err := writer.Write(i2.record()) + if err != nil { + return err + } + } + return nil +} diff --git a/test/benchmark/http/3d_plot_allocs.gnuplot b/test/benchmark/http/3d_plot_allocs.gnuplot new file mode 100644 index 000000000..4972abb98 --- /dev/null +++ b/test/benchmark/http/3d_plot_allocs.gnuplot @@ -0,0 +1,19 @@ +set datafile separator comma +set datafile missing NaN +set xlabel "Parallelism" +set ylabel "Number of output senders" +set zlabel "Memory Allocated/Ops" +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 + +print "Plotting with payload size ".payload_size."" + +splot "baseline-binary.csv" using 1:3:($2==payload_size?$5:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 1:3:($2==payload_size?$5:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 1:3:($2==payload_size?$5:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 1:3:($2==payload_size?$5:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 1:3:($2==payload_size?$5:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 1:3:($2==payload_size?$5:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 1:3:($2==payload_size?$5:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 1:3:($2==payload_size?$5:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint +pause -1 diff --git a/test/benchmark/http/3d_plot_ns.gnuplot b/test/benchmark/http/3d_plot_ns.gnuplot new file mode 100644 index 000000000..8b5fd3286 --- /dev/null +++ b/test/benchmark/http/3d_plot_ns.gnuplot @@ -0,0 +1,19 @@ +set datafile separator comma +set datafile missing NaN +set xlabel "Parallelism" +set ylabel "Number of output senders" +set zlabel "Nanoseconds/Ops" +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 + +print "Plotting with payload size ".payload_size."" + +splot "baseline-binary.csv" using 1:3:($2==payload_size?$4:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 1:3:($2==payload_size?$4:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 1:3:($2==payload_size?$4:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 1:3:($2==payload_size?$4:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 1:3:($2==payload_size?$4:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 1:3:($2==payload_size?$4:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 1:3:($2==payload_size?$4:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 1:3:($2==payload_size?$4:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint +pause -1 diff --git a/test/benchmark/http/http_mock.go b/test/benchmark/http/http_mock.go new file mode 100644 index 000000000..1101f745e --- /dev/null +++ b/test/benchmark/http/http_mock.go @@ -0,0 +1,94 @@ +package main + +import ( + "bytes" + "io" + "io/ioutil" + nethttp "net/http" + "net/http/httptest" + "net/url" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/binding" + "github.com/cloudevents/sdk-go/pkg/bindings/http" + cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" +) + +type RoundTripFunc func(req *nethttp.Request) *nethttp.Response + +func (f RoundTripFunc) RoundTrip(req *nethttp.Request) (*nethttp.Response, error) { + return f(req), nil +} + +func NewTestClient(fn RoundTripFunc) *nethttp.Client { + return &nethttp.Client{ + Transport: RoundTripFunc(fn), + } +} + +func MockedSender(options ...http.SenderOptionFunc) binding.Sender { + u, _ := url.Parse("http://localhost") + return http.NewSender(NewTestClient(func(req *nethttp.Request) *nethttp.Response { + return &nethttp.Response{ + StatusCode: 202, + Header: make(nethttp.Header), + } + }), u, options...) +} + +func MockedClient() (cloudevents.Client, *cehttp.Transport) { + t, err := cehttp.New(cehttp.WithTarget("http://localhost")) + + if err != nil { + panic(err) + } + + t.Client = NewTestClient(func(req *nethttp.Request) *nethttp.Response { + return &nethttp.Response{ + StatusCode: 202, + Header: make(nethttp.Header), + Body: ioutil.NopCloser(bytes.NewReader([]byte{})), + } + }) + + client, err := cloudevents.NewClient(t) + + if err != nil { + panic(err) + } + + return client, t +} + +func MockedBinaryRequest(body []byte) *nethttp.Request { + r := httptest.NewRequest("POST", "http://localhost:8080", bytes.NewBuffer(body)) + r.Header.Add("Ce-id", "0") + r.Header.Add("Ce-subject", "sub") + r.Header.Add("Ce-specversion", "1.0") + r.Header.Add("Ce-type", "t") + r.Header.Add("Ce-source", "http://localhost") + r.Header.Add("Content-type", "text/plain") + return r +} + +var ( + eventBegin = []byte("{" + + "\"id\":\"0\"," + + "\"subject\":\"sub\"," + + "\"specversion\":\"1.0\"," + + "\"type\":\"t\"," + + "\"source\":\"http://localhost\"," + + "\"datacontenttype\":\"text/plain\"," + + "\"data\": \"") + eventEnd = []byte("\"}") +) + +func MockedStructuredRequest(body []byte) *nethttp.Request { + r := httptest.NewRequest( + "POST", + "http://localhost:8080", + io.MultiReader(bytes.NewReader(eventBegin), bytes.NewBuffer(body), bytes.NewReader(eventEnd)), + ) + r.Header.Add("Content-type", cloudevents.ApplicationCloudEventsJSON) + return r +} diff --git a/test/benchmark/http/main.go b/test/benchmark/http/main.go index 44c1e8e0f..352a86279 100644 --- a/test/benchmark/http/main.go +++ b/test/benchmark/http/main.go @@ -1,198 +1,215 @@ package main import ( - "bytes" "context" "encoding/csv" "flag" + "fmt" "io" - "io/ioutil" + "math/rand" nethttp "net/http" "net/http/httptest" - "net/url" "os" "runtime" "runtime/pprof" - "strconv" + "sync" "testing" + "time" cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/binding" "github.com/cloudevents/sdk-go/pkg/bindings/http" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" - cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" + "github.com/cloudevents/sdk-go/test/benchmark" ) -type RoundTripFunc func(req *nethttp.Request) *nethttp.Response +var letters = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ") -func (f RoundTripFunc) RoundTrip(req *nethttp.Request) (*nethttp.Response, error) { - return f(req), nil -} - -func NewTestClient(fn RoundTripFunc) *nethttp.Client { - return &nethttp.Client{ - Transport: RoundTripFunc(fn), +func fillRandom(buf []byte, r *rand.Rand) { + for i := 0; i < cap(buf); i++ { + buf[i] = letters[r.Intn(len(letters))] } } -func generateRandomValue(kb int, value byte) []byte { - length := 1024 * kb - b := make([]byte, length) - for i := 0; i < length; i++ { - b[i] = value - } - return b -} +// Avoid DCE +var W *httptest.ResponseRecorder +var R *nethttp.Request + +func benchmarkBaseline(cases []benchmark.BenchmarkCase, requestFactory func([]byte) *nethttp.Request) benchmark.BenchmarkResults { + var results benchmark.BenchmarkResults + r := rand.New(rand.NewSource(time.Now().Unix())) -func MockedSender() binding.Sender { - u, _ := url.Parse("http://localhost") - return http.NewSender(NewTestClient(func(req *nethttp.Request) *nethttp.Response { - return &nethttp.Response{ - StatusCode: 202, - Header: make(nethttp.Header), + for _, c := range cases { + if c.OutputSenders > 1 { + // It doesn't make sense for this test + continue } - }), u) -} + fmt.Printf("%+v\n", c) -func MockedClient() (cloudevents.Client, *cehttp.Transport) { - t, err := cehttp.New(cehttp.WithTarget("http://localhost")) + buffer := make([]byte, c.PayloadSize) + fillRandom(buffer, r) - if err != nil { - panic(err) + result := testing.Benchmark(func(b *testing.B) { + b.SetParallelism(c.Parallelism) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + W = httptest.NewRecorder() + R = requestFactory(buffer) + } + }) + }) + results = append(results, benchmark.BenchmarkResult{BenchmarkCase: c, BenchmarkResult: result}) } - t.Client = NewTestClient(func(req *nethttp.Request) *nethttp.Response { - return &nethttp.Response{ - StatusCode: 202, - Header: make(nethttp.Header), - Body: ioutil.NopCloser(bytes.NewReader([]byte{})), - } - }) + return results +} - client, err := cloudevents.NewClient(t) +func pipeLoopDirect(r *http.Receiver, endCtx context.Context, opts ...http.SenderOptionFunc) { + s := MockedSender(opts...) + var err error + var m binding.Message + for err != io.EOF { + select { + case <-endCtx.Done(): + return + default: + m, err = r.Receive(endCtx) + if err != nil || m == nil { + continue + } + _ = s.Send(context.Background(), m) + } + } +} - if err != nil { - panic(err) +func pipeLoopMulti(r *http.Receiver, endCtx context.Context, outputSenders int, opts ...http.SenderOptionFunc) { + s := MockedSender(opts...) + var err error + var m binding.Message + for err != io.EOF { + select { + case <-endCtx.Done(): + return + default: + m, err = r.Receive(endCtx) + if err != nil { + continue + } + outputMessage := binding.WithAcksBeforeFinish(m, outputSenders) + for i := 0; i < outputSenders; i++ { + go func(m binding.Message) { + _ = s.Send(context.Background(), outputMessage) + }(outputMessage) + } + } } +} - t.SetReceiver(transport.ReceiveFunc(func(ctx context.Context, e cloudevents.Event, er *cloudevents.EventResponse) error { - _, _, _ = client.Send(ctx, e) - er.RespondWith(202, nil) - return nil - })) +func benchmarkReceiverSender(cases []benchmark.BenchmarkCase, requestFactory func([]byte) *nethttp.Request, opts ...http.SenderOptionFunc) benchmark.BenchmarkResults { + var results benchmark.BenchmarkResults + random := rand.New(rand.NewSource(time.Now().Unix())) - return client, t -} + for _, c := range cases { + fmt.Printf("%+v\n", c) -func MockedRequest(body []byte) *nethttp.Request { - r := httptest.NewRequest("POST", "http://localhost:8080", bytes.NewBuffer(body)) - r.Header.Add("Ce-id", "0") - r.Header.Add("Ce-subject", "sub") - r.Header.Add("Ce-specversion", "1.0") - r.Header.Add("Ce-type", "t") - r.Header.Add("Ce-source", "http://localhost") - r.Header.Add("Content-type", "text/plain") - return r -} + ctx, cancel := context.WithCancel(context.TODO()) + receiver := http.NewReceiver() -// Avoid DCE -var W *httptest.ResponseRecorder -var R *nethttp.Request + // Spawn dispatchers + for i := 0; i < c.Parallelism; i++ { + if c.OutputSenders == 1 { + go pipeLoopDirect(receiver, ctx, opts...) + } else { + go pipeLoopMulti(receiver, ctx, c.OutputSenders, opts...) + } + } -type BenchResult struct { - parallelism int - payloadSizeKb int - testing.BenchmarkResult -} + buffer := make([]byte, c.PayloadSize) + fillRandom(buffer, random) + runtime.GC() -func runBench(do func(body []byte)) []BenchResult { - results := make([]BenchResult, 0) - for p := 1; p <= runtime.NumCPU(); p++ { - for k := 1; k <= 32; k *= 2 { - body := generateRandomValue(k, byte('a')) - r := testing.Benchmark(func(b *testing.B) { - b.SetParallelism(p) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - do(body) - } - }) + result := testing.Benchmark(func(b *testing.B) { + b.SetParallelism(c.Parallelism) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w := httptest.NewRecorder() + receiver.ServeHTTP(w, requestFactory(buffer)) + } }) - results = append(results, BenchResult{p, k, r}) - } + }) + results = append(results, benchmark.BenchmarkResult{BenchmarkCase: c, BenchmarkResult: result}) + + cancel() + runtime.GC() } + return results } -func benchmarkBaseline() []BenchResult { - return runBench(func(body []byte) { - W = httptest.NewRecorder() - R = MockedRequest(body) +func dispatchReceiver(clients []cloudevents.Client, outputSenders int) transport.Receiver { + return transport.ReceiveFunc(func(ctx context.Context, e cloudevents.Event, er *cloudevents.EventResponse) error { + var wg sync.WaitGroup + for i := 0; i < outputSenders; i++ { + wg.Add(1) + go func(client cloudevents.Client) { + _, _, _ = client.Send(ctx, e) + wg.Done() + }(clients[i]) + } + wg.Wait() + er.RespondWith(200, nil) + return nil }) } -func benchmarkReceiverSender() []BenchResult { - r := http.NewReceiver() +func benchmarkClient(cases []benchmark.BenchmarkCase, requestFactory func([]byte) *nethttp.Request) benchmark.BenchmarkResults { + var results benchmark.BenchmarkResults + random := rand.New(rand.NewSource(time.Now().Unix())) - results := make([]BenchResult, 0) - for p := 1; p <= runtime.NumCPU(); p++ { - ctx, cancel := context.WithCancel(context.TODO()) + for _, c := range cases { + fmt.Printf("%+v\n", c) - // Spawn dispatchers - for i := 0; i < p; i++ { - go func(r *http.Receiver) { - s := MockedSender() - var err error - var m binding.Message - messageCtx := context.Background() - for err != io.EOF { - select { - case _, ok := <-ctx.Done(): - if !ok { - return - } - default: - m, err = r.Receive(messageCtx) - if err != nil { - continue - } - _ = s.Send(messageCtx, m) - } - } - }(r) + _, mockedReceiverTransport := MockedClient() + + senderClients := make([]cloudevents.Client, c.OutputSenders) + for i := 0; i < c.OutputSenders; i++ { + senderClients[i], _ = MockedClient() } - for k := 1; k <= 32; k *= 2 { - body := generateRandomValue(k, byte('a')) - r := testing.Benchmark(func(b *testing.B) { - b.SetParallelism(p) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - w := httptest.NewRecorder() - r.ServeHTTP(w, MockedRequest(body)) - } - }) + mockedReceiverTransport.SetReceiver(dispatchReceiver(senderClients, c.OutputSenders)) + + buffer := make([]byte, c.PayloadSize) + fillRandom(buffer, random) + runtime.GC() + + result := testing.Benchmark(func(b *testing.B) { + b.SetParallelism(c.Parallelism) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w := httptest.NewRecorder() + mockedReceiverTransport.ServeHTTP(w, requestFactory(buffer)) + } }) - results = append(results, BenchResult{p, k, r}) - } + }) + results = append(results, benchmark.BenchmarkResult{BenchmarkCase: c, BenchmarkResult: result}) - cancel() + runtime.GC() } - return results -} -func benchmarkClient() []BenchResult { - _, mockedTransport := MockedClient() - - return runBench(func(body []byte) { - w := httptest.NewRecorder() - mockedTransport.ServeHTTP(w, MockedRequest(body)) - }) + return results } var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") var memprofile = flag.String("memprofile", "", "write memory profile to `file`") -var bench = flag.String("bench", "baseline", "[baseline, receiver-sender, client]") +var bench = flag.String( + "bench", + "baseline-binary", + "[baseline-structured, baseline-binary, binding-structured-to-structured, binding-structured-to-binary, binding-binary-to-structured, binding-binary-to-binary, client-binary, client-structured]", +) +var out = flag.String("out", "out.csv", "Output file") +var maxPayloadKb = flag.Int("max-payload", 32, "Max payload size in kb") +var maxParallelism = flag.Int("max-parallelism", runtime.NumCPU()*2, "Max parallelism") +var maxOutputSenders = flag.Int("max-output-senders", 1, "Max output senders") func main() { flag.Parse() @@ -204,17 +221,43 @@ func main() { defer pprof.StopCPUProfile() } - var results []BenchResult + benchmarkCases := benchmark.GenerateAllBenchmarkCases( + 1024, + 1024*(*maxPayloadKb), + 1, + *maxParallelism, + 1, + *maxOutputSenders, //TODO to be increased when receiver-sender will support multi senders + ) + + var results benchmark.BenchmarkResults + + fmt.Printf("--- Starting benchmark %s ---\n", *bench) switch *bench { - case "baseline": - results = benchmarkBaseline() + case "baseline-structured": + results = benchmarkBaseline(benchmarkCases, MockedStructuredRequest) + break + case "baseline-binary": + results = benchmarkBaseline(benchmarkCases, MockedBinaryRequest) + break + case "binding-structured-to-structured": + results = benchmarkReceiverSender(benchmarkCases, MockedStructuredRequest, http.ForceStructured()) + break + case "binding-structured-to-binary": + results = benchmarkReceiverSender(benchmarkCases, MockedStructuredRequest, http.ForceBinary()) break - case "receiver-sender": - results = benchmarkReceiverSender() + case "binding-binary-to-structured": + results = benchmarkReceiverSender(benchmarkCases, MockedBinaryRequest, http.ForceStructured()) break - case "client": - results = benchmarkClient() + case "binding-binary-to-binary": + results = benchmarkReceiverSender(benchmarkCases, MockedBinaryRequest, http.ForceBinary()) + break + case "client-binary": + results = benchmarkClient(benchmarkCases, MockedBinaryRequest) + break + case "client-structured": + results = benchmarkClient(benchmarkCases, MockedStructuredRequest) break default: panic("Wrong bench flag") @@ -229,16 +272,17 @@ func main() { _ = pprof.WriteHeapProfile(f) } - writer := csv.NewWriter(os.Stdout) + f, err := os.Create(*out) + if err != nil { + panic(fmt.Sprintf("Cannot open file %s: %v", *out, err)) + } + defer f.Close() + + writer := csv.NewWriter(f) defer writer.Flush() - for _, res := range results { - _ = writer.Write([]string{ - strconv.Itoa(res.parallelism), - strconv.Itoa(res.payloadSizeKb), - strconv.FormatInt(res.NsPerOp(), 10), - strconv.FormatInt(res.AllocedBytesPerOp(), 10), - }) + err = results.WriteToCsv(writer) + if err != nil { + panic(err) } - } diff --git a/test/benchmark/http/plot_output_senders_allocs.gnuplot b/test/benchmark/http/plot_output_senders_allocs.gnuplot new file mode 100644 index 000000000..d141b9e3e --- /dev/null +++ b/test/benchmark/http/plot_output_senders_allocs.gnuplot @@ -0,0 +1,19 @@ +set datafile separator comma +set datafile missing NaN +set xlabel "Number of output senders" +set ylabel "Memory Allocated/Ops" +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 +parallelism=(exist("ARG2") && ARG2 != ""?ARG2:1) + +print "Plotting with payload size ".payload_size." and parallelism ".parallelism."" + +plot "baseline-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$5:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint +pause -1 diff --git a/test/benchmark/http/plot_output_senders_ns.gnuplot b/test/benchmark/http/plot_output_senders_ns.gnuplot new file mode 100644 index 000000000..221dd24d8 --- /dev/null +++ b/test/benchmark/http/plot_output_senders_ns.gnuplot @@ -0,0 +1,19 @@ +set datafile separator comma +set datafile missing NaN +set xlabel "Number of output senders" +set ylabel "Nanoseconds/Ops" +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 +parallelism=(exist("ARG2") && ARG2 != ""?ARG2:1) + +print "Plotting with payload size ".payload_size." and parallelism ".parallelism."" + +plot "baseline-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 3:(($2==payload_size && $1==parallelism)?$4:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint +pause -1 diff --git a/test/benchmark/http/plot_parallelism_allocs.gnuplot b/test/benchmark/http/plot_parallelism_allocs.gnuplot new file mode 100644 index 000000000..1a8b0916f --- /dev/null +++ b/test/benchmark/http/plot_parallelism_allocs.gnuplot @@ -0,0 +1,19 @@ +set datafile separator comma +set datafile missing NaN +set xlabel "Parallelism" +set ylabel "Memory Allocated/Ops" +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 +output_senders=(exist("ARG2") && ARG2 != ""?ARG2:1) + +print "Plotting with payload size ".payload_size." and output_senders ".output_senders."" + +plot "baseline-binary.csv" using 1:(($2==payload_size && $3==output_senders)?$5:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 1:($2==payload_size && $3==output_senders?$5:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint +pause -1 diff --git a/test/benchmark/http/plot_parallelism_ns.gnuplot b/test/benchmark/http/plot_parallelism_ns.gnuplot index 8735dba18..cdf5e4287 100644 --- a/test/benchmark/http/plot_parallelism_ns.gnuplot +++ b/test/benchmark/http/plot_parallelism_ns.gnuplot @@ -2,6 +2,18 @@ set datafile separator comma set datafile missing NaN set xlabel "Parallelism" set ylabel "Nanoseconds/Ops" -payload_size=ARG1 -plot "baseline.csv" using 1:($2==payload_size?$3:1/0) title "Baseline ".payload_size."kb" with linespoint, "receiver-sender.csv" using 1:($2==payload_size?$3:1/0) title "Receiver Sender ".payload_size."kb" with linespoint, "pipe.csv" using 1:($2==payload_size?$3:1/0) title "Pipe ".payload_size."kb" with linespoint, "client.csv" using 1:($2==payload_size?$3:1/0) title "Client ".payload_size."kb" with linespoint +payload_size_kb=ARG1 +payload_size=payload_size_kb*1024 +output_senders=(exist("ARG2") && ARG2 != ""?ARG2:1) + +print "Plotting with payload size ".payload_size." and output_senders ".output_senders."" + +plot "baseline-binary.csv" using 1:(($2==payload_size && $3==output_senders)?$4:1/0) title "Baseline Binary ".payload_size_kb."kb" with linespoint, \ + "baseline-structured.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Baseline Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-structured.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Binding Structured to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-structured-to-binary.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Binding Structured to Binary ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-structured.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Binding Binary to Structured ".payload_size_kb."kb" with linespoint, \ + "binding-binary-to-binary.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Binding Binary to Binary ".payload_size_kb."kb" with linespoint, \ + "client-binary.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Client Binary ".payload_size_kb."kb" with linespoint, \ + "client-structured.csv" using 1:($2==payload_size && $3==output_senders?$4:1/0) title "Client Structured ".payload_size_kb."kb" with linespoint pause -1 diff --git a/test/benchmark/http/run_revision.sh b/test/benchmark/http/run_revision.sh new file mode 100755 index 000000000..b6a282f37 --- /dev/null +++ b/test/benchmark/http/run_revision.sh @@ -0,0 +1,87 @@ +#!/bin/bash + +set -e + +RUNNABLE_NAME="http-bench" + +function usage { + echo "Usage: $0 [--cpu-profile] [--mem-profile] [--max-parallelism n] [--max-payload n] [--max-output-senders n] [git_revision]" + exit 1 +} + +ADDITIONAL_ARGS="" + +PARAMS="" +while (( "$#" )); do + case "$1" in + -h|--help) + usage + ;; + -c|--cpu-profile) + CPU_PROFILE="1" + shift + ;; + -m|--mem-profile) + MEM_PROFILE="1" + shift + ;; + --max-parallelism) + ADDITIONAL_ARGS="$ADDITIONAL_ARGS --max-parallelism $2" + shift 2 + ;; + --max-payload) + ADDITIONAL_ARGS="$ADDITIONAL_ARGS --max-payload $2" + shift 2 + ;; + --max-output-senders) + ADDITIONAL_ARGS="$ADDITIONAL_ARGS --max-output-senders $2" + shift 2 + ;; + --) # end argument parsing + shift + break + ;; + -*|--*=) # unsupported flags + echo "Error: Unsupported flag $1" >&2 + exit 1 + ;; + *) # preserve positional arguments + PARAMS="$PARAMS $1" + shift + ;; + esac +done +eval set -- "$PARAMS" + +REVISION=$1 +if [ ! -z "$REVISION" ] +then + git checkout "$REVISION" +else + REVISION=results +fi + +go build -o $RUNNABLE_NAME -v github.com/cloudevents/sdk-go/test/benchmark/http + +mkdir -p "$REVISION" + +BENCHS=( + "baseline-structured" + "baseline-binary" + "binding-structured-to-structured" + "binding-structured-to-binary" + "binding-binary-to-structured" + "binding-binary-to-binary" + "client-binary" + "client-structured" +) + +for i in "${BENCHS[@]}"; do + ./$RUNNABLE_NAME --bench="$i" \ + ${MEM_PROFILE+-memprofile $REVISION/$i-mem.pprof} \ + ${CPU_PROFILE+-cpuprofile $REVISION/$i-cpu.pprof} \ + $ADDITIONAL_ARGS \ + --out="$REVISION/$i.csv" +done + +rm $RUNNABLE_NAME diff --git a/test/benchmark/http/run_revision_bench.sh b/test/benchmark/http/run_revision_bench.sh new file mode 100644 index 000000000..e345b8145 --- /dev/null +++ b/test/benchmark/http/run_revision_bench.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -e + +REVISION=$1 + +git checkout "$REVISION" +go build -v main.go + +mkdir "$REVISION" + +echo "Running baseline" +./main --bench=baseline > "$REVISION/baseline.csv" + +echo "Running receiver sender" +./main --bench=receiver-sender > "$REVISION/receiver-sender.csv" + +echo "Running client" +./main --bench=client > "$REVISION/client.csv" + +rm main \ No newline at end of file diff --git a/test/benchmark/http/run_revision_profiling.sh b/test/benchmark/http/run_revision_profiling.sh new file mode 100644 index 000000000..78b45e96c --- /dev/null +++ b/test/benchmark/http/run_revision_profiling.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -e + +REVISION=$1 + +git checkout "$REVISION" +go build -v main.go + +mkdir "$REVISION" + +echo "Running baseline" +./main --bench=baseline -memprofile $REVISION/baseline_mem.pprof -cpuprofile $REVISION/baseline_cpu.pprof + +echo "Running receiver sender" +./main --bench=receiver-sender -memprofile $REVISION/receiver_sender_mem.pprof -cpuprofile $REVISION/receiver_sender_cpu.pprof + +echo "Running client" +./main --bench=client -memprofile $REVISION/client_mem.pprof -cpuprofile $REVISION/client_cpu.pprof + +rm main