Skip to content

Commit

Permalink
feat(body): introduce an Entity trait to represent bodies
Browse files Browse the repository at this point in the history
This dedicated `Entity` trait replaces the previous `Stream<Item=impl
AsRef<[u8]>, Error=hyper::Error>`. This allows for several improvements
immediately, and prepares for HTTP2 support.

- The `Entity::is_end_stream` makes up for change away from
  `Option<Body>`, which was previously used to know if the body should be
  empty. Since `Request` and `Response` now require a body to be set,
  this method can be used to tell hyper that the body is actually empty.

  It also provides the possibility of slight optimizations when polling
  for data, by allowing to check `is_end_stream` before polling again.
  This can allow a consumer to know that a body stream has ended without
  polling for `None` afterwards.

- The `Entity::content_length` method allows a body to automatically
  declare a size, in case a user doesn't set a `Content-Length` or
  `Transfer-Encoding` header.

- It's now possible to send and receive trailers, though this will be
  for HTTP2 connections only.

By being a trait owned by hyper, new methods can be added later as new
features are wanted (with default implementations).

The `hyper::Body` type now implements `Entity` instead of `Stream`,
provides a better channel option, and is easier to use with custom
streams via `Body::wrap_stream`.

BREAKING CHANGE: All code that was assuming the body was a `Stream` must
  be adjusted to use an `Entity` instead.

  Using `hyper::Body` as a `Stream` can call `Body::into_stream`
  to get a stream wrapper.

  Passing a custom `impl Stream` will need to either implement
  `Entity`, or as an easier option, switch to `Body::wrap_stream`.

  `Body::pair` has been replaced with `Body::channel`, which returns a
  `hyper::body::Sender` instead of a `futures::sync::mpsc::Sender`.

Closes #1438
  • Loading branch information
