Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor kncloudevents and add SendEvent function #7092

Merged
merged 34 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4e10f95
Copy message dispatcher to kncloudevents package
creydr Jul 14, 2023
d97f8c2
Remove interfaces
creydr Jul 14, 2023
772a697
Rename function and change signature to use options
creydr Jul 14, 2023
b55f5d2
Refactor to include response headers in DispatchInfo
creydr Jul 14, 2023
8cd8f63
Use http client wrapper
creydr Jul 14, 2023
42d073a
Return error if destination is nil instead of sending it to reply
creydr Jul 14, 2023
78ae675
Add option for additional transformers
creydr Jul 14, 2023
025a8e4
Add option to specify retryConfig
creydr Jul 14, 2023
43ea015
Migrate channel to kncloudevents package
creydr Jul 14, 2023
3b2740f
Do not allow empty destination
creydr Jul 14, 2023
56f1aed
Fix license header
creydr Jul 14, 2023
6768a0a
Refactor fanout message handler to take an event
creydr Jul 25, 2023
eadb4be
Rename XYZMessageHandler to XYZEventHandler
creydr Jul 25, 2023
76f7fa9
Run hack/update-codegen.sh
creydr Jul 25, 2023
541745a
Rename message_dispatcher -> event_dispatcher
creydr Jul 25, 2023
909ab95
Fix misspellings
creydr Jul 25, 2023
7a5b628
Rename pkg/imc/message_dispatcher -> event_dispatcher
creydr Jul 26, 2023
437c0c4
Rename kncloudevents/message_dispatcher -> event_dispatcher
creydr Jul 26, 2023
faa3916
Remove unneeded transformers parameter in FanoutEventHandler.autoCrea…
creydr Jul 26, 2023
3d5b691
Add doc to SendEvent|Message function
creydr Jul 26, 2023
f4241b6
Migrate ingress-handler
creydr Jul 28, 2023
d8c37b6
Migrate imc/event_dispatcher test
creydr Jul 28, 2023
a0f5d3b
Add deprecation note to kncloudevents.NewCloudEventsRequest
creydr Jul 28, 2023
c032c68
Migrate filter-handler
creydr Jul 28, 2023
e7c103e
Fix boilerplate and remove unneeded comment
creydr Jul 28, 2023
1e1203f
Filter headers in ingress-handler
creydr Jul 31, 2023
56ed455
Remove unneeded transformers param from receiverFunc
creydr Jul 31, 2023
8dc2540
Simplify event-receiver serveHTTP
creydr Jul 31, 2023
062d28e
Wrap errors in event_dispatcher
creydr Jul 31, 2023
64c97f1
Filter headers in filter-handler
creydr Jul 31, 2023
dba0268
Fix filter for errorExtension response
creydr Aug 1, 2023
0aff6d6
Move knativeErrorTransformer to kncloudevents package
creydr Aug 1, 2023
e11f71a
Simplify filter_handler ServeHTTP
creydr Aug 1, 2023
6edacd9
Remove old message_dispatcher
creydr Aug 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 39 additions & 128 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ limitations under the License.
package filter

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
Expand All @@ -38,7 +38,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
channelAttributes "knative.dev/eventing/pkg/channel/attributes"
"knative.dev/eventing/pkg/utils"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
Expand All @@ -51,7 +51,6 @@ import (
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

const (
Expand All @@ -65,24 +64,6 @@ const (
defaultMaxIdleConnectionsPerHost = 100
)

const (
// NoResponse signals the step that send event to trigger's subscriber hasn't started
NoResponse = -1
)

// ErrHandler handle the different errors of filter dispatch process
type ErrHandler struct {
ResponseCode int
ResponseBody []byte
err error
}

// HeaderProxyAllowList contains the headers that are proxied from the reply; other than the CloudEvents headers.
// Other headers are not proxied because of security concerns.
var HeaderProxyAllowList = map[string]struct{}{
strings.ToLower("Retry-After"): {},
}

// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Handler struct {
// reporter reports stats of status code and dispatch time
Expand All @@ -93,7 +74,7 @@ type Handler struct {
withContext func(ctx context.Context) context.Context
}

// NewHandler creates a new Handler and its associated MessageReceiver.
// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
Expand Down Expand Up @@ -164,10 +145,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

ctx := h.withContext(request.Context())

message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)

event, err := binding.ToEvent(ctx, message)
event, err := cehttp.NewEventFromHTTPRequest(request)
if err != nil {
h.logger.Warn("failed to extract event from request", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -242,123 +220,64 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
URL: t.Status.SubscriberURI,
CACerts: t.Status.SubscriberCACerts,
}
h.send(ctx, writer, request.Header, target, reportArgs, event, t, ttl)
h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, t, ttl)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
// send the event to trigger's subscriber
response, responseErr := h.sendEvent(ctx, headers, target, event, t, reportArgs)

if responseErr.err != nil {
h.logger.Error("failed to send event", zap.Error(responseErr.err))
// If error is not because of the response, it should respond with http.StatusInternalServerError
if responseErr.ResponseCode == NoResponse {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, target, kncloudevents.WithHeader(additionalHeaders))
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))

