Skip to content

Commit

Permalink
feat: report changes in supported protocols to ConnectionHandler
Browse files Browse the repository at this point in the history
With this patch, implementations of `ConnectionHandler` (which are typically composed in a tree) can exchange information about the supported protocols of a remote with each other via `ConnectionHandlerEvent::ReportRemoteProtocols`. The provided `ProtocolSupport` enum can describe either additions or removals of the remote peer's protocols.

This information is aggregated in the connection and passed down to the `ConnectionHandler` via `ConnectionEvent::RemoteProtocolsChange`.

Similarly, if the listen protocols of a connection change, all `ConnectionHandler`s on the connection will be notified via `ConnectionEvent::LocalProtocolsChange`. This will allow us to eventually remove `PollParameters` from `NetworkBehaviour`.

This pattern allows protocols on a connection to communicate with each other. For example, protocols like identify can share the list of (supposedly) supported protocols by the remote with all other handlers. A protocol like kademlia can accurately add and remove a remote from its routing table as a result.

Resolves: #2680.
Related: #3124.

Pull-Request: #3651.
  • Loading branch information
thomaseizinger authored May 8, 2023
1 parent b8a7684 commit b035fc8
Show file tree
Hide file tree
Showing 30 changed files with 844 additions and 243 deletions.
4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl ConnectionHandler for Handler {
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ impl ConnectionHandler for Handler {
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
5 changes: 4 additions & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,10 @@ impl ConnectionHandler for Handler {
}) => {
log::debug!("Protocol negotiation failed: {e}")
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
Handler::Disabled(_) => {}
Expand Down
166 changes: 51 additions & 115 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Handler, InEvent};
use crate::protocol::{Info, Protocol, UpgradeError};
use crate::protocol::{Info, UpgradeError};
use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
AddressScore, ConnectionDenied, DialError, ExternalAddresses, ListenAddresses,
NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, StreamUpgradeError,
THandlerInEvent, ToSwarm,
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;
Expand All @@ -50,10 +49,6 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
requests: Vec<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
Expand All @@ -63,15 +58,6 @@ pub struct Behaviour {
external_addresses: ExternalAddresses,
}

/// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
#[derive(Debug, PartialEq, Eq)]
struct Request {
peer_id: PeerId,
protocol: Protocol,
}

/// Configuration for the [`identify::Behaviour`](Behaviour).
#[non_exhaustive]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -184,7 +170,6 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
requests: Vec::new(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand All @@ -203,13 +188,11 @@ impl Behaviour {
continue;
}

let request = Request {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
}
handler: NotifyHandler::Any,
event: InEvent::Push,
});
}
}

Expand Down Expand Up @@ -239,6 +222,14 @@ impl Behaviour {
}
}
}

fn all_addresses(&self) -> HashSet<Multiaddr> {
self.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect()
}
}

impl NetworkBehaviour for Behaviour {
Expand All @@ -261,6 +252,7 @@ impl NetworkBehaviour for Behaviour {
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
remote_addr.clone(),
self.all_addresses(),
))
}

Expand All @@ -280,13 +272,14 @@ impl NetworkBehaviour for Behaviour {
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
addr.clone(), // TODO: This is weird? That is the public address we dialed, shouldn't need to tell the other party?
self.all_addresses(),
))
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
_: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
Expand Down Expand Up @@ -315,12 +308,6 @@ impl NetworkBehaviour for Behaviour {
self.events
.push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id }));
}
handler::Event::Identify => {
self.requests.push(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
});
}
handler::Event::IdentificationError(error) => {
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error }));
Expand All @@ -331,50 +318,13 @@ impl NetworkBehaviour for Behaviour {
fn poll(
&mut self,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}

// Check for pending requests.
match self.requests.pop() {
Some(Request {
peer_id,
protocol: Protocol::Push,
}) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: InEvent {
listen_addrs: self
.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Push,
},
}),
Some(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
}) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: InEvent {
listen_addrs: self
.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Identify(connection_id),
},
}),
None => Poll::Pending,
}
Poll::Pending
}

fn handle_pending_outbound_connection(
Expand All @@ -393,8 +343,35 @@ impl NetworkBehaviour for Behaviour {
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.listen_addresses.on_swarm_event(&event);
self.external_addresses.on_swarm_event(&event);
let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
let external_addr_changed = self.external_addresses.on_swarm_event(&event);

if listen_addr_changed || external_addr_changed {
// notify all connected handlers about our changed addresses
let change_events = self
.connected
.iter()
.flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
.map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connection_id),
event: InEvent::AddressesChanged(self.all_addresses()),
})
.collect::<Vec<_>>();

self.events.extend(change_events)
}

if listen_addr_changed && self.config.push_listen_addr_updates {
// trigger an identify push for all connected peers
let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
peer_id: *peer,
handler: NotifyHandler::Any,
event: InEvent::Push,
});

self.events.extend(push_events);
}

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
Expand All @@ -408,30 +385,11 @@ impl NetworkBehaviour for Behaviour {
}) => {
if remaining_established == 0 {
self.connected.remove(&peer_id);
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id);
}
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
if !self.connected.contains_key(&peer_id) {
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
}
}

if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
if let DialError::Transport(errors) = error {
for (addr, _error) in errors {
Expand All @@ -440,20 +398,9 @@ impl NetworkBehaviour for Behaviour {
}
}
}
FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => {
if self.config.push_listen_addr_updates {
for p in self.connected.keys() {
let request = Request {
peer_id: *p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
}
}
}
}
FromSwarm::AddressChange(_)
FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::ListenerError(_)
Expand Down Expand Up @@ -496,17 +443,6 @@ pub enum Event {
},
}

fn supported_protocols(params: &impl PollParameters) -> Vec<StreamProtocol> {
// The protocol names can be bytes, but the identify protocol except UTF-8 strings.
// There's not much we can do to solve this conflict except strip non-UTF-8 characters.
params
.supported_protocols()
.filter_map(|p| {
StreamProtocol::try_from_owned(String::from_utf8_lossy(&p).to_string()).ok()
})
.collect()
}

/// If there is a given peer_id in the multiaddr, make sure it is the same as
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
Expand Down
Loading

0 comments on commit b035fc8

Please sign in to comment.