Skip to content

Commit

Permalink
Make approval-voting runable on a worker thread
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Gheorghe <[email protected]>
  • Loading branch information
alexggh committed Jul 16, 2024
1 parent 14727c5 commit 1942139
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 168 deletions.
8 changes: 4 additions & 4 deletions polkadot/cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,16 +373,16 @@ pub fn run() -> Result<()> {
Ok(runner.async_run(|mut config| {
let (client, backend, _, task_manager) =
polkadot_service::new_chain_ops(&mut config, None)?;
let task_handle = task_manager.spawn_handle();
let aux_revert = Box::new(|client, backend, blocks| {
polkadot_service::revert_backend(client, backend, blocks, config).map_err(
|err| {
polkadot_service::revert_backend(client, backend, blocks, config, task_handle)
.map_err(|err| {
match err {
polkadot_service::Error::Blockchain(err) => err.into(),
// Generic application-specific error.
err => sc_cli::Error::Application(err.into()),
}
},
)
})
});
Ok((
cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli),
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/approval-voting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ kvdb = { workspace = true }
derive_more = { workspace = true, default-features = true }
thiserror = { workspace = true }
itertools = { workspace = true }
async-trait = { workspace = true }

polkadot-node-subsystem = { workspace = true, default-features = true }
polkadot-node-subsystem-util = { workspace = true, default-features = true }
Expand Down
109 changes: 61 additions & 48 deletions polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{
overseer, RuntimeApiError, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo};
use polkadot_overseer::SubsystemSender;
use polkadot_primitives::{
node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog,
CoreIndex, GroupIndex, Hash, Header, SessionIndex,
Expand Down Expand Up @@ -110,8 +111,8 @@ enum ImportedBlockInfoError {
/// Computes information about the imported block. Returns an error if the info couldn't be
/// extracted.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn imported_block_info<Context>(
ctx: &mut Context,
async fn imported_block_info<Sender: SubsystemSender<RuntimeApiMessage>>(
sender: &mut Sender,
env: ImportedBlockInfoEnv<'_>,
block_hash: Hash,
block_header: &Header,
Expand All @@ -123,11 +124,12 @@ async fn imported_block_info<Context>(
// fetch candidates
let included_candidates: Vec<_> = {
let (c_tx, c_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CandidateEvents(c_tx),
))
.await;

let events: Vec<CandidateEvent> = match c_rx.await {
Ok(Ok(events)) => events,
Expand All @@ -150,11 +152,12 @@ async fn imported_block_info<Context>(
// short, that shouldn't happen.
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
ctx.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_header.parent_hash,
RuntimeApiRequest::SessionIndexForChild(s_tx),
))
.await;

let session_index = match s_rx.await {
Ok(Ok(s)) => s,
Expand Down Expand Up @@ -200,11 +203,12 @@ async fn imported_block_info<Context>(
// by one block. This gives us the opposite invariant for sessions - the parent block's
// post-state gives us the canonical information about the session index for any of its
// children, regardless of which slot number they might be produced at.
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;
sender
.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::CurrentBabeEpoch(s_tx),
))
.await;

match s_rx.await {
Ok(Ok(s)) => s,
Expand All @@ -215,7 +219,7 @@ async fn imported_block_info<Context>(
};

let extended_session_info =
get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await;
get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await;
let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| {
*extended_session_info
.node_features
Expand All @@ -224,7 +228,7 @@ async fn imported_block_info<Context>(
.unwrap_or(&false)
});

let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index)
let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index)
.await
.ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?;

Expand Down Expand Up @@ -328,9 +332,15 @@ pub struct BlockImportedCandidates {
/// * and return information about all candidates imported under each block.
///
/// It is the responsibility of the caller to schedule wakeups for each block.
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
pub(crate) async fn handle_new_head<Context, B: Backend>(
ctx: &mut Context,
pub(crate) async fn handle_new_head<
Sender: SubsystemSender<ChainApiMessage>
+ SubsystemSender<RuntimeApiMessage>
+ SubsystemSender<ChainSelectionMessage>,
AVSender: SubsystemSender<ApprovalDistributionMessage>,
B: Backend,
>(
sender: &mut Sender,
approval_voting_sender: &mut AVSender,
state: &State,
db: &mut OverlayedBackend<'_, B>,
session_info_provider: &mut RuntimeInfo,
Expand All @@ -348,7 +358,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

let header = {
let (h_tx, h_rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
match h_rx.await? {
Err(e) => {
gum::debug!(
Expand All @@ -374,7 +384,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number);

let new_blocks = determine_new_blocks(
ctx.sender(),
sender,
|h| db.load_block_entry(h).map(|e| e.is_some()),
head,
&header,
Expand All @@ -400,12 +410,15 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
keystore: &state.keystore,
};

match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await {
match imported_block_info(sender, env, block_hash, &block_header, finalized_number)
.await
{
Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
Err(error) => {
// It's possible that we've lost a race with finality.
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
sender
.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx))
.await;

let lost_to_finality = match rx.await {
Expand Down Expand Up @@ -449,17 +462,11 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
force_approve,
} = imported_block_info;

let session_info = match get_session_info(
session_info_provider,
ctx.sender(),
head,
session_index,
)
.await
{
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};
let session_info =
match get_session_info(session_info_provider, sender, head, session_index).await {
Some(session_info) => session_info,
None => return Ok(Vec::new()),
};

let (block_tick, no_show_duration) = {
let block_tick = slot_number_to_tick(state.slot_duration_millis, slot);
Expand Down Expand Up @@ -509,7 +516,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
};
// If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
sender.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}

let block_entry = v3::BlockEntry {
Expand Down Expand Up @@ -566,7 +573,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(

// Notify chain-selection of all approved hashes.
for hash in approved_hashes {
ctx.send_message(ChainSelectionMessage::Approved(hash)).await;
sender.send_message(ChainSelectionMessage::Approved(hash)).await;
}
}

Expand Down Expand Up @@ -602,7 +609,8 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
"Informing distribution of newly imported chain",
);

ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
approval_voting_sender
.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
Ok(imported_candidates)
}

Expand Down Expand Up @@ -660,7 +668,7 @@ pub(crate) mod tests {
State {
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: 6_000,
clock: Box::new(MockClock::default()),
clock: Arc::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
Expand Down Expand Up @@ -804,8 +812,9 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4))
.await
.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -951,7 +960,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await;

assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable));
})
Expand Down Expand Up @@ -1090,7 +1099,7 @@ pub(crate) mod tests {
keystore: &LocalKeystore::in_memory(),
};

let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await;
let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(6)).await;

assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized));
})
Expand Down Expand Up @@ -1126,7 +1135,8 @@ pub(crate) mod tests {
#[test]
fn imported_block_info_extracts_force_approve() {
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context(pool.clone());
let (mut ctx, mut handle) =
make_subsystem_context::<ApprovalVotingMessage, _>(pool.clone());

let session = 5;
let session_info = dummy_session_info(session);
Expand Down Expand Up @@ -1189,7 +1199,7 @@ pub(crate) mod tests {
};

let info =
imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap();
imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await.unwrap();

assert_eq!(info.included_candidates, included_candidates);
assert_eq!(info.session_index, session);
Expand Down Expand Up @@ -1382,8 +1392,11 @@ pub(crate) mod tests {
let test_fut = {
Box::pin(async move {
let mut overlay_db = OverlayedBackend::new(&db);

let mut approval_voting_sender = ctx.sender().clone();
let result = handle_new_head(
&mut ctx,
ctx.sender(),
&mut approval_voting_sender,
&state,
&mut overlay_db,
&mut session_info_provider,
Expand Down
Loading

0 comments on commit 1942139

Please sign in to comment.