Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sse support #527

Merged
merged 6 commits into from
Jul 19, 2016
Merged

Add sse support #527

merged 6 commits into from
Jul 19, 2016

Conversation

emilevauge
Copy link
Member

@emilevauge emilevauge commented Jul 13, 2016

This PR adds SSE support.
Fixes #503
Fixes #524

@JayH5
Copy link
Contributor

JayH5 commented Jul 15, 2016

Hey @emilevauge thanks for all your work on Traefik. It's been really useful for us.

Looking at containous/oxy@98f8b7d it looks like your approach here is to treat the text/event-stream content-type as a special case and then reading each event, writing each event to the http.ResponseWriter and then flushing it.

First of all, it looks like you are splitting events on \n\n which is a problem because the Server-Sent Events spec also allows \r or \r\n to delimit lines, not just \n.

But what I really want to ask is if maybe there is a more general solution to this problem?

The underlying problem seems to be that Go's http.ResponseWriter buffers responses by default which breaks streaming protocols (as far as I understand -- I am very far from being a golang expert). Wouldn't it make more sense to have an option to disable that buffering behaviour and immediately flush all request body content to the response? Let the client worry about parsing the SSE content itself, not Traefik.

As far as I can see, there are 2 ways that buffering could be disabled:

  1. An option in the frontend, say, something like a flush boolean flag. It seems like this could be passed through to oxy/forward/fwd the same way passHostHeaders is.
  2. A special header to indicate that a response should not be buffered. I'm not sure about other software, but Nginx has the X-Accel-Buffering header that can be used to switch off buffering. Having a special header for this means that a) it can be done on a per-response basis which is useful, and b) it can be done for any streaming content type, not just a specific one.

Anyway, just my thoughts. Thanks again for the great project.

@emilevauge
Copy link
Member Author

emilevauge commented Jul 15, 2016

Hi @JayH5
Oops, I read the SSE specs to quickly...
I absolutely agree with you on using a more generic way of dealing with this issue. That's what I wanted to do at the beginning, but this is not that simple.

  1. In the std lib, in the reverse proxy implementation, https://golang.org/src/net/http/httputil/reverseproxy.go, they give the possibility to flush the buffer at each FlushInterval. This helps but you have to configure the interval accordingly to your needs...
    Here, the issue is not in http.ResponseWriter, but in how we copy the bytes in fwd.serveHTTP in oxy. We could also flush the response at each byte received (without waiting for a \n\n or \r\n) but I have to test the performance of that solution.
  2. Once the technical solution will be found on how to NOT buffer responses, we will have to decide how we activate this. What I like in using text/event-stream is that it's automatic, no need for an option to be set by the user. But we could add an option to force enable this in any case, in frontend section maybe?

Any help on both 1/ and 2/ is welcomed :)

/cc @containous/traefik

@JayH5
Copy link
Contributor

JayH5 commented Jul 15, 2016

@emilevauge: Thanks for the response.

I wasn't aware of the golang stdlib reverse proxy (again, really not a golang expert). Seems strange to me to have a timer-based flushing system 😕. But there's some code in there that nearly gets at what I think could work.

I wasn't suggesting writing and flushing data byte-by-byte. But it should be possible to read as much as is available into a buffer and then write and flush.

