From 7dec3c269aea9d91e44b6e3d610d4ba38ab6c386 Mon Sep 17 00:00:00 2001 From: dswij Date: Mon, 1 Jan 2024 20:58:09 +0800 Subject: [PATCH 1/4] fix: streams awaiting capacity lockout This PR changes the assign-capacity queue to prioritize streams that are send-ready. This is necessary to prevent a lockout when streams aren't able to proceed while waiting for connection capacity, but there is none. --- src/proto/streams/prioritize.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 3196049a..d3b728c7 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -474,7 +474,13 @@ impl Prioritize { // // In this case, the stream needs to be queued up for when the // connection has more capacity. - self.pending_capacity.push(stream); + if stream.is_send_ready() { + // Prioritize assigning capacity to a send-ready stream + // See https://github.com/hyperium/hyper/issues/3338 + self.pending_capacity.push_front(stream); + } else { + self.pending_capacity.push(stream); + } } // If data is buffered and the stream is send ready, then From f1f99e0cad5a0937c383e250f19b914a131560bc Mon Sep 17 00:00:00 2001 From: dswij Date: Sat, 6 Jan 2024 18:50:01 +0800 Subject: [PATCH 2/4] fix: pending_open streams taking capacity --- src/proto/streams/prioritize.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index d3b728c7..999bb075 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -184,7 +184,15 @@ impl Prioritize { stream.requested_send_capacity = cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; - self.try_assign_capacity(stream); + // `try_assign_capacity` will queue the stream to `pending_capacity` if the capacity + // cannot be assigned at the time it is called. 
+ // + // Streams over the max concurrent count will still call `send_data` so we should be + // careful not to put it into `pending_capacity` as it will starve the connection + // capacity for other streams + if !stream.is_pending_open { + self.try_assign_capacity(stream); + } } if frame.is_end_stream() { @@ -474,13 +482,7 @@ impl Prioritize { // // In this case, the stream needs to be queued up for when the // connection has more capacity. - if stream.is_send_ready() { - // Prioritize assigning capacity to a send-ready stream - // See https://github.com/hyperium/hyper/issues/3338 - self.pending_capacity.push_front(stream); - } else { - self.pending_capacity.push(stream); - } + self.pending_capacity.push(stream); } // If data is buffered and the stream is send ready, then @@ -528,6 +530,7 @@ impl Prioritize { loop { if let Some(mut stream) = self.pop_pending_open(store, counts) { self.pending_send.push_front(&mut stream); + self.try_assign_capacity(&mut stream); } match self.pop_frame(buffer, store, max_frame_len, counts) { From e13adae674958d294b617d754baafbebb169f19c Mon Sep 17 00:00:00 2001 From: dswij Date: Sat, 6 Jan 2024 18:50:26 +0800 Subject: [PATCH 3/4] test: requests over max stream starving capacity --- tests/h2-tests/tests/prioritization.rs | 97 +++++++++++++++++++++++++- 1 file changed, 95 insertions(+), 2 deletions(-) diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index 7c268106..11d2c2cc 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -1,5 +1,6 @@ -use futures::future::join; -use futures::{FutureExt, StreamExt}; +use futures::future::{join, select}; +use futures::{pin_mut, FutureExt, StreamExt}; + use h2_support::prelude::*; use h2_support::DEFAULT_WINDOW_SIZE; use std::task::Context; @@ -408,3 +409,95 @@ async fn send_data_receive_window_update() { join(mock, h2).await; } + +#[tokio::test] +async fn 
stream_count_over_max_stream_limit_does_not_starve_capacity() { + use tokio::sync::oneshot; + + h2_support::trace_init!(); + + let (io, mut srv) = mock::new(); + + let (tx, rx) = oneshot::channel(); + + let srv = async move { + let _ = srv + .assert_client_handshake_with_settings( + frames::settings() + // super tiny server + .max_concurrent_streams(1), + ) + .await; + srv.recv_frame(frames::headers(1).request("POST", "http://example.com/")) + .await; + + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16384])).await; + srv.recv_frame(frames::data(1, vec![0; 16383]).eos()).await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + + // All of these connection capacities should be assigned to stream 3 + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16384)).await; + srv.send_frame(frames::window_update(0, 16383)).await; + + // StreamId(3) should be able to send all of its request with the conn capacity + srv.recv_frame(frames::headers(3).request("POST", "http://example.com/")) + .await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16384])).await; + srv.recv_frame(frames::data(3, vec![0; 16383]).eos()).await; + srv.send_frame(frames::headers(3).response(200).eos()).await; + + // Then all future streams are guaranteed to be sendable by induction + tx.send(()).unwrap(); + }; + + fn request() -> Request<()> { + Request::builder() + .method(Method::POST) + .uri("http://example.com/") + .body(()) + .unwrap() + } + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .handshake::<_, Bytes>(io) + .await + .expect("handshake"); + + let (req1, mut send1) = client.send_request(request(), false).unwrap(); + let 
(req2, mut send2) = client.send_request(request(), false).unwrap(); + + // Use up the connection window. + send1.send_data(vec![0; 65535].into(), true).unwrap(); + // Queue up for more connection window. + send2.send_data(vec![0; 65535].into(), true).unwrap(); + + // Queue up more pending open streams + for _ in 0..5 { + let (_, mut send) = client.send_request(request(), false).unwrap(); + send.send_data(vec![0; 65535].into(), true).unwrap(); + } + + let response = conn.drive(req1).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = conn.drive(req2).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let _ = rx.await; + }; + + let task = join(srv, client); + pin_mut!(task); + + let t = tokio::time::sleep(Duration::from_secs(5)).map(|_| panic!("time out")); + pin_mut!(t); + + select(task, t).await; +} From 8d116746d9ef85d014389bab83f46eda9d0aaad1 Mon Sep 17 00:00:00 2001 From: dswij Date: Sun, 7 Jan 2024 02:32:05 +0800 Subject: [PATCH 4/4] allow dead code --- src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/error.rs b/src/error.rs index eb2b2acb..96a471bc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,6 +25,7 @@ pub struct Error { #[derive(Debug)] enum Kind { /// A RST_STREAM frame was received or sent. + #[allow(dead_code)] Reset(StreamId, Reason, Initiator), /// A GO_AWAY frame was received or sent.