diff --git a/turbine/benches/cluster_nodes.rs b/turbine/benches/cluster_nodes.rs index 66215380140e4d..4e46ff28b7df04 100644 --- a/turbine/benches/cluster_nodes.rs +++ b/turbine/benches/cluster_nodes.rs @@ -6,7 +6,7 @@ use { rand::{seq::SliceRandom, Rng}, solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo, solana_ledger::shred::{Shred, ShredFlags}, - solana_sdk::{clock::Slot, pubkey::Pubkey}, + solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey}, solana_turbine::{ cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes}, retransmit_stage::RetransmitStage, @@ -21,7 +21,8 @@ fn make_cluster_nodes( unstaked_ratio: Option<(u32, u32)>, ) -> (Vec, ClusterNodes) { let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio); - let cluster_nodes = new_cluster_nodes::(&cluster_info, &stakes); + let cluster_nodes = + new_cluster_nodes::(&cluster_info, ClusterType::Development, &stakes); (nodes, cluster_nodes) } diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 6036907cd7dc5c..e0cd5316a9ca8a 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -17,6 +17,7 @@ use { solana_sdk::{ clock::{Epoch, Slot}, feature_set, + genesis_config::ClusterType, native_token::LAMPORTS_PER_SOL, pubkey::Pubkey, signature::{Keypair, Signer}, @@ -29,7 +30,7 @@ use { collections::HashMap, iter::repeat_with, marker::PhantomData, - net::SocketAddr, + net::{IpAddr, SocketAddr}, sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }, @@ -39,6 +40,9 @@ use { const DATA_PLANE_FANOUT: usize = 200; pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4; +// Limit number of nodes per IP address. +const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10; + #[derive(Debug, Error)] pub enum Error { #[error("Loopback from slot leader: {leader}, shred: {shred:?}")] @@ -81,9 +85,6 @@ pub struct ClusterNodesCache { pub struct RetransmitPeers<'a> { root_distance: usize, // distance from the root node children: Vec<&'a Node>, - // Maps tvu addresses to the first node - // in the shuffle with the same address. - addrs: HashMap, // tvu addresses } impl Node { @@ -147,8 +148,12 @@ impl ClusterNodes { } impl ClusterNodes { - pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap) -> Self { - new_cluster_nodes(cluster_info, stakes) + pub fn new( + cluster_info: &ClusterInfo, + cluster_type: ClusterType, + stakes: &HashMap, + ) -> Self { + new_cluster_nodes(cluster_info, cluster_type, stakes) } pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> { @@ -168,16 +173,13 @@ impl ClusterNodes { let RetransmitPeers { root_distance, children, - addrs, } = self.get_retransmit_peers(slot_leader, shred, fanout)?; let protocol = get_broadcast_protocol(shred); - let peers = children.into_iter().filter_map(|node| { - node.contact_info()? - .tvu(protocol) - .ok() - .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) - }); - Ok((root_distance, peers.collect())) + let peers = children + .into_iter() + .filter_map(|node| node.contact_info()?.tvu(protocol).ok()) + .collect(); + Ok((root_distance, peers)) } pub fn get_retransmit_peers( @@ -197,19 +199,10 @@ impl ClusterNodes { if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); } - let mut addrs = HashMap::::with_capacity(self.nodes.len()); let mut rng = get_seeded_rng(slot_leader, shred); - let protocol = get_broadcast_protocol(shred); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) .map(|index| &self.nodes[index]) - .inspect(|node| { - if let Some(node) = node.contact_info() { - if let Ok(addr) = node.tvu(protocol) { - addrs.entry(addr).or_insert(*node.pubkey()); - } - } - }) .collect(); let self_index = nodes .iter() @@ -228,7 +221,6 @@ impl ClusterNodes { Ok(RetransmitPeers { root_distance, children: peers.collect(), - addrs, }) } @@ -272,10 +264,11 @@ impl ClusterNodes { pub fn new_cluster_nodes( cluster_info: &ClusterInfo, + cluster_type: ClusterType, stakes: &HashMap, ) -> ClusterNodes { let self_pubkey = cluster_info.id(); - let nodes = get_nodes(cluster_info, stakes); + let nodes = get_nodes(cluster_info, cluster_type, stakes); let index: HashMap<_, _> = nodes .iter() .enumerate() @@ -298,8 +291,21 @@ pub fn new_cluster_nodes( // All staked nodes + other known tvu-peers + the node itself; // sorted by (stake, pubkey) in descending order. -fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec { +fn get_nodes( + cluster_info: &ClusterInfo, + cluster_type: ClusterType, + stakes: &HashMap, +) -> Vec { let self_pubkey = cluster_info.id(); + let should_dedup_addrs = match cluster_type { + ClusterType::Development => false, + ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true, + }; + // Maps IP addresses to number of nodes at that IP address. + let mut counts = { + let capacity = if should_dedup_addrs { stakes.len() } else { 0 }; + HashMap::::with_capacity(capacity) + }; // The local node itself. std::iter::once({ let stake = stakes.get(&self_pubkey).copied().unwrap_or_default(); @@ -328,6 +334,30 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec 0u64).then(|| Node { + node: NodeId::from(node.pubkey()), + stake: node.stake, + }) + } + }) .collect() } @@ -446,6 +476,7 @@ impl ClusterNodesCache { } let nodes = Arc::new(new_cluster_nodes::( cluster_info, + root_bank.cluster_type(), &epoch_staked_nodes.unwrap_or_default(), )); *entry = Some((Instant::now(), Arc::clone(&nodes))); @@ -583,7 +614,8 @@ mod tests { let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None); // ClusterInfo::tvu_peers excludes the node itself. assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); - let cluster_nodes = new_cluster_nodes::(&cluster_info, &stakes); + let cluster_nodes = + new_cluster_nodes::(&cluster_info, ClusterType::Development, &stakes); // All nodes with contact-info should be in the index. // Staked nodes with no contact-info should be included. assert!(cluster_nodes.nodes.len() > nodes.len()); @@ -618,7 +650,8 @@ mod tests { let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None); // ClusterInfo::tvu_peers excludes the node itself. assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1); - let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); + let cluster_nodes = + ClusterNodes::::new(&cluster_info, ClusterType::Development, &stakes); // All nodes with contact-info should be in the index. // Excluding this node itself. // Staked nodes with no contact-info should be included.