seanmonstar committed Mar 19, 2018
1 parent 3cd48b4 commit fbc449e
Show file tree
Hide file tree
Showing 18 changed files with 807 additions and 481 deletions.
2 changes: 1 addition & 1 deletion examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn main() {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

res.into_parts().1.for_each(|chunk| {
res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
}).map(|_| {
Expand Down
2 changes: 1 addition & 1 deletion examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
Box::new(req.into_parts().1.concat2().map(|b| {
Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down
14 changes: 8 additions & 6 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;

use futures::{Future, Sink};
use futures::{Future/*, Sink*/};
use futures::sync::oneshot;

use hyper::{Body, Chunk, Method, Request, Response, StatusCode};
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
use hyper::server::{Http, Service};

use std::fs::File;
use std::io::{self, copy, Read};
use std::io::{self, copy/*, Read*/};
use std::thread;

static NOTFOUND: &[u8] = b"Not Found";
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Service for ResponseExamples {
// a small test file.
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let mut file = match File::open(INDEX) {
let _file = match File::open(INDEX) {
Ok(f) => f,
Err(_) => {
tx.send(Response::builder()
Expand All @@ -91,9 +91,10 @@ impl Service for ResponseExamples {
return;
},
};
let (mut tx_body, rx_body) = Body::pair();
let (_tx_body, rx_body) = Body::channel();
let res = Response::new(rx_body.into());
tx.send(res).expect("Send error on successful file read");
/* TODO: fix once we have futures 0.2 Sink working
let mut buf = [0u8; 16];
loop {
match file.read(&mut buf) {
Expand All @@ -104,7 +105,7 @@ impl Service for ResponseExamples {
break;
} else {
let chunk: Chunk = buf[0..n].to_vec().into();
match tx_body.send(Ok(chunk)).wait() {
match tx_body.send_data(chunk).wait() {
Ok(t) => { tx_body = t; },
Err(_) => { break; }
};
Expand All @@ -113,6 +114,7 @@ impl Service for ResponseExamples {
Err(_) => { break; }
}
}
*/
});

Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
Expand Down
2 changes: 1 addition & 1 deletion examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Service for Echo {
Response::new(INDEX.into())
},
(&Method::POST, "/echo") => {
Response::new(req.into_parts().1)
Response::new(req.into_body())
},
_ => {
let mut res = Response::new(Body::empty());
Expand Down
13 changes: 5 additions & 8 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ extern crate tokio_core;
use futures::{Future, Stream};

use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::error::Error;
use hyper::server::{Http, Service};

#[allow(unused)]
Expand All @@ -18,20 +17,18 @@ static URL: &str = "http://127.0.0.1:1337/web_api";
static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>";
static LOWERCASE: &[u8] = b"i am a lower case string";

pub type ResponseStream = Box<Stream<Item=Chunk, Error=Error>>;

struct ResponseExamples(tokio_core::reactor::Handle);

impl Service for ResponseExamples {
type Request = Request<Body>;
type Response = Response<ResponseStream>;
type Response = Response<Body>;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => {
let body: ResponseStream = Box::new(Body::from(INDEX));
let body = Body::from(INDEX);
Box::new(futures::future::ok(Response::new(body)))
},
(&Method::GET, "/test.html") => {
Expand All @@ -45,7 +42,7 @@ impl Service for ResponseExamples {
let web_res_future = client.request(req);

Box::new(web_res_future.map(|web_res| {
let body: ResponseStream = Box::new(web_res.into_parts().1.map(|b| {
let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| {
Chunk::from(format!("before: '{:?}'<br>after: '{:?}'",
std::str::from_utf8(LOWERCASE).unwrap(),
std::str::from_utf8(&b).unwrap()))
Expand All @@ -55,15 +52,15 @@ impl Service for ResponseExamples {
},
(&Method::POST, "/web_api") => {
// A web api to run against. Simple upcasing of the body.
let body: ResponseStream = Box::new(req.into_parts().1.map(|chunk| {
let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| {
let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>();
Chunk::from(upper)
}));
Box::new(futures::future::ok(Response::new(body)))
},
_ => {
let body: ResponseStream = Box::new(Body::from(NOTFOUND));
let body = Body::from(NOTFOUND);
Box::new(futures::future::ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(body)
Expand Down
62 changes: 20 additions & 42 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use std::fmt;
use std::marker::PhantomData;

use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use futures::{Async, Future, Poll};
use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};

use proto;
use proto::body::Entity;
use super::dispatch;
use {Body, Request, Response, StatusCode};

Expand Down Expand Up @@ -44,14 +45,13 @@ pub struct SendRequest<B> {
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Item,
B::Data,
proto::ClientUpgradeTransaction,
>,
}
Expand Down Expand Up @@ -134,8 +134,7 @@ impl<B> SendRequest<B>

impl<B> SendRequest<B>
where
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
/// Sends a `Request` on the associated connection.
///
Expand All @@ -152,7 +151,7 @@ where
/// the `Host` header based on it. You must add a `Host` header yourself
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is, irregardless of calling `Request::set_proxy`.
/// be serialized as-is.
///
/// # Example
///
Expand Down Expand Up @@ -185,19 +184,6 @@ where
/// # fn main() {}
/// ```
pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
/* TODO?
// The Connection API does less things automatically than the Client
// API does. For instance, right here, we always assume set_proxy, so
// that if an absolute-form URI is provided, it is serialized as-is.
//
// Part of the reason for this is to prepare for the change to `http`
// types, where there is no more set_proxy.
//
// It's important that this method isn't called directly from the
// `Client`, so that `set_proxy` there is still respected.
req.set_proxy(true);
*/

let inner = match self.dispatch.send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {
Expand Down Expand Up @@ -269,8 +255,7 @@ impl<B> fmt::Debug for SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts<T> {
Expand All @@ -297,8 +282,7 @@ where
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
type Item = ();
type Error = ::Error;
Expand All @@ -311,8 +295,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
Expand Down Expand Up @@ -341,8 +324,7 @@ impl Builder {
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
Handshake {
inner: HandshakeInner {
Expand All @@ -356,8 +338,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
Expand All @@ -374,8 +355,7 @@ impl Builder {
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
Expand All @@ -400,14 +380,13 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Item,
B::Data,
proto::ClientTransaction,
>);
type Error = ::Error;
Expand All @@ -420,8 +399,7 @@ where
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
B: Entity<Error=::Error> + 'static,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
Expand All @@ -431,7 +409,7 @@ where
proto::dispatch::Client<B>,
B,
T,
B::Item,
B::Data,
R,
>);
type Error = ::Error;
Expand Down Expand Up @@ -485,16 +463,16 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]> + Send,
B: Entity<Error=::Error> + 'static,
B::Data: Send + 'static,
{}

#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]> + Send + Sync,
B: Entity<Error=::Error> + 'static,
B::Data: Send + Sync + 'static,
{}

#[doc(hidden)]
Expand Down
Loading

0 comments on commit fbc449e

Please sign in to comment.