Skip to content

Commit

Permalink
Adds LoadedPrograms::next_cooperative_loading_task() and LoadedProgra…
Browse files Browse the repository at this point in the history
…ms::cooperative_loading_task_complete().
  • Loading branch information
Lichtso committed Nov 21, 2023
1 parent 11c6f81 commit b0e6d78
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 40 deletions.
75 changes: 75 additions & 0 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,81 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
extracted
}

/// In cooperative loading a TX batch calls this to receive the next task
pub fn next_cooperative_loading_task(
&mut self,
extracted: &Arc<Mutex<ExtractedPrograms>>,
) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
// The Mutexes are strictly speaking unnecessary
// because the entire `LoadedPrograms` cache is already locked here.
let extracted = extracted.lock().unwrap();
let (key, (entry, reload)) =
extracted.loading.iter().find(|(_key, (entry, _reload))| {
let LoadedProgramType::Loading(mutex) = &entry.program else {
debug_assert!(false);
return false;
};
let processing = mutex.lock().unwrap().0;
!processing
})?;
let (key, entry, reload) = (*key, entry.clone(), *reload);
drop(extracted);
{
let LoadedProgramType::Loading(mutex) = &entry.program else {
debug_assert!(false);
return None;
};
let processing = &mut mutex.lock().unwrap().0;
*processing = true;
}
Some((key, entry, reload))
}

/// Upon completing a task in cooperative loading a TX batch calls this to submit the result
pub fn cooperative_loading_task_complete(
&mut self,
key: Pubkey,
loading: Arc<LoadedProgram>,
loaded: Arc<LoadedProgram>,
) {
let LoadedProgramType::Loading(mutex) = &loading.program else {
debug_assert!(false);
return;
};
let mut mutex = mutex.lock().unwrap();
let processing = &mut mutex.0;
*processing = false;
let waiting_list_is_empty = {
let fork_graph = self
.fork_graph
.as_ref()
.expect("Program cache doesn't have fork graph.");
let fork_graph = fork_graph
.read()
.expect("Failed to lock fork graph for reading.");
let waiting_list = &mut mutex.1;
waiting_list.retain(|waiting| {
// The Mutex around `waiting` is strictly speaking unnecessary
// because the entire `LoadedPrograms` cache is already locked here.
let mut waiting = waiting.lock().unwrap();
let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
if loaded.deployment_slot <= self.latest_root_slot
|| matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
{
waiting.loading.remove(&key);
waiting.loaded.assign_program(key, loaded.clone());
return false;
}
true
});
waiting_list.is_empty()
};
if waiting_list_is_empty {
self.remove_program(key, &loading);
}
self.assign_program(key, loaded);
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
self.assign_program(*key, entry.clone());
Expand Down
73 changes: 33 additions & 40 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ use {
compute_budget_processor::process_compute_budget_instructions,
invoke_context::BuiltinFunctionWithContext,
loaded_programs::{
ExtractedPrograms, LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria,
LoadedProgramType, LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria, LoadedProgramType,
LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
ProgramRuntimeEnvironments, WorkingSlot, DELAY_VISIBILITY_SLOT_OFFSET,
},
log_collector::LogCollector,
Expand Down Expand Up @@ -5068,50 +5068,43 @@ impl Bank {
.collect()
};

let ExtractedPrograms {
loaded: mut loaded_programs_for_txs,
missing,
unloaded,
} = {
let extracted = {
// Lock the global cache to figure out which programs need to be loaded
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
loaded_programs_cache.extract(self, programs_and_slots.into_iter())
};

// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
.iter()
.map(|(key, count)| {
let program = self.load_program(key, false, None);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Reload unloaded programs while global cache is unlocked
let unloaded_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = unloaded
.iter()
.map(|(key, count)| {
let program = self.load_program(key, true, None);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Lock the global cache again to replenish the missing programs
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
for (key, program) in missing_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
}
for (key, program) in unloaded_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
// Cooperative loading phase
let mut finished_task = None;
loop {
// Critical section for global coordination
let (key, loading, reload) = {
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
if let Some((key, loading, loaded)) = finished_task.take() {
loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
}
if Arc::strong_count(&extracted) == 1 {
// All the missing entries for this batch have been loaded
break;
}
if let Some(task) = loaded_programs_cache.next_cooperative_loading_task(&extracted)
{
task
} else {
// Waiting for some other TX batch to complete loading the programs needed by this TX batch
// TODO: Use a Condvar here
continue;
}
};
// Load, verify and compile the program outside of the critical section
let loaded = self.load_program(&key, reload, None);
finished_task = Some((key, loading, loaded));
}

loaded_programs_for_txs
// When we get here we should be the only remaining owner
std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
.unwrap()
.loaded
}

/// Returns a hash map of executable program accounts (program accounts that are not writable
Expand Down

0 comments on commit b0e6d78

Please sign in to comment.