Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break replay_stage::new into more functions #3415

Merged
merged 1 commit into from
Mar 21, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 168 additions & 112 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,132 +93,44 @@ impl ReplayStage {
if exit_.load(Ordering::Relaxed) {
break;
}

Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);

let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != my_id {
Self::replay_blocktree_into_bank(
&bank,
&blocktree,
&mut progress,
&forward_entry_sender,
)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&slot_full_sender,
);
}
}

Self::replay_active_banks(
&blocktree,
&bank_forks,
&my_id,
&mut ticks_per_slot,
&mut progress,
&forward_entry_sender,
&slot_full_sender,
)?;

if ticks_per_slot == 0 {
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
let bank = frozen_banks.values().next().unwrap();
ticks_per_slot = bank.ticks_per_slot();
}

let locktower_start = Instant::now();
// Locktower voting
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();

trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts(),
&ancestors,
),
)
})
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| {
(locktower.calculate_weight(&stake_lockouts), b.clone())
})
.collect();

votable.sort_by_key(|b| b.0);
trace!("votable_banks {}", votable.len());
let ms = timing::duration_as_ms(&locktower_start.elapsed());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);
let votable = Self::generate_votable_banks(&bank_forks, &locktower);

if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(&bank);

if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
&vote_account,
keypair,
bank.slot(),
bank.last_blockhash(),
0,
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, &mut progress);
}
locktower.update_epoch(&bank);
cluster_info.write().unwrap().push_vote(vote);
}
let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_id,
bank.tick_height(),
next_leader_slot
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut locktower,
&mut progress,
&voting_keypair,
&vote_account,
&cluster_info,
);

Self::reset_poh_recorder(&my_id, &bank, &poh_recorder, ticks_per_slot);

is_tpu_bank_active = false;
}

Expand Down Expand Up @@ -354,6 +266,150 @@ impl ReplayStage {
Ok(())
}

fn handle_votable_bank<T>(
bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &mut Locktower,
progress: &mut HashMap<u64, (Hash, usize)>,
voting_keypair: &Option<Arc<T>>,
vote_account: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) where
T: 'static + KeypairUtil + Send + Sync,
{
if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
&vote_account,
keypair,
bank.slot(),
bank.last_blockhash(),
0,
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, progress);
}
locktower.update_epoch(&bank);
cluster_info.write().unwrap().push_vote(vote);
}
}

fn reset_poh_recorder(
my_id: &Pubkey,
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
ticks_per_slot: u64,
) {
let next_leader_slot = leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_id,
bank.tick_height(),
next_leader_slot
);
}

fn replay_active_banks(
blocktree: &Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>,
my_id: &Pubkey,
ticks_per_slot: &mut u64,
progress: &mut HashMap<u64, (Hash, usize)>,
forward_entry_sender: &EntrySender,
slot_full_sender: &Sender<(u64, Pubkey)>,
) -> result::Result<()> {
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);

for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
*ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != *my_id {
Self::replay_blocktree_into_bank(
&bank,
&blocktree,
progress,
&forward_entry_sender,
)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(my_id, bank, progress, slot_full_sender);
}
}
Ok(())
}

fn generate_votable_banks(
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &Locktower,
) -> Vec<(u128, Arc<Bank>)> {
let locktower_start = Instant::now();
// Locktower voting
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();

trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(bank.slot(), bank.vote_accounts(), &ancestors),
)
})
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| (locktower.calculate_weight(&stake_lockouts), b.clone()))
.collect();

votable.sort_by_key(|b| b.0);
let ms = timing::duration_as_ms(&locktower_start.elapsed());

trace!("votable_banks {}", votable.len());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);

votable
}

pub fn load_blocktree_entries(
bank: &Bank,
blocktree: &Blocktree,
Expand Down