Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set the connection close header if we have a body to read #116

Merged
merged 6 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions http/body.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package http

import "io"

// bodyWrapper wraps an io.Reader and calls onEOF whenever the Read function returns io.EOF.
// This was designed for wrapping the request body, so we can know whether it was closed.
type bodyWrapper struct {
io.ReadCloser
onEOF func()
}

func (bw bodyWrapper) Read(data []byte) (int, error) {
n, err := bw.ReadCloser.Read(data)
if err == io.EOF {
bw.onEOF()
}

return n, err
}
21 changes: 20 additions & 1 deletion http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"runtime/debug"
"strings"
"sync"
"time"

cmds "github.com/ipfs/go-ipfs-cmds"
Expand Down Expand Up @@ -109,6 +110,24 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// If we have a request body, make sure the preamble
// knows that it should close the body if it wants to
// write before completing reading.
// FIXME: https://github.com/ipfs/go-ipfs/issues/5168
// FIXME: https://github.com/golang/go/issues/15527
var bodyEOFChan chan struct{}
if r.Body != http.NoBody {
bodyEOFChan = make(chan struct{})
var once sync.Once
bw := bodyWrapper{
ReadCloser: r.Body,
onEOF: func() {
once.Do(func() { close(bodyEOFChan) })
},
}
r.Body = bw
}
keks marked this conversation as resolved.
Show resolved Hide resolved

req, err := parseRequest(ctx, r, h.root)
if err != nil {
if err == ErrNotFound {
Expand Down Expand Up @@ -145,7 +164,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}()
}

re, err := NewResponseEmitter(w, r.Method, req)
re, err := NewResponseEmitter(w, r.Method, req, withRequestBodyEOFChan(bodyEOFChan))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
Expand Down
26 changes: 26 additions & 0 deletions http/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http/httptest"
"runtime"

Expand Down Expand Up @@ -200,6 +201,31 @@ var (
},
},

"echo": &cmds.Command{
Arguments: []cmdkit.Argument{
cmdkit.FileArg("file", true, false, "a file"),
},
Type: "",
Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
err := re.Emit("i received:")
if err != nil {
return err
}

f, err := req.Files.NextFile()
if err != nil {
return err
}

data, err := ioutil.ReadAll(f)
if err != nil {
return err
}

return re.Emit(string(data))
},
},

"version": &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Show ipfs version information.",
Expand Down
117 changes: 100 additions & 17 deletions http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (
"testing"

"github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs-files"
)

