diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 4da9199365b..9261250a03d 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -17,13 +17,13 @@ limitations under the License. package filter import ( + "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" - "strings" "time" opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" @@ -38,7 +38,7 @@ import ( "knative.dev/pkg/logging" "knative.dev/eventing/pkg/apis" - channelAttributes "knative.dev/eventing/pkg/channel/attributes" + "knative.dev/eventing/pkg/utils" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" @@ -51,7 +51,6 @@ import ( "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/reconciler/sugar/trigger/path" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" ) const ( @@ -65,24 +64,6 @@ const ( defaultMaxIdleConnectionsPerHost = 100 ) -const ( - // NoResponse signals the step that send event to trigger's subscriber hasn't started - NoResponse = -1 -) - -// ErrHandler handle the different errors of filter dispatch process -type ErrHandler struct { - ResponseCode int - ResponseBody []byte - err error -} - -// HeaderProxyAllowList contains the headers that are proxied from the reply; other than the CloudEvents headers. -// Other headers are not proxied because of security concerns. -var HeaderProxyAllowList = map[string]struct{}{ - strings.ToLower("Retry-After"): {}, -} - // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time @@ -93,7 +74,7 @@ type Handler struct { withContext func(ctx context.Context) context.Context } -// NewHandler creates a new Handler and its associated MessageReceiver. +// NewHandler creates a new Handler and its associated EventReceiver. func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, @@ -164,10 +145,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { ctx := h.withContext(request.Context()) - message := cehttp.NewMessageFromHttpRequest(request) - defer message.Finish(nil) - - event, err := binding.ToEvent(ctx, message) + event, err := cehttp.NewEventFromHTTPRequest(request) if err != nil { h.logger.Warn("failed to extract event from request", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -242,123 +220,64 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { URL: t.Status.SubscriberURI, CACerts: t.Status.SubscriberCACerts, } - h.send(ctx, writer, request.Header, target, reportArgs, event, t, ttl) + h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, t, ttl) } func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { - // send the event to trigger's subscriber - response, responseErr := h.sendEvent(ctx, headers, target, event, t, reportArgs) - if responseErr.err != nil { - h.logger.Error("failed to send event", zap.Error(responseErr.err)) - // If error is not because of the response, it should respond with http.StatusInternalServerError - if responseErr.ResponseCode == NoResponse { + additionalHeaders := headers.Clone() + additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) + dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders)) + if err != nil { + h.logger.Error("failed to send event", zap.Error(err)) + + // If error is not because of the response, it should respond with http.StatusInternalServerError + if dispatchInfo.ResponseCode <= 0 { writer.WriteHeader(http.StatusInternalServerError) _ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError) return } - // If error has a response propagate subscriber's headers back to channel - if response != nil { - proxyHeaders(response.Header, writer) - } - writer.WriteHeader(responseErr.ResponseCode) + + h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration) + + writeHeaders(utils.PassThroughHeaders(dispatchInfo.ResponseHeader), writer) + writer.WriteHeader(dispatchInfo.ResponseCode) // Read Response body to responseErr errExtensionInfo := broker.ErrExtensionInfo{ ErrDestination: target.URL, - ErrResponseBody: responseErr.ResponseBody, + ErrResponseBody: dispatchInfo.ResponseBody, } errExtensionBytes, msErr := json.Marshal(errExtensionInfo) if msErr != nil { h.logger.Error("failed to marshal errExtensionInfo", zap.Error(msErr)) return } - _, _ = writer.Write(errExtensionBytes) - _ = h.reporter.ReportEventCount(reportArgs, responseErr.ResponseCode) + _, err = writer.Write(errExtensionBytes) + if err != nil { + h.logger.Error("failed to write error response", zap.Error(err)) + } + _ = h.reporter.ReportEventCount(reportArgs, dispatchInfo.ResponseCode) return } h.logger.Debug("Successfully dispatched message", zap.Any("target", target)) + h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration) + // If there is an event in the response write it to the response - statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.URL.String()) + statusCode, err := h.writeResponse(ctx, writer, dispatchInfo, ttl, target.URL.String()) if err != nil { h.logger.Error("failed to write response", zap.Error(err)) } _ = h.reporter.ReportEventCount(reportArgs, statusCode) } -func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, t *eventingv1.Trigger, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { - responseErr := ErrHandler{ - ResponseCode: NoResponse, - } - - // Send the event to the subscriber - req, err := kncloudevents.NewCloudEventRequest(ctx, target) - if err != nil { - responseErr.err = fmt.Errorf("failed to create the request: %w", err) - return nil, responseErr - } - - message := binding.ToMessage(event) - defer message.Finish(nil) - - additionalHeaders := utils.PassThroughHeaders(headers) - additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) - - // Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events - additionalHeaders.Set("prefer", "reply") - - err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders) - if err != nil { - responseErr.err = fmt.Errorf("failed to write request: %w", err) - return nil, responseErr - } - - start := time.Now() - resp, err := req.Send() - dispatchTime := time.Since(start) - if err != nil { - responseErr.ResponseCode = http.StatusInternalServerError - responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) - responseErr.err = fmt.Errorf("failed to dispatch message: %w", err) - return resp, responseErr - } - - sc := 0 - if resp != nil { - sc = resp.StatusCode - responseErr.ResponseCode = sc - } - - _ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime) - - if resp.StatusCode < http.StatusOK || - resp.StatusCode >= http.StatusMultipleChoices { - // Read response body into errHandler for failures - body := make([]byte, channelAttributes.KnativeErrorDataExtensionMaxLength) - - readLen, readErr := resp.Body.Read(body) - if readErr != nil && readErr != io.EOF { - h.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(readErr)) - responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", readErr.Error())) - } else { - responseErr.ResponseBody = body[:readLen] - } - responseErr.err = fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", resp.StatusCode) - - // Reject non-successful responses. - return resp, responseErr - } - - return resp, responseErr -} - // The return values are the status -func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) { - response := cehttp.NewMessageFromHttpResponse(resp) +func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, dispatchInfo *kncloudevents.DispatchInfo, ttl int32, target string) (int, error) { + response := cehttp.NewMessage(dispatchInfo.ResponseHeader, io.NopCloser(bytes.NewReader(dispatchInfo.ResponseBody))) defer response.Finish(nil) if response.ReadEncoding() == binding.EncodingUnknown { @@ -375,10 +294,10 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, writer.WriteHeader(http.StatusBadGateway) return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent") } - proxyHeaders(resp.Header, writer) // Proxy original Response Headers for downstream use + writeHeaders(dispatchInfo.ResponseHeader, writer) // Proxy original Response Headers for downstream use h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target)) - writer.WriteHeader(resp.StatusCode) - return resp.StatusCode, nil + writer.WriteHeader(dispatchInfo.ResponseCode) + return dispatchInfo.ResponseCode, nil } event, err := binding.ToEvent(ctx, response) @@ -400,15 +319,15 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, defer eventResponse.Finish(nil) // Proxy the original Response Headers for downstream use - proxyHeaders(resp.Header, writer) + writeHeaders(dispatchInfo.ResponseHeader, writer) - if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil { + if err := cehttp.WriteResponseWriter(ctx, eventResponse, dispatchInfo.ResponseCode, writer); err != nil { return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err) } h.logger.Debug("Replied with a CloudEvent response", zap.Any("target", target)) - return resp.StatusCode, nil + return dispatchInfo.ResponseCode, nil } func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) { @@ -523,19 +442,11 @@ func triggerFilterAttribute(filter *eventingv1.TriggerFilter, attributeName stri return attributeValue } -// proxyHeaders adds the specified HTTP Headers to the ResponseWriter. -func proxyHeaders(httpHeader http.Header, writer http.ResponseWriter) { +// writeHeaders adds the specified HTTP Headers to the ResponseWriter. +func writeHeaders(httpHeader http.Header, writer http.ResponseWriter) { for headerKey, headerValues := range httpHeader { - // *Only* proxy some headers because of security reasons - if isInProxyHeaderAllowList(headerKey) { - for _, headerValue := range headerValues { - writer.Header().Add(headerKey, headerValue) - } + for _, headerValue := range headerValues { + writer.Header().Add(headerKey, headerValue) } } } - -func isInProxyHeaderAllowList(headerKey string) bool { - _, exists := HeaderProxyAllowList[strings.ToLower(headerKey)] - return exists -} diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 3af69b63c19..9a719e30e00 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -508,7 +508,7 @@ func TestReceiver(t *testing.T) { return } if err != nil || event == nil { - t.Fatalf("Expected response event, actually nil") + t.Fatalf("Expected response event, actually nil (err: %+v)", err) } // The TTL will be added again. diff --git a/pkg/broker/filter/server_manager.go b/pkg/broker/filter/server_manager.go index ec39463a532..edd8d3abbfc 100644 --- a/pkg/broker/filter/server_manager.go +++ b/pkg/broker/filter/server_manager.go @@ -36,8 +36,8 @@ func NewServerManager(ctx context.Context, logger *zap.Logger, cmw configmap.Wat logger.Info("failed to get TLS server config", zap.Error(err)) } - httpReceiver := kncloudevents.NewHTTPMessageReceiver(httpPort) - httpsReceiver := kncloudevents.NewHTTPMessageReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig)) + httpReceiver := kncloudevents.NewHTTPEventReceiver(httpPort) + httpsReceiver := kncloudevents.NewHTTPEventReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig)) return eventingtls.NewServerManager(ctx, httpReceiver, httpsReceiver, handler, cmw) } diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index e0c6417890e..ba9ee64cc84 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -49,8 +49,6 @@ import ( ) const ( - // noDuration signals that the dispatch step hasn't started - noDuration = -1 defaultMaxIdleConnections = 1000 defaultMaxIdleConnectionsPerHost = 1000 ) @@ -228,8 +226,8 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { eventType: event.Type(), } - statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName) - if dispatchTime > noDuration { + statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, brokerNamespace, brokerName) + if dispatchTime > kncloudevents.NoDuration { _ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime) } _ = h.Reporter.ReportEventCount(reporterArgs, statusCode) @@ -259,7 +257,6 @@ func toKReference(broker *eventingv1.Broker) *duckv1.KReference { } func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) { - // Setting the extension as a string as the CloudEvents sdk does not support non-string extensions. event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()}) if h.Defaulter != nil { @@ -269,51 +266,20 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud if ttl, err := broker.GetTTL(event.Context); err != nil || ttl <= 0 { h.Logger.Debug("dropping event based on TTL status.", zap.Int32("TTL", ttl), zap.String("event.id", event.ID()), zap.Error(err)) - return http.StatusBadRequest, noDuration + return http.StatusBadRequest, kncloudevents.NoDuration } channelAddress, err := h.getChannelAddress(brokerName, brokerNamespace) if err != nil { h.Logger.Warn("Broker not found in the namespace", zap.Error(err)) - return http.StatusBadRequest, noDuration + return http.StatusBadRequest, kncloudevents.NoDuration } - return h.send(ctx, headers, event, *channelAddress) -} - -func (h *Handler) send(ctx context.Context, headers http.Header, event *cloudevents.Event, target duckv1.Addressable) (int, time.Duration) { - - request, err := kncloudevents.NewCloudEventRequest(ctx, target) - if err != nil { - h.Logger.Error("failed to create event request.", zap.Error(err)) - return http.StatusInternalServerError, noDuration - } - - message := binding.ToMessage(event) - defer message.Finish(nil) - - additionalHeaders := utils.PassThroughHeaders(headers) - err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, request, additionalHeaders) - if err != nil { - h.Logger.Error("failed to write request additionalHeaders.", zap.Error(err)) - return http.StatusInternalServerError, noDuration - } - - resp, dispatchTime, err := h.sendAndRecordDispatchTime(request) - if resp != nil { - defer resp.Body.Close() - } + dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers)) if err != nil { h.Logger.Error("failed to dispatch event", zap.Error(err)) - return http.StatusInternalServerError, dispatchTime + return http.StatusInternalServerError, kncloudevents.NoDuration } - return resp.StatusCode, dispatchTime -} - -func (h *Handler) sendAndRecordDispatchTime(request *kncloudevents.CloudEventRequest) (*http.Response, time.Duration, error) { - start := time.Now() - resp, err := request.Send() - dispatchTime := time.Since(start) - return resp, dispatchTime, err + return dispatchInfo.ResponseCode, dispatchInfo.Duration } diff --git a/pkg/broker/ingress/server_manager.go b/pkg/broker/ingress/server_manager.go index 6e2d2fad360..0b1d202e654 100644 --- a/pkg/broker/ingress/server_manager.go +++ b/pkg/broker/ingress/server_manager.go @@ -36,8 +36,8 @@ func NewServerManager(ctx context.Context, logger *zap.Logger, cmw configmap.Wat logger.Info("failed to get TLS server config", zap.Error(err)) } - httpReceiver := kncloudevents.NewHTTPMessageReceiver(httpPort) - httpsReceiver := kncloudevents.NewHTTPMessageReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig)) + httpReceiver := kncloudevents.NewHTTPEventReceiver(httpPort) + httpsReceiver := kncloudevents.NewHTTPEventReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig)) return eventingtls.NewServerManager(ctx, httpReceiver, httpsReceiver, handler, cmw) } diff --git a/pkg/channel/message_receiver.go b/pkg/channel/event_receiver.go similarity index 68% rename from pkg/channel/message_receiver.go rename to pkg/channel/event_receiver.go index 4ad9ca7ed84..86ff96cf146 100644 --- a/pkg/channel/message_receiver.go +++ b/pkg/channel/event_receiver.go @@ -23,8 +23,7 @@ import ( nethttp "net/http" "time" - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/buffering" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol/http" "go.uber.org/zap" @@ -44,7 +43,7 @@ func (e *UnknownChannelError) Error() string { return fmt.Sprint("unknown channel: ", e.Channel) } -// UnknownHostError represents the error when a ResolveMessageChannelFromHostHeader func cannot resolve an host +// UnknownHostError represents the error when a ResolveChannelFromHostHeader func cannot resolve an host type UnknownHostError string func (e UnknownHostError) Error() string { @@ -57,35 +56,32 @@ func (e BadRequestError) Error() string { return "malformed request: " + string(e) } -// MessageReceiver starts a server to receive new events for the channel dispatcher. The new +// EventReceiver starts a server to receive new events for the channel dispatcher. The new // event is emitted via the receiver function. -type MessageReceiver struct { - httpBindingsReceiver *kncloudevents.HTTPMessageReceiver - receiverFunc UnbufferedMessageReceiverFunc +type EventReceiver struct { + httpBindingsReceiver *kncloudevents.HTTPEventReceiver + receiverFunc EventReceiverFunc logger *zap.Logger hostToChannelFunc ResolveChannelFromHostFunc pathToChannelFunc ResolveChannelFromPathFunc reporter StatsReporter } -// UnbufferedMessageReceiverFunc is the function to be called for handling the message. -// The provided message is not buffered, so it can't be safely read more times. -// When you perform the write (or the buffering) of the Message, you must use the transformers provided as parameters. -// This function is responsible for invoking Message.Finish(). -type UnbufferedMessageReceiverFunc func(context.Context, ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error +// EventReceiverFunc is the function to be called for handling the event. +type EventReceiverFunc func(context.Context, ChannelReference, event.Event, nethttp.Header) error -// ReceiverOptions provides functional options to MessageReceiver function. -type MessageReceiverOptions func(*MessageReceiver) error +// ReceiverOptions provides functional options to EventReceiver function. +type EventReceiverOptions func(*EventReceiver) error // ResolveChannelFromHostFunc function enables EventReceiver to get the Channel Reference from incoming request HostHeader // before calling receiverFunc. // Returns UnknownHostError if the channel is not found, otherwise returns a generic error. type ResolveChannelFromHostFunc func(string) (ChannelReference, error) -// ResolveMessageChannelFromHostHeader is a ReceiverOption for NewMessageReceiver which enables the caller to overwrite the +// ResolveChannelFromHostHeader is a ReceiverOption for NewEventReceiver which enables the caller to overwrite the // default behaviour defined by ParseChannelFromHost function. -func ResolveMessageChannelFromHostHeader(hostToChannelFunc ResolveChannelFromHostFunc) MessageReceiverOptions { - return func(r *MessageReceiver) error { +func ResolveChannelFromHostHeader(hostToChannelFunc ResolveChannelFromHostFunc) EventReceiverOptions { + return func(r *EventReceiver) error { r.hostToChannelFunc = hostToChannelFunc return nil } @@ -95,20 +91,20 @@ func ResolveMessageChannelFromHostHeader(hostToChannelFunc ResolveChannelFromHos // before calling receiverFunc. type ResolveChannelFromPathFunc func(string) (ChannelReference, error) -// ResolveMessageChannelFromPath is a ReceiverOption for NewMessageReceiver which enables the caller to overwrite the +// ResolveChannelFromPath is a ReceiverOption for NewEventReceiver which enables the caller to overwrite the // default behaviour defined by ParseChannelFromPath function. -func ResolveMessageChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) MessageReceiverOptions { - return func(r *MessageReceiver) error { +func ResolveChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) EventReceiverOptions { + return func(r *EventReceiver) error { r.pathToChannelFunc = PathToChannelFunc return nil } } -// NewMessageReceiver creates an event receiver passing new events to the +// NewEventReceiver creates an event receiver passing new events to the // receiverFunc. -func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zap.Logger, reporter StatsReporter, opts ...MessageReceiverOptions) (*MessageReceiver, error) { - bindingsReceiver := kncloudevents.NewHTTPMessageReceiver(8080) - receiver := &MessageReceiver{ +func NewEventReceiver(receiverFunc EventReceiverFunc, logger *zap.Logger, reporter StatsReporter, opts ...EventReceiverOptions) (*EventReceiver, error) { + bindingsReceiver := kncloudevents.NewHTTPEventReceiver(8080) + receiver := &EventReceiver{ httpBindingsReceiver: bindingsReceiver, receiverFunc: receiverFunc, hostToChannelFunc: ResolveChannelFromHostFunc(ParseChannelFromHost), @@ -128,7 +124,7 @@ func NewMessageReceiver(receiverFunc UnbufferedMessageReceiverFunc, logger *zap. // Only HTTP POST requests to the root path (/) are accepted. If other paths or // methods are needed, use the HandleRequest method directly with another HTTP // server. -func (r *MessageReceiver) Start(ctx context.Context) error { +func (r *EventReceiver) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -156,7 +152,7 @@ func (r *MessageReceiver) Start(ctx context.Context) error { } } -func (r *MessageReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { +func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { response.Header().Set("Allow", "POST, OPTIONS") if request.Method == nethttp.MethodOptions { response.Header().Set("WebHook-Allowed-Origin", "*") // Accept from any Origin: @@ -207,23 +203,7 @@ func (r *MessageReceiver) ServeHTTP(response nethttp.ResponseWriter, request *ne args.Ns = channel.Namespace - message := http.NewMessageFromHttpRequest(request) - if message.ReadEncoding() == binding.EncodingUnknown { - r.logger.Info("Cannot determine the cloudevent message encoding") - response.WriteHeader(nethttp.StatusBadRequest) - r.reporter.ReportEventCount(&args, nethttp.StatusBadRequest) - return - } - - bufferedMessage, err := buffering.CopyMessage(request.Context(), message) - if err != nil { - r.logger.Warn("Cannot buffer cloudevent message", zap.Error(err)) - response.WriteHeader(nethttp.StatusBadRequest) - _ = r.reporter.ReportEventCount(&args, nethttp.StatusBadRequest) - return - } - - event, err := binding.ToEvent(request.Context(), bufferedMessage) + event, err := http.NewEventFromHTTPRequest(request) if err != nil { r.logger.Warn("failed to extract event from request", zap.Error(err)) response.WriteHeader(nethttp.StatusBadRequest) @@ -238,7 +218,7 @@ func (r *MessageReceiver) ServeHTTP(response nethttp.ResponseWriter, request *ne return } - err = r.receiverFunc(request.Context(), channel, bufferedMessage, []binding.Transformer{}, utils.PassThroughHeaders(request.Header)) + err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) if err != nil { if _, ok := err.(*UnknownChannelError); ok { response.WriteHeader(nethttp.StatusNotFound) @@ -262,4 +242,4 @@ func ReportEventCountMetricsForDispatchError(err error, reporter StatsReporter, } } -var _ nethttp.Handler = (*MessageReceiver)(nil) +var _ nethttp.Handler = (*EventReceiver)(nil) diff --git a/pkg/channel/message_receiver_test.go b/pkg/channel/event_receiver_test.go similarity index 82% rename from pkg/channel/message_receiver_test.go rename to pkg/channel/event_receiver_test.go index d6dac154c9e..0221e5e22a9 100644 --- a/pkg/channel/message_receiver_test.go +++ b/pkg/channel/event_receiver_test.go @@ -29,6 +29,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/client" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/go-cmp/cmp" @@ -45,16 +46,16 @@ import ( "knative.dev/pkg/tracing/propagation/tracecontextb3" ) -func TestMessageReceiver_ServeHTTP(t *testing.T) { +func TestEventReceiver_ServeHTTP(t *testing.T) { testCases := map[string]struct { method string host string path string additionalHeaders nethttp.Header expected int - receiverFunc UnbufferedMessageReceiverFunc + receiverFunc EventReceiverFunc responseValidator func(r httptest.ResponseRecorder) error - opts []MessageReceiverOptions + opts []EventReceiverOptions }{ "host based channel reference with non '/' path": { path: "/something", @@ -63,7 +64,7 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { "path based channel reference with malformed path": { path: "/something", expected: nethttp.StatusBadRequest, - opts: []MessageReceiverOptions{ResolveMessageChannelFromPath(ParseChannelFromPath)}, + opts: []EventReceiverOptions{ResolveChannelFromPath(ParseChannelFromPath)}, }, "not a POST": { method: nethttp.MethodGet, @@ -74,13 +75,13 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { expected: nethttp.StatusBadRequest, }, "unknown channel error": { - receiverFunc: func(_ context.Context, c ChannelReference, _ binding.Message, _ []binding.Transformer, _ nethttp.Header) error { + receiverFunc: func(_ context.Context, c ChannelReference, _ event.Event, _ nethttp.Header) error { return &UnknownChannelError{Channel: c} }, expected: nethttp.StatusNotFound, }, "other receiver function error": { - receiverFunc: func(_ context.Context, _ ChannelReference, _ binding.Message, _ []binding.Transformer, _ nethttp.Header) error { + receiverFunc: func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error { return errors.New("test induced receiver function error") }, expected: nethttp.StatusInternalServerError, @@ -88,14 +89,14 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { "path based channel reference": { path: "/new-namespace/new-channel", host: "test-name.test-namespace.svc." + network.GetClusterDomainName(), - receiverFunc: func(ctx context.Context, r ChannelReference, m binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error { + receiverFunc: func(ctx context.Context, r ChannelReference, m event.Event, additionalHeaders nethttp.Header) error { if r.Namespace != "new-namespace" || r.Name != "new-channel" { return fmt.Errorf("bad channel reference %v", r) } return nil }, expected: nethttp.StatusAccepted, - opts: []MessageReceiverOptions{ResolveMessageChannelFromPath(ParseChannelFromPath)}, + opts: []EventReceiverOptions{ResolveChannelFromPath(ParseChannelFromPath)}, }, "headers and body pass through": { // The header, body, and host values set here are verified in the receiverFunc. Altering @@ -107,19 +108,14 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { "knatIve-will-pass-through": {"true", "always"}, }, host: "test-name.test-namespace.svc." + network.GetClusterDomainName(), - receiverFunc: func(ctx context.Context, r ChannelReference, m binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error { + receiverFunc: func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error { if r.Namespace != "test-namespace" || r.Name != "test-name" { return fmt.Errorf("test receiver func -- bad reference: %v", r) } - e, err := binding.ToEvent(ctx, m, transformers...) - if err != nil { - return err - } - // Check payload var payload string - err = e.DataAs(&payload) + err := e.DataAs(&payload) if err != nil { return err } @@ -172,7 +168,7 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { } f := tc.receiverFunc - r, err := NewMessageReceiver(f, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), reporter, tc.opts...) + r, err := NewEventReceiver(f, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), reporter, tc.opts...) if err != nil { t.Fatalf("Error creating new event receiver. Error:%s", err) } @@ -215,12 +211,12 @@ func TestMessageReceiver_ServeHTTP(t *testing.T) { } } -func TestMessageReceiver_ServerStart_trace_propagation(t *testing.T) { +func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) { want := test.ConvertEventExtensionsToString(t, test.FullEvent()) done := make(chan struct{}, 1) - receiverFunc := func(ctx context.Context, r ChannelReference, m binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error { + receiverFunc := func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error { if r.Namespace != "test-namespace" || r.Name != "test-name" { return fmt.Errorf("test receiver func -- bad reference: %v", r) } @@ -241,7 +237,7 @@ func TestMessageReceiver_ServerStart_trace_propagation(t *testing.T) { reporter := NewStatsReporter("testcontainer", "testpod") logger, _ := zap.NewDevelopment() - r, err := NewMessageReceiver(receiverFunc, logger, reporter) + r, err := NewEventReceiver(receiverFunc, logger, reporter) if err != nil { t.Fatalf("Error creating new event receiver. Error:%s", err) } @@ -279,14 +275,14 @@ func TestMessageReceiver_ServerStart_trace_propagation(t *testing.T) { <-done } -func TestMessageReceiver_WrongRequest(t *testing.T) { +func TestEventReceiver_WrongRequest(t *testing.T) { reporter := NewStatsReporter("testcontainer", "testpod") host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/" - f := func(_ context.Context, _ ChannelReference, _ binding.Message, _ []binding.Transformer, _ nethttp.Header) error { + f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error { return errors.New("test induced receiver function error") } - r, err := NewMessageReceiver(f, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), reporter) + r, err := NewEventReceiver(f, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), reporter) if err != nil { t.Fatalf("Error creating new event receiver. Error:%s", err) } @@ -302,18 +298,18 @@ func TestMessageReceiver_WrongRequest(t *testing.T) { } } -func TestMessageReceiver_UnknownHost(t *testing.T) { +func TestEventReceiver_UnknownHost(t *testing.T) { host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/" reporter := NewStatsReporter("testcontainer", "testpod") - f := func(_ context.Context, _ ChannelReference, _ binding.Message, _ []binding.Transformer, _ nethttp.Header) error { + f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error { return errors.New("test induced receiver function error") } - r, err := NewMessageReceiver( + r, err := NewEventReceiver( f, zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), reporter, - ResolveMessageChannelFromHostHeader(func(s string) (reference ChannelReference, err error) { + ResolveChannelFromHostHeader(func(s string) (reference ChannelReference, err error) { return ChannelReference{}, UnknownHostError(s) })) if err != nil { diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_event_handler.go similarity index 59% rename from pkg/channel/fanout/fanout_message_handler.go rename to pkg/channel/fanout/fanout_event_handler.go index aba7d0f1fda..2bc142ef9bc 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -17,8 +17,8 @@ limitations under the License. // Package fanout provides an http.Handler that takes in one request and fans it out to N other // requests, based on a list of Subscriptions. Logically, it represents all the Subscriptions to a // single Knative Channel. -// It will normally be used in conjunction with multichannelfanout.MessageHandler, which contains multiple -// fanout.MessageHandler, each corresponding to a single Knative Channel. +// It will normally be used in conjunction with multichannelfanout.EventHandler, which contains multiple +// fanout.EventHandler, each corresponding to a single Knative Channel. package fanout import ( @@ -28,8 +28,7 @@ import ( "sync" "time" - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/buffering" + "github.com/cloudevents/sdk-go/v2/event" "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" @@ -53,7 +52,7 @@ type Subscription struct { RetryConfig *kncloudevents.RetryConfig } -// Config for a fanout.MessageHandler. +// Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. @@ -61,18 +60,18 @@ type Config struct { AsyncHandler bool `json:"asyncHandler,omitempty"` } -// MessageHandler is an http.Handler but has methods for managing +// EventHandler is an http.Handler but has methods for managing // the fanout Subscriptions. Get/Set methods are synchronized, and // GetSubscriptions returns a copy of the Subscriptions, so you can // use it to fetch a snapshot and use it after that safely. -type MessageHandler interface { +type EventHandler interface { nethttp.Handler SetSubscriptions(ctx context.Context, subs []Subscription) GetSubscriptions(ctx context.Context) []Subscription } -// MessageHandler is a http.Handler that takes a single request in and fans it out to N other servers. -type FanoutMessageHandler struct { +// FanoutEventHandler is a http.Handler that takes a single request in and fans it out to N other servers. +type FanoutEventHandler struct { // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. asyncHandler bool @@ -80,8 +79,7 @@ type FanoutMessageHandler struct { subscriptionsMutex sync.RWMutex subscriptions []Subscription - receiver *channel.MessageReceiver - dispatcher channel.MessageDispatcher + receiver *channel.EventReceiver // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, // rather than a member variable. @@ -94,21 +92,18 @@ type FanoutMessageHandler struct { channelUID *types.UID } -// NewMessageHandler creates a new fanout.MessageHandler. - -func NewFanoutMessageHandler( +// NewFanoutEventHandler creates a new fanout.EventHandler. +func NewFanoutEventHandler( logger *zap.Logger, - messageDispatcher channel.MessageDispatcher, config Config, reporter channel.StatsReporter, eventTypeHandler *eventtype.EventTypeAutoHandler, channelAddressable *duckv1.KReference, channelUID *types.UID, - receiverOpts ...channel.MessageReceiverOptions, -) (*FanoutMessageHandler, error) { - handler := &FanoutMessageHandler{ + receiverOpts ...channel.EventReceiverOptions, +) (*FanoutEventHandler, error) { + handler := &FanoutEventHandler{ logger: logger, - dispatcher: messageDispatcher, timeout: defaultTimeout, reporter: reporter, asyncHandler: config.AsyncHandler, @@ -120,7 +115,7 @@ func NewFanoutMessageHandler( copy(handler.subscriptions, config.Subscriptions) // The receiver function needs to point back at the handler itself, so set it up after // initialization. - receiver, err := channel.NewMessageReceiver(createMessageReceiverFunction(handler), logger, reporter, receiverOpts...) + receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, receiverOpts...) if err != nil { return nil, err } @@ -164,7 +159,7 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil } -func (f *FanoutMessageHandler) SetSubscriptions(ctx context.Context, subs []Subscription) { +func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscription) { f.subscriptionsMutex.Lock() defer f.subscriptionsMutex.Unlock() s := make([]Subscription, len(subs)) @@ -172,7 +167,7 @@ func (f *FanoutMessageHandler) SetSubscriptions(ctx context.Context, subs []Subs f.subscriptions = s } -func (f *FanoutMessageHandler) GetSubscriptions(ctx context.Context) []Subscription { +func (f *FanoutEventHandler) GetSubscriptions(ctx context.Context) []Subscription { f.subscriptionsMutex.RLock() defer f.subscriptionsMutex.RUnlock() ret := make([]Subscription, len(f.subscriptions)) @@ -180,21 +175,16 @@ func (f *FanoutMessageHandler) GetSubscriptions(ctx context.Context) []Subscript return ret } -func (f *FanoutMessageHandler) autoCreateEventType(ctx context.Context, bufferedMessage binding.Message, transformers []binding.Transformer) { +func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event.Event) { if f.channelAddressable == nil { f.logger.Warn("No addressable for channel") return } else { - event, err := binding.ToEvent(ctx, bufferedMessage, transformers...) - if err != nil { - f.logger.Warn("Failed to extract event from message") - return - } if f.channelUID == nil { f.logger.Warn("No channelUID provided, unable to autocreate event type") return } - err = f.eventTypeHandler.AutoCreateEventType(ctx, event, f.channelAddressable, *f.channelUID) + err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelAddressable, *f.channelUID) if err != nil { f.logger.Warn("EventTypeCreate failed") return @@ -202,99 +192,66 @@ func (f *FanoutMessageHandler) autoCreateEventType(ctx context.Context, buffered } } -func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error { +func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, channel.ChannelReference, event.Event, nethttp.Header) error { if f.asyncHandler { - return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error { + return func(ctx context.Context, ref channel.ChannelReference, evnt event.Event, additionalHeaders nethttp.Header) error { if f.eventTypeHandler != nil { - bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...) - if err != nil { - f.logger.Warn("Failed to copy message") - } else { - f.autoCreateEventType(ctx, bufferedMessage, transformers) - } + f.autoCreateEventType(ctx, evnt) } subs := f.GetSubscriptions(ctx) if len(subs) == 0 { - // Nothing to do here, finish the message and return - _ = message.Finish(nil) + // Nothing to do here return nil } parentSpan := trace.FromContext(ctx) - te := kncloudevents.TypeExtractorTransformer("") - transformers = append(transformers, &te) - // Message buffering here is done before starting the dispatch goroutine - // Because the message could be closed before the buffering happens - bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...) - if err != nil { - return err - } - reportArgs := channel.ReportArgs{} - reportArgs.EventType = string(te) + reportArgs.EventType = evnt.Type() reportArgs.Ns = ref.Namespace - // We don't need the original message anymore - _ = message.Finish(nil) - go func(m binding.Message, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) { + go func(e event.Event, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) { // Run async dispatch with background context. ctx = trace.NewContext(context.Background(), s) h.Set(apis.KnNamespaceHeader, ref.Namespace) // Any returned error is already logged in f.dispatch(). - dispatchResultForFanout := f.dispatch(ctx, subs, m, h) + dispatchResultForFanout := f.dispatch(ctx, subs, e, h) _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) - }(bufferedMessage, additionalHeaders, parentSpan, &f.reporter, &reportArgs) + }(evnt, additionalHeaders, parentSpan, &f.reporter, &reportArgs) return nil } } - return func(ctx context.Context, ref channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error { + return func(ctx context.Context, ref channel.ChannelReference, event event.Event, additionalHeaders nethttp.Header) error { if f.eventTypeHandler != nil { - bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...) - if err != nil { - f.logger.Warn("Failed to copy message") - } else { - f.autoCreateEventType(ctx, bufferedMessage, transformers) - } + f.autoCreateEventType(ctx, event) } subs := f.GetSubscriptions(ctx) if len(subs) == 0 { - // Nothing to do here, finish the message and return - _ = message.Finish(nil) + // Nothing to do here return nil } - te := kncloudevents.TypeExtractorTransformer("") - transformers = append(transformers, &te) - // We buffer the message to send it several times - bufferedMessage, err := buffering.CopyMessage(ctx, message, transformers...) - if err != nil { - return err - } - // We don't need the original message anymore - _ = message.Finish(nil) - reportArgs := channel.ReportArgs{} - reportArgs.EventType = string(te) + reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace - dispatchResultForFanout := f.dispatch(ctx, subs, bufferedMessage, additionalHeaders) + dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders) return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) } } -func (f *FanoutMessageHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { +func (f *FanoutEventHandler) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { f.receiver.ServeHTTP(response, request) } // ParseDispatchResultAndReportMetric processes the dispatch result and records the related channel metrics with the appropriate context func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel.StatsReporter, reportArgs channel.ReportArgs) error { - if result.info != nil && result.info.Time > channel.NoDuration { - if result.info.ResponseCode > channel.NoResponse { - _ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Time) + if result.info != nil && result.info.Duration > kncloudevents.NoDuration { + if result.info.ResponseCode > kncloudevents.NoResponse { + _ = reporter.ReportEventDispatchTime(&reportArgs, result.info.ResponseCode, result.info.Duration) } else { - _ = reporter.ReportEventDispatchTime(&reportArgs, nethttp.StatusInternalServerError, result.info.Time) + _ = reporter.ReportEventDispatchTime(&reportArgs, nethttp.StatusInternalServerError, result.info.Duration) } } err := result.err @@ -308,37 +265,34 @@ func ParseDispatchResultAndReportMetrics(result DispatchResult, reporter channel // dispatch takes the event, fans it out to each subscription in subs. If all the fanned out // events return successfully, then return nil. Else, return an error. -func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription, bufferedMessage binding.Message, additionalHeaders nethttp.Header) DispatchResult { - // Bind the lifecycle of the buffered message to the number of subs - bufferedMessage = buffering.WithAcksBeforeFinish(bufferedMessage, len(subs)) - +func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription, event event.Event, additionalHeaders nethttp.Header) DispatchResult { errorCh := make(chan DispatchResult, len(subs)) for _, sub := range subs { go func(s Subscription) { - dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s) + dispatchedResultPerSub, err := f.makeFanoutRequest(ctx, event, additionalHeaders, s) errorCh <- DispatchResult{err: err, info: dispatchedResultPerSub} }(sub) } - var totalDispatchTimeForFanout time.Duration = channel.NoDuration + var totalDispatchTimeForFanout time.Duration = kncloudevents.NoDuration dispatchResultForFanout := DispatchResult{ - info: &channel.DispatchExecutionInfo{ - Time: channel.NoDuration, - ResponseCode: channel.NoResponse, + info: &kncloudevents.DispatchInfo{ + Duration: kncloudevents.NoDuration, + ResponseCode: kncloudevents.NoResponse, }, } for range subs { select { case dispatchResult := <-errorCh: if dispatchResult.info != nil { - if dispatchResult.info.Time > channel.NoDuration { - if totalDispatchTimeForFanout > channel.NoDuration { - totalDispatchTimeForFanout += dispatchResult.info.Time + if dispatchResult.info.Duration > kncloudevents.NoDuration { + if totalDispatchTimeForFanout > kncloudevents.NoDuration { + totalDispatchTimeForFanout += dispatchResult.info.Duration } else { - totalDispatchTimeForFanout = dispatchResult.info.Time + totalDispatchTimeForFanout = dispatchResult.info.Duration } } - dispatchResultForFanout.info.Time = totalDispatchTimeForFanout + dispatchResultForFanout.info.Duration = totalDispatchTimeForFanout dispatchResultForFanout.info.ResponseCode = dispatchResult.info.ResponseCode } if dispatchResult.err != nil { @@ -358,32 +312,28 @@ func (f *FanoutMessageHandler) dispatch(ctx context.Context, subs []Subscription // makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and // the `sink` portions of the subscription. -func (f *FanoutMessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) (*channel.DispatchExecutionInfo, error) { - return f.dispatcher.DispatchMessageWithRetries( - ctx, - message, - additionalHeaders, - sub.Subscriber, - sub.Reply, - sub.DeadLetter, - sub.RetryConfig, - ) +func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) { + return kncloudevents.SendEvent(ctx, event, sub.Subscriber, + kncloudevents.WithHeader(additionalHeaders), + kncloudevents.WithReply(sub.Reply), + kncloudevents.WithDeadLetterSink(sub.DeadLetter), + kncloudevents.WithRetryConfig(sub.RetryConfig)) } type DispatchResult struct { err error - info *channel.DispatchExecutionInfo + info *kncloudevents.DispatchInfo } func (d DispatchResult) Error() error { return d.err } -func (d DispatchResult) Info() *channel.DispatchExecutionInfo { +func (d DispatchResult) Info() *kncloudevents.DispatchInfo { return d.info } -func NewDispatchResult(err error, info *channel.DispatchExecutionInfo) DispatchResult { +func NewDispatchResult(err error, info *kncloudevents.DispatchInfo) DispatchResult { return DispatchResult{ err: err, info: info, diff --git a/pkg/channel/fanout/fanout_message_handler_test.go b/pkg/channel/fanout/fanout_event_handler_test.go similarity index 87% rename from pkg/channel/fanout/fanout_message_handler_test.go rename to pkg/channel/fanout/fanout_event_handler_test.go index 04483409e18..902e9d4f609 100644 --- a/pkg/channel/fanout/fanout_message_handler_test.go +++ b/pkg/channel/fanout/fanout_event_handler_test.go @@ -31,11 +31,12 @@ import ( eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/kncloudevents" duckv1 "knative.dev/pkg/apis/duck/v1" - pkgduckv1 "knative.dev/pkg/apis/duck/v1" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" bindingshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/cloudevents/sdk-go/v2/test" "go.opencensus.io/trace" "go.uber.org/atomic" "go.uber.org/zap" @@ -67,8 +68,8 @@ func TestSubscriberSpecToFanoutConfig(t *testing.T) { ReplyURI: apis.HTTP("reply.example.com"), ReplyCACerts: &replyCACerts, Delivery: &eventingduckv1.DeliverySpec{ - DeadLetterSink: &pkgduckv1.Destination{ - Ref: &pkgduckv1.KReference{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ Kind: "mykind", Namespace: "mynamespace", Name: "myname", @@ -111,7 +112,7 @@ func TestSubscriberSpecToFanoutConfig(t *testing.T) { } func TestGetSetSubscriptions(t *testing.T) { - h := &FanoutMessageHandler{subscriptions: make([]Subscription, 0)} + h := &FanoutEventHandler{subscriptions: make([]Subscription, 0)} subs := h.GetSubscriptions(context.TODO()) if len(subs) != 0 { t.Error("Wanted 0 subs, got: ", len(subs)) @@ -147,9 +148,9 @@ func TestGetSetSubscriptions(t *testing.T) { } -func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { +func TestFanoutEventHandler_ServeHTTP(t *testing.T) { testCases := map[string]struct { - receiverFunc channel.UnbufferedMessageReceiverFunc + receiverFunc channel.EventReceiverFunc timeout time.Duration subs []Subscription subscriber func(http.ResponseWriter, *http.Request) @@ -160,14 +161,14 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { asyncExpectedStatus int }{ "rejected by receiver": { - receiverFunc: func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, http.Header) error { + receiverFunc: func(context.Context, channel.ChannelReference, event.Event, http.Header) error { return errors.New("rejected by test-receiver") }, expectedStatus: http.StatusInternalServerError, asyncExpectedStatus: http.StatusInternalServerError, }, "receiver has span": { - receiverFunc: func(ctx context.Context, _ channel.ChannelReference, _ binding.Message, _ []binding.Transformer, _ http.Header) error { + receiverFunc: func(ctx context.Context, _ channel.ChannelReference, _ event.Event, _ http.Header) error { if span := trace.FromContext(ctx); span == nil { return errors.New("missing span") } @@ -196,19 +197,21 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { expectedStatus: http.StatusAccepted, asyncExpectedStatus: http.StatusAccepted, }, - "empty sub succeeds": { - subs: []Subscription{ - {}, - }, - expectedStatus: http.StatusAccepted, - asyncExpectedStatus: http.StatusAccepted, - }, "reply fails": { subs: []Subscription{ { - Reply: &replaceReplier, + Subscriber: replaceSubscriber, + Reply: &replaceReplier, }, }, + subscriber: func(writer http.ResponseWriter, req *http.Request) { + // response with some event for reply + event := test.FullEvent() + message := binding.ToMessage(&event) + bindingshttp.WriteResponseWriter(context.TODO(), message, http.StatusAccepted, writer) + message.Finish(nil) + }, + subscriberReqs: 1, replier: func(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusNotFound) }, @@ -306,15 +309,15 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) { } for n, tc := range testCases { t.Run("sync - "+n, func(t *testing.T) { - testFanoutMessageHandler(t, false, tc.receiverFunc, tc.timeout, tc.subs, tc.subscriber, tc.subscriberReqs, tc.replier, tc.replierReqs, tc.expectedStatus) + testFanoutEventHandler(t, false, tc.receiverFunc, tc.timeout, tc.subs, tc.subscriber, tc.subscriberReqs, tc.replier, tc.replierReqs, tc.expectedStatus) }) t.Run("async - "+n, func(t *testing.T) { - testFanoutMessageHandler(t, true, tc.receiverFunc, tc.timeout, tc.subs, tc.subscriber, tc.subscriberReqs, tc.replier, tc.replierReqs, tc.asyncExpectedStatus) + testFanoutEventHandler(t, true, tc.receiverFunc, tc.timeout, tc.subs, tc.subscriber, tc.subscriberReqs, tc.replier, tc.replierReqs, tc.asyncExpectedStatus) }) } } -func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) { +func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.EventReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) { var subscriberServerWg *sync.WaitGroup reporter := channel.NewStatsReporter("testcontainer", "testpod") if subscriberReqs != 0 { @@ -360,14 +363,13 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb } calledChan := make(chan bool, 1) - recvOptionFunc := func(*channel.MessageReceiver) error { + recvOptionFunc := func(*channel.EventReceiver) error { calledChan <- true return nil } - h, err := NewFanoutMessageHandler( + h, err := NewFanoutEventHandler( logger, - channel.NewMessageDispatcher(logger), Config{ Subscriptions: subs, AsyncHandler: async, @@ -384,7 +386,7 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb } if receiverFunc != nil { - receiver, err := channel.NewMessageReceiver(receiverFunc, logger, reporter) + receiver, err := channel.NewEventReceiver(receiverFunc, logger, reporter) if err != nil { t.Fatal("NewEventReceiver failed =", err) } @@ -411,7 +413,7 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb h.ServeHTTP(&resp, req) if resp.Code != expectedStatus { - t.Errorf("Unexpected status code. Expected %v, Actual %v", expectedStatus, resp.Code) + t.Fatalf("Unexpected status code. Expected %v, Actual %v", expectedStatus, resp.Code) } if subscriberServerWg != nil { diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go deleted file mode 100644 index 2806aa75909..00000000000 --- a/pkg/channel/message_dispatcher.go +++ /dev/null @@ -1,341 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package channel - -import ( - "bytes" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - nethttp "net/http" - "time" - - cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/protocol/http" - "go.opencensus.io/trace" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/sets" - - "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/network" - "knative.dev/pkg/system" - - eventingapis "knative.dev/eventing/pkg/apis" - - "knative.dev/eventing/pkg/broker" - "knative.dev/eventing/pkg/channel/attributes" - "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" -) - -const ( - // noDuration signals that the dispatch step hasn't started - NoDuration = -1 - NoResponse = -1 -) - -type MessageDispatcher interface { - // DispatchMessage dispatches an event to a destination over HTTP. - // - // The destination and reply are URLs. - DispatchMessage(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination duckv1.Addressable, reply *duckv1.Addressable, deadLetter *duckv1.Addressable) (*DispatchExecutionInfo, error) - - // DispatchMessageWithRetries dispatches an event to a destination over HTTP. - // - // The destination and reply are URLs. - DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination duckv1.Addressable, reply *duckv1.Addressable, deadLetter *duckv1.Addressable, config *kncloudevents.RetryConfig, transformers ...binding.Transformer) (*DispatchExecutionInfo, error) -} - -// MessageDispatcherImpl is the 'real' MessageDispatcher used everywhere except unit tests. -var _ MessageDispatcher = &MessageDispatcherImpl{} - -// MessageDispatcherImpl dispatches events to a destination over HTTP. -type MessageDispatcherImpl struct { - supportedSchemes sets.String - - logger *zap.Logger -} - -type DispatchExecutionInfo struct { - Time time.Duration - ResponseCode int - ResponseBody []byte -} - -// NewMessageDispatcher creates a new Message dispatcher. -func NewMessageDispatcher(logger *zap.Logger) *MessageDispatcherImpl { - return &MessageDispatcherImpl{ - supportedSchemes: sets.NewString("http", "https"), - logger: logger, - } -} - -func (d *MessageDispatcherImpl) DispatchMessage(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination duckv1.Addressable, reply *duckv1.Addressable, deadLetter *duckv1.Addressable) (*DispatchExecutionInfo, error) { - return d.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, nil) -} - -func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, message cloudevents.Message, additionalHeaders nethttp.Header, destination duckv1.Addressable, reply *duckv1.Addressable, deadLetter *duckv1.Addressable, retriesConfig *kncloudevents.RetryConfig, transformers ...binding.Transformer) (*DispatchExecutionInfo, error) { - // All messages that should be finished at the end of this function - // are placed in this slice - var messagesToFinish []binding.Message - defer func() { - for _, msg := range messagesToFinish { - _ = msg.Finish(nil) - } - }() - - // sanitize eventual host-only URLs - destination = *d.sanitizeAddressable(&destination) - reply = d.sanitizeAddressable(reply) - deadLetter = d.sanitizeAddressable(deadLetter) - - // If there is a destination, variables response* are filled with the response of the destination - // Otherwise, they are filled with the original message - var responseMessage cloudevents.Message - var responseAdditionalHeaders nethttp.Header - var dispatchExecutionInfo *DispatchExecutionInfo - - if destination.URL != nil { - var err error - // Try to send to destination - messagesToFinish = append(messagesToFinish, message) - - // Add `Prefer: reply` header no matter if a reply destination is provided. Discussion: https://github.com/knative/eventing/pull/5764 - additionalHeadersForDestination := nethttp.Header{} - if additionalHeaders != nil { - additionalHeadersForDestination = additionalHeaders.Clone() - } - additionalHeadersForDestination.Set("Prefer", "reply") - - ctx, responseMessage, responseAdditionalHeaders, dispatchExecutionInfo, err = d.executeRequest(ctx, &destination, message, additionalHeadersForDestination, retriesConfig, transformers...) - if err != nil { - // If DeadLetter is configured, then send original message with knative error extensions - if deadLetter != nil { - dispatchTransformers := d.dispatchExecutionInfoTransformers(destination.URL, dispatchExecutionInfo) - _, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, additionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...) - if deadLetterErr != nil { - return dispatchExecutionInfo, fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination.URL, err, deadLetter.URL, deadLetterErr) - } - if deadLetterResponse != nil { - messagesToFinish = append(messagesToFinish, deadLetterResponse) - } - - return dispatchExecutionInfo, nil - } - // No DeadLetter, just fail - return dispatchExecutionInfo, fmt.Errorf("unable to complete request to %s: %v", destination.URL, err) - } - } else { - // No destination url, try to send to reply if available - responseMessage = message - responseAdditionalHeaders = additionalHeaders - } - - if additionalHeaders.Get(eventingapis.KnNamespaceHeader) != "" { - if responseAdditionalHeaders == nil { - responseAdditionalHeaders = make(nethttp.Header) - } - responseAdditionalHeaders.Set(eventingapis.KnNamespaceHeader, additionalHeaders.Get(eventingapis.KnNamespaceHeader)) - } - - // No response, dispatch completed - if responseMessage == nil { - return dispatchExecutionInfo, nil - } - - messagesToFinish = append(messagesToFinish, responseMessage) - - if reply == nil { - d.logger.Debug("cannot forward response as reply is empty") - return dispatchExecutionInfo, nil - } - - ctx, responseResponseMessage, _, dispatchExecutionInfo, err := d.executeRequest(ctx, reply, responseMessage, responseAdditionalHeaders, retriesConfig, transformers...) - if err != nil { - // If DeadLetter is configured, then send original message with knative error extensions - if deadLetter != nil { - dispatchTransformers := d.dispatchExecutionInfoTransformers(reply.URL, dispatchExecutionInfo) - _, deadLetterResponse, _, dispatchExecutionInfo, deadLetterErr := d.executeRequest(ctx, deadLetter, message, responseAdditionalHeaders, retriesConfig, append(transformers, dispatchTransformers)...) - if deadLetterErr != nil { - return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", reply.URL, err, deadLetter.URL, deadLetterErr) - } - if deadLetterResponse != nil { - messagesToFinish = append(messagesToFinish, deadLetterResponse) - } - - return dispatchExecutionInfo, nil - } - // No DeadLetter, just fail - return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s: %v", reply.URL, err) - } - if responseResponseMessage != nil { - messagesToFinish = append(messagesToFinish, responseResponseMessage) - } - - return dispatchExecutionInfo, nil -} - -func (d *MessageDispatcherImpl) executeRequest(ctx context.Context, - target *duckv1.Addressable, - message cloudevents.Message, - additionalHeaders nethttp.Header, - configs *kncloudevents.RetryConfig, - transformers ...binding.Transformer) (context.Context, cloudevents.Message, nethttp.Header, *DispatchExecutionInfo, error) { - - d.logger.Debug("Dispatching event", zap.Any("target", target)) - - execInfo := DispatchExecutionInfo{ - Time: NoDuration, - ResponseCode: NoResponse, - } - ctx, span := trace.StartSpan(ctx, "knative.dev", trace.WithSpanKind(trace.SpanKindClient)) - defer span.End() - - req, err := kncloudevents.NewCloudEventRequest(ctx, *target) - if err != nil { - return ctx, nil, nil, &execInfo, err - } - - if span.IsRecordingEvents() { - transformers = append(transformers, tracing.PopulateSpan(span, target.URL.String())) - } - - err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders, transformers...) - if err != nil { - return ctx, nil, nil, &execInfo, err - } - - start := time.Now() - response, err := req.SendWithRetries(configs) - dispatchTime := time.Since(start) - if err != nil { - execInfo.Time = dispatchTime - execInfo.ResponseCode = nethttp.StatusInternalServerError - execInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) - return ctx, nil, nil, &execInfo, err - } - - if response != nil { - execInfo.ResponseCode = response.StatusCode - } - execInfo.Time = dispatchTime - - body := new(bytes.Buffer) - _, readErr := body.ReadFrom(response.Body) - - if isFailure(response.StatusCode) { - // Read response body into execInfo for failures - if readErr != nil && readErr != io.EOF { - d.logger.Error("failed to read response body", zap.Error(err)) - execInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) - } else { - execInfo.ResponseBody = body.Bytes() - } - _ = response.Body.Close() - // Reject non-successful responses. - return ctx, nil, nil, &execInfo, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", response.StatusCode) - } - - var responseMessageBody []byte - if readErr != nil && readErr != io.EOF { - d.logger.Error("failed to read response body", zap.Error(err)) - responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error())) - } else { - responseMessageBody = body.Bytes() - } - responseMessage := http.NewMessage(response.Header, io.NopCloser(bytes.NewReader(responseMessageBody))) - - if responseMessage.ReadEncoding() == binding.EncodingUnknown { - _ = response.Body.Close() - _ = responseMessage.BodyReader.Close() - d.logger.Debug("Response is a non event, discarding it", zap.Int("status_code", response.StatusCode)) - return ctx, nil, nil, &execInfo, nil - } - return ctx, responseMessage, utils.PassThroughHeaders(response.Header), &execInfo, nil -} - -func (d *MessageDispatcherImpl) sanitizeAddressable(addressable *duckv1.Addressable) *duckv1.Addressable { - if addressable == nil { - return nil - } - - addressable.URL = d.sanitizeURL(addressable.URL) - - return addressable -} - -func (d *MessageDispatcherImpl) sanitizeURL(url *apis.URL) *apis.URL { - if url == nil { - return nil - } - - if d.supportedSchemes.Has(url.Scheme) { - // Already a URL with a known scheme. - return url - } - - return &apis.URL{ - Scheme: "http", - Host: url.Host, - Path: "/", - } -} - -// dispatchExecutionTransformer returns Transformers based on the specified destination and DispatchExecutionInfo -func (d *MessageDispatcherImpl) dispatchExecutionInfoTransformers(destination *apis.URL, dispatchExecutionInfo *DispatchExecutionInfo) binding.Transformers { - if destination == nil { - destination = &apis.URL{} - } - - httpResponseBody := dispatchExecutionInfo.ResponseBody - if destination.Host == network.GetServiceHostname("broker-filter", system.Namespace()) { - - var errExtensionInfo broker.ErrExtensionInfo - - err := json.Unmarshal(dispatchExecutionInfo.ResponseBody, &errExtensionInfo) - if err != nil { - d.logger.Debug("Unmarshal dispatchExecutionInfo ResponseBody failed", zap.Error(err)) - return nil - } - destination = errExtensionInfo.ErrDestination - httpResponseBody = errExtensionInfo.ErrResponseBody - } - - destination = d.sanitizeURL(destination) - - // Encodes response body as base64 for the resulting length. - bodyLen := len(httpResponseBody) - encodedLen := base64.StdEncoding.EncodedLen(bodyLen) - if encodedLen > attributes.KnativeErrorDataExtensionMaxLength { - encodedLen = attributes.KnativeErrorDataExtensionMaxLength - } - encodedBuf := make([]byte, encodedLen) - base64.StdEncoding.Encode(encodedBuf, httpResponseBody) - - return attributes.KnativeErrorTransformers(*destination.URL(), dispatchExecutionInfo.ResponseCode, string(encodedBuf[:encodedLen])) -} - -// isFailure returns true if the status code is not a successful HTTP status. -func isFailure(statusCode int) bool { - return statusCode < nethttp.StatusOK /* 200 */ || - statusCode >= nethttp.StatusMultipleChoices /* 300 */ -} diff --git a/pkg/channel/message_dispatcher_various_response_benchmark_test.go b/pkg/channel/message_dispatcher_various_response_benchmark_test.go deleted file mode 100644 index 977e868d62a..00000000000 --- a/pkg/channel/message_dispatcher_various_response_benchmark_test.go +++ /dev/null @@ -1,264 +0,0 @@ -/* -Copyright 2022 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package channel_test - -import ( - "bytes" - "context" - "io" - "net/http" - "net/http/httptest" - "net/url" - "testing" - - cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/transformer" - "github.com/cloudevents/sdk-go/v2/test" - "github.com/google/uuid" - "go.uber.org/zap" - - "knative.dev/eventing/pkg/channel" - "knative.dev/eventing/pkg/utils" - "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" -) - -type fakeMessageHandler struct { - sendToDestination bool - hasDeadLetterSink bool - eventExtensions map[string]string - header http.Header - body string - fakeResponse *http.Response - fakeDeadLetterResponse *http.Response -} - -func NewFakeMessageHandler(sendToDestination bool, hasDeadLetterSink bool, eventExtensionsmap map[string]string, header http.Header, body string, - fakeResponse *http.Response, fakeDeadLetterResponse *http.Response) *fakeMessageHandler { - - fakeMessageHandler := &fakeMessageHandler{ - sendToDestination: sendToDestination, - hasDeadLetterSink: hasDeadLetterSink, - eventExtensions: eventExtensionsmap, - header: header, - body: body, - fakeResponse: fakeResponse, - fakeDeadLetterResponse: fakeDeadLetterResponse, - } - return fakeMessageHandler -} - -func BenchmarkDispatcher_dispatch_ok_response_not_null(b *testing.B) { - fmh := newSampleResponseAcceptedAndNotNull() - benchmarkMessageDispatcher(*fmh, b) -} - -func BenchmarkDispatcher_dispatch_ok_response_null(b *testing.B) { - fmh := newSampleResponseAcceptedAndNull() - benchmarkMessageDispatcher(*fmh, b) -} - -func BenchmarkDispatcher_dispatch_fail_response_not_null(b *testing.B) { - fmh := newSampleResponseStatusInternalServerErrorAndNotNull() - benchmarkMessageDispatcher(*fmh, b) -} - -func newSampleResponseAcceptedAndNotNull() *fakeMessageHandler { - header := map[string][]string{ - // do-not-forward should not get forwarded. - "do-not-forward": {"header"}, - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - } - body := "destintation" - eventExtensions := map[string]string{ - "abc": `"ce-abc-value"`, - } - fakeResponse := &http.Response{ - StatusCode: http.StatusAccepted, - Body: io.NopCloser(bytes.NewBufferString(uuid.NewString())), - } - fmh := NewFakeMessageHandler(true, false, eventExtensions, header, body, fakeResponse, nil) - return fmh -} - -func newSampleResponseStatusInternalServerErrorAndNotNull() *fakeMessageHandler { - header := map[string][]string{ - // do-not-forward should not get forwarded. - "do-not-forward": {"header"}, - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - } - body := "destintation" - eventExtensions := map[string]string{ - "abc": `"ce-abc-value"`, - } - fakeResponse := &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: io.NopCloser(bytes.NewBufferString(uuid.NewString())), - } - fmh := NewFakeMessageHandler(true, false, eventExtensions, header, body, fakeResponse, nil) - return fmh -} - -func newSampleResponseAcceptedAndNull() *fakeMessageHandler { - header := map[string][]string{ - // do-not-forward should not get forwarded. - "do-not-forward": {"header"}, - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - } - body := "destintation" - eventExtensions := map[string]string{ - "abc": `"ce-abc-value"`, - } - fakeResponse := &http.Response{ - StatusCode: http.StatusAccepted, - Body: io.NopCloser(bytes.NewBufferString("")), - } - fmh := NewFakeMessageHandler(true, false, eventExtensions, header, body, fakeResponse, nil) - return fmh -} - -func benchmarkMessageDispatcher(fmh fakeMessageHandler, b *testing.B) { - logger := zap.NewNop() - md := channel.NewMessageDispatcher(logger) - buf := new(bytes.Buffer) - buf.ReadFrom(fmh.fakeResponse.Body) - s := buf.String() - - destHandler := &fakeHttpHandler{ - b: b, - response: fmh.fakeResponse, - requests: make([]requestValidation, 0), - responseBody: s, - } - destServer := httptest.NewServer(destHandler) - destination := duckv1.Addressable{ - URL: getOnlyDomainURL(b, fmh.sendToDestination, destServer.URL), - } - - var deadLetterSinkHandler *fakeHttpHandler - var deadLetterSinkServer *httptest.Server - var deadLetterSink *duckv1.Addressable - if fmh.hasDeadLetterSink { - deadLetterSinkHandler = &fakeHttpHandler{ - b: b, - response: fmh.fakeDeadLetterResponse, - requests: make([]requestValidation, 0), - } - deadLetterSinkServer = httptest.NewServer(deadLetterSinkHandler) - defer deadLetterSinkServer.Close() - - deadLetterSink = &duckv1.Addressable{ - URL: getOnlyDomainURL(b, true, deadLetterSinkServer.URL), - } - } - - event := test.FullEvent() - event.SetID(uuid.New().String()) - event.SetType("testtype") - event.SetSource("testsource") - for n, v := range fmh.eventExtensions { - event.SetExtension(n, v) - } - event.SetData(cloudevents.ApplicationJSON, fmh.body) - - ctx := context.Background() - - message := binding.ToMessage(&event) - var err error - ev, err := binding.ToEvent(ctx, message, binding.Transformers{transformer.AddTimeNow}) - if err != nil { - b.Fatal(err) - } - message = binding.ToMessage(ev) - finishInvoked := 0 - message = binding.WithFinish(message, func(err error) { - finishInvoked++ - }) - - var headers http.Header = nil - if fmh.header != nil { - headers = utils.PassThroughHeaders(fmh.header) - } - // Start the bench - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _ = md.DispatchMessage(ctx, message, headers, destination, nil, deadLetterSink) - } - -} - -type requestValidation struct { - Host string - Headers http.Header - Body string -} - -type fakeHttpHandler struct { - b *testing.B - response *http.Response - requests []requestValidation - responseBody string -} - -func (f *fakeHttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - - // Make a copy of the request. - body, err := io.ReadAll(r.Body) - if err != nil { - - f.b.Error("Failed to read the request body") - } - f.requests = append(f.requests, requestValidation{ - Host: r.Host, - Headers: r.Header, - Body: string(body), - }) - - if f.response != nil { - for h, vs := range f.response.Header { - for _, v := range vs { - w.Header().Add(h, v) - } - } - w.WriteHeader(f.response.StatusCode) - w.Write([]byte(f.responseBody)) - } else { - w.WriteHeader(http.StatusOK) - w.Write([]byte("")) - } -} - -func getOnlyDomainURL(b *testing.B, shouldSend bool, serverURL string) *apis.URL { - if shouldSend { - server, err := url.Parse(serverURL) - if err != nil { - b.Errorf("Bad serverURL: %q", serverURL) - } - return &apis.URL{ - Host: server.Host, - } - } - return nil -} diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go similarity index 63% rename from pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go rename to pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go index 5ce4e43a76e..a849bda4fdc 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go @@ -16,13 +16,13 @@ limitations under the License. // Package multichannelfanout provides an http.Handler that takes in one request to a Knative // Channel and fans it out to N other requests. Logically, it represents multiple Knative Channels. -// It is made up of a map, map[channel]fanout.MessageHandler and each incoming request is inspected to -// determine which Channel it is on. This Handler delegates the HTTP handling to the fanout.MessageHandler +// It is made up of a map, map[channel]fanout.EventHandler and each incoming request is inspected to +// determine which Channel it is on. This Handler delegates the HTTP handling to the fanout.EventHandler // corresponding to the incoming request's Channel. // It is often used in conjunction with a swappable.Handler. The swappable.Handler delegates all its -// requests to the multichannelfanout.MessageHandler. When a new configuration is available, a new -// multichannelfanout.MessageHandler is created and swapped in for all subsequent requests. The old -// multichannelfanout.MessageHandler is discarded. +// requests to the multichannelfanout.EventHandler. When a new configuration is available, a new +// multichannelfanout.EventHandler is created and swapped in for all subsequent requests. The old +// multichannelfanout.EventHandler is discarded. package multichannelfanout import ( @@ -37,35 +37,35 @@ import ( "knative.dev/eventing/pkg/channel/fanout" ) -type MultiChannelMessageHandler interface { +type MultiChannelEventHandler interface { http.Handler - SetChannelHandler(host string, handler fanout.MessageHandler) + SetChannelHandler(host string, handler fanout.EventHandler) DeleteChannelHandler(host string) - GetChannelHandler(host string) fanout.MessageHandler + GetChannelHandler(host string) fanout.EventHandler CountChannelHandlers() int } // Handler is an http.Handler that introspects the incoming request to determine what Channel it is -// on, and then delegates handling of that request to the single fanout.FanoutMessageHandler corresponding to +// on, and then delegates handling of that request to the single fanout.FanoutEventHandler corresponding to // that Channel. -type MessageHandler struct { +type EventHandler struct { logger *zap.Logger handlersLock sync.RWMutex - handlers map[string]fanout.MessageHandler + handlers map[string]fanout.EventHandler } // NewHandler creates a new Handler. -func NewMessageHandler(_ context.Context, logger *zap.Logger) *MessageHandler { - return &MessageHandler{ +func NewEventHandler(_ context.Context, logger *zap.Logger) *EventHandler { + return &EventHandler{ logger: logger, - handlers: make(map[string]fanout.MessageHandler), + handlers: make(map[string]fanout.EventHandler), } } -// NewMessageHandlerWithConfig creates a new Handler with the specified configuration. This is really meant for tests +// NewEventHandlerWithConfig creates a new Handler with the specified configuration. This is really meant for tests // where you want to apply a fully specified configuration for tests. Reconciler operates on single channel at a time. -func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageDispatcher channel.MessageDispatcher, conf Config, reporter channel.StatsReporter, recvOptions ...channel.MessageReceiverOptions) (*MessageHandler, error) { - handlers := make(map[string]fanout.MessageHandler, len(conf.ChannelConfigs)) +func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Config, reporter channel.StatsReporter, recvOptions ...channel.EventReceiverOptions) (*EventHandler, error) { + handlers := make(map[string]fanout.EventHandler, len(conf.ChannelConfigs)) for _, cc := range conf.ChannelConfigs { keys := []string{cc.HostName, cc.Path} @@ -73,7 +73,7 @@ func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageD if key == "" { continue } - handler, err := fanout.NewFanoutMessageHandler(logger, messageDispatcher, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, recvOptions...) + handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, recvOptions...) if err != nil { logger.Error("Failed creating new fanout handler.", zap.Error(err)) return nil, err @@ -85,39 +85,39 @@ func NewMessageHandlerWithConfig(_ context.Context, logger *zap.Logger, messageD handlers[key] = handler } } - return &MessageHandler{ + return &EventHandler{ logger: logger, handlers: handlers, }, nil } -func (h *MessageHandler) SetChannelHandler(host string, handler fanout.MessageHandler) { +func (h *EventHandler) SetChannelHandler(host string, handler fanout.EventHandler) { h.handlersLock.Lock() defer h.handlersLock.Unlock() h.handlers[host] = handler } -func (h *MessageHandler) DeleteChannelHandler(host string) { +func (h *EventHandler) DeleteChannelHandler(host string) { h.handlersLock.Lock() defer h.handlersLock.Unlock() delete(h.handlers, host) } -func (h *MessageHandler) GetChannelHandler(host string) fanout.MessageHandler { +func (h *EventHandler) GetChannelHandler(host string) fanout.EventHandler { h.handlersLock.RLock() defer h.handlersLock.RUnlock() return h.handlers[host] } -func (h *MessageHandler) CountChannelHandlers() int { +func (h *EventHandler) CountChannelHandlers() int { h.handlersLock.RLock() defer h.handlersLock.RUnlock() return len(h.handlers) } -// ServeHTTP delegates the actual handling of the request to a fanout.MessageHandler, based on the +// ServeHTTP delegates the actual handling of the request to a fanout.EventHandler, based on the // request's channel key. -func (h *MessageHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { +func (h *EventHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { channelKey := request.Host if request.URL.Path != "/" { diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go similarity index 92% rename from pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go rename to pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go index 3d94a009081..49db902cac0 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_message_handler_test.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go @@ -43,7 +43,7 @@ var ( } ) -func TestNewMessageHandlerWithConfig(t *testing.T) { +func TestNewEventHandlerWithConfig(t *testing.T) { testCases := []struct { name string config Config @@ -68,10 +68,9 @@ func TestNewMessageHandlerWithConfig(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - _, err := NewMessageHandlerWithConfig( + _, err := NewEventHandlerWithConfig( context.TODO(), logger, - channel.NewMessageDispatcher(logger), tc.config, reporter, ) @@ -89,12 +88,12 @@ func TestNewMessageHandlerWithConfig(t *testing.T) { } } -func TestNewMessageHandler(t *testing.T) { +func TestNewEventHandler(t *testing.T) { handlerName := "handler.example.com" reporter := channel.NewStatsReporter("testcontainer", "testpod") logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - handler := NewMessageHandler(context.TODO(), logger) + handler := NewEventHandler(context.TODO(), logger) h := handler.GetChannelHandler(handlerName) if len(handler.handlers) != 0 { t.Errorf("non-empty handler map on creation") @@ -102,7 +101,7 @@ func TestNewMessageHandler(t *testing.T) { if h != nil { t.Errorf("Found handler for %q but not expected", handlerName) } - f, err := fanout.NewFanoutMessageHandler(logger, channel.NewMessageDispatcher(logger), fanout.Config{}, reporter, nil, nil, nil) + f, err := fanout.NewFanoutEventHandler(logger, fanout.Config{}, reporter, nil, nil, nil) if err != nil { t.Error("Failed to create FanoutMessagHandler: ", err) } @@ -119,7 +118,7 @@ func TestNewMessageHandler(t *testing.T) { } -func TestServeHTTPMessageHandler(t *testing.T) { +func TestServeHTTPEventHandler(t *testing.T) { testCases := map[string]struct { name string config Config @@ -127,7 +126,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { respStatusCode int hostKey string pathKey string - recvOptions []channel.MessageReceiverOptions + recvOptions []channel.EventReceiverOptions expectedStatusCode int }{ "non-existent channel host based": { @@ -299,7 +298,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { hostKey: "host.should.be.ignored", pathKey: "default/first-channel", expectedStatusCode: http.StatusAccepted, - recvOptions: []channel.MessageReceiverOptions{channel.ResolveMessageChannelFromPath(channel.ParseChannelFromPath)}, + recvOptions: []channel.EventReceiverOptions{channel.ResolveChannelFromPath(channel.ParseChannelFromPath)}, }, } for n, tc := range testCases { @@ -312,7 +311,7 @@ func TestServeHTTPMessageHandler(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) reporter := channel.NewStatsReporter("testcontainer", "testpod") - h, err := NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), tc.config, reporter, tc.recvOptions...) + h, err := NewEventHandlerWithConfig(context.TODO(), logger, tc.config, reporter, tc.recvOptions...) if err != nil { t.Fatalf("Unexpected NewHandler error: '%v'", err) } diff --git a/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go b/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go index 36ad1081111..17b60e9b91e 100644 --- a/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go +++ b/pkg/eventingtls/eventingtlstesting/eventingtlstesting.go @@ -42,7 +42,7 @@ func init() { CA, Key, Crt = loadCerts() } -func StartServer(ctx context.Context, t *testing.T, port int, handler http.Handler, receiverOptions ...kncloudevents.HTTPMessageReceiverOption) string { +func StartServer(ctx context.Context, t *testing.T, port int, handler http.Handler, receiverOptions ...kncloudevents.HTTPEventReceiverOption) string { secret := types.NamespacedName{ Namespace: "knative-tests", Name: "tls-secret", @@ -65,7 +65,7 @@ func StartServer(ctx context.Context, t *testing.T, port int, handler http.Handl tlsConfig, err := eventingtls.GetTLSServerConfig(serverTLSConfig) assert.Nil(t, err) - receiver := kncloudevents.NewHTTPMessageReceiver(port, + receiver := kncloudevents.NewHTTPEventReceiver(port, append(receiverOptions, kncloudevents.WithTLSConfig(tlsConfig), )..., diff --git a/pkg/eventingtls/servermanager_test.go b/pkg/eventingtls/servermanager_test.go index 795d5e3e834..3eae372f6e5 100644 --- a/pkg/eventingtls/servermanager_test.go +++ b/pkg/eventingtls/servermanager_test.go @@ -58,8 +58,8 @@ func TestStartServers(t *testing.T) { for n, tc := range testCases { t.Run(n, func(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.TODO()) - httpReceiver := kncloudevents.NewHTTPMessageReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) - httpsReceiver := kncloudevents.NewHTTPMessageReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) + httpReceiver := kncloudevents.NewHTTPEventReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) + httpsReceiver := kncloudevents.NewHTTPEventReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) errChan := make(chan error) cmw := newFeatureCMW(tc.transportEncryption) @@ -106,7 +106,7 @@ func TestStartServers(t *testing.T) { func TestStartServersHttpError(t *testing.T) { ctx := context.TODO() - receiver := kncloudevents.NewHTTPMessageReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) + receiver := kncloudevents.NewHTTPEventReceiver(0, kncloudevents.WithDrainQuietPeriod(time.Millisecond)) cmw := newFeatureCMW(feature.Disabled) // httpReceiver set to nil diff --git a/pkg/inmemorychannel/message_dispatcher.go b/pkg/inmemorychannel/event_dispatcher.go similarity index 55% rename from pkg/inmemorychannel/message_dispatcher.go rename to pkg/inmemorychannel/event_dispatcher.go index 55a19cc721c..008a722d049 100644 --- a/pkg/inmemorychannel/message_dispatcher.go +++ b/pkg/inmemorychannel/event_dispatcher.go @@ -26,55 +26,55 @@ import ( "knative.dev/eventing/pkg/kncloudevents" ) -type MessageDispatcher interface { - GetHandler(ctx context.Context) multichannelfanout.MultiChannelMessageHandler +type EventDispatcher interface { + GetHandler(ctx context.Context) multichannelfanout.MultiChannelEventHandler } -type InMemoryMessageDispatcher struct { - handler multichannelfanout.MultiChannelMessageHandler - httpBindingsReceiver *kncloudevents.HTTPMessageReceiver +type InMemoryEventDispatcher struct { + handler multichannelfanout.MultiChannelEventHandler + httpBindingsReceiver *kncloudevents.HTTPEventReceiver writeTimeout time.Duration logger *zap.Logger } -type InMemoryMessageDispatcherArgs struct { - Port int - ReadTimeout time.Duration - WriteTimeout time.Duration - Handler multichannelfanout.MultiChannelMessageHandler - Logger *zap.Logger - HTTPMessageReceiverOptions []kncloudevents.HTTPMessageReceiverOption +type InMemoryEventDispatcherArgs struct { + Port int + ReadTimeout time.Duration + WriteTimeout time.Duration + Handler multichannelfanout.MultiChannelEventHandler + Logger *zap.Logger + HTTPEventReceiverOptions []kncloudevents.HTTPEventReceiverOption } -// GetHandler gets the current multichannelfanout.MessageHandler to delegate all HTTP +// GetHandler gets the current multichannelfanout.EventHandler to delegate all HTTP // requests to. -func (d *InMemoryMessageDispatcher) GetHandler(ctx context.Context) multichannelfanout.MultiChannelMessageHandler { +func (d *InMemoryEventDispatcher) GetHandler(ctx context.Context) multichannelfanout.MultiChannelEventHandler { return d.handler } -func (d *InMemoryMessageDispatcher) GetReceiver() kncloudevents.HTTPMessageReceiver { +func (d *InMemoryEventDispatcher) GetReceiver() kncloudevents.HTTPEventReceiver { return *d.httpBindingsReceiver } // Start starts the inmemory dispatcher's message processing. // This is a blocking call. -func (d *InMemoryMessageDispatcher) Start(ctx context.Context) error { +func (d *InMemoryEventDispatcher) Start(ctx context.Context) error { return d.httpBindingsReceiver.StartListen(kncloudevents.WithShutdownTimeout(ctx, d.writeTimeout), d.handler) } // WaitReady blocks until the dispatcher's server is ready to receive requests. -func (d *InMemoryMessageDispatcher) WaitReady() { +func (d *InMemoryEventDispatcher) WaitReady() { <-d.httpBindingsReceiver.Ready } -func NewMessageDispatcher(args *InMemoryMessageDispatcherArgs) *InMemoryMessageDispatcher { +func NewEventDispatcher(args *InMemoryEventDispatcherArgs) *InMemoryEventDispatcher { // TODO set read timeouts? - bindingsReceiver := kncloudevents.NewHTTPMessageReceiver( + bindingsReceiver := kncloudevents.NewHTTPEventReceiver( args.Port, - args.HTTPMessageReceiverOptions..., + args.HTTPEventReceiverOptions..., ) - dispatcher := &InMemoryMessageDispatcher{ + dispatcher := &InMemoryEventDispatcher{ handler: args.Handler, httpBindingsReceiver: bindingsReceiver, logger: args.Logger, diff --git a/pkg/inmemorychannel/message_dispatcher_test.go b/pkg/inmemorychannel/event_dispatcher_test.go similarity index 89% rename from pkg/inmemorychannel/message_dispatcher_test.go rename to pkg/inmemorychannel/event_dispatcher_test.go index 54cbba4f028..61369135fc1 100644 --- a/pkg/inmemorychannel/message_dispatcher_test.go +++ b/pkg/inmemorychannel/event_dispatcher_test.go @@ -48,11 +48,11 @@ import ( _ "knative.dev/pkg/system/testing" ) -func TestNewMessageDispatcher(t *testing.T) { +func TestNewEventDispatcher(t *testing.T) { logger := logtesting.TestLogger(t).Desugar() - sh := multichannelfanout.NewMessageHandler(context.TODO(), logger) + sh := multichannelfanout.NewEventHandler(context.TODO(), logger) - args := &InMemoryMessageDispatcherArgs{ + args := &InMemoryEventDispatcherArgs{ Port: 8080, ReadTimeout: 1 * time.Minute, WriteTimeout: 1 * time.Minute, @@ -60,7 +60,7 @@ func TestNewMessageDispatcher(t *testing.T) { Logger: logger, } - d := NewMessageDispatcher(args) + d := NewEventDispatcher(args) if d == nil { t.Fatalf("Failed to create with NewDispatcher") @@ -70,14 +70,14 @@ func TestNewMessageDispatcher(t *testing.T) { // This test emulates a real dispatcher usage func TestDispatcher_close(t *testing.T) { logger := logtesting.TestLogger(t).Desugar() - sh := multichannelfanout.NewMessageHandler(context.TODO(), logger) + sh := multichannelfanout.NewEventHandler(context.TODO(), logger) port, err := freePort() if err != nil { t.Fatal(err) } - dispatcherArgs := &InMemoryMessageDispatcherArgs{ + dispatcherArgs := &InMemoryEventDispatcherArgs{ Port: port, ReadTimeout: 1 * time.Minute, WriteTimeout: 1 * time.Minute, @@ -85,7 +85,7 @@ func TestDispatcher_close(t *testing.T) { Logger: logger, } - dispatcher := NewMessageDispatcher(dispatcherArgs) + dispatcher := NewEventDispatcher(dispatcherArgs) kncloudevents.WithDrainQuietPeriod(time.Nanosecond)(dispatcher.httpBindingsReceiver) serverCtx, cancel := context.WithCancel(context.Background()) @@ -230,14 +230,14 @@ func TestDispatcher_dispatch(t *testing.T) { }, } - sh, err := multichannelfanout.NewMessageHandlerWithConfig(context.TODO(), logger, channel.NewMessageDispatcher(logger), config, reporter) + sh, err := multichannelfanout.NewEventHandlerWithConfig(context.TODO(), logger, config, reporter) if err != nil { t.Fatal(err) } logger.Info("Starting dispatcher", zap.Int("port", port)) - dispatcherArgs := &InMemoryMessageDispatcherArgs{ + dispatcherArgs := &InMemoryEventDispatcherArgs{ Port: port, ReadTimeout: 1 * time.Minute, WriteTimeout: 1 * time.Minute, @@ -245,7 +245,7 @@ func TestDispatcher_dispatch(t *testing.T) { Logger: logger, } - dispatcher := NewMessageDispatcher(dispatcherArgs) + dispatcher := NewEventDispatcher(dispatcherArgs) serverCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -259,21 +259,13 @@ func TestDispatcher_dispatch(t *testing.T) { dispatcher.WaitReady() // Ok now everything should be ready to send the event - request, err := kncloudevents.NewCloudEventRequest(context.TODO(), *mustParseUrlToAddressable(t, channelAProxy.URL)) + dispatchInfo, err := kncloudevents.SendEvent(context.TODO(), test.FullEvent(), *mustParseUrlToAddressable(t, channelAProxy.URL)) if err != nil { t.Fatal(err) } - event := test.FullEvent() - _ = protocolhttp.WriteRequest(context.Background(), binding.ToMessage(&event), request.Request) - - res, err := request.Send() - if err != nil { - t.Fatal(err) - } - - if res.StatusCode != http.StatusAccepted { - t.Fatal("Expected 202, Have", res.StatusCode) + if dispatchInfo.ResponseCode != http.StatusAccepted { + t.Fatal("Expected 202, Got: ", dispatchInfo.ResponseCode) } transformationsFailureWg.Wait() diff --git a/pkg/channel/attributes/knative_error.go b/pkg/kncloudevents/attributes/knative_error.go similarity index 100% rename from pkg/channel/attributes/knative_error.go rename to pkg/kncloudevents/attributes/knative_error.go diff --git a/pkg/channel/attributes/knative_error_test.go b/pkg/kncloudevents/attributes/knative_error_test.go similarity index 100% rename from pkg/channel/attributes/knative_error_test.go rename to pkg/kncloudevents/attributes/knative_error_test.go diff --git a/pkg/kncloudevents/event_dispatcher.go b/pkg/kncloudevents/event_dispatcher.go new file mode 100644 index 00000000000..6dc703a2d62 --- /dev/null +++ b/pkg/kncloudevents/event_dispatcher.go @@ -0,0 +1,440 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kncloudevents + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/hashicorp/go-retryablehttp" + "go.opencensus.io/trace" + + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/network" + "knative.dev/pkg/system" + + eventingapis "knative.dev/eventing/pkg/apis" + "knative.dev/eventing/pkg/utils" + + "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/kncloudevents/attributes" + "knative.dev/eventing/pkg/tracing" +) + +const ( + // noDuration signals that the dispatch step hasn't started + NoDuration = -1 + NoResponse = -1 +) + +type DispatchInfo struct { + Duration time.Duration + ResponseCode int + ResponseHeader http.Header + ResponseBody []byte +} + +type SendOption func(*senderConfig) error + +func WithReply(reply *duckv1.Addressable) SendOption { + return func(sc *senderConfig) error { + sc.reply = reply + + return nil + } +} + +func WithDeadLetterSink(dls *duckv1.Addressable) SendOption { + return func(sc *senderConfig) error { + sc.deadLetterSink = dls + + return nil + } +} + +func WithRetryConfig(retryConfig *RetryConfig) SendOption { + return func(sc *senderConfig) error { + sc.retryConfig = retryConfig + + return nil + } +} + +func WithHeader(header http.Header) SendOption { + return func(sc *senderConfig) error { + sc.additionalHeaders = header + + return nil + } +} + +func WithTransformers(transformers ...binding.Transformer) SendOption { + return func(sc *senderConfig) error { + sc.transformers = transformers + + return nil + } +} + +type senderConfig struct { + reply *duckv1.Addressable + deadLetterSink *duckv1.Addressable + additionalHeaders http.Header + retryConfig *RetryConfig + transformers binding.Transformers +} + +// SendEvent sends the given event to the given destination. +func SendEvent(ctx context.Context, event event.Event, destination duckv1.Addressable, options ...SendOption) (*DispatchInfo, error) { + message := binding.ToMessage(&event) + + return SendMessage(ctx, message, destination, options...) +} + +// SendMessage sends the given message to the given destination. +// SendMessage is kept for compatibility and SendEvent should be used whenever possible. +func SendMessage(ctx context.Context, message binding.Message, destination duckv1.Addressable, options ...SendOption) (*DispatchInfo, error) { + config := &senderConfig{ + additionalHeaders: make(http.Header), + } + + // apply options + for _, opt := range options { + if err := opt(config); err != nil { + return nil, fmt.Errorf("could not apply option: %w", err) + } + } + + return send(ctx, message, destination, config) +} + +func send(ctx context.Context, message binding.Message, destination duckv1.Addressable, config *senderConfig) (*DispatchInfo, error) { + dispatchExecutionInfo := &DispatchInfo{} + + // All messages that should be finished at the end of this function + // are placed in this slice + messagesToFinish := []binding.Message{message} + defer func() { + for _, msg := range messagesToFinish { + _ = msg.Finish(nil) + } + }() + + if destination.URL == nil { + return dispatchExecutionInfo, fmt.Errorf("can not dispatch message to nil destination.URL") + } + + // sanitize eventual host-only URLs + destination = *sanitizeAddressable(&destination) + config.reply = sanitizeAddressable(config.reply) + config.deadLetterSink = sanitizeAddressable(config.deadLetterSink) + + // send to destination + + // Add `Prefer: reply` header no matter if a reply destination is provided. Discussion: https://github.com/knative/eventing/pull/5764 + additionalHeadersForDestination := http.Header{} + if config.additionalHeaders != nil { + additionalHeadersForDestination = config.additionalHeaders.Clone() + } + additionalHeadersForDestination.Set("Prefer", "reply") + + ctx, responseMessage, dispatchExecutionInfo, err := executeRequest(ctx, destination, message, additionalHeadersForDestination, config.retryConfig, config.transformers) + if err != nil { + // If DeadLetter is configured, then send original message with knative error extensions + if config.deadLetterSink != nil { + dispatchTransformers := dispatchExecutionInfoTransformers(destination.URL, dispatchExecutionInfo) + _, deadLetterResponse, dispatchExecutionInfo, deadLetterErr := executeRequest(ctx, *config.deadLetterSink, message, config.additionalHeaders, config.retryConfig, append(config.transformers, dispatchTransformers)) + if deadLetterErr != nil { + return dispatchExecutionInfo, fmt.Errorf("unable to complete request to either %s (%v) or %s (%v)", destination.URL, err, config.deadLetterSink.URL, deadLetterErr) + } + if deadLetterResponse != nil { + messagesToFinish = append(messagesToFinish, deadLetterResponse) + } + + return dispatchExecutionInfo, nil + } + // No DeadLetter, just fail + return dispatchExecutionInfo, fmt.Errorf("unable to complete request to %s: %w", destination.URL, err) + } + + responseAdditionalHeaders := utils.PassThroughHeaders(dispatchExecutionInfo.ResponseHeader) + + if config.additionalHeaders.Get(eventingapis.KnNamespaceHeader) != "" { + if responseAdditionalHeaders == nil { + responseAdditionalHeaders = make(http.Header) + } + responseAdditionalHeaders.Set(eventingapis.KnNamespaceHeader, config.additionalHeaders.Get(eventingapis.KnNamespaceHeader)) + } + + if responseMessage == nil { + // No response, dispatch completed + return dispatchExecutionInfo, nil + } + + messagesToFinish = append(messagesToFinish, responseMessage) + + if config.reply == nil { + return dispatchExecutionInfo, nil + } + + // send reply + + ctx, responseResponseMessage, dispatchExecutionInfo, err := executeRequest(ctx, *config.reply, responseMessage, responseAdditionalHeaders, config.retryConfig, config.transformers) + if err != nil { + // If DeadLetter is configured, then send original message with knative error extensions + if config.deadLetterSink != nil { + dispatchTransformers := dispatchExecutionInfoTransformers(config.reply.URL, dispatchExecutionInfo) + _, deadLetterResponse, dispatchExecutionInfo, deadLetterErr := executeRequest(ctx, *config.deadLetterSink, message, responseAdditionalHeaders, config.retryConfig, append(config.transformers, dispatchTransformers)) + if deadLetterErr != nil { + return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s (%v) and failed to send it to the dead letter sink %s (%v)", config.reply.URL, err, config.deadLetterSink.URL, deadLetterErr) + } + if deadLetterResponse != nil { + messagesToFinish = append(messagesToFinish, deadLetterResponse) + } + + return dispatchExecutionInfo, nil + } + // No DeadLetter, just fail + return dispatchExecutionInfo, fmt.Errorf("failed to forward reply to %s: %w", config.reply.URL, err) + } + if responseResponseMessage != nil { + messagesToFinish = append(messagesToFinish, responseResponseMessage) + } + + return dispatchExecutionInfo, nil +} + +func executeRequest(ctx context.Context, target duckv1.Addressable, message cloudevents.Message, additionalHeaders http.Header, retryConfig *RetryConfig, transformers ...binding.Transformer) (context.Context, cloudevents.Message, *DispatchInfo, error) { + dispatchInfo := DispatchInfo{ + Duration: NoDuration, + ResponseCode: NoResponse, + ResponseHeader: make(http.Header), + } + + ctx, span := trace.StartSpan(ctx, "knative.dev", trace.WithSpanKind(trace.SpanKindClient)) + defer span.End() + + if span.IsRecordingEvents() { + transformers = append(transformers, tracing.PopulateSpan(span, target.URL.String())) + } + + req, err := createRequest(ctx, message, target, additionalHeaders, transformers...) + if err != nil { + return ctx, nil, &dispatchInfo, fmt.Errorf("failed to create request: %w", err) + } + + client, err := newClient(target) + if err != nil { + return ctx, nil, &dispatchInfo, fmt.Errorf("failed to create http client: %w", err) + } + + start := time.Now() + response, err := client.DoWithRetries(req, retryConfig) + dispatchInfo.Duration = time.Since(start) + if err != nil { + dispatchInfo.ResponseCode = http.StatusInternalServerError + dispatchInfo.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error())) + + return ctx, nil, &dispatchInfo, err + } + + dispatchInfo.ResponseCode = response.StatusCode + dispatchInfo.ResponseHeader = response.Header + + body := new(bytes.Buffer) + _, readErr := body.ReadFrom(response.Body) + + if isFailure(response.StatusCode) { + // Read response body into dispatchInfo for failures + if readErr != nil && readErr != io.EOF { + dispatchInfo.ResponseBody = []byte(fmt.Sprintf("dispatch resulted in status \"%s\". Could not read response body: error: %s", response.Status, err.Error())) + } else { + dispatchInfo.ResponseBody = body.Bytes() + } + response.Body.Close() + + // Reject non-successful responses. + return ctx, nil, &dispatchInfo, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", response.StatusCode) + } + + var responseMessageBody []byte + if readErr != nil && readErr != io.EOF { + responseMessageBody = []byte(fmt.Sprintf("Failed to read response body: %s", err.Error())) + } else { + responseMessageBody = body.Bytes() + dispatchInfo.ResponseBody = responseMessageBody + } + responseMessage := cehttp.NewMessage(response.Header, io.NopCloser(bytes.NewReader(responseMessageBody))) + + if responseMessage.ReadEncoding() == binding.EncodingUnknown { + // Response is a non event, discard it + response.Body.Close() + responseMessage.BodyReader.Close() + return ctx, nil, &dispatchInfo, nil + } + + return ctx, responseMessage, &dispatchInfo, nil +} + +func createRequest(ctx context.Context, message binding.Message, target duckv1.Addressable, additionalHeaders http.Header, transformers ...binding.Transformer) (*http.Request, error) { + request, err := http.NewRequestWithContext(ctx, "POST", target.URL.String(), nil) + if err != nil { + return nil, fmt.Errorf("could not create http request: %w", err) + } + + if err := cehttp.WriteRequest(ctx, message, request, transformers...); err != nil { + return nil, fmt.Errorf("could not write message to request: %w", err) + } + + for key, val := range additionalHeaders { + request.Header[key] = val + } + + return request, nil +} + +// client is a wrapper around the http.Client, which provides methods for retries +type client struct { + http.Client +} + +func newClient(target duckv1.Addressable) (*client, error) { + c, err := getClientForAddressable(target) + if err != nil { + return nil, fmt.Errorf("failed to get http client for addressable: %w", err) + } + + return &client{ + Client: *c, + }, nil +} + +func (c *client) Do(req *http.Request) (*http.Response, error) { + return c.Client.Do(req) +} + +func (c *client) DoWithRetries(req *http.Request, retryConfig *RetryConfig) (*http.Response, error) { + if retryConfig == nil { + return c.Do(req) + } + + client := c.Client + if retryConfig.RequestTimeout != 0 { + client = http.Client{ + Transport: client.Transport, + CheckRedirect: client.CheckRedirect, + Jar: client.Jar, + Timeout: retryConfig.RequestTimeout, + } + } + + retryableClient := retryablehttp.Client{ + HTTPClient: &client, + RetryWaitMin: defaultRetryWaitMin, + RetryWaitMax: defaultRetryWaitMax, + RetryMax: retryConfig.RetryMax, + CheckRetry: retryablehttp.CheckRetry(retryConfig.CheckRetry), + Backoff: generateBackoffFn(retryConfig), + ErrorHandler: func(resp *http.Response, err error, numTries int) (*http.Response, error) { + return resp, err + }, + } + + retryableReq, err := retryablehttp.FromRequest(req) + if err != nil { + return nil, err + } + + return retryableClient.Do(retryableReq) +} + +// dispatchExecutionTransformer returns Transformers based on the specified destination and DispatchExecutionInfo +func dispatchExecutionInfoTransformers(destination *apis.URL, dispatchExecutionInfo *DispatchInfo) binding.Transformers { + if destination == nil { + destination = &apis.URL{} + } + + httpResponseBody := dispatchExecutionInfo.ResponseBody + if destination.Host == network.GetServiceHostname("broker-filter", system.Namespace()) { + + var errExtensionInfo broker.ErrExtensionInfo + + err := json.Unmarshal(dispatchExecutionInfo.ResponseBody, &errExtensionInfo) + if err != nil { + return nil + } + destination = errExtensionInfo.ErrDestination + httpResponseBody = errExtensionInfo.ErrResponseBody + } + + destination = sanitizeURL(destination) + + // Encodes response body as base64 for the resulting length. + bodyLen := len(httpResponseBody) + encodedLen := base64.StdEncoding.EncodedLen(bodyLen) + if encodedLen > attributes.KnativeErrorDataExtensionMaxLength { + encodedLen = attributes.KnativeErrorDataExtensionMaxLength + } + encodedBuf := make([]byte, encodedLen) + base64.StdEncoding.Encode(encodedBuf, httpResponseBody) + + return attributes.KnativeErrorTransformers(*destination.URL(), dispatchExecutionInfo.ResponseCode, string(encodedBuf[:encodedLen])) +} + +// isFailure returns true if the status code is not a successful HTTP status. +func isFailure(statusCode int) bool { + return statusCode < http.StatusOK /* 200 */ || + statusCode >= http.StatusMultipleChoices /* 300 */ +} + +func sanitizeAddressable(addressable *duckv1.Addressable) *duckv1.Addressable { + if addressable == nil { + return nil + } + + addressable.URL = sanitizeURL(addressable.URL) + + return addressable +} + +func sanitizeURL(url *apis.URL) *apis.URL { + if url == nil { + return nil + } + + if url.Scheme == "http" || url.Scheme == "https" { + // Already a URL with a known scheme. + return url + } + + return &apis.URL{ + Scheme: "http", + Host: url.Host, + Path: "/", + } +} diff --git a/pkg/channel/message_dispatcher_test.go b/pkg/kncloudevents/event_dispatcher_test.go similarity index 93% rename from pkg/channel/message_dispatcher_test.go rename to pkg/kncloudevents/event_dispatcher_test.go index 1103e020d7e..8f4b5e13035 100644 --- a/pkg/channel/message_dispatcher_test.go +++ b/pkg/kncloudevents/event_dispatcher_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Knative Authors +Copyright 2023 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package channel +package kncloudevents_test import ( "bytes" @@ -37,8 +37,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" "k8s.io/apimachinery/pkg/util/sets" rectesting "knative.dev/pkg/reconciler/testing" @@ -78,7 +76,7 @@ const ( testCeType = "testtype" ) -func TestDispatchMessage(t *testing.T) { +func TestSendEvent(t *testing.T) { testCases := map[string]struct { sendToDestination bool sendToReply bool @@ -196,56 +194,8 @@ func TestDispatchMessage(t *testing.T) { eventExtensions: map[string]string{ "abc": `"ce-abc-value"`, }, - expectedReplyRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "traceparent": {"ignored-value-header"}, - "ce-abc": {`"ce-abc-value"`}, - "ce-id": {"ignored-value-header"}, - "ce-time": {"ignored-value-header"}, - "ce-source": {testCeSource}, - "ce-type": {testCeType}, - "ce-specversion": {cloudevents.VersionV1}, - }, - Body: `"reply"`, - }, lastReceiver: "reply", - }, - "reply - only -- error": { - sendToReply: true, - header: map[string][]string{ - // do-not-forward should not get forwarded. - "do-not-forward": {"header"}, - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - }, - body: "reply", - eventExtensions: map[string]string{ - "abc": `"ce-abc-value"`, - }, - expectedReplyRequest: &requestValidation{ - Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, - "traceparent": {"ignored-value-header"}, - "ce-abc": {`"ce-abc-value"`}, - "ce-id": {"ignored-value-header"}, - "ce-time": {"ignored-value-header"}, - "ce-source": {testCeSource}, - "ce-type": {testCeType}, - "ce-specversion": {cloudevents.VersionV1}, - }, - Body: `"reply"`, - }, - fakeResponse: &http.Response{ - StatusCode: http.StatusNotFound, - Body: io.NopCloser(bytes.NewBufferString("destination-response")), - }, - expectedErr: true, + expectedErr: true, }, "destination and reply - dest returns bad status code": { sendToDestination: true, @@ -513,6 +463,7 @@ func TestDispatchMessage(t *testing.T) { lastReceiver: "deadLetter", }, "invalid reply and delivery option - deadletter reply without event": { + sendToDestination: true, sendToReply: true, hasDeadLetterSink: true, header: map[string][]string{ @@ -526,11 +477,12 @@ func TestDispatchMessage(t *testing.T) { eventExtensions: map[string]string{ "abc": `"ce-abc-value"`, }, - expectedReplyRequest: &requestValidation{ + expectedDestRequest: &requestValidation{ Headers: map[string][]string{ "x-request-id": {"id123"}, "knative-1": {"knative-1-value"}, "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, "traceparent": {"ignored-value-header"}, "ce-abc": {`"ce-abc-value"`}, "ce-id": {"ignored-value-header"}, @@ -541,16 +493,48 @@ func TestDispatchMessage(t *testing.T) { }, Body: `"destination"`, }, + fakeResponse: &http.Response{ + StatusCode: http.StatusAccepted, + Header: map[string][]string{ + "do-not-passthrough": {"no"}, + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, + "ce-abc": {`"new-ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedReplyRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"new-ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"2002-10-02T15:00:00Z"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: "destination-response", + }, + fakeReplyResponse: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBufferString("reply-response-body")), + }, expectedDeadLetterRequest: &requestValidation{ Headers: map[string][]string{ - "x-request-id": {"id123"}, - "knative-1": {"knative-1-value"}, - "knative-2": {"knative-2-value"}, + "x-request-id": {"altered-id"}, + "knative-1": {"new-knative-1-value"}, "traceparent": {"ignored-value-header"}, "ce-abc": {`"ce-abc-value"`}, "ce-id": {"ignored-value-header"}, "ce-knativeerrorcode": {strconv.Itoa(http.StatusBadRequest)}, - "ce-knativeerrordata": {base64.StdEncoding.EncodeToString([]byte("destination-response"))}, + "ce-knativeerrordata": {base64.StdEncoding.EncodeToString([]byte("reply-response-body"))}, "ce-time": {"2002-10-02T15:00:00Z"}, "ce-source": {testCeSource}, "ce-type": {testCeType}, @@ -558,13 +542,9 @@ func TestDispatchMessage(t *testing.T) { }, Body: `"destination"`, }, - fakeReplyResponse: &http.Response{ - StatusCode: http.StatusBadRequest, - Body: io.NopCloser(bytes.NewBufferString("destination-response")), - }, fakeDeadLetterResponse: &http.Response{ StatusCode: http.StatusAccepted, - Body: io.NopCloser(bytes.NewBufferString("deadlettersink-response")), + Body: io.NopCloser(bytes.NewBufferString("deadlettersink-response-body")), }, lastReceiver: "deadLetter", }, @@ -846,8 +826,6 @@ func TestDispatchMessage(t *testing.T) { ctx := context.Background() - md := NewMessageDispatcher(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) - destination := duckv1.Addressable{ URL: getOnlyDomainURL(t, tc.sendToDestination, destServer.URL), } @@ -872,7 +850,10 @@ func TestDispatchMessage(t *testing.T) { if tc.header != nil { headers = utils.PassThroughHeaders(tc.header) } - info, err := md.DispatchMessage(ctx, message, headers, destination, reply, deadLetterSink) + info, err := kncloudevents.SendMessage(ctx, message, destination, + kncloudevents.WithReply(reply), + kncloudevents.WithDeadLetterSink(deadLetterSink), + kncloudevents.WithHeader(headers)) if tc.lastReceiver != "" { switch tc.lastReceiver { @@ -962,11 +943,9 @@ func TestDispatchMessageToTLSEndpoint(t *testing.T) { } }() - md := NewMessageDispatcher(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) - // send event message := binding.ToMessage(&eventToSend) - info, err := md.DispatchMessage(ctx, message, nil, destination, nil, nil) + info, err := kncloudevents.SendMessage(ctx, message, destination) require.Nil(t, err) require.Equal(t, 200, info.ResponseCode) @@ -1027,11 +1006,9 @@ func TestDispatchMessageToTLSEndpointWithReply(t *testing.T) { } }() - md := NewMessageDispatcher(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) - // send event message := binding.ToMessage(&eventToSend) - info, err := md.DispatchMessage(ctx, message, nil, destination, &reply, nil) + info, err := kncloudevents.SendMessage(ctx, message, destination, kncloudevents.WithReply(&reply)) require.Nil(t, err) require.Equal(t, 200, info.ResponseCode) @@ -1087,11 +1064,9 @@ func TestDispatchMessageToTLSEndpointWithDeadLetterSink(t *testing.T) { } }() - md := NewMessageDispatcher(zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))) - // send event message := binding.ToMessage(&eventToSend) - info, err := md.DispatchMessage(ctx, message, nil, destination, nil, &dls) + info, err := kncloudevents.SendMessage(ctx, message, destination, kncloudevents.WithDeadLetterSink(&dls)) require.Nil(t, err) require.Equal(t, 200, info.ResponseCode) diff --git a/pkg/kncloudevents/message_receiver.go b/pkg/kncloudevents/event_receiver.go similarity index 78% rename from pkg/kncloudevents/message_receiver.go rename to pkg/kncloudevents/event_receiver.go index 32a9390ab16..152cf06b1c0 100644 --- a/pkg/kncloudevents/message_receiver.go +++ b/pkg/kncloudevents/event_receiver.go @@ -33,7 +33,7 @@ const ( DefaultShutdownTimeout = time.Minute * 1 ) -type HTTPMessageReceiver struct { +type HTTPEventReceiver struct { port int server *http.Server @@ -46,11 +46,11 @@ type HTTPMessageReceiver struct { Ready chan interface{} } -// HTTPMessageReceiverOption enables further configuration of a HTTPMessageReceiver. -type HTTPMessageReceiverOption func(*HTTPMessageReceiver) +// HTTPEventReceiverOption enables further configuration of a HTTPEventReceiver. +type HTTPEventReceiverOption func(*HTTPEventReceiver) -func NewHTTPMessageReceiver(port int, o ...HTTPMessageReceiverOption) *HTTPMessageReceiver { - h := &HTTPMessageReceiver{ +func NewHTTPEventReceiver(port int, o ...HTTPEventReceiverOption) *HTTPEventReceiver { + h := &HTTPEventReceiver{ port: port, } @@ -63,25 +63,25 @@ func NewHTTPMessageReceiver(port int, o ...HTTPMessageReceiverOption) *HTTPMessa } // WithChecker takes a handler func which will run as an additional health check in Drainer. -// kncloudevents HTTPMessageReceiver uses Drainer to perform health check. +// kncloudevents HTTPEventReceiver uses Drainer to perform health check. // By default, Drainer directly writes StatusOK to kubelet probe if the Pod is not draining. // Users can configure customized liveness and readiness check logic by defining checker here. -func WithChecker(checker http.HandlerFunc) HTTPMessageReceiverOption { - return func(h *HTTPMessageReceiver) { +func WithChecker(checker http.HandlerFunc) HTTPEventReceiverOption { + return func(h *HTTPEventReceiver) { h.checker = checker } } // WithDrainQuietPeriod configures the QuietPeriod for the Drainer. -func WithDrainQuietPeriod(duration time.Duration) HTTPMessageReceiverOption { - return func(h *HTTPMessageReceiver) { +func WithDrainQuietPeriod(duration time.Duration) HTTPEventReceiverOption { + return func(h *HTTPEventReceiver) { h.drainQuietPeriod = duration } } // WithTLSConfig configures the TLS config for the receiver. -func WithTLSConfig(cfg *tls.Config) HTTPMessageReceiverOption { - return func(h *HTTPMessageReceiver) { +func WithTLSConfig(cfg *tls.Config) HTTPEventReceiverOption { + return func(h *HTTPEventReceiver) { if h.server == nil { h.server = newServer() } @@ -92,8 +92,8 @@ func WithTLSConfig(cfg *tls.Config) HTTPMessageReceiverOption { // WithWriteTimeout sets the HTTP server's WriteTimeout. It covers the time between end of reading // Request Header to end of writing response. -func WithWriteTimeout(duration time.Duration) HTTPMessageReceiverOption { - return func(h *HTTPMessageReceiver) { +func WithWriteTimeout(duration time.Duration) HTTPEventReceiverOption { + return func(h *HTTPEventReceiver) { if h.server == nil { h.server = newServer() } @@ -104,8 +104,8 @@ func WithWriteTimeout(duration time.Duration) HTTPMessageReceiverOption { // WithReadTimeout sets the HTTP server's ReadTimeout. It covers the duration from reading the entire request // (Headers + Body) -func WithReadTimeout(duration time.Duration) HTTPMessageReceiverOption { - return func(h *HTTPMessageReceiver) { +func WithReadTimeout(duration time.Duration) HTTPEventReceiverOption { + return func(h *HTTPEventReceiver) { if h.server == nil { h.server = newServer() } @@ -114,7 +114,7 @@ func WithReadTimeout(duration time.Duration) HTTPMessageReceiverOption { } } -func (recv *HTTPMessageReceiver) GetAddr() string { +func (recv *HTTPEventReceiver) GetAddr() string { if recv.server != nil { return recv.server.Addr } @@ -123,7 +123,7 @@ func (recv *HTTPMessageReceiver) GetAddr() string { } // Blocking -func (recv *HTTPMessageReceiver) StartListen(ctx context.Context, handler http.Handler) error { +func (recv *HTTPEventReceiver) StartListen(ctx context.Context, handler http.Handler) error { var err error if recv.listener, err = net.Listen("tcp", fmt.Sprintf(":%d", recv.port)); err != nil { return err diff --git a/pkg/kncloudevents/message_receiver_test.go b/pkg/kncloudevents/event_receiver_test.go similarity index 76% rename from pkg/kncloudevents/message_receiver_test.go rename to pkg/kncloudevents/event_receiver_test.go index 91c112667f4..894486d1d5b 100644 --- a/pkg/kncloudevents/message_receiver_test.go +++ b/pkg/kncloudevents/event_receiver_test.go @@ -37,13 +37,13 @@ func TestStartListenOnPort(t *testing.T) { port := 12999 drainQuietPeriod := time.Millisecond * 10 errChan := make(chan error) - messageReceiver := NewHTTPMessageReceiver(port, WithDrainQuietPeriod(drainQuietPeriod)) + eventReceiver := NewHTTPEventReceiver(port, WithDrainQuietPeriod(drainQuietPeriod)) ctx, cancelFunc := context.WithCancel(context.TODO()) go func() { - errChan <- messageReceiver.StartListen(ctx, &testEventParsingHandler{}) + errChan <- eventReceiver.StartListen(ctx, &testEventParsingHandler{}) }() - <-messageReceiver.Ready + <-eventReceiver.Ready conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{Port: port}) assert.NoError(t, err) conn.Close() @@ -59,19 +59,19 @@ func TestWithShutdownTimeout(t *testing.T) { serverShutdownTimeout := time.Millisecond * 15 errChan := make(chan error) receivedRequest := make(chan bool, 1) - messageReceiver := NewHTTPMessageReceiver(0, WithDrainQuietPeriod(drainQuietPeriod)) + eventReceiver := NewHTTPEventReceiver(0, WithDrainQuietPeriod(drainQuietPeriod)) ctx, cancelFunc := context.WithCancel(context.TODO()) go func() { - errChan <- messageReceiver.StartListen(WithShutdownTimeout(ctx, serverShutdownTimeout), &blockingHandler{ + errChan <- eventReceiver.StartListen(WithShutdownTimeout(ctx, serverShutdownTimeout), &blockingHandler{ blockFor: serverShutdownTimeout * 100, receivedRequest: receivedRequest, }) }() - <-messageReceiver.Ready + <-eventReceiver.Ready - addr := "http://" + messageReceiver.server.Addr + addr := "http://" + eventReceiver.server.Addr client := http.DefaultClient req, err := http.NewRequest("GET", addr, nil) assert.NoError(t, err) @@ -94,16 +94,16 @@ func TestWithChecker(t *testing.T) { checkerInvoked <- true writer.WriteHeader(http.StatusTeapot) } - messageReceiver := NewHTTPMessageReceiver(0, WithDrainQuietPeriod(drainQuietPeriod), WithChecker(someChecker)) + eventReceiver := NewHTTPEventReceiver(0, WithDrainQuietPeriod(drainQuietPeriod), WithChecker(someChecker)) ctx, cancelFunc := context.WithCancel(context.TODO()) go func() { - errChan <- messageReceiver.StartListen(ctx, &testEventParsingHandler{}) + errChan <- eventReceiver.StartListen(ctx, &testEventParsingHandler{}) }() - <-messageReceiver.Ready + <-eventReceiver.Ready - addr := "http://" + messageReceiver.server.Addr + addr := "http://" + eventReceiver.server.Addr client := http.DefaultClient req, err := http.NewRequest("GET", addr, nil) assert.NoError(t, err) @@ -123,14 +123,14 @@ func TestWithChecker(t *testing.T) { func TestStartListenReceiveEvent(t *testing.T) { drainQuietPeriod := time.Millisecond * 10 errChan := make(chan error) - messageReceiver := NewHTTPMessageReceiver(0, WithDrainQuietPeriod(drainQuietPeriod)) + eventReceiver := NewHTTPEventReceiver(0, WithDrainQuietPeriod(drainQuietPeriod)) ctx, cancelFunc := context.WithCancel(context.TODO()) handler := &testEventParsingHandler{ t: t, ReceivedEvents: make(chan *event.Event, 1), } go func() { - errChan <- messageReceiver.StartListen(ctx, handler) + errChan <- eventReceiver.StartListen(ctx, handler) }() p, err := cloudevents.NewHTTP() @@ -146,9 +146,9 @@ func TestStartListenReceiveEvent(t *testing.T) { "message": "Hi World!", }) - <-messageReceiver.Ready + <-eventReceiver.Ready - addr := "http://" + messageReceiver.server.Addr + addr := "http://" + eventReceiver.server.Addr ctx = cloudevents.ContextWithTarget(ctx, addr) res := ceClient.Send(ctx, ceEvent) if cloudevents.IsUndelivered(res) { @@ -173,38 +173,38 @@ func TestStartListenReceiveEvent(t *testing.T) { func TestGetAddr(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.TODO()) errChan := make(chan error) - messageReceiver := NewHTTPMessageReceiver(0, WithDrainQuietPeriod(time.Millisecond)) + eventReceiver := NewHTTPEventReceiver(0, WithDrainQuietPeriod(time.Millisecond)) go func() { - errChan <- messageReceiver.StartListen(WithShutdownTimeout(ctx, time.Millisecond), &blockingHandler{blockFor: time.Microsecond}) + errChan <- eventReceiver.StartListen(WithShutdownTimeout(ctx, time.Millisecond), &blockingHandler{blockFor: time.Microsecond}) }() - <-messageReceiver.Ready - assert.NotEmpty(t, messageReceiver.GetAddr()) + <-eventReceiver.Ready + assert.NotEmpty(t, eventReceiver.GetAddr()) cancelFunc() assert.Equal(t, nil, <-errChan) } func TestGetAddrEmpty(t *testing.T) { - messageReceiver := NewHTTPMessageReceiver(0, WithDrainQuietPeriod(time.Millisecond)) + eventReceiver := NewHTTPEventReceiver(0, WithDrainQuietPeriod(time.Millisecond)) - assert.Empty(t, messageReceiver.GetAddr()) + assert.Empty(t, eventReceiver.GetAddr()) } func TestWithWriteTimeout(t *testing.T) { writeTimeout := time.Millisecond * 10 - messageReceiver := NewHTTPMessageReceiver(0, WithWriteTimeout(writeTimeout)) + eventReceiver := NewHTTPEventReceiver(0, WithWriteTimeout(writeTimeout)) - assert.Equal(t, writeTimeout, messageReceiver.server.WriteTimeout) + assert.Equal(t, writeTimeout, eventReceiver.server.WriteTimeout) } func TestWithReadTimeout(t *testing.T) { readTimeout := 30 * time.Second - messageReceiver := NewHTTPMessageReceiver(0, WithReadTimeout(readTimeout)) + eventReceiver := NewHTTPEventReceiver(0, WithReadTimeout(readTimeout)) - assert.Equal(t, readTimeout, messageReceiver.server.ReadTimeout) + assert.Equal(t, readTimeout, eventReceiver.server.ReadTimeout) } type testEventParsingHandler struct { diff --git a/pkg/kncloudevents/message_sender_new.go b/pkg/kncloudevents/message_sender_new.go index 16f3accd7ee..593e5a77c05 100644 --- a/pkg/kncloudevents/message_sender_new.go +++ b/pkg/kncloudevents/message_sender_new.go @@ -81,6 +81,7 @@ func (req *CloudEventRequest) SendWithRetries(config *RetryConfig) (*nethttp.Res return retryableClient.Do(retryableReq) } +// Deprecated: use pkg/kncloudevents.SendEvent() instead func NewCloudEventRequest(ctx context.Context, target duckv1.Addressable) (*CloudEventRequest, error) { nethttpReqest, err := nethttp.NewRequestWithContext(ctx, "POST", target.URL.String(), nil) if err != nil { diff --git a/pkg/kncloudevents/utils.go b/pkg/kncloudevents/utils.go index 02e58497b76..8d129c9027f 100644 --- a/pkg/kncloudevents/utils.go +++ b/pkg/kncloudevents/utils.go @@ -21,9 +21,7 @@ import ( nethttp "net/http" "github.com/cloudevents/sdk-go/v2/binding" - "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/protocol/http" - "github.com/cloudevents/sdk-go/v2/types" ) func WriteHTTPRequestWithAdditionalHeaders(ctx context.Context, message binding.Message, req *nethttp.Request, @@ -44,17 +42,3 @@ func WriteRequestWithAdditionalHeaders(ctx context.Context, message binding.Mess additionalHeaders nethttp.Header, transformers ...binding.Transformer) error { return WriteHTTPRequestWithAdditionalHeaders(ctx, message, req.Request, additionalHeaders, transformers...) } - -type TypeExtractorTransformer string - -func (a *TypeExtractorTransformer) Transform(reader binding.MessageMetadataReader, _ binding.MessageMetadataWriter) error { - _, ty := reader.GetAttribute(spec.Type) - if ty != nil { - tyParsed, err := types.ToString(ty) - if err != nil { - return err - } - *a = TypeExtractorTransformer(tyParsed) - } - return nil -} diff --git a/pkg/kncloudevents/utils_test.go b/pkg/kncloudevents/utils_test.go index a7afc362d8f..faa44fc0896 100644 --- a/pkg/kncloudevents/utils_test.go +++ b/pkg/kncloudevents/utils_test.go @@ -22,7 +22,6 @@ import ( "net/http" "testing" - "github.com/cloudevents/sdk-go/v2/binding/test" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -87,27 +86,3 @@ func TestWriteRequestWithAdditionalHeadersAddsHeadersToRequest(t *testing.T) { assert.Equal(t, additionalHeaders["Another-Key"], request.Header["Another-Key"]) } - -// If reader's message does have Type attribute -func TestTypeExtractorTransformerWithType(t *testing.T) { - ceType := "some.custom.type" - ceEvent := cloudevents.NewEvent() - ceEvent.SetType(ceType) - - var te = TypeExtractorTransformer("") - mockBinaryMsg := test.MustCreateMockBinaryMessage(ceEvent) - te.Transform(mockBinaryMsg.(binding.MessageMetadataReader), mockBinaryMsg.(binding.MessageMetadataWriter)) - - assert.Equal(t, ceType, string(te)) -} - -// If reader's message does NOT have Type attribute -func TestTypeExtractorTransformerWithoutType(t *testing.T) { - ceEvent := cloudevents.NewEvent() - - var te = TypeExtractorTransformer("") - mockBinaryMsg := test.MustCreateMockBinaryMessage(ceEvent) - te.Transform(mockBinaryMsg.(binding.MessageMetadataReader), mockBinaryMsg.(binding.MessageMetadataWriter)) - - assert.Equal(t, "", string(te)) -} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index e7eda8b867c..7d4b4394429 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -109,7 +109,7 @@ func NewController( reporter := channel.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) - sh := multichannelfanout.NewMessageHandler(ctx, logger.Desugar()) + sh := multichannelfanout.NewEventHandler(ctx, logger.Desugar()) inmemorychannelInformer := inmemorychannelinformer.Get(ctx) @@ -119,11 +119,11 @@ func NewController( } r := &Reconciler{ - multiChannelMessageHandler: sh, - reporter: reporter, - messagingClientSet: eventingclient.Get(ctx).MessagingV1(), - eventingClient: eventingclient.Get(ctx).EventingV1beta2(), - eventTypeLister: eventtypeinformer.Get(ctx).Lister(), + multiChannelEventHandler: sh, + reporter: reporter, + messagingClientSet: eventingclient.Get(ctx).MessagingV1(), + eventingClient: eventingclient.Get(ctx).EventingV1beta2(), + eventTypeLister: eventtypeinformer.Get(ctx).Lister(), } impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { @@ -147,18 +147,18 @@ func NewController( r.featureStore = featureStore - httpArgs := &inmemorychannel.InMemoryMessageDispatcherArgs{ + httpArgs := &inmemorychannel.InMemoryEventDispatcherArgs{ Port: httpPort, ReadTimeout: readTimeout, WriteTimeout: writeTimeout, Handler: sh, Logger: logger.Desugar(), - HTTPMessageReceiverOptions: []kncloudevents.HTTPMessageReceiverOption{ + HTTPEventReceiverOptions: []kncloudevents.HTTPEventReceiverOption{ kncloudevents.WithChecker(readinessCheckerHTTPHandler(readinessChecker)), }, } - httpDispatcher := inmemorychannel.NewMessageDispatcher(httpArgs) + httpDispatcher := inmemorychannel.NewEventDispatcher(httpArgs) httpReceiver := httpDispatcher.GetReceiver() secret := types.NamespacedName{ @@ -171,16 +171,16 @@ func NewController( if err != nil { logger.Panicf("unable to get tls config: %s", err) } - httpsArgs := &inmemorychannel.InMemoryMessageDispatcherArgs{ + httpsArgs := &inmemorychannel.InMemoryEventDispatcherArgs{ Port: httpsPort, ReadTimeout: readTimeout, WriteTimeout: writeTimeout, Handler: sh, Logger: logger.Desugar(), - HTTPMessageReceiverOptions: []kncloudevents.HTTPMessageReceiverOption{kncloudevents.WithTLSConfig(tlsConfig)}, + HTTPEventReceiverOptions: []kncloudevents.HTTPEventReceiverOption{kncloudevents.WithTLSConfig(tlsConfig)}, } - httpsDispatcher := inmemorychannel.NewMessageDispatcher(httpsArgs) + httpsDispatcher := inmemorychannel.NewEventDispatcher(httpsArgs) httpsReceiver := httpsDispatcher.GetReceiver() s, err := eventingtls.NewServerManager(ctx, &httpReceiver, &httpsReceiver, httpDispatcher.GetHandler(ctx), cmw) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 1f723676dca..05e22d5a998 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -50,12 +50,12 @@ import ( // Reconciler reconciles InMemory Channels. type Reconciler struct { - multiChannelMessageHandler multichannelfanout.MultiChannelMessageHandler - reporter channel.StatsReporter - messagingClientSet messagingv1.MessagingV1Interface - eventTypeLister v1beta2.EventTypeLister - eventingClient eventingv1beta2.EventingV1beta2Interface - featureStore *feature.Store + multiChannelEventHandler multichannelfanout.MultiChannelEventHandler + reporter channel.StatsReporter + messagingClientSet messagingv1.MessagingV1Interface + eventTypeLister v1beta2.EventTypeLister + eventingClient eventingv1beta2.EventingV1beta2Interface + featureStore *feature.Store } // Check the interfaces Reconciler should implement @@ -107,12 +107,11 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec } // First grab the host based MultiChannelFanoutMessage httpHandler - httpHandler := r.multiChannelMessageHandler.GetChannelHandler(config.HostName) + httpHandler := r.multiChannelEventHandler.GetChannelHandler(config.HostName) if httpHandler == nil { // No handler yet, create one. - fanoutHandler, err := fanout.NewFanoutMessageHandler( + fanoutHandler, err := fanout.NewFanoutEventHandler( logging.FromContext(ctx).Desugar(), - channel.NewMessageDispatcher(logging.FromContext(ctx).Desugar()), config.FanoutConfig, r.reporter, eventTypeAutoHandler, @@ -120,10 +119,10 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec UID, ) if err != nil { - logging.FromContext(ctx).Error("Failed to create a new fanout.MessageHandler", err) + logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) return err } - r.multiChannelMessageHandler.SetChannelHandler(config.HostName, fanoutHandler) + r.multiChannelEventHandler.SetChannelHandler(config.HostName, fanoutHandler) } else { // Just update the config if necessary. haveSubs := httpHandler.GetSubscriptions(ctx) @@ -136,24 +135,23 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec } // Look for an https handler that's configured to use paths - httpsHandler := r.multiChannelMessageHandler.GetChannelHandler(config.Path) + httpsHandler := r.multiChannelEventHandler.GetChannelHandler(config.Path) if httpsHandler == nil { // No handler yet, create one. - fanoutHandler, err := fanout.NewFanoutMessageHandler( + fanoutHandler, err := fanout.NewFanoutEventHandler( logging.FromContext(ctx).Desugar(), - channel.NewMessageDispatcher(logging.FromContext(ctx).Desugar()), config.FanoutConfig, r.reporter, eventTypeAutoHandler, channelReference, UID, - channel.ResolveMessageChannelFromPath(channel.ParseChannelFromPath), + channel.ResolveChannelFromPath(channel.ParseChannelFromPath), ) if err != nil { - logging.FromContext(ctx).Error("Failed to create a new fanout.MessageHandler", err) + logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) return err } - r.multiChannelMessageHandler.SetChannelHandler(config.Path, fanoutHandler) + r.multiChannelEventHandler.SetChannelHandler(config.Path, fanoutHandler) } else { // Just update the config if necessary. haveSubs := httpsHandler.GetSubscriptions(ctx) @@ -241,7 +239,7 @@ func (r *Reconciler) deleteFunc(obj interface{}) { } if imc.Status.Address != nil && imc.Status.Address.URL != nil { if hostName := imc.Status.Address.URL.Host; hostName != "" { - r.multiChannelMessageHandler.DeleteChannelHandler(hostName) + r.multiChannelEventHandler.DeleteChannelHandler(hostName) } } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index b93850b2f5b..8eb94926489 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -40,7 +40,6 @@ import ( eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel" @@ -311,8 +310,8 @@ func TestAllCases(t *testing.T) { table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = v1addr.WithDuck(ctx) r := &Reconciler{ - multiChannelMessageHandler: newFakeMultiChannelHandler(), - messagingClientSet: fakeeventingclient.Get(ctx).MessagingV1(), + multiChannelEventHandler: newFakeMultiChannelHandler(), + messagingClientSet: fakeeventingclient.Get(ctx).MessagingV1(), } return inmemorychannel.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetInMemoryChannelLister(), @@ -498,11 +497,11 @@ func TestReconciler_ReconcileKind(t *testing.T) { }) // Just run the tests once with no existing handler (creates the handler) and once // with an existing, so we exercise both paths at once. - fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil, nil, nil, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil) if err != nil { t.Error(err) } - for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} { + for _, fanoutHandler := range []fanout.EventHandler{nil, fh} { t.Run("handler-"+n, func(t *testing.T) { handler := newFakeMultiChannelHandler() if fanoutHandler != nil { @@ -510,9 +509,9 @@ func TestReconciler_ReconcileKind(t *testing.T) { handler.SetChannelHandler(channelServiceAddress.URL.String(), fanoutHandler) } r := &Reconciler{ - multiChannelMessageHandler: handler, - messagingClientSet: fakeEventingClient.MessagingV1(), - featureStore: feature.NewStore(logtesting.TestLogger(t)), + multiChannelEventHandler: handler, + messagingClientSet: fakeEventingClient.MessagingV1(), + featureStore: feature.NewStore(logtesting.TestLogger(t)), } e := r.ReconcileKind(ctx, tc.imc) if e != tc.wantResult { @@ -543,18 +542,18 @@ func TestReconciler_InvalidInputs(t *testing.T) { }, } for n, tc := range testCases { - fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil, nil, nil, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil) if err != nil { t.Error(err) } - for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} { + for _, fanoutHandler := range []fanout.EventHandler{nil, fh} { t.Run("handler-"+n, func(t *testing.T) { handler := newFakeMultiChannelHandler() if fanoutHandler != nil { handler.SetChannelHandler(channelServiceAddress.URL.String(), fanoutHandler) } r := &Reconciler{ - multiChannelMessageHandler: handler, + multiChannelEventHandler: handler, } r.deleteFunc(tc.imc) }) @@ -573,18 +572,18 @@ func TestReconciler_Deletion(t *testing.T) { }, } for n, tc := range testCases { - fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil, nil, nil, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil) if err != nil { t.Error(err) } - for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} { + for _, fanoutHandler := range []fanout.EventHandler{nil, fh} { t.Run("handler-"+n, func(t *testing.T) { handler := newFakeMultiChannelHandler() if fanoutHandler != nil { handler.SetChannelHandler(channelServiceAddress.URL.Host, fanoutHandler) } r := &Reconciler{ - multiChannelMessageHandler: handler, + multiChannelEventHandler: handler, } r.deleteFunc(tc.imc) if handler.GetChannelHandler(channelServiceAddress.URL.Host) != nil { @@ -606,16 +605,16 @@ func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl { } type fakeMultiChannelHandler struct { - handlers map[string]fanout.MessageHandler + handlers map[string]fanout.EventHandler } func newFakeMultiChannelHandler() *fakeMultiChannelHandler { - return &fakeMultiChannelHandler{handlers: make(map[string]fanout.MessageHandler, 2)} + return &fakeMultiChannelHandler{handlers: make(map[string]fanout.EventHandler, 2)} } func (f *fakeMultiChannelHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {} -func (f *fakeMultiChannelHandler) SetChannelHandler(host string, handler fanout.MessageHandler) { +func (f *fakeMultiChannelHandler) SetChannelHandler(host string, handler fanout.EventHandler) { f.handlers[host] = handler } @@ -623,7 +622,7 @@ func (f *fakeMultiChannelHandler) DeleteChannelHandler(host string) { delete(f.handlers, host) } -func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.MessageHandler { +func (f *fakeMultiChannelHandler) GetChannelHandler(host string) fanout.EventHandler { return f.handlers[host] } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/readiness.go b/pkg/reconciler/inmemorychannel/dispatcher/readiness.go index e6fdf0187f3..c45ba5b6648 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/readiness.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/readiness.go @@ -44,7 +44,7 @@ type DispatcherReadyChecker struct { chLister messaginglistersv1.InMemoryChannelLister // Allows listing/counting the handlers which have already been registered. - chMsgHandler multichannelfanout.MultiChannelMessageHandler + chMsgHandler multichannelfanout.MultiChannelEventHandler // Allows safe concurrent read/write of 'isReady'. sync.Mutex diff --git a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go index 8079f49065c..e0ec28d8ce1 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/readiness_test.go @@ -64,8 +64,8 @@ func TestReadinessChecker(t *testing.T) { } // Add one handler - handler.SetChannelHandler("foo", &fanout.FanoutMessageHandler{}) - handler.SetChannelHandler("bar", &fanout.FanoutMessageHandler{}) + handler.SetChannelHandler("foo", &fanout.FanoutEventHandler{}) + handler.SetChannelHandler("bar", &fanout.FanoutEventHandler{}) res, err = http.Get(ts.URL) if err != nil { diff --git a/pkg/utils/headers.go b/pkg/utils/headers.go index e3c37378683..a565926bdcb 100644 --- a/pkg/utils/headers.go +++ b/pkg/utils/headers.go @@ -28,8 +28,8 @@ import ( var ( // These MUST be lowercase strings, as they will be compared against lowercase strings. forwardHeaders = sets.NewString( - // tracing - "x-request-id", + "x-request-id", // tracing + "retry-after", ) // These MUST be lowercase strings, as they will be compared against lowercase strings. // Removing CloudEvents ce- prefixes on purpose as they should be set in the CloudEvent itself as extensions. diff --git a/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE b/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE deleted file mode 100644 index f7c935c201b..00000000000 --- a/third_party/VENDOR-LICENSE/github.com/valyala/bytebufferpool/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2016 Aliaksandr Valialkin, VertaMedia - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -