diff --git a/Cargo.toml b/Cargo.toml index 1ece598be8..71e6636be6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ authors = ["Sean McArthur "] keywords = ["http", "hyper", "hyperium"] categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"] edition = "2018" -rust-version = "1.56" # keep in sync with MSRV.md dev doc +rust-version = "1.63" # keep in sync with MSRV.md dev doc include = [ "Cargo.toml", diff --git a/docs/MSRV.md b/docs/MSRV.md index 65127c99bd..70752c9138 100644 --- a/docs/MSRV.md +++ b/docs/MSRV.md @@ -6,4 +6,4 @@ hyper. It is possible that an older compiler can work, but that is not guaranteed. We try to increase the MSRV responsibly, only when a significant new feature is needed. -The current MSRV is: **1.56**. +The current MSRV is: **1.63**. diff --git a/src/body/incoming.rs b/src/body/incoming.rs index c8f3b06770..cdebd3db58 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -201,7 +201,16 @@ impl Body for Incoming { ping.record_data(bytes.len()); return Poll::Ready(Some(Ok(Frame::data(bytes)))); } - Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))), + Some(Err(e)) => { + return match e.reason() { + // These reasons should cause the body reading to stop, but not fail it. + // The same logic as for `Read for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { + Poll::Ready(None) + } + _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + }; + } None => { *data_done = true; // fall through to trailers diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 83d18a9a41..8dd8a5b9ab 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -25,7 +25,7 @@ //! `cargo`, staring with `1.64.0`, it can be compiled with the following command: //! //! ```notrust -//! RUSTFLAGS="--cfg hyper_unstable_ffi" cargo rustc --features client,http1,http2,ffi --crate-type cdylib +//! RUSTFLAGS="--cfg hyper_unstable_ffi" cargo rustc --crate-type cdylib --features client,http1,http2,ffi //! ``` // We may eventually allow the FFI to be enabled without `client` or `http1`, diff --git a/src/lib.rs b/src/lib.rs index 054beb225a..7de04debc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ pub mod service; pub mod upgrade; #[cfg(feature = "ffi")] +#[cfg_attr(docsrs, doc(cfg(all(feature = "ffi", hyper_unstable_ffi))))] pub mod ffi; cfg_proto! { diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index b8d9951928..7226c98bf5 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -6,7 +6,7 @@ use crate::rt::{Read, Write}; use bytes::Bytes; use futures_channel::mpsc::{Receiver, Sender}; use futures_channel::{mpsc, oneshot}; -use futures_util::future::{self, Either, FutureExt as _, Select}; +use futures_util::future::{Either, FusedFuture, FutureExt as _}; use futures_util::stream::{StreamExt as _, StreamFuture}; use h2::client::{Builder, Connection, SendRequest}; use h2::SendStream; @@ -143,7 +143,10 @@ where } else { (Either::Right(conn), ping::disabled()) }; - let conn: ConnMapErr = ConnMapErr { conn }; + let conn: ConnMapErr = ConnMapErr { + conn, + is_terminated: false, + }; exec.execute_h2_future(H2ClientFuture::Task { task: ConnTask::new(conn, conn_drop_rx, cancel_tx), @@ -218,6 +221,8 @@ pin_project! { { #[pin] conn: Either, Connection, SendBuf<::Data>>>, + #[pin] + is_terminated: bool, } } @@ -229,10 +234,26 @@ where type Output = Result<(), ()>; fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - self.project() - .conn - .poll(cx) - .map_err(|e| debug!("connection error: {}", e)) + let mut this = self.project(); + + if *this.is_terminated { + return Poll::Pending; + } + let polled = this.conn.poll(cx); + if polled.is_ready() { + *this.is_terminated = true; + } + polled.map_err(|e| debug!("connection error: {}", e)) + } +} + +impl FusedFuture for ConnMapErr +where + B: Body, + T: Read + Write + Unpin, +{ + fn is_terminated(&self) -> bool { + self.is_terminated } } @@ -245,10 +266,11 @@ pin_project! { T: Unpin, { #[pin] - select: Select, StreamFuture>>, + drop_rx: StreamFuture>, #[pin] cancel_tx: Option>, - conn: Option>, + #[pin] + conn: ConnMapErr, } } @@ -263,9 +285,9 @@ where cancel_tx: oneshot::Sender, ) -> Self { Self { - select: future::select(conn, drop_rx), + drop_rx, cancel_tx: Some(cancel_tx), - conn: None, + conn, } } } @@ -280,25 +302,24 @@ where fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let mut this = self.project(); - if let Some(conn) = this.conn { - conn.poll_unpin(cx).map(|_| ()) - } else { - match ready!(this.select.poll_unpin(cx)) { - Either::Left((_, _)) => { - // ok or err, the `conn` has finished - return Poll::Ready(()); - } - Either::Right((_, b)) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - drop(this.cancel_tx.take().expect("Future polled twice")); - this.conn = &mut Some(b); - return Poll::Pending; - } - } + if !this.conn.is_terminated() { + if let Poll::Ready(_) = this.conn.poll_unpin(cx) { + // ok or err, the `conn` has finished. + return Poll::Ready(()); + }; } + + if !this.drop_rx.is_terminated() { + if let Poll::Ready(_) = this.drop_rx.poll_unpin(cx) { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close. + trace!("send_request dropped, starting conn shutdown"); + drop(this.cancel_tx.take().expect("ConnTask Future polled twice")); + } + }; + + Poll::Pending } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index f938bf532b..3628576dc1 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -50,7 +50,7 @@ pub(crate) enum BodyLength { Unknown, } -/// Status of when a Disaptcher future completes. +/// Status of when a Dispatcher future completes. pub(crate) enum Dispatched { /// Dispatcher completely shutdown connection. Shutdown, diff --git a/tests/client.rs b/tests/client.rs index ef80596c01..3d46c3fbfa 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1338,7 +1338,7 @@ mod conn { use bytes::{Buf, Bytes}; use futures_channel::{mpsc, oneshot}; use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; - use http_body_util::{BodyExt, Empty, StreamBody}; + use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; @@ -2126,6 +2126,62 @@ mod conn { .expect("client should be open"); } + #[tokio::test] + async fn http2_responds_before_consuming_request_body() { + // Test that a early-response from server works correctly (request body wasn't fully consumed). + // https://github.com/hyperium/hyper/issues/2872 + use hyper::service::service_fn; + + let _ = pretty_env_logger::try_init(); + + let (listener, addr) = setup_tk_test_server().await; + + // Spawn an HTTP2 server that responds before reading the whole request body. + // It's normal case to decline the request due to headers or size of the body. + tokio::spawn(async move { + let sock = TokioIo::new(listener.accept().await.unwrap().0); + hyper::server::conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .serve_connection( + sock, + service_fn(|_req| async move { + Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from( + "No bread for you!", + )))) + }), + ) + .await + .expect("serve_connection"); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .timer(TokioTimer) + .handshake(io) + .await + .expect("http handshake"); + + tokio::spawn(async move { + conn.await.expect("client conn shouldn't error"); + }); + + // Use a channel to keep request stream open + let (_tx, recv) = mpsc::channel::, Box>>(0); + let req = Request::post("/a").body(StreamBody::new(recv)).unwrap(); + let resp = client.send_request(req).await.expect("send_request"); + assert!(resp.status().is_success()); + + let mut body = String::new(); + concat(resp.into_body()) + .await + .unwrap() + .reader() + .read_to_string(&mut body) + .unwrap(); + + assert_eq!(&body, "No bread for you!"); + } + #[tokio::test] async fn h2_connect() { let (listener, addr) = setup_tk_test_server().await;