Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Dial cached enr's before making subnet discovery query #1376

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions beacon_node/eth2_libp2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ use tokio::sync::mpsc;
use types::{EnrForkId, EthSpec, SubnetId};

mod subnet_predicate;
use subnet_predicate::subnet_predicate;
pub use subnet_predicate::subnet_predicate;

/// Local ENR storage filename.
pub const ENR_FILENAME: &str = "enr.dat";
/// Target number of peers we'd like to have connected to a given long-lived subnet.
const TARGET_SUBNET_PEERS: usize = 3;
pub const TARGET_SUBNET_PEERS: usize = 3;
/// Target number of peers to search for given a grouped subnet query.
const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6;
/// Number of times to attempt a discovery request.
Expand Down Expand Up @@ -287,6 +287,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
self.discv5.local_enr()
}

/// Return the cached enrs.
pub fn cached_enrs(&self) -> impl Iterator<Item = (&PeerId, &Enr)> {
self.cached_enrs.iter()
}

/// This adds a new `FindPeers` query to the queue if one doesn't already exist.
pub fn discover_peers(&mut self) {
// If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one.
Expand Down Expand Up @@ -558,7 +563,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
.peers_on_subnet(subnet_query.subnet_id)
.count();

if peers_on_subnet > TARGET_SUBNET_PEERS {
if peers_on_subnet >= TARGET_SUBNET_PEERS {
debug!(self.log, "Discovery ignored";
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
Expand Down
73 changes: 62 additions & 11 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Implementation of a Lighthouse's peer management system.

pub use self::peerdb::*;
use crate::discovery::{Discovery, DiscoveryEvent};
use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS};
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::{error, metrics};
use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery};
Expand All @@ -19,7 +19,7 @@ use std::{
task::{Context, Poll},
time::{Duration, Instant},
};
use types::EthSpec;
use types::{EthSpec, SubnetId};

pub use libp2p::core::{identity::Keypair, Multiaddr};

Expand Down Expand Up @@ -214,18 +214,45 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {

/// A request to find peers on a given subnet.
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
// Extend the time to maintain peers if required.
for s in subnets_to_discover.iter() {
if let Some(min_ttl) = s.min_ttl {
self.network_globals
let filtered: Vec<SubnetDiscovery> = subnets_to_discover
.into_iter()
.filter(|s| {
// Extend min_ttl of connected peers on required subnets
if let Some(min_ttl) = s.min_ttl {
self.network_globals
.peers
.write()
.extend_peers_on_subnet(s.subnet_id, min_ttl);
}
// Already have target number of peers, no need for subnet discovery
let peers_on_subnet = self
.network_globals
.peers
.write()
.extend_peers_on_subnet(s.subnet_id, min_ttl);
}
}
.read()
.peers_on_subnet(s.subnet_id)
.count();
if peers_on_subnet >= TARGET_SUBNET_PEERS {
debug!(
self.log,
"Discovery query ignored";
"subnet_id" => format!("{:?}",s.subnet_id),
"reason" => "Already connected to desired peers",
"connected_peers_on_subnet" => peers_on_subnet,
"target_subnet_peers" => TARGET_SUBNET_PEERS,
);
false
// Queue an outgoing connection request to the cached peers that are on `s.subnet_id`.
// If we connect to the cached peers before the discovery query starts, then we potentially
// save a costly discovery query.
} else {
self.dial_cached_enrs_in_subnet(s.subnet_id);
true
}
})
.collect();

// request the subnet query from discovery
self.discovery.discover_subnet_peers(subnets_to_discover);
self.discovery.discover_subnet_peers(filtered);
}

/// A STATUS message has been received from a peer. This resets the status timer.
Expand Down Expand Up @@ -531,6 +558,30 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::SocketUpdated(multiaddr));
}

/// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't
/// in Connected, Dialing or Banned state.
fn dial_cached_enrs_in_subnet(&mut self, subnet_id: SubnetId) {
let predicate = subnet_predicate::<TSpec>(vec![subnet_id], &self.log);
let peers_to_dial: Vec<PeerId> = self
.discovery()
.cached_enrs()
.filter_map(|(peer_id, enr)| {
let peers = self.network_globals.peers.read();
if predicate(enr)
&& !peers.is_connected_or_dialing(peer_id)
&& !peers.is_banned(peer_id)
{
Some(peer_id.clone())
} else {
None
}
})
.collect();
for peer in &peers_to_dial {
self.dial_peer(peer);
}
}

/// Peers that have been returned by discovery requests are dialed here if they are suitable.
///
/// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated
Expand Down