Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(body): add Body::wrap_body #2913

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
Expand Down Expand Up @@ -62,6 +61,7 @@ enum Kind {
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
WrappedBody(SyncWrapper<http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>>),
}

struct Extra {
Expand Down Expand Up @@ -139,6 +139,23 @@ impl Body {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

/// Create a `Body` by wrapping another body.
#[inline]
pub fn wrap_body<B>(body: B) -> Self
where
B: HttpBody<Data = Bytes> + Send + 'static,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
// if `B` is already a `Body` just return that and avoid additional boxing
match super::try_downcast(body) {
Ok(body) => body,
Err(body) => {
let body = body.map_err(crate::Error::new_user_body).boxed_unsync();
Self::new(Kind::WrappedBody(SyncWrapper::new(body)))
}
}
}

pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
let (data_tx, data_rx) = mpsc::channel(0);
let (trailers_tx, trailers_rx) = oneshot::channel();
Expand Down Expand Up @@ -329,12 +346,12 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_data(cx),
}
}

Expand Down Expand Up @@ -393,6 +410,7 @@ impl HttpBody for Body {
},
#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_trailers(cx),
Kind::WrappedBody(ref mut body) => Pin::new(body.get_mut()).poll_trailers(cx),
_ => Poll::Ready(Ok(None)),
}
}
Expand All @@ -407,6 +425,9 @@ impl HttpBody for Body {
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
// we cannot get a `&UnsyncBoxBody` through a `SyncWrapper<UnsyncBoxBody>`
// so we have no way of calling the method on the inner body
Kind::WrappedBody(..) => false,
}
}

Expand All @@ -433,6 +454,9 @@ impl HttpBody for Body {
Kind::H2 { content_length, .. } => opt_len!(content_length),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => SizeHint::default(),
// we cannot get a `&UnsyncBoxBody` through a `SyncWrapper<UnsyncBoxBody>`
// so we have no way of calling the method on the inner body
Kind::WrappedBody(..) => SizeHint::default(),
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,19 @@ pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::D
}
}

pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
T: 'static,
K: Send + 'static,
{
let mut k = Some(k);
if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
Ok(k.take().unwrap())
} else {
Err(k.unwrap())
}
}

fn _assert_send_sync() {
fn _assert_send<T: Send>() {}
fn _assert_sync<T: Sync>() {}
Expand Down
4 changes: 0 additions & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
all(feature = "client", any(feature = "http1", feature = "http2"))
))]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
Expand Down
5 changes: 0 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub(super) enum Kind {
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
HeaderTimeout,
/// Error while reading a body from connection.
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
Body,
/// Error while writing a body to connection.
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down Expand Up @@ -92,7 +91,6 @@ pub(super) enum Header {
#[derive(Debug)]
pub(super) enum User {
/// Error calling user's HttpBody::poll_data().
#[cfg(any(feature = "http1", feature = "http2"))]
Body,
/// The user aborted writing of the outgoing body.
BodyWriteAborted,
Expand Down Expand Up @@ -367,7 +365,6 @@ impl Error {
Error::new_user(User::Service).with(cause)
}

#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) fn new_user_body<E: Into<Cause>>(cause: E) -> Error {
Error::new_user(User::Body).with(cause)
}
Expand Down Expand Up @@ -440,7 +437,6 @@ impl Error {
Kind::Accept => "error accepting connection",
#[cfg(all(feature = "http1", feature = "server", feature = "runtime"))]
Kind::HeaderTimeout => "read header from client timeout",
#[cfg(any(feature = "http1", feature = "http2", feature = "stream"))]
Kind::Body => "error reading a body from connection",
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::BodyWrite => "error writing a body to connection",
Expand All @@ -451,7 +447,6 @@ impl Error {
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::Io => "connection error",

#[cfg(any(feature = "http1", feature = "http2"))]
Kind::User(User::Body) => "error from user's HttpBody stream",
Kind::User(User::BodyWriteAborted) => "user body write aborted",
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down