From what I can see, oxy uses io.Copy to copy the response body, which in turn calls io.CopyBuffer. That method copies over (up to) 32KB chunks of data from the Reader to the Writer (if you don't give it your own buffer).

It looks like io.Reader has the behaviour we would want. From the docs: "If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more.". So hopefully we can read as much as is available into a buffer then write and flush.

The simplest way I can see to do this would be to just flush the Writer after each buffer is written to it. Copying code from the stdlib reverse proxy it may be possible to do something like this, where the call oxy makes to io.Copy is replaced with copyBody?

func (f *httpForwarder) copyBody(dst io.Writer, src io.Reader) {
    if f.flush { // Or check for header
        if wf, ok := dst.(writeFlusher); ok {
            fw := &flushWriter{
                dst: wf,
            }
            dst = fw
        }
    }

    return io.Copy(dst, src)
}

type writeFlusher interface {
    io.Writer
    http.Flusher
}

type flushWriter struct {
    dst writeFlusher
}

func (m *flushWriter) Write(p []byte) (int, error) {
    written, err := m.dst.Write(p)
    m.dst.Flush()
    return written, err
}

(This is the most golang I've ever written so there are probably issues with the above).

@emilevauge
Copy link
Member Author

emilevauge commented Jul 17, 2016

@JayH5: I made some changes in oxy to stream the body without analyzing it: containous/oxy@d12fc74
I just modified the code from copyBuffer: https://golang.org/src/io/io.go?s=12575:12653#L366 to flush at each iteration.
I will add later (in 1.1) an option to force streaming in frontend but in 1.0.1, I will rely on Content-Type header.
WDYT?

@emilevauge emilevauge force-pushed the add-sse-support branch 2 times, most recently from dd005fa to e9c12a7 Compare July 17, 2016 13:59
serverEntryPoint.httpServer.BlockingClose()
for serverEntryPointName, serverEntryPoint := range server.serverEntryPoints {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(server.globalConfiguration.GraceTimeOut)*time.Second)
go func(ctx context.Context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, why passing ctx if you don't use it ? 👼

@vdemeester
Copy link
Contributor

I will add later (in 1.1) an option to force streaming in frontend but in 1.0.1, I will rely on Content-Type header.

The less changes for 1.0.1, the better 😉

@JayH5
Copy link
Contributor

JayH5 commented Jul 18, 2016

@emilevauge: I've done a bit more digging in the go source code.. and .. it's kind of tricky. I don't think containous/oxy@d12fc74 is going to work. http.ResponseWriter will usually implement io.ReaderFrom, as it does in the golang source here, so the flush call you added will never be reached. It's doubly complicated because when ReadFrom is called on the http.ResponseWriter, it calls io.CopyBuffer anyway while hiding it's io.ReaderFrom interface so that a regular copy is done 😧

I would still suggest my route of wrapping all calls to Write() with a following Flush(). This whole thing really needs tests I guess. I could maybe make a PR to containous/oxy?

@emilevauge
Copy link
Member Author

emilevauge commented Jul 18, 2016

@JayH5 :

I don't think containous/oxy@d12fc74 is going to work

The previous solution was working well in my tests ;) But I agree we could make it better using a wrapper.

Here is what I propose. I created a wrapper responseFlusher. This wrapper must implement http.Hijacker, http.Flusher and http.CloseNotifier to stay compliant with all possible response writers:

package forward

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
)

var (
    _ http.Hijacker      = &responseFlusher{}
    _ http.Flusher       = &responseFlusher{}
    _ http.CloseNotifier = &responseFlusher{}
)

type responseFlusher struct {
    http.ResponseWriter
    flush bool
}

func newResponseFlusher(rw http.ResponseWriter, flush bool) *responseFlusher {
    return &responseFlusher{
        ResponseWriter: rw,
        flush:          flush,
    }
}

func (wf *responseFlusher) Write(p []byte) (int, error) {
    written, err := wf.ResponseWriter.Write(p)
    if wf.flush {
        wf.Flush()
    }
    return written, err
}

func (wf *responseFlusher) Hijack() (net.Conn, *bufio.ReadWriter, error) {
    hijacker, ok := wf.ResponseWriter.(http.Hijacker)
    if !ok {
        return nil, nil, fmt.Errorf("the ResponseWriter doesn't support the Hijacker interface")
    }
    return hijacker.Hijack()
}

func (wf *responseFlusher) CloseNotify() <-chan bool {
    return wf.ResponseWriter.(http.CloseNotifier).CloseNotify()
}

func (wf *responseFlusher) Flush() {
    flusher, ok := wf.ResponseWriter.(http.Flusher)
    if ok {
        flusher.Flush()
    }
}

The httpForwarder.serverHTTP then calls it:

    stream := "text/event-stream" == response.Header.Get(ContentType) || f.streamResponse
    written, err := io.Copy(newResponseFlusher(w, stream), response.Body)

I will add a unit test in oxy for that use case.

BUT, some work will have to be done on this project because the existing tests are not all passing :'( I forked it since the original project is not really maintained anymore but I didn't had time to correct each existing test yet. If you can give some help on this, it would be really appreciated ;)

WDYT?

@emilevauge
Copy link
Member Author

I updated containous/oxy containous/oxy@ab7796d.

@JayH5
Copy link
Contributor

JayH5 commented Jul 18, 2016

@emilevauge sorry I meant to reply but then got distracted trying to get the oxy tests to work. I tried but I think I'm going to give up.. all those chunked-encoding tests are broken and they're not straightforward to fix. It might almost be easier just to pull out the bits of oxy that Traefik uses and start from scratch.

The flushing code is looking better. There are a couple of places where I think it could be cleaned up a bit but otherwise it LGTM. Thanks.

@emilevauge
Copy link
Member Author

@JayH5 thanks for your help in this :)

@vdemeester
Copy link
Contributor

LGTM 👼

Signed-off-by: Emile Vauge <[email protected]>
@emilevauge emilevauge merged commit be8ebdb into v1.0 Jul 19, 2016
@vdemeester vdemeester deleted the add-sse-support branch July 19, 2016 09:59
@Russell-IO
Copy link
Contributor

LGTM

@ashmckenzie
Copy link

❤️

@JayH5
Copy link
Contributor

JayH5 commented Jul 20, 2016

@emilevauge: in practice, I found the check for the Content-Type value to be a little too simplistic. I made a PR to fix that: containous/oxy#9

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants