Skip to content

Commit

Permalink
fix(transport): Return connection error on Channel::connect (#413)
Browse files Browse the repository at this point in the history
Before this fix, if the connect phase of the transport failed before
ever establishing a connection, we would never return the error until
the first call to send a request. This PR changes that behavior to only
forward the error to the call method if we have ever made a connection
before. If we have never established a connection before then
`Reconnect` will return an error on the call to `poll_ready`.

Fixes #403
  • Loading branch information
LucioFranco authored Jul 27, 2020
1 parent 5bd8a04 commit 2ea17b2
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
55 changes: 55 additions & 0 deletions tests/integration_tests/tests/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use futures_util::FutureExt;
use integration_tests::pb::{test_client::TestClient, test_server, Input, Output};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::oneshot;
use tonic::{transport::Server, Request, Response, Status};

#[tokio::test]
async fn connect_returns_err() {
let res = TestClient::connect("http://thisdoesntexist").await;

assert!(res.is_err());
}

#[tokio::test]
async fn connect_returns_err_via_call_after_connected() {
struct Svc(Arc<Mutex<Option<oneshot::Sender<()>>>>);

#[tonic::async_trait]
impl test_server::Test for Svc {
async fn unary_call(&self, _: Request<Input>) -> Result<Response<Output>, Status> {
let mut l = self.0.lock().unwrap();
l.take().unwrap().send(()).unwrap();

Ok(Response::new(Output {}))
}
}

let (tx, rx) = oneshot::channel();
let sender = Arc::new(Mutex::new(Some(tx)));
let svc = test_server::TestServer::new(Svc(sender));

let jh = tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_shutdown("127.0.0.1:1338".parse().unwrap(), rx.map(drop))
.await
.unwrap();
});

tokio::time::delay_for(Duration::from_millis(100)).await;

let mut client = TestClient::connect("http://127.0.0.1:1338").await.unwrap();

// First call should pass, then shutdown the server
client.unary_call(Request::new(Input {})).await.unwrap();

tokio::time::delay_for(Duration::from_millis(100)).await;

let res = client.unary_call(Request::new(Input {})).await;

assert!(res.is_err());

jh.await.unwrap();
}
15 changes: 13 additions & 2 deletions tonic/src/transport/service/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ where
state: State<M::Future, M::Response>,
target: Target,
error: Option<M::Error>,
has_been_connected: bool,
}

#[derive(Debug)]
Expand All @@ -37,6 +38,7 @@ where
state: State::Idle,
target,
error: None,
has_been_connected: false,
}
}
}
Expand Down Expand Up @@ -84,14 +86,23 @@ where
}
Poll::Ready(Err(e)) => {
trace!("poll_ready; error");

state = State::Idle;
self.error = Some(e.into());
break;

if self.has_been_connected {
self.error = Some(e.into());
break;
} else {
return Poll::Ready(Err(e.into()));
}
}
}
}
State::Connected(ref mut inner) => {
trace!("poll_ready; connected");

self.has_been_connected = true;

match inner.poll_ready(cx) {
Poll::Ready(Ok(())) => {
trace!("poll_ready; ready");
Expand Down

0 comments on commit 2ea17b2

Please sign in to comment.