From 412ebfcaf2c55eefe0f33875c1508191daf76ad1 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 21 Mar 2019 11:53:18 -0700 Subject: [PATCH] ReplayStage::new is too long break into more functions --- core/src/replay_stage.rs | 280 +++++++++++++++++++++++---------------- 1 file changed, 168 insertions(+), 112 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index affb41363cc3d2..7459d51f42300f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -93,31 +93,20 @@ impl ReplayStage { if exit_.load(Ordering::Relaxed) { break; } + Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap()); - let active_banks = bank_forks.read().unwrap().active_banks(); - trace!("active banks {:?}", active_banks); + let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some(); - for bank_slot in &active_banks { - let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); - ticks_per_slot = bank.ticks_per_slot(); - if bank.collector_id() != my_id { - Self::replay_blocktree_into_bank( - &bank, - &blocktree, - &mut progress, - &forward_entry_sender, - )?; - } - let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1; - if bank.tick_height() == max_tick_height { - Self::process_completed_bank( - &my_id, - bank, - &mut progress, - &slot_full_sender, - ); - } - } + + Self::replay_active_banks( + &blocktree, + &bank_forks, + &my_id, + &mut ticks_per_slot, + &mut progress, + &forward_entry_sender, + &slot_full_sender, + )?; if ticks_per_slot == 0 { let frozen_banks = bank_forks.read().unwrap().frozen_banks(); @@ -125,100 +114,23 @@ impl ReplayStage { ticks_per_slot = bank.ticks_per_slot(); } - let locktower_start = Instant::now(); - // Locktower voting - let descendants = bank_forks.read().unwrap().descendants(); - let ancestors = bank_forks.read().unwrap().ancestors(); - let frozen_banks = bank_forks.read().unwrap().frozen_banks(); - - trace!("frozen_banks {}", frozen_banks.len()); - let mut votable: Vec<(u128, Arc)> = frozen_banks - .values() - .filter(|b| { - let is_votable = b.is_votable(); - trace!("bank is votable: {} {}", b.slot(), is_votable); - is_votable - }) - .filter(|b| { - let has_voted = locktower.has_voted(b.slot()); - trace!("bank is has_voted: {} {}", b.slot(), has_voted); - !has_voted - }) - .filter(|b| { - let is_locked_out = locktower.is_locked_out(b.slot(), &descendants); - trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out); - !is_locked_out - }) - .map(|bank| { - ( - bank, - locktower.collect_vote_lockouts( - bank.slot(), - bank.vote_accounts(), - &ancestors, - ), - ) - }) - .filter(|(b, stake_lockouts)| { - let vote_threshold = - locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts); - debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold); - vote_threshold - }) - .map(|(b, stake_lockouts)| { - (locktower.calculate_weight(&stake_lockouts), b.clone()) - }) - .collect(); - - votable.sort_by_key(|b| b.0); - trace!("votable_banks {}", votable.len()); - let ms = timing::duration_as_ms(&locktower_start.elapsed()); - if !votable.is_empty() { - let weights: Vec = votable.iter().map(|x| x.0).collect(); - info!( - "@{:?} locktower duration: {:?} len: {} weights: {:?}", - timing::timestamp(), - ms, - votable.len(), - weights - ); - } - inc_new_counter_info!("replay_stage-locktower_duration", ms as usize); + let votable = Self::generate_votable_banks(&bank_forks, &locktower); if let Some((_, bank)) = votable.last() { subscriptions.notify_subscribers(&bank); - if let Some(ref voting_keypair) = voting_keypair { - let keypair = voting_keypair.as_ref(); - let vote = VoteTransaction::new_vote( - &vote_account, - keypair, - bank.slot(), - bank.last_blockhash(), - 0, - ); - if let Some(new_root) = locktower.record_vote(bank.slot()) { - bank_forks.write().unwrap().set_root(new_root); - Self::handle_new_root(&bank_forks, &mut progress); - } - locktower.update_epoch(&bank); - cluster_info.write().unwrap().push_vote(vote); - } - let next_leader_slot = - leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); - poh_recorder.lock().unwrap().reset( - bank.tick_height(), - bank.last_blockhash(), - bank.slot(), - next_leader_slot, - ticks_per_slot, - ); - debug!( - "{:?} voted and reset poh at {}. next leader slot {:?}", - my_id, - bank.tick_height(), - next_leader_slot + Self::handle_votable_bank( + &bank, + &bank_forks, + &mut locktower, + &mut progress, + &voting_keypair, + &vote_account, + &cluster_info, ); + + Self::reset_poh_recorder(&my_id, &bank, &poh_recorder, ticks_per_slot); + is_tpu_bank_active = false; } @@ -354,6 +266,150 @@ impl ReplayStage { Ok(()) } + fn handle_votable_bank( + bank: &Arc, + bank_forks: &Arc>, + locktower: &mut Locktower, + progress: &mut HashMap, + voting_keypair: &Option>, + vote_account: &Pubkey, + cluster_info: &Arc>, + ) where + T: 'static + KeypairUtil + Send + Sync, + { + if let Some(ref voting_keypair) = voting_keypair { + let keypair = voting_keypair.as_ref(); + let vote = VoteTransaction::new_vote( + &vote_account, + keypair, + bank.slot(), + bank.last_blockhash(), + 0, + ); + if let Some(new_root) = locktower.record_vote(bank.slot()) { + bank_forks.write().unwrap().set_root(new_root); + Self::handle_new_root(&bank_forks, progress); + } + locktower.update_epoch(&bank); + cluster_info.write().unwrap().push_vote(vote); + } + } + + fn reset_poh_recorder( + my_id: &Pubkey, + bank: &Arc, + poh_recorder: &Arc>, + ticks_per_slot: u64, + ) { + let next_leader_slot = leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank); + poh_recorder.lock().unwrap().reset( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + next_leader_slot, + ticks_per_slot, + ); + debug!( + "{:?} voted and reset poh at {}. next leader slot {:?}", + my_id, + bank.tick_height(), + next_leader_slot + ); + } + + fn replay_active_banks( + blocktree: &Arc, + bank_forks: &Arc>, + my_id: &Pubkey, + ticks_per_slot: &mut u64, + progress: &mut HashMap, + forward_entry_sender: &EntrySender, + slot_full_sender: &Sender<(u64, Pubkey)>, + ) -> result::Result<()> { + let active_banks = bank_forks.read().unwrap().active_banks(); + trace!("active banks {:?}", active_banks); + + for bank_slot in &active_banks { + let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); + *ticks_per_slot = bank.ticks_per_slot(); + if bank.collector_id() != *my_id { + Self::replay_blocktree_into_bank( + &bank, + &blocktree, + progress, + &forward_entry_sender, + )?; + } + let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1; + if bank.tick_height() == max_tick_height { + Self::process_completed_bank(my_id, bank, progress, slot_full_sender); + } + } + Ok(()) + } + + fn generate_votable_banks( + bank_forks: &Arc>, + locktower: &Locktower, + ) -> Vec<(u128, Arc)> { + let locktower_start = Instant::now(); + // Locktower voting + let descendants = bank_forks.read().unwrap().descendants(); + let ancestors = bank_forks.read().unwrap().ancestors(); + let frozen_banks = bank_forks.read().unwrap().frozen_banks(); + + trace!("frozen_banks {}", frozen_banks.len()); + let mut votable: Vec<(u128, Arc)> = frozen_banks + .values() + .filter(|b| { + let is_votable = b.is_votable(); + trace!("bank is votable: {} {}", b.slot(), is_votable); + is_votable + }) + .filter(|b| { + let has_voted = locktower.has_voted(b.slot()); + trace!("bank is has_voted: {} {}", b.slot(), has_voted); + !has_voted + }) + .filter(|b| { + let is_locked_out = locktower.is_locked_out(b.slot(), &descendants); + trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out); + !is_locked_out + }) + .map(|bank| { + ( + bank, + locktower.collect_vote_lockouts(bank.slot(), bank.vote_accounts(), &ancestors), + ) + }) + .filter(|(b, stake_lockouts)| { + let vote_threshold = + locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts); + debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold); + vote_threshold + }) + .map(|(b, stake_lockouts)| (locktower.calculate_weight(&stake_lockouts), b.clone())) + .collect(); + + votable.sort_by_key(|b| b.0); + let ms = timing::duration_as_ms(&locktower_start.elapsed()); + + trace!("votable_banks {}", votable.len()); + if !votable.is_empty() { + let weights: Vec = votable.iter().map(|x| x.0).collect(); + info!( + "@{:?} locktower duration: {:?} len: {} weights: {:?}", + timing::timestamp(), + ms, + votable.len(), + weights + ); + } + inc_new_counter_info!("replay_stage-locktower_duration", ms as usize); + + votable + } + pub fn load_blocktree_entries( bank: &Bank, blocktree: &Blocktree,