Skip to content

Commit

Permalink
{core,swarm}/: Allow configuring dial concurrency factor per dial (#2…
Browse files Browse the repository at this point in the history
…404)

Enable a `NetworkBehaviour` or a user via `Swarm::dial` to override the
dial concurrency factor per dial. This is especially relevant in the
case of libp2p-autonat where one wants to probe addresses in sequence to
reduce the amount of work a remote peer can force onto the local node.

To enable the above, this commit also:

- Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`.
  Passed as an argument to `Network::dial`.
- Removes `Peer::dial` in favor of `Network::dial`.
- Simplifies `Swarm::dial_with_handler`.

The introduction of `libp2p_core::DialOpts` will be useful beyond this
feature, e.g. for libp2p/rust-libp2p#2363.

In the long run I would like to move and merge `libp2p_core::Network`
and `libp2p_core::Pool` into `libp2p_swarm::Swarm` thus deduplicating
`libp2p_core::DialOpts` and `libp2p_swarm::DialOpts`.

Fixes #2385.
  • Loading branch information
santos227 authored Jan 13, 2022
1 parent 86d87a6 commit 6fc98db
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 155 deletions.
10 changes: 10 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@

- Add `ConnectedPoint::is_relayed` (see [PR 2392]).

- Enable overriding _dial concurrency factor_ per dial via
`DialOpts::override_dial_concurrency_factor`.

- Introduces `libp2p_core::DialOpts` mirroring `libp2p_swarm::DialOpts`.
Passed as an argument to `Network::dial`.
- Removes `Peer::dial` in favor of `Network::dial`.

See [PR 2404].

- Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408])

[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352
[PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408

# 0.30.1 [2021-11-16]
Expand Down
8 changes: 7 additions & 1 deletion core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>,
handler: THandler,
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Clone + Send,
Expand All @@ -544,7 +545,12 @@ where
return Err(DialError::ConnectionLimit { limit, handler });
};

let dial = ConcurrentDial::new(transport, peer, addresses, self.dial_concurrency_factor);
let dial = ConcurrentDial::new(
transport,
peer,
addresses,
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
);

let connection_id = self.next_connection_id();

Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub use identity::PublicKey;
pub use multiaddr::Multiaddr;
pub use multihash;
pub use muxing::StreamMuxer;
pub use network::Network;
pub use network::{DialOpts, Network};
pub use peer_id::PeerId;
pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
Expand Down
206 changes: 147 additions & 59 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ use crate::{
transport::{Transport, TransportError},
Executor, Multiaddr, PeerId,
};
use either::Either;
use std::{
convert::TryFrom as _,
error, fmt,
num::{NonZeroU8, NonZeroUsize},
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;

/// Implementation of `Stream` that handles the nodes.
pub struct Network<TTrans, THandler>
Expand Down Expand Up @@ -185,16 +187,15 @@ where
&self.local_peer_id
}

/// Dials a [`Multiaddr`] that may or may not encapsulate a
/// specific expected remote peer ID.
/// Dial a known or unknown peer.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(
&mut self,
address: &Multiaddr,
handler: THandler,
opts: impl Into<DialOpts>,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport + Send,
Expand All @@ -203,50 +204,54 @@ where
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() {
if let Ok(peer) = PeerId::try_from(ma) {
return self.dial_peer(DialingOpts {
peer,
addresses: std::iter::once(address.clone()),
handler,
});
let opts = opts.into();

let (peer_id, addresses, dial_concurrency_factor_override) = match opts.0 {
// Dial a known peer.
Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses {
peer_id,
addresses,
dial_concurrency_factor_override,
}) => (
Some(peer_id),
Either::Left(addresses.into_iter()),
dial_concurrency_factor_override,
),
// Dial an unknown peer.
Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress { address }) => {
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
let peer_id = match address
.iter()
.last()
.and_then(|p| {
if let multiaddr::Protocol::P2p(ma) = p {
Some(PeerId::try_from(ma))
} else {
None
}
})
.transpose()
{
Ok(peer_id) => peer_id,
Err(_) => return Err(DialError::InvalidPeerId { handler }),
};

(peer_id, Either::Right(std::iter::once(address)), None)
}
}
};

self.pool.add_outgoing(
self.transport().clone(),
std::iter::once(address.clone()),
None,
addresses,
peer_id,
handler,
dial_concurrency_factor_override,
)
}

/// Initiates a connection attempt to a known peer.
fn dial_peer<I>(
&mut self,
opts: DialingOpts<THandler, I>,
) -> Result<ConnectionId, DialError<THandler>>
where
I: Iterator<Item = Multiaddr> + Send + 'static,
TTrans: Transport + Send,
TTrans::Output: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
{
let id = self.pool.add_outgoing(
self.transport().clone(),
opts.addresses,
Some(opts.peer),
opts.handler,
)?;

Ok(id)
}

/// Returns information about the state of the `Network`.
pub fn info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers();
Expand Down Expand Up @@ -463,14 +468,6 @@ where
}
}

/// Options for a dialing attempt (i.e. repeated connection attempt
/// via a list of address) to a peer.
struct DialingOpts<THandler, I> {
peer: PeerId,
handler: THandler,
addresses: I,
}

/// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
Expand Down Expand Up @@ -560,31 +557,122 @@ impl NetworkConfig {
}

/// Possible (synchronous) errors when dialing a peer.
#[derive(Clone)]
#[derive(Debug, Clone, Error)]
pub enum DialError<THandler> {
/// The dialing attempt is rejected because of a connection limit.
ConnectionLimit {
limit: ConnectionLimit,
handler: THandler,
},
/// The dialing attempt is rejected because the peer being dialed is the local peer.
LocalPeerId { handler: THandler },
LocalPeerId {
handler: THandler,
},
InvalidPeerId {
handler: THandler,
},
}

impl<THandler> fmt::Debug for DialError<THandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
DialError::ConnectionLimit { limit, handler: _ } => f
.debug_struct("DialError::ConnectionLimit")
.field("limit", limit)
.finish(),
DialError::LocalPeerId { handler: _ } => {
f.debug_struct("DialError::LocalPeerId").finish()
}
/// Options to configure a dial to a known or unknown peer.
///
/// Used in [`Network::dial`].
///
/// To construct use either of:
///
/// - [`DialOpts::peer_id`] dialing a known peer
///
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
#[derive(Debug, Clone, PartialEq)]
pub struct DialOpts(pub(super) Opts);

impl DialOpts {
/// Dial a known peer.
pub fn peer_id(peer_id: PeerId) -> WithPeerId {
WithPeerId { peer_id }
}

/// Dial an unknown peer.
pub fn unknown_peer_id() -> WithoutPeerId {
WithoutPeerId {}
}
}

impl From<Multiaddr> for DialOpts {
fn from(address: Multiaddr) -> Self {
DialOpts::unknown_peer_id().address(address).build()
}
}

/// Internal options type.
///
/// Not to be constructed manually. Use either of the below instead:
///
/// - [`DialOpts::peer_id`] dialing a known peer
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
#[derive(Debug, Clone, PartialEq)]
pub(super) enum Opts {
WithPeerIdWithAddresses(WithPeerIdWithAddresses),
WithoutPeerIdWithAddress(WithoutPeerIdWithAddress),
}

#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerId {
pub(crate) peer_id: PeerId,
}

impl WithPeerId {
/// Specify a set of addresses to be used to dial the known peer.
pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses {
WithPeerIdWithAddresses {
peer_id: self.peer_id,
addresses,
dial_concurrency_factor_override: Default::default(),
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerIdWithAddresses {
pub(crate) peer_id: PeerId,
pub(crate) addresses: Vec<Multiaddr>,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
}

impl WithPeerIdWithAddresses {
/// Override [`NetworkConfig::with_dial_concurrency_factor`].
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.dial_concurrency_factor_override = Some(factor);
self
}

/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithPeerIdWithAddresses(self))
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerId {}

impl WithoutPeerId {
/// Specify a single address to dial the unknown peer.
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
WithoutPeerIdWithAddress { address }
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerIdWithAddress {
pub(crate) address: Multiaddr,
}

impl WithoutPeerIdWithAddress {
/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithoutPeerIdWithAddress(self))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 6fc98db

Please sign in to comment.