From 4d89adce6122af1650165337d9d814314e7ee409 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 11 Jan 2023 17:00:59 -0500 Subject: [PATCH 1/5] fix(body): set an internal max to reserve in `to_bytes` Previously, `to_bytes` would reserve extra space if after two chunks, there was more remaining. It used to reserve however much space the peer advertized. This changes now only reserves up to ~16kb. This way, a slow message with a big body doesn't reserve so much memory, until the data has actually been received. The existing warning to check for a length before calling the function is still the best approach. --- src/body/to_bytes.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/body/to_bytes.rs b/src/body/to_bytes.rs index 62b15a54a9..038c6fd0f3 100644 --- a/src/body/to_bytes.rs +++ b/src/body/to_bytes.rs @@ -63,8 +63,13 @@ where return Ok(first.copy_to_bytes(first.remaining())); }; + // Don't pre-emptively reserve *too* much. + let rest = (body.size_hint().lower() as usize).min(1024 * 16); + let cap = first + .remaining() + .saturating_add(second.remaining()) + .saturating_add(rest); // With more than 1 buf, we gotta flatten into a Vec first. - let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize; let mut vec = Vec::with_capacity(cap); vec.put(first); vec.put(second); From 92443d7ef57ed474f0add7dd1f114c81a3faa8fe Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 2 Feb 2023 09:35:07 -0500 Subject: [PATCH 2/5] fix(server): prevent sending 100-continue if user drops request body (#3138) --- src/proto/h1/conn.rs | 6 ++++++ tests/server.rs | 3 +-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 37ab380f8b..5ebff2803e 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -748,6 +748,12 @@ where /// If the read side can be cheaply drained, do so. Otherwise, close. pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { + if let Reading::Continue(ref decoder) = self.state.reading { + // skip sending the 100-continue + // just move forward to a read, in case a tiny body was included + self.state.reading = Reading::Body(decoder.clone()); + } + let _ = self.poll_read_body(cx); // If still in Reading::Body, just give up diff --git a/tests/server.rs b/tests/server.rs index af5b5e9961..017cca9405 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -973,9 +973,8 @@ async fn expect_continue_waits_for_body_poll() { service_fn(|req| { assert_eq!(req.headers()["expect"], "100-continue"); // But! We're never going to poll the body! + drop(req); tokio::time::sleep(Duration::from_millis(50)).map(move |_| { - // Move and drop the req, so we don't auto-close - drop(req); Response::builder() .status(StatusCode::BAD_REQUEST) .body(hyper::Body::empty()) From 40c01dfb4f87342a6f86f07564ddc482194c6240 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 2 Feb 2023 09:39:38 -0500 Subject: [PATCH 3/5] v0.14.24 --- CHANGELOG.md | 14 ++++++++++++++ Cargo.toml | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e9fe15bde..a5c8fe065e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +### v0.14.24 (2023-02-02) + + +#### Bug Fixes + +* **body:** set an internal max to reserve in `to_bytes` ([4d89adce](https://github.com/hyperium/hyper/commit/4d89adce6122af1650165337d9d814314e7ee409)) +* **server:** prevent sending 100-continue if user drops request body (#3138) ([92443d7e](https://github.com/hyperium/hyper/commit/92443d7ef57ed474f0add7dd1f114c81a3faa8fe)) + + +#### Features + +* **http2:** add `http2_max_header_list_size` to `hyper::server::Builder` (#3006) ([031425f0](https://github.com/hyperium/hyper/commit/031425f087219f02a87eea3d01b14e75e35a5209)) + + ### v0.14.23 (2022-11-07) diff --git a/Cargo.toml b/Cargo.toml index 64f97ad07f..a76b210aff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hyper" -version = "0.14.23" +version = "0.14.24" description = "A fast and correct HTTP library." readme = "README.md" homepage = "https://hyper.rs" From 37ed5a2e3cab76a11092823a80afd8fe2f2a9693 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Mon, 20 Feb 2023 14:08:26 -0500 Subject: [PATCH 4/5] feat(client): add `poison` to `Connected` (#3145) Add `poison` method to `Connected`. This allows callers to mark a connection as poisoned which prevents the pool from reusing it on subsequent requests. `is_open` will consider poisoning prior to returning a connection to the pool. --- src/client/client.rs | 4 ++++ src/client/connect/mod.rs | 43 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/src/client/client.rs b/src/client/client.rs index 4425e25899..2c567ef673 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -689,6 +689,10 @@ where B: Send + 'static, { fn is_open(&self) -> bool { + if self.conn_info.poisoned.poisoned() { + trace!("marking {:?} as closed because it was poisoned", self.conn_info); + return false; + } match self.tx { PoolTx::Http1(ref tx) => tx.is_ready(), #[cfg(feature = "http2")] diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 862a0e65c1..64ed114a05 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -80,6 +80,9 @@ //! [`AsyncWrite`]: tokio::io::AsyncWrite //! [`Connection`]: Connection use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use ::http::Extensions; @@ -113,6 +116,34 @@ pub struct Connected { pub(super) alpn: Alpn, pub(super) is_proxied: bool, pub(super) extra: Option, + pub(super) poisoned: PoisonPill, +} + +#[derive(Clone)] +pub(crate) struct PoisonPill { + poisoned: Arc, +} + +impl Debug for PoisonPill { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + // print the address of the pill—this makes debugging issues much easier + write!(f, "PoisonPill@{:p} {{ poisoned: {} }}", self.poisoned, self.poisoned.load(Ordering::Relaxed)) + } +} + +impl PoisonPill { + pub(crate) fn healthy() -> Self { + Self { + poisoned: Arc::new(AtomicBool::new(false)), + } + } + pub(crate) fn poison(&self) { + self.poisoned.store(true, Ordering::Relaxed) + } + + pub(crate) fn poisoned(&self) -> bool { + self.poisoned.load(Ordering::Relaxed) + } } pub(super) struct Extra(Box); @@ -130,6 +161,7 @@ impl Connected { alpn: Alpn::None, is_proxied: false, extra: None, + poisoned: PoisonPill::healthy(), } } @@ -189,6 +221,16 @@ impl Connected { self.alpn == Alpn::H2 } + /// Poison this connection + /// + /// A poisoned connection will not be reused for subsequent requests by the pool + pub fn poison(&self) { + self.poisoned.poison(); + tracing::debug!( + poison_pill = ?self.poisoned, "connection was poisoned" + ); + } + // Don't public expose that `Connected` is `Clone`, unsure if we want to // keep that contract... #[cfg(feature = "http2")] @@ -197,6 +239,7 @@ impl Connected { alpn: self.alpn.clone(), is_proxied: self.is_proxied, extra: self.extra.clone(), + poisoned: self.poisoned.clone(), } } } From c8493399b2929a86f3020ae77304a00e43cfd161 Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Wed, 22 Feb 2023 12:24:20 -0500 Subject: [PATCH 5/5] feat(client): add `client::connect::capture_connection()` (#3144) Add `capture_connection` functionality. This allows callers to retrieve the `Connected` struct of the connection that was used internally by Hyper. This is in service of https://github.com/hyperium/hyper/issues/2605. Although this uses `http::Extensions` under the hood, the API exposed explicitly hides that detail. --- src/client/client.rs | 15 +++- src/client/connect/mod.rs | 180 +++++++++++++++++++++++++++++++++++++- tests/client.rs | 34 ++++++- 3 files changed, 223 insertions(+), 6 deletions(-) diff --git a/src/client/client.rs b/src/client/client.rs index 2c567ef673..96de907655 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -10,6 +10,14 @@ use http::uri::{Port, Scheme}; use http::{Method, Request, Response, Uri, Version}; use tracing::{debug, trace, warn}; +use crate::body::{Body, HttpBody}; +use crate::client::connect::CaptureConnectionExtension; +use crate::common::{ + exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin, + Poll, +}; +use crate::rt::Executor; + use super::conn; use super::connect::{self, sealed::Connect, Alpn, Connected, Connection}; use super::pool::{ @@ -17,9 +25,6 @@ use super::pool::{ }; #[cfg(feature = "tcp")] use super::HttpConnector; -use crate::body::{Body, HttpBody}; -use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll}; -use crate::rt::Executor; /// A Client to make outgoing HTTP requests. /// @@ -238,7 +243,9 @@ where }) } }; - + req.extensions_mut() + .get_mut::() + .map(|conn| conn.set(&pooled.conn_info)); if pooled.is_http1() { if req.version() == Version::HTTP_2 { warn!("Connection is HTTP/1, but request requires HTTP/2"); diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 64ed114a05..4815524811 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -82,9 +82,11 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::ops::Deref; use std::sync::Arc; use ::http::Extensions; +use tokio::sync::watch; cfg_feature! { #![feature = "tcp"] @@ -146,6 +148,114 @@ impl PoisonPill { } } +/// [`CaptureConnection`] allows callers to capture [`Connected`] information +/// +/// To capture a connection for a request, use [`capture_connection`]. +#[derive(Debug, Clone)] +pub struct CaptureConnection { + rx: watch::Receiver>, +} + +/// Capture the connection for a given request +/// +/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait. +/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon +/// as the connection is established. +/// +/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none. +/// +/// # Examples +/// +/// **Synchronous access**: +/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been +/// established. This is ideal for situations where you are certain the connection has already +/// been established (e.g. after the response future has already completed). +/// ```rust +/// use hyper::client::connect::{capture_connection, CaptureConnection}; +/// let mut request = http::Request::builder() +/// .uri("http://foo.com") +/// .body(()) +/// .unwrap(); +/// +/// let captured_connection = capture_connection(&mut request); +/// // some time later after the request has been sent... +/// let connection_info = captured_connection.connection_metadata(); +/// println!("we are connected! {:?}", connection_info.as_ref()); +/// ``` +/// +/// **Asynchronous access**: +/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the +/// connection is available. +/// +/// ```rust +/// # #[cfg(feature = "runtime")] +/// # async fn example() { +/// use hyper::client::connect::{capture_connection, CaptureConnection}; +/// let mut request = http::Request::builder() +/// .uri("http://foo.com") +/// .body(hyper::Body::empty()) +/// .unwrap(); +/// +/// let mut captured = capture_connection(&mut request); +/// tokio::task::spawn(async move { +/// let connection_info = captured.wait_for_connection_metadata().await; +/// println!("we are connected! {:?}", connection_info.as_ref()); +/// }); +/// +/// let client = hyper::Client::new(); +/// client.request(request).await.expect("request failed"); +/// # } +/// ``` +pub fn capture_connection(request: &mut crate::http::Request) -> CaptureConnection { + let (tx, rx) = CaptureConnection::new(); + request.extensions_mut().insert(tx); + rx +} + +/// TxSide for [`CaptureConnection`] +/// +/// This is inserted into `Extensions` to allow Hyper to back channel connection info +#[derive(Clone)] +pub(crate) struct CaptureConnectionExtension { + tx: Arc>>, +} + +impl CaptureConnectionExtension { + pub(crate) fn set(&self, connected: &Connected) { + self.tx.send_replace(Some(connected.clone())); + } +} + +impl CaptureConnection { + /// Internal API to create the tx and rx half of [`CaptureConnection`] + pub(crate) fn new() -> (CaptureConnectionExtension, Self) { + let (tx, rx) = watch::channel(None); + ( + CaptureConnectionExtension { tx: Arc::new(tx) }, + CaptureConnection { rx }, + ) + } + + /// Retrieve the connection metadata, if available + pub fn connection_metadata(&self) -> impl Deref> + '_ { + self.rx.borrow() + } + + /// Wait for the connection to be established + /// + /// If a connection was established, this will always return `Some(...)`. If the request never + /// successfully connected (e.g. DNS resolution failure), this method will never return. + pub async fn wait_for_connection_metadata( + &mut self, + ) -> impl Deref> + '_ { + if self.rx.borrow().is_some() { + return self.rx.borrow(); + } + let _ = self.rx.changed().await; + self.rx.borrow() + } +} + pub(super) struct Extra(Box); #[derive(Clone, Copy, Debug, PartialEq)] @@ -233,7 +343,6 @@ impl Connected { // Don't public expose that `Connected` is `Clone`, unsure if we want to // keep that contract... - #[cfg(feature = "http2")] pub(super) fn clone(&self) -> Connected { Connected { alpn: self.alpn.clone(), @@ -394,6 +503,7 @@ pub(super) mod sealed { #[cfg(test)] mod tests { use super::Connected; + use crate::client::connect::CaptureConnection; #[derive(Clone, Debug, PartialEq)] struct Ex1(usize); @@ -452,4 +562,72 @@ mod tests { assert_eq!(ex2.get::(), Some(&Ex1(99))); assert_eq!(ex2.get::(), Some(&Ex2("hiccup"))); } + + #[test] + fn test_sync_capture_connection() { + let (tx, rx) = CaptureConnection::new(); + assert!( + rx.connection_metadata().is_none(), + "connection has not been set" + ); + tx.set(&Connected::new().proxy(true)); + assert_eq!( + rx.connection_metadata() + .as_ref() + .expect("connected should be set") + .is_proxied(), + true + ); + + // ensure it can be called multiple times + assert_eq!( + rx.connection_metadata() + .as_ref() + .expect("connected should be set") + .is_proxied(), + true + ); + } + + #[tokio::test] + async fn async_capture_connection() { + let (tx, mut rx) = CaptureConnection::new(); + assert!( + rx.connection_metadata().is_none(), + "connection has not been set" + ); + let test_task = tokio::spawn(async move { + assert_eq!( + rx.wait_for_connection_metadata() + .await + .as_ref() + .expect("connection should be set") + .is_proxied(), + true + ); + // can be awaited multiple times + assert!( + rx.wait_for_connection_metadata().await.is_some(), + "should be awaitable multiple times" + ); + + assert_eq!(rx.connection_metadata().is_some(), true); + }); + // can't be finished, we haven't set the connection yet + assert_eq!(test_task.is_finished(), false); + tx.set(&Connected::new().proxy(true)); + + assert!(test_task.await.is_ok()); + } + + #[tokio::test] + async fn capture_connection_sender_side_dropped() { + let (tx, mut rx) = CaptureConnection::new(); + assert!( + rx.connection_metadata().is_none(), + "connection has not been set" + ); + drop(tx); + assert!(rx.wait_for_connection_metadata().await.is_none()); + } } diff --git a/tests/client.rs b/tests/client.rs index 2d3737f60e..54fad53eae 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1121,10 +1121,11 @@ mod dispatch_impl { use http::Uri; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; + use tokio_test::block_on; use super::support; use hyper::body::HttpBody; - use hyper::client::connect::{Connected, Connection, HttpConnector}; + use hyper::client::connect::{capture_connection, Connected, Connection, HttpConnector}; use hyper::Client; #[test] @@ -1533,6 +1534,37 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); } + #[test] + fn capture_connection_on_client() { + let _ = pretty_env_logger::try_init(); + + let _rt = support::runtime(); + let connector = DebugConnector::new(); + + let client = Client::builder().build(connector); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + }); + let mut req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Body::empty()) + .unwrap(); + let captured_conn = capture_connection(&mut req); + block_on(client.request(req)).expect("200 OK"); + assert!(captured_conn.connection_metadata().is_some()); + } + #[test] fn client_keep_alive_0() { let _ = pretty_env_logger::try_init();