Skip to content

Commit

Permalink
http2: Send WindowUpdates when remaining bytes are below a threshold
Browse files Browse the repository at this point in the history
This rolls-forward CL 150197 with an added fix for
TestProtocolErrorAfterGoAway.

Rather than send a WindowUpdate on every chunk of bytes read, allow them
to collect until we go past half the configured window size. Once the
threshold is reached, send a single WindowUpdate to reset the amount
back to the maximum amount configured.

Fixes golang/go#28732

Change-Id: Icee93dedf68d6166aa6fe0c3845d717e66586e73
Reviewed-on: https://go-review.googlesource.com/c/net/+/432038
Run-TryBot: Damien Neil <[email protected]>
Auto-Submit: Damien Neil <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
Reviewed-by: Tatiana Bradley <[email protected]>
  • Loading branch information
neild authored and gopherbot committed Sep 20, 2022
1 parent bf014ff commit 2e0b12c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 38 deletions.
33 changes: 23 additions & 10 deletions http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,9 +869,7 @@ func (sc *serverConn) serve() {

// Each connection starts with initialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
sc.sendWindowUpdate(nil, int(diff))
}
sc.sendWindowUpdate(nil)

if err := sc.readPreface(); err != nil {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
Expand Down Expand Up @@ -1588,7 +1586,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
sc.sendWindowUpdate(nil, p.Len())
sc.sendWindowUpdate(nil)

p.CloseWithError(err)
}
Expand Down Expand Up @@ -1736,7 +1734,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
// sendWindowUpdate, which also schedules sending the
// frames.
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
sc.sendWindowUpdate(nil) // conn-level

if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
Expand All @@ -1754,7 +1752,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
}
sc.inflow.take(int32(f.Length))
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
sc.sendWindowUpdate(nil) // conn-level

st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
Expand All @@ -1772,7 +1770,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
if len(data) > 0 {
wrote, err := st.body.Write(data)
if err != nil {
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
}
if wrote != len(data) {
Expand Down Expand Up @@ -2324,17 +2322,32 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {

func (sc *serverConn) noteBodyRead(st *stream, n int) {
sc.serveG.check()
sc.sendWindowUpdate(nil, n) // conn-level
sc.sendWindowUpdate(nil) // conn-level
if st.state != stateHalfClosedRemote && st.state != stateClosed {
// Don't send this WINDOW_UPDATE if the stream is closed
// remotely.
sc.sendWindowUpdate(st, n)
sc.sendWindowUpdate(st)
}
}

// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
func (sc *serverConn) sendWindowUpdate(st *stream) {
sc.serveG.check()

var n int32
if st == nil {
if avail, windowSize := sc.inflow.available(), sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
return
} else {
n = windowSize - avail
}
} else {
if avail, windowSize := st.inflow.available(), sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
return
} else {
n = windowSize - avail
}
}
// "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
// A Go Read call on 64-bit machines could in theory read
Expand Down
74 changes: 46 additions & 28 deletions http2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,6 @@ func TestServer_Request_Post_Body_ContentLength_TooSmall(t *testing.T) {
EndHeaders: true,
})
st.writeData(1, true, []byte("12345"))
// Return flow control bytes back, since the data handler closed
// the stream.
st.wantWindowUpdate(0, 5)
})
}

Expand Down Expand Up @@ -1247,6 +1244,41 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {

st.greet()

st.writeHeaders(HeadersFrameParam{
StreamID: 1, // clients send odd numbers
BlockFragment: st.encodeHeader(":method", "POST"),
EndStream: false, // data coming
EndHeaders: true,
})
updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
st.writeData(1, false, bytes.Repeat([]byte("b"), 10))
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
puppet.do(readBodyHandler(t, strings.Repeat("b", 10)))

st.wantWindowUpdate(0, uint32(updateSize))
st.wantWindowUpdate(1, uint32(updateSize))

st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
puppet.do(readBodyHandler(t, strings.Repeat("c", 15)))

st.wantWindowUpdate(0, uint32(updateSize+5))
}

func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
puppet := newHandlerPuppet()
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
puppet.act(w, r)
}, func(s *Server) {
s.MaxUploadBufferPerStream = 6
})
defer st.Close()
defer puppet.done()

st.greet()

st.writeHeaders(HeadersFrameParam{
StreamID: 1, // clients send odd numbers
BlockFragment: st.encodeHeader(":method", "POST"),
Expand All @@ -1255,18 +1287,14 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {
})
st.writeData(1, false, []byte("abcdef"))
puppet.do(readBodyHandler(t, "abc"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)
puppet.do(readBodyHandler(t, "d"))
puppet.do(readBodyHandler(t, "ef"))

puppet.do(readBodyHandler(t, "def"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)
st.wantWindowUpdate(1, 6)

st.writeData(1, true, []byte("ghijkl")) // END_STREAM here
puppet.do(readBodyHandler(t, "ghi"))
puppet.do(readBodyHandler(t, "jkl"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM
}

// the version of the TestServer_Handler_Sends_WindowUpdate with padding.
Expand Down Expand Up @@ -1295,12 +1323,7 @@ func TestServer_Handler_Sends_WindowUpdate_Padding(t *testing.T) {
st.wantWindowUpdate(1, 5)

puppet.do(readBodyHandler(t, "abc"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)

puppet.do(readBodyHandler(t, "def"))
st.wantWindowUpdate(0, 3)
st.wantWindowUpdate(1, 3)
}

func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) {
Expand Down Expand Up @@ -2296,8 +2319,6 @@ func TestServer_Response_Automatic100Continue(t *testing.T) {
// gigantic and/or sensitive "foo" payload now.
st.writeData(1, true, []byte(msg))

st.wantWindowUpdate(0, uint32(len(msg)))

hf = st.wantHeaders()
if hf.StreamEnded() {
t.Fatal("expected data to follow")
Expand Down Expand Up @@ -2485,9 +2506,6 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
// it did before.
st.writeData(1, true, []byte("foo"))

// Get our flow control bytes back, since the handler didn't get them.
st.wantWindowUpdate(0, uint32(len("foo")))

// Sent after a peer sends data anyway (admittedly the
// previous RST_STREAM might've still been in-flight),
// but they'll get the more friendly 'cancel' code
Expand Down Expand Up @@ -3930,7 +3948,6 @@ func TestServer_Rejects_TooSmall(t *testing.T) {
EndHeaders: true,
})
st.writeData(1, true, []byte("12345"))
st.wantWindowUpdate(0, 5)
st.wantRSTStream(1, ErrCodeProtocol)
})
}
Expand Down Expand Up @@ -4223,7 +4240,6 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
st.writeData(1, false, []byte(content[5:]))
blockCh <- true

increments := len(content)
for {
f, err := st.readFrame()
if err == io.EOF {
Expand All @@ -4232,10 +4248,12 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 {
break
}
if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 {
increments -= int(wu.Increment)
if increments == 0 {
break
if e, a := uint32(3), wu.Increment; e != a {
t.Errorf("Increment=%d, want %d", a, e)
}
}
}
Expand Down Expand Up @@ -4378,22 +4396,22 @@ func TestServerSendsEarlyHints(t *testing.T) {

func TestProtocolErrorAfterGoAway(t *testing.T) {
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.(http.Flusher).Flush()
io.Copy(io.Discard, r.Body)
})
defer st.Close()

st.greet()
content := "some content"
st.writeHeaders(HeadersFrameParam{
StreamID: 1,
BlockFragment: st.encodeHeader(
":method", "POST",
"content-length", strconv.Itoa(len(content)),
"content-length", "1",
),
EndStream: false,
EndHeaders: true,
})
st.writeData(1, false, []byte(content[:5]))

_, err := st.readFrame()
if err != nil {
Expand Down

0 comments on commit 2e0b12c

Please sign in to comment.