Skip to content

Commit

Permalink
BankingStage Forwarding Filter (solana-labs#685)
Browse files Browse the repository at this point in the history
* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node

(cherry picked from commit 1744e9e)

resolve conflict

remove test_ledger_cleanup_service
  • Loading branch information
apfitzge authored and lijunwangs committed Apr 10, 2024
1 parent e7c62c4 commit 20be1b2
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 54 deletions.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn main() -> Result<()> {
Duration::from_millis(1), // coalesce
true,
None,
false,
));
}

Expand Down
1 change: 1 addition & 0 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl Forwarder {
self.update_data_budget();
let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| p.meta().is_from_staked_node())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|data| data.to_vec()))
.collect();
Expand Down
3 changes: 3 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -190,6 +191,7 @@ impl FetchStage {
coalesce,
true,
in_vote_only_mode.clone(),
false, // unstaked connections
)
})
.collect()
Expand All @@ -210,6 +212,7 @@ impl FetchStage {
coalesce,
true,
None,
true, // only staked connections should be voting
)
})
.collect();
Expand Down
2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl AncestorHashesService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);

let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
Expand Down Expand Up @@ -1299,6 +1300,7 @@ mod test {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (remote_request_sender, remote_request_receiver) = unbounded();
let t_packet_adapter = Builder::new()
Expand Down
1 change: 1 addition & 0 deletions core/src/repair/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl ServeRepairService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
);
let t_packet_adapter = Builder::new()
.name(String::from("solServRAdapt"))
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl ShredFetchStage {
PACKET_COALESCE_DURATION,
true, // use_pinned_memory
None, // in_vote_only_mode
false,
)
})
.collect();
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl GossipService {
Duration::from_millis(1), // coalesce
false,
None,
false,
);
let (consume_sender, listen_receiver) = unbounded();
let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
Expand Down
85 changes: 32 additions & 53 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_accounts_db::{
accounts_db::create_accounts_run_and_snapshot_dirs, hardened_unpack::open_genesis_config,
},
solana_client::thin_client::ThinClient,
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_core::{
consensus::{
tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH,
Expand Down Expand Up @@ -55,12 +55,9 @@ use {
response::RpcSignatureResult,
},
solana_runtime::{
commitment::VOTE_THRESHOLD_SIZE,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_bank_utils,
snapshot_config::SnapshotConfig,
snapshot_package::SnapshotKind,
snapshot_utils::{self},
commitment::VOTE_THRESHOLD_SIZE, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_bank_utils, snapshot_config::SnapshotConfig, snapshot_package::SnapshotKind,
snapshot_utils,
},
solana_sdk::{
account::AccountSharedData,
Expand All @@ -77,7 +74,7 @@ use {
system_program, system_transaction,
vote::state::VoteStateUpdate,
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_turbine::broadcast_stage::{
broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition},
BroadcastStageType,
Expand All @@ -89,11 +86,12 @@ use {
fs,
io::Read,
iter,
net::{IpAddr, Ipv4Addr},
num::NonZeroUsize,
path::Path,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
Arc, Mutex, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
Expand Down Expand Up @@ -360,6 +358,13 @@ fn test_forwarding() {
),
..ClusterConfig::default()
};

let client_keypair = Keypair::new();
let mut overrides = HashMap::new();
let stake = DEFAULT_NODE_STAKE * 10;
let total_stake = stake + config.node_stakes.iter().sum::<u64>();
overrides.insert(client_keypair.pubkey(), stake);
config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides));
let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

let cluster_nodes = discover_cluster(
Expand All @@ -377,11 +382,28 @@ fn test_forwarding() {
.find(|c| c.pubkey() != &leader_pubkey)
.unwrap();

let stakes = HashMap::from([
(client_keypair.pubkey(), stake),
(Pubkey::new_unique(), total_stake - stake),
]);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));

let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options(
"client-connection-cache",
1,
None,
Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
Some((&staked_nodes, &client_keypair.pubkey())),
));

// Confirm that transactions were forwarded to and processed by the leader.
cluster_tests::send_many_transactions(
validator_info,
&cluster.funding_keypair,
&cluster.connection_cache,
&client_connection_cache,
10,
20,
);
Expand Down Expand Up @@ -4273,49 +4295,6 @@ fn test_leader_failure_4() {
);
}

#[test]
#[serial]
fn test_ledger_cleanup_service() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
error!("test_ledger_cleanup_service");
let num_nodes = 3;
let validator_config = ValidatorConfig {
max_ledger_shreds: Some(100),
..ValidatorConfig::default_for_test()
};
let mut config = ClusterConfig {
cluster_lamports: DEFAULT_CLUSTER_LAMPORTS,
poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
node_stakes: vec![DEFAULT_NODE_STAKE; num_nodes],
validator_configs: make_identical_validator_configs(&validator_config, num_nodes),
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
// 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
sleep(Duration::from_secs(60));

cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
cluster.close_preserve_ledgers();
//check everyone's ledgers and make sure only ~100 slots are stored
for info in cluster.validators.values() {
let mut slots = 0;
let blockstore = Blockstore::open(&info.info.ledger_path).unwrap();
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|_| slots += 1);
// with 3 nodes up to 3 slots can be in progress and not complete so max slots in blockstore should be up to 103
assert!(slots <= 103, "got {slots}");
}
}

// This test verifies that even if votes from a validator end up taking too long to land, and thus
// some of the referenced slots are slots are no longer present in the slot hashes sysvar,
// consensus can still be attained.
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;

/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;

/// For marking packets from staked nodes
const FROM_STAKED_NODE = 0b1000_0000;
}
}

Expand Down Expand Up @@ -215,6 +219,11 @@ impl Meta {
self.port = socket_addr.port();
}

pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
self.flags
.set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
}

#[inline]
pub fn discard(&self) -> bool {
self.flags.contains(PacketFlags::DISCARD)
Expand Down Expand Up @@ -278,6 +287,11 @@ impl Meta {
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}

#[inline]
pub fn is_from_staked_node(&self) -> bool {
self.flags.contains(PacketFlags::FROM_STAKED_NODE)
}
}

impl Default for Meta {
Expand Down
1 change: 1 addition & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ async fn handle_chunk(
if packet_accum.is_none() {
let mut meta = Meta::default();
meta.set_socket_addr(remote_addr);
meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked));
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
Expand Down
9 changes: 8 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn recv_loop(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> Result<()> {
loop {
let mut packet_batch = if use_pinned_memory {
Expand Down Expand Up @@ -147,7 +148,9 @@ fn recv_loop(
if len == PACKETS_PER_BATCH {
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
}

packet_batch
.iter_mut()
.for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
packet_batch_sender.send(packet_batch)?;
}
break;
Expand All @@ -156,6 +159,7 @@ fn recv_loop(
}
}

#[allow(clippy::too_many_arguments)]
pub fn receiver(
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
Expand All @@ -165,6 +169,7 @@ pub fn receiver(
coalesce: Duration,
use_pinned_memory: bool,
in_vote_only_mode: Option<Arc<AtomicBool>>,
is_staked_service: bool,
) -> JoinHandle<()> {
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
Expand All @@ -180,6 +185,7 @@ pub fn receiver(
coalesce,
use_pinned_memory,
in_vote_only_mode,
is_staked_service,
);
})
.unwrap()
Expand Down Expand Up @@ -488,6 +494,7 @@ mod test {
Duration::from_millis(1), // coalesce
true,
None,
false,
);
const NUM_PACKETS: usize = 5;
let t_responder = {
Expand Down

0 comments on commit 20be1b2

Please sign in to comment.