From 9ca0adffa9c5847207ce7f07ae8fb11a78081a47 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Thu, 14 May 2020 09:27:59 +0200 Subject: [PATCH] Updated to sdk-go RC4 (#3136) Signed-off-by: Francesco Guardiani --- go.mod | 2 +- go.sum | 4 +- .../cloudevents/sdk-go/v2/binding/encoding.go | 14 ++++ .../sdk-go/v2/binding/format/format.go | 9 ++- .../cloudevents/sdk-go/v2/binding/message.go | 13 ++++ .../cloudevents/sdk-go/v2/client/client.go | 69 +++++++++++------- .../cloudevents/sdk-go/v2/client/invoker.go | 59 ++++++++------- .../cloudevents/sdk-go/v2/client/options.go | 12 ++++ .../cloudevents/sdk-go/v2/client/receiver.go | 4 +- .../cloudevents/sdk-go/v2/event/event.go | 24 ------- .../sdk-go/v2/event/event_marshal.go | 7 +- .../sdk-go/v2/event/event_validation.go | 45 ++++++++++++ .../sdk-go/v2/event/eventcontext.go | 2 +- .../sdk-go/v2/event/eventcontext_v03.go | 22 +++--- .../sdk-go/v2/event/eventcontext_v1.go | 22 +++--- .../distributed_tracing_extension.go | 5 +- .../github.com/cloudevents/sdk-go/v2/go.mod | 2 - .../sdk-go/v2/protocol/http/options.go | 17 ++--- .../sdk-go/v2/protocol/http/protocol.go | 71 +++++++++++++------ .../v2/protocol/http/protocol_lifecycle.go | 47 ++++++------ .../cloudevents/sdk-go/v2/protocol/result.go | 18 ++--- vendor/modules.txt | 2 +- 22 files changed, 285 insertions(+), 185 deletions(-) create mode 100644 vendor/github.com/cloudevents/sdk-go/v2/event/event_validation.go diff --git a/go.mod b/go.mod index 7fde87827bc..59d03738777 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.1 // indirect github.com/cloudevents/sdk-go v1.2.0 - github.com/cloudevents/sdk-go/v2 v2.0.0-RC2 + github.com/cloudevents/sdk-go/v2 v2.0.0-RC4 github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.3.5 github.com/google/go-cmp v0.4.0 diff --git a/go.sum b/go.sum index f064dbccb42..dab9d8f7809 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,8 @@ github.com/cloudevents/sdk-go v0.0.0-20190509003705-56931988abe3/go.mod h1:j1nZW github.com/cloudevents/sdk-go v1.1.2/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ= github.com/cloudevents/sdk-go v1.2.0 h1:2AxI14EJUw1PclJ5gZJtzbxnHIfNMdi76Qq3P3G1BRU= github.com/cloudevents/sdk-go v1.2.0/go.mod h1:ss+jWJ88wypiewnPEzChSBzTYXGpdcILoN9YHk8uhTQ= -github.com/cloudevents/sdk-go/v2 v2.0.0-RC2 h1:XXqj/WXjOWhxUR8/+Ovn5YtSuIE83uOD6Gy3vUnBdUQ= -github.com/cloudevents/sdk-go/v2 v2.0.0-RC2/go.mod h1:f6d2RzSysHwhr4EsysDapUIWyJOFKqIhDisATXEa6Wk= +github.com/cloudevents/sdk-go/v2 v2.0.0-RC4 h1:TpGPDaAfpbOxe86Pe4v3kmOOZYlMSssObvCHVUdiQOc= +github.com/cloudevents/sdk-go/v2 v2.0.0-RC4/go.mod h1:/1Ntmoq0bPbVtRedMtX+58TicVkehGQGu5FIigC38JQ= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko= diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go index 8ef0f13112f..0b6efe636cb 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/encoding.go @@ -16,6 +16,20 @@ const ( EncodingUnknown ) +func (e Encoding) String() string { + switch e { + case EncodingBinary: + return "binary" + case EncodingStructured: + return "structured" + case EncodingEvent: + return "event" + case EncodingUnknown: + return "unknown" + } + return "" +} + // ErrUnknownEncoding specifies that the Message is not an event or it is encoded with an unknown encoding var ErrUnknownEncoding = errors.New("unknown Message encoding") diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go index 8d13746ec8c..9e2b1ec6763 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/format/format.go @@ -3,7 +3,6 @@ package format import ( "encoding/json" "fmt" - "mime" "strings" "github.com/cloudevents/sdk-go/v2/event" @@ -47,8 +46,12 @@ func init() { // Lookup returns the format for contentType, or nil if not found. func Lookup(contentType string) Format { - mediaType, _, _ := mime.ParseMediaType(contentType) - return formats[mediaType] + i := strings.IndexRune(contentType, ';') + if i == -1 { + i = len(contentType) + } + contentType = strings.TrimSpace(strings.ToLower(contentType[0:i])) + return formats[contentType] } func unknown(mediaType string) error { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go index d933adfd10e..7222f715476 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/binding/message.go @@ -126,3 +126,16 @@ type MessageWrapper interface { // Method to get the wrapped message GetWrappedMessage() Message } + +func UnwrapMessage(message Message) Message { + m := message + for m != nil { + switch mt := m.(type) { + case MessageWrapper: + m = mt.GetWrappedMessage() + default: + return m + } + } + return m +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go index 6991b8e6357..d2ace137dc5 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/client.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "runtime" "sync" "go.uber.org/zap" @@ -47,7 +48,10 @@ type Client interface { // New produces a new client with the provided transport object and applied // client options. func New(obj interface{}, opts ...Option) (Client, error) { - c := &ceClient{} + c := &ceClient{ + // Running runtime.GOMAXPROCS(0) doesn't update the value, just returns the current one + pollGoroutines: runtime.GOMAXPROCS(0), + } if p, ok := obj.(protocol.Sender); ok { c.sender = p @@ -83,6 +87,7 @@ type ceClient struct { invoker Invoker receiverMu sync.Mutex eventDefaulterFns []EventDefaulter + pollGoroutines int } func (c *ceClient) applyOptions(opts ...Option) error { @@ -185,6 +190,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { } c.invoker = invoker + if c.responder == nil && c.receiver == nil { + return errors.New("responder nor receiver set") + } + defer func() { c.invoker = nil }() @@ -192,36 +201,44 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { // Start the opener, if set. if c.opener != nil { go func() { - // TODO: handle error correctly here. if err := c.opener.OpenInbound(ctx); err != nil { - panic(err) + cecontext.LoggerFrom(ctx).Errorf("Error while opening the inbound connection: %s", err) } }() } - var msg binding.Message - var respFn protocol.ResponseFn // Start Polling. - for { - if c.responder != nil { - msg, respFn, err = c.responder.Respond(ctx) - } else if c.receiver != nil { - msg, err = c.receiver.Receive(ctx) - } else { - return errors.New("responder nor receiver set") - } - - if err == io.EOF { // Normal close - return nil - } - - if err != nil { - cecontext.LoggerFrom(ctx).Warn("Error while receiving a message: %s", err.Error()) - continue - } - - if err := c.invoker.Invoke(ctx, msg, respFn); err != nil { - return err - } + wg := sync.WaitGroup{} + for i := 0; i < c.pollGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + var msg binding.Message + var respFn protocol.ResponseFn + var err error + + if c.responder != nil { + msg, respFn, err = c.responder.Respond(ctx) + } else if c.receiver != nil { + msg, err = c.receiver.Receive(ctx) + } + + if err == io.EOF { // Normal close + return + } + + if err != nil { + cecontext.LoggerFrom(ctx).Warnf("Error while receiving a message: %s", err) + continue + } + + if err := c.invoker.Invoke(ctx, msg, respFn); err != nil { + cecontext.LoggerFrom(ctx).Warnf("Error while handling a message: %s", err) + } + } + }() } + wg.Wait() + return nil } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go index b9c6609790d..162ae27e2d5 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/invoker.go @@ -2,10 +2,10 @@ package client import ( "context" - "fmt" "github.com/cloudevents/sdk-go/v2/binding" cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol" ) @@ -37,22 +37,32 @@ type receiveInvoker struct { } func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) { - var isFinished bool defer func() { - if !isFinished { - if err2 := m.Finish(err); err2 == nil { - err = err2 + err = m.Finish(err) + }() + + var respMsg binding.Message + var result protocol.Result + + e, eventErr := binding.ToEvent(ctx, m) + switch { + case eventErr != nil && r.fn.hasEventIn: + return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr)) + case r.fn != nil: + // Check if event is valid before invoking the receiver function + if e != nil { + if validationErr := e.Validate(); validationErr != nil { + return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr)) } } - }() - e, err := binding.ToEvent(ctx, m) - if err != nil { - return err - } + // Let's invoke the receiver fn + var resp *event.Event + resp, result = r.fn.invoke(ctx, e) - if e != nil && r.fn != nil { - resp, result := r.fn.invoke(ctx, *e) + if respFn == nil { + break + } // Apply the defaulter chain to the outgoing event. if resp != nil && len(r.eventDefaulterFns) > 0 { @@ -60,29 +70,24 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p *resp = fn(ctx, *resp) } // Validate the event conforms to the CloudEvents Spec. - if verr := resp.Validate(); verr != nil { - cecontext.LoggerFrom(ctx).Error(fmt.Errorf("cloudevent validation failed on response event: %v, %w", verr, err)) + if vErr := resp.Validate(); vErr != nil { + cecontext.LoggerFrom(ctx).Errorf("cloudevent validation failed on response event: %w", vErr) } } - // protocol can manual ack by the result - if respFn == nil { - if !protocol.IsACK(result) { - err = m.Finish(result) - isFinished = true - } - return - } - - var rm binding.Message + // because binding.Message is an interface, casting a nil resp + // here would make future comparisons to nil false if resp != nil { - rm = (*binding.EventMessage)(resp) + respMsg = (*binding.EventMessage)(resp) } + } - return respFn(ctx, rm, result) + if respFn == nil { + // let the protocol ACK based on the result + return result } - return nil + return respFn(ctx, respMsg, result) } func (r *receiveInvoker) IsReceiver() bool { diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go index 3a1b40fe9ea..aeec1eb2843 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/options.go @@ -71,3 +71,15 @@ func WithTracePropagation() Option { return nil } } + +// WithPollGoroutines configures how much goroutines should be used to +// poll the Receiver/Responder/Protocol implementations. +// Default value is GOMAXPROCS +func WithPollGoroutines(pollGoroutines int) Option { + return func(i interface{}) error { + if c, ok := i.(*ceClient); ok { + c.pollGoroutines = pollGoroutines + } + return nil + } +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go index be7a4332cfc..e1d1544c6da 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/client/receiver.go @@ -72,7 +72,7 @@ func receiver(fn interface{}) (*receiverFn, error) { return r, nil } -func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, protocol.Result) { +func (r *receiverFn) invoke(ctx context.Context, e *event.Event) (*event.Event, protocol.Result) { args := make([]reflect.Value, 0, r.numIn) if r.numIn > 0 { @@ -80,7 +80,7 @@ func (r *receiverFn) invoke(ctx context.Context, e event.Event) (*event.Event, p args = append(args, reflect.ValueOf(ctx)) } if r.hasEventIn { - args = append(args, reflect.ValueOf(e)) + args = append(args, reflect.ValueOf(*e)) } } v := r.fnValue.Call(args) diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go index 1c1c4b696fe..3f8215a07d3 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/event.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event.go @@ -62,30 +62,6 @@ func (e Event) ExtensionAs(name string, obj interface{}) error { return e.Context.ExtensionAs(name, obj) } -// Validate performs a spec based validation on this event. -// Validation is dependent on the spec version specified in the event context. -func (e Event) Validate() error { - if e.Context == nil { - return fmt.Errorf("every event conforming to the CloudEvents specification MUST include a context") - } - - if e.FieldErrors != nil { - errs := make([]string, 0) - for f, e := range e.FieldErrors { - errs = append(errs, fmt.Sprintf("%q: %s,", f, e)) - } - if len(errs) > 0 { - return fmt.Errorf("previous field errors: [%s]", strings.Join(errs, "\n")) - } - } - - if err := e.Context.Validate(); err != nil { - return err - } - - return nil -} - // String returns a pretty-printed representation of the Event. func (e Event) String() string { b := strings.Builder{} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event_marshal.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event_marshal.go index b3fe804db6f..190eb3fc181 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/event_marshal.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event_marshal.go @@ -3,7 +3,6 @@ package event import ( "context" "encoding/json" - "errors" "fmt" "strings" @@ -31,7 +30,7 @@ func (e Event) MarshalJSON() ([]byte, error) { case CloudEventsVersionV1: b, err = JsonEncode(e) default: - return nil, fmt.Errorf("unknown spec version: %q", e.SpecVersion()) + return nil, ValidationError{"specversion": fmt.Errorf("unknown : %q", e.SpecVersion())} } // Report the observable @@ -64,7 +63,7 @@ func (e *Event) UnmarshalJSON(b []byte) error { case CloudEventsVersionV1: err = e.JsonDecodeV1(b, raw) default: - return fmt.Errorf("unknown spec version: %q", version) + return ValidationError{"specversion": fmt.Errorf("unknown : %q", version)} } // Report the observable @@ -263,7 +262,7 @@ func (e *Event) JsonDecodeV1(body []byte, raw map[string]json.RawMessage) error delete(raw, "data_base64") if data != nil && dataBase64 != nil { - return errors.New("parsing error: JSON decoder found both 'data', and 'data_base64' in JSON payload") + return ValidationError{"data": fmt.Errorf("found both 'data', and 'data_base64' in JSON payload")} } if data != nil { e.DataEncoded = data diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/event_validation.go b/vendor/github.com/cloudevents/sdk-go/v2/event/event_validation.go new file mode 100644 index 00000000000..feae2ce03ab --- /dev/null +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/event_validation.go @@ -0,0 +1,45 @@ +package event + +import ( + "fmt" + "strings" +) + +type ValidationError map[string]error + +func (e ValidationError) Error() string { + b := strings.Builder{} + for k, v := range e { + b.WriteString(k) + b.WriteString(": ") + b.WriteString(v.Error()) + b.WriteRune('\n') + } + return b.String() +} + +// Validate performs a spec based validation on this event. +// Validation is dependent on the spec version specified in the event context. +func (e Event) Validate() ValidationError { + if e.Context == nil { + return ValidationError{"specversion": fmt.Errorf("missing Event.Context")} + } + + errs := map[string]error{} + if e.FieldErrors != nil { + for k, v := range errs { + errs[k] = v + } + } + + if fieldErrors := e.Context.Validate(); fieldErrors != nil { + for k, v := range fieldErrors { + errs[k] = v + } + } + + if len(errs) > 0 { + return errs + } + return nil +} diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext.go index 5ad2374349f..2d0611215a2 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext.go @@ -110,7 +110,7 @@ type EventContext interface { // Validate the event based on the specifics of the CloudEvents spec version // represented by this event context. - Validate() error + Validate() ValidationError // Clone clones the event context. Clone() EventContext diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go index ded2edc864a..c626311df5b 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v03.go @@ -161,8 +161,8 @@ func (ec EventContextV03) AsV1() *EventContextV1 { // As of Feb 26, 2019, commit 17c32ea26baf7714ad027d9917d03d2fff79fc7e // + https://github.com/cloudevents/spec/pull/387 -> datacontentencoding // + https://github.com/cloudevents/spec/pull/406 -> subject -func (ec EventContextV03) Validate() error { - errors := []string(nil) +func (ec EventContextV03) Validate() ValidationError { + errors := map[string]error{} // type // Type: String @@ -172,7 +172,7 @@ func (ec EventContextV03) Validate() error { // SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines the semantics of this event type. eventType := strings.TrimSpace(ec.Type) if eventType == "" { - errors = append(errors, "type: MUST be a non-empty string") + errors["type"] = fmt.Errorf("MUST be a non-empty string") } // source @@ -181,7 +181,7 @@ func (ec EventContextV03) Validate() error { // REQUIRED source := strings.TrimSpace(ec.Source.String()) if source == "" { - errors = append(errors, "source: REQUIRED") + errors["source"] = fmt.Errorf("REQUIRED") } // subject @@ -192,7 +192,7 @@ func (ec EventContextV03) Validate() error { if ec.Subject != nil { subject := strings.TrimSpace(*ec.Subject) if subject == "" { - errors = append(errors, "subject: if present, MUST be a non-empty string") + errors["subject"] = fmt.Errorf("if present, MUST be a non-empty string") } } @@ -204,7 +204,7 @@ func (ec EventContextV03) Validate() error { // MUST be unique within the scope of the producer id := strings.TrimSpace(ec.ID) if id == "" { - errors = append(errors, "id: MUST be a non-empty string") + errors["id"] = fmt.Errorf("MUST be a non-empty string") // no way to test "MUST be unique within the scope of the producer" } @@ -225,7 +225,7 @@ func (ec EventContextV03) Validate() error { schemaURL := strings.TrimSpace(ec.SchemaURL.String()) // empty string is not RFC 3986 compatible. if schemaURL == "" { - errors = append(errors, "schemaurl: if present, MUST adhere to the format specified in RFC 3986") + errors["schemaurl"] = fmt.Errorf("if present, MUST adhere to the format specified in RFC 3986") } } @@ -237,11 +237,11 @@ func (ec EventContextV03) Validate() error { if ec.DataContentType != nil { dataContentType := strings.TrimSpace(*ec.DataContentType) if dataContentType == "" { - errors = append(errors, "datacontenttype: if present, MUST adhere to the format specified in RFC 2046") + errors["datacontenttype"] = fmt.Errorf("if present, MUST adhere to the format specified in RFC 2046") } else { _, _, err := mime.ParseMediaType(dataContentType) if err != nil { - errors = append(errors, fmt.Sprintf("datacontenttype: failed to parse RFC 2046 media type, %s", err.Error())) + errors["datacontenttype"] = fmt.Errorf("if present, MUST adhere to the format specified in RFC 2046") } } } @@ -255,12 +255,12 @@ func (ec EventContextV03) Validate() error { if ec.DataContentEncoding != nil { dataContentEncoding := strings.ToLower(strings.TrimSpace(*ec.DataContentEncoding)) if dataContentEncoding != Base64 { - errors = append(errors, "datacontentencoding: if present, MUST adhere to RFC 2045 Section 6.1") + errors["datacontentencoding"] = fmt.Errorf("if present, MUST adhere to RFC 2045 Section 6.1") } } if len(errors) > 0 { - return fmt.Errorf(strings.Join(errors, "\n")) + return errors } return nil } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go index 8ddd01f8b6d..f7e09ed63db 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/event/eventcontext_v1.go @@ -163,9 +163,9 @@ func (ec EventContextV1) AsV1() *EventContextV1 { } // Validate returns errors based on requirements from the CloudEvents spec. -// For more details, see https://github.com/cloudevents/spec/blob/v1.0-rc1/spec.md. -func (ec EventContextV1) Validate() error { - errors := []string(nil) +// For more details, see https://github.com/cloudevents/spec/blob/v1.0/spec.md. +func (ec EventContextV1) Validate() ValidationError { + errors := map[string]error{} // id // Type: String @@ -175,7 +175,7 @@ func (ec EventContextV1) Validate() error { // MUST be unique within the scope of the producer id := strings.TrimSpace(ec.ID) if id == "" { - errors = append(errors, "id: MUST be a non-empty string") + errors["id"] = fmt.Errorf("MUST be a non-empty string") // no way to test "MUST be unique within the scope of the producer" } @@ -187,7 +187,7 @@ func (ec EventContextV1) Validate() error { // An absolute URI is RECOMMENDED source := strings.TrimSpace(ec.Source.String()) if source == "" { - errors = append(errors, "source: REQUIRED") + errors["source"] = fmt.Errorf("REQUIRED") } // type @@ -198,7 +198,7 @@ func (ec EventContextV1) Validate() error { // SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines the semantics of this event type. eventType := strings.TrimSpace(ec.Type) if eventType == "" { - errors = append(errors, "type: MUST be a non-empty string") + errors["type"] = fmt.Errorf("MUST be a non-empty string") } // The following attributes are optional but still have validation. @@ -211,11 +211,11 @@ func (ec EventContextV1) Validate() error { if ec.DataContentType != nil { dataContentType := strings.TrimSpace(*ec.DataContentType) if dataContentType == "" { - errors = append(errors, "datacontenttype: if present, MUST adhere to the format specified in RFC 2046") + errors["datacontenttype"] = fmt.Errorf("if present, MUST adhere to the format specified in RFC 2046") } else { _, _, err := mime.ParseMediaType(dataContentType) if err != nil { - errors = append(errors, fmt.Sprintf("datacontenttype: failed to parse RFC 2046 media type, %s", err.Error())) + errors["datacontenttype"] = fmt.Errorf("failed to parse RFC 2046 media type %w", err) } } } @@ -229,7 +229,7 @@ func (ec EventContextV1) Validate() error { dataSchema := strings.TrimSpace(ec.DataSchema.String()) // empty string is not RFC 3986 compatible. if dataSchema == "" { - errors = append(errors, "dataschema: if present, MUST adhere to the format specified in RFC 3986") + errors["dataschema"] = fmt.Errorf("if present, MUST adhere to the format specified in RFC 3986") } } @@ -241,7 +241,7 @@ func (ec EventContextV1) Validate() error { if ec.Subject != nil { subject := strings.TrimSpace(*ec.Subject) if subject == "" { - errors = append(errors, "subject: if present, MUST be a non-empty string") + errors["subject"] = fmt.Errorf("if present, MUST be a non-empty string") } } @@ -253,7 +253,7 @@ func (ec EventContextV1) Validate() error { // --> no need to test this, no way to set the time without it being valid. if len(errors) > 0 { - return fmt.Errorf(strings.Join(errors, "\n")) + return errors } return nil } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go b/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go index a46039afef8..7988b65f292 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/extensions/distributed_tracing_extension.go @@ -84,7 +84,10 @@ func (d *DistributedTracingExtension) WriteTransformer() binding.TransformerFunc if err != nil { return nil } - return writer.SetExtension(TraceStateExtension, d.TraceState) + if d.TraceState != "" { + return writer.SetExtension(TraceStateExtension, d.TraceState) + } + return nil } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/go.mod b/vendor/github.com/cloudevents/sdk-go/v2/go.mod index 8682bc70e48..be311f192d4 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/go.mod +++ b/vendor/github.com/cloudevents/sdk-go/v2/go.mod @@ -22,8 +22,6 @@ require ( go.opencensus.io v0.22.0 go.uber.org/zap v1.10.0 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e - google.golang.org/api v0.15.0 - google.golang.org/grpc v1.26.0 ) go 1.13 diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go index a6ca567a090..bf632808cd5 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/options.go @@ -80,16 +80,14 @@ func WithShutdownTimeout(timeout time.Duration) Option { func checkListen(t *Protocol, prefix string) error { switch { - case t.Port != nil: - return fmt.Errorf("%v port already set", prefix) - case t.listener != nil: - return fmt.Errorf("%v listener already set", prefix) + case t.listener.Load() != nil: + return fmt.Errorf("error setting %v: listener already set", prefix) } return nil } // WithPort sets the listening port for StartReceiver. -// Only one of WithListener or WithPort is allowed. +// Only one of WithListener or WithPort is allowed. func WithPort(port int) Option { return func(t *Protocol) error { if t == nil { @@ -101,7 +99,7 @@ func WithPort(port int) Option { if err := checkListen(t, "http port option"); err != nil { return err } - t.setPort(port) + t.Port = port return nil } } @@ -113,12 +111,11 @@ func WithListener(l net.Listener) Option { if t == nil { return fmt.Errorf("http listener option can not set nil protocol") } - if err := checkListen(t, "http port option"); err != nil { + if err := checkListen(t, "http listener"); err != nil { return err } - t.listener = l - _, err := t.listen() - return err + t.listener.Store(l) + return nil } } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go index 72d6c042a65..46a0c4cca84 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol.go @@ -2,16 +2,18 @@ package http import ( "context" + "errors" "fmt" "io" - "net" "net/http" "net/url" "sync" + "sync/atomic" "time" "github.com/cloudevents/sdk-go/v2/binding" cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol" ) @@ -20,6 +22,12 @@ const ( DefaultShutdownTimeout = time.Minute * 1 ) +type msgErr struct { + msg *Message + respFn protocol.ResponseFn + err error +} + // Protocol acts as both a http client and a http handler. type Protocol struct { Target *url.URL @@ -33,8 +41,9 @@ type Protocol struct { // If nil, DefaultShutdownTimeout is used. ShutdownTimeout time.Duration - // Port is the port to bind the receiver to. Defaults to 8080. - Port *int + // Port is the port configured to bind the receiver to. Defaults to 8080. + // If you want to know the effective port you're listening to, use GetListeningPort() + Port int // Path is the path to bind the receiver to. Defaults to "/". Path string @@ -42,8 +51,9 @@ type Protocol struct { reMu sync.Mutex // Handler is the handler the http Server will use. Use this to reuse the // http server. If nil, the Protocol will create a one. - Handler *http.ServeMux - listener net.Listener + Handler *http.ServeMux + + listener atomic.Value roundTripper http.RoundTripper server *http.Server handlerRegistered bool @@ -53,6 +63,7 @@ type Protocol struct { func New(opts ...Option) (*Protocol, error) { p := &Protocol{ incoming: make(chan msgErr), + Port: -1, } if err := p.applyOptions(opts...); err != nil { return nil, err @@ -201,56 +212,74 @@ func (p *Protocol) Respond(ctx context.Context) (binding.Message, protocol.Respo if !ok { return nil, nil, io.EOF } + + if in.msg == nil { + return nil, in.respFn, in.err + } return in.msg, in.respFn, in.err + case <-ctx.Done(): return nil, nil, io.EOF } } -type msgErr struct { - msg *Message - respFn protocol.ResponseFn - err error -} - // ServeHTTP implements http.Handler. // Blocks until ResponseFn is invoked. func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) { m := NewMessageFromHttpRequest(req) - if m == nil || m.ReadEncoding() == binding.EncodingUnknown { + if m == nil { p.incoming <- msgErr{msg: nil, err: binding.ErrUnknownEncoding} return // if there was no message, return. } done := make(chan struct{}) - var finishErr error + var finishErr error m.OnFinish = func(err error) error { finishErr = err return nil } - var fn protocol.ResponseFn = func(ctx context.Context, resp binding.Message, er protocol.Result, transformers ...binding.Transformer) error { + var fn protocol.ResponseFn = func(ctx context.Context, respMsg binding.Message, res protocol.Result, transformers ...binding.Transformer) error { // Unblock the ServeHTTP after the reply is written defer func() { done <- struct{}{} }() - status := http.StatusOK + if finishErr != nil { - http.Error(rw, fmt.Sprintf("cannot forward CloudEvent: %v", finishErr), http.StatusInternalServerError) + http.Error(rw, fmt.Sprintf("Cannot forward CloudEvent: %s", finishErr), http.StatusInternalServerError) } - if er != nil { + + status := http.StatusOK + if res != nil { var result *Result - if protocol.ResultAs(er, &result) { + switch { + case protocol.ResultAs(res, &result): if result.StatusCode > 100 && result.StatusCode < 600 { status = result.StatusCode } + + case !protocol.IsACK(res): + // Map client errors to http status code + validationError := event.ValidationError{} + if errors.As(res, &validationError) { + status = http.StatusBadRequest + rw.Header().Set("content-type", "text/plain") + rw.WriteHeader(status) + _, _ = rw.Write([]byte(validationError.Error())) + } else if errors.Is(res, binding.ErrUnknownEncoding) { + status = http.StatusUnsupportedMediaType + } else { + status = http.StatusInternalServerError + } } } - if resp != nil { - err := WriteResponseWriter(ctx, resp, status, rw, transformers...) - return resp.Finish(err) + + if respMsg != nil { + err := WriteResponseWriter(ctx, respMsg, status, rw, transformers...) + return respMsg.Finish(err) } + rw.WriteHeader(status) return nil } diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_lifecycle.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_lifecycle.go index 89098d3cd1b..f3aafbd4d14 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_lifecycle.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/http/protocol_lifecycle.go @@ -28,13 +28,14 @@ func (p *Protocol) OpenInbound(ctx context.Context) error { p.handlerRegistered = true } - addr, err := p.listen() + // After listener is invok + listener, err := p.listen() if err != nil { return err } p.server = &http.Server{ - Addr: addr.String(), + Addr: listener.Addr().String(), Handler: &ochttp.Handler{ Propagation: &tracecontext.HTTPFormat{}, Handler: attachMiddleware(p.Handler, p.middleware), @@ -50,7 +51,7 @@ func (p *Protocol) OpenInbound(ctx context.Context) error { errChan := make(chan error, 1) go func() { - errChan <- p.server.Serve(p.listener) + errChan <- p.server.Serve(listener) }() // wait for the server to return or ctx.Done(). @@ -67,13 +68,13 @@ func (p *Protocol) OpenInbound(ctx context.Context) error { } } -// GetPort returns the listening port. -// Returns -1 if there is a listening error. -// Note this will call net.Listen() if the listener is not already started. -func (p *Protocol) GetPort() int { - // Ensure we have a listener and therefore a port. - if _, err := p.listen(); err == nil || p.Port != nil { - return *p.Port +// GetListeningPort returns the listening port. +// Returns -1 if it's not listening. +func (p *Protocol) GetListeningPort() int { + if listener := p.listener.Load(); listener != nil { + if tcpAddr, ok := listener.(net.Listener).Addr().(*net.TCPAddr); ok { + return tcpAddr.Port + } } return -1 } @@ -82,33 +83,25 @@ func formatSpanName(r *http.Request) string { return "cloudevents.http." + r.URL.Path } -func (p *Protocol) setPort(port int) { - if p.Port == nil { - p.Port = new(int) - } - *p.Port = port -} - // listen if not already listening, update t.Port -func (p *Protocol) listen() (net.Addr, error) { - if p.listener == nil { +func (p *Protocol) listen() (net.Listener, error) { + if p.listener.Load() == nil { port := 8080 - if p.Port != nil { - port = *p.Port + if p.Port != -1 { + port = p.Port if port < 0 || port > 65535 { return nil, fmt.Errorf("invalid port %d", port) } } var err error - if p.listener, err = net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil { + var listener net.Listener + if listener, err = net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil { return nil, err } + p.listener.Store(listener) + return listener, nil } - addr := p.listener.Addr() - if tcpAddr, ok := addr.(*net.TCPAddr); ok { - p.setPort(tcpAddr.Port) - } - return addr, nil + return p.listener.Load().(net.Listener), nil } // GetPath returns the path the transport is hosted on. If the path is '/', diff --git a/vendor/github.com/cloudevents/sdk-go/v2/protocol/result.go b/vendor/github.com/cloudevents/sdk-go/v2/protocol/result.go index f5583ff7cc6..42cff75ad3c 100644 --- a/vendor/github.com/cloudevents/sdk-go/v2/protocol/result.go +++ b/vendor/github.com/cloudevents/sdk-go/v2/protocol/result.go @@ -60,17 +60,15 @@ var ( // a transport.Result. This type holds the base ACK/NACK results. func NewReceipt(ack bool, messageFmt string, args ...interface{}) Result { return &Receipt{ - ACK: ack, - Format: messageFmt, - Args: args, + Err: fmt.Errorf(messageFmt, args...), + ACK: ack, } } // Receipt wraps the fields required to understand if a protocol event is acknowledged. type Receipt struct { - ACK bool - Format string - Args []interface{} + Err error + ACK bool } // make sure Result implements error. @@ -82,18 +80,16 @@ func (e *Receipt) Is(target error) bool { return e.ACK == o.ACK } // Allow for wrapped errors. - err := fmt.Errorf(e.Format, e.Args...) - return errors.Is(err, target) + return errors.Is(e.Err, target) } // Error returns the string that is formed by using the format string with the // provided args. func (e *Receipt) Error() string { - return fmt.Sprintf(e.Format, e.Args...) + return e.Err.Error() } // Unwrap returns the wrapped error if exist or nil func (e *Receipt) Unwrap() error { - err := fmt.Errorf(e.Format, e.Args...) - return errors.Unwrap(err) + return errors.Unwrap(e.Err) } diff --git a/vendor/modules.txt b/vendor/modules.txt index c83ca5feb4b..c9a80b459ce 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -89,7 +89,7 @@ github.com/cloudevents/sdk-go/pkg/cloudevents/observability github.com/cloudevents/sdk-go/pkg/cloudevents/transport github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http github.com/cloudevents/sdk-go/pkg/cloudevents/types -# github.com/cloudevents/sdk-go/v2 v2.0.0-RC2 +# github.com/cloudevents/sdk-go/v2 v2.0.0-RC4 github.com/cloudevents/sdk-go/v2 github.com/cloudevents/sdk-go/v2/binding github.com/cloudevents/sdk-go/v2/binding/buffering