From a2535511f2c933f7df9071b6d97c622c6764a490 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 07:29:10 +0200 Subject: [PATCH 1/2] Unify upgrade. --- core/src/lib.rs | 2 +- core/src/transport.rs | 1 - core/src/transport/upgrade.rs | 100 +++++++++---------- core/src/upgrade.rs | 95 ++++++------------ core/src/upgrade/apply.rs | 172 +++++++++----------------------- core/src/upgrade/denied.rs | 23 +---- core/src/upgrade/either.rs | 49 +++------- core/src/upgrade/from_fn.rs | 41 +++----- core/src/upgrade/map.rs | 179 ++++------------------------------ core/src/upgrade/optional.rs | 38 ++------ core/src/upgrade/select.rs | 47 +++------ 11 files changed, 195 insertions(+), 552 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 844fd2a23bc..4f48ae6b23a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,7 @@ pub use peer_id::PeerId; pub use identity::PublicKey; pub use transport::Transport; pub use translation::address_translation; -pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; +pub use upgrade::{Role, Upgrade, UpgradeError, ProtocolName}; pub use connection::{Connected, Endpoint, ConnectedPoint}; pub use network::Network; diff --git a/core/src/transport.rs b/core/src/transport.rs index f6e70c44628..79d0ec5899d 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -46,7 +46,6 @@ pub use self::boxed::Boxed; pub use self::choice::OrTransport; pub use self::memory::MemoryTransport; pub use self::optional::OptionalTransport; -pub use self::upgrade::Upgrade; /// A transport provides connection-oriented communication between two peers /// through ordered streams of data (i.e. connections). diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index b2cb7b46804..f46d8dfb735 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -36,13 +36,10 @@ use crate::{ muxing::{StreamMuxer, StreamMuxerBox}, upgrade::{ self, - OutboundUpgrade, - InboundUpgrade, - apply_inbound, - apply_outbound, + Role, + Upgrade, + UpgradeApply, UpgradeError, - OutboundUpgradeApply, - InboundUpgradeApply }, PeerId }; @@ -111,14 +108,18 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = (PeerId, D), Error = E>, - U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone, + U: Upgrade, Output = (PeerId, D), Error = E> + Clone, E: Error + 'static, { let version = self.version; Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| { + let role = if endpoint.is_listener() { + Role::Responder + } else { + Role::Initiator + }; Authenticate { - inner: upgrade::apply(conn, upgrade, endpoint, version) + inner: upgrade::apply(conn, upgrade, role, version) } }), version)) } @@ -132,21 +133,18 @@ where pub struct Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade> + U: Upgrade> { #[pin] - inner: EitherUpgrade + inner: UpgradeApply } impl Future for Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade, - Output = >>::Output, - Error = >>::Error - > + U: Upgrade>, { - type Output = as Future>::Output; + type Output = as Future>::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -162,18 +160,17 @@ where pub struct Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, + U: Upgrade>, { peer_id: Option, #[pin] - upgrade: EitherUpgrade, + upgrade: UpgradeApply, } impl Future for Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + U: Upgrade, Output = M, Error = E>, { type Output = Result<(PeerId, M), UpgradeError>; @@ -207,16 +204,15 @@ where /// /// * I/O upgrade: `C -> D`. /// * Transport output: `(PeerId, C) -> (PeerId, D)`. - pub fn apply(self, upgrade: U) -> Authenticated> + pub fn apply(self, upgrade: U) -> Authenticated> where T: Transport, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + U: Upgrade, Output = D, Error = E> + Clone, E: Error + 'static, { - Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version)) + Authenticated(Builder::new(TransportUpgrade::new(self.0.inner, upgrade), self.0.version)) } /// Upgrades the transport with a (sub)stream multiplexer. @@ -235,13 +231,17 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + Clone, + U: Upgrade, Output = M, Error = E> + Clone, E: Error + 'static, { let version = self.0.version; Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| { - let upgrade = upgrade::apply(c, upgrade, endpoint, version); + let role = if endpoint.is_listener() { + Role::Responder + } else { + Role::Initiator + }; + let upgrade = upgrade::apply(c, upgrade, role, version); Multiplex { peer_id: Some(i), upgrade } })) } @@ -263,14 +263,18 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + Clone, + U: Upgrade, Output = M, Error = E> + Clone, E: Error + 'static, F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone { let version = self.0.version; Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| { - let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version); + let role = if endpoint.is_listener() { + Role::Responder + } else { + Role::Initiator + }; + let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), role, version); Multiplex { peer_id: Some(peer_id), upgrade } })) } @@ -340,28 +344,24 @@ where } } -/// An inbound or outbound upgrade. -type EitherUpgrade = future::Either, OutboundUpgradeApply>; - /// A custom upgrade on an [`Authenticated`] transport. /// /// See [`Transport::upgrade`] #[derive(Debug, Copy, Clone)] -pub struct Upgrade { inner: T, upgrade: U } +pub struct TransportUpgrade { inner: T, upgrade: U } -impl Upgrade { +impl TransportUpgrade { pub fn new(inner: T, upgrade: U) -> Self { - Upgrade { inner, upgrade } + TransportUpgrade { inner, upgrade } } } -impl Transport for Upgrade +impl Transport for TransportUpgrade where T: Transport, T::Error: 'static, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + U: Upgrade, Output = D, Error = E> + Clone, E: Error + 'static { type Output = (PeerId, D); @@ -431,18 +431,18 @@ where /// The [`Transport::Dial`] future of an [`Upgrade`]d transport. pub struct DialUpgradeFuture where - U: OutboundUpgrade>, + U: Upgrade>, C: AsyncRead + AsyncWrite + Unpin, { future: Pin>, - upgrade: future::Either, (Option, OutboundUpgradeApply)> + upgrade: future::Either, (Option, UpgradeApply)> } impl Future for DialUpgradeFuture where F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, Output = D>, + U: Upgrade, Output = D>, U::Error: Error { type Output = Result<(PeerId, D), TransportUpgradeError>; @@ -460,7 +460,7 @@ where Err(err) => return Poll::Ready(Err(err)), }; let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some)."); - future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1))) + future::Either::Right((Some(i), upgrade::apply(c, u, Role::Initiator, Version::V1))) } future::Either::Right((ref mut i, ref mut up)) => { let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) { @@ -477,7 +477,7 @@ where impl Unpin for DialUpgradeFuture where - U: OutboundUpgrade>, + U: Upgrade>, C: AsyncRead + AsyncWrite + Unpin, { } @@ -493,7 +493,7 @@ where S: TryStream, Error = E>, F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D> + Clone + U: Upgrade, Output = D> + Clone { type Item = Result, TransportUpgradeError>, TransportUpgradeError>; @@ -525,17 +525,17 @@ impl Unpin for ListenerStream { pub struct ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + U: Upgrade> { future: Pin>, - upgrade: future::Either, (Option, InboundUpgradeApply)> + upgrade: future::Either, (Option, UpgradeApply)> } impl Future for ListenerUpgradeFuture where F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D>, + U: Upgrade, Output = D>, U::Error: Error { type Output = Result<(PeerId, D), TransportUpgradeError>; @@ -553,7 +553,7 @@ where Err(err) => return Poll::Ready(Err(err)) }; let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some)."); - future::Either::Right((Some(i), apply_inbound(c, u))) + future::Either::Right((Some(i), upgrade::apply(c, u, Role::Responder, Version::V1))) } future::Either::Right((ref mut i, ref mut up)) => { let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) { @@ -571,6 +571,6 @@ where impl Unpin for ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + U: Upgrade> { } diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 9798ae6c27a..5a9e4f6396d 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -72,12 +72,12 @@ use futures::future::Future; pub use crate::Negotiated; pub use multistream_select::{Version, NegotiatedComplete, NegotiationError, ProtocolError}; pub use self::{ - apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, + apply::{apply, UpgradeApply}, denied::DeniedUpgrade, either::EitherUpgrade, error::UpgradeError, from_fn::{from_fn, FromFnUpgrade}, - map::{MapInboundUpgrade, MapOutboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgradeErr}, + map::{MapUpgrade, MapUpgradeErr}, optional::OptionalUpgrade, select::SelectUpgrade, transfer::{write_one, write_with_len_prefix, write_varint, read_one, ReadOneError, read_varint}, @@ -130,95 +130,60 @@ impl> ProtocolName for T { } } +/// Upgrade role. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum Role { + /// Initiator. + Initiator, + /// Responder. + Responder, +} + /// Common trait for upgrades that can be applied on inbound substreams, outbound substreams, /// or both. -pub trait UpgradeInfo { +pub trait Upgrade: Send + 'static { /// Opaque type representing a negotiable protocol. - type Info: ProtocolName + Clone; + type Info: ProtocolName + Clone + Send + 'static; /// Iterator returned by `protocol_info`. - type InfoIter: IntoIterator; - - /// Returns the list of protocols that are supported. Used during the negotiation process. - fn protocol_info(&self) -> Self::InfoIter; -} + type InfoIter: IntoIterator + Send + 'static; -/// Possible upgrade on an inbound connection or substream. -pub trait InboundUpgrade: UpgradeInfo { /// Output after the upgrade has been successfully negotiated and the handshake performed. - type Output; + type Output: Send + 'static; /// Possible error during the handshake. - type Error; + type Error: Send + 'static; /// Future that performs the handshake with the remote. - type Future: Future>; - - /// After we have determined that the remote supports one of the protocols we support, this - /// method is called to start the handshake. - /// - /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future; -} - -/// Extension trait for `InboundUpgrade`. Automatically implemented on all types that implement -/// `InboundUpgrade`. -pub trait InboundUpgradeExt: InboundUpgrade { - /// Returns a new object that wraps around `Self` and applies a closure to the `Output`. - fn map_inbound(self, f: F) -> MapInboundUpgrade - where - Self: Sized, - F: FnOnce(Self::Output) -> T - { - MapInboundUpgrade::new(self, f) - } - - /// Returns a new object that wraps around `Self` and applies a closure to the `Error`. - fn map_inbound_err(self, f: F) -> MapInboundUpgradeErr - where - Self: Sized, - F: FnOnce(Self::Error) -> T - { - MapInboundUpgradeErr::new(self, f) - } -} - -impl> InboundUpgradeExt for U {} + type Future: Future> + Send + 'static; -/// Possible upgrade on an outbound connection or substream. -pub trait OutboundUpgrade: UpgradeInfo { - /// Output after the upgrade has been successfully negotiated and the handshake performed. - type Output; - /// Possible error during the handshake. - type Error; - /// Future that performs the handshake with the remote. - type Future: Future>; + /// Returns the list of protocols that are supported. Used during the negotiation process. + fn protocol_info(&self) -> Self::InfoIter; /// After we have determined that the remote supports one of the protocols we support, this /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; + fn upgrade(self, socket: C, info: Self::Info, role: Role) -> Self::Future; } -/// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement -/// `OutboundUpgrade`. -pub trait OutboundUpgradeExt: OutboundUpgrade { +/// Extention trait for `Upgrade`. Automatically implemented on all types that implement +/// `Upgrade`. +pub trait UpgradeExt: Upgrade { /// Returns a new object that wraps around `Self` and applies a closure to the `Output`. - fn map_outbound(self, f: F) -> MapOutboundUpgrade + fn map(self, f: F) -> MapUpgrade where Self: Sized, - F: FnOnce(Self::Output) -> T + F: FnOnce(Self::Output) -> T + Send, { - MapOutboundUpgrade::new(self, f) + MapUpgrade::new(self, f) } /// Returns a new object that wraps around `Self` and applies a closure to the `Error`. - fn map_outbound_err(self, f: F) -> MapOutboundUpgradeErr + fn map_err(self, f: F) -> MapUpgradeErr where Self: Sized, - F: FnOnce(Self::Error) -> T + F: FnOnce(Self::Error) -> T + Send, { - MapOutboundUpgradeErr::new(self, f) + MapUpgradeErr::new(self, f) } } -impl> OutboundUpgradeExt for U {} - +impl> UpgradeExt for U {} diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index eaf25e884b3..182b0dd2f88 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -18,9 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ConnectedPoint, Negotiated}; -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName}; -use futures::{future::Either, prelude::*}; +use crate::Negotiated; +use crate::upgrade::{Role, Upgrade, UpgradeError, ProtocolName}; +use futures::prelude::*; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::{iter, mem, pin::Pin, task::Context, task::Poll}; @@ -28,178 +28,99 @@ use std::{iter, mem, pin::Pin, task::Context, task::Poll}; pub use multistream_select::Version; /// Applies an upgrade to the inbound and outbound direction of a connection or substream. -pub fn apply(conn: C, up: U, cp: ConnectedPoint, v: Version) - -> Either, OutboundUpgradeApply> +pub fn apply(conn: C, upgrade: U, role: Role, v: Version) -> UpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, + U: Upgrade> { - if cp.is_listener() { - Either::Left(apply_inbound(conn, up)) + let iter = upgrade.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); + let inner = if role == Role::Responder { + UpgradeApplyState::ListenerSelect { + future: multistream_select::listener_select_proto(conn, iter), + upgrade, + } } else { - Either::Right(apply_outbound(conn, up, v)) - } -} - -/// Tries to perform an upgrade on an inbound connection or substream. -pub fn apply_inbound(conn: C, up: U) -> InboundUpgradeApply -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, -{ - let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::listener_select_proto(conn, iter); - InboundUpgradeApply { - inner: InboundUpgradeApplyState::Init { future, upgrade: up } - } -} - -/// Tries to perform an upgrade on an outbound connection or substream. -pub fn apply_outbound(conn: C, up: U, v: Version) -> OutboundUpgradeApply -where - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade> -{ - let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); - let future = multistream_select::dialer_select_proto(conn, iter, v); - OutboundUpgradeApply { - inner: OutboundUpgradeApplyState::Init { future, upgrade: up } - } + UpgradeApplyState::DialerSelect { + future: multistream_select::dialer_select_proto(conn, iter, v), + upgrade, + } + }; + UpgradeApply { inner } } -/// Future returned by `apply_inbound`. Drives the upgrade process. -pub struct InboundUpgradeApply +/// Future returned by `apply`. Drives the upgrade process. +pub struct UpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + U: Upgrade> { - inner: InboundUpgradeApplyState + inner: UpgradeApplyState } -enum InboundUpgradeApplyState +enum UpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: Upgrade>, { - Init { + ListenerSelect { future: ListenerSelectFuture>, upgrade: U, }, + DialerSelect { + future: DialerSelectFuture::IntoIter>>, + upgrade: U, + }, Upgrade { future: Pin> }, Undefined } -impl Unpin for InboundUpgradeApply +impl Unpin for UpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: Upgrade>, { } -impl Future for InboundUpgradeApply +impl Future for UpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: Upgrade>, { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) { - InboundUpgradeApplyState::Init { mut future, upgrade } => { + match mem::replace(&mut self.inner, UpgradeApplyState::Undefined) { + UpgradeApplyState::ListenerSelect { mut future, upgrade } => { let (info, io) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { - self.inner = InboundUpgradeApplyState::Init { future, upgrade }; + self.inner = UpgradeApplyState::ListenerSelect { future, upgrade }; return Poll::Pending } }; - self.inner = InboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_inbound(io, info.0)) + self.inner = UpgradeApplyState::Upgrade { + future: Box::pin(upgrade.upgrade(io, info.0, Role::Responder)) }; } - InboundUpgradeApplyState::Upgrade { mut future } => { - match Future::poll(Pin::new(&mut future), cx) { - Poll::Pending => { - self.inner = InboundUpgradeApplyState::Upgrade { future }; - return Poll::Pending - } - Poll::Ready(Ok(x)) => { - debug!("Successfully applied negotiated protocol"); - return Poll::Ready(Ok(x)) - } - Poll::Ready(Err(e)) => { - debug!("Failed to apply negotiated protocol"); - return Poll::Ready(Err(UpgradeError::Apply(e))) - } - } - } - InboundUpgradeApplyState::Undefined => - panic!("InboundUpgradeApplyState::poll called after completion") - } - } - } -} - -/// Future returned by `apply_outbound`. Drives the upgrade process. -pub struct OutboundUpgradeApply -where - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade> -{ - inner: OutboundUpgradeApplyState -} - -enum OutboundUpgradeApplyState -where - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade> -{ - Init { - future: DialerSelectFuture::IntoIter>>, - upgrade: U - }, - Upgrade { - future: Pin> - }, - Undefined -} - -impl Unpin for OutboundUpgradeApply -where - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, -{ -} - -impl Future for OutboundUpgradeApply -where - C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, -{ - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) { - OutboundUpgradeApplyState::Init { mut future, upgrade } => { + UpgradeApplyState::DialerSelect { mut future, upgrade } => { let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { - self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; + self.inner = UpgradeApplyState::DialerSelect { future, upgrade }; return Poll::Pending } }; - self.inner = OutboundUpgradeApplyState::Upgrade { - future: Box::pin(upgrade.upgrade_outbound(connection, info.0)) + self.inner = UpgradeApplyState::Upgrade { + future: Box::pin(upgrade.upgrade(connection, info.0, Role::Initiator)) }; } - OutboundUpgradeApplyState::Upgrade { mut future } => { + UpgradeApplyState::Upgrade { mut future } => { match Future::poll(Pin::new(&mut future), cx) { Poll::Pending => { - self.inner = OutboundUpgradeApplyState::Upgrade { future }; + self.inner = UpgradeApplyState::Upgrade { future }; return Poll::Pending } Poll::Ready(Ok(x)) => { @@ -208,12 +129,12 @@ where } Poll::Ready(Err(e)) => { debug!("Failed to apply negotiated protocol"); - return Poll::Ready(Err(UpgradeError::Apply(e))); + return Poll::Ready(Err(UpgradeError::Apply(e))) } } } - OutboundUpgradeApplyState::Undefined => - panic!("OutboundUpgradeApplyState::poll called after completion") + UpgradeApplyState::Undefined => + panic!("InboundUpgradeApplyState::poll called after completion") } } } @@ -230,4 +151,3 @@ impl AsRef<[u8]> for NameWrap { self.0.protocol_name() } } - diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 93438e0bea8..04f53dae073 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{Role, Upgrade}; use futures::future; use std::iter; use void::Void; @@ -28,31 +28,18 @@ use void::Void; #[derive(Debug, Copy, Clone)] pub struct DeniedUpgrade; -impl UpgradeInfo for DeniedUpgrade { +impl Upgrade for DeniedUpgrade { type Info = &'static [u8]; type InfoIter = iter::Empty; - - fn protocol_info(&self) -> Self::InfoIter { - iter::empty() - } -} - -impl InboundUpgrade for DeniedUpgrade { type Output = Void; type Error = Void; type Future = future::Pending>; - fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { - future::pending() + fn protocol_info(&self) -> Self::InfoIter { + iter::empty() } -} - -impl OutboundUpgrade for DeniedUpgrade { - type Output = Void; - type Error = Void; - type Future = future::Pending>; - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + fn upgrade(self, _: C, _: Self::Info, _: Role) -> Self::Future { future::pending() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 28db987ccd7..d7573875a54 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -20,23 +20,29 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{Role, Upgrade}, }; /// A type to represent two possible upgrade types (inbound or outbound). #[derive(Debug, Clone)] pub enum EitherUpgrade { A(A), B(B) } -impl UpgradeInfo for EitherUpgrade +impl Upgrade for EitherUpgrade where - A: UpgradeInfo, - B: UpgradeInfo + A: Upgrade, + ::IntoIter: Send, + B: Upgrade, + ::IntoIter: Send, + C: Send + 'static, { type Info = EitherName; type InfoIter = EitherIter< ::IntoIter, ::IntoIter >; + type Output = EitherOutput; + type Error = EitherError; + type Future = EitherFuture2; fn protocol_info(&self) -> Self::InfoIter { match self { @@ -44,44 +50,16 @@ where EitherUpgrade::B(b) => EitherIter::B(b.protocol_info().into_iter()) } } -} - -impl InboundUpgrade for EitherUpgrade -where - A: InboundUpgrade, - B: InboundUpgrade, -{ - type Output = EitherOutput; - type Error = EitherError; - type Future = EitherFuture2; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade(self, sock: C, info: Self::Info, role: Role) -> Self::Future { match (self, info) { - (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_inbound(sock, info)), - (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_inbound(sock, info)), + (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade(sock, info, role)), + (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade(sock, info, role)), _ => panic!("Invalid invocation of EitherUpgrade::upgrade_inbound") } } } -impl OutboundUpgrade for EitherUpgrade -where - A: OutboundUpgrade, - B: OutboundUpgrade, -{ - type Output = EitherOutput; - type Error = EitherError; - type Future = EitherFuture2; - - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - match (self, info) { - (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), - (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), - _ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound") - } - } -} - /// A type to represent two possible `Iterator` types. #[derive(Debug, Clone)] pub enum EitherIter { A(A), B(B) } @@ -107,4 +85,3 @@ where } } } - diff --git a/core/src/upgrade/from_fn.rs b/core/src/upgrade/from_fn.rs index c6ef52c1e08..877d1b11c0c 100644 --- a/core/src/upgrade/from_fn.rs +++ b/core/src/upgrade/from_fn.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{Endpoint, upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}}; +use crate::{Endpoint, upgrade::{ProtocolName, Role, Upgrade}}; use futures::prelude::*; use std::iter; @@ -66,44 +66,25 @@ pub struct FromFnUpgrade { fun: F, } -impl UpgradeInfo for FromFnUpgrade +impl Upgrade for FromFnUpgrade where - P: ProtocolName + Clone, + P: ProtocolName + Clone + Send + 'static, + F: FnOnce(C, Role) -> Fut + Send + 'static, + Fut: Future> + Send + 'static, + Out: Send + 'static, + Err: Send + 'static, { type Info = P; type InfoIter = iter::Once

; - - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol_name.clone()) - } -} - -impl InboundUpgrade for FromFnUpgrade -where - P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, - Fut: Future>, -{ type Output = Out; type Error = Err; type Future = Fut; - fn upgrade_inbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Listener) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol_name.clone()) } -} - -impl OutboundUpgrade for FromFnUpgrade -where - P: ProtocolName + Clone, - F: FnOnce(C, Endpoint) -> Fut, - Fut: Future>, -{ - type Output = Out; - type Error = Err; - type Future = Fut; - fn upgrade_outbound(self, sock: C, _: Self::Info) -> Self::Future { - (self.fun)(sock, Endpoint::Dialer) + fn upgrade(self, sock: C, _: Self::Info, role: Role) -> Self::Future { + (self.fun)(sock, role) } } diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index 2f5ca31e207..d9283f29de6 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -18,109 +18,39 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{Role, Upgrade}; use futures::prelude::*; use std::{pin::Pin, task::Context, task::Poll}; /// Wraps around an upgrade and applies a closure to the output. #[derive(Debug, Clone)] -pub struct MapInboundUpgrade { upgrade: U, fun: F } +pub struct MapUpgrade { upgrade: U, fun: F } -impl MapInboundUpgrade { +impl MapUpgrade { pub fn new(upgrade: U, fun: F) -> Self { - MapInboundUpgrade { upgrade, fun } + MapUpgrade { upgrade, fun } } } -impl UpgradeInfo for MapInboundUpgrade +impl Upgrade for MapUpgrade where - U: UpgradeInfo + U: Upgrade, + F: FnOnce(U::Output) -> T + Send + 'static, + T: Send + 'static, { type Info = U::Info; type InfoIter = U::InfoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.upgrade.protocol_info() - } -} - -impl InboundUpgrade for MapInboundUpgrade -where - U: InboundUpgrade, - F: FnOnce(U::Output) -> T -{ type Output = T; type Error = U::Error; type Future = MapFuture; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - MapFuture { - inner: self.upgrade.upgrade_inbound(sock, info), - map: Some(self.fun) - } - } -} - -impl OutboundUpgrade for MapInboundUpgrade -where - U: OutboundUpgrade, -{ - type Output = U::Output; - type Error = U::Error; - type Future = U::Future; - - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) - } -} - -/// Wraps around an upgrade and applies a closure to the output. -#[derive(Debug, Clone)] -pub struct MapOutboundUpgrade { upgrade: U, fun: F } - -impl MapOutboundUpgrade { - pub fn new(upgrade: U, fun: F) -> Self { - MapOutboundUpgrade { upgrade, fun } - } -} - -impl UpgradeInfo for MapOutboundUpgrade -where - U: UpgradeInfo -{ - type Info = U::Info; - type InfoIter = U::InfoIter; - fn protocol_info(&self) -> Self::InfoIter { self.upgrade.protocol_info() } -} -impl InboundUpgrade for MapOutboundUpgrade -where - U: InboundUpgrade, -{ - type Output = U::Output; - type Error = U::Error; - type Future = U::Future; - - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_inbound(sock, info) - } -} - -impl OutboundUpgrade for MapOutboundUpgrade -where - U: OutboundUpgrade, - F: FnOnce(U::Output) -> T -{ - type Output = T; - type Error = U::Error; - type Future = MapFuture; - - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade(self, sock: C, info: Self::Info, role: Role) -> Self::Future { MapFuture { - inner: self.upgrade.upgrade_outbound(sock, info), + inner: self.upgrade.upgrade(sock, info, role), map: Some(self.fun) } } @@ -128,108 +58,38 @@ where /// Wraps around an upgrade and applies a closure to the error. #[derive(Debug, Clone)] -pub struct MapInboundUpgradeErr { upgrade: U, fun: F } +pub struct MapUpgradeErr { upgrade: U, fun: F } -impl MapInboundUpgradeErr { +impl MapUpgradeErr { pub fn new(upgrade: U, fun: F) -> Self { - MapInboundUpgradeErr { upgrade, fun } + MapUpgradeErr { upgrade, fun } } } -impl UpgradeInfo for MapInboundUpgradeErr +impl Upgrade for MapUpgradeErr where - U: UpgradeInfo + U: Upgrade, + F: FnOnce(U::Error) -> T + Send + 'static, + T: Send + 'static, { type Info = U::Info; type InfoIter = U::InfoIter; - - fn protocol_info(&self) -> Self::InfoIter { - self.upgrade.protocol_info() - } -} - -impl InboundUpgrade for MapInboundUpgradeErr -where - U: InboundUpgrade, - F: FnOnce(U::Error) -> T -{ type Output = U::Output; type Error = T; type Future = MapErrFuture; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - MapErrFuture { - fut: self.upgrade.upgrade_inbound(sock, info), - fun: Some(self.fun) - } - } -} - -impl OutboundUpgrade for MapInboundUpgradeErr -where - U: OutboundUpgrade, -{ - type Output = U::Output; - type Error = U::Error; - type Future = U::Future; - - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_outbound(sock, info) - } -} - -/// Wraps around an upgrade and applies a closure to the error. -#[derive(Debug, Clone)] -pub struct MapOutboundUpgradeErr { upgrade: U, fun: F } - -impl MapOutboundUpgradeErr { - pub fn new(upgrade: U, fun: F) -> Self { - MapOutboundUpgradeErr { upgrade, fun } - } -} - -impl UpgradeInfo for MapOutboundUpgradeErr -where - U: UpgradeInfo -{ - type Info = U::Info; - type InfoIter = U::InfoIter; - fn protocol_info(&self) -> Self::InfoIter { self.upgrade.protocol_info() } -} - -impl OutboundUpgrade for MapOutboundUpgradeErr -where - U: OutboundUpgrade, - F: FnOnce(U::Error) -> T -{ - type Output = U::Output; - type Error = T; - type Future = MapErrFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade(self, sock: C, info: Self::Info, role: Role) -> Self::Future { MapErrFuture { - fut: self.upgrade.upgrade_outbound(sock, info), + fut: self.upgrade.upgrade(sock, info, role), fun: Some(self.fun) } } } -impl InboundUpgrade for MapOutboundUpgradeErr -where - U: InboundUpgrade -{ - type Output = U::Output; - type Error = U::Error; - type Future = U::Future; - - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - self.upgrade.upgrade_inbound(sock, info) - } -} - #[pin_project::pin_project] pub struct MapFuture { #[pin] @@ -283,4 +143,3 @@ where } } } - diff --git a/core/src/upgrade/optional.rs b/core/src/upgrade/optional.rs index 02dc3c48f78..62d1268138e 100644 --- a/core/src/upgrade/optional.rs +++ b/core/src/upgrade/optional.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::upgrade::{Role, Upgrade}; /// Upgrade that can be disabled at runtime. /// @@ -39,46 +39,24 @@ impl OptionalUpgrade { } } -impl UpgradeInfo for OptionalUpgrade +impl Upgrade for OptionalUpgrade where - T: UpgradeInfo, + T: Upgrade, + ::IntoIter: Send, { type Info = T::Info; type InfoIter = Iter<::IntoIter>; - - fn protocol_info(&self) -> Self::InfoIter { - Iter(self.0.as_ref().map(|p| p.protocol_info().into_iter())) - } -} - -impl InboundUpgrade for OptionalUpgrade -where - T: InboundUpgrade, -{ type Output = T::Output; type Error = T::Error; type Future = T::Future; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - if let Some(inner) = self.0 { - inner.upgrade_inbound(sock, info) - } else { - panic!("Bad API usage; a protocol has been negotiated while this struct contains None") - } + fn protocol_info(&self) -> Self::InfoIter { + Iter(self.0.as_ref().map(|p| p.protocol_info().into_iter())) } -} - -impl OutboundUpgrade for OptionalUpgrade -where - T: OutboundUpgrade, -{ - type Output = T::Output; - type Error = T::Error; - type Future = T::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade(self, sock: C, info: Self::Info, role: Role) -> Self::Future { if let Some(inner) = self.0 { - inner.upgrade_outbound(sock, info) + inner.upgrade(sock, info, role) } else { panic!("Bad API usage; a protocol has been negotiated while this struct contains None") } diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 8fa4c5b8a7a..ab9d0251119 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -20,7 +20,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{Role, Upgrade}, }; /// Upgrade that combines two upgrades into one. Supports all the protocols supported by either @@ -39,52 +39,30 @@ impl SelectUpgrade { } } -impl UpgradeInfo for SelectUpgrade +impl Upgrade for SelectUpgrade where - A: UpgradeInfo, - B: UpgradeInfo + A: Upgrade, + B: Upgrade, + ::IntoIter: Send, + ::IntoIter: Send, { type Info = EitherName; type InfoIter = InfoIterChain< ::IntoIter, ::IntoIter >; + type Output = EitherOutput; + type Error = EitherError; + type Future = EitherFuture2; fn protocol_info(&self) -> Self::InfoIter { InfoIterChain(self.0.protocol_info().into_iter(), self.1.protocol_info().into_iter()) } -} - -impl InboundUpgrade for SelectUpgrade -where - A: InboundUpgrade, - B: InboundUpgrade, -{ - type Output = EitherOutput; - type Error = EitherError; - type Future = EitherFuture2; - - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { - match info { - EitherName::A(info) => EitherFuture2::A(self.0.upgrade_inbound(sock, info)), - EitherName::B(info) => EitherFuture2::B(self.1.upgrade_inbound(sock, info)) - } - } -} - -impl OutboundUpgrade for SelectUpgrade -where - A: OutboundUpgrade, - B: OutboundUpgrade, -{ - type Output = EitherOutput; - type Error = EitherError; - type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade(self, sock: C, info: Self::Info, role: Role) -> Self::Future { match info { - EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), - EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) + EitherName::A(info) => EitherFuture2::A(self.0.upgrade(sock, info, role)), + EitherName::B(info) => EitherFuture2::B(self.1.upgrade(sock, info, role)) } } } @@ -117,4 +95,3 @@ where (min1.saturating_add(min2), max) } } - From 22b8e8b35c0539ca60cafb05cec31fda9d873418 Mon Sep 17 00:00:00 2001 From: David Craven Date: Wed, 9 Jun 2021 08:21:50 +0200 Subject: [PATCH 2/2] Update swarm. --- swarm/src/lib.rs | 54 +++++++++----- swarm/src/protocols_handler.rs | 20 ++--- swarm/src/protocols_handler/dummy.rs | 10 +-- swarm/src/protocols_handler/map_in.rs | 12 +-- swarm/src/protocols_handler/map_out.rs | 12 +-- swarm/src/protocols_handler/multi.rs | 81 ++++++--------------- swarm/src/protocols_handler/node_handler.rs | 19 +++-- swarm/src/protocols_handler/one_shot.rs | 25 ++++--- swarm/src/protocols_handler/select.rs | 31 +++++--- swarm/src/toggle.rs | 29 ++++---- 10 files changed, 142 insertions(+), 151 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 28ea83ee589..bfb1c8cb634 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -57,7 +57,6 @@ mod behaviour; mod registry; #[cfg(test)] mod test; -mod upgrade; pub mod protocols_handler; pub mod toggle; @@ -121,14 +120,13 @@ use libp2p_core::{ NetworkConfig, peer::ConnectedPeer, }, - upgrade::{ProtocolName}, + upgrade::{ProtocolName, Upgrade}, }; use registry::{Addresses, AddressIntoIter}; use smallvec::SmallVec; use std::{error, fmt, io, pin::Pin, task::{Context, Poll}}; use std::collections::HashSet; use std::num::{NonZeroU32, NonZeroUsize}; -use upgrade::UpgradeInfoSend as _; /// Contains the state of the network, plus the way it should behave. pub type Swarm = ExpandedSwarm< @@ -259,6 +257,8 @@ pub enum SwarmEvent { pub struct ExpandedSwarm where THandler: IntoProtocolsHandler, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { network: Network< transport::Boxed<(PeerId, StreamMuxerBox)>, @@ -297,17 +297,22 @@ impl Unpin for ExpandedSwarm where THandler: IntoProtocolsHandler, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { } impl ExpandedSwarm -where TBehaviour: NetworkBehaviour, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - THandler: IntoProtocolsHandler + Send + 'static, - THandler::Handler: ProtocolsHandler, - THandleErr: error::Error + Send + 'static, +where + TBehaviour: NetworkBehaviour, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + THandler: IntoProtocolsHandler + Send + 'static, + THandler::Handler: ProtocolsHandler, + THandleErr: error::Error + Send + 'static, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { /// Builds a new `Swarm`. pub fn new( @@ -843,11 +848,14 @@ where impl Stream for ExpandedSwarm -where TBehaviour: NetworkBehaviour, - THandler: IntoProtocolsHandler + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - THandler::Handler: ProtocolsHandler, +where + TBehaviour: NetworkBehaviour, + THandler: IntoProtocolsHandler + Send + 'static, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + THandler::Handler: ProtocolsHandler, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type Item = TBehaviour::OutEvent; @@ -864,11 +872,14 @@ where TBehaviour: NetworkBehaviour, /// the stream of behaviour events never terminates, so we can implement fused for it impl FusedStream for ExpandedSwarm -where TBehaviour: NetworkBehaviour, - THandler: IntoProtocolsHandler + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - THandler::Handler: ProtocolsHandler, +where + TBehaviour: NetworkBehaviour, + THandler: IntoProtocolsHandler + Send + 'static, + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + THandler::Handler: ProtocolsHandler, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { fn is_terminated(&self) -> bool { false @@ -917,7 +928,10 @@ pub struct SwarmBuilder { } impl SwarmBuilder -where TBehaviour: NetworkBehaviour, +where + TBehaviour: NetworkBehaviour, + <<<::Handler as ProtocolsHandler>::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<<::Handler as ProtocolsHandler>::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { /// Creates a new `SwarmBuilder` from the given transport, behaviour and /// local peer ID. The `Swarm` with its underlying `Network` is obtained diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 42c6110164d..4e34dd81335 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -45,16 +45,12 @@ mod one_shot; mod select; pub mod multi; -pub use crate::upgrade::{ - InboundUpgradeSend, - OutboundUpgradeSend, - UpgradeInfoSend, -}; - +use crate::NegotiatedSubstream; use libp2p_core::{ ConnectedPoint, Multiaddr, PeerId, + Upgrade, upgrade::UpgradeError, }; use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; @@ -107,9 +103,9 @@ pub trait ProtocolsHandler: Send + 'static { /// The type of errors returned by [`ProtocolsHandler::poll`]. type Error: error::Error + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. - type InboundProtocol: InboundUpgradeSend; + type InboundProtocol: Upgrade; /// The outbound upgrade for the protocol(s) used by the handler. - type OutboundProtocol: OutboundUpgradeSend; + type OutboundProtocol: Upgrade; /// The type of additional information returned from `listen_protocol`. type InboundOpenInfo: Send + 'static; /// The type of additional information passed to an `OutboundSubstreamRequest`. @@ -127,7 +123,7 @@ pub trait ProtocolsHandler: Send + 'static { /// Injects the output of a successful upgrade on a new inbound substream. fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::InboundOpenInfo ); @@ -137,7 +133,7 @@ pub trait ProtocolsHandler: Send + 'static { /// [`ProtocolsHandlerEvent::OutboundSubstreamRequest`]. fn inject_fully_negotiated_outbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::OutboundOpenInfo ); @@ -152,7 +148,7 @@ pub trait ProtocolsHandler: Send + 'static { &mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< - ::Error + >::Error > ); @@ -160,7 +156,7 @@ pub trait ProtocolsHandler: Send + 'static { fn inject_listen_upgrade_error( &mut self, _: Self::InboundOpenInfo, - _: ProtocolsHandlerUpgrErr<::Error> + _: ProtocolsHandlerUpgrErr<>::Error> ) {} /// Returns until when the connection should be kept alive. diff --git a/swarm/src/protocols_handler/dummy.rs b/swarm/src/protocols_handler/dummy.rs index 764f95fe2cf..5c765eb035c 100644 --- a/swarm/src/protocols_handler/dummy.rs +++ b/swarm/src/protocols_handler/dummy.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}}; +use libp2p_core::{Multiaddr, upgrade::{Upgrade, DeniedUpgrade}}; use std::task::{Context, Poll}; use void::Void; @@ -59,14 +59,14 @@ impl ProtocolsHandler for DummyProtocolsHandler { fn inject_fully_negotiated_inbound( &mut self, - _: >::Output, + _: >::Output, _: Self::InboundOpenInfo ) { } fn inject_fully_negotiated_outbound( &mut self, - _: >::Output, + _: >::Output, _: Self::OutboundOpenInfo ) { } @@ -75,9 +75,9 @@ impl ProtocolsHandler for DummyProtocolsHandler { fn inject_address_change(&mut self, _: &Multiaddr) {} - fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} - fn inject_listen_upgrade_error(&mut self, _: Self::InboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn inject_listen_upgrade_error(&mut self, _: Self::InboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive diff --git a/swarm/src/protocols_handler/map_in.rs b/swarm/src/protocols_handler/map_in.rs index 7c007db6686..968cdd5b355 100644 --- a/swarm/src/protocols_handler/map_in.rs +++ b/swarm/src/protocols_handler/map_in.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, SubstreamProtocol, @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::Multiaddr; +use libp2p_core::{Multiaddr, Upgrade}; use std::{marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. @@ -68,7 +68,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::InboundOpenInfo ) { self.inner.inject_fully_negotiated_inbound(protocol, info) @@ -76,7 +76,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::OutboundOpenInfo ) { self.inner.inject_fully_negotiated_outbound(protocol, info) @@ -92,11 +92,11 @@ where self.inner.inject_address_change(addr) } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { self.inner.inject_dial_upgrade_error(info, error) } - fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { self.inner.inject_listen_upgrade_error(info, error) } diff --git a/swarm/src/protocols_handler/map_out.rs b/swarm/src/protocols_handler/map_out.rs index 292f0223bab..66fa02556ee 100644 --- a/swarm/src/protocols_handler/map_out.rs +++ b/swarm/src/protocols_handler/map_out.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, SubstreamProtocol, @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::Multiaddr; +use libp2p_core::{Multiaddr, Upgrade}; use std::task::{Context, Poll}; /// Wrapper around a protocol handler that turns the output event into something else. @@ -66,7 +66,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::InboundOpenInfo ) { self.inner.inject_fully_negotiated_inbound(protocol, info) @@ -74,7 +74,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - protocol: ::Output, + protocol: >::Output, info: Self::OutboundOpenInfo ) { self.inner.inject_fully_negotiated_outbound(protocol, info) @@ -88,11 +88,11 @@ where self.inner.inject_address_change(addr) } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { self.inner.inject_dial_upgrade_error(info, error) } - fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { self.inner.inject_listen_upgrade_error(info, error) } diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index f23de96c30e..f4400033773 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -30,14 +30,9 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr, SubstreamProtocol }; -use crate::upgrade::{ - InboundUpgradeSend, - OutboundUpgradeSend, - UpgradeInfoSend -}; use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; -use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError}; +use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, Upgrade}; +use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError, Role}; use rand::Rng; use std::{ cmp, @@ -90,13 +85,13 @@ impl ProtocolsHandler for MultiHandler where K: Clone + Hash + Eq + Send + 'static, H: ProtocolsHandler, - H::InboundProtocol: InboundUpgradeSend, - H::OutboundProtocol: OutboundUpgradeSend + H::InboundProtocol: Upgrade, + H::OutboundProtocol: Upgrade { type InEvent = (K, ::InEvent); type OutEvent = (K, ::OutEvent); type Error = ::Error; - type InboundProtocol = Upgrade::InboundProtocol>; + type InboundProtocol = ProtocolsUpgrade::InboundProtocol>; type OutboundProtocol = ::OutboundProtocol; type InboundOpenInfo = Info::InboundOpenInfo>; type OutboundOpenInfo = (K, ::OutboundOpenInfo); @@ -109,7 +104,7 @@ where let (upgrade, info) = proto.into_upgrade(); (key.clone(), (upgrade, info, timeout)) }) - .fold((Upgrade::new(), Info::new(), Duration::from_secs(0)), + .fold((ProtocolsUpgrade::new(), Info::new(), Duration::from_secs(0)), |(mut upg, mut inf, mut timeout), (k, (u, i, t))| { upg.upgrades.push((k.clone(), u)); inf.infos.push((k, i)); @@ -122,7 +117,7 @@ where fn inject_fully_negotiated_outbound ( &mut self, - protocol: ::Output, + protocol: >::Output, (key, arg): Self::OutboundOpenInfo ) { if let Some(h) = self.handlers.get_mut(&key) { @@ -134,7 +129,7 @@ where fn inject_fully_negotiated_inbound ( &mut self, - (key, arg): ::Output, + (key, arg): >::Output, mut info: Self::InboundOpenInfo ) { if let Some(h) = self.handlers.get_mut(&key) { @@ -163,7 +158,7 @@ where fn inject_dial_upgrade_error ( &mut self, (key, arg): Self::OutboundOpenInfo, - error: ProtocolsHandlerUpgrErr<::Error> + error: ProtocolsHandlerUpgrErr<>::Error> ) { if let Some(h) = self.handlers.get_mut(&key) { h.inject_dial_upgrade_error(arg, error) @@ -175,7 +170,7 @@ where fn inject_listen_upgrade_error( &mut self, mut info: Self::InboundOpenInfo, - error: ProtocolsHandlerUpgrErr<::Error> + error: ProtocolsHandlerUpgrErr<>::Error> ) { match error { ProtocolsHandlerUpgrErr::Timer => @@ -326,7 +321,7 @@ where } fn inbound_protocol(&self) -> ::InboundProtocol { - Upgrade { + ProtocolsUpgrade { upgrades: self.handlers.iter() .map(|(k, h)| (k.clone(), h.inbound_protocol())) .collect() @@ -365,17 +360,17 @@ impl Info { /// Inbound and outbound upgrade for all `ProtocolsHandler`s. #[derive(Clone)] -pub struct Upgrade { +pub struct ProtocolsUpgrade { upgrades: Vec<(K, H)> } -impl Upgrade { +impl ProtocolsUpgrade { fn new() -> Self { - Upgrade { upgrades: Vec::new() } + ProtocolsUpgrade { upgrades: Vec::new() } } } -impl fmt::Debug for Upgrade +impl fmt::Debug for ProtocolsUpgrade where K: fmt::Debug + Eq + Hash, H: fmt::Debug @@ -387,13 +382,16 @@ where } } -impl UpgradeInfoSend for Upgrade +impl Upgrade for ProtocolsUpgrade where - H: UpgradeInfoSend, + H: Upgrade, K: Send + 'static { type Info = IndexedProtoName; type InfoIter = std::vec::IntoIter; + type Output = (K, >::Output); + type Error = (K, >::Error); + type Future = BoxFuture<'static, Result>; fn protocol_info(&self) -> Self::InfoIter { self.upgrades.iter().enumerate() @@ -403,44 +401,11 @@ where .collect::>() .into_iter() } -} - -impl InboundUpgradeSend for Upgrade -where - H: InboundUpgradeSend, - K: Send + 'static -{ - type Output = (K, ::Output); - type Error = (K, ::Error); - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { - let IndexedProtoName(index, info) = info; - let (key, upgrade) = self.upgrades.remove(index); - upgrade.upgrade_inbound(resource, info) - .map(move |out| { - match out { - Ok(o) => Ok((key, o)), - Err(e) => Err((key, e)) - } - }) - .boxed() - } -} - -impl OutboundUpgradeSend for Upgrade -where - H: OutboundUpgradeSend, - K: Send + 'static -{ - type Output = (K, ::Output); - type Error = (K, ::Error); - type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(mut self, resource: NegotiatedSubstream, info: Self::Info) -> Self::Future { + fn upgrade(mut self, resource: NegotiatedSubstream, info: Self::Info, role: Role) -> Self::Future { let IndexedProtoName(index, info) = info; let (key, upgrade) = self.upgrades.remove(index); - upgrade.upgrade_outbound(resource, info) + upgrade.upgrade(resource, info, role) .map(move |out| { match out { Ok(o) => Ok((key, o)), @@ -455,7 +420,7 @@ where fn uniq_proto_names(iter: I) -> Result<(), DuplicateProtonameError> where I: Iterator, - T: UpgradeInfoSend + T: Upgrade { let mut set = HashSet::new(); for infos in iter { diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 72730117cc3..8c87adfad9f 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::SendWrapper; +use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, ProtocolsHandler, @@ -32,6 +32,7 @@ use futures::stream::FuturesUnordered; use libp2p_core::{ Multiaddr, Connected, + Upgrade, connection::{ ConnectionHandler, ConnectionHandlerEvent, @@ -40,7 +41,7 @@ use libp2p_core::{ SubstreamEndpoint, }, muxing::StreamMuxerBox, - upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError} + upgrade::{self, Role, UpgradeApply, UpgradeError, Version} }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::{Delay, Instant}; @@ -79,6 +80,8 @@ impl IntoConnectionHandler where TIntoProtoHandler: IntoProtocolsHandler, TProtoHandler: ProtocolsHandler, + <>::InfoIter as IntoIterator>::IntoIter: Send, + <>::InfoIter as IntoIterator>::IntoIter: Send, { type Handler = NodeHandlerWrapper; @@ -107,16 +110,16 @@ where /// Futures that upgrade incoming substreams. negotiating_in: FuturesUnordered, SendWrapper>, + UpgradeApply, TProtoHandler::InboundProtocol>, >>, /// Futures that upgrade outgoing substreams. negotiating_out: FuturesUnordered, SendWrapper>, + UpgradeApply, TProtoHandler::OutboundProtocol>, >>, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). - queued_dial_upgrades: Vec<(u64, SendWrapper)>, + queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>, /// Unique identifier assigned to each queued dial upgrade. unique_dial_upgrade_id: u64, /// The currently planned connection & handler shutdown. @@ -247,7 +250,7 @@ where let protocol = self.handler.listen_protocol(); let timeout = *protocol.timeout(); let (upgrade, user_data) = protocol.into_upgrade(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); + let upgrade = upgrade::apply(substream, upgrade, Role::Responder, Version::V1); let timeout = Delay::new(timeout); self.negotiating_in.push(SubstreamUpgrade { user_data: Some(user_data), @@ -276,7 +279,7 @@ where version = v; } } - let upgrade = upgrade::apply_outbound(substream, upgrade, version); + let upgrade = upgrade::apply(substream, upgrade, Role::Initiator, version); let timeout = Delay::new(timeout); self.negotiating_out.push(SubstreamUpgrade { user_data: Some(user_data), @@ -338,7 +341,7 @@ where let timeout = *protocol.timeout(); self.unique_dial_upgrade_id += 1; let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); + self.queued_dial_upgrades.push((id, upgrade)); return Poll::Ready(Ok( ConnectionHandlerEvent::OutboundSubstreamRequest((id, info, timeout)), )); diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 3baf779aa7c..daac700d2c0 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; +use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, ProtocolsHandler, @@ -27,6 +27,7 @@ use crate::protocols_handler::{ SubstreamProtocol }; +use libp2p_core::Upgrade; use smallvec::SmallVec; use std::{error, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; @@ -35,12 +36,12 @@ use wasm_timer::Instant; // TODO: Debug pub struct OneShotHandler where - TOutbound: OutboundUpgradeSend, + TOutbound: Upgrade, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option::Error>>, + pending_error: Option>::Error>>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[TEvent; 4]>, /// Queue of outbound substreams to open. @@ -56,7 +57,7 @@ where impl OneShotHandler where - TOutbound: OutboundUpgradeSend, + TOutbound: Upgrade, { /// Creates a `OneShotHandler`. pub fn new( @@ -105,8 +106,8 @@ where impl Default for OneShotHandler where - TOutbound: OutboundUpgradeSend, - TInbound: InboundUpgradeSend + Default, + TOutbound: Upgrade, + TInbound: Upgrade + Default, { fn default() -> Self { OneShotHandler::new( @@ -118,8 +119,8 @@ where impl ProtocolsHandler for OneShotHandler where - TInbound: InboundUpgradeSend + Send + 'static, - TOutbound: OutboundUpgradeSend, + TInbound: Upgrade + Send + 'static, + TOutbound: Upgrade + 'static, TInbound::Output: Into, TOutbound::Output: Into, TOutbound::Error: error::Error + Send + 'static, @@ -129,7 +130,7 @@ where type InEvent = TOutbound; type OutEvent = TEvent; type Error = ProtocolsHandlerUpgrErr< - ::Error, + >::Error, >; type InboundProtocol = TInbound; type OutboundProtocol = TOutbound; @@ -142,7 +143,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - out: ::Output, + out: >::Output, (): Self::InboundOpenInfo ) { // If we're shutting down the connection for inactivity, reset the timeout. @@ -155,7 +156,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - out: ::Output, + out: >::Output, _: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -170,7 +171,7 @@ where &mut self, _info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< - ::Error, + >::Error, >, ) { if self.pending_error.is_none() { diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index d8005eef79d..f3544e349b0 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{SendWrapper, InboundUpgradeSend, OutboundUpgradeSend}; +use crate::NegotiatedSubstream; use crate::protocols_handler::{ KeepAlive, SubstreamProtocol, @@ -32,6 +32,7 @@ use libp2p_core::{ ConnectedPoint, Multiaddr, PeerId, + Upgrade, either::{EitherError, EitherOutput}, upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError, NegotiationError, ProtocolError} }; @@ -60,6 +61,10 @@ impl IntoProtocolsHandler for IntoProtocolsHandlerSelect::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type Handler = ProtocolsHandlerSelect; @@ -71,7 +76,7 @@ where } fn inbound_protocol(&self) -> ::InboundProtocol { - SelectUpgrade::new(SendWrapper(self.proto1.inbound_protocol()), SendWrapper(self.proto2.inbound_protocol())) + SelectUpgrade::new(self.proto1.inbound_protocol(), self.proto2.inbound_protocol()) } } @@ -98,12 +103,16 @@ impl ProtocolsHandler for ProtocolsHandlerSelect::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, + <<::OutboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type InEvent = EitherOutput; type OutEvent = EitherOutput; type Error = EitherError; - type InboundProtocol = SelectUpgrade::InboundProtocol>, SendWrapper<::InboundProtocol>>; - type OutboundProtocol = EitherUpgrade, SendWrapper>; + type InboundProtocol = SelectUpgrade<::InboundProtocol, ::InboundProtocol>; + type OutboundProtocol = EitherUpgrade; type OutboundOpenInfo = EitherOutput; type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo); @@ -113,11 +122,11 @@ where let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout()); let (u1, i1) = proto1.into_upgrade(); let (u2, i2) = proto2.into_upgrade(); - let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2)); + let choice = SelectUpgrade::new(u1, u2); SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout) } - fn inject_fully_negotiated_outbound(&mut self, protocol: ::Output, endpoint: Self::OutboundOpenInfo) { + fn inject_fully_negotiated_outbound(&mut self, protocol: >::Output, endpoint: Self::OutboundOpenInfo) { match (protocol, endpoint) { (EitherOutput::First(protocol), EitherOutput::First(info)) => self.proto1.inject_fully_negotiated_outbound(protocol, info), @@ -130,7 +139,7 @@ where } } - fn inject_fully_negotiated_inbound(&mut self, protocol: ::Output, (i1, i2): Self::InboundOpenInfo) { + fn inject_fully_negotiated_inbound(&mut self, protocol: >::Output, (i1, i2): Self::InboundOpenInfo) { match protocol { EitherOutput::First(protocol) => self.proto1.inject_fully_negotiated_inbound(protocol, i1), @@ -151,7 +160,7 @@ where self.proto2.inject_address_change(new_address) } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { match (info, error) { (EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => { self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer) @@ -186,7 +195,7 @@ where } } - fn inject_listen_upgrade_error(&mut self, (i1, i2): Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_listen_upgrade_error(&mut self, (i1, i2): Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { match error { ProtocolsHandlerUpgrErr::Timer => { self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Timer); @@ -247,7 +256,7 @@ where Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: protocol - .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + .map_upgrade(|u| EitherUpgrade::A(u)) .map_info(EitherOutput::First) }); }, @@ -264,7 +273,7 @@ where Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: protocol - .map_upgrade(|u| EitherUpgrade::B(SendWrapper(u))) + .map_upgrade(|u| EitherUpgrade::B(u)) .map_info(EitherOutput::Second) }); }, diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index d986f00fb01..2e5b4e9b2cf 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -18,8 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; -use crate::upgrade::{SendWrapper, InboundUpgradeSend, OutboundUpgradeSend}; +use crate::{NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use crate::protocols_handler::{ KeepAlive, SubstreamProtocol, @@ -33,6 +32,7 @@ use libp2p_core::{ ConnectedPoint, PeerId, Multiaddr, + Upgrade, connection::{ConnectionId, ListenerId}, either::{EitherError, EitherOutput}, upgrade::{DeniedUpgrade, EitherUpgrade} @@ -71,7 +71,8 @@ impl From> for Toggle { impl NetworkBehaviour for Toggle where - TBehaviour: NetworkBehaviour + TBehaviour: NetworkBehaviour, + <<<::Handler as ProtocolsHandler>::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type ProtocolsHandler = ToggleIntoProtoHandler; type OutEvent = TBehaviour::OutEvent; @@ -210,7 +211,8 @@ pub struct ToggleIntoProtoHandler { impl IntoProtocolsHandler for ToggleIntoProtoHandler where - TInner: IntoProtocolsHandler + TInner: IntoProtocolsHandler, + <<<::Handler as ProtocolsHandler>::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type Handler = ToggleProtoHandler; @@ -222,9 +224,9 @@ where fn inbound_protocol(&self) -> ::InboundProtocol { if let Some(inner) = self.inner.as_ref() { - EitherUpgrade::A(SendWrapper(inner.inbound_protocol())) + EitherUpgrade::A(inner.inbound_protocol()) } else { - EitherUpgrade::B(SendWrapper(DeniedUpgrade)) + EitherUpgrade::B(DeniedUpgrade) } } } @@ -237,11 +239,12 @@ pub struct ToggleProtoHandler { impl ProtocolsHandler for ToggleProtoHandler where TInner: ProtocolsHandler, + <<::InboundProtocol as Upgrade>::InfoIter as IntoIterator>::IntoIter: Send, { type InEvent = TInner::InEvent; type OutEvent = TInner::OutEvent; type Error = TInner::Error; - type InboundProtocol = EitherUpgrade, SendWrapper>; + type InboundProtocol = EitherUpgrade; type OutboundProtocol = TInner::OutboundProtocol; type OutboundOpenInfo = TInner::OutboundOpenInfo; type InboundOpenInfo = Either; @@ -249,16 +252,16 @@ where fn listen_protocol(&self) -> SubstreamProtocol { if let Some(inner) = self.inner.as_ref() { inner.listen_protocol() - .map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))) + .map_upgrade(|u| EitherUpgrade::A(u)) .map_info(Either::Left) } else { - SubstreamProtocol::new(EitherUpgrade::B(SendWrapper(DeniedUpgrade)), Either::Right(())) + SubstreamProtocol::new(EitherUpgrade::B(DeniedUpgrade), Either::Right(())) } } fn inject_fully_negotiated_inbound( &mut self, - out: ::Output, + out: >::Output, info: Self::InboundOpenInfo ) { let out = match out { @@ -277,7 +280,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - out: ::Output, + out: >::Output, info: Self::OutboundOpenInfo ) { self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED") @@ -295,12 +298,12 @@ where } } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<>::Error>) { self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED") .inject_dial_upgrade_error(info, err) } - fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, err: ProtocolsHandlerUpgrErr<::Error>) { + fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, err: ProtocolsHandlerUpgrErr<>::Error>) { let (inner, info) = match (self.inner.as_mut(), info) { (Some(inner), Either::Left(info)) => (inner, info), // Ignore listen upgrade errors in disabled state.