Skip to content

Commit

Permalink
Fixes for request body passing into text streaming proxy
Browse files Browse the repository at this point in the history
In the previous version, whilst responses were streamed
correctly, the request body was not being received by
the function. This has been tested, along with adding
a forced timeout according to upstream_timeout, which
was a miss in the original commit.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Jan 11, 2024
1 parent 4679f27 commit 5c13f1f
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions gateway/handlers/forwarding_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package handlers

import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httputil"
"os"
"strings"
"sync"
"time"

fhttputil "github.com/openfaas/faas-provider/httputil"
Expand Down Expand Up @@ -100,9 +102,6 @@ func forwardRequest(w http.ResponseWriter,
}

upstreamReq := buildUpstreamRequest(r, baseURL, requestURL)
if upstreamReq.Body != nil {
defer upstreamReq.Body.Close()
}

if serviceAuthInjector != nil {
serviceAuthInjector.Inject(upstreamReq)
Expand All @@ -113,9 +112,8 @@ func forwardRequest(w http.ResponseWriter,
}

if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") {
ww := fhttputil.NewHttpWriteInterceptor(w)
reverseProxy.ServeHTTP(ww, upstreamReq)
return ww.Status(), nil

return handleEventStream(w, r, reverseProxy, upstreamReq, timeout)
}

ctx, cancel := context.WithTimeout(r.Context(), timeout)
Expand Down Expand Up @@ -143,6 +141,36 @@ func forwardRequest(w http.ResponseWriter,
return res.StatusCode, nil
}

func handleEventStream(w http.ResponseWriter, r *http.Request, reverseProxy *httputil.ReverseProxy, upstreamReq *http.Request, timeout time.Duration) (int, error) {
ww := fhttputil.NewHttpWriteInterceptor(w)

ctx, cancel := context.WithTimeoutCause(r.Context(), timeout, http.ErrHandlerTimeout)
defer cancel()

r = r.WithContext(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer func() {
wg.Done()
if r := recover(); r != nil {
if errors.Is(r.(error), http.ErrAbortHandler) {
log.Printf("Aborted [%s] for: %s", upstreamReq.Method, upstreamReq.URL.Path)

} else {
log.Printf("Recovered from panic in reverseproxy: %v", r)
}
}
}()

reverseProxy.ServeHTTP(ww, r)
}()

wg.Wait()

return ww.Status(), nil
}

func copyHeaders(destination http.Header, source *http.Header) {
for k, v := range *source {
vClone := make([]string, len(v))
Expand Down Expand Up @@ -176,12 +204,22 @@ var hopHeaders = []string{
}

func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy {

return &httputil.ReverseProxy{
ErrorLog: log.New(io.Discard, "proxy:", 0),
Transport: http.DefaultClient.Transport,

ErrorLog: log.New(io.Discard, "proxy:", 0),
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
},
Director: func(r *http.Request) {

baseURL := baseURLResolver.Resolve(r)
baseURLu, _ := r.URL.Parse(baseURL)

requestURL := urlPathTransformer.Transform(r)

r.URL.Scheme = "http"
r.URL.Path = requestURL
r.URL.Host = baseURLu.Host
},
}
}

0 comments on commit 5c13f1f

Please sign in to comment.