Remove pov-recovery race condition/Improve zombienet test #2526

Merged
17 changes: 14 additions & 3 deletions cumulus/client/pov-recovery/src/lib.rs
@@ -410,6 +410,7 @@ where
?block_hash,
parent_hash = ?parent,
parent_scheduled_for_recovery,
waiting_blocks = self.waiting_for_parent.len(),
"Waiting for recovery of parent.",
);

@@ -442,13 +443,13 @@ where
_ => (),
}

self.import_block(block).await;
self.import_block(block);
}

/// Import the given `block`.
///
/// This will also recursively drain `waiting_for_parent` and import them as well.
async fn import_block(&mut self, block: Block) {
fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();

tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
@@ -551,7 +552,6 @@ where
};

futures::pin_mut!(pending_candidates);

loop {
select! {
pending_candidate = pending_candidates.next() => {
@@ -573,6 +573,17 @@
imported = imported_blocks.next() => {
if let Some(imported) = imported {
self.clear_waiting_recovery(&imported.hash);

// We need to double-check that no blocks are waiting for this block.
// This can happen when a child block is queued to wait for its parent while the parent is still
// in the import queue.
if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
for block in waiting_blocks {
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
self.import_block(block);
}
};

} else {
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
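The race-condition fix is captured by the two hunks above: `import_block` is now synchronous, and when a block comes out of the import queue, any children parked in `waiting_for_parent` for that block are imported as well. A minimal sketch of that draining logic, using hypothetical simplified types rather than the actual `PoVRecovery` struct from this crate:

```rust
use std::collections::{HashMap, VecDeque};

type Hash = u64;

struct Block {
    hash: Hash,
}

#[derive(Default)]
struct Recovery {
    // Children keyed by the parent hash they are waiting for.
    waiting_for_parent: HashMap<Hash, Vec<Block>>,
    imported: Vec<Hash>,
}

impl Recovery {
    /// Import `block` and drain every child waiting on it (and on those
    /// children in turn), mirroring the recursive drain in `import_block`.
    fn import_block(&mut self, block: Block) {
        let mut blocks = VecDeque::new();
        blocks.push_back(block);

        while let Some(block) = blocks.pop_front() {
            self.imported.push(block.hash);
            if let Some(waiting) = self.waiting_for_parent.remove(&block.hash) {
                blocks.extend(waiting);
            }
        }
    }

    /// Handler for the "block imported" notification: the same drain must
    /// happen here too, because a child can be parked in `waiting_for_parent`
    /// while its parent is still sitting in the import queue.
    fn on_block_imported(&mut self, imported: Hash) {
        if let Some(waiting) = self.waiting_for_parent.remove(&imported) {
            for block in waiting {
                self.import_block(block);
            }
        }
    }
}
```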
18 changes: 12 additions & 6 deletions cumulus/test/service/src/lib.rs
@@ -27,6 +27,7 @@ mod genesis;
use runtime::AccountId;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use std::{
collections::HashSet,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
@@ -57,7 +58,7 @@ use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi};
use frame_system_rpc_runtime_api::AccountNonceApi;
use polkadot_node_subsystem::{errors::RecoveryError, messages::AvailabilityRecoveryMessage};
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Hash as PHash, PersistedValidationData};
use polkadot_primitives::{CandidateHash, CollatorPair, Hash as PHash, PersistedValidationData};
use polkadot_service::ProvideRuntimeApi;
use sc_consensus::ImportQueue;
use sc_network::{
@@ -144,12 +145,13 @@ pub type TransactionPool = Arc<sc_transaction_pool::FullPool<Block, Client>>;
pub struct FailingRecoveryHandle {
overseer_handle: OverseerHandle,
counter: u32,
failed_hashes: HashSet<CandidateHash>,
}

impl FailingRecoveryHandle {
/// Create a new FailingRecoveryHandle
pub fn new(overseer_handle: OverseerHandle) -> Self {
Self { overseer_handle, counter: 0 }
Self { overseer_handle, counter: 0, failed_hashes: Default::default() }
}
}

@@ -160,11 +162,15 @@ impl RecoveryHandle for FailingRecoveryHandle {
message: AvailabilityRecoveryMessage,
origin: &'static str,
) {
// For every 5th block we immediately signal unavailability to trigger
// a retry.
if self.counter % 5 == 0 {
let AvailabilityRecoveryMessage::RecoverAvailableData(ref receipt, _, _, _) = message;
let candidate_hash = receipt.hash();

// For every 3rd block we immediately signal unavailability to trigger
// a retry. The same candidate is never failed multiple times to ensure progress.
if self.counter % 3 == 0 && self.failed_hashes.insert(candidate_hash) {
tracing::info!(target: LOG_TARGET, ?candidate_hash, "Failing pov recovery.");

let AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, back_sender) = message;
tracing::info!(target: LOG_TARGET, "Failing pov recovery.");
back_sender
.send(Err(RecoveryError::Unavailable))
.expect("Return channel should work here.");
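A minimal sketch of the failure-injection logic introduced above, using stand-in types rather than the real overseer messages: every 3rd recovery request is answered with `Unavailable`, but `failed_hashes` ensures a candidate is failed at most once, so the subsequent retry can succeed and the test keeps making progress:

```rust
use std::collections::HashSet;

// Stand-in for polkadot's candidate hash type.
type CandidateHash = [u8; 32];

#[derive(Default)]
struct FailingRecovery {
    counter: u32,
    failed_hashes: HashSet<CandidateHash>,
}

impl FailingRecovery {
    /// Returns `true` if this recovery request should be failed.
    fn should_fail(&mut self, candidate_hash: CandidateHash) -> bool {
        // `insert` returns `false` for a hash that was already failed once,
        // so a retried candidate is always allowed through.
        let fail = self.counter % 3 == 0 && self.failed_hashes.insert(candidate_hash);
        self.counter += 1;
        fail
    }
}
```

Failing each candidate only once, instead of every 5th request unconditionally, is what keeps the zombienet test deterministic: the retry path is still exercised, but recovery can never get stuck on a candidate that is failed again on every attempt.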
5 changes: 2 additions & 3 deletions cumulus/zombienet/tests/0002-pov_recovery.toml
@@ -34,21 +34,20 @@ add_to_genesis = false
args = ["--disable-block-announcements"]

# run 'alice' as a parachain collator who does not produce blocks
# 'alice' is a bootnode for 'bob' and 'charlie'
[[parachains.collators]]
name = "alice"
validator = true # collator
image = "{{COL_IMAGE}}"
command = "test-parachain"
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--use-null-consensus", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]

# run 'charlie' as a parachain full node
[[parachains.collators]]
name = "charlie"
validator = false # full node
image = "{{COL_IMAGE}}"
command = "test-parachain"
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0","--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]
args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'bob'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0", "--", "--reserved-only", "--reserved-nodes {{'ferdie'|zombie('multiAddress')}}"]

# we fail recovery for 'eve' from time to time to test retries
[[parachains.collators]]