diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 581c8233f2..e9aa404c63 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -500,6 +500,8 @@ where I: AsyncRead + AsyncWrite, Ok(encoder) => { if !encoder.is_eof() { Writing::Body(encoder) + } else if encoder.is_last() { + Writing::Closed } else { Writing::KeepAlive } @@ -566,7 +568,11 @@ where I: AsyncRead + AsyncWrite, self.io.buffer(encoded); if encoder.is_eof() { - Writing::KeepAlive + if encoder.is_last() { + Writing::Closed + } else { + Writing::KeepAlive + } } else { return Ok(AsyncSink::Ready); } @@ -577,7 +583,11 @@ where I: AsyncRead + AsyncWrite, if let Some(end) = end { self.io.buffer(end); } - Writing::KeepAlive + if encoder.is_last() { + Writing::Closed + } else { + Writing::KeepAlive + } }, Err(_not_eof) => Writing::Closed, } diff --git a/src/proto/h1/encode.rs b/src/proto/h1/encode.rs index cc33e97b2f..c78bd25592 100644 --- a/src/proto/h1/encode.rs +++ b/src/proto/h1/encode.rs @@ -9,6 +9,7 @@ use iovec::IoVec; #[derive(Debug, Clone)] pub struct Encoder { kind: Kind, + is_last: bool, } #[derive(Debug)] @@ -43,22 +44,22 @@ enum BufKind { } impl Encoder { - pub fn chunked() -> Encoder { + fn new(kind: Kind) -> Encoder { Encoder { - kind: Kind::Chunked, + kind: kind, + is_last: false, } } + pub fn chunked() -> Encoder { + Encoder::new(Kind::Chunked) + } pub fn length(len: u64) -> Encoder { - Encoder { - kind: Kind::Length(len), - } + Encoder::new(Kind::Length(len)) } pub fn eof() -> Encoder { - Encoder { - kind: Kind::Eof, - } + Encoder::new(Kind::Eof) } pub fn is_eof(&self) -> bool { @@ -68,6 +69,14 @@ impl Encoder { } } + pub fn set_last(&mut self) { + self.is_last = true; + } + + pub fn is_last(&self) -> bool { + self.is_last + } + pub fn end(&self) -> Result>, NotEof> { match self.kind { Kind::Length(0) => Ok(None), diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index de569e2270..ed1762bd45 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -132,7 +132,11 @@ where // replying with the latter status code response. let ret = if ::StatusCode::SwitchingProtocols == head.subject { T::on_encode_upgrade(&mut head) - .map(|_| Server::set_length(&mut head, has_body, method.as_ref())) + .map(|_| { + let mut enc = Server::set_length(&mut head, has_body, method.as_ref()); + enc.set_last(); + enc + }) } else if head.subject.is_informational() { error!("response with 1xx status code not supported"); head = MessageHead::default(); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 49ed32100f..c0d6f773f6 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -134,7 +134,9 @@ pub fn expecting_continue(version: HttpVersion, headers: &Headers) -> bool { ret } -pub type ServerTransaction = h1::role::Server; +pub type ServerTransaction = h1::role::Server; +//pub type ServerTransaction = h1::role::Server; +//pub type ServerUpgradeTransaction = h1::role::Server; pub type ClientTransaction = h1::role::Client; pub type ClientUpgradeTransaction = h1::role::Client; diff --git a/src/server/conn.rs b/src/server/conn.rs new file mode 100644 index 0000000000..040037b788 --- /dev/null +++ b/src/server/conn.rs @@ -0,0 +1,124 @@ +//! Lower-level Server connection API. +//! +//! The types in thie module are to provide a lower-level API based around a +//! single connection. Accepting a connection and binding it with a service +//! are not handled at this level. This module provides the building blocks to +//! customize those things externally. +//! +//! If don't have need to manage connections yourself, consider using the +//! higher-level [Server](super) API. + +use std::fmt; + +use bytes::Bytes; +use futures::{Future, Poll, Stream}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use proto; +use super::{HyperService, Request, Response, Service}; + +/// A future binding a connection with a Service. +/// +/// Polling this future will drive HTTP forward. +#[must_use = "futures do nothing unless polled"] +pub struct Connection +where + S: HyperService, + S::ResponseBody: Stream, + ::Item: AsRef<[u8]>, +{ + pub(super) conn: proto::dispatch::Dispatcher< + proto::dispatch::Server, + S::ResponseBody, + I, + ::Item, + proto::ServerTransaction, + >, +} + +/// Deconstructed parts of a `Connection`. +/// +/// This allows taking apart a `Connection` at a later time, in order to +/// reclaim the IO object, and additional related pieces. +#[derive(Debug)] +pub struct Parts { + /// The original IO object used in the handshake. + pub io: T, + /// A buffer of bytes that have been read but not processed as HTTP. + /// + /// If the client sent additional bytes after its last request, and + /// this connection "ended" with an upgrade, the read buffer will contain + /// those bytes. + /// + /// You will want to check for any existing bytes if you plan to continue + /// communicating on the IO object. + pub read_buf: Bytes, + _inner: (), +} + +// ===== impl Connection ===== + +impl Connection +where S: Service, Error = ::Error> + 'static, + I: AsyncRead + AsyncWrite + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + /// Disables keep-alive for this connection. + pub fn disable_keep_alive(&mut self) { + self.conn.disable_keep_alive() + } + + /// Return the inner IO object, and additional information. + /// + /// This should only be called after `poll_without_shutdown` signals + /// that the connection is "done". Otherwise, it may not have finished + /// flushing all necessary HTTP bytes. + pub fn into_parts(self) -> Parts { + let (io, read_buf) = self.conn.into_inner(); + Parts { + io: io, + read_buf: read_buf, + _inner: (), + } + } + + /// Poll the connection for completion, but without calling `shutdown` + /// on the underlying IO. + /// + /// This is useful to allow running a connection while doing an HTTP + /// upgrade. Once the upgrade is completed, the connection would be "done", + /// but it is not desired to actally shutdown the IO object. Instead you + /// would take it back using `into_parts`. + pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> { + try_ready!(self.conn.poll_without_shutdown()); + Ok(().into()) + } +} + +impl Future for Connection +where S: Service, Error = ::Error> + 'static, + I: AsyncRead + AsyncWrite + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + self.conn.poll() + } +} + +impl fmt::Debug for Connection +where + S: HyperService, + S::ResponseBody: Stream, + ::Item: AsRef<[u8]>, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Connection") + .finish() + } +} + diff --git a/src/server/mod.rs b/src/server/mod.rs index b1416d16f0..b7d6f8a54f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,6 +5,7 @@ #[cfg(feature = "compat")] pub mod compat; +pub mod conn; mod service; use std::cell::RefCell; @@ -46,6 +47,7 @@ feat_server_proto! { }; } +pub use self::conn::Connection; pub use self::service::{const_service, service_fn}; /// A configuration of the HTTP protocol. @@ -108,34 +110,6 @@ pub struct AddrIncoming { timeout: Option, } -/// A future binding a connection with a Service. -/// -/// Polling this future will drive HTTP forward. -/// -/// # Note -/// -/// This will currently yield an unnameable (`Opaque`) value -/// on success. The purpose of this is that nothing can be assumed about -/// the type, not even it's name. It's probable that in a later release, -/// this future yields the underlying IO object, which could be done without -/// a breaking change. -/// -/// It is likely best to just map the value to `()`, for now. -#[must_use = "futures do nothing unless polled"] -pub struct Connection -where - S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, -{ - conn: proto::dispatch::Dispatcher< - proto::dispatch::Server, - S::ResponseBody, - I, - ::Item, - proto::ServerTransaction, - >, -} // ===== impl Http ===== @@ -567,70 +541,6 @@ where } */ -// ===== impl Connection ===== - -impl Future for Connection -where S: Service, Error = ::Error> + 'static, - I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, -{ - type Item = self::unnameable::Opaque; - type Error = ::Error; - - fn poll(&mut self) -> Poll { - try_ready!(self.conn.poll()); - Ok(self::unnameable::opaque().into()) - } -} - -impl fmt::Debug for Connection -where - S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Connection") - .finish() - } -} - -impl Connection -where S: Service, Error = ::Error> + 'static, - I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, -{ - /// Disables keep-alive for this connection. - pub fn disable_keep_alive(&mut self) { - self.conn.disable_keep_alive() - } -} - -mod unnameable { - // This type is specifically not exported outside the crate, - // so no one can actually name the type. With no methods, we make no - // promises about this type. - // - // All of that to say we can eventually replace the type returned - // to something else, and it would not be a breaking change. - // - // We may want to eventually yield the `T: AsyncRead + AsyncWrite`, which - // doesn't have a `Debug` bound. So, this type can't implement `Debug` - // either, so the type change doesn't break people. - #[allow(missing_debug_implementations)] - pub struct Opaque { - _inner: (), - } - - pub fn opaque() -> Opaque { - Opaque { - _inner: (), - } - } -} - // ===== impl AddrIncoming ===== impl AddrIncoming { diff --git a/tests/server.rs b/tests/server.rs index 50e1c17293..b3989db2bf 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1,5 +1,6 @@ #![deny(warnings)] extern crate hyper; +#[macro_use] extern crate futures; extern crate spmc; extern crate pretty_env_logger; @@ -930,6 +931,71 @@ fn returning_1xx_response_is_error() { core.run(fut).unwrap_err(); } +#[test] +fn upgrades() { + use tokio_io::io::{read_to_end, write_all}; + let _ = pretty_env_logger::try_init(); + let mut core = Core::new().unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let addr = listener.local_addr().unwrap(); + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + tcp.write_all(b"\ + GET / HTTP/1.1\r\n\ + Upgrade: foobar\r\n\ + Connection: upgrade\r\n\ + \r\n\ + eagerly optimistic\ + ").expect("write 1"); + let mut buf = [0; 256]; + tcp.read(&mut buf).expect("read 1"); + + let expected = "HTTP/1.1 101 Switching Protocols\r\n"; + assert_eq!(s(&buf[..expected.len()]), expected); + let _ = tx.send(()); + + let n = tcp.read(&mut buf).expect("read 2"); + assert_eq!(s(&buf[..n]), "foo=bar"); + tcp.write_all(b"bar=foo").expect("write 2"); + }); + + let fut = listener.incoming() + .into_future() + .map_err(|_| -> hyper::Error { unreachable!() }) + .and_then(|(item, _incoming)| { + let (socket, _) = item.unwrap(); + let conn = Http::::new() + .serve_connection(socket, service_fn(|_| { + let mut res = Response::::new() + .with_status(StatusCode::SwitchingProtocols); + res.headers_mut().set_raw("Upgrade", "foobar"); + Ok(res) + })); + + let mut conn_opt = Some(conn); + future::poll_fn(move || { + try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown()); + // conn is done with HTTP now + Ok(conn_opt.take().unwrap().into()) + }) + }); + + let conn = core.run(fut).unwrap(); + + // wait so that we don't write until other side saw 101 response + core.run(rx).unwrap(); + + let parts = conn.into_parts(); + let io = parts.io; + assert_eq!(parts.read_buf, "eagerly optimistic"); + + let io = core.run(write_all(io, b"foo=bar")).unwrap().0; + let vec = core.run(read_to_end(io, vec![])).unwrap().1; + assert_eq!(vec, b"bar=foo"); +} + #[test] fn parse_errors_send_4xx_response() { let mut core = Core::new().unwrap();