From 5be673fd756ee0f320dbb611c764b5643fa8d5a0 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 11 Jul 2022 13:17:49 +0200 Subject: [PATCH] feat(body): add `Body::wrap_body` --- src/body/body.rs | 28 ++++++++++++++++++++++++++-- src/body/mod.rs | 13 +++++++++++++ src/common/mod.rs | 4 ---- src/error.rs | 5 ----- 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/src/body/body.rs b/src/body/body.rs index 9dc1a034f9..357f98186d 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -13,7 +13,6 @@ use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; use super::DecodedLength; -#[cfg(feature = "stream")] use crate::common::sync_wrapper::SyncWrapper; use crate::common::Future; #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] @@ -62,6 +61,7 @@ enum Kind { Pin>> + Send>>, >, ), + WrappedBody(SyncWrapper>), } struct Extra { @@ -139,6 +139,23 @@ impl Body { Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) } + /// Create a `Body` by wrapping another body. + #[inline] + pub fn wrap_body(body: B) -> Self + where + B: HttpBody + Send + 'static, + B::Error: Into>, + { + // if `B` is already a `Body` just return that and avoid additional boxing + match super::try_downcast(body) { + Ok(body) => body, + Err(body) => { + let body = body.map_err(crate::Error::new_user_body).boxed_unsync(); + Self::new(Kind::WrappedBody(SyncWrapper::new(body))) + } + } + } + pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { let (data_tx, data_rx) = mpsc::channel(0); let (trailers_tx, trailers_rx) = oneshot::channel(); @@ -329,12 +346,12 @@ impl Body { #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_data(cx), - #[cfg(feature = "stream")] Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), None => Poll::Ready(None), }, + Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_data(cx), } } @@ -393,6 +410,7 @@ impl HttpBody for Body { }, #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_trailers(cx), + Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_trailers(cx), _ => Poll::Ready(Ok(None)), } } @@ -407,6 +425,9 @@ impl HttpBody for Body { Kind::Ffi(..) => false, #[cfg(feature = "stream")] Kind::Wrapped(..) => false, + // we cannot get a `&UnsyncBoxBody` through a `SyncWrapper` + // so we have no way of calling the method on the inner body + Kind::WrappedBody(..) => false, } } @@ -433,6 +454,9 @@ impl HttpBody for Body { Kind::H2 { content_length, .. } => opt_len!(content_length), #[cfg(feature = "ffi")] Kind::Ffi(..) => SizeHint::default(), + // we cannot get a `&UnsyncBoxBody` through a `SyncWrapper` + // so we have no way of calling the method on the inner body + Kind::WrappedBody(..) => SizeHint::default(), } } } diff --git a/src/body/mod.rs b/src/body/mod.rs index 5e2181e941..7cfa2a2ca9 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -56,6 +56,19 @@ pub(crate) fn take_full_data(body: &mut T) -> Option(k: K) -> Result +where + T: 'static, + K: Send + 'static, +{ + let mut k = Some(k); + if let Some(k) = ::downcast_mut::>(&mut k) { + Ok(k.take().unwrap()) + } else { + Err(k.unwrap()) + } +} + fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} diff --git a/src/common/mod.rs b/src/common/mod.rs index e38c6f5c7a..00f85ab8f9 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -18,10 +18,6 @@ pub(crate) mod io; #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] mod lazy; mod never; -#[cfg(any( - feature = "stream", - all(feature = "client", any(feature = "http1", feature = "http2")) -))] pub(crate) mod sync_wrapper; pub(crate) mod task; pub(crate) mod watch; diff --git a/src/error.rs b/src/error.rs index 20acf3a7a5..5efabf97d5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,7 +48,6 @@ pub(super) enum Kind { #[cfg(all(feature = "http1", feature = "server", feature = "runtime"))] HeaderTimeout, /// Error while reading a body from connection. - #[cfg(any(feature = "http1", feature = "http2", feature = "stream"))] Body, /// Error while writing a body to connection. #[cfg(any(feature = "http1", feature = "http2"))] @@ -92,7 +91,6 @@ pub(super) enum Header { #[derive(Debug)] pub(super) enum User { /// Error calling user's HttpBody::poll_data(). - #[cfg(any(feature = "http1", feature = "http2"))] Body, /// The user aborted writing of the outgoing body. BodyWriteAborted, @@ -367,7 +365,6 @@ impl Error { Error::new_user(User::Service).with(cause) } - #[cfg(any(feature = "http1", feature = "http2"))] pub(super) fn new_user_body>(cause: E) -> Error { Error::new_user(User::Body).with(cause) } @@ -440,7 +437,6 @@ impl Error { Kind::Accept => "error accepting connection", #[cfg(all(feature = "http1", feature = "server", feature = "runtime"))] Kind::HeaderTimeout => "read header from client timeout", - #[cfg(any(feature = "http1", feature = "http2", feature = "stream"))] Kind::Body => "error reading a body from connection", #[cfg(any(feature = "http1", feature = "http2"))] Kind::BodyWrite => "error writing a body to connection", @@ -451,7 +447,6 @@ impl Error { #[cfg(any(feature = "http1", feature = "http2"))] Kind::Io => "connection error", - #[cfg(any(feature = "http1", feature = "http2"))] Kind::User(User::Body) => "error from user's HttpBody stream", Kind::User(User::BodyWriteAborted) => "user body write aborted", #[cfg(any(feature = "http1", feature = "http2"))]