Skip to content

Commit

Permalink
fix(client): detect HTTP2 connection closures sooner
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Apr 29, 2019
1 parent 271bba1 commit e0ec5ca
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 7 deletions.
42 changes: 35 additions & 7 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::IntoBuf;
use futures::{Async, Future, Poll, Stream};
use futures::future::{self, Either};
use futures::sync::mpsc;
use futures::sync::{mpsc, oneshot};
use h2::client::{Builder, Handshake, SendRequest};
use tokio_io::{AsyncRead, AsyncWrite};

Expand All @@ -18,6 +18,10 @@ type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
/// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Never>;

/// A oneshot channel watches the `Connection` task, and when it completes,
/// the "dispatch" task will be notified and can shutdown sooner.
type ConnEof = oneshot::Receiver<Never>;

pub(crate) struct Client<T, B>
where
B: Payload,
Expand All @@ -29,7 +33,7 @@ where

enum State<T, B> where B: IntoBuf {
Handshaking(Handshake<T, B>),
Ready(SendRequest<B>, ConnDropRef),
Ready(SendRequest<B>, ConnDropRef, ConnEof),
}

impl<T, B> Client<T, B>
Expand Down Expand Up @@ -66,14 +70,18 @@ where
// in h2 where dropping all SendRequests won't notify a
// parked Connection.
let (tx, rx) = mpsc::channel(0);
let (cancel_tx, cancel_rx) = oneshot::channel();
let rx = rx.into_future()
.map(|(msg, _)| match msg {
Some(never) => match never {},
None => (),
})
.map_err(|_| -> Never { unreachable!("mpsc cannot error") });
let fut = conn
.inspect(|_| trace!("connection complete"))
.inspect(move |_| {
drop(cancel_tx);
trace!("connection complete")
})
.map_err(|e| debug!("connection error: {}", e))
.select2(rx)
.then(|res| match res {
Expand All @@ -92,10 +100,21 @@ where
Err(Either::B((never, _))) => match never {},
});
self.executor.execute(fut)?;
State::Ready(request_tx, tx)
State::Ready(request_tx, tx, cancel_rx)
},
State::Ready(ref mut tx, ref conn_dropper) => {
try_ready!(tx.poll_ready().map_err(::Error::new_h2));
State::Ready(ref mut tx, ref conn_dropper, ref mut cancel_rx) => {
match tx.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
return if err.reason() == Some(::h2::Reason::NO_ERROR) {
trace!("connection gracefully shutdown");
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Err(::Error::new_h2(err))
};
}
}
match self.rx.poll() {
Ok(Async::Ready(Some((req, cb)))) => {
// check that future hasn't been canceled already
Expand Down Expand Up @@ -157,7 +176,16 @@ where
continue;
},

Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => {
match cancel_rx.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_conn_is_eof) => {
trace!("connection task is closed, closing dispatch task");
return Ok(Async::Ready(Dispatched::Shutdown));
}
}
},

Ok(Async::Ready(None)) => {
trace!("client::dispatch::Sender dropped");
Expand Down
51 changes: 51 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2095,6 +2095,57 @@ mod conn {
assert_eq!(vec, b"bar=foo");
}


#[test]
fn http2_detect_conn_eof() {
use futures::future;
use hyper::{Response, Server};
use hyper::service::service_fn_ok;
use tokio::timer::Delay;

let _ = pretty_env_logger::try_init();

let mut rt = Runtime::new().unwrap();

let server = Server::bind(&([127, 0, 0, 1], 0).into())
.http2_only(true)
.serve(|| service_fn_ok(|_req| {
Response::new(Body::empty())
}));
let addr = server.local_addr();
let (shdn_tx, shdn_rx) = oneshot::channel();
rt.spawn(server.with_graceful_shutdown(shdn_rx).map_err(|e| panic!("server error: {:?}", e)));

let io = rt.block_on(tcp_connect(&addr)).expect("tcp connect");
let (mut client, conn) = rt.block_on(
conn::Builder::new().http2_only(true).handshake::<_, Body>(io)
).expect("http handshake");
rt.spawn(conn.map_err(|e| panic!("client conn error: {:?}", e)));


// Sanity check that client is ready
rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready sanity");

let req = Request::builder()
.uri(format!("http://{}/", addr))
.body(Body::empty())
.expect("request builder");

rt.block_on(client.send_request(req)).expect("req1 send");

// Sanity check that client is STILL ready
rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready after");

// Trigger the server shutdown...
let _ = shdn_tx.send(());

// Allow time for graceful shutdown roundtrips...
rt.block_on(Delay::new(::std::time::Instant::now() + Duration::from_millis(100)).map_err(|e| panic!("delay error: {:?}", e))).expect("delay");

// After graceful shutdown roundtrips, the client should be closed...
rt.block_on(future::poll_fn(|| client.poll_ready())).expect_err("client should be closed");
}

struct DebugStream {
tcp: TcpStream,
shutdown_called: bool,
Expand Down

0 comments on commit e0ec5ca

Please sign in to comment.