diff --git a/turbine/src/cluster_nodes.rs b/turbine/src/cluster_nodes.rs index 27425d629dd65c..beadf7b331f59d 100644 --- a/turbine/src/cluster_nodes.rs +++ b/turbine/src/cluster_nodes.rs @@ -85,6 +85,9 @@ pub struct ClusterNodesCache { pub struct RetransmitPeers<'a> { root_distance: usize, // distance from the root node children: Vec<&'a Node>, + // Maps tvu addresses to the first node + // in the shuffle with the same address. + addrs: HashMap, // tvu addresses } impl Node { @@ -173,13 +176,16 @@ impl ClusterNodes { let RetransmitPeers { root_distance, children, + addrs, } = self.get_retransmit_peers(slot_leader, shred, fanout)?; let protocol = get_broadcast_protocol(shred); - let peers = children - .into_iter() - .filter_map(|node| node.contact_info()?.tvu(protocol).ok()) - .collect(); - Ok((root_distance, peers)) + let peers = children.into_iter().filter_map(|node| { + node.contact_info()? + .tvu(protocol) + .ok() + .filter(|addr| addrs.get(addr) == Some(&node.pubkey())) + }); + Ok((root_distance, peers.collect())) } pub fn get_retransmit_peers( @@ -199,10 +205,19 @@ impl ClusterNodes { if let Some(index) = self.index.get(slot_leader) { weighted_shuffle.remove_index(*index); } + let mut addrs = HashMap::::with_capacity(self.nodes.len()); let mut rng = get_seeded_rng(slot_leader, shred); + let protocol = get_broadcast_protocol(shred); let nodes: Vec<_> = weighted_shuffle .shuffle(&mut rng) .map(|index| &self.nodes[index]) + .inspect(|node| { + if let Some(node) = node.contact_info() { + if let Ok(addr) = node.tvu(protocol) { + addrs.entry(addr).or_insert(*node.pubkey()); + } + } + }) .collect(); let self_index = nodes .iter() @@ -221,6 +236,7 @@ impl ClusterNodes { Ok(RetransmitPeers { root_distance, children: peers.collect(), + addrs, }) }