// If error is not because of the response, it should respond with http.StatusInternalServerError
if dispatchInfo.ResponseCode <= 0 {
writer.WriteHeader(http.StatusInternalServerError)
_ = h.reporter.ReportEventCount(reportArgs, http.StatusInternalServerError)
return
}
// If error has a response propagate subscriber's headers back to channel
if response != nil {
proxyHeaders(response.Header, writer)
}
writer.WriteHeader(responseErr.ResponseCode)

h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration)

writeHeaders(utils.PassThroughHeaders(dispatchInfo.ResponseHeader), writer)
writer.WriteHeader(dispatchInfo.ResponseCode)

// Read Response body to responseErr
errExtensionInfo := broker.ErrExtensionInfo{
ErrDestination: target.URL,
ErrResponseBody: responseErr.ResponseBody,
ErrResponseBody: dispatchInfo.ResponseBody,
}
errExtensionBytes, msErr := json.Marshal(errExtensionInfo)
if msErr != nil {
h.logger.Error("failed to marshal errExtensionInfo", zap.Error(msErr))
return
}
_, _ = writer.Write(errExtensionBytes)
_ = h.reporter.ReportEventCount(reportArgs, responseErr.ResponseCode)
_, err = writer.Write(errExtensionBytes)
if err != nil {
h.logger.Error("failed to write error response", zap.Error(err))
}
_ = h.reporter.ReportEventCount(reportArgs, dispatchInfo.ResponseCode)

return
}

h.logger.Debug("Successfully dispatched message", zap.Any("target", target))

h.reporter.ReportEventDispatchTime(reportArgs, dispatchInfo.ResponseCode, dispatchInfo.Duration)

// If there is an event in the response write it to the response
statusCode, err := h.writeResponse(ctx, writer, response, ttl, target.URL.String())
statusCode, err := h.writeResponse(ctx, writer, dispatchInfo, ttl, target.URL.String())
if err != nil {
h.logger.Error("failed to write response", zap.Error(err))
}
_ = h.reporter.ReportEventCount(reportArgs, statusCode)
}

func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target duckv1.Addressable, event *cloudevents.Event, t *eventingv1.Trigger, reporterArgs *ReportArgs) (*http.Response, ErrHandler) {
responseErr := ErrHandler{
ResponseCode: NoResponse,
}

// Send the event to the subscriber
req, err := kncloudevents.NewCloudEventRequest(ctx, target)
if err != nil {
responseErr.err = fmt.Errorf("failed to create the request: %w", err)
return nil, responseErr
}

message := binding.ToMessage(event)
defer message.Finish(nil)

additionalHeaders := utils.PassThroughHeaders(headers)
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

// Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events
additionalHeaders.Set("prefer", "reply")

err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, req, additionalHeaders)
if err != nil {
responseErr.err = fmt.Errorf("failed to write request: %w", err)
return nil, responseErr
}

start := time.Now()
resp, err := req.Send()
dispatchTime := time.Since(start)
if err != nil {
responseErr.ResponseCode = http.StatusInternalServerError
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", err.Error()))
responseErr.err = fmt.Errorf("failed to dispatch message: %w", err)
return resp, responseErr
}

sc := 0
if resp != nil {
sc = resp.StatusCode
responseErr.ResponseCode = sc
}

_ = h.reporter.ReportEventDispatchTime(reporterArgs, sc, dispatchTime)

if resp.StatusCode < http.StatusOK ||
resp.StatusCode >= http.StatusMultipleChoices {
// Read response body into errHandler for failures
body := make([]byte, channelAttributes.KnativeErrorDataExtensionMaxLength)

readLen, readErr := resp.Body.Read(body)
if readErr != nil && readErr != io.EOF {
h.logger.Error("failed to read response body into DispatchExecutionInfo", zap.Error(readErr))
responseErr.ResponseBody = []byte(fmt.Sprintf("dispatch error: %s", readErr.Error()))
} else {
responseErr.ResponseBody = body[:readLen]
}
responseErr.err = fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", resp.StatusCode)

// Reject non-successful responses.
return resp, responseErr
}

return resp, responseErr
}

// The return values are the status
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, resp *http.Response, ttl int32, target string) (int, error) {
response := cehttp.NewMessageFromHttpResponse(resp)
func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, dispatchInfo *kncloudevents.DispatchInfo, ttl int32, target string) (int, error) {
response := cehttp.NewMessage(dispatchInfo.ResponseHeader, io.NopCloser(bytes.NewReader(dispatchInfo.ResponseBody)))
defer response.Finish(nil)

if response.ReadEncoding() == binding.EncodingUnknown {
Expand All @@ -375,10 +294,10 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
writer.WriteHeader(http.StatusBadGateway)
return http.StatusBadGateway, errors.New("received a non-empty response not recognized as CloudEvent. The response MUST be either empty or a valid CloudEvent")
}
proxyHeaders(resp.Header, writer) // Proxy original Response Headers for downstream use
writeHeaders(dispatchInfo.ResponseHeader, writer) // Proxy original Response Headers for downstream use
h.logger.Debug("Response doesn't contain a CloudEvent, replying with an empty response", zap.Any("target", target))
writer.WriteHeader(resp.StatusCode)
return resp.StatusCode, nil
writer.WriteHeader(dispatchInfo.ResponseCode)
return dispatchInfo.ResponseCode, nil
}

event, err := binding.ToEvent(ctx, response)
Expand All @@ -400,15 +319,15 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter,
defer eventResponse.Finish(nil)

// Proxy the original Response Headers for downstream use
proxyHeaders(resp.Header, writer)
writeHeaders(dispatchInfo.ResponseHeader, writer)

if err := cehttp.WriteResponseWriter(ctx, eventResponse, resp.StatusCode, writer); err != nil {
if err := cehttp.WriteResponseWriter(ctx, eventResponse, dispatchInfo.ResponseCode, writer); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to write response event: %w", err)
}

h.logger.Debug("Replied with a CloudEvent response", zap.Any("target", target))

return resp.StatusCode, nil
return dispatchInfo.ResponseCode, nil
}

func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) {
Expand Down Expand Up @@ -523,19 +442,11 @@ func triggerFilterAttribute(filter *eventingv1.TriggerFilter, attributeName stri
return attributeValue
}

// proxyHeaders adds the specified HTTP Headers to the ResponseWriter.
func proxyHeaders(httpHeader http.Header, writer http.ResponseWriter) {
// writeHeaders adds the specified HTTP Headers to the ResponseWriter.
func writeHeaders(httpHeader http.Header, writer http.ResponseWriter) {
for headerKey, headerValues := range httpHeader {
// *Only* proxy some headers because of security reasons
if isInProxyHeaderAllowList(headerKey) {
for _, headerValue := range headerValues {
writer.Header().Add(headerKey, headerValue)
}
for _, headerValue := range headerValues {
writer.Header().Add(headerKey, headerValue)
}
}
}

func isInProxyHeaderAllowList(headerKey string) bool {
_, exists := HeaderProxyAllowList[strings.ToLower(headerKey)]
return exists
}
2 changes: 1 addition & 1 deletion pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func TestReceiver(t *testing.T) {
return
}
if err != nil || event == nil {
t.Fatalf("Expected response event, actually nil")
t.Fatalf("Expected response event, actually nil (err: %+v)", err)
}

// The TTL will be added again.
Expand Down
4 changes: 2 additions & 2 deletions pkg/broker/filter/server_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewServerManager(ctx context.Context, logger *zap.Logger, cmw configmap.Wat
logger.Info("failed to get TLS server config", zap.Error(err))
}

httpReceiver := kncloudevents.NewHTTPMessageReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPMessageReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))
httpReceiver := kncloudevents.NewHTTPEventReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPEventReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))

return eventingtls.NewServerManager(ctx, httpReceiver, httpsReceiver, handler, cmw)
}
Expand Down
48 changes: 7 additions & 41 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ import (
)

const (
// noDuration signals that the dispatch step hasn't started
noDuration = -1
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
)
Expand Down Expand Up @@ -228,8 +226,8 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
eventType: event.Type(),
}

statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName)
if dispatchTime > noDuration {
statusCode, dispatchTime := h.receive(ctx, utils.PassThroughHeaders(request.Header), event, brokerNamespace, brokerName)
if dispatchTime > kncloudevents.NoDuration {
_ = h.Reporter.ReportEventDispatchTime(reporterArgs, statusCode, dispatchTime)
}
_ = h.Reporter.ReportEventCount(reporterArgs, statusCode)
Expand Down Expand Up @@ -259,7 +257,6 @@ func toKReference(broker *eventingv1.Broker) *duckv1.KReference {
}

func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) {

// Setting the extension as a string as the CloudEvents sdk does not support non-string extensions.
event.SetExtension(broker.EventArrivalTime, cloudevents.Timestamp{Time: time.Now()})
if h.Defaulter != nil {
Expand All @@ -269,51 +266,20 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud

if ttl, err := broker.GetTTL(event.Context); err != nil || ttl <= 0 {
h.Logger.Debug("dropping event based on TTL status.", zap.Int32("TTL", ttl), zap.String("event.id", event.ID()), zap.Error(err))
return http.StatusBadRequest, noDuration
return http.StatusBadRequest, kncloudevents.NoDuration
}

channelAddress, err := h.getChannelAddress(brokerName, brokerNamespace)
if err != nil {
h.Logger.Warn("Broker not found in the namespace", zap.Error(err))
return http.StatusBadRequest, noDuration
return http.StatusBadRequest, kncloudevents.NoDuration
}

return h.send(ctx, headers, event, *channelAddress)
}

func (h *Handler) send(ctx context.Context, headers http.Header, event *cloudevents.Event, target duckv1.Addressable) (int, time.Duration) {

request, err := kncloudevents.NewCloudEventRequest(ctx, target)
if err != nil {
h.Logger.Error("failed to create event request.", zap.Error(err))
return http.StatusInternalServerError, noDuration
}

message := binding.ToMessage(event)
defer message.Finish(nil)

additionalHeaders := utils.PassThroughHeaders(headers)
err = kncloudevents.WriteRequestWithAdditionalHeaders(ctx, message, request, additionalHeaders)
if err != nil {
h.Logger.Error("failed to write request additionalHeaders.", zap.Error(err))
return http.StatusInternalServerError, noDuration
}

resp, dispatchTime, err := h.sendAndRecordDispatchTime(request)
if resp != nil {
defer resp.Body.Close()
}
dispatchInfo, err := kncloudevents.SendEvent(ctx, *event, *channelAddress, kncloudevents.WithHeader(headers))
if err != nil {
h.Logger.Error("failed to dispatch event", zap.Error(err))
return http.StatusInternalServerError, dispatchTime
return http.StatusInternalServerError, kncloudevents.NoDuration
}

return resp.StatusCode, dispatchTime
}

func (h *Handler) sendAndRecordDispatchTime(request *kncloudevents.CloudEventRequest) (*http.Response, time.Duration, error) {
start := time.Now()
resp, err := request.Send()
dispatchTime := time.Since(start)
return resp, dispatchTime, err
return dispatchInfo.ResponseCode, dispatchInfo.Duration
}
4 changes: 2 additions & 2 deletions pkg/broker/ingress/server_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewServerManager(ctx context.Context, logger *zap.Logger, cmw configmap.Wat
logger.Info("failed to get TLS server config", zap.Error(err))
}

httpReceiver := kncloudevents.NewHTTPMessageReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPMessageReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))
httpReceiver := kncloudevents.NewHTTPEventReceiver(httpPort)
httpsReceiver := kncloudevents.NewHTTPEventReceiver(httpsPort, kncloudevents.WithTLSConfig(tlsConfig))

return eventingtls.NewServerManager(ctx, httpReceiver, httpsReceiver, handler, cmw)
}
Expand Down
Loading
Loading