From 63f03bee1916782f32c8ee7a28433596900259bd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 14 Dec 2022 11:50:08 +1100 Subject: [PATCH] refactor(swarm)!: deprecate `PollParameters` where possible (#3153) This patch deprecates 3 out of 4 functions on `PollParameters`: - `local_peer_id` - `listened_addresses` - `external_addresses` The addresses can be obtained by inspecting the `FromSwarm` event. To make this easier, we introduce two utility structs in `libp2p-swarm`: - `ExternalAddresses` - `ListenAddresses` A node's `PeerId` is always known to the caller, thus we can require them to pass it in. Related: #3124. --- examples/chat-tokio.rs | 2 +- examples/chat.rs | 2 +- examples/distributed-key-value-store.rs | 2 +- examples/gossipsub-chat.rs | 2 +- examples/mdns-passive-discovery.rs | 2 +- protocols/autonat/CHANGELOG.md | 4 ++ protocols/autonat/src/behaviour.rs | 16 ++++- protocols/autonat/src/behaviour/as_client.rs | 23 ++++--- protocols/dcutr/CHANGELOG.md | 3 + protocols/dcutr/examples/dcutr.rs | 2 +- protocols/dcutr/src/behaviour.rs | 34 ++++++---- protocols/dcutr/tests/lib.rs | 2 +- protocols/identify/src/behaviour.rs | 31 ++++++--- protocols/kad/src/behaviour.rs | 67 ++++++++++---------- protocols/mdns/CHANGELOG.md | 4 ++ protocols/mdns/src/behaviour.rs | 21 ++++-- protocols/mdns/src/behaviour/iface.rs | 18 +++--- protocols/mdns/src/behaviour/iface/dns.rs | 6 +- protocols/mdns/src/behaviour/iface/query.rs | 2 +- protocols/mdns/tests/use-async-std.rs | 2 +- protocols/mdns/tests/use-tokio.rs | 2 +- protocols/relay/src/behaviour.rs | 26 ++++---- protocols/rendezvous/src/client.rs | 16 +++-- swarm/CHANGELOG.md | 2 + swarm/src/behaviour.rs | 17 +++++ swarm/src/behaviour/external_addresses.rs | 50 +++++++++++++++ swarm/src/behaviour/listen_addresses.rs | 33 ++++++++++ swarm/src/lib.rs | 3 +- 28 files changed, 282 insertions(+), 112 deletions(-) create mode 100644 swarm/src/behaviour/external_addresses.rs create mode 100644 swarm/src/behaviour/listen_addresses.rs diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 240a52fb0c9..2d644b0f360 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -91,7 +91,7 @@ async fn main() -> Result<(), Box> { } // Create a Swarm to manage peers and events. - let mdns_behaviour = mdns::Behaviour::new(Default::default())?; + let mdns_behaviour = mdns::Behaviour::new(Default::default(), peer_id)?; let behaviour = MyBehaviour { floodsub: Floodsub::new(peer_id), mdns: mdns_behaviour, diff --git a/examples/chat.rs b/examples/chat.rs index 0c57f90c8ff..6d6825125df 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -107,7 +107,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let mdns = mdns::async_io::Behaviour::new(mdns::Config::default())?; + let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(local_peer_id), mdns, diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index bd5f4be916b..18d74270e7e 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -97,7 +97,7 @@ async fn main() -> Result<(), Box> { // Create a Kademlia behaviour. let store = MemoryStore::new(local_peer_id); let kademlia = Kademlia::new(local_peer_id, store); - let mdns = mdns::async_io::Behaviour::new(mdns::Config::default())?; + let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?; let behaviour = MyBehaviour { kademlia, mdns }; Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 07c04e472e5..9150aa64624 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -104,7 +104,7 @@ async fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let mdns = mdns::async_io::Behaviour::new(mdns::Config::default())?; + let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?; let behaviour = MyBehaviour { gossipsub, mdns }; Swarm::with_async_std_executor(transport, behaviour, local_peer_id) }; diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index fc84b7972bb..587d268d0e2 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -39,7 +39,7 @@ async fn main() -> Result<(), Box> { let transport = libp2p::development_transport(id_keys).await?; // Create an MDNS network behaviour. - let behaviour = mdns::async_io::Behaviour::new(mdns::Config::default())?; + let behaviour = mdns::async_io::Behaviour::new(mdns::Config::default(), peer_id)?; // Create a Swarm that establishes connections through the given transport. // Note that the MDNS behaviour itself will not actually inititiate any connections, diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 4c4efd79bf2..3da1cedb2aa 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -1,9 +1,13 @@ # 0.10.0 [unreleased] +- Require the node's local `PeerId` to be passed into the constructor of `libp2p_autonat::Behaviour`. See [PR 3153]. + - Update to `libp2p-request-response` `v0.24.0`. - Update to `libp2p-swarm` `v0.42.0`. +[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 + # 0.9.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index a0d5ad4fedc..d9980f16dbb 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -39,8 +39,8 @@ use libp2p_swarm::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, FromSwarm, }, - ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, + ConnectionHandler, ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, }; use std::{ collections::{HashMap, VecDeque}, @@ -212,6 +212,9 @@ pub struct Behaviour { pending_out_events: VecDeque<::OutEvent>, probe_id: ProbeId, + + listen_addresses: ListenAddresses, + external_addresses: ExternalAddresses, } impl Behaviour { @@ -236,6 +239,8 @@ impl Behaviour { last_probe: None, pending_out_events: VecDeque::new(), probe_id: ProbeId(0), + listen_addresses: Default::default(), + external_addresses: Default::default(), } } @@ -288,6 +293,8 @@ impl Behaviour { ongoing_outbound: &mut self.ongoing_outbound, last_probe: &mut self.last_probe, schedule_probe: &mut self.schedule_probe, + listen_addresses: &self.listen_addresses, + external_addresses: &self.external_addresses, } } @@ -457,7 +464,7 @@ impl NetworkBehaviour for Behaviour { Poll::Pending => is_inner_pending = true, } - match self.as_client().poll_auto_probe(params, cx) { + match self.as_client().poll_auto_probe(cx) { Poll::Ready(event) => self .pending_out_events .push_back(Event::OutboundProbe(event)), @@ -476,6 +483,9 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { + self.listen_addresses.on_swarm_event(&event); + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionEstablished(connection_established) => { self.inner diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 43763029f65..76ddebce2dc 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -29,7 +29,9 @@ use futures_timer::Delay; use instant::Instant; use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId}; use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; -use libp2p_swarm::{AddressScore, NetworkBehaviourAction, PollParameters}; +use libp2p_swarm::{ + AddressScore, ExternalAddresses, ListenAddresses, NetworkBehaviourAction, PollParameters, +}; use rand::{seq::SliceRandom, thread_rng}; use std::{ collections::{HashMap, VecDeque}, @@ -97,6 +99,9 @@ pub struct AsClient<'a> { pub last_probe: &'a mut Option, pub schedule_probe: &'a mut Delay, + + pub listen_addresses: &'a ListenAddresses, + pub external_addresses: &'a ExternalAddresses, } impl<'a> HandleInnerEvent for AsClient<'a> { @@ -146,6 +151,8 @@ impl<'a> HandleInnerEvent for AsClient<'a> { if let Ok(address) = response.result { // Update observed address score if it is finite. + #[allow(deprecated)] + // TODO: Fix once we report `AddressScore` through `FromSwarm` event. let score = params .external_addresses() .find_map(|r| (r.addr == address).then_some(r.score)) @@ -188,17 +195,17 @@ impl<'a> HandleInnerEvent for AsClient<'a> { } impl<'a> AsClient<'a> { - pub fn poll_auto_probe( - &mut self, - params: &mut impl PollParameters, - cx: &mut Context<'_>, - ) -> Poll { + pub fn poll_auto_probe(&mut self, cx: &mut Context<'_>) -> Poll { match self.schedule_probe.poll_unpin(cx) { Poll::Ready(()) => { self.schedule_probe.reset(self.config.retry_interval); - let mut addresses: Vec<_> = params.external_addresses().map(|r| r.addr).collect(); - addresses.extend(params.listened_addresses()); + let addresses = self + .external_addresses + .iter() + .chain(self.listen_addresses.iter()) + .cloned() + .collect(); let probe_id = self.probe_id.next(); let event = match self.do_probe(probe_id, addresses) { diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 6c45f4cc9d4..23c9b616541 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -5,7 +5,10 @@ - Declare `InboundUpgradeError` and `OutboundUpgradeError` as type aliases instead of renames. This is a workaround for a missing feature in `cargo semver-checks`. See [PR 3213]. +- Require the node's local `PeerId` to be passed into the constructor of `libp2p_dcutr::Behaviour`. See [PR 3153]. + [PR 3213]: https://github.com/libp2p/rust-libp2p/pull/3213 +[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 # 0.8.0 diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index 1d9c3366642..6356ef6647f 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -157,7 +157,7 @@ fn main() -> Result<(), Box> { "/TODO/0.0.1".to_string(), local_key.public(), )), - dcutr: dcutr::behaviour::Behaviour::new(), + dcutr: dcutr::behaviour::Behaviour::new(local_peer_id), }; let mut swarm = match ThreadPool::new() { diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 78b22c89ac1..15dfe078bfe 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -29,8 +29,8 @@ use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ - ConnectionHandler, ConnectionHandlerUpgrErr, IntoConnectionHandler, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, + ConnectionHandler, ConnectionHandlerUpgrErr, ExternalAddresses, IntoConnectionHandler, + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; @@ -66,20 +66,25 @@ pub enum UpgradeError { Handler(ConnectionHandlerUpgrErr), } -#[derive(Default)] pub struct Behaviour { /// Queue of actions to return when polled. queued_actions: VecDeque, /// All direct (non-relayed) connections. direct_connections: HashMap>, + + external_addresses: ExternalAddresses, + + local_peer_id: PeerId, } impl Behaviour { - pub fn new() -> Self { + pub fn new(local_peer_id: PeerId) -> Self { Behaviour { queued_actions: Default::default(), direct_connections: Default::default(), + external_addresses: Default::default(), + local_peer_id, } } @@ -308,16 +313,18 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, _cx: &mut Context<'_>, - poll_parameters: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll> { if let Some(action) = self.queued_actions.pop_front() { - return Poll::Ready(action.build(poll_parameters)); + return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } Poll::Pending } fn on_swarm_event(&mut self, event: FromSwarm) { + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) @@ -364,16 +371,15 @@ impl From> for ActionBuilder { impl ActionBuilder { fn build( self, - poll_parameters: &mut impl PollParameters, + local_peer_id: PeerId, + external_addresses: &ExternalAddresses, ) -> NetworkBehaviourAction { let obs_addrs = || { - poll_parameters - .external_addresses() - .filter(|a| !a.addr.iter().any(|p| p == Protocol::P2pCircuit)) - .map(|a| { - a.addr - .with(Protocol::P2p((*poll_parameters.local_peer_id()).into())) - }) + external_addresses + .iter() + .cloned() + .filter(|a| !a.iter().any(|p| p == Protocol::P2pCircuit)) + .map(|a| a.with(Protocol::P2p(local_peer_id.into()))) .collect() }; diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 65f5f977388..27885aa574c 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -127,7 +127,7 @@ fn build_client() -> Swarm { transport, Client { relay: behaviour, - dcutr: dcutr::behaviour::Behaviour::new(), + dcutr: dcutr::behaviour::Behaviour::new(local_peer_id), }, local_peer_id, ) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index f37abfc0da7..beb264da789 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -26,7 +26,8 @@ use libp2p_core::{ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, + NetworkBehaviourAction, NotifyHandler, PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -56,6 +57,9 @@ pub struct Behaviour { events: VecDeque>, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, + + listen_addresses: ListenAddresses, + external_addresses: ExternalAddresses, } /// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info @@ -177,6 +181,8 @@ impl Behaviour { requests: Vec::new(), events: VecDeque::new(), discovered_peers, + listen_addresses: Default::default(), + external_addresses: Default::default(), } } @@ -318,7 +324,12 @@ impl NetworkBehaviour for Behaviour { peer_id, handler: NotifyHandler::Any, event: InEvent { - listen_addrs: listen_addrs(params), + listen_addrs: self + .listen_addresses + .iter() + .chain(self.external_addresses.iter()) + .cloned() + .collect(), supported_protocols: supported_protocols(params), protocol: Protocol::Push, }, @@ -330,7 +341,12 @@ impl NetworkBehaviour for Behaviour { peer_id, handler: NotifyHandler::One(connection_id), event: InEvent { - listen_addrs: listen_addrs(params), + listen_addrs: self + .listen_addresses + .iter() + .chain(self.external_addresses.iter()) + .cloned() + .collect(), supported_protocols: supported_protocols(params), protocol: Protocol::Identify(connection_id), }, @@ -344,6 +360,9 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { + self.listen_addresses.on_swarm_event(&event); + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) @@ -453,12 +472,6 @@ fn supported_protocols(params: &impl PollParameters) -> Vec { .collect() } -fn listen_addrs(params: &impl PollParameters) -> Vec { - let mut listen_addrs: Vec<_> = params.external_addresses().map(|r| r.addr).collect(); - listen_addrs.extend(params.listened_addresses()); - listen_addrs -} - /// If there is a given peer_id in the multiaddr, make sure it is the same as /// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true. fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 643f618567c..42081b22108 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -41,12 +41,12 @@ use fnv::{FnvHashMap, FnvHashSet}; use instant::Instant; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ - AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr, - FromSwarm, NewExternalAddr, NewListenAddr, + AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, }; use libp2p_swarm::{ dial_opts::{self, DialOpts}, - DialError, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -103,12 +103,15 @@ pub struct Kademlia { /// Queued events to return when the behaviour is being polled. queued_events: VecDeque>>, - /// The currently known addresses of the local node. - local_addrs: HashSet, + listen_addresses: ListenAddresses, + + external_addresses: ExternalAddresses, /// See [`KademliaConfig::caching`]. caching: KademliaCaching, + local_peer_id: PeerId, + /// The record storage. store: TStore, } @@ -439,6 +442,7 @@ where protocol_config: config.protocol_config, record_filtering: config.record_filtering, queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()), + listen_addresses: Default::default(), queries: QueryPool::new(config.query_config), connected_peers: Default::default(), add_provider_job, @@ -446,7 +450,8 @@ where record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, - local_addrs: HashSet::new(), + external_addresses: Default::default(), + local_peer_id: id, } } @@ -1034,7 +1039,9 @@ where fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec { let kbuckets = &mut self.kbuckets; let connected = &mut self.connected_peers; - let local_addrs = &self.local_addrs; + let listen_addresses = &self.listen_addresses; + let external_addresses = &self.external_addresses; + self.store .providers(key) .into_iter() @@ -1055,7 +1062,13 @@ where // done before provider records were stored along with // their addresses. if &node_id == kbuckets.local_key().preimage() { - Some(local_addrs.iter().cloned().collect::>()) + Some( + listen_addresses + .iter() + .chain(external_addresses.iter()) + .cloned() + .collect::>(), + ) } else { let key = kbucket::Key::from(node_id); kbuckets @@ -1226,11 +1239,7 @@ where } /// Handles a finished (i.e. successful) query. - fn query_finished( - &mut self, - q: Query, - params: &mut impl PollParameters, - ) -> Option { + fn query_finished(&mut self, q: Query) -> Option { let query_id = q.id(); log::trace!("Query {:?} finished.", query_id); let result = q.into_result(); @@ -1340,8 +1349,8 @@ where key, phase: AddProviderPhase::GetClosestPeers, } => { - let provider_id = *params.local_peer_id(); - let external_addresses = params.external_addresses().map(|r| r.addr).collect(); + let provider_id = self.local_peer_id; + let external_addresses = self.external_addresses.iter().cloned().collect(); let inner = QueryInner::new(QueryInfo::AddProvider { context, key, @@ -2285,7 +2294,7 @@ where fn poll( &mut self, cx: &mut Context<'_>, - parameters: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll> { let now = Instant::now(); @@ -2352,7 +2361,7 @@ where loop { match self.queries.poll(now) { QueryPoolState::Finished(q) => { - if let Some(event) = self.query_finished(q, parameters) { + if let Some(event) = self.query_finished(q) { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } } @@ -2406,6 +2415,9 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { + self.listen_addresses.on_swarm_event(&event); + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionEstablished(connection_established) => { self.on_connection_established(connection_established) @@ -2415,18 +2427,10 @@ where } FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), FromSwarm::AddressChange(address_change) => self.on_address_change(address_change), - FromSwarm::ExpiredListenAddr(ExpiredListenAddr { addr, .. }) => { - self.local_addrs.remove(addr); - } - FromSwarm::NewExternalAddr(NewExternalAddr { addr }) => { - if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS { - self.local_addrs.insert(addr.clone()); - } - } - FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => { - self.local_addrs.insert(addr.clone()); - } - FromSwarm::ListenFailure(_) + FromSwarm::ExpiredListenAddr(_) + | FromSwarm::NewExternalAddr(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ListenFailure(_) | FromSwarm::NewListener(_) | FromSwarm::ListenerClosed(_) | FromSwarm::ListenerError(_) @@ -3179,8 +3183,3 @@ pub enum RoutingUpdate { /// peer ID). Failed, } - -/// The maximum number of local external addresses. When reached any -/// further externally reported addresses are ignored. The behaviour always -/// tracks all its listen addresses. -const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index c997b9d5d8b..4ebe5674b6c 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,7 +1,11 @@ # 0.43.0 [unreleased] +- Require the node's local `PeerId` to be passed into the constructor of `libp2p_mdns::Behaviour`. See [PR 3153]. + - Update to `libp2p-swarm` `v0.42.0`. +[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 + # 0.42.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index ef9ac50addf..815a23c9bdc 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -30,7 +30,8 @@ use if_watch::IfEvent; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - dummy, ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + dummy, ConnectionHandler, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, }; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; @@ -121,6 +122,10 @@ where /// /// `None` if `discovered_nodes` is empty. closest_expiration: Option, + + listen_addresses: ListenAddresses, + + local_peer_id: PeerId, } impl

Behaviour

@@ -128,13 +133,15 @@ where P: Provider, { /// Builds a new `Mdns` behaviour. - pub fn new(config: Config) -> io::Result { + pub fn new(config: Config, local_peer_id: PeerId) -> io::Result { Ok(Self { config, if_watch: P::new_watcher()?, iface_states: Default::default(), discovered_nodes: Default::default(), closest_expiration: Default::default(), + listen_addresses: Default::default(), + local_peer_id, }) } @@ -189,6 +196,8 @@ where } fn on_swarm_event(&mut self, event: FromSwarm) { + self.listen_addresses.on_swarm_event(&event); + match event { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -221,7 +230,7 @@ where fn poll( &mut self, cx: &mut Context<'_>, - params: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll> { // Poll ifwatch. while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { @@ -237,7 +246,7 @@ where continue; } if let Entry::Vacant(e) = self.iface_states.entry(addr) { - match InterfaceState::new(addr, self.config.clone()) { + match InterfaceState::new(addr, self.config.clone(), self.local_peer_id) { Ok(iface_state) => { e.insert(iface_state); } @@ -257,7 +266,9 @@ where // Emit discovered event. let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new(); for iface_state in self.iface_states.values_mut() { - while let Poll::Ready((peer, addr, expiration)) = iface_state.poll(cx, params) { + while let Poll::Ready((peer, addr, expiration)) = + iface_state.poll(cx, &self.listen_addresses) + { if let Some((_, _, cur_expires)) = self .discovered_nodes .iter_mut() diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 0985f3cdc82..f07e6564a4c 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -26,7 +26,7 @@ use self::query::MdnsPacket; use crate::behaviour::{socket::AsyncSocket, timer::Builder}; use crate::Config; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::PollParameters; +use libp2p_swarm::ListenAddresses; use socket2::{Domain, Socket, Type}; use std::{ collections::VecDeque, @@ -66,6 +66,8 @@ pub struct InterfaceState { discovered: VecDeque<(PeerId, Multiaddr, Instant)>, /// TTL ttl: Duration, + + local_peer_id: PeerId, } impl InterfaceState @@ -74,7 +76,7 @@ where T: Builder + futures::Stream, { /// Builds a new [`InterfaceState`]. - pub fn new(addr: IpAddr, config: Config) -> io::Result { + pub fn new(addr: IpAddr, config: Config, local_peer_id: PeerId) -> io::Result { log::info!("creating instance on iface {}", addr); let recv_socket = match addr { IpAddr::V4(addr) => { @@ -134,6 +136,7 @@ where timeout: T::interval_at(Instant::now(), query_interval), multicast_addr, ttl: config.ttl, + local_peer_id, }) } @@ -148,7 +151,7 @@ where pub fn poll( &mut self, cx: &mut Context, - params: &impl PollParameters, + listen_addresses: &ListenAddresses, ) -> Poll<(PeerId, Multiaddr, Instant)> { loop { // 1st priority: Low latency: Create packet ASAP after timeout. @@ -198,8 +201,8 @@ where self.send_buffer.extend(build_query_response( query.query_id(), - *params.local_peer_id(), - params.listened_addresses(), + self.local_peer_id, + listen_addresses.iter(), self.ttl, )); continue; @@ -211,9 +214,8 @@ where self.addr ); - self.discovered.extend( - response.extract_discovered(Instant::now(), *params.local_peer_id()), - ); + self.discovered + .extend(response.extract_discovered(Instant::now(), self.local_peer_id)); continue; } Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => { diff --git a/protocols/mdns/src/behaviour/iface/dns.rs b/protocols/mdns/src/behaviour/iface/dns.rs index 1f0825727a2..8ea4db49f2d 100644 --- a/protocols/mdns/src/behaviour/iface/dns.rs +++ b/protocols/mdns/src/behaviour/iface/dns.rs @@ -103,10 +103,10 @@ pub fn build_query() -> MdnsPacket { /// Builds the response to an address discovery DNS query. /// /// If there are more than 2^16-1 addresses, ignores the rest. -pub fn build_query_response( +pub fn build_query_response<'a>( id: u16, peer_id: PeerId, - addresses: impl ExactSizeIterator, + addresses: impl ExactSizeIterator, ttl: Duration, ) -> Vec { // Convert the TTL into seconds. @@ -413,7 +413,7 @@ mod tests { let packets = build_query_response( 0xf8f8, my_peer_id, - vec![addr1, addr2].into_iter(), + vec![&addr1, &addr2].into_iter(), Duration::from_secs(60), ); for packet in packets { diff --git a/protocols/mdns/src/behaviour/iface/query.rs b/protocols/mdns/src/behaviour/iface/query.rs index 50b86bd888f..7cf22223c41 100644 --- a/protocols/mdns/src/behaviour/iface/query.rs +++ b/protocols/mdns/src/behaviour/iface/query.rs @@ -335,7 +335,7 @@ mod tests { let packets = build_query_response( 0xf8f8, peer_id, - vec![addr1, addr2].into_iter(), + vec![&addr1, &addr2].into_iter(), Duration::from_secs(60), ); diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 736371a50a4..e12eeb09299 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -63,7 +63,7 @@ async fn create_swarm(config: Config) -> Result, Box .authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap()) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); - let behaviour = Behaviour::new(config)?; + let behaviour = Behaviour::new(config, peer_id)?; let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 1a48e8c7361..77deae55412 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -59,7 +59,7 @@ async fn create_swarm(config: Config) -> Result, Box .authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap()) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); - let behaviour = Behaviour::new(config)?; + let behaviour = Behaviour::new(config, peer_id)?; let mut swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; Ok(swarm) diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index 3a10045ad9f..32150f884a0 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -32,8 +32,8 @@ use libp2p_core::multiaddr::Protocol; use libp2p_core::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, + ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -199,6 +199,8 @@ pub struct Behaviour { /// Queue of actions to return when polled. queued_actions: VecDeque, + + external_addresses: ExternalAddresses, } impl Behaviour { @@ -209,6 +211,7 @@ impl Behaviour { reservations: Default::default(), circuits: Default::default(), queued_actions: Default::default(), + external_addresses: Default::default(), } } @@ -261,6 +264,8 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) @@ -637,10 +642,10 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, _cx: &mut Context<'_>, - poll_parameters: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll> { if let Some(action) = self.queued_actions.pop_front() { - return Poll::Ready(action.build(poll_parameters)); + return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } Poll::Pending @@ -758,7 +763,8 @@ impl From> for Action { impl Action { fn build( self, - poll_parameters: &mut impl PollParameters, + local_peer_id: PeerId, + external_addresses: &ExternalAddresses, ) -> NetworkBehaviourAction { match self { Action::Done(action) => action, @@ -771,15 +777,13 @@ impl Action { peer_id, event: Either::Left(handler::In::AcceptReservationReq { inbound_reservation_req, - addrs: poll_parameters - .external_addresses() - .map(|a| a.addr) + addrs: external_addresses + .iter() + .cloned() // Add local peer ID in case it isn't present yet. .filter_map(|a| match a.iter().last()? { Protocol::P2p(_) => Some(a), - _ => Some( - a.with(Protocol::P2p(*poll_parameters.local_peer_id().as_ref())), - ), + _ => Some(a.with(Protocol::P2p(local_peer_id.into()))), }) .collect(), }), diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 173831d95d6..599fc8f508f 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -34,7 +34,8 @@ use libp2p_core::identity::Keypair; use libp2p_core::{Multiaddr, PeerId, PeerRecord}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + CloseConnection, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; @@ -57,6 +58,8 @@ pub struct Behaviour { /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers` otherwise we have a memory leak. expiring_registrations: FuturesUnordered>, + + external_addresses: ExternalAddresses, } impl Behaviour { @@ -70,6 +73,7 @@ impl Behaviour { expiring_registrations: FuturesUnordered::from_iter(vec![ futures::future::pending().boxed() ]), + external_addresses: Default::default(), } } @@ -215,7 +219,7 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, cx: &mut Context<'_>, - poll_params: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); @@ -224,10 +228,8 @@ impl NetworkBehaviour for Behaviour { if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() { // Update our external addresses based on the Swarm's current knowledge. // It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside. - let external_addresses = poll_params - .external_addresses() - .map(|r| r.addr) - .collect::>(); + + let external_addresses = self.external_addresses.iter().cloned().collect::>(); if external_addresses.is_empty() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent( @@ -268,6 +270,8 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm) { + self.external_addresses.on_swarn_event(&event); + match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e111ea099c0..e6f0a92c8ec 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,11 +1,13 @@ # 0.42.0 [unreleased] - Removed deprecated Swarm constructors. For transition notes see [0.41.0](#0.41.0). See [PR 3170]. +- Deprecate functions on `PollParameters` in preparation for `PollParameters` to be removed entirely eventually. See [PR 3153]. - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 +[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 # 0.41.1 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index cbd2a4381c4..141d6c3efbd 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -19,8 +19,13 @@ // DEALINGS IN THE SOFTWARE. mod either; +mod external_addresses; +mod listen_addresses; pub mod toggle; +pub use external_addresses::ExternalAddresses; +pub use listen_addresses::ListenAddresses; + use crate::dial_opts::DialOpts; use crate::handler::{ConnectionHandler, IntoConnectionHandler}; use crate::{AddressRecord, AddressScore, DialError}; @@ -402,12 +407,24 @@ pub trait PollParameters { fn supported_protocols(&self) -> Self::SupportedProtocolsIter; /// Returns the list of the addresses we're listening on. + #[deprecated( + since = "0.42.0", + note = "Use `libp2p_swarm::ListenAddresses` instead." + )] fn listened_addresses(&self) -> Self::ListenedAddressesIter; /// Returns the list of the addresses nodes can use to reach us. + #[deprecated( + since = "0.42.0", + note = "Use `libp2p_swarm::ExternalAddresses` instead." + )] fn external_addresses(&self) -> Self::ExternalAddressesIter; /// Returns the peer id of the local node. + #[deprecated( + since = "0.42.0", + note = "Pass the node's `PeerId` into the behaviour instead." + )] fn local_peer_id(&self) -> &PeerId; } diff --git a/swarm/src/behaviour/external_addresses.rs b/swarm/src/behaviour/external_addresses.rs new file mode 100644 index 00000000000..0ce07962e7e --- /dev/null +++ b/swarm/src/behaviour/external_addresses.rs @@ -0,0 +1,50 @@ +use crate::behaviour::{ExpiredExternalAddr, FromSwarm, NewExternalAddr}; +use crate::IntoConnectionHandler; +use libp2p_core::Multiaddr; +use std::collections::HashSet; + +/// The maximum number of local external addresses. When reached any +/// further externally reported addresses are ignored. The behaviour always +/// tracks all its listen addresses. +const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; + +/// Utility struct for tracking the external addresses of a [`Swarm`](crate::Swarm). +#[derive(Debug, Clone)] +pub struct ExternalAddresses { + addresses: HashSet, + limit: usize, +} + +impl Default for ExternalAddresses { + fn default() -> Self { + Self { + addresses: Default::default(), + limit: MAX_LOCAL_EXTERNAL_ADDRS, + } + } +} + +impl ExternalAddresses { + /// Returns an [`Iterator`] over all external addresses. + pub fn iter(&self) -> impl ExactSizeIterator { + self.addresses.iter() + } + + /// Feed a [`FromSwarm`] event to this struct. + pub fn on_swarn_event(&mut self, event: &FromSwarm) + where + THandler: IntoConnectionHandler, + { + match event { + FromSwarm::NewExternalAddr(NewExternalAddr { addr, .. }) => { + if self.addresses.len() < self.limit { + self.addresses.insert((*addr).clone()); + } + } + FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr, .. }) => { + self.addresses.insert((*addr).clone()); + } + _ => {} + } + } +} diff --git a/swarm/src/behaviour/listen_addresses.rs b/swarm/src/behaviour/listen_addresses.rs new file mode 100644 index 00000000000..37a7d05d12c --- /dev/null +++ b/swarm/src/behaviour/listen_addresses.rs @@ -0,0 +1,33 @@ +use crate::behaviour::{ExpiredListenAddr, FromSwarm, NewListenAddr}; +use crate::IntoConnectionHandler; +use libp2p_core::Multiaddr; +use std::collections::HashSet; + +/// Utility struct for tracking the addresses a [`Swarm`](crate::Swarm) is listening on. +#[derive(Debug, Default, Clone)] +pub struct ListenAddresses { + addresses: HashSet, +} + +impl ListenAddresses { + /// Returns an [`Iterator`] over all listen addresses. + pub fn iter(&self) -> impl ExactSizeIterator { + self.addresses.iter() + } + + /// Feed a [`FromSwarm`] event to this struct. + pub fn on_swarm_event(&mut self, event: &FromSwarm) + where + THandler: IntoConnectionHandler, + { + match event { + FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => { + self.addresses.insert((*addr).clone()); + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { addr, .. }) => { + self.addresses.insert((*addr).clone()); + } + _ => {} + } + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 68ac8009884..a2ef648d6e6 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -101,7 +101,8 @@ pub mod derive_prelude { } pub use behaviour::{ - CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + CloseConnection, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, PollParameters, }; pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::{