Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] httpbp middleware doesn't flush chunked responses #573

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions httpbp/middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,10 @@ func recordStatusCode(counters counterGenerator) Middleware {
return func(name string, next HandlerFunc) HandlerFunc {
counter := counters.Counter("baseplate.http." + name + ".response")
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) (err error) {
wrapped := &statusCodeRecorder{ResponseWriter: w}
rec := &statusCodeRecorder{ResponseWriter: w}
wrapped := allowFlushHijack(w, rec)
defer func() {
code := wrapped.getCode(err)
code := rec.getCode(err)
counter.With("status", statusCodeFamily(code)).Add(1)
}()

Expand Down Expand Up @@ -453,9 +454,11 @@ func PrometheusServerMetrics(_ string) Middleware {
}
serverActiveRequests.With(activeRequestLabels).Inc()

wrapped := &responseRecorder{ResponseWriter: w}
rec := &responseRecorder{ResponseWriter: w}
wrapped := allowFlushHijack(w, rec)

defer func() {
code := errorCodeForMetrics(wrapped.responseCode, err)
code := errorCodeForMetrics(rec.responseCode, err)
success := isRequestSuccessful(code, err)

labels := prometheus.Labels{
Expand All @@ -465,7 +468,7 @@ func PrometheusServerMetrics(_ string) Middleware {
}
serverLatency.With(labels).Observe(time.Since(start).Seconds())
serverRequestSize.With(labels).Observe(float64(r.ContentLength))
serverResponseSize.With(labels).Observe(float64(wrapped.bytesWritten))
serverResponseSize.With(labels).Observe(float64(rec.bytesWritten))

totalRequestLabels := prometheus.Labels{
methodLabel: method,
Expand Down Expand Up @@ -510,3 +513,49 @@ func (rr *responseRecorder) WriteHeader(code int) {
rr.ResponseWriter.WriteHeader(code)
rr.responseCode = code
}

type wrappedHijacker struct {
http.ResponseWriter
http.Hijacker
}

type wrappedFlusher struct {
http.ResponseWriter
http.Flusher
}

type wrappedFlushHijacker struct {
http.ResponseWriter
http.Flusher
http.Hijacker
}

func allowFlushHijack(original, rw http.ResponseWriter) http.ResponseWriter {
flusher, isFlusher := original.(http.Flusher)
hijacker, isHijacker := original.(http.Hijacker)
switch {
case isFlusher && isHijacker:
return &wrappedFlushHijacker{rw, flusher, hijacker}
case isFlusher:
return allowFlush(original, rw)
case isHijacker:
return allowHijack(original, rw)
default:
return rw
}

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔕 it's manage-able as we currently have 2 optional interfaces, it will become a nightmare if there's a 3rd one added in the future. but I also can't find a better way to do this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this approach is definitely verbose but I can't think of a better one. The only other interface on http that a http.ResponseWriter could implement would be http.Pusher. I don't think it's worth supporting it now as I don't think we have any use case driving it, but that would be a 3rd interface that may come up in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a quietly known issue amongst the ecosystem, and the approach in the generic case seems to be code generation.

https://www.doxsey.net/blog/fixing-interface-erasure-in-go/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the article! I updated this PR to use the approach outlined in the article and included http.Pusher. Because we are dealing with 3 interfaces, I adapted their code generator and then massaged the output to make it more human readable. Basically leveraging the jump table idea, without having to adapt a full code generator.

@kylelemons let me know if you think is approach is better or if we should revert back to my previous one.


func allowFlush(original, rw http.ResponseWriter) http.ResponseWriter {
if flusher, isFlusher := original.(http.Flusher); isFlusher {
return &wrappedFlusher{rw, flusher}
}
return rw
}

func allowHijack(original, rw http.ResponseWriter) http.ResponseWriter {
if hijacker, isHijacker := original.(http.Hijacker); isHijacker {
return &wrappedHijacker{rw, hijacker}
}
return rw
}
88 changes: 88 additions & 0 deletions httpbp/middlewares_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package httpbp_test

import (
"bufio"
"context"
"encoding/json"
"errors"
"net"
"net/http"
"net/http/httptest"
"sort"
"strings"
"testing"

"github.com/reddit/baseplate.go"
"github.com/reddit/baseplate.go/ecinterface"
"github.com/reddit/baseplate.go/httpbp"
"github.com/reddit/baseplate.go/log"
Expand Down Expand Up @@ -323,3 +326,88 @@ func TestSupportedMethods(t *testing.T) {
)
}
}

func TestMiddlewareResponseWrapping(t *testing.T) {
store := newSecretsStore(t)
defer store.Close()

bp := baseplate.NewTestBaseplate(baseplate.NewTestBaseplateArgs{
Config: baseplate.Config{Addr: ":8080"},
Store: store,
EdgeContextImpl: ecinterface.Mock(),
})

args := httpbp.ServerArgs{
Baseplate: bp,
Middlewares: []httpbp.Middleware{
func(name string, next httpbp.HandlerFunc) httpbp.HandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if flusher, isFlusher := w.(http.Flusher); isFlusher {
flusher.Flush()
}

next(ctx, w, r)
return nil
}
},
func(name string, next httpbp.HandlerFunc) httpbp.HandlerFunc {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if hijacker, isHijacker := w.(http.Hijacker); isHijacker {
hijacker.Hijack()
}

next(ctx, w, r)
return nil
}
},
},
Endpoints: map[httpbp.Pattern]httpbp.Endpoint{
"/test": {
Name: "test",
Methods: []string{http.MethodGet},
Handle: func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
w.Write([]byte("endpoint"))
return nil
},
},
},
}

// register our middleware to the EndpointRegistry
args, err := args.SetupEndpoints()

if err != nil {
t.Fatal(err)
}

// Test the a flushable response
t.Run("flushable-response", func(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/test", nil)
w := httptest.NewRecorder()
args.EndpointRegistry.ServeHTTP(w, r)

if !w.Flushed {
t.Error("expected http response to be flushed")
}
})

t.Run("hijackable-response", func(tt *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/test", nil)
w := &hijackableResponseRecorder{httptest.NewRecorder(), false}
args.EndpointRegistry.ServeHTTP(w, r)

if !w.Hijacked {
t.Error("expected http response to be hijacked")
}
})
adamthesax marked this conversation as resolved.
Show resolved Hide resolved
}

type hijackableResponseRecorder struct {
*httptest.ResponseRecorder
Hijacked bool
}

func (h *hijackableResponseRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) {
h.Hijacked = true
return nil, nil, nil
}