Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try h2 on h1 failure #1503

Merged
merged 1 commit into from
May 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ num_cpus = "1.0"
pretty_env_logger = "0.2.0"
spmc = "0.2"
url = "1.0"
tokio-mockstream = "1.1.0"

[features]
default = [
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub(crate) enum Kind {
pub(crate) enum Parse {
Method,
Version,
VersionH2,
Uri,
Header,
TooLarge,
Expand Down Expand Up @@ -164,6 +165,10 @@ impl Error {
Error::new(Kind::Parse(Parse::Version), None)
}

pub(crate) fn new_version_h2() -> Error {
Error::new(Kind::Parse(Parse::VersionH2), None)
}

pub(crate) fn new_mismatched_response() -> Error {
Error::new(Kind::MismatchedResponse, None)
}
Expand Down Expand Up @@ -250,6 +255,7 @@ impl StdError for Error {
match self.inner.kind {
Kind::Parse(Parse::Method) => "invalid Method specified",
Kind::Parse(Parse::Version) => "invalid HTTP version specified",
Kind::Parse(Parse::VersionH2) => "invalid HTTP version specified (Http2)",
Kind::Parse(Parse::Uri) => "invalid URI",
Kind::Parse(Parse::Header) => "invalid Header provided",
Kind::Parse(Parse::TooLarge) => "message head is too large",
Expand Down
11 changes: 11 additions & 0 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use proto::{BodyLength, Decode, Http1Transaction, MessageHead};
use super::io::{Buffered};
use super::{EncodedBuf, Encoder, Decoder};

const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

/// This handles a connection, which will have been established over an
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
Expand Down Expand Up @@ -107,6 +108,11 @@ where I: AsyncRead + AsyncWrite,
T::should_error_on_parse_eof() && !self.state.is_idle()
}

fn has_h2_prefix(&self) -> bool {
let read_buf = self.io.read_buf();
read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
}

pub fn read_head(&mut self) -> Poll<Option<(MessageHead<T::Incoming>, bool)>, ::Error> {
debug_assert!(self.can_read_head());
trace!("Conn::read_head");
Expand All @@ -124,6 +130,7 @@ where I: AsyncRead + AsyncWrite,
self.io.consume_leading_lines();
let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
return if was_mid_parse || must_error {
// We check if the buf contains the h2 Preface
debug!("parse error ({}) with {} bytes", e, self.io.read_buf().len());
self.on_parse_error(e)
.map(|()| Async::NotReady)
Expand Down Expand Up @@ -529,8 +536,12 @@ where I: AsyncRead + AsyncWrite,
// - Client: there is nothing we can do
// - Server: if Response hasn't been written yet, we can send a 4xx response
fn on_parse_error(&mut self, err: ::Error) -> ::Result<()> {

match self.state.writing {
Writing::Init => {
if self.has_h2_prefix() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh great idea, so that Http1Transaction doesn't need to get more complicated, I like it!

return Err(::Error::new_version_h2())
}
if let Some(msg) = T::on_error(&err) {
self.write_head(msg, None);
self.state.error = Some(err);
Expand Down
3 changes: 3 additions & 0 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ impl<S> Server<S> where S: Service {
service: service,
}
}
pub fn into_service(self) -> S {
self.service
}
}

impl<S, Bs> Dispatch for Server<S>
Expand Down
6 changes: 3 additions & 3 deletions src/proto/h1/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,14 @@ where
use ::error::{Kind, Parse};
let status = match *err.kind() {
Kind::Parse(Parse::Method) |
Kind::Parse(Parse::Version) |
Kind::Parse(Parse::Header) |
Kind::Parse(Parse::Uri) => {
Kind::Parse(Parse::Uri) |
Kind::Parse(Parse::Version) => {
StatusCode::BAD_REQUEST
},
Kind::Parse(Parse::TooLarge) => {
StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE
}
},
_ => return None,
};

Expand Down
63 changes: 50 additions & 13 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::fmt;
use std::sync::Arc;
#[cfg(feature = "runtime")] use std::time::Duration;

use super::rewind::Rewind;
use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use futures::future::{Either, Executor};
Expand All @@ -23,6 +24,7 @@ use common::Exec;
use proto;
use body::{Body, Payload};
use service::{NewService, Service};
use error::{Kind, Parse};

#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming;

Expand Down Expand Up @@ -74,31 +76,32 @@ pub(super) struct SpawnAll<I, S> {
///
/// Polling this future will drive HTTP forward.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<I, S>
pub struct Connection<T, S>
where
S: Service,
{
pub(super) conn: Either<
pub(super) conn: Option<
Either<
proto::h1::Dispatcher<
proto::h1::dispatch::Server<S>,
S::ResBody,
I,
T,
proto::ServerTransaction,
>,
proto::h2::Server<
I,
Rewind<T>,
S,
S::ResBody,
>,
>,
>>,
}

/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
#[derive(Debug)]
pub struct Parts<T, S> {
pub struct Parts<T, S> {
/// The original IO object used in the handshake.
pub io: T,
/// A buffer of bytes that have been read but not processed as HTTP.
Expand Down Expand Up @@ -239,12 +242,13 @@ impl Http {
let sd = proto::h1::dispatch::Server::new(service);
Either::A(proto::h1::Dispatcher::new(sd, conn))
} else {
let h2 = proto::h2::Server::new(io, service, self.exec.clone());
let rewind_io = Rewind::new(io);
let h2 = proto::h2::Server::new(rewind_io, service, self.exec.clone());
Either::B(h2)
};

Connection {
conn: either,
conn: Some(either),
}
}

Expand Down Expand Up @@ -322,7 +326,7 @@ where
/// This `Connection` should continue to be polled until shutdown
/// can finish.
pub fn graceful_shutdown(&mut self) {
match self.conn {
match *self.conn.as_mut().unwrap() {
Either::A(ref mut h1) => {
h1.disable_keep_alive();
},
Expand All @@ -334,11 +338,12 @@ where

/// Return the inner IO object, and additional information.
///
/// If the IO object has been "rewound" the io will not contain those bytes rewound.
/// This should only be called after `poll_without_shutdown` signals
/// that the connection is "done". Otherwise, it may not have finished
/// flushing all necessary HTTP bytes.
pub fn into_parts(self) -> Parts<I, S> {
let (io, read_buf, dispatch) = match self.conn {
let (io, read_buf, dispatch) = match self.conn.unwrap() {
Either::A(h1) => {
h1.into_inner()
},
Expand All @@ -349,7 +354,7 @@ where
Parts {
io: io,
read_buf: read_buf,
service: dispatch.service,
service: dispatch.into_service(),
_inner: (),
}
}
Expand All @@ -362,14 +367,37 @@ where
/// but it is not desired to actally shutdown the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
match self.conn {
match *self.conn.as_mut().unwrap() {
Either::A(ref mut h1) => {
try_ready!(h1.poll_without_shutdown());
Ok(().into())
},
Either::B(ref mut h2) => h2.poll(),
}
}

fn try_h2(&mut self) -> Poll<(), ::Error> {
trace!("Trying to upgrade connection to h2");
let conn = self.conn.take();

let (io, read_buf, dispatch) = match conn.unwrap() {
Either::A(h1) => {
h1.into_inner()
},
Either::B(_h2) => {
panic!("h2 cannot into_inner");
}
};
let mut rewind_io = Rewind::new(io);
rewind_io.rewind(read_buf);
let mut h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), Exec::Default);
let pr = h2.poll();

debug_assert!(self.conn.is_none());
self.conn = Some(Either::B(h2));

pr
}
}

impl<I, B, S> Future for Connection<I, S>
Expand All @@ -384,7 +412,16 @@ where
type Error = ::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.conn.poll()
match self.conn.poll() {
Ok(x) => Ok(x.map(|o| o.unwrap_or_else(|| ()))),
Err(e) => {
debug!("error polling connection protocol: {}", e);
match *e.kind() {
Kind::Parse(Parse::VersionH2) => self.try_h2(),
_ => Err(e),
}
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

pub mod conn;
#[cfg(feature = "runtime")] mod tcp;
mod rewind;

use std::fmt;
#[cfg(feature = "runtime")] use std::net::SocketAddr;
Expand Down
Loading