func TestHTTP(t *testing.T) {
type testcase struct {
path []string
v interface{}
vs []interface{}
file files.File
r string
err error
sendErr error
wait bool
close bool
}

tcs := []testcase{
Expand Down Expand Up @@ -56,6 +60,23 @@ func TestHTTP(t *testing.T) {
path: []string{"reader"},
r: "the reader call returns a reader.",
},

{
path: []string{"echo"},
file: files.NewSliceFile(
"",
"",
[]files.File{
files.NewReaderFile(
"stdin",
"/dev/stdin",
readCloser{
Reader: bytes.NewBufferString("This is the body of the request!"),
Closer: nopCloser{},
}, nil)}),
vs: []interface{}{"i received:", "This is the body of the request!"},
close: true,
},
}

mkTest := func(tc testcase) func(*testing.T) {
Expand All @@ -67,6 +88,10 @@ func TestHTTP(t *testing.T) {
t.Fatal(err)
}

if tc.file != nil {
req.Files = tc.file
}

res, err := c.Send(req)
if tc.sendErr != nil {
if err == nil {
Expand All @@ -79,27 +104,55 @@ func TestHTTP(t *testing.T) {

return
} else if err != nil {
t.Fatal("unexpected error:", tc.sendErr)
t.Fatal("unexpected error:", err)
}

v, err := res.Next()
t.Log("v:", v, "err:", err)
if tc.err != nil {
if err == nil {
t.Error("got nil error, expected:", tc.err)
} else if err.Error() != tc.err.Error() {
t.Errorf("got error %q, expected %q", err, tc.err)
if len(tc.vs) > 0 {
for _, tc.v = range tc.vs {
v, err := res.Next()
if err != nil {
t.Error("unexpected error:", err)
}
// TODO find a better way to solve this!
if s, ok := v.(*string); ok {
v = *s
}
t.Log("v:", v, "err:", err)

// if we don't expect a reader
if !reflect.DeepEqual(v, tc.v) {
t.Errorf("expected value to be %v but got: %+v", tc.v, v)
}
}
} else if err != nil {
t.Fatal("unexpected error:", err)
}

// TODO find a better way to solve this!
if s, ok := v.(*string); ok {
v = *s
}
_, err = res.Next()
if tc.err != nil {
if err == nil {
t.Fatal("got nil error, expected:", tc.err)
} else if err.Error() != tc.err.Error() {
t.Fatalf("got error %q, expected %q", err, tc.err)
}
} else if err != io.EOF {
t.Fatal("expected io.EOF error, got:", err)
}
} else if len(tc.r) == 0 {
v, err := res.Next()
// TODO find a better way to solve this!
if s, ok := v.(*string); ok {
v = *s
}

t.Log("v:", v, "err:", err)
if tc.err != nil {
if err == nil {
t.Error("got nil error, expected:", tc.err)
} else if err.Error() != tc.err.Error() {
t.Errorf("got error %q, expected %q", err, tc.err)
}
} else if err != nil {
t.Fatal("unexpected error:", err)
}

if len(tc.r) == 0 {
// if we don't expect a reader
if !reflect.DeepEqual(v, tc.v) {
t.Errorf("expected value to be %v but got: %+v", tc.v, v)
Expand All @@ -116,14 +169,31 @@ func TestHTTP(t *testing.T) {
t.Fatal("expected io.EOF error, got:", err)
}
} else {
v, err := res.Next()
// TODO find a better way to solve this!
if s, ok := v.(*string); ok {
v = *s
}

t.Log("v:", v, "err:", err)
if tc.err != nil {
if err == nil {
t.Error("got nil error, expected:", tc.err)
} else if err.Error() != tc.err.Error() {
t.Errorf("got error %q, expected %q", err, tc.err)
}
} else if err != nil {
t.Fatal("unexpected error:", err)
}

r, ok := v.(io.Reader)
if !ok {
t.Fatalf("expected a %T but got a %T", r, v)
}

var buf bytes.Buffer

_, err := io.Copy(&buf, r)
_, err = io.Copy(&buf, r)
if err != nil {
t.Fatal("unexpected copy error:", err)
}
Expand All @@ -133,6 +203,10 @@ func TestHTTP(t *testing.T) {
}
}

if tc.close && !res.(*Response).res.Close {
t.Error("expected the connection to be closed by the server but it wasn't")
}

wait, ok := getWaitChan(env)
if !ok {
t.Fatal("could not get wait chan")
Expand All @@ -148,3 +222,12 @@ func TestHTTP(t *testing.T) {
t.Run(fmt.Sprintf("%d-%s", i, strings.Join(tc.path, "/")), mkTest(tc))
}
}

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

type readCloser struct {
io.Reader
io.Closer
}
40 changes: 39 additions & 1 deletion http/responseemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
)

// NewResponeEmitter returns a new ResponseEmitter.
func NewResponseEmitter(w http.ResponseWriter, method string, req *cmds.Request) (ResponseEmitter, error) {
func NewResponseEmitter(w http.ResponseWriter, method string, req *cmds.Request, opts ...ResponseEmitterOption) (ResponseEmitter, error) {
encType, enc, err := cmds.GetEncoder(req, w, cmds.JSON)
if err != nil {
return nil, err
Expand All @@ -40,9 +40,29 @@ func NewResponseEmitter(w http.ResponseWriter, method string, req *cmds.Request)
method: method,
req: req,
}

// apply functional options
for _, opt := range opts {
opt(re)
}

return re, nil
}

// ResponseEmitterOption is the type describing options to the NewResponseEmitter function.
type ResponseEmitterOption func(*responseEmitter)

// withRequestBodyEOFChan return a ResponseEmitterOption needed to gracefully handle
// the case where the handler wants to send data and the request data has not been read
// completely yet.
func withRequestBodyEOFChan(ch <-chan struct{}) ResponseEmitterOption {
return func(re *responseEmitter) {
if ch != nil {
re.bodyEOFChan = ch
}
}
}

type ResponseEmitter interface {
cmds.ResponseEmitter
http.Flusher
Expand All @@ -59,6 +79,8 @@ type responseEmitter struct {
length uint64
err *cmdkit.Error

bodyEOFChan <-chan struct{}

streaming bool
closed bool
once sync.Once
Expand Down Expand Up @@ -252,6 +274,22 @@ func (re *responseEmitter) doPreamble(value interface{}) {
// Set up our potential trailer
h.Set("Trailer", StreamErrHeader)

// If we have a request body, make sure we close the body
// if we want to write before completing reading.
// FIXME: https://github.com/ipfs/go-ipfs/issues/5168
// FIXME: https://github.com/golang/go/issues/15527
if re.bodyEOFChan != nil {
select {
case <-re.bodyEOFChan:
// all good, we received an EOF, so the body is read completely.
// we handle the error where it occurs, here we just want to know that we're done
default:
// set connection close header, because we want to write
// even though the body is not yet read completely.
h.Set("Connection", "close")
}
}

switch v := value.(type) {
case *cmdkit.Error:
re.sendErr(v)
Expand Down