Skip to content

Commit

Permalink
Create leader schedule before processing blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed Mar 14, 2022
1 parent 543d5d4 commit 390dc24
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 70 deletions.
4 changes: 2 additions & 2 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,9 @@ mod tests {
full_leader_cache: true,
..ProcessOptions::default()
};
let (bank_forks, cached_leader_schedule, _) =
let (bank_forks, leader_schedule_cache) =
test_process_blockstore(&genesis_config, &blockstore, opts);
let leader_schedule_cache = Arc::new(cached_leader_schedule);
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let bank_forks = Arc::new(RwLock::new(bank_forks));

let mut me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0);
Expand Down
81 changes: 42 additions & 39 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,19 @@ impl Validator {
transaction_notifier,
);

let tower = {
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
abort()
});
}

post_process_restored_tower(restored_tower, &id, &vote_account, config, &bank_forks)
};
info!("Tower state: {:?}", tower);

*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;

if !config.no_os_network_stats_reporting {
Expand Down Expand Up @@ -1319,33 +1332,38 @@ fn new_banks_from_ledger(
.cache_block_meta_sender
.as_ref();

let (bank_forks, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
cache_block_meta_sender,
accounts_update_notifier,
);

let (mut bank_forks, mut leader_schedule_cache, last_full_snapshot_slot) =
blockstore_processor::process_blockstore_from_root(
let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
bank_forks,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
cache_block_meta_sender,
config.snapshot_config.as_ref(),
accounts_package_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
accounts_update_notifier,
);

leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
bank_forks.set_snapshot_config(config.snapshot_config.clone());
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);

let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
&blockstore,
&mut bank_forks,
&leader_schedule_cache,
&process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
cache_block_meta_sender,
config.snapshot_config.as_ref(),
accounts_package_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});

let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
Expand Down Expand Up @@ -1399,21 +1417,6 @@ fn new_banks_from_ledger(
);
}

let tower = post_process_restored_tower(
restored_tower,
validator_identity,
vote_account,
config,
&bank_forks,
);

info!("Tower state: {:?}", tower);

leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());

bank_forks.set_snapshot_config(config.snapshot_config.clone());
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);

if let Some(blockstore_root_scan) = blockstore_root_scan {
if let Err(err) = blockstore_root_scan.join() {
warn!("blockstore_root_scan failed to join {:?}", err);
Expand Down
23 changes: 16 additions & 7 deletions ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn load(
accounts_package_sender: AccountsPackageSender,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> LoadResult {
let (bank_forks, starting_snapshot_hashes) = load_bank_forks(
let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks(
genesis_config,
blockstore,
account_paths,
Expand All @@ -60,16 +60,15 @@ pub fn load(

blockstore_processor::process_blockstore_from_root(
blockstore,
bank_forks,
&mut bank_forks,
&leader_schedule_cache,
&process_options,
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
)
.map(|(bank_forks, leader_schedule_cache, ..)| {
(bank_forks, leader_schedule_cache, starting_snapshot_hashes)
})
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -82,7 +81,11 @@ pub fn load_bank_forks(
process_options: &ProcessOptions,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> (BankForks, Option<StartingSnapshotHashes>) {
) -> (
BankForks,
LeaderScheduleCache,
Option<StartingSnapshotHashes>,
) {
let snapshot_present = if let Some(snapshot_config) = snapshot_config {
info!(
"Initializing bank snapshot path: {}",
Expand All @@ -107,7 +110,7 @@ pub fn load_bank_forks(
false
};

if snapshot_present {
let (bank_forks, starting_snapshot_hashes) = if snapshot_present {
bank_forks_from_snapshot(
genesis_config,
account_paths,
Expand Down Expand Up @@ -139,7 +142,13 @@ pub fn load_bank_forks(
),
None,
)
};

let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank());
if process_options.full_leader_cache {
leader_schedule_cache.set_max_schedules(std::usize::MAX);
}
(bank_forks, leader_schedule_cache, starting_snapshot_hashes)
}

#[allow(clippy::too_many_arguments)]
Expand Down
43 changes: 21 additions & 22 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ impl BlockCostCapacityMeter {
}
}

pub type BlockstoreProcessorInner = (BankForks, LeaderScheduleCache, Option<Slot>);

pub type BlockstoreProcessorResult =
result::Result<BlockstoreProcessorInner, BlockstoreProcessorError>;

thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix))
Expand Down Expand Up @@ -570,8 +565,8 @@ pub fn test_process_blockstore(
genesis_config: &GenesisConfig,
blockstore: &Blockstore,
opts: ProcessOptions,
) -> BlockstoreProcessorInner {
let (bank_forks, ..) = crate::bank_forks_utils::load_bank_forks(
) -> (BankForks, LeaderScheduleCache) {
let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
Vec::new(),
Expand All @@ -584,14 +579,16 @@ pub fn test_process_blockstore(
let (accounts_package_sender, _) = unbounded();
process_blockstore_from_root(
blockstore,
bank_forks,
&mut bank_forks,
&leader_schedule_cache,
&opts,
None,
None,
None,
accounts_package_sender,
)
.unwrap()
.unwrap();
(bank_forks, leader_schedule_cache)
}

pub(crate) fn process_blockstore_for_bank_0(
Expand Down Expand Up @@ -632,13 +629,14 @@ pub(crate) fn process_blockstore_for_bank_0(
#[allow(clippy::too_many_arguments)]
pub fn process_blockstore_from_root(
blockstore: &Blockstore,
mut bank_forks: BankForks,
bank_forks: &mut BankForks,
leader_schedule_cache: &LeaderScheduleCache,
opts: &ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
) -> BlockstoreProcessorResult {
) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
if let Some(num_threads) = opts.override_num_threads {
PAR_THREAD_POOL.with(|pool| {
*pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
Expand Down Expand Up @@ -692,10 +690,6 @@ pub fn process_blockstore_from_root(

let mut timing = ExecuteTimings::default();
// Iterate and replay slots from blockstore starting from `start_slot`
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
if opts.full_leader_cache {
leader_schedule_cache.set_max_schedules(std::usize::MAX);
}

let mut last_full_snapshot_slot = None;

Expand All @@ -704,11 +698,11 @@ pub fn process_blockstore_from_root(
.unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
{
load_frozen_forks(
&mut bank_forks,
bank_forks,
start_slot,
&start_slot_meta,
blockstore,
&leader_schedule_cache,
leader_schedule_cache,
opts,
transaction_status_sender,
cache_block_meta_sender,
Expand Down Expand Up @@ -764,7 +758,7 @@ pub fn process_blockstore_from_root(
);
assert!(bank_forks.active_banks().is_empty());

Ok((bank_forks, leader_schedule_cache, last_full_snapshot_slot))
Ok(last_full_snapshot_slot)
}

/// Verify that a segment of entries has the correct number of ticks and hashes
Expand Down Expand Up @@ -2374,7 +2368,7 @@ pub mod tests {
accounts_db_test_hash_calculation: true,
..ProcessOptions::default()
};
let (_bank_forks, leader_schedule, _) =
let (_bank_forks, leader_schedule) =
test_process_blockstore(&genesis_config, &blockstore, opts);
assert_eq!(leader_schedule.max_schedules(), std::usize::MAX);
}
Expand Down Expand Up @@ -3160,11 +3154,14 @@ pub mod tests {
None,
);

let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1);

// Test process_blockstore_from_root() from slot 1 onwards
let (accounts_package_sender, _) = unbounded();
let (bank_forks, ..) = process_blockstore_from_root(
process_blockstore_from_root(
&blockstore,
bank_forks,
&mut bank_forks,
&leader_schedule_cache,
&opts,
None,
None,
Expand Down Expand Up @@ -3268,10 +3265,12 @@ pub mod tests {
};

let (accounts_package_sender, accounts_package_receiver) = unbounded();
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);

process_blockstore_from_root(
&blockstore,
bank_forks,
&mut bank_forks,
&leader_schedule_cache,
&opts,
None,
None,
Expand Down

0 comments on commit 390dc24

Please sign in to comment.