verifies shred slot and parent in fetch stage (#26225)
Shred slot and parent are not verified until window-service, by which point
resources have already been spent sig-verifying and deserializing the shreds.
This commit moves the above verification earlier in the pipeline, into the
fetch stage.
behzadnouri authored Jun 28, 2022
1 parent e8fed88 commit 348fe9e
Showing 11 changed files with 227 additions and 216 deletions.
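
For a rough sense of what moves earlier, here is a standalone sketch of the kind of cheap slot/parent/shred-version filtering the fetch stage can now do before any sigverify or full deserialization. This is an illustration only, not the actual `should_discard_shred` logic in `solana_ledger`; the struct and checks below are simplified stand-ins.

```rust
use std::ops::RangeBounds;

type Slot = u64;

// Simplified stand-in for the few header fields the check needs; the real code
// reads them directly out of the packet buffer without deserializing the shred.
struct ShredHeader {
    slot: Slot,
    parent_offset: u16,
    version: u16,
}

// Sketch of fetch-stage filtering: reject shreds whose version, slot, or parent
// is obviously bad, so they never reach sigverify or the window service.
fn should_discard(
    header: &ShredHeader,
    root: Slot,
    shred_version: u16,
    slot_bounds: impl RangeBounds<Slot>,
) -> bool {
    if header.version != shred_version {
        return true; // wrong cluster / shred version mismatch
    }
    if !slot_bounds.contains(&header.slot) {
        return true; // at or below the root, or absurdly far in the future
    }
    // Parent must be an earlier slot that is not below the current root.
    let parent = match header.slot.checked_sub(Slot::from(header.parent_offset)) {
        Some(parent) => parent,
        None => return true,
    };
    if header.slot == 0 && parent == 0 {
        return false; // genesis slot is its own parent
    }
    header.parent_offset == 0 || parent < root
}

fn main() {
    let root: Slot = 3;
    let bounds = (root + 1)..(root + 1_000);
    let ok = ShredHeader { slot: 10, parent_offset: 2, version: 45189 };
    let stale = ShredHeader { slot: 2, parent_offset: 1, version: 45189 };
    assert!(!should_discard(&ok, root, 45189, bounds.clone()));
    assert!(should_discard(&stale, root, 45189, bounds));
}
```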
10 changes: 7 additions & 3 deletions core/src/repair_response.rs
@@ -40,9 +40,13 @@ pub fn repair_response_packet_from_bytes(
Some(packet)
}

pub fn nonce(packet: &Packet) -> Option<Nonce> {
let nonce_start = packet.meta.size.checked_sub(SIZE_OF_NONCE)?;
packet.deserialize_slice(nonce_start..).ok()
pub(crate) fn nonce(packet: &Packet) -> Option<Nonce> {
// Nonces are attached to both repair and ancestor hashes responses.
let data = packet.data(..)?;
let offset = data.len().checked_sub(SIZE_OF_NONCE)?;
<[u8; SIZE_OF_NONCE]>::try_from(&data[offset..])
.map(Nonce::from_le_bytes)
.ok()
}

#[cfg(test)]
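For context on the reworked helper above: a repair nonce is just the trailing little-endian `u32` of the packet payload. Below is a standalone sketch of that parsing using a plain byte slice instead of `solana_perf::packet::Packet`; `trailing_nonce` is an illustrative name, not the crate's API.

```rust
// Repair nonces are 4 bytes, appended after the shred payload.
const SIZE_OF_NONCE: usize = std::mem::size_of::<u32>();
type Nonce = u32;

// Returns the trailing nonce, or None if the buffer is too short to hold one.
fn trailing_nonce(payload: &[u8]) -> Option<Nonce> {
    let offset = payload.len().checked_sub(SIZE_OF_NONCE)?;
    <[u8; SIZE_OF_NONCE]>::try_from(&payload[offset..])
        .map(Nonce::from_le_bytes)
        .ok()
}

fn main() {
    let mut payload = b"shred bytes...".to_vec();
    payload.extend_from_slice(&42u32.to_le_bytes());
    assert_eq!(trailing_nonce(&payload), Some(42));
    assert_eq!(trailing_nonce(&[1, 2, 3]), None); // too short
}
```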
13 changes: 3 additions & 10 deletions core/src/retransmit_stage.rs
@@ -11,7 +11,7 @@ use {
completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo},
window_service::{should_retransmit_and_persist, WindowService},
window_service::WindowService,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
itertools::{izip, Itertools},
@@ -421,7 +421,7 @@ impl RetransmitStage {
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
turbine_disabled: Option<Arc<AtomicBool>>,
turbine_disabled: Arc<AtomicBool>,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
@@ -470,14 +470,7 @@ impl RetransmitStage {
exit,
repair_info,
leader_schedule_cache,
move |shred, last_root| {
let turbine_disabled = turbine_disabled
.as_ref()
.map(|x| x.load(Ordering::Relaxed))
.unwrap_or(false);
let rv = should_retransmit_and_persist(shred, last_root);
rv && !turbine_disabled
},
turbine_disabled,
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
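The `turbine_disabled` plumbing above also gets simpler: the `Option<Arc<AtomicBool>>` closure collapses into a plain `Arc<AtomicBool>` whose default value (`false`) plays the role the old `None` did. A minimal sketch of the equivalent predicate (`keep_shred_*` are made-up names; the real retransmit decision also involves the `should_retransmit_and_persist`-style checks that now live elsewhere):

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Old shape: a None flag meant "turbine is enabled".
fn keep_shred_old(turbine_disabled: &Option<Arc<AtomicBool>>) -> bool {
    !turbine_disabled
        .as_ref()
        .map(|flag| flag.load(Ordering::Relaxed))
        .unwrap_or(false)
}

// New shape: a single flag that simply defaults to false.
fn keep_shred_new(turbine_disabled: &AtomicBool) -> bool {
    !turbine_disabled.load(Ordering::Relaxed)
}

fn main() {
    let flag = Arc::<AtomicBool>::default(); // false, i.e. turbine enabled
    assert_eq!(keep_shred_old(&Some(flag.clone())), keep_shred_new(&flag));

    flag.store(true, Ordering::Relaxed); // e.g. a test disabling turbine
    assert!(!keep_shred_new(&flag));
}
```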
43 changes: 22 additions & 21 deletions core/src/shred_fetch_stage.rs
@@ -4,7 +4,7 @@ use {
crate::packet_hasher::PacketHasher,
crossbeam_channel::{unbounded, Sender},
lru::LruCache,
solana_ledger::shred::{self, get_shred_slot_index_type, ShredFetchStats},
solana_ledger::shred::{should_discard_shred, ShredFetchStats},
solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
@@ -67,6 +67,7 @@ impl ShredFetchStage {
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
last_root,
slot_bounds.clone(),
shred_version,
&packet_hasher,
@@ -195,24 +196,15 @@ impl ShredFetchStage {
#[must_use]
fn should_discard_packet(
packet: &Packet,
root: Slot,
// Range of slots to ingest shreds for.
slot_bounds: impl RangeBounds<Slot>,
shred_version: u16,
packet_hasher: &PacketHasher,
shreds_received: &mut ShredsReceived,
stats: &mut ShredFetchStats,
) -> bool {
let slot = match get_shred_slot_index_type(packet, stats) {
None => return true,
Some((slot, _index, _shred_type)) => slot,
};
if !slot_bounds.contains(&slot) {
stats.slot_out_of_range += 1;
return true;
}
let shred = shred::layout::get_shred(packet);
if shred.and_then(shred::layout::get_version) != Some(shred_version) {
stats.shred_version_mismatch += 1;
if should_discard_shred(packet, root, shred_version, slot_bounds, stats) {
return true;
}
let hash = packet_hasher.hash_packet(packet);
@@ -242,12 +234,12 @@ mod tests {
let mut packet = Packet::default();
let mut stats = ShredFetchStats::default();

let slot = 1;
let slot = 2;
let shred_version = 45189;
let shred = Shred::new_from_data(
slot,
3, // shred index
0, // parent offset
1, // parent offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0, // reference_tick
@@ -264,6 +256,7 @@
let slot_bounds = (last_root + 1)..(last_slot + 2 * slots_per_epoch);
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@@ -278,6 +271,7 @@
coding[0].copy_to_packet(&mut packet);
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds,
shred_version,
&hasher,
@@ -303,6 +297,7 @@
// packet size is 0, so cannot get index
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@@ -311,20 +306,21 @@
));
assert_eq!(stats.index_overrun, 1);
let shred = Shred::new_from_data(
1,
3,
0,
&[],
2, // slot
3, // index
1, // parent_offset
&[], // data
ShredFlags::LAST_SHRED_IN_SLOT,
0,
0, // reference_tick
shred_version,
0,
0, // fec_set_index
);
shred.copy_to_packet(&mut packet);

// rejected slot is 1, root is 3
// rejected slot is 2, root is 3
assert!(should_discard_packet(
&packet,
3,
3..slot_bounds.end,
shred_version,
&hasher,
@@ -335,6 +331,7 @@

assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
345, // shred_version
&hasher,
@@ -346,6 +343,7 @@
// Accepted for 1,3
assert!(!should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@@ -356,6 +354,7 @@
// shreds_received should filter duplicate
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@@ -379,6 +378,7 @@
// Slot 1 million is too high
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds.clone(),
shred_version,
&hasher,
@@ -391,6 +391,7 @@
shred.copy_to_packet(&mut packet);
assert!(should_discard_packet(
&packet,
last_root,
slot_bounds,
shred_version,
&hasher,
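After the cheap slot/parent/version checks, `should_discard_packet` still deduplicates shreds by packet hash, as the hunks above show. A simplified standalone sketch of that second step; the real stage uses a randomly seeded `PacketHasher` and an LRU cache rather than `DefaultHasher` and an unbounded `HashSet`.

```rust
use std::collections::{hash_map::DefaultHasher, HashSet};
use std::hash::{Hash, Hasher};

// Stand-in for ShredsReceived; the real code bounds memory with an LRU cache.
type ShredsReceived = HashSet<u64>;

fn hash_packet(payload: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    payload.hash(&mut hasher);
    hasher.finish()
}

// Returns true if this packet should be dropped as a duplicate. In the real
// code this runs only after the slot/parent/version checks have passed.
fn is_duplicate(payload: &[u8], shreds_received: &mut ShredsReceived) -> bool {
    let hash = hash_packet(payload);
    !shreds_received.insert(hash) // insert() returns false if already present
}

fn main() {
    let mut seen = ShredsReceived::default();
    let shred = b"example shred payload";
    assert!(!is_duplicate(shred, &mut seen)); // first copy goes through
    assert!(is_duplicate(shred, &mut seen)); // retransmitted copy is dropped
}
```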
4 changes: 2 additions & 2 deletions core/src/tvu.rs
@@ -111,7 +111,7 @@ impl Tvu {
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
turbine_disabled: Option<Arc<AtomicBool>>,
turbine_disabled: Arc<AtomicBool>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
@@ -415,7 +415,7 @@ pub mod tests {
&leader_schedule_cache,
&exit,
block_commitment_cache,
None,
Arc::<AtomicBool>::default(),
None,
None,
None,
4 changes: 2 additions & 2 deletions core/src/validator.rs
@@ -134,7 +134,7 @@ pub struct ValidatorConfig {
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_shreds: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub turbine_disabled: Option<Arc<AtomicBool>>,
pub turbine_disabled: Arc<AtomicBool>,
pub enforce_ulimit_nofile: bool,
pub fixed_leader_schedule: Option<FixedSchedule>,
pub wait_for_supermajority: Option<Slot>,
@@ -196,7 +196,7 @@ impl Default for ValidatorConfig {
pubsub_config: PubSubConfig::default(),
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
turbine_disabled: None,
turbine_disabled: Arc::<AtomicBool>::default(),
enforce_ulimit_nofile: true,
fixed_leader_schedule: None,
wait_for_supermajority: None,
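On the config side, `ValidatorConfig` now always carries the flag: callers that used to pass `None` get the same behavior from the default (`false`), while callers that want to disable turbine keep a clone of the `Arc` and flip it at runtime. A small sketch of that ownership pattern; `HypotheticalConfig` is illustrative, not the actual struct.

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Illustrative stand-in for the relevant ValidatorConfig field.
#[derive(Default)]
struct HypotheticalConfig {
    turbine_disabled: Arc<AtomicBool>,
}

fn main() {
    // The caller keeps a clone of the flag before handing the config off.
    let turbine_disabled = Arc::new(AtomicBool::new(false));
    let config = HypotheticalConfig {
        turbine_disabled: turbine_disabled.clone(),
    };

    // Later, e.g. in a local-cluster test, turbine can be switched off live.
    turbine_disabled.store(true, Ordering::Relaxed);
    assert!(config.turbine_disabled.load(Ordering::Relaxed));
}
```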