diff --git a/tonic/src/transport/channel/endpoint.rs b/tonic/src/transport/channel/endpoint.rs index 3934c2ea7..3988b9153 100644 --- a/tonic/src/transport/channel/endpoint.rs +++ b/tonic/src/transport/channel/endpoint.rs @@ -214,6 +214,25 @@ impl Endpoint { Channel::connect(connector, self.clone()).await } + /// Create a channel from this config. + /// + /// The channel returned by this method does not attempt to connect to the endpoint until first + /// use. + pub fn connect_lazy(&self) -> Result { + let mut http = hyper::client::connect::HttpConnector::new(); + http.enforce_http(false); + http.set_nodelay(self.tcp_nodelay); + http.set_keepalive(self.tcp_keepalive); + + #[cfg(feature = "tls")] + let connector = service::connector(http, self.tls.clone()); + + #[cfg(not(feature = "tls"))] + let connector = service::connector(http); + + Channel::new(connector, self.clone()) + } + /// Connect with a custom connector. pub async fn connect_with_connector(&self, connector: C) -> Result where diff --git a/tonic/src/transport/channel/mod.rs b/tonic/src/transport/channel/mod.rs index 773f8f20f..f5b14860d 100644 --- a/tonic/src/transport/channel/mod.rs +++ b/tonic/src/transport/channel/mod.rs @@ -130,6 +130,21 @@ impl Channel { (Self::balance(list, DEFAULT_BUFFER_SIZE), tx) } + pub(crate) fn new(connector: C, endpoint: Endpoint) -> Result + where + C: Service + Send + 'static, + C::Error: Into + Send, + C::Future: Unpin + Send, + C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + { + let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); + + let svc = Connection::new(connector, endpoint).map_err(super::Error::from_source)?; + let svc = Buffer::new(Either::A(svc), buffer_size); + + Ok(Channel { svc }) + } + pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, @@ -139,10 +154,9 @@ impl Channel { { let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE); - let svc = Connection::new(connector, endpoint) + let svc = Connection::connect(connector, endpoint) .await - .map_err(|e| super::Error::from_source(e))?; - + .map_err(super::Error::from_source)?; let svc = Buffer::new(Either::A(svc), buffer_size); Ok(Channel { svc }) diff --git a/tonic/src/transport/service/connection.rs b/tonic/src/transport/service/connection.rs index dcb086f09..a3a934973 100644 --- a/tonic/src/transport/service/connection.rs +++ b/tonic/src/transport/service/connection.rs @@ -16,7 +16,7 @@ use tower::{ limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer}, timeout::TimeoutLayer, util::BoxService, - ServiceBuilder, + ServiceBuilder, ServiceExt, }; use tower_load::Load; use tower_service::Service; @@ -29,7 +29,7 @@ pub(crate) struct Connection { } impl Connection { - pub(crate) async fn new(connector: C, endpoint: Endpoint) -> Result + pub(crate) fn new(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, C::Error: Into + Send, @@ -60,9 +60,8 @@ impl Connection { .optional_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) .into_inner(); - let mut connector = HyperConnect::new(connector, settings); - let initial_conn = connector.call(endpoint.uri.clone()).await?; - let conn = Reconnect::new(initial_conn, connector, endpoint.uri.clone()); + let connector = HyperConnect::new(connector, settings); + let conn = Reconnect::new(connector, endpoint.uri.clone()); let inner = stack.layer(conn); @@ -70,6 +69,16 @@ impl Connection { inner: BoxService::new(inner), }) } + + pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result + where + C: Service + Send + 'static, + C::Error: Into + Send, + C::Future: Unpin + Send, + C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, + { + Self::new(connector, endpoint)?.ready_oneshot().await + } } impl Service for Connection { diff --git a/tonic/src/transport/service/discover.rs b/tonic/src/transport/service/discover.rs index bed7f1310..55a98cdc1 100644 --- a/tonic/src/transport/service/discover.rs +++ b/tonic/src/transport/service/discover.rs @@ -64,7 +64,7 @@ impl Discover for DynamicServiceStream { #[cfg(not(feature = "tls"))] let connector = service::connector(http); - let fut = Connection::new(connector, endpoint); + let fut = Connection::connect(connector, endpoint); self.connecting = Some((k, Box::pin(fut))); continue; } diff --git a/tonic/src/transport/service/reconnect.rs b/tonic/src/transport/service/reconnect.rs index 68a19d79a..3e6e7015f 100644 --- a/tonic/src/transport/service/reconnect.rs +++ b/tonic/src/transport/service/reconnect.rs @@ -31,16 +31,10 @@ impl Reconnect where M: Service, { - pub(crate) fn new(initial_connection: S, mk_service: M, target: Target) -> Self - where - M: Service, - S: Service, - Error: From + From, - Target: Clone, - { + pub(crate) fn new(mk_service: M, target: Target) -> Self { Reconnect { mk_service, - state: State::Connected(initial_connection), + state: State::Idle, target, error: None, }