From 5aa8f5aec22f1655f58bfcf4bbb3743f1364291c Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Thu, 11 Jan 2024 10:40:15 +0000 Subject: [PATCH] Support streaming responses from functions Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- gateway/Dockerfile | 6 +-- gateway/Makefile | 12 ++++- gateway/go.mod | 4 +- gateway/go.sum | 2 + gateway/handlers/forwarding_proxy.go | 46 ++++++++++++---- .../httputil/write_interceptor.go | 16 +++++- .../faas-provider/types/system_events.go | 52 +++++++++++++++++++ gateway/vendor/modules.txt | 2 +- 8 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 gateway/vendor/github.com/openfaas/faas-provider/types/system_events.go diff --git a/gateway/Dockerfile b/gateway/Dockerfile index 721b0f2be..83d851b1f 100644 --- a/gateway/Dockerfile +++ b/gateway/Dockerfile @@ -1,6 +1,6 @@ FROM --platform=${BUILDPLATFORM:-linux/amd64} ghcr.io/openfaas/license-check:0.4.1 as license-check -FROM --platform=${BUILDPLATFORM:-linux/amd64} golang:1.20 as build +FROM --platform=${BUILDPLATFORM:-linux/amd64} golang:1.21 as build ENV GO111MODULE=on ENV CGO_ENABLED=0 @@ -42,9 +42,9 @@ RUN CGO_ENABLED=${CGO_ENABLED} GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -- -X \"github.com/openfaas/faas/gateway/version.GitCommitSHA=${GIT_COMMIT}\" \ -X \"github.com/openfaas/faas/gateway/version.Version=${VERSION}\" \ -X github.com/openfaas/faas/gateway/types.Arch=${TARGETARCH}" \ - -a -installsuffix cgo -o gateway . + -o gateway . -FROM --platform=${TARGETPLATFORM:-linux/amd64} alpine:3.18.3 as ship +FROM --platform=${TARGETPLATFORM:-linux/amd64} alpine:3.19.0 as ship LABEL org.label-schema.license="MIT" \ org.label-schema.vcs-url="https://github.com/openfaas/faas" \ diff --git a/gateway/Makefile b/gateway/Makefile index 9c9c42674..fa58be2e4 100644 --- a/gateway/Makefile +++ b/gateway/Makefile @@ -1,6 +1,6 @@ export DOCKER_CLI_EXPERIMENTAL=enabled -PLATFORM := "linux/amd64,linux/arm/v7,linux/arm64" +PLATFORM?="linux/amd64,linux/arm/v7,linux/arm64" TAG?=dev SERVER?=ttl.sh @@ -19,6 +19,16 @@ buildx-local: .PHONY: buildx-push buildx-push: + @echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \ + && docker buildx create --use --name=multiarch --node multiarch \ + && docker buildx build \ + --progress=plain \ + --platform linux/amd64 \ + --output "type=image,push=true" \ + --tag $(SERVER)/$(OWNER)/$(NAME):$(TAG) . + +.PHONY: buildx-push-all +buildx-push-all: @echo $(SERVER)/$(OWNER)/$(NAME):$(TAG) \ && docker buildx create --use --name=multiarch --node multiarch \ && docker buildx build \ diff --git a/gateway/go.mod b/gateway/go.mod index dec699ebf..be0d4d180 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -1,11 +1,11 @@ module github.com/openfaas/faas/gateway -go 1.20 +go 1.21 require ( github.com/docker/distribution v2.8.3+incompatible github.com/gorilla/mux v1.8.0 - github.com/openfaas/faas-provider v0.24.4 + github.com/openfaas/faas-provider v0.25.2 github.com/openfaas/nats-queue-worker v0.0.0-20231023101743-fa54e89c9db2 github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_model v0.5.0 diff --git a/gateway/go.sum b/gateway/go.sum index dc24daadf..079d1de7c 100644 --- a/gateway/go.sum +++ b/gateway/go.sum @@ -192,6 +192,8 @@ github.com/openfaas/faas-provider v0.19.1 h1:xH8lTWabfDZwzIvC0u1AO48ghD3BNw6Vo23 github.com/openfaas/faas-provider v0.19.1/go.mod h1:Farrp+9Med8LeK3aoYpqplMP8f5ebTILbCSLg2LPLZk= github.com/openfaas/faas-provider v0.24.4 h1:Zzbkabgd0PoQmnRjy53NbMXjhLaIyoIiwP3qaLkm9rE= github.com/openfaas/faas-provider v0.24.4/go.mod h1:NsETIfEndZn4mn/w/XnBTcDTwKqULCziphLp7KgeRcA= +github.com/openfaas/faas-provider v0.25.2 h1:sAyL96CzAk/YnuXQZiRJcHo7UrcYMaf7RDvKxsQb/2o= +github.com/openfaas/faas-provider v0.25.2/go.mod h1:NsETIfEndZn4mn/w/XnBTcDTwKqULCziphLp7KgeRcA= github.com/openfaas/nats-queue-worker v0.0.0-20230303171817-9dfe6fa61387 h1:D4xbdy309Wdyhlm6PgJqUV/aR77VQQG8UTF+q0ay71c= github.com/openfaas/nats-queue-worker v0.0.0-20230303171817-9dfe6fa61387/go.mod h1:s86POyW6C8S4CALFRhO8ax5sR2uaQUJQ0HaQGvbTpTc= github.com/openfaas/nats-queue-worker v0.0.0-20231023101743-fa54e89c9db2 h1:I8U2kq2h7Wl6pkd4hjRK6P0/o3AcCNdfmNJS5gdgxKU= diff --git a/gateway/handlers/forwarding_proxy.go b/gateway/handlers/forwarding_proxy.go index 3aabd74a0..64c8f516b 100644 --- a/gateway/handlers/forwarding_proxy.go +++ b/gateway/handlers/forwarding_proxy.go @@ -9,9 +9,12 @@ import ( "io" "log" "net/http" + "net/http/httputil" "os" + "strings" "time" + fhttputil "github.com/openfaas/faas-provider/httputil" "github.com/openfaas/faas/gateway/pkg/middleware" "github.com/openfaas/faas/gateway/types" ) @@ -28,7 +31,10 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, writeRequestURI = exists } + reverseProxy := makeRewriteProxy(baseURLResolver, urlPathTransformer) + return func(w http.ResponseWriter, r *http.Request) { + baseURL := baseURLResolver.Resolve(r) originalURL := r.URL.String() requestURL := urlPathTransformer.Transform(r) @@ -39,13 +45,13 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, start := time.Now() - statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector) - - seconds := time.Since(start) + statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout, writeRequestURI, serviceAuthInjector, reverseProxy) if err != nil { log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error()) } + seconds := time.Since(start) + for _, notifier := range notifiers { notifier.Notify(r.Method, requestURL, originalURL, statusCode, "completed", seconds) } @@ -86,7 +92,12 @@ func forwardRequest(w http.ResponseWriter, requestURL string, timeout time.Duration, writeRequestURI bool, - serviceAuthInjector middleware.AuthInjector) (int, error) { + serviceAuthInjector middleware.AuthInjector, + reverseProxy *httputil.ReverseProxy) (int, error) { + + if r.Body != nil { + defer r.Body.Close() + } upstreamReq := buildUpstreamRequest(r, baseURL, requestURL) if upstreamReq.Body != nil { @@ -101,14 +112,20 @@ func forwardRequest(w http.ResponseWriter, log.Printf("forwardRequest: %s %s\n", upstreamReq.Host, upstreamReq.URL.String()) } + if strings.HasPrefix(r.Header.Get("Accept"), "text/event-stream") { + ww := fhttputil.NewHttpWriteInterceptor(w) + reverseProxy.ServeHTTP(ww, upstreamReq) + return ww.Status(), nil + } + ctx, cancel := context.WithTimeout(r.Context(), timeout) defer cancel() - res, resErr := proxyClient.Do(upstreamReq.WithContext(ctx)) - if resErr != nil { + res, err := proxyClient.Do(upstreamReq.WithContext(ctx)) + if err != nil { badStatus := http.StatusBadGateway w.WriteHeader(badStatus) - return badStatus, resErr + return badStatus, err } if res.Body != nil { @@ -117,12 +134,10 @@ func forwardRequest(w http.ResponseWriter, copyHeaders(w.Header(), &res.Header) - // Write status code w.WriteHeader(res.StatusCode) if res.Body != nil { - // Copy the body over - io.CopyBuffer(w, res.Body, nil) + io.Copy(w, res.Body) } return res.StatusCode, nil @@ -159,3 +174,14 @@ var hopHeaders = []string{ "Transfer-Encoding", "Upgrade", } + +func makeRewriteProxy(baseURLResolver middleware.BaseURLResolver, urlPathTransformer middleware.URLPathTransformer) *httputil.ReverseProxy { + return &httputil.ReverseProxy{ + ErrorLog: log.New(io.Discard, "proxy:", 0), + Transport: http.DefaultClient.Transport, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + }, + Director: func(r *http.Request) { + }, + } +} diff --git a/gateway/vendor/github.com/openfaas/faas-provider/httputil/write_interceptor.go b/gateway/vendor/github.com/openfaas/faas-provider/httputil/write_interceptor.go index 9806e61f6..e2eb6bf26 100644 --- a/gateway/vendor/github.com/openfaas/faas-provider/httputil/write_interceptor.go +++ b/gateway/vendor/github.com/openfaas/faas-provider/httputil/write_interceptor.go @@ -7,12 +7,17 @@ import ( ) func NewHttpWriteInterceptor(w http.ResponseWriter) *HttpWriteInterceptor { - return &HttpWriteInterceptor{w, 0} + return &HttpWriteInterceptor{ + ResponseWriter: w, + statusCode: 0, + bytesWritten: 0, + } } type HttpWriteInterceptor struct { http.ResponseWriter - statusCode int + statusCode int + bytesWritten int64 } func (c *HttpWriteInterceptor) Status() int { @@ -22,6 +27,10 @@ func (c *HttpWriteInterceptor) Status() int { return c.statusCode } +func (c *HttpWriteInterceptor) BytesWritten() int64 { + return c.bytesWritten +} + func (c *HttpWriteInterceptor) Header() http.Header { return c.ResponseWriter.Header() } @@ -30,6 +39,9 @@ func (c *HttpWriteInterceptor) Write(data []byte) (int, error) { if c.statusCode == 0 { c.WriteHeader(http.StatusOK) } + + c.bytesWritten += int64(len(data)) + return c.ResponseWriter.Write(data) } diff --git a/gateway/vendor/github.com/openfaas/faas-provider/types/system_events.go b/gateway/vendor/github.com/openfaas/faas-provider/types/system_events.go new file mode 100644 index 000000000..b0510df74 --- /dev/null +++ b/gateway/vendor/github.com/openfaas/faas-provider/types/system_events.go @@ -0,0 +1,52 @@ +package types + +import "time" + +const ( + TypeFunctionUsage = "function_usage" + TypeAPIAccess = "api_access" +) + +type Event interface { + EventType() string +} + +type FunctionUsageEvent struct { + Namespace string `json:"namespace"` + FunctionName string `json:"function_name"` + Started time.Time `json:"started"` + Duration time.Duration `json:"duration"` + MemoryBytes int64 `json:"memory_bytes"` +} + +func (e FunctionUsageEvent) EventType() string { + return TypeFunctionUsage +} + +type APIAccessEvent struct { + Actor *Actor `json:"actor,omitempty"` + Path string `json:"path"` + Method string `json:"method"` + Actions []string `json:"actions"` + ResponseCode int `json:"response_code"` + CustomMessage string `json:"custom_message,omitempty"` + Namespace string `json:"namespace,omitempty"` + Time time.Time `json:"time"` +} + +func (e APIAccessEvent) EventType() string { + return TypeAPIAccess +} + +// Actor is the user that triggered an event. +// Get from OIDC claims, we can add any of the default OIDC profile or email claim fields if desired. +type Actor struct { + // OIDC subject, a unique identifier of the user. + Sub string `json:"sub"` + // Full name of the subject, can be the name of a user of OpenFaaS component. + Name string `json:"name,omitempty"` + // OpenFaaS issuer + Issuer string `json:"issuer,omitempty"` + // Federated issuer + FedIssuer string `json:"fed_issuer,omitempty"` +} diff --git a/gateway/vendor/modules.txt b/gateway/vendor/modules.txt index 49cbe51e3..591bb63f0 100644 --- a/gateway/vendor/modules.txt +++ b/gateway/vendor/modules.txt @@ -40,7 +40,7 @@ github.com/nats-io/nuid ## explicit; go 1.14 github.com/nats-io/stan.go github.com/nats-io/stan.go/pb -# github.com/openfaas/faas-provider v0.24.4 +# github.com/openfaas/faas-provider v0.25.2 ## explicit; go 1.20 github.com/openfaas/faas-provider/auth github.com/openfaas/faas-provider/httputil