From 5c13f1f01c6a9f596857d81c889d77bd04fbc5c1 Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Thu, 11 Jan 2024 17:40:42 +0000 Subject: [PATCH] Fixes for request body passing into text streaming proxy In the previous version, whilst responses were streamed correctly, the request body was not being received by the function. This has been tested, along with adding a forced timeout according to upstream_timeout, which was a miss in the original commit. Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/handlers/forwarding_proxy.go | 54 +++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 64c8f516b..d97f99bdd 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -5,6 +5,7 @@ package handlers import ( "context" + "errors" "fmt" "io" "log" @@ -12,6 +13,7 @@ import ( "net/http/httputil" "os" "strings" + "sync" "time" fhttputil "github.com/openfaas/faas-provider/httputil" @@ -100,9 +102,6 @@ func forwardRequest(w http.ResponseWriter, } upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) - if upstreamReq.Body != nil { - defer upstreamReq.Body.Close() - } if serviceAuthInjector != nil { serviceAuthInjector.Inject(upstreamReq) @@ -113,9 +112,8 @@ func forwardRequest(w http.ResponseWriter, } if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") { - ww := fhttputil.NewHttpWriteInterceptor(w) - reverseProxy.ServeHTTP(ww, upstreamReq) - return ww.Status(), nil + + return handleEventStream(w, r, reverseProxy, upstreamReq, timeout) } ctx, cancel := context.WithTimeout(r.Context(), timeout) @@ -143,6 +141,36 @@ func forwardRequest(w http.ResponseWriter, return res.StatusCode, nil } +func handleEventStream(w http.ResponseWriter, r *http.Request, reverseProxy *httputil.ReverseProxy, upstreamReq *http.Request, timeout time.Duration) (int, error) { + ww := fhttputil.NewHttpWriteInterceptor(w) + + ctx, cancel := context.WithTimeoutCause(r.Context(), timeout, http.ErrHandlerTimeout) + defer cancel() + + r = r.WithContext(ctx) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer func() { + wg.Done() + if r := recover(); r != nil { + if errors.Is(r.(error), http.ErrAbortHandler) { + log.Printf("Aborted [%s] for: %s", upstreamReq.Method, upstreamReq.URL.Path) + + } else { + log.Printf("Recovered from panic in reverseproxy: %v", r) + } + } + }() + + reverseProxy.ServeHTTP(ww, r) + }() + + wg.Wait() + + return ww.Status(), nil +} + func copyHeaders(destination http.Header, source *http.Header) { for k, v := range *source { vClone := make([]string, len(v)) @@ -176,12 +204,22 @@ var hopHeaders = []string{ } func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy { + return &httputil.ReverseProxy{ - ErrorLog: log.New(io.Discard, "proxy:", 0), - Transport: http.DefaultClient.Transport, + + ErrorLog: log.New(io.Discard, "proxy:", 0), ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { }, Director: func(r *http.Request) { + + baseURL := baseURLResolver.Resolve(r) + baseURLu, _ := r.URL.Parse(baseURL) + + requestURL := urlPathTransformer.Transform(r) + + r.URL.Scheme = "http" + r.URL.Path = requestURL + r.URL.Host = baseURLu.Host }, } }