From 20be1b279ab7495f918a39a9c017516241de39ef Mon Sep 17 00:00:00 2001
From: Andrew Fitzgerald
Date: Tue, 9 Apr 2024 18:12:26 -0500
Subject: [PATCH] BankingStage Forwarding Filter (#685)

* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node

(cherry picked from commit 1744e9efd74d83aeb15b384a8174949dbe753172)

resolve conflict
remove test_ledger_cleanup_service
---
 bench-streamer/src/main.rs                 |  1 +
 core/src/banking_stage/forwarder.rs        |  1 +
 core/src/fetch_stage.rs                    |  3 +
 core/src/repair/ancestor_hashes_service.rs |  2 +
 core/src/repair/serve_repair_service.rs    |  1 +
 core/src/shred_fetch_stage.rs              |  1 +
 gossip/src/gossip_service.rs               |  1 +
 local-cluster/tests/local_cluster.rs       | 85 ++++++++--------------
 sdk/src/packet.rs                          | 14 ++++
 streamer/src/nonblocking/quic.rs           |  1 +
 streamer/src/streamer.rs                   |  9 ++-
 11 files changed, 65 insertions(+), 54 deletions(-)

diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs
index 987df411341672..a65e1f8a72d4c0 100644
--- a/bench-streamer/src/main.rs
+++ b/bench-streamer/src/main.rs
@@ -116,6 +116,7 @@ fn main() -> Result<()> {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         ));
     }

diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs
index 1cb656f0ddc701..ebc352f115b559 100644
--- a/core/src/banking_stage/forwarder.rs
+++ b/core/src/banking_stage/forwarder.rs
@@ -161,6 +161,7 @@ impl Forwarder {
         self.update_data_budget();
         let packet_vec: Vec<_> = forwardable_packets
             .filter(|p| !p.meta().forwarded())
+            .filter(|p| p.meta().is_from_staked_node())
             .filter(|p| self.data_budget.take(p.meta().size))
             .filter_map(|p| p.data(..).map(|data| data.to_vec()))
             .collect();
diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs
index b3eb36201fc637..7535d2a9edefa5 100644
--- a/core/src/fetch_stage.rs
+++ b/core/src/fetch_stage.rs
@@ -169,6 +169,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     in_vote_only_mode.clone(),
+                    false, // unstaked connections
                 )
             })
             .collect()
@@ -190,6 +191,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     in_vote_only_mode.clone(),
+                    false, // unstaked connections
                 )
             })
             .collect()
@@ -210,6 +212,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     None,
+                    true, // only staked connections should be voting
                 )
             })
             .collect();
diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs
index fc70dbab16c825..a1d1b9bb1cf6c9 100644
--- a/core/src/repair/ancestor_hashes_service.rs
+++ b/core/src/repair/ancestor_hashes_service.rs
@@ -170,6 +170,7 @@ impl AncestorHashesService {
             Duration::from_millis(1), // coalesce
             false,                    // use_pinned_memory
             None,                     // in_vote_only_mode
+            false,                    // is_staked_service
         );

         let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
@@ -1299,6 +1300,7 @@ mod test {
             Duration::from_millis(1), // coalesce
             false,
             None,
+            false,
         );
         let (remote_request_sender, remote_request_receiver) = unbounded();
         let t_packet_adapter = Builder::new()
diff --git a/core/src/repair/serve_repair_service.rs b/core/src/repair/serve_repair_service.rs
index 9819d0ea43855d..e49de6084007a9 100644
--- a/core/src/repair/serve_repair_service.rs
+++ b/core/src/repair/serve_repair_service.rs
@@ -46,6 +46,7 @@ impl ServeRepairService {
             Duration::from_millis(1), // coalesce
             false,                    // use_pinned_memory
             None,                     // in_vote_only_mode
+            false,                    // is_staked_service
         );
         let t_packet_adapter = Builder::new()
             .name(String::from("solServRAdapt"))
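
Note: the behavioral core of this patch is the new .filter(|p| p.meta().is_from_staked_node()) in the forwarder hunk above; unstaked packets are now dropped before they can consume forwarding budget. A minimal standalone sketch of the resulting filter chain, with mock Packet/Meta types and a plain integer budget standing in for the actual BankingStage packet and DataBudget types:

// Mock types; the real forwarder operates on solana-perf packets.
struct Meta {
    forwarded: bool,
    from_staked_node: bool,
    size: usize,
}

struct Packet {
    meta: Meta,
    data: Vec<u8>,
}

fn collect_forwardable(packets: Vec<Packet>, mut budget: usize) -> Vec<Vec<u8>> {
    packets
        .into_iter()
        .filter(|p| !p.meta.forwarded)       // already forwarded once: skip
        .filter(|p| p.meta.from_staked_node) // new: unstaked traffic is dropped
        .filter(|p| {
            // mirrors the data-budget check: admit the packet only if the
            // remaining budget covers its size
            p.meta.size <= budget && {
                budget -= p.meta.size;
                true
            }
        })
        .map(|p| p.data)
        .collect()
}

The ordering matters: the staked-node check runs before the budget check, so unstaked traffic can no longer drain the budget that staked traffic would otherwise use.
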
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 703167b0b44ac1..8bcf0e06b89afb 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -156,6 +156,7 @@ impl ShredFetchStage {
                     PACKET_COALESCE_DURATION,
                     true, // use_pinned_memory
                     None, // in_vote_only_mode
+                    false,
                 )
             })
             .collect();
diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs
index b587a5e0672421..03ade4e7095695 100644
--- a/gossip/src/gossip_service.rs
+++ b/gossip/src/gossip_service.rs
@@ -58,6 +58,7 @@ impl GossipService {
             Duration::from_millis(1), // coalesce
             false,
             None,
+            false,
         );
         let (consume_sender, listen_receiver) = unbounded();
         let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index c654efdd35de85..7e7bf89dcc582e 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -8,7 +8,7 @@ use {
     solana_accounts_db::{
         accounts_db::create_accounts_run_and_snapshot_dirs, hardened_unpack::open_genesis_config,
     },
-    solana_client::thin_client::ThinClient,
+    solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
     solana_core::{
         consensus::{
             tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH,
@@ -55,12 +55,9 @@ use {
         response::RpcSignatureResult,
     },
     solana_runtime::{
-        commitment::VOTE_THRESHOLD_SIZE,
-        snapshot_archive_info::SnapshotArchiveInfoGetter,
-        snapshot_bank_utils,
-        snapshot_config::SnapshotConfig,
-        snapshot_package::SnapshotKind,
-        snapshot_utils::{self},
+        commitment::VOTE_THRESHOLD_SIZE, snapshot_archive_info::SnapshotArchiveInfoGetter,
+        snapshot_bank_utils, snapshot_config::SnapshotConfig, snapshot_package::SnapshotKind,
+        snapshot_utils,
     },
     solana_sdk::{
         account::AccountSharedData,
@@ -77,7 +74,7 @@ use {
         system_program, system_transaction,
         vote::state::VoteStateUpdate,
     },
-    solana_streamer::socket::SocketAddrSpace,
+    solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
     solana_turbine::broadcast_stage::{
         broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition},
         BroadcastStageType,
@@ -89,11 +86,12 @@ use {
         fs,
         io::Read,
         iter,
+        net::{IpAddr, Ipv4Addr},
         num::NonZeroUsize,
         path::Path,
         sync::{
             atomic::{AtomicBool, AtomicUsize, Ordering},
-            Arc, Mutex,
+            Arc, Mutex, RwLock,
         },
         thread::{sleep, Builder, JoinHandle},
         time::{Duration, Instant},
@@ -360,6 +358,13 @@ fn test_forwarding() {
         ),
         ..ClusterConfig::default()
     };
+
+    let client_keypair = Keypair::new();
+    let mut overrides = HashMap::new();
+    let stake = DEFAULT_NODE_STAKE * 10;
+    let total_stake = stake + config.node_stakes.iter().sum::<u64>();
+    overrides.insert(client_keypair.pubkey(), stake);
+    config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides));
     let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

     let cluster_nodes = discover_cluster(
@@ -377,11 +382,28 @@ fn test_forwarding() {
         .find(|c| c.pubkey() != &leader_pubkey)
         .unwrap();

+    let stakes = HashMap::from([
+        (client_keypair.pubkey(), stake),
+        (Pubkey::new_unique(), total_stake - stake),
+    ]);
+    let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
+        Arc::new(stakes),
+        HashMap::<Pubkey, u64>::default(), // overrides
+    )));
+
+    let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options(
+        "client-connection-cache",
+        1,
+        None,
+        Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
+        Some((&staked_nodes, &client_keypair.pubkey())),
+    ));
+
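
Note: every receiver() call site in the hunks above now passes the same trailing is_staked_service flag; the receive loop uses it to stamp each packet in a batch before the batch is handed to the channel (see the streamer.rs hunk at the end of this patch). A standalone sketch of that stamping step, with mock Packet/Meta types in place of the solana-sdk ones:

// Mock types exposing the same meta_mut()/set_from_staked_node() shape
// that the real solana-sdk Packet and Meta provide.
#[derive(Default)]
struct Meta {
    from_staked_node: bool,
}

impl Meta {
    fn set_from_staked_node(&mut self, from_staked_node: bool) {
        self.from_staked_node = from_staked_node;
    }
}

#[derive(Default)]
struct Packet {
    meta: Meta,
}

impl Packet {
    fn meta_mut(&mut self) -> &mut Meta {
        &mut self.meta
    }
}

// Stamp the whole batch with the service-level flag, once per batch, so a
// packet received on a staked-only socket (e.g. TPU votes) is marked as
// coming from a staked node before it reaches downstream stages.
fn stamp_batch(packet_batch: &mut [Packet], is_staked_service: bool) {
    packet_batch
        .iter_mut()
        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
}
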
     // Confirm that transactions were forwarded to and processed by the leader.
     cluster_tests::send_many_transactions(
         validator_info,
         &cluster.funding_keypair,
-        &cluster.connection_cache,
+        &client_connection_cache,
         10,
         20,
     );
@@ -4273,49 +4295,6 @@ fn test_leader_failure_4() {
     );
 }

-#[test]
-#[serial]
-fn test_ledger_cleanup_service() {
-    solana_logger::setup_with_default(RUST_LOG_FILTER);
-    error!("test_ledger_cleanup_service");
-    let num_nodes = 3;
-    let validator_config = ValidatorConfig {
-        max_ledger_shreds: Some(100),
-        ..ValidatorConfig::default_for_test()
-    };
-    let mut config = ClusterConfig {
-        cluster_lamports: DEFAULT_CLUSTER_LAMPORTS,
-        poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
-        node_stakes: vec![DEFAULT_NODE_STAKE; num_nodes],
-        validator_configs: make_identical_validator_configs(&validator_config, num_nodes),
-        ..ClusterConfig::default()
-    };
-    let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
-    // 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
-    sleep(Duration::from_secs(60));
-
-    cluster_tests::spend_and_verify_all_nodes(
-        &cluster.entry_point_info,
-        &cluster.funding_keypair,
-        num_nodes,
-        HashSet::new(),
-        SocketAddrSpace::Unspecified,
-        &cluster.connection_cache,
-    );
-    cluster.close_preserve_ledgers();
-    //check everyone's ledgers and make sure only ~100 slots are stored
-    for info in cluster.validators.values() {
-        let mut slots = 0;
-        let blockstore = Blockstore::open(&info.info.ledger_path).unwrap();
-        blockstore
-            .slot_meta_iterator(0)
-            .unwrap()
-            .for_each(|_| slots += 1);
-        // with 3 nodes up to 3 slots can be in progress and not complete so max slots in blockstore should be up to 103
-        assert!(slots <= 103, "got {slots}");
-    }
-}
-
 // This test verifies that even if votes from a validator end up taking too long to land, and thus
 // some of the referenced slots are no longer present in the slot hashes sysvar,
 // consensus can still be attained.
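
Note: the reworked test_forwarding stakes its client via staked_nodes_overrides, so the receiving validator treats it as staked even though it holds no on-chain stake; otherwise the new filter would silently drop the test's forwarded transactions. A standalone sketch of the assumed override-wins lookup (mock Pubkey type; the actual StakedNodes bookkeeping in solana-streamer is more involved):

use std::collections::HashMap;

// Mock pubkey type standing in for solana_sdk::pubkey::Pubkey.
type Pubkey = [u8; 32];

// Assumed semantics: an override entry wins over the on-chain stake for the
// same pubkey, which is how an otherwise unstaked test client can be
// admitted as staked and pass the new forwarding filter.
fn effective_stake(
    on_chain: &HashMap<Pubkey, u64>,
    overrides: &HashMap<Pubkey, u64>,
    node: &Pubkey,
) -> u64 {
    overrides
        .get(node)
        .or_else(|| on_chain.get(node))
        .copied()
        .unwrap_or(0)
}
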
diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs
index 8300b57218c696..0529b01d58a8ad 100644
--- a/sdk/src/packet.rs
+++ b/sdk/src/packet.rs
@@ -33,8 +33,12 @@ bitflags! {
         /// the packet is built.
         /// This field can be removed when the above feature gate is adopted by mainnet-beta.
         const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
+
         /// For tracking performance
         const PERF_TRACK_PACKET = 0b0100_0000;
+
+        /// For marking packets from staked nodes
+        const FROM_STAKED_NODE = 0b1000_0000;
     }
 }

@@ -215,6 +219,11 @@ impl Meta {
         self.port = socket_addr.port();
     }

+    pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
+        self.flags
+            .set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
+    }
+
     #[inline]
     pub fn discard(&self) -> bool {
         self.flags.contains(PacketFlags::DISCARD)
@@ -278,6 +287,11 @@ impl Meta {
     pub fn round_compute_unit_price(&self) -> bool {
         self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
     }
+
+    #[inline]
+    pub fn is_from_staked_node(&self) -> bool {
+        self.flags.contains(PacketFlags::FROM_STAKED_NODE)
+    }
 }

 impl Default for Meta {
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index bf0d323216b5a5..5996d44f1d2141 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -1132,6 +1132,7 @@ async fn handle_chunk(
         if packet_accum.is_none() {
             let mut meta = Meta::default();
             meta.set_socket_addr(remote_addr);
+            meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked));
             *packet_accum = Some(PacketAccumulator {
                 meta,
                 chunks: Vec::new(),
diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs
index 1fd7bfc97404cc..3c89d2574ebf34 100644
--- a/streamer/src/streamer.rs
+++ b/streamer/src/streamer.rs
@@ -110,6 +110,7 @@ fn recv_loop(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> Result<()> {
     loop {
         let mut packet_batch = if use_pinned_memory {
@@ -147,7 +148,9 @@ fn recv_loop(
                 if len == PACKETS_PER_BATCH {
                     full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
                 }
-
+                packet_batch
+                    .iter_mut()
+                    .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
                 packet_batch_sender.send(packet_batch)?;
             }
             break;
@@ -156,6 +159,7 @@
     }
 }

+#[allow(clippy::too_many_arguments)]
 pub fn receiver(
     socket: Arc<UdpSocket>,
     exit: Arc<AtomicBool>,
@@ -165,6 +169,7 @@ pub fn receiver(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> JoinHandle<()> {
     let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
     assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
@@ -180,6 +185,7 @@ pub fn receiver(
                 coalesce,
                 use_pinned_memory,
                 in_vote_only_mode,
+                is_staked_service,
             );
         })
         .unwrap()
@@ -488,6 +494,7 @@ mod test {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         );
         const NUM_PACKETS: usize = 5;
         let t_responder = {
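
Note: for completeness, a standalone round-trip of the new flag mirroring the sdk/src/packet.rs change above (bitflags 2.x syntax assumed; the real Meta also carries size, address, and port fields):

use bitflags::bitflags;

bitflags! {
    #[derive(Clone, Copy, Debug)]
    struct PacketFlags: u8 {
        const FROM_STAKED_NODE = 0b1000_0000;
    }
}

struct Meta {
    flags: PacketFlags,
}

impl Meta {
    fn set_from_staked_node(&mut self, from_staked_node: bool) {
        self.flags
            .set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
    }

    fn is_from_staked_node(&self) -> bool {
        self.flags.contains(PacketFlags::FROM_STAKED_NODE)
    }
}

fn main() {
    // A packet arriving over a staked connection gets the flag set...
    let mut meta = Meta { flags: PacketFlags::empty() };
    meta.set_from_staked_node(true);
    // ...and the banking-stage forwarder later gates on exactly this bit.
    assert!(meta.is_from_staked_node());
}
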