Skip to content

Commit

Permalink
Merge branch '0.14.x' into feat/backport-split-server-conn-modules
Browse files Browse the repository at this point in the history
  • Loading branch information
oddgrd authored Mar 3, 2023
2 parents dcbfeaa + c849339 commit 39bf0f9
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 10 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
### v0.14.24 (2023-02-02)


#### Bug Fixes

* **body:** set an internal max to reserve in `to_bytes` ([4d89adce](https://github.com/hyperium/hyper/commit/4d89adce6122af1650165337d9d814314e7ee409))
* **server:** prevent sending 100-continue if user drops request body (#3138) ([92443d7e](https://github.com/hyperium/hyper/commit/92443d7ef57ed474f0add7dd1f114c81a3faa8fe))


#### Features

* **http2:** add `http2_max_header_list_size` to `hyper::server::Builder` (#3006) ([031425f0](https://github.com/hyperium/hyper/commit/031425f087219f02a87eea3d01b14e75e35a5209))


### v0.14.23 (2022-11-07)


Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hyper"
version = "0.14.23"
version = "0.14.24"
description = "A fast and correct HTTP library."
readme = "README.md"
homepage = "https://hyper.rs"
Expand Down
7 changes: 6 additions & 1 deletion src/body/to_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ where
return Ok(first.copy_to_bytes(first.remaining()));
};

// Don't pre-emptively reserve *too* much.
let rest = (body.size_hint().lower() as usize).min(1024 * 16);
let cap = first
.remaining()
.saturating_add(second.remaining())
.saturating_add(rest);
// With more than 1 buf, we gotta flatten into a Vec first.
let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize;
let mut vec = Vec::with_capacity(cap);
vec.put(first);
vec.put(second);
Expand Down
19 changes: 15 additions & 4 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ use http::uri::{Port, Scheme};
use http::{Method, Request, Response, Uri, Version};
use tracing::{debug, trace, warn};

use crate::body::{Body, HttpBody};
use crate::client::connect::CaptureConnectionExtension;
use crate::common::{
exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin,
Poll,
};
use crate::rt::Executor;

use super::conn;
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
use super::pool::{
self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
};
#[cfg(feature = "tcp")]
use super::HttpConnector;
use crate::body::{Body, HttpBody};
use crate::common::{exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
use crate::rt::Executor;

/// A Client to make outgoing HTTP requests.
///
Expand Down Expand Up @@ -238,7 +243,9 @@ where
})
}
};

req.extensions_mut()
.get_mut::<CaptureConnectionExtension>()
.map(|conn| conn.set(&pooled.conn_info));
if pooled.is_http1() {
if req.version() == Version::HTTP_2 {
warn!("Connection is HTTP/1, but request requires HTTP/2");
Expand Down Expand Up @@ -689,6 +696,10 @@ where
B: Send + 'static,
{
fn is_open(&self) -> bool {
if self.conn_info.poisoned.poisoned() {
trace!("marking {:?} as closed because it was poisoned", self.conn_info);
return false;
}
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
#[cfg(feature = "http2")]
Expand Down
223 changes: 222 additions & 1 deletion src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Connection`]: Connection
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, Ordering};
use std::ops::Deref;
use std::sync::Arc;

use ::http::Extensions;
use tokio::sync::watch;

cfg_feature! {
#![feature = "tcp"]
Expand Down Expand Up @@ -113,6 +118,142 @@ pub struct Connected {
pub(super) alpn: Alpn,
pub(super) is_proxied: bool,
pub(super) extra: Option<Extra>,
pub(super) poisoned: PoisonPill,
}

#[derive(Clone)]
pub(crate) struct PoisonPill {
poisoned: Arc<AtomicBool>,
}

impl Debug for PoisonPill {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
// print the address of the pill—this makes debugging issues much easier
write!(f, "PoisonPill@{:p} {{ poisoned: {} }}", self.poisoned, self.poisoned.load(Ordering::Relaxed))
}
}

impl PoisonPill {
pub(crate) fn healthy() -> Self {
Self {
poisoned: Arc::new(AtomicBool::new(false)),
}
}
pub(crate) fn poison(&self) {
self.poisoned.store(true, Ordering::Relaxed)
}

pub(crate) fn poisoned(&self) -> bool {
self.poisoned.load(Ordering::Relaxed)
}
}

/// [`CaptureConnection`] allows callers to capture [`Connected`] information
///
/// To capture a connection for a request, use [`capture_connection`].
#[derive(Debug, Clone)]
pub struct CaptureConnection {
rx: watch::Receiver<Option<Connected>>,
}

/// Capture the connection for a given request
///
/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
/// as the connection is established.
///
/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
///
/// # Examples
///
/// **Synchronous access**:
/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
/// established. This is ideal for situations where you are certain the connection has already
/// been established (e.g. after the response future has already completed).
/// ```rust
/// use hyper::client::connect::{capture_connection, CaptureConnection};
/// let mut request = http::Request::builder()
/// .uri("http://foo.com")
/// .body(())
/// .unwrap();
///
/// let captured_connection = capture_connection(&mut request);
/// // some time later after the request has been sent...
/// let connection_info = captured_connection.connection_metadata();
/// println!("we are connected! {:?}", connection_info.as_ref());
/// ```
///
/// **Asynchronous access**:
/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
/// connection is available.
///
/// ```rust
/// # #[cfg(feature = "runtime")]
/// # async fn example() {
/// use hyper::client::connect::{capture_connection, CaptureConnection};
/// let mut request = http::Request::builder()
/// .uri("http://foo.com")
/// .body(hyper::Body::empty())
/// .unwrap();
///
/// let mut captured = capture_connection(&mut request);
/// tokio::task::spawn(async move {
/// let connection_info = captured.wait_for_connection_metadata().await;
/// println!("we are connected! {:?}", connection_info.as_ref());
/// });
///
/// let client = hyper::Client::new();
/// client.request(request).await.expect("request failed");
/// # }
/// ```
pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
let (tx, rx) = CaptureConnection::new();
request.extensions_mut().insert(tx);
rx
}

/// TxSide for [`CaptureConnection`]
///
/// This is inserted into `Extensions` to allow Hyper to back channel connection info
#[derive(Clone)]
pub(crate) struct CaptureConnectionExtension {
tx: Arc<watch::Sender<Option<Connected>>>,
}

impl CaptureConnectionExtension {
pub(crate) fn set(&self, connected: &Connected) {
self.tx.send_replace(Some(connected.clone()));
}
}

impl CaptureConnection {
/// Internal API to create the tx and rx half of [`CaptureConnection`]
pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
let (tx, rx) = watch::channel(None);
(
CaptureConnectionExtension { tx: Arc::new(tx) },
CaptureConnection { rx },
)
}

/// Retrieve the connection metadata, if available
pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
self.rx.borrow()
}

/// Wait for the connection to be established
///
/// If a connection was established, this will always return `Some(...)`. If the request never
/// successfully connected (e.g. DNS resolution failure), this method will never return.
pub async fn wait_for_connection_metadata(
&mut self,
) -> impl Deref<Target = Option<Connected>> + '_ {
if self.rx.borrow().is_some() {
return self.rx.borrow();
}
let _ = self.rx.changed().await;
self.rx.borrow()
}
}

pub(super) struct Extra(Box<dyn ExtraInner>);
Expand All @@ -130,6 +271,7 @@ impl Connected {
alpn: Alpn::None,
is_proxied: false,
extra: None,
poisoned: PoisonPill::healthy(),
}
}

Expand Down Expand Up @@ -189,14 +331,24 @@ impl Connected {
self.alpn == Alpn::H2
}

/// Poison this connection
///
/// A poisoned connection will not be reused for subsequent requests by the pool
pub fn poison(&self) {
self.poisoned.poison();
tracing::debug!(
poison_pill = ?self.poisoned, "connection was poisoned"
);
}

// Don't public expose that `Connected` is `Clone`, unsure if we want to
// keep that contract...
#[cfg(feature = "http2")]
pub(super) fn clone(&self) -> Connected {
Connected {
alpn: self.alpn.clone(),
is_proxied: self.is_proxied,
extra: self.extra.clone(),
poisoned: self.poisoned.clone(),
}
}
}
Expand Down Expand Up @@ -351,6 +503,7 @@ pub(super) mod sealed {
#[cfg(test)]
mod tests {
use super::Connected;
use crate::client::connect::CaptureConnection;

#[derive(Clone, Debug, PartialEq)]
struct Ex1(usize);
Expand Down Expand Up @@ -409,4 +562,72 @@ mod tests {
assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
}

#[test]
fn test_sync_capture_connection() {
let (tx, rx) = CaptureConnection::new();
assert!(
rx.connection_metadata().is_none(),
"connection has not been set"
);
tx.set(&Connected::new().proxy(true));
assert_eq!(
rx.connection_metadata()
.as_ref()
.expect("connected should be set")
.is_proxied(),
true
);

// ensure it can be called multiple times
assert_eq!(
rx.connection_metadata()
.as_ref()
.expect("connected should be set")
.is_proxied(),
true
);
}

#[tokio::test]
async fn async_capture_connection() {
let (tx, mut rx) = CaptureConnection::new();
assert!(
rx.connection_metadata().is_none(),
"connection has not been set"
);
let test_task = tokio::spawn(async move {
assert_eq!(
rx.wait_for_connection_metadata()
.await
.as_ref()
.expect("connection should be set")
.is_proxied(),
true
);
// can be awaited multiple times
assert!(
rx.wait_for_connection_metadata().await.is_some(),
"should be awaitable multiple times"
);

assert_eq!(rx.connection_metadata().is_some(), true);
});
// can't be finished, we haven't set the connection yet
assert_eq!(test_task.is_finished(), false);
tx.set(&Connected::new().proxy(true));

assert!(test_task.await.is_ok());
}

#[tokio::test]
async fn capture_connection_sender_side_dropped() {
let (tx, mut rx) = CaptureConnection::new();
assert!(
rx.connection_metadata().is_none(),
"connection has not been set"
);
drop(tx);
assert!(rx.wait_for_connection_metadata().await.is_none());
}
}
6 changes: 6 additions & 0 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,12 @@ where

/// If the read side can be cheaply drained, do so. Otherwise, close.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) {
if let Reading::Continue(ref decoder) = self.state.reading {
// skip sending the 100-continue
// just move forward to a read, in case a tiny body was included
self.state.reading = Reading::Body(decoder.clone());
}

let _ = self.poll_read_body(cx);

// If still in Reading::Body, just give up
Expand Down
Loading

0 comments on commit 39bf0f9

Please sign in to comment.