Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify upgrade #2098

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use peer_id::PeerId;
pub use identity::PublicKey;
pub use transport::Transport;
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use upgrade::{Role, Upgrade, UpgradeError, ProtocolName};
pub use connection::{Connected, Endpoint, ConnectedPoint};
pub use network::Network;

Expand Down
1 change: 0 additions & 1 deletion core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub use self::boxed::Boxed;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;

/// A transport provides connection-oriented communication between two peers
/// through ordered streams of data (i.e. connections).
Expand Down
100 changes: 50 additions & 50 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ use crate::{
muxing::{StreamMuxer, StreamMuxerBox},
upgrade::{
self,
OutboundUpgrade,
InboundUpgrade,
apply_inbound,
apply_outbound,
Role,
Upgrade,
UpgradeApply,
UpgradeError,
OutboundUpgradeApply,
InboundUpgradeApply
},
PeerId
};
Expand Down Expand Up @@ -111,14 +108,18 @@ where
T: Transport<Output = C>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
U: Upgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| {
let role = if endpoint.is_listener() {
Role::Responder
} else {
Role::Initiator
};
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint, version)
inner: upgrade::apply(conn, upgrade, role, version)
}
}), version))
}
Expand All @@ -132,21 +133,18 @@ where
pub struct Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
U: Upgrade<Negotiated<C>>
{
#[pin]
inner: EitherUpgrade<C, U>
inner: UpgradeApply<C, U>
}

impl<C, U> Future for Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>,
Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
Error = <U as InboundUpgrade<Negotiated<C>>>::Error
>
U: Upgrade<Negotiated<C>>,
{
type Output = <EitherUpgrade<C, U> as Future>::Output;
type Output = <UpgradeApply<C, U> as Future>::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Expand All @@ -162,18 +160,17 @@ where
pub struct Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
U: Upgrade<Negotiated<C>>,
{
peer_id: Option<PeerId>,
#[pin]
upgrade: EitherUpgrade<C, U>,
upgrade: UpgradeApply<C, U>,
}

impl<C, U, M, E> Future for Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
U: Upgrade<Negotiated<C>, Output = M, Error = E>,
{
type Output = Result<(PeerId, M), UpgradeError<E>>;

Expand Down Expand Up @@ -207,16 +204,15 @@ where
///
/// * I/O upgrade: `C -> D`.
/// * Transport output: `(PeerId, C) -> (PeerId, D)`.
pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<TransportUpgrade<T, U>>
where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
U: Upgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version))
Authenticated(Builder::new(TransportUpgrade::new(self.0.inner, upgrade), self.0.version))
}

/// Upgrades the transport with a (sub)stream multiplexer.
Expand All @@ -235,13 +231,17 @@ where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
U: Upgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
let role = if endpoint.is_listener() {
Role::Responder
} else {
Role::Initiator
};
let upgrade = upgrade::apply(c, upgrade, role, version);
Multiplex { peer_id: Some(i), upgrade }
}))
}
Expand All @@ -263,14 +263,18 @@ where
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
U: Upgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
let role = if endpoint.is_listener() {
Role::Responder
} else {
Role::Initiator
};
let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), role, version);
Multiplex { peer_id: Some(peer_id), upgrade }
}))
}
Expand Down Expand Up @@ -340,28 +344,24 @@ where
}
}

/// An inbound or outbound upgrade.
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;

/// A custom upgrade on an [`Authenticated`] transport.
///
/// See [`Transport::upgrade`]
#[derive(Debug, Copy, Clone)]
pub struct Upgrade<T, U> { inner: T, upgrade: U }
pub struct TransportUpgrade<T, U> { inner: T, upgrade: U }

impl<T, U> Upgrade<T, U> {
impl<T, U> TransportUpgrade<T, U> {
pub fn new(inner: T, upgrade: U) -> Self {
Upgrade { inner, upgrade }
TransportUpgrade { inner, upgrade }
}
}

impl<T, C, D, U, E> Transport for Upgrade<T, U>
impl<T, C, D, U, E> Transport for TransportUpgrade<T, U>
where
T: Transport<Output = (PeerId, C)>,
T::Error: 'static,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
U: Upgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static
{
type Output = (PeerId, D);
Expand Down Expand Up @@ -431,18 +431,18 @@ where
/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
pub struct DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
U: Upgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<PeerId>, OutboundUpgradeApply<C, U>)>
upgrade: future::Either<Option<U>, (Option<PeerId>, UpgradeApply<C, U>)>
}

impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>, Output = D>,
U: Upgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
Expand All @@ -460,7 +460,7 @@ where
Err(err) => return Poll::Ready(Err(err)),
};
let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
future::Either::Right((Some(i), upgrade::apply(c, u, Role::Initiator, Version::V1)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Expand All @@ -477,7 +477,7 @@ where

impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
U: Upgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
}
Expand All @@ -493,7 +493,7 @@ where
S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
U: Upgrade<Negotiated<C>, Output = D> + Clone
{
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;

Expand Down Expand Up @@ -525,17 +525,17 @@ impl<S, U> Unpin for ListenerStream<S, U> {
pub struct ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
U: Upgrade<Negotiated<C>>
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<PeerId>, InboundUpgradeApply<C, U>)>
upgrade: future::Either<Option<U>, (Option<PeerId>, UpgradeApply<C, U>)>
}

impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D>,
U: Upgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
Expand All @@ -553,7 +553,7 @@ where
Err(err) => return Poll::Ready(Err(err))
};
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
future::Either::Right((Some(i), apply_inbound(c, u)))
future::Either::Right((Some(i), upgrade::apply(c, u, Role::Responder, Version::V1)))
}
future::Either::Right((ref mut i, ref mut up)) => {
let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
Expand All @@ -571,6 +571,6 @@ where
impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
U: Upgrade<Negotiated<C>>
{
}
Loading