limits number of nodes per IP address in Turbine #864

Merged · 1 commit · Apr 22, 2024
5 changes: 3 additions & 2 deletions turbine/benches/cluster_nodes.rs
@@ -6,7 +6,7 @@ use {
rand::{seq::SliceRandom, Rng},
solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
solana_ledger::shred::{Shred, ShredFlags},
-solana_sdk::{clock::Slot, pubkey::Pubkey},
+solana_sdk::{clock::Slot, genesis_config::ClusterType, pubkey::Pubkey},
solana_turbine::{
cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
retransmit_stage::RetransmitStage,
@@ -21,7 +21,8 @@ fn make_cluster_nodes<R: Rng>(
unstaked_ratio: Option<(u32, u32)>,
) -> (Vec<ContactInfo>, ClusterNodes<RetransmitStage>) {
let (nodes, stakes, cluster_info) = make_test_cluster(rng, 5_000, unstaked_ratio);
-let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+let cluster_nodes =
+new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
(nodes, cluster_nodes)
}

89 changes: 61 additions & 28 deletions turbine/src/cluster_nodes.rs
@@ -17,6 +17,7 @@ use {
solana_sdk::{
clock::{Epoch, Slot},
feature_set,
+genesis_config::ClusterType,
native_token::LAMPORTS_PER_SOL,
pubkey::Pubkey,
signature::{Keypair, Signer},
@@ -29,7 +30,7 @@ use {
collections::HashMap,
iter::repeat_with,
marker::PhantomData,
-net::SocketAddr,
+net::{IpAddr, SocketAddr},
sync::{Arc, Mutex, RwLock},
time::{Duration, Instant},
},
@@ -39,6 +40,9 @@
const DATA_PLANE_FANOUT: usize = 200;
pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;

+// Limit number of nodes per IP address.
+const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

Review thread on MAX_NUM_NODES_PER_IP_ADDRESS:

Reviewer: what's the usecase for more than one? local-cluster or something?

Author: Multiple nodes running behind NAT, probably. Currently I see several nodes on mainnet with the same IP address; in particular, one IP address with 5 nodes.

Reviewer: lemme guess... all with ~40k SOL stake. sfdp sybils...

Author: I have not checked their stake. Either way, it is the status quo, and we cannot break it with this commit because it was not disallowed before.


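To make the mechanics of the cap concrete, here is a minimal standalone sketch of the keep/strip/drop decision that the new `filter_map` in `get_nodes` applies (see the diff below). `ToyNode` and `admit` are illustrative stand-ins, not types or functions from this PR:

```rust
use std::{collections::HashMap, net::IpAddr};

const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

// A toy stand-in for the crate's Node: just a stake and an optional TVU IP.
struct ToyNode {
    stake: u64,
    tvu_ip: Option<IpAddr>,
}

// Mirrors the PR's decision: nodes within the per-IP cap are kept as-is;
// beyond the cap, staked nodes survive with their address stripped (so the
// weighted shuffle stays deterministic) and unstaked nodes are dropped.
fn admit(counts: &mut HashMap<IpAddr, usize>, node: ToyNode) -> Option<ToyNode> {
    let within_cap = match node.tvu_ip {
        None => true, // no address to count against
        Some(ip) => {
            let count = counts.entry(ip).and_modify(|c| *c += 1).or_insert(1);
            *count <= MAX_NUM_NODES_PER_IP_ADDRESS
        }
    };
    if within_cap {
        Some(node)
    } else if node.stake > 0 {
        Some(ToyNode { stake: node.stake, tvu_ip: None })
    } else {
        None
    }
}

fn main() {
    let mut counts = HashMap::new();
    let ip: IpAddr = "203.0.113.7".parse().unwrap();
    // Twelve nodes behind one NAT'd address, alternating unstaked/staked:
    // ten keep their address, the staked overflow node keeps only its stake,
    // and the unstaked overflow node is dropped entirely.
    let nodes = (0..12u64).map(|i| ToyNode { stake: i % 2, tvu_ip: Some(ip) });
    let admitted: Vec<ToyNode> = nodes.filter_map(|n| admit(&mut counts, n)).collect();
    assert_eq!(admitted.len(), 11);
    assert_eq!(admitted.iter().filter(|n| n.tvu_ip.is_some()).count(), 10);
}
```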
#[derive(Debug, Error)]
pub enum Error {
#[error("Loopback from slot leader: {leader}, shred: {shred:?}")]
@@ -81,9 +85,6 @@ pub struct ClusterNodesCache<T> {
pub struct RetransmitPeers<'a> {
root_distance: usize, // distance from the root node
children: Vec<&'a Node>,
-// Maps tvu addresses to the first node
-// in the shuffle with the same address.
-addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
}

impl Node {
@@ -147,8 +148,12 @@ impl<T> ClusterNodes<T> {
}

impl ClusterNodes<BroadcastStage> {
-pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-new_cluster_nodes(cluster_info, stakes)
+pub fn new(
+cluster_info: &ClusterInfo,
+cluster_type: ClusterType,
+stakes: &HashMap<Pubkey, u64>,
+) -> Self {
+new_cluster_nodes(cluster_info, cluster_type, stakes)
}

pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
@@ -168,16 +173,13 @@ impl ClusterNodes<RetransmitStage> {
let RetransmitPeers {
root_distance,
children,
-addrs,
} = self.get_retransmit_peers(slot_leader, shred, fanout)?;
let protocol = get_broadcast_protocol(shred);
-let peers = children.into_iter().filter_map(|node| {
-node.contact_info()?
-.tvu(protocol)
-.ok()
-.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
-});
-Ok((root_distance, peers.collect()))
+let peers = children
+.into_iter()
+.filter_map(|node| node.contact_info()?.tvu(protocol).ok())
+.collect();
+Ok((root_distance, peers))
}

pub fn get_retransmit_peers(
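Why the `addrs` bookkeeping can go away entirely: the old code deduplicated TVU addresses per shuffle, keeping only the first claimant of each socket address, while the new code handles duplicate addresses once, in `get_nodes`, by stripping contact-info from nodes beyond the per-IP cap. A sketch of the retired per-shuffle filter, with `u64` standing in for `Pubkey` (illustrative, not code from the PR):

```rust
use std::{collections::HashMap, net::SocketAddr};

// Old approach (sketch): remember the first pubkey seen per TVU address in
// shuffle order, then keep only those first claimants as retransmit targets.
fn dedup_by_first_claimant(shuffled: &[(u64, SocketAddr)]) -> Vec<(u64, SocketAddr)> {
    let mut addrs: HashMap<SocketAddr, u64> = HashMap::new();
    for &(pubkey, addr) in shuffled {
        addrs.entry(addr).or_insert(pubkey);
    }
    shuffled
        .iter()
        .copied()
        .filter(|(pubkey, addr)| addrs.get(addr) == Some(pubkey))
        .collect()
}

fn main() {
    let addr: SocketAddr = "203.0.113.7:8001".parse().unwrap();
    let other: SocketAddr = "198.51.100.2:8001".parse().unwrap();
    // Node 1 claims addr first; node 2's duplicate address is filtered out.
    let shuffled = vec![(1, addr), (2, addr), (3, other)];
    assert_eq!(dedup_by_first_claimant(&shuffled), vec![(1, addr), (3, other)]);
}
```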
@@ -197,19 +199,10 @@
if let Some(index) = self.index.get(slot_leader) {
weighted_shuffle.remove_index(*index);
}
-let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
-.inspect(|node| {
-if let Some(node) = node.contact_info() {
-if let Ok(addr) = node.tvu(protocol) {
-addrs.entry(addr).or_insert(*node.pubkey());
-}
-}
-})
.collect();
let self_index = nodes
.iter()
@@ -228,7 +221,6 @@ impl ClusterNodes<RetransmitStage> {
Ok(RetransmitPeers {
root_distance,
children: peers.collect(),
-addrs,
})
}

@@ -272,10 +264,11 @@ impl ClusterNodes<RetransmitStage> {

pub fn new_cluster_nodes<T: 'static>(
cluster_info: &ClusterInfo,
+cluster_type: ClusterType,
stakes: &HashMap<Pubkey, u64>,
) -> ClusterNodes<T> {
let self_pubkey = cluster_info.id();
-let nodes = get_nodes(cluster_info, stakes);
+let nodes = get_nodes(cluster_info, cluster_type, stakes);
let index: HashMap<_, _> = nodes
.iter()
.enumerate()
@@ -298,8 +291,21 @@ pub fn new_cluster_nodes<T: 'static>(

// All staked nodes + other known tvu-peers + the node itself;
// sorted by (stake, pubkey) in descending order.
-fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node> {
+fn get_nodes(
+cluster_info: &ClusterInfo,
+cluster_type: ClusterType,
+stakes: &HashMap<Pubkey, u64>,
+) -> Vec<Node> {
let self_pubkey = cluster_info.id();
+let should_dedup_addrs = match cluster_type {
+ClusterType::Development => false,
+ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
+};
+// Maps IP addresses to number of nodes at that IP address.
+let mut counts = {
+let capacity = if should_dedup_addrs { stakes.len() } else { 0 };
+HashMap::<IpAddr, usize>::with_capacity(capacity)
+};
// The local node itself.
std::iter::once({
let stake = stakes.get(&self_pubkey).copied().unwrap_or_default();
@@ -328,6 +334,30 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<Node>
// Since sorted_by_key is stable, in case of duplicates, this
// will keep nodes with contact-info.
.dedup_by(|a, b| a.pubkey() == b.pubkey())
+.filter_map(|node| {
+if !should_dedup_addrs
+|| node
+.contact_info()
+.and_then(|node| node.tvu(Protocol::UDP).ok())
+.map(|addr| {
+*counts
+.entry(addr.ip())
+.and_modify(|count| *count += 1)
+.or_insert(1)
+})
Review thread on lines +339 to +347:

Reviewer: can we pop the query out to a closure for the sake of readability? nfc why fmt prefers something like this

Author: I get lifetime errors with an out-of-line closure.

Reviewer: 🤔 are you capturing `counts` instead of passing it in?

Author: Passing it in might work, but it definitely makes the code more verbose. I think the inline version is better anyway because it is explicit about what it is doing, and you don't need to jump elsewhere to figure that out.

+<= Some(MAX_NUM_NODES_PER_IP_ADDRESS)
+{
+Some(node)
+} else {
+// If the node is not staked, drop it entirely. Otherwise, keep the
+// pubkey for deterministic shuffle, but strip the contact-info so
+// that no more packets are sent to this node.
+(node.stake > 0u64).then(|| Node {
+node: NodeId::from(node.pubkey()),
+stake: node.stake,
+})
+}
+})
.collect()
}

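Two things in the review thread above are worth unpacking. First, the `<= Some(MAX_NUM_NODES_PER_IP_ADDRESS)` comparison leans on `Option`'s derived ordering, in which `None` is less than every `Some`, so a node with no contact-info or no UDP TVU address yields `None` and passes the check. Second, the reviewer's suggested refactor might compile if `counts` is passed in rather than captured; a hypothetical shape of that helper (the PR kept the inline version; this is illustrative only):

```rust
use std::{collections::HashMap, net::IpAddr};

const MAX_NUM_NODES_PER_IP_ADDRESS: usize = 10;

// Passing `counts` explicitly avoids the closure capturing a mutable borrow
// across the surrounding iterator chain.
fn within_ip_cap(counts: &mut HashMap<IpAddr, usize>, tvu_ip: Option<IpAddr>) -> bool {
    tvu_ip.map(|ip| {
        *counts
            .entry(ip)
            .and_modify(|count| *count += 1)
            .or_insert(1)
    }) <= Some(MAX_NUM_NODES_PER_IP_ADDRESS)
}

fn main() {
    // The ordering fact the comparison relies on:
    assert!(None::<usize> < Some(0));
    let mut counts = HashMap::new();
    // No TVU address: always within the cap.
    assert!(within_ip_cap(&mut counts, None));
    let ip: IpAddr = "198.51.100.2".parse().unwrap();
    for i in 1..=11 {
        assert_eq!(within_ip_cap(&mut counts, Some(ip)), i <= 10);
    }
}
```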
@@ -446,6 +476,7 @@ impl<T: 'static> ClusterNodesCache<T> {
}
let nodes = Arc::new(new_cluster_nodes::<T>(
cluster_info,
+root_bank.cluster_type(),
&epoch_staked_nodes.unwrap_or_default(),
));
*entry = Some((Instant::now(), Arc::clone(&nodes)));
@@ -583,7 +614,8 @@ mod tests {
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
+let cluster_nodes =
+new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
// All nodes with contact-info should be in the index.
// Staked nodes with no contact-info should be included.
assert!(cluster_nodes.nodes.len() > nodes.len());
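Note that both tests (and the bench) construct with `ClusterType::Development`, the one variant for which the new dedup path is disabled, so the pre-existing node-count assertions still hold even if test nodes share IP addresses. Restating the gating match from `get_nodes` as a free function (an illustrative restatement, not an API in the crate):

```rust
use solana_sdk::genesis_config::ClusterType;

// Development (local) clusters skip address dedup; all public clusters
// enforce the per-IP cap.
fn should_dedup_addrs(cluster_type: ClusterType) -> bool {
    match cluster_type {
        ClusterType::Development => false,
        ClusterType::Devnet | ClusterType::Testnet | ClusterType::MainnetBeta => true,
    }
}

fn main() {
    assert!(!should_dedup_addrs(ClusterType::Development));
    assert!(should_dedup_addrs(ClusterType::Testnet));
    assert!(should_dedup_addrs(ClusterType::MainnetBeta));
}
```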
@@ -618,7 +650,8 @@
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
// ClusterInfo::tvu_peers excludes the node itself.
assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
+let cluster_nodes =
+ClusterNodes::<BroadcastStage>::new(&cluster_info, ClusterType::Development, &stakes);
// All nodes with contact-info should be in the index.
// Excluding this node itself.
// Staked nodes with no contact-info should be included.