diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index dae6e442b36..7489669f145 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -80,7 +80,43 @@ Introduce `upgrade::read_length_prefixed` and `upgrade::write_length_prefixed`. See [PR 2111](https://github.com/libp2p/rust-libp2p/pull/2111). +- Add support for multistream-select [simultaneous open extension] to assign _initiator_ and + _responder_ role during authentication protocol negotiation on simultaneously opened connection. + + This is one important component of the greater effort to support hole punching in rust-libp2p. + + - `Transport::upgrade` no longer takes a multistream-select `Version`. Instead the + multistream-select `Version`s `V1`, `V1Lazy` and `V1SimultaneousOpen` can be selected when + setting the authentication upgrade via `Builder::authenticate_with_version` and the + multistream-select `Version`s `V1` and `V1Lazy` can be selected when setting the multiplexing + upgrade via `Builder::multiplex_with_version`. + + Users merely wanting to maintain the status quo should use the following call chain depending + on which `Version` they previously used: + + - `Version::V1` + + ```rust + my_transport.upgrade() + .authenticate(my_authentication) + .multiplex(my_multiplexer) + ``` + - `Version::V1Lazy` + + ```rust + my_transport.upgrade() + .authenticate_with_version(my_authentication, Version::V1Lazy) + .multiplex_with_version(my_multiplexer, Version::V1Lazy) + ``` + + - `Builder::multiplex_ext` is removed in favor of the new simultaneous open workflow. Please reach + out in case you depend on `Builder::multiplex_ext`. + + See [PR 2066]. + [PR 2090]: https://github.com/libp2p/rust-libp2p/pull/2090 +[simultaneous open extension]: https://github.com/libp2p/specs/blob/master/connections/simopen.md +[PR 2066]: https://github.com/libp2p/rust-libp2p/pull/2066 # 0.28.3 [2021-04-26] diff --git a/core/Cargo.toml b/core/Cargo.toml index 8573f45cf7c..b9edbddf9ff 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,7 +23,7 @@ libsecp256k1 = { version = "0.7.0", optional = true } log = "0.4" multiaddr = { version = "0.13.0" } multihash = { version = "0.14", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] } -multistream-select = { version = "0.10", path = "../misc/multistream-select" } +multistream-select = { version = "0.11", path = "../misc/multistream-select" } parking_lot = "0.11.0" pin-project = "1.0.0" prost = "0.9" diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index 4c394aeb75d..b15bd2573e0 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -412,8 +412,6 @@ where #[cfg(test)] mod tests { - use futures::{future::BoxFuture, stream::BoxStream}; - use super::*; use crate::transport; @@ -463,12 +461,18 @@ mod tests { impl transport::Transport for DummyTrans { type Output = (); type Error = std::io::Error; - type Listener = BoxStream< - 'static, - Result, std::io::Error>, + type Listener = Pin< + Box< + dyn Stream< + Item = Result< + ListenerEvent, + std::io::Error, + >, + >, + >, >; - type ListenerUpgrade = BoxFuture<'static, Result>; - type Dial = BoxFuture<'static, Result>; + type ListenerUpgrade = Pin>>>; + type Dial = Pin>>>; fn listen_on( self, @@ -519,12 +523,18 @@ mod tests { impl transport::Transport for DummyTrans { type Output = (); type Error = std::io::Error; - type Listener = BoxStream< - 'static, - Result, std::io::Error>, + type Listener = Pin< + Box< + dyn Stream< + Item = Result< + ListenerEvent, + std::io::Error, + >, + >, + >, >; - type ListenerUpgrade = BoxFuture<'static, Result>; - type Dial = BoxFuture<'static, Result>; + type ListenerUpgrade = Pin>>>; + type Dial = Pin>>>; fn listen_on( self, diff --git a/core/src/transport.rs b/core/src/transport.rs index 7006c15a810..ade8daf6a6c 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -46,7 +46,6 @@ pub use self::boxed::Boxed; pub use self::choice::OrTransport; pub use self::memory::MemoryTransport; pub use self::optional::OptionalTransport; -pub use self::upgrade::Upgrade; /// A transport provides connection-oriented communication between two peers /// through ordered streams of data (i.e. connections). @@ -198,12 +197,12 @@ pub trait Transport { /// Begins a series of protocol upgrades via an /// [`upgrade::Builder`](upgrade::Builder). - fn upgrade(self, version: upgrade::Version) -> upgrade::Builder + fn upgrade(self) -> upgrade::Builder where Self: Sized, Self::Error: 'static, { - upgrade::Builder::new(self, version) + upgrade::Builder::new(self) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 7777be9256e..4a0d50392ff 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -20,25 +20,21 @@ //! Configuration of transport protocol upgrades. -pub use crate::upgrade::Version; - use crate::{ muxing::{StreamMuxer, StreamMuxerBox}, transport::{ - and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, - TransportError, + and_then::AndThen, boxed::boxed, timeout::TransportTimeout, Transport, TransportError, }, upgrade::{ - self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, - OutboundUpgradeApply, UpgradeError, + self, AuthenticationUpgradeApply, AuthenticationVersion, InboundUpgrade, + InboundUpgradeApply, OutboundUpgrade, OutboundUpgradeApply, Role, UpgradeError, Version, }, ConnectedPoint, Negotiated, PeerId, }; -use futures::{prelude::*, ready}; +use futures::{future::Either, prelude::*, ready}; use multiaddr::Multiaddr; use std::{ error::Error, - fmt, pin::Pin, task::{Context, Poll}, time::Duration, @@ -68,7 +64,6 @@ use std::{ #[derive(Clone)] pub struct Builder { inner: T, - version: upgrade::Version, } impl Builder @@ -77,8 +72,8 @@ where T::Error: 'static, { /// Creates a `Builder` over the given (base) `Transport`. - pub fn new(inner: T, version: upgrade::Version) -> Builder { - Builder { inner, version } + pub fn new(inner: T) -> Builder { + Builder { inner } } /// Upgrades the transport to perform authentication of the remote. @@ -96,7 +91,9 @@ where pub fn authenticate( self, upgrade: U, - ) -> Authenticated Authenticate + Clone>> + ) -> Authenticated< + AndThen AuthenticationUpgradeApply + Clone>, + > where T: Transport, C: AsyncRead + AsyncWrite + Unpin, @@ -105,82 +102,29 @@ where U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone, E: Error + 'static, { - let version = self.version; - Authenticated(Builder::new( - self.inner.and_then(move |conn, endpoint| Authenticate { - inner: upgrade::apply(conn, upgrade, endpoint, version), - }), - version, - )) - } -} - -/// An upgrade that authenticates the remote peer, typically -/// in the context of negotiating a secure channel. -/// -/// Configured through [`Builder::authenticate`]. -#[pin_project::pin_project] -pub struct Authenticate -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, -{ - #[pin] - inner: EitherUpgrade, -} - -impl Future for Authenticate -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> - + OutboundUpgrade< - Negotiated, - Output = >>::Output, - Error = >>::Error, - >, -{ - type Output = as Future>::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - Future::poll(this.inner, cx) + self.authenticate_with_version(upgrade, AuthenticationVersion::default()) } -} -/// An upgrade that negotiates a (sub)stream multiplexer on -/// top of an authenticated transport. -/// -/// Configured through [`Authenticated::multiplex`]. -#[pin_project::pin_project] -pub struct Multiplex -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, -{ - peer_id: Option, - #[pin] - upgrade: EitherUpgrade, -} - -impl Future for Multiplex -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E>, -{ - type Output = Result<(PeerId, M), UpgradeError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let m = match ready!(Future::poll(this.upgrade, cx)) { - Ok(m) => m, - Err(err) => return Poll::Ready(Err(err)), - }; - let i = this - .peer_id - .take() - .expect("Multiplex future polled after completion."); - Poll::Ready(Ok((i, m))) + /// Same as [`Builder::authenticate`] with the option to choose the + /// [`AuthenticationVersion`] used to upgrade the connection. + pub fn authenticate_with_version( + self, + upgrade: U, + version: AuthenticationVersion, + ) -> Authenticated< + AndThen AuthenticationUpgradeApply + Clone>, + > + where + T: Transport, + C: AsyncRead + AsyncWrite + Unpin, + D: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade, Output = (PeerId, D), Error = E>, + U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone, + E: Error + 'static, + { + Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| { + upgrade::apply_authentication(conn, upgrade, endpoint, version) + }))) } } @@ -203,19 +147,66 @@ where /// /// * I/O upgrade: `C -> D`. /// * Transport output: `(PeerId, C) -> (PeerId, D)`. - pub fn apply(self, upgrade: U) -> Authenticated> + pub fn apply( + self, + upgrade: U, + ) -> Authenticated< + AndThen< + T, + impl FnOnce( + ((PeerId, Role), C), + ConnectedPoint, + ) -> UpgradeAuthenticated + + Clone, + >, + > + where + T: Transport, + C: AsyncRead + AsyncWrite + Unpin, + D: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade, Output = D, Error = E>, + U: OutboundUpgrade, Output = D, Error = E> + Clone, + E: Error + 'static, + { + self.apply_with_version(upgrade, Version::default()) + } + + /// Same as [`Authenticated::apply`] with the option to choose the + /// [`Version`] used to upgrade the connection. + pub fn apply_with_version( + self, + upgrade: U, + version: Version, + ) -> Authenticated< + AndThen< + T, + impl FnOnce( + ((PeerId, Role), C), + ConnectedPoint, + ) -> UpgradeAuthenticated + + Clone, + >, + > where - T: Transport, + T: Transport, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade, Output = D, Error = E>, U: OutboundUpgrade, Output = D, Error = E> + Clone, E: Error + 'static, { - Authenticated(Builder::new( - Upgrade::new(self.0.inner, upgrade), - self.0.version, - )) + Authenticated(Builder::new(self.0.inner.and_then( + move |((i, r), c), _endpoint| { + let upgrade = match r { + Role::Initiator => Either::Left(upgrade::apply_outbound(c, upgrade, version)), + Role::Responder => Either::Right(upgrade::apply_inbound(c, upgrade)), + }; + UpgradeAuthenticated { + user_data: Some((i, r)), + upgrade, + } + }, + ))) } /// Upgrades the transport with a (sub)stream multiplexer. @@ -231,60 +222,95 @@ where pub fn multiplex( self, upgrade: U, - ) -> Multiplexed Multiplex + Clone>> + ) -> Multiplexed< + AndThen< + T, + impl FnOnce(((PeerId, Role), C), ConnectedPoint) -> UpgradeAuthenticated + + Clone, + >, + > where - T: Transport, + T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, U: InboundUpgrade, Output = M, Error = E>, U: OutboundUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, { - let version = self.0.version; - Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| { - let upgrade = upgrade::apply(c, upgrade, endpoint, version); - Multiplex { - peer_id: Some(i), - upgrade, - } - })) + self.multiplex_with_version(upgrade, Version::default()) } - /// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade. - /// - /// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`] - /// and returns an upgrade which receives the I/O resource `C` and must - /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated. - /// This ends the (regular) transport upgrade process. - /// - /// ## Transitions - /// - /// * I/O upgrade: `C -> M`. - /// * Transport output: `(PeerId, C) -> (PeerId, M)`. - pub fn multiplex_ext( + /// Same as [`Authenticated::multiplex`] with the option to choose the + /// [`Version`] used to upgrade the connection. + pub fn multiplex_with_version( self, - up: F, - ) -> Multiplexed Multiplex + Clone>> + upgrade: U, + version: Version, + ) -> Multiplexed< + AndThen< + T, + impl FnOnce(((PeerId, Role), C), ConnectedPoint) -> UpgradeAuthenticated + + Clone, + >, + > where - T: Transport, + T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, U: InboundUpgrade, Output = M, Error = E>, U: OutboundUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, - F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone, { - let version = self.0.version; - Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| { - let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version); - Multiplex { - peer_id: Some(peer_id), + Multiplexed(self.0.inner.and_then(move |((i, r), c), _endpoint| { + let upgrade = match r { + Role::Initiator => Either::Left(upgrade::apply_outbound(c, upgrade, version)), + Role::Responder => Either::Right(upgrade::apply_inbound(c, upgrade)), + }; + UpgradeAuthenticated { + user_data: Some(i), upgrade, } })) } } +/// An upgrade that negotiates a (sub)stream multiplexer on +/// top of an authenticated transport. +/// +/// Configured through [`Authenticated::multiplex`]. +#[pin_project::pin_project] +pub struct UpgradeAuthenticated +where + C: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade> + OutboundUpgrade>, +{ + user_data: Option, + #[pin] + upgrade: EitherUpgrade, +} + +impl Future for UpgradeAuthenticated +where + C: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade, Output = M, Error = E>, + U: OutboundUpgrade, Output = M, Error = E>, +{ + type Output = Result<(D, M), UpgradeError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let m = match ready!(Future::poll(this.upgrade, cx)) { + Ok(m) => m, + Err(err) => return Poll::Ready(Err(err)), + }; + let user_data = this + .user_data + .take() + .expect("UpgradeAuthenticated future polled after completion."); + Poll::Ready(Ok((user_data, m))) + } +} + /// A authenticated and multiplexed transport, obtained from /// [`Authenticated::multiplex`]. #[derive(Clone)] @@ -350,257 +376,4 @@ where } /// An inbound or outbound upgrade. -type EitherUpgrade = future::Either, OutboundUpgradeApply>; - -/// A custom upgrade on an [`Authenticated`] transport. -/// -/// See [`Transport::upgrade`] -#[derive(Debug, Copy, Clone)] -pub struct Upgrade { - inner: T, - upgrade: U, -} - -impl Upgrade { - pub fn new(inner: T, upgrade: U) -> Self { - Upgrade { inner, upgrade } - } -} - -impl Transport for Upgrade -where - T: Transport, - T::Error: 'static, - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, - E: Error + 'static, -{ - type Output = (PeerId, D); - type Error = TransportUpgradeError; - type Listener = ListenerStream; - type ListenerUpgrade = ListenerUpgradeFuture; - type Dial = DialUpgradeFuture; - - fn dial(self, addr: Multiaddr) -> Result> { - let future = self - .inner - .dial(addr) - .map_err(|err| err.map(TransportUpgradeError::Transport))?; - Ok(DialUpgradeFuture { - future: Box::pin(future), - upgrade: future::Either::Left(Some(self.upgrade)), - }) - } - - fn listen_on(self, addr: Multiaddr) -> Result> { - let stream = self - .inner - .listen_on(addr) - .map_err(|err| err.map(TransportUpgradeError::Transport))?; - Ok(ListenerStream { - stream: Box::pin(stream), - upgrade: self.upgrade, - }) - } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) - } -} - -/// Errors produced by a transport upgrade. -#[derive(Debug)] -pub enum TransportUpgradeError { - /// Error in the transport. - Transport(T), - /// Error while upgrading to a protocol. - Upgrade(UpgradeError), -} - -impl fmt::Display for TransportUpgradeError -where - T: fmt::Display, - U: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e), - TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e), - } - } -} - -impl Error for TransportUpgradeError -where - T: Error + 'static, - U: Error + 'static, -{ - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - TransportUpgradeError::Transport(e) => Some(e), - TransportUpgradeError::Upgrade(e) => Some(e), - } - } -} - -/// The [`Transport::Dial`] future of an [`Upgrade`]d transport. -pub struct DialUpgradeFuture -where - U: OutboundUpgrade>, - C: AsyncRead + AsyncWrite + Unpin, -{ - future: Pin>, - upgrade: future::Either, (Option, OutboundUpgradeApply)>, -} - -impl Future for DialUpgradeFuture -where - F: TryFuture, - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, Output = D>, - U::Error: Error, -{ - type Output = Result<(PeerId, D), TransportUpgradeError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // We use a `this` variable because the compiler can't mutably borrow multiple times - // accross a `Deref`. - let this = &mut *self; - - loop { - this.upgrade = match this.upgrade { - future::Either::Left(ref mut up) => { - let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx) - .map_err(TransportUpgradeError::Transport)) - { - Ok(v) => v, - Err(err) => return Poll::Ready(Err(err)), - }; - let u = up - .take() - .expect("DialUpgradeFuture is constructed with Either::Left(Some)."); - future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1))) - } - future::Either::Right((ref mut i, ref mut up)) => { - let d = match ready!( - Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade) - ) { - Ok(d) => d, - Err(err) => return Poll::Ready(Err(err)), - }; - let i = i - .take() - .expect("DialUpgradeFuture polled after completion."); - return Poll::Ready(Ok((i, d))); - } - } - } - } -} - -impl Unpin for DialUpgradeFuture -where - U: OutboundUpgrade>, - C: AsyncRead + AsyncWrite + Unpin, -{ -} - -/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport. -pub struct ListenerStream { - stream: Pin>, - upgrade: U, -} - -impl Stream for ListenerStream -where - S: TryStream, Error = E>, - F: TryFuture, - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D> + Clone, -{ - type Item = Result< - ListenerEvent, TransportUpgradeError>, - TransportUpgradeError, - >; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) { - Some(Ok(event)) => { - let event = event - .map(move |future| ListenerUpgradeFuture { - future: Box::pin(future), - upgrade: future::Either::Left(Some(self.upgrade.clone())), - }) - .map_err(TransportUpgradeError::Transport); - Poll::Ready(Some(Ok(event))) - } - Some(Err(err)) => Poll::Ready(Some(Err(TransportUpgradeError::Transport(err)))), - None => Poll::Ready(None), - } - } -} - -impl Unpin for ListenerStream {} - -/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport. -pub struct ListenerUpgradeFuture -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, -{ - future: Pin>, - upgrade: future::Either, (Option, InboundUpgradeApply)>, -} - -impl Future for ListenerUpgradeFuture -where - F: TryFuture, - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D>, - U::Error: Error, -{ - type Output = Result<(PeerId, D), TransportUpgradeError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // We use a `this` variable because the compiler can't mutably borrow multiple times - // accross a `Deref`. - let this = &mut *self; - - loop { - this.upgrade = match this.upgrade { - future::Either::Left(ref mut up) => { - let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx) - .map_err(TransportUpgradeError::Transport)) - { - Ok(v) => v, - Err(err) => return Poll::Ready(Err(err)), - }; - let u = up - .take() - .expect("ListenerUpgradeFuture is constructed with Either::Left(Some)."); - future::Either::Right((Some(i), apply_inbound(c, u))) - } - future::Either::Right((ref mut i, ref mut up)) => { - let d = match ready!(TryFuture::try_poll(Pin::new(up), cx) - .map_err(TransportUpgradeError::Upgrade)) - { - Ok(v) => v, - Err(err) => return Poll::Ready(Err(err)), - }; - let i = i - .take() - .expect("ListenerUpgradeFuture polled after completion."); - return Poll::Ready(Ok((i, d))); - } - } - } - } -} - -impl Unpin for ListenerUpgradeFuture -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, -{ -} +type EitherUpgrade = future::Either, InboundUpgradeApply>; diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index d9edd6492bc..2dad9419418 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -70,7 +70,10 @@ mod transfer; use futures::future::Future; pub use self::{ - apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, + apply::{ + apply, apply_authentication, apply_inbound, apply_outbound, AuthenticationUpgradeApply, + AuthenticationVersion, InboundUpgradeApply, OutboundUpgradeApply, Version, + }, denied::DeniedUpgrade, either::EitherUpgrade, error::UpgradeError, @@ -81,7 +84,7 @@ pub use self::{ transfer::{read_length_prefixed, read_varint, write_length_prefixed, write_varint}, }; pub use crate::Negotiated; -pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version}; +pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Role}; /// Types serving as protocol names. /// diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 3b4763d2303..cd8e81b8279 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -19,15 +19,53 @@ // DEALINGS IN THE SOFTWARE. use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; -use crate::{ConnectedPoint, Negotiated}; -use futures::{future::Either, prelude::*}; +use crate::{ConnectedPoint, Negotiated, PeerId}; +use futures::{ + future::{Either, MapOk, TryFutureExt}, + prelude::*, +}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::{iter, mem, pin::Pin, task::Context, task::Poll}; -pub use multistream_select::Version; +pub use multistream_select::{NegotiationError, Role}; + +/// Wrapper around multistream-select `Version`. +/// +/// See [`multistream_select::Version`] for details. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Version { + /// See [`multistream_select::Version::V1`]. + V1, + /// See [`multistream_select::Version::V1Lazy`]. + V1Lazy, +} + +impl From for multistream_select::Version { + fn from(v: Version) -> Self { + match v { + Version::V1 => multistream_select::Version::V1, + Version::V1Lazy => multistream_select::Version::V1Lazy, + } + } +} + +impl Default for Version { + fn default() -> Self { + match multistream_select::Version::default() { + multistream_select::Version::V1 => Version::V1, + multistream_select::Version::V1Lazy => Version::V1Lazy, + multistream_select::Version::V1SimultaneousOpen => { + unreachable!("see `v1_sim_open_is_not_default`") + } + } + } +} /// Applies an upgrade to the inbound and outbound direction of a connection or substream. +/// +/// Note: Use [`apply_authentication`] when negotiating an authentication protocol on top of a +/// transport allowing simultaneously opened connections. pub fn apply( conn: C, up: U, @@ -74,7 +112,7 @@ where .protocol_info() .into_iter() .map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::dialer_select_proto(conn, iter, v); + let future = multistream_select::dialer_select_proto(conn, iter, v.into()); OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { future, @@ -208,13 +246,18 @@ where mut future, upgrade, } => { - let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? { + let (info, connection, role) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; return Poll::Pending; } }; + assert_eq!( + role, Role::Initiator, + "Expect negotiation not using `Version::V1SimultaneousOpen` to either return \ + as `Initiator` or fail.", + ); self.inner = OutboundUpgradeApplyState::Upgrade { future: Box::pin(upgrade.upgrade_outbound(connection, info.0)), }; @@ -243,14 +286,223 @@ where } } -type NameWrapIter = iter::Map::Item) -> NameWrap<::Item>>; +/// Wrapper around multistream-select `Version`. +/// +/// See [`multistream_select::Version`] for details. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum AuthenticationVersion { + /// See [`multistream_select::Version::V1`]. + V1, + /// See [`multistream_select::Version::V1Lazy`]. + V1Lazy, + /// See [`multistream_select::Version::V1SimultaneousOpen`]. + V1SimultaneousOpen, +} + +impl Default for AuthenticationVersion { + fn default() -> Self { + match multistream_select::Version::default() { + multistream_select::Version::V1 => AuthenticationVersion::V1, + multistream_select::Version::V1Lazy => AuthenticationVersion::V1Lazy, + multistream_select::Version::V1SimultaneousOpen => { + AuthenticationVersion::V1SimultaneousOpen + } + } + } +} + +impl From for multistream_select::Version { + fn from(v: AuthenticationVersion) -> Self { + match v { + AuthenticationVersion::V1 => multistream_select::Version::V1, + AuthenticationVersion::V1Lazy => multistream_select::Version::V1Lazy, + AuthenticationVersion::V1SimultaneousOpen => { + multistream_select::Version::V1SimultaneousOpen + } + } + } +} + +impl From for AuthenticationVersion { + fn from(v: Version) -> Self { + match v { + Version::V1 => AuthenticationVersion::V1, + Version::V1Lazy => AuthenticationVersion::V1Lazy, + } + } +} + +/// Applies an authentication upgrade to the inbound or outbound direction of a connection. +/// +/// Note: This is like [`apply`] with additional support for transports allowing simultaneously +/// opened connections. Unless run on such transport and used to negotiate the authentication +/// protocol you likely want to use [`apply`] instead of [`apply_authentication`]. +pub fn apply_authentication( + conn: C, + up: U, + cp: ConnectedPoint, + v: AuthenticationVersion, +) -> AuthenticationUpgradeApply +where + C: AsyncRead + AsyncWrite + Unpin, + D: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade, Output = (PeerId, D)>, + U: OutboundUpgrade< + Negotiated, + Output = (PeerId, D), + Error = >>::Error, + > + Clone, +{ + fn add_responder(input: (P, C)) -> (P, C, Role) { + (input.0, input.1, Role::Responder) + } + + let iter = up + .protocol_info() + .into_iter() + .map(NameWrap as fn(_) -> NameWrap<_>); + + AuthenticationUpgradeApply { + inner: AuthenticationUpgradeApplyState::Init { + future: match cp { + ConnectedPoint::Dialer { .. } => Either::Left( + multistream_select::dialer_select_proto(conn, iter, v.into()), + ), + ConnectedPoint::Listener { .. } => Either::Right( + multistream_select::listener_select_proto(conn, iter) + .map_ok(add_responder as fn(_) -> _), + ), + }, + upgrade: up, + }, + } +} + +pub struct AuthenticationUpgradeApply +where + U: InboundUpgrade> + OutboundUpgrade>, +{ + inner: AuthenticationUpgradeApplyState, +} + +impl Unpin for AuthenticationUpgradeApply +where + C: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade> + OutboundUpgrade>, +{ +} + +enum AuthenticationUpgradeApplyState +where + U: InboundUpgrade> + OutboundUpgrade>, +{ + Init { + future: Either< + multistream_select::DialerSelectFuture< + C, + NameWrapIter<::IntoIter>, + >, + MapOk< + ListenerSelectFuture>, + fn((NameWrap, Negotiated)) -> (NameWrap, Negotiated, Role), + >, + >, + upgrade: U, + }, + Upgrade { + role: Role, + future: Either< + Pin>>::Future>>, + Pin>>::Future>>, + >, + }, + Undefined, +} + +impl Future for AuthenticationUpgradeApply +where + C: AsyncRead + AsyncWrite + Unpin, + D: AsyncRead + AsyncWrite + Unpin, + U: InboundUpgrade, Output = (PeerId, D)>, + U: OutboundUpgrade< + Negotiated, + Output = (PeerId, D), + Error = >>::Error, + > + Clone, +{ + type Output = + Result<((PeerId, Role), D), UpgradeError<>>::Error>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match mem::replace(&mut self.inner, AuthenticationUpgradeApplyState::Undefined) { + AuthenticationUpgradeApplyState::Init { + mut future, + upgrade, + } => { + let (info, io, role) = match Future::poll(Pin::new(&mut future), cx)? { + Poll::Ready(x) => x, + Poll::Pending => { + self.inner = AuthenticationUpgradeApplyState::Init { future, upgrade }; + return Poll::Pending; + } + }; + let fut = match role { + Role::Initiator => { + Either::Left(Box::pin(upgrade.upgrade_outbound(io, info.0))) + } + Role::Responder => { + Either::Right(Box::pin(upgrade.upgrade_inbound(io, info.0))) + } + }; + self.inner = AuthenticationUpgradeApplyState::Upgrade { future: fut, role }; + } + AuthenticationUpgradeApplyState::Upgrade { mut future, role } => { + match Future::poll(Pin::new(&mut future), cx) { + Poll::Pending => { + self.inner = AuthenticationUpgradeApplyState::Upgrade { future, role }; + return Poll::Pending; + } + Poll::Ready(Ok((peer_id, d))) => { + debug!("Successfully applied negotiated protocol"); + return Poll::Ready(Ok(((peer_id, role), d))); + } + Poll::Ready(Err(e)) => { + debug!("Failed to apply negotiated protocol"); + return Poll::Ready(Err(UpgradeError::Apply(e))); + } + } + } + AuthenticationUpgradeApplyState::Undefined => { + panic!("AuthenticationUpgradeApplyState::poll called after completion") + } + } + } + } +} + +pub type NameWrapIter = + iter::Map::Item) -> NameWrap<::Item>>; /// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`. #[derive(Clone)] -struct NameWrap(N); +pub struct NameWrap(N); impl AsRef<[u8]> for NameWrap { fn as_ref(&self) -> &[u8] { self.0.protocol_name() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn v1_sim_open_is_not_default() { + assert_ne!( + multistream_select::Version::default(), + multistream_select::Version::V1SimultaneousOpen, + ); + } +} diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index f02fb2f3bd7..42712463e89 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -23,7 +23,7 @@ mod util; use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{MemoryTransport, Transport}; -use libp2p_core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; use multiaddr::{Multiaddr, Protocol}; @@ -85,7 +85,7 @@ fn upgrade_pipeline() { .into_authentic(&listener_keys) .unwrap(); let listener_transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(listener_noise_keys).into_authenticated()) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) @@ -103,7 +103,7 @@ fn upgrade_pipeline() { .into_authentic(&dialer_keys) .unwrap(); let dialer_transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(dialer_noise_keys).into_authenticated()) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) diff --git a/core/tests/util.rs b/core/tests/util.rs index 9592daca9eb..b9e6c52578b 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -7,7 +7,7 @@ use libp2p_core::{ muxing::{StreamMuxer, StreamMuxerBox}, network::{Network, NetworkConfig}, transport::{self, memory::MemoryTransport}, - upgrade, Multiaddr, PeerId, Transport, + Multiaddr, PeerId, Transport, }; use libp2p_mplex as mplex; use libp2p_noise as noise; @@ -24,7 +24,7 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork { .into_authentic(&local_key) .unwrap(); let transport: TestTransport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(mplex::MplexConfig::new()) .boxed(); diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 47dbb4341ba..a04cebd4f92 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -38,7 +38,6 @@ use futures::StreamExt; use libp2p::{ - core::upgrade, floodsub::{self, Floodsub, FloodsubEvent}, identity, mdns::{Mdns, MdnsEvent}, @@ -74,7 +73,7 @@ async fn main() -> Result<(), Box> { // encryption and Mplex for multiplexing of substreams on a TCP stream. let transport = TokioTcpConfig::new() .nodelay(true) - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(mplex::MplexConfig::new()) .boxed(); diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 4b44ad3f40a..24f4a371554 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -34,9 +34,7 @@ use async_std::{io, task}; use futures::{future, prelude::*}; use libp2p::{ - core::{ - either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version, - }, + core::{either::EitherTransport, muxing::StreamMuxerBox, transport}, gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity}, identify::{Identify, IdentifyConfig, IdentifyEvent}, identity, @@ -77,7 +75,7 @@ pub fn build_transport( None => EitherTransport::Right(base_transport), }; maybe_encrypted - .upgrade(Version::V1) + .upgrade() .authenticate(noise_config) .multiplex(yamux_config) .timeout(Duration::from_secs(20)) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 696d4f4c355..9eda03a516e 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,8 +1,22 @@ +# 0.11.0 [unreleased] + +- Add support for [simultaneous open extension] via `Version::V1SimultaneousOpen`. + + [`Role`] struct returned by `dialer_select_proto` `Future` can be ignored unless + `Version::V1SimultaneousOpen` is used. + + This is one important component of the greater effort to support hole punching in rust-libp2p. + + See [PR 2066]. + # 0.10.4 [2021-11-01] - Implement `From for ProtocolError` instead of `Into`. [PR 2169](https://github.com/libp2p/rust-libp2p/pull/2169) +[simultaneous open extension]: https://github.com/libp2p/specs/blob/master/connections/simopen.md +[PR 2066]: https://github.com/libp2p/rust-libp2p/pull/2066 + # 0.10.3 [2021-03-17] - Update dependencies. diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 2f6a72db9ef..04dd0372e4d 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "multistream-select" description = "Multistream-select negotiation protocol for libp2p" -version = "0.10.4" +version = "0.11.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,6 +14,7 @@ bytes = "1" futures = "0.3" log = "0.4" pin-project = "1.0.0" +rand = "0.7" smallvec = "1.6.1" unsigned-varint = "0.7" diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 7a8c75daa6f..decd266348d 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -20,11 +20,12 @@ //! Protocol negotiation strategies for the peer acting as the dialer. -use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError}; +use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError, SIM_OPEN_ID}; use crate::{Negotiated, NegotiationError, Version}; use futures::{future::Either, prelude::*}; use std::{ + cmp::Ordering, convert::TryFrom as _, iter, mem, pin::Pin, @@ -36,8 +37,9 @@ use std::{ /// /// This function is given an I/O stream and a list of protocols and returns a /// computation that performs the protocol negotiation with the remote. The -/// returned `Future` resolves with the name of the negotiated protocol and -/// a [`Negotiated`] I/O stream. +/// returned `Future` resolves with the name of the negotiated protocol, a +/// [`Negotiated`] I/O stream and the [`Role`] of the peer on the connection +/// going forward. /// /// The chosen message flow for protocol negotiation depends on the numbers of /// supported protocols given. That is, this function delegates to serial or @@ -61,15 +63,22 @@ where I::Item: AsRef<[u8]>, { let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) - } else { - Either::Right(dialer_select_proto_parallel(inner, iter, version)) + match version { + Version::V1 | Version::V1Lazy => { + // We choose between the "serial" and "parallel" strategies based on the number of protocols. + if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } else { + Either::Right(dialer_select_proto_parallel(inner, iter, version)) + } + } + Version::V1SimultaneousOpen => { + Either::Left(dialer_select_proto_serial(inner, iter, version)) + } } } -/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer +/// Future, returned by `dialer_select_proto`, which selects a protocol and /// either trying protocols in-order, or by requesting all protocols supported /// by the remote upfront, from which the first protocol found in the dialer's /// list of protocols is selected. @@ -141,10 +150,44 @@ pub struct DialerSelectSeq { } enum SeqState { - SendHeader { io: MessageIO }, - SendProtocol { io: MessageIO, protocol: N }, - FlushProtocol { io: MessageIO, protocol: N }, - AwaitProtocol { io: MessageIO, protocol: N }, + SendHeader { + io: MessageIO, + }, + + // Simultaneous open protocol extension + SendSimOpen { + io: MessageIO, + protocol: Option, + }, + FlushSimOpen { + io: MessageIO, + protocol: N, + }, + AwaitSimOpen { + io: MessageIO, + protocol: N, + }, + SimOpenPhase { + selection: SimOpenPhase, + protocol: N, + }, + Responder { + responder: crate::ListenerSelectFuture, + }, + + // Standard multistream-select protocol + SendProtocol { + io: MessageIO, + protocol: N, + }, + FlushProtocol { + io: MessageIO, + protocol: N, + }, + AwaitProtocol { + io: MessageIO, + protocol: N, + }, Done, } @@ -154,9 +197,9 @@ where // It also makes the implementation considerably easier to write. R: AsyncRead + AsyncWrite + Unpin, I: Iterator, - I::Item: AsRef<[u8]>, + I::Item: AsRef<[u8]> + Clone, { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, Negotiated, Role), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -177,13 +220,141 @@ where return Poll::Ready(Err(From::from(err))); } - let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + match this.version { + Version::V1 | Version::V1Lazy => { + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + + // The dialer always sends the header and the first protocol + // proposal in one go for efficiency. + *this.state = SeqState::SendProtocol { io, protocol }; + } + Version::V1SimultaneousOpen => { + *this.state = SeqState::SendSimOpen { io, protocol: None }; + } + } + } + + SeqState::SendSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {} + Poll::Pending => { + *this.state = SeqState::SendSimOpen { io, protocol }; + return Poll::Pending; + } + } + + match protocol { + None => { + let msg = Message::Protocol(SIM_OPEN_ID); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + *this.state = SeqState::SendSimOpen { + io, + protocol: Some(protocol), + }; + } + Some(protocol) => { + let p = Protocol::try_from(protocol.as_ref())?; + if let Err(err) = + Pin::new(&mut io).start_send(Message::Protocol(p.clone())) + { + return Poll::Ready(Err(From::from(err))); + } + log::debug!("Dialer: Proposed protocol: {}", p); - // The dialer always sends the header and the first protocol - // proposal in one go for efficiency. - *this.state = SeqState::SendProtocol { io, protocol }; + *this.state = SeqState::FlushSimOpen { io, protocol } + } + } } + SeqState::FlushSimOpen { mut io, protocol } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => *this.state = SeqState::AwaitSimOpen { io, protocol }, + Poll::Pending => { + *this.state = SeqState::FlushSimOpen { io, protocol }; + return Poll::Pending; + } + } + } + + SeqState::AwaitSimOpen { mut io, protocol } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + return Poll::Pending; + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + Message::Header(v) if v == HeaderLine::from(*this.version) => { + *this.state = SeqState::AwaitSimOpen { io, protocol }; + } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce { io }, + }; + *this.state = SeqState::SimOpenPhase { + selection, + protocol, + }; + } + Message::NotAvailable => { + *this.state = SeqState::AwaitProtocol { io, protocol } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), + } + } + + SeqState::SimOpenPhase { + mut selection, + protocol, + } => { + let (io, selection_res) = match Pin::new(&mut selection).poll(cx)? { + Poll::Ready((io, res)) => (io, res), + Poll::Pending => { + *this.state = SeqState::SimOpenPhase { + selection, + protocol, + }; + return Poll::Pending; + } + }; + + match selection_res { + Role::Initiator => { + *this.state = SeqState::SendProtocol { io, protocol }; + } + Role::Responder => { + #[allow(clippy::needless_collect)] + let protocols: Vec<_> = this.protocols.collect(); + *this.state = SeqState::Responder { + responder: crate::listener_select::listener_select_proto_no_header( + io, + std::iter::once(protocol).chain(protocols.into_iter()), + ), + } + } + } + } + + SeqState::Responder { mut responder } => match Pin::new(&mut responder).poll(cx) { + Poll::Ready(res) => { + return Poll::Ready(res.map(|(p, io)| (p, io, Role::Responder))) + } + Poll::Pending => { + *this.state = SeqState::Responder { responder }; + return Poll::Pending; + } + }, + SeqState::SendProtocol { mut io, protocol } => { match Pin::new(&mut io).poll_ready(cx)? { Poll::Ready(()) => {} @@ -203,7 +374,9 @@ where *this.state = SeqState::FlushProtocol { io, protocol } } else { match this.version { - Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, + Version::V1 | Version::V1SimultaneousOpen => { + *this.state = SeqState::FlushProtocol { io, protocol } + } // This is the only effect that `V1Lazy` has compared to `V1`: // Optimistically settling on the only protocol that // the dialer supports for this negotiation. Notably, @@ -212,7 +385,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let hl = HeaderLine::from(Version::V1Lazy); let io = Negotiated::expecting(io.into_reader(), p, Some(hl)); - return Poll::Ready(Ok((protocol, io))); + return Poll::Ready(Ok((protocol, io, Role::Initiator))); } } } @@ -245,10 +418,19 @@ where Message::Header(v) if v == HeaderLine::from(*this.version) => { *this.state = SeqState::AwaitProtocol { io, protocol }; } + Message::Protocol(p) if p == SIM_OPEN_ID => { + let selection = SimOpenPhase { + state: SimOpenState::SendNonce { io }, + }; + *this.state = SeqState::SimOpenPhase { + selection, + protocol, + }; + } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); let io = Negotiated::completed(io.into_inner()); - return Poll::Ready(Ok((protocol, io))); + return Poll::Ready(Ok((protocol, io, Role::Initiator))); } Message::NotAvailable => { log::debug!( @@ -268,6 +450,182 @@ where } } +struct SimOpenPhase { + state: SimOpenState, +} + +enum SimOpenState { + SendNonce { io: MessageIO }, + FlushNonce { io: MessageIO, local_nonce: u64 }, + ReadNonce { io: MessageIO, local_nonce: u64 }, + SendRole { io: MessageIO, local_role: Role }, + FlushRole { io: MessageIO, local_role: Role }, + ReadRole { io: MessageIO, local_role: Role }, + Done, +} + +/// Role of the local node after protocol negotiation. +/// +/// Always equals [`Role::Initiator`] unless [`Version::V1SimultaneousOpen`] is +/// used in which case node may end up in either role after negotiation. +/// +/// See [`Version::V1SimultaneousOpen`] for details. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Role { + Initiator, + Responder, +} + +impl Future for SimOpenPhase +where + // The Unpin bound here is required because we produce a `Negotiated` as the output. + // It also makes the implementation considerably easier to write. + R: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(MessageIO, Role), NegotiationError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match mem::replace(&mut self.state, SimOpenState::Done) { + SimOpenState::SendNonce { mut io } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {} + Poll::Pending => { + self.state = SimOpenState::SendNonce { io }; + return Poll::Pending; + } + } + + let local_nonce = rand::random(); + let msg = Message::Select(local_nonce); + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushNonce { io, local_nonce }; + } + SimOpenState::FlushNonce { + mut io, + local_nonce, + } => match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadNonce { io, local_nonce }, + Poll::Pending => { + self.state = SimOpenState::FlushNonce { io, local_nonce }; + return Poll::Pending; + } + }, + SimOpenState::ReadNonce { + mut io, + local_nonce, + } => { + let msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + return Poll::Pending; + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + match msg { + // As an optimization, the simultaneous open + // multistream-select variant sends both the + // simultaneous open ID (`/libp2p/simultaneous-connect`) + // and a protocol before flushing. In the case where the + // remote acts as a listener already, it can accept or + // decline the attached protocol within the same + // round-trip. + // + // In this particular situation, the remote acts as a + // dialer and uses the simultaneous open variant. Given + // that nonces need to be exchanged first, the attached + // protocol by the remote needs to be ignored. + Message::Protocol(_) => { + self.state = SimOpenState::ReadNonce { io, local_nonce }; + } + Message::Select(remote_nonce) => { + match local_nonce.cmp(&remote_nonce) { + Ordering::Equal => { + // Start over. + self.state = SimOpenState::SendNonce { io }; + } + Ordering::Greater => { + self.state = SimOpenState::SendRole { + io, + local_role: Role::Initiator, + }; + } + Ordering::Less => { + self.state = SimOpenState::SendRole { + io, + local_role: Role::Responder, + }; + } + } + } + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), + } + } + SimOpenState::SendRole { mut io, local_role } => { + match Pin::new(&mut io).poll_ready(cx)? { + Poll::Ready(()) => {} + Poll::Pending => { + self.state = SimOpenState::SendRole { io, local_role }; + return Poll::Pending; + } + } + + let msg = match local_role { + Role::Initiator => Message::Initiator, + Role::Responder => Message::Responder, + }; + + if let Err(err) = Pin::new(&mut io).start_send(msg) { + return Poll::Ready(Err(From::from(err))); + } + + self.state = SimOpenState::FlushRole { io, local_role }; + } + SimOpenState::FlushRole { mut io, local_role } => { + match Pin::new(&mut io).poll_flush(cx)? { + Poll::Ready(()) => self.state = SimOpenState::ReadRole { io, local_role }, + Poll::Pending => { + self.state = SimOpenState::FlushRole { io, local_role }; + return Poll::Pending; + } + } + } + SimOpenState::ReadRole { mut io, local_role } => { + let remote_msg = match Pin::new(&mut io).poll_next(cx)? { + Poll::Ready(Some(msg)) => msg, + Poll::Pending => { + self.state = SimOpenState::ReadRole { io, local_role }; + return Poll::Pending; + } + // Treat EOF error as [`NegotiationError::Failed`], not as + // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O + // stream as a permissible way to "gracefully" fail a negotiation. + Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), + }; + + let result = match local_role { + Role::Initiator if remote_msg == Message::Responder => Ok((io, local_role)), + Role::Responder if remote_msg == Message::Initiator => Ok((io, local_role)), + + _ => Err(ProtocolError::InvalidMessage.into()), + }; + + return Poll::Ready(result); + } + SimOpenState::Done => panic!("SimOpenPhase::poll called after completion"), + } + } + } +} + /// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates /// a protocol selectively by considering all supported protocols of the remote /// "in parallel". @@ -295,7 +653,7 @@ where I: Iterator, I::Item: AsRef<[u8]>, { - type Output = Result<(I::Item, Negotiated), NegotiationError>; + type Output = Result<(I::Item, Negotiated, Role), NegotiationError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -394,7 +752,7 @@ where log::debug!("Dialer: Expecting proposed protocol: {}", p); let io = Negotiated::expecting(io.into_reader(), p, None); - return Poll::Ready(Ok((protocol, io))); + return Poll::Ready(Ok((protocol, io, Role::Initiator))); } ParState::Done => panic!("ParState::poll called after completion"), diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 00291f4ece8..1239b016939 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -79,7 +79,7 @@ //! let socket = TcpStream::connect("127.0.0.1:10333").await.unwrap(); //! //! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; -//! let (protocol, _io) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); +//! let (protocol, _io, _role) = dialer_select_proto(socket, protos, Version::V1).await.unwrap(); //! //! println!("Negotiated protocol: {:?}", protocol); //! // You can now use `_io` to communicate with the remote. @@ -94,7 +94,7 @@ mod negotiated; mod protocol; mod tests; -pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; +pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture, Role}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError}; pub use self::protocol::ProtocolError; @@ -137,6 +137,18 @@ pub enum Version { /// [1]: https://github.com/multiformats/go-multistream/issues/20 /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 V1Lazy, + /// A variant of version 1 that selects a single initiator when both peers are acting as such, + /// in other words when both peers simultaneously open a connection. + /// + /// This multistream-select variant is specified in [1]. + /// + /// Note: [`Version::V1SimultaneousOpen`] should only be used (a) on transports that allow + /// simultaneously opened connections, e.g. TCP with socket reuse and (2) during the first + /// negotiation on the connection, most likely the secure channel protocol negotiation. In all + /// other cases one should use [`Version::V1`] or [`Version::V1Lazy`]. + /// + /// [1]: https://github.com/libp2p/specs/blob/master/connections/simopen.md + V1SimultaneousOpen, // Draft: https://github.com/libp2p/specs/pull/95 // V2, } diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index aa433e40c4d..15c06fb323a 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -42,6 +42,37 @@ use std::{ /// returned `Future` resolves with the name of the negotiated protocol and /// a [`Negotiated`] I/O stream. pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]>, +{ + listener_select_proto_with_state( + State::RecvHeader { + io: MessageIO::new(inner), + }, + protocols, + ) +} + +/// Used when selected as a [`crate::Role::Responder`] during [`crate::dialer_select_proto`] +/// negotiation with [`crate::Version::V1SimultaneousOpen`] +pub(crate) fn listener_select_proto_no_header( + io: MessageIO, + protocols: I, +) -> ListenerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: IntoIterator, + I::Item: AsRef<[u8]>, +{ + listener_select_proto_with_state(State::RecvMessage { io }, protocols) +} + +fn listener_select_proto_with_state( + state: State, + protocols: I, +) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, I: IntoIterator, @@ -62,9 +93,7 @@ where }); ListenerSelectFuture { protocols: SmallVec::from_iter(protocols), - state: State::RecvHeader { - io: MessageIO::new(inner), - }, + state, last_sent_na: false, } } diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 1cfdcc4b588..b7f1611c27f 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -35,6 +35,7 @@ use std::{ error::Error, fmt, io, pin::Pin, + str::FromStr, task::{Context, Poll}, }; use unsigned_varint as uvi; @@ -48,6 +49,20 @@ const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n"; const MSG_PROTOCOL_NA: &[u8] = b"na\n"; /// The encoded form of a multistream-select 'ls' message. const MSG_LS: &[u8] = b"ls\n"; +/// The encoded form of a 'select:' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_SELECT: &[u8] = b"select:"; +/// The encoded form of a 'initiator' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_INITIATOR: &[u8] = b"initiator\n"; +/// The encoded form of a 'responder' message of the multistream-select +/// simultaneous open protocol extension. +const MSG_RESPONDER: &[u8] = b"responder\n"; + +/// The identifier of the multistream-select simultaneous open protocol +/// extension. +pub(crate) const SIM_OPEN_ID: Protocol = + Protocol(Bytes::from_static(b"/libp2p/simultaneous-connect")); /// The multistream-select header lines preceeding negotiation. /// @@ -61,7 +76,7 @@ pub enum HeaderLine { impl From for HeaderLine { fn from(v: Version) -> HeaderLine { match v { - Version::V1 | Version::V1Lazy => HeaderLine::V1, + Version::V1 | Version::V1Lazy | Version::V1SimultaneousOpen => HeaderLine::V1, } } } @@ -119,6 +134,9 @@ pub enum Message { Protocols(Vec), /// A message signaling that a requested protocol is not available. NotAvailable, + Select(u64), + Initiator, + Responder, } impl Message { @@ -160,6 +178,22 @@ impl Message { dest.put(MSG_PROTOCOL_NA); Ok(()) } + Message::Select(nonce) => { + dest.put(MSG_SELECT); + dest.put(nonce.to_string().as_ref()); + dest.put_u8(b'\n'); + Ok(()) + } + Message::Initiator => { + dest.reserve(MSG_INITIATOR.len()); + dest.put(MSG_INITIATOR); + Ok(()) + } + Message::Responder => { + dest.reserve(MSG_RESPONDER.len()); + dest.put(MSG_RESPONDER); + Ok(()) + } } } @@ -177,6 +211,26 @@ impl Message { return Ok(Message::ListProtocols); } + if msg.len() > MSG_SELECT.len() + 1 /* \n */ + && msg[.. MSG_SELECT.len()] == *MSG_SELECT + && msg.last() == Some(&b'\n') + { + if let Some(nonce) = std::str::from_utf8(&msg[MSG_SELECT.len()..msg.len() - 1]) + .ok() + .and_then(|s| u64::from_str(s).ok()) + { + return Ok(Message::Select(nonce)); + } + } + + if msg == MSG_INITIATOR { + return Ok(Message::Initiator); + } + + if msg == MSG_RESPONDER { + return Ok(Message::Responder); + } + // If it starts with a `/`, ends with a line feed without any // other line feeds in-between, it must be a protocol name. if msg.get(0) == Some(&b'/') diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index ca627d24fcf..763301943d7 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -35,25 +35,24 @@ fn select_proto_basic() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener_addr = listener.local_addr().unwrap(); - let server = async_std::task::spawn(async move { + let server = async move { let connec = listener.accept().await.unwrap().0; let protos = vec![b"/proto1", b"/proto2"]; let (proto, mut io) = listener_select_proto(connec, protos).await.unwrap(); assert_eq!(proto, b"/proto2"); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"ping"); io.write_all(b"pong").await.unwrap(); io.flush().await.unwrap(); - }); + }; - let client = async_std::task::spawn(async move { + let client = async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, mut io) = dialer_select_proto(connec, protos.into_iter(), version) + let (proto, mut io, _) = dialer_select_proto(connec, protos.into_iter(), version) .await .unwrap(); assert_eq!(proto, b"/proto2"); @@ -61,18 +60,17 @@ fn select_proto_basic() { io.write_all(b"ping").await.unwrap(); io.flush().await.unwrap(); - let mut out = vec![0; 32]; - let n = io.read(&mut out).await.unwrap(); - out.truncate(n); + let mut out = vec![0; 4]; + io.read_exact(&mut out).await.unwrap(); assert_eq!(out, b"pong"); - }); + }; - server.await; - client.await; + futures::future::join(server, client).await; } async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimultaneousOpen)); } /// Tests the expected behaviour of failed negotiations. @@ -110,7 +108,7 @@ fn negotiation_failed() { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let mut io = match dialer_select_proto(connec, dial_protos.into_iter(), version).await { Err(NegotiationError::Failed) => return, - Ok((_, io)) => io, + Ok((_, io, _)) => io, Err(_) => panic!(), }; // The dialer may write a payload that is even sent before it @@ -170,7 +168,7 @@ fn negotiation_failed() { for (listen_protos, dial_protos) in protos { for dial_payload in payloads.clone() { - for &version in &[Version::V1, Version::V1Lazy] { + for &version in &[Version::V1, Version::V1Lazy, Version::V1SimultaneousOpen] { async_std::task::block_on(run(Test { version, listen_protos: listen_protos.clone(), @@ -199,7 +197,7 @@ fn select_proto_parallel() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) + let (proto, io, _) = dialer_select_proto_parallel(connec, protos.into_iter(), version) .await .unwrap(); assert_eq!(proto, b"/proto2"); @@ -231,7 +229,7 @@ fn select_proto_serial() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, io, _) = dialer_select_proto_serial(connec, protos.into_iter(), version) .await .unwrap(); assert_eq!(proto, b"/proto2"); @@ -244,4 +242,37 @@ fn select_proto_serial() { async_std::task::block_on(run(Version::V1)); async_std::task::block_on(run(Version::V1Lazy)); + async_std::task::block_on(run(Version::V1SimultaneousOpen)); +} + +#[test] +fn simultaneous_open() { + async fn run(version: Version) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = async move { + let connec = listener.accept().await.unwrap().0; + let protos = vec![b"/proto1", b"/proto2"]; + let (proto, io, _) = dialer_select_proto_serial(connec, protos, version) + .await + .unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + let client = async move { + let connec = TcpStream::connect(&listener_addr).await.unwrap(); + let protos = vec![b"/proto3", b"/proto2"]; + let (proto, io, _) = dialer_select_proto_serial(connec, protos.into_iter(), version) + .await + .unwrap(); + assert_eq!(proto, b"/proto2"); + io.complete().await.unwrap(); + }; + + futures::future::join(server, client).await; + } + + futures::executor::block_on(run(Version::V1SimultaneousOpen)); } diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index 1c48af37715..d5c47354d4c 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -39,17 +39,20 @@ use std::{ type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>; type TestNetwork = Network; -fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) { +fn mk_transport(version: upgrade::Version) -> (PeerId, TestTransport) { let keys = identity::Keypair::generate_ed25519(); let id = keys.public().to_peer_id(); ( id, MemoryTransport::default() - .upgrade(up) - .authenticate(PlainText2Config { - local_public_key: keys.public(), - }) - .multiplex(MplexConfig::default()) + .upgrade() + .authenticate_with_version( + PlainText2Config { + local_public_key: keys.public(), + }, + version.into(), + ) + .multiplex_with_version(MplexConfig::default(), version) .boxed(), ) } @@ -60,9 +63,9 @@ fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) { fn transport_upgrade() { let _ = env_logger::try_init(); - fn run(up: upgrade::Version) { - let (dialer_id, dialer_transport) = mk_transport(up); - let (listener_id, listener_transport) = mk_transport(up); + fn run(version: upgrade::Version) { + let (dialer_id, dialer_transport) = mk_transport(version); + let (listener_id, listener_transport) = mk_transport(version); let listen_addr = Multiaddr::from(Protocol::Memory(random::())); diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 5380f21cc6b..66592c4febf 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -155,7 +155,7 @@ fn tcp_transport(split_send_size: usize) -> BenchTransport { libp2p_tcp::TcpConfig::new() .nodelay(true) - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(PlainText2Config { local_public_key }) .multiplex(mplex) .timeout(Duration::from_secs(5)) @@ -170,7 +170,7 @@ fn mem_transport(split_send_size: usize) -> BenchTransport { mplex.set_split_send_size(split_send_size); transport::MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(PlainText2Config { local_public_key }) .multiplex(mplex) .timeout(Duration::from_secs(5)) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index a05c81806ee..774b2c04ccb 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -87,7 +87,7 @@ //! // This is test transport (memory). //! let noise_keys = libp2p_noise::Keypair::::new().into_authentic(&local_key).unwrap(); //! let transport = MemoryTransport::default() -//! .upgrade(libp2p_core::upgrade::Version::V1) +//! .upgrade() //! .authenticate(libp2p_noise::NoiseConfig::xx(noise_keys).into_authenticated()) //! .multiplex(libp2p_mplex::MplexConfig::new()) //! .boxed(); diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index f4345ed3160..fdbb3acd295 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -30,7 +30,7 @@ use std::{ use futures::StreamExt; use libp2p_core::{ - identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport, + identity, multiaddr::Protocol, transport::MemoryTransport, Multiaddr, Transport, }; use libp2p_gossipsub::{ Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, @@ -148,7 +148,7 @@ fn build_node() -> (Multiaddr, Swarm) { let public_key = key.public(); let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(PlainText2Config { local_public_key: public_key.clone(), }) diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index e3600e0d818..2e1baf8bc1e 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -518,7 +518,7 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { mod tests { use super::*; use futures::pin_mut; - use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport}; + use libp2p_core::{identity, muxing::StreamMuxerBox, transport, PeerId, Transport}; use libp2p_mplex::MplexConfig; use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmEvent}; @@ -535,7 +535,7 @@ mod tests { let pubkey = id_keys.public(); let transport = TcpConfig::new() .nodelay(true) - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(MplexConfig::new()) .boxed(); diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index b5d52f273fb..b9828df3fd2 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -33,7 +33,7 @@ use libp2p_core::{ multiaddr::{multiaddr, Multiaddr, Protocol}, multihash::{Code, Multihash, MultihashDigest}, transport::MemoryTransport, - upgrade, PeerId, Transport, + PeerId, Transport, }; use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmEvent}; @@ -60,7 +60,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { .into_authentic(&local_key) .unwrap(); let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 3a7acd72fb1..cd4c2cb4d1d 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -250,7 +250,7 @@ fn mk_transport(muxer: MuxerChoice) -> (PeerId, transport::Boxed<(PeerId, Stream peer_id, TcpConfig::new() .nodelay(true) - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(match muxer { MuxerChoice::Yamux => upgrade::EitherUpgrade::A(yamux::YamuxConfig::default()), diff --git a/protocols/relay/examples/relay.rs b/protocols/relay/examples/relay.rs index 0cd421959f8..4502c89a799 100644 --- a/protocols/relay/examples/relay.rs +++ b/protocols/relay/examples/relay.rs @@ -110,7 +110,7 @@ fn main() -> Result<(), Box> { }; let transport = relay_wrapped_transport - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(plaintext) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/relay/src/lib.rs b/protocols/relay/src/lib.rs index 1b40b52f4fd..a05dedfa44f 100644 --- a/protocols/relay/src/lib.rs +++ b/protocols/relay/src/lib.rs @@ -46,7 +46,7 @@ //! ); //! //! let transport = relay_transport -//! .upgrade(upgrade::Version::V1) +//! .upgrade() //! .authenticate(plaintext) //! .multiplex(YamuxConfig::default()) //! .boxed(); diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index c316f06a8fc..e25b8be382f 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -28,7 +28,7 @@ use libp2p_core::connection::ConnectedPoint; use libp2p_core::either::EitherTransport; use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::{MemoryTransport, Transport, TransportError}; -use libp2p_core::{identity, upgrade, PeerId}; +use libp2p_core::{identity, PeerId}; use libp2p_identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo}; use libp2p_kad::{GetClosestPeersOk, Kademlia, KademliaEvent, QueryResult}; use libp2p_ping as ping; @@ -1293,7 +1293,7 @@ fn build_swarm(reachability: Reachability, relay_mode: RelayMode) -> Swarm Swarm { libp2p_relay::new_transport_and_behaviour(RelayConfig::default(), transport); let transport = transport - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(plaintext) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); @@ -1353,7 +1353,7 @@ fn build_keep_alive_only_swarm() -> Swarm { let transport = MemoryTransport::default(); let transport = transport - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(plaintext) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 6b5a202b476..f9d993284c6 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -23,7 +23,6 @@ use futures::stream::FusedStream; use futures::StreamExt; use futures::{future, Stream}; use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport::upgrade::Version; use libp2p::core::transport::MemoryTransport; use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; @@ -52,7 +51,7 @@ where let noise = NoiseConfig::xx(dh_keys).into_authenticated(); let transport = MemoryTransport::default() - .upgrade(Version::V1) + .upgrade() .authenticate(noise) .multiplex(SelectUpgrade::new( YamuxConfig::default(), diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 6cd6a732d4e..5ad332e85e4 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -26,7 +26,7 @@ use libp2p_core::{ identity, muxing::StreamMuxerBox, transport::{self, Transport}, - upgrade::{self, read_length_prefixed, write_length_prefixed}, + upgrade::{read_length_prefixed, write_length_prefixed}, Multiaddr, PeerId, }; use libp2p_noise::{Keypair, NoiseConfig, X25519Spec}; @@ -302,7 +302,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { peer_id, TcpConfig::new() .nodelay(true) - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(), diff --git a/src/lib.rs b/src/lib.rs index b8728005ea3..d3fac145d26 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -204,7 +204,7 @@ pub async fn development_transport( .expect("Signing libp2p-noise static DH keypair failed."); Ok(transport - .upgrade(core::upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(core::upgrade::SelectUpgrade::new( yamux::YamuxConfig::default(), @@ -261,7 +261,7 @@ pub fn tokio_development_transport( .expect("Signing libp2p-noise static DH keypair failed."); Ok(transport - .upgrade(core::upgrade::Version::V1) + .upgrade() .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) .multiplex(core::upgrade::SelectUpgrade::new( yamux::YamuxConfig::default(), diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index f50b0250f6f..54fc9f93e3d 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -285,7 +285,7 @@ pub enum NetworkBehaviourAction< /// # use libp2p::core::connection::ConnectionId; /// # use libp2p::core::identity; /// # use libp2p::core::transport::{MemoryTransport, Transport}; - /// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; + /// # use libp2p::core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; /// # use libp2p::core::PeerId; /// # use libp2p::plaintext::PlainText2Config; /// # use libp2p::swarm::{ @@ -304,7 +304,7 @@ pub enum NetworkBehaviourAction< /// # let local_peer_id = PeerId::from(local_public_key.clone()); /// # /// # let transport = MemoryTransport::default() - /// # .upgrade(upgrade::Version::V1) + /// # .upgrade() /// # .authenticate(PlainText2Config { local_public_key }) /// # .multiplex(yamux::YamuxConfig::default()) /// # .boxed(); diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0059a21a0cb..4c98bafca3d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1411,7 +1411,7 @@ mod tests { use crate::protocols_handler::DummyProtocolsHandler; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::{executor, future}; - use libp2p::core::{identity, multiaddr, transport, upgrade}; + use libp2p::core::{identity, multiaddr, transport}; use libp2p::plaintext; use libp2p::yamux; @@ -1431,7 +1431,7 @@ mod tests { let id_keys = identity::Keypair::generate_ed25519(); let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() - .upgrade(upgrade::Version::V1) + .upgrade() .authenticate(plaintext::PlainText2Config { local_public_key: local_public_key.clone(), }) diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index d6141483a40..28261e85dd1 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -47,7 +47,7 @@ //! let id_keys = identity::Keypair::generate_ed25519(); //! let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); //! let noise = NoiseConfig::xx(dh_keys).into_authenticated(); -//! let builder = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise); +//! let builder = TcpConfig::new().upgrade().authenticate(noise); //! // let transport = builder.multiplex(...); //! # } //! ``` @@ -349,8 +349,9 @@ where /// See [`NoiseConfig::into_authenticated`]. /// /// On success, the upgrade yields the [`PeerId`] obtained from the -/// `RemoteIdentity`. The output of this upgrade is thus directly suitable -/// for creating an [`authenticated`](libp2p_core::transport::upgrade::Authenticate) +/// `RemoteIdentity`. The output of this upgrade is thus directly suitable for +/// creating an +/// [`authenticated`](libp2p_core::transport::upgrade::Builder::authenticate) /// transport for use with a [`Network`](libp2p_core::Network). #[derive(Clone)] pub struct NoiseAuthenticated { diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index e1e1e1c0c04..dc5a386dbf8 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -41,9 +41,7 @@ fn core_upgrade_compat() { let id_keys = identity::Keypair::generate_ed25519(); let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); let noise = NoiseConfig::xx(dh_keys).into_authenticated(); - let _ = TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(noise); + let _ = TcpConfig::new().upgrade().authenticate(noise); } #[test] diff --git a/transports/plaintext/src/lib.rs b/transports/plaintext/src/lib.rs index 1e9cfecf66f..99fbf2fd3d6 100644 --- a/transports/plaintext/src/lib.rs +++ b/transports/plaintext/src/lib.rs @@ -56,7 +56,7 @@ mod structs_proto { /// io, /// PlainText1Config{}, /// endpoint, -/// libp2p_core::transport::upgrade::Version::V1, +/// libp2p_core::upgrade::Version::V1, /// ) /// }) /// .map(|plaintext, _endpoint| {