From b1a18f559f3099e1d6cbccaae5624d13e401bf61 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 5 Jul 2024 11:00:51 +0200 Subject: [PATCH] Introduce basic slot-based collator (#4097) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part of #3168 On top of #3568 ### Changes Overview - Introduces a new collator variant in `cumulus/client/consensus/aura/src/collators/slot_based/mod.rs` - Two tasks are part of that module, one for block building and one for collation building and submission. - Introduces a new variant of `cumulus-test-runtime` which has 2s slot duration, used for zombienet testing - Zombienet tests for the new collator **Note:** This collator is considered experimental and should only be used for testing and exploration for now. ### Comparison with `lookahead` collator - The new variant is slot based, meaning it waits for the next slot of the parachain, then starts authoring - The search for potential parents remains mostly unchanged from lookahead - As anchor, we use the current best relay parent - In general, the new collator tends to be anchored to one relay parent earlier. `lookahead` generally waits for a new relay block to arrive before it attempts to build a block. This means the actual timing of parachain blocks depends on when the relay block has been authored and imported. With the slot-triggered approach we are authoring directly on the slot boundary, were a new relay chain block has probably not yet arrived. ### Limitations - Overall, the current implementation focuses on the "happy path" - We assume that we want to collate close to the tip of the relay chain. It would be useful however to have some kind of configurable drift, so that we could lag behind a bit. https://github.com/paritytech/polkadot-sdk/issues/3965 - The collation task is pretty dumb currently. It checks if we have cores scheduled and if yes, submits all the messages we have received from the block builder until we have something submitted for every core. Ideally we should do some extra checks, i.e. we do not need to submit if the built block is already too old (build on a out of range relay parent) or was authored with a relay parent that is not an ancestor of the relay block we are submitting at. https://github.com/paritytech/polkadot-sdk/issues/3966 - There is no throttling, we assume that we can submit _velocity_ blocks every relay chain block. There should be communication between the collator task and block-builder task. - The parent search and ConsensusHook are not yet properly adjusted. The parent search makes assumptions about the pending candidate which no longer hold. https://github.com/paritytech/polkadot-sdk/issues/3967 - Custom triggers for block building not implemented. --------- Co-authored-by: Davide Galassi Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Co-authored-by: Bastian Köcher Co-authored-by: Javier Viola <363911+pepoviola@users.noreply.github.com> Co-authored-by: command-bot <> --- .gitlab/pipeline/zombienet/cumulus.yml | 24 + Cargo.lock | 12 +- cumulus/client/consensus/aura/Cargo.toml | 2 + cumulus/client/consensus/aura/src/collator.rs | 64 ++- .../consensus/aura/src/collators/basic.rs | 10 +- .../consensus/aura/src/collators/lookahead.rs | 220 +------- .../consensus/aura/src/collators/mod.rs | 182 ++++++- .../slot_based/block_builder_task.rs | 491 ++++++++++++++++++ .../collators/slot_based/collation_task.rs | 140 +++++ .../aura/src/collators/slot_based/mod.rs | 178 +++++++ cumulus/client/consensus/common/src/lib.rs | 202 +------ .../common/src/parachain_consensus.rs | 105 ++-- .../consensus/common/src/parent_search.rs | 418 +++++++++++++++ cumulus/client/consensus/common/src/tests.rs | 373 ++++++++++++- cumulus/client/network/src/tests.rs | 14 +- cumulus/client/parachain-inherent/Cargo.toml | 1 - cumulus/client/pov-recovery/src/tests.rs | 11 +- .../src/lib.rs | 13 +- .../client/relay-chain-interface/src/lib.rs | 19 +- .../relay-chain-minimal-node/Cargo.toml | 7 - .../relay-chain-rpc-interface/src/lib.rs | 9 +- .../pallets/aura-ext/src/consensus_hook.rs | 21 +- cumulus/pallets/aura-ext/src/lib.rs | 2 +- cumulus/polkadot-parachain/Cargo.toml | 2 +- cumulus/polkadot-parachain/src/cli.rs | 6 + cumulus/polkadot-parachain/src/command.rs | 38 +- cumulus/polkadot-parachain/src/service.rs | 137 ++++- cumulus/test/client/src/lib.rs | 1 + cumulus/test/runtime/Cargo.toml | 1 + cumulus/test/runtime/build.rs | 7 + cumulus/test/runtime/src/lib.rs | 26 +- cumulus/test/service/Cargo.toml | 3 - cumulus/test/service/src/chain_spec.rs | 19 +- cumulus/test/service/src/cli.rs | 18 +- cumulus/test/service/src/lib.rs | 117 +++-- cumulus/test/service/src/main.rs | 2 + .../tests/0003-full_node_catching_up.zndsl | 3 + .../0006-rpc_collator_builds_blocks.zndsl | 4 + .../tests/0008-elastic_authoring.toml | 50 ++ .../tests/0008-elastic_authoring.zndsl | 19 + .../tests/0009-elastic_pov_recovery.toml | 48 ++ .../tests/0009-elastic_pov_recovery.zndsl | 19 + cumulus/zombienet/tests/assign-core.js | 46 ++ .../0001-basic-3cores-6s-blocks.toml | 4 +- prdoc/pr_4097.prdoc | 45 ++ .../basic-authorship/src/basic_authorship.rs | 6 +- .../client/consensus/aura/src/standalone.rs | 6 +- substrate/client/consensus/slots/src/lib.rs | 2 +- templates/parachain/node/src/service.rs | 12 +- 49 files changed, 2554 insertions(+), 605 deletions(-) create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs create mode 100644 cumulus/client/consensus/aura/src/collators/slot_based/mod.rs create mode 100644 cumulus/client/consensus/common/src/parent_search.rs create mode 100644 cumulus/zombienet/tests/0008-elastic_authoring.toml create mode 100644 cumulus/zombienet/tests/0008-elastic_authoring.zndsl create mode 100644 cumulus/zombienet/tests/0009-elastic_pov_recovery.toml create mode 100644 cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl create mode 100644 cumulus/zombienet/tests/assign-core.js create mode 100644 prdoc/pr_4097.prdoc diff --git a/.gitlab/pipeline/zombienet/cumulus.yml b/.gitlab/pipeline/zombienet/cumulus.yml index a7f321505bac..6e2b53fae619 100644 --- a/.gitlab/pipeline/zombienet/cumulus.yml +++ b/.gitlab/pipeline/zombienet/cumulus.yml @@ -149,3 +149,27 @@ zombienet-cumulus-0007-full_node_warp_sync: --local-dir="${LOCAL_DIR}" --concurrency=1 --test="0007-full_node_warp_sync.zndsl" + +zombienet-cumulus-0008-elastic_authoring: + extends: + - .zombienet-cumulus-common + - .zombienet-refs + - .zombienet-before-script + - .zombienet-after-script + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}" + --concurrency=1 + --test="0008-elastic_authoring.zndsl" + +zombienet-cumulus-0009-elastic_pov_recovery: + extends: + - .zombienet-cumulus-common + - .zombienet-refs + - .zombienet-before-script + - .zombienet-after-script + script: + - /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh + --local-dir="${LOCAL_DIR}" + --concurrency=1 + --test="0009-elastic_pov_recovery.zndsl" diff --git a/Cargo.lock b/Cargo.lock index 5fb439d19957..8ba7043cbb0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3772,6 +3772,7 @@ dependencies = [ "sc-consensus-babe", "sc-consensus-slots", "sc-telemetry", + "sc-utils", "schnellru", "sp-api", "sp-application-crypto", @@ -3786,6 +3787,7 @@ dependencies = [ "sp-state-machine", "sp-timestamp", "substrate-prometheus-endpoint", + "tokio", "tracing", ] @@ -3906,7 +3908,6 @@ dependencies = [ "cumulus-test-relay-sproof-builder", "parity-scale-codec", "sc-client-api", - "scale-info", "sp-api", "sp-crypto-hashing", "sp-inherents", @@ -4333,15 +4334,8 @@ dependencies = [ "cumulus-relay-chain-interface", "cumulus-relay-chain-rpc-interface", "futures", - "parking_lot 0.12.3", - "polkadot-availability-recovery", - "polkadot-collator-protocol", "polkadot-core-primitives", "polkadot-network-bridge", - "polkadot-node-collation-generation", - "polkadot-node-core-chain-api", - "polkadot-node-core-prospective-parachains", - "polkadot-node-core-runtime-api", "polkadot-node-network-protocol", "polkadot-node-subsystem-util", "polkadot-overseer", @@ -4535,7 +4529,6 @@ dependencies = [ "polkadot-test-service", "portpicker", "rand", - "rococo-parachain-runtime", "sc-basic-authorship", "sc-block-builder", "sc-chain-spec", @@ -4560,7 +4553,6 @@ dependencies = [ "sp-blockchain", "sp-consensus", "sp-consensus-aura", - "sp-consensus-grandpa", "sp-core", "sp-io", "sp-keyring", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 5ab3e6f25129..01e07cb395a9 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -16,6 +16,7 @@ futures = { workspace = true } parking_lot = { workspace = true } tracing = { workspace = true, default-features = true } schnellru = { workspace = true } +tokio = { workspace = true, features = ["macros"] } # Substrate sc-client-api = { workspace = true, default-features = true } @@ -23,6 +24,7 @@ sc-consensus = { workspace = true, default-features = true } sc-consensus-aura = { workspace = true, default-features = true } sc-consensus-babe = { workspace = true, default-features = true } sc-consensus-slots = { workspace = true, default-features = true } +sc-utils = { workspace = true, default-features = true } sc-telemetry = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-application-crypto = { workspace = true, default-features = true } diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 776052215d93..dc830e463a4f 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -156,15 +156,8 @@ where Ok((paras_inherent_data, other_inherent_data)) } - /// Propose, seal, and import a block, packaging it into a collation. - /// - /// Provide the slot to build at as well as any other necessary pre-digest logs, - /// the inherent data, and the proposal duration and PoV size limits. - /// - /// The Aura pre-digest should not be explicitly provided and is set internally. - /// - /// This does not announce the collation to the parachain network or the relay chain. - pub async fn collate( + /// Build and import a parachain block on the given parent header, using the given slot claim. + pub async fn build_block_and_import( &mut self, parent_header: &Block::Header, slot_claim: &SlotClaim, @@ -172,10 +165,7 @@ where inherent_data: (ParachainInherentData, InherentData), proposal_duration: Duration, max_pov_size: usize, - ) -> Result< - Option<(Collation, ParachainBlockData, Block::Hash)>, - Box, - > { + ) -> Result>, Box> { let mut digest = additional_pre_digest.into().unwrap_or_default(); digest.push(slot_claim.pre_digest.clone()); @@ -205,7 +195,6 @@ where ) .map_err(|e| e as Box)?; - let post_hash = sealed_importable.post_hash(); let block = Block::new( sealed_importable.post_header(), sealed_importable @@ -220,11 +209,46 @@ where .map_err(|e| Box::new(e) as Box) .await?; - if let Some((collation, block_data)) = self.collator_service.build_collation( - parent_header, - post_hash, - ParachainCandidate { block, proof: proposal.proof }, - ) { + Ok(Some(ParachainCandidate { block, proof: proposal.proof })) + } + + /// Propose, seal, import a block and packaging it into a collation. + /// + /// Provide the slot to build at as well as any other necessary pre-digest logs, + /// the inherent data, and the proposal duration and PoV size limits. + /// + /// The Aura pre-digest should not be explicitly provided and is set internally. + /// + /// This does not announce the collation to the parachain network or the relay chain. + pub async fn collate( + &mut self, + parent_header: &Block::Header, + slot_claim: &SlotClaim, + additional_pre_digest: impl Into>>, + inherent_data: (ParachainInherentData, InherentData), + proposal_duration: Duration, + max_pov_size: usize, + ) -> Result< + Option<(Collation, ParachainBlockData, Block::Hash)>, + Box, + > { + let maybe_candidate = self + .build_block_and_import( + parent_header, + slot_claim, + additional_pre_digest, + inherent_data, + proposal_duration, + max_pov_size, + ) + .await?; + + let Some(candidate) = maybe_candidate else { return Ok(None) }; + + let hash = candidate.block.header().hash(); + if let Some((collation, block_data)) = + self.collator_service.build_collation(parent_header, hash, candidate) + { tracing::info!( target: crate::LOG_TARGET, "PoV size {{ header: {}kb, extrinsics: {}kb, storage_proof: {}kb }}", @@ -241,7 +265,7 @@ where ); } - Ok(Some((collation, block_data, post_hash))) + Ok(Some((collation, block_data, hash))) } else { Err(Box::::from("Unable to produce collation") as Box) diff --git a/cumulus/client/consensus/aura/src/collators/basic.rs b/cumulus/client/consensus/aura/src/collators/basic.rs index 1047c6219ad1..4efd50a04ec6 100644 --- a/cumulus/client/consensus/aura/src/collators/basic.rs +++ b/cumulus/client/consensus/aura/src/collators/basic.rs @@ -41,7 +41,6 @@ use sc_consensus::BlockImport; use sp_api::{CallApiAt, ProvideRuntimeApi}; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; use sp_consensus_aura::AuraApi; use sp_core::crypto::Pair; use sp_inherents::CreateInherentDataProviders; @@ -53,7 +52,7 @@ use std::{sync::Arc, time::Duration}; use crate::collator as collator_util; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -64,8 +63,6 @@ pub struct Params { pub para_client: Arc, /// A handle to the relay-chain client. pub relay_client: RClient, - /// A chain synchronization oracle. - pub sync_oracle: SO, /// The underlying keystore, which should contain Aura consensus keys. pub keystore: KeystorePtr, /// The collator key used to sign collations before submitting to validators. @@ -89,8 +86,8 @@ pub struct Params { } /// Run bare Aura consensus as a relay-chain-driven collator. -pub fn run( - params: Params, +pub fn run( + params: Params, ) -> impl Future + Send + 'static where Block: BlockT + Send, @@ -108,7 +105,6 @@ where CIDP: CreateInherentDataProviders + Send + 'static, CIDP::InherentDataProviders: Send, BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, - SO: SyncOracle + Send + Sync + Clone + 'static, Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, P: Pair, diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index b6f7b07f55d3..749b13112394 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -33,46 +33,34 @@ use codec::{Codec, Encode}; use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; -use cumulus_client_consensus_common::{ - self as consensus_common, load_abridged_host_configuration, ParachainBlockImportMarker, - ParentSearchParams, -}; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_primitives_aura::AuraUnincludedSegmentApi; -use cumulus_primitives_core::{ - relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData, -}; +use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::SubmitCollationParams; -use polkadot_node_subsystem::messages::{ - CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest, -}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; -use polkadot_primitives::{ - AsyncBackingParams, CollatorPair, CoreIndex, CoreState, Id as ParaId, OccupiedCoreAssumption, -}; +use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption}; -use futures::{channel::oneshot, prelude::*}; +use futures::prelude::*; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; use sc_consensus::BlockImport; -use sc_consensus_aura::standalone as aura_internal; use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; use sp_consensus_aura::{AuraApi, Slot}; use sp_core::crypto::Pair; use sp_inherents::CreateInherentDataProviders; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; -use sp_timestamp::Timestamp; use std::{sync::Arc, time::Duration}; -use crate::collator::{self as collator_util, SlotClaim}; +use crate::collator::{self as collator_util}; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -87,8 +75,6 @@ pub struct Params { pub relay_client: RClient, /// A validation code hash provider, used to get the current validation code hash. pub code_hash_provider: CHP, - /// A chain synchronization oracle. - pub sync_oracle: SO, /// The underlying keystore, which should contain Aura consensus keys. pub keystore: KeystorePtr, /// The collator key used to sign collations before submitting to validators. @@ -110,8 +96,8 @@ pub struct Params { } /// Run async-backing-friendly Aura. -pub fn run( - mut params: Params, +pub fn run( + mut params: Params, ) -> impl Future + Send + 'static where Block: BlockT, @@ -130,7 +116,6 @@ where CIDP: CreateInherentDataProviders + 'static, CIDP::InherentDataProviders: Send, BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, - SO: SyncOracle + Send + Sync + Clone + 'static, Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, @@ -138,14 +123,6 @@ where P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, { - // This is an arbitrary value which is likely guaranteed to exceed any reasonable - // limit, as it would correspond to 10 non-included blocks. - // - // Since we only search for parent blocks which have already been imported, - // we can guarantee that all imported blocks respect the unincluded segment - // rules specified by the parachain's runtime and thus will never be too deep. - const PARENT_SEARCH_DEPTH: usize = 10; - async move { cumulus_client_collator::initialize_collator_subsystems( &mut params.overseer_handle, @@ -186,12 +163,9 @@ where while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); - // TODO: Currently we use just the first core here, but for elastic scaling - // we iterate and build on all of the cores returned. - let core_index = if let Some(core_index) = cores_scheduled_for_para( + let core_index = if let Some(core_index) = super::cores_scheduled_for_para( relay_parent, params.para_id, - &mut params.overseer_handle, &mut params.relay_client, ) .await @@ -226,42 +200,16 @@ where }, }; - let parent_search_params = ParentSearchParams { + let (included_block, initial_parent) = match crate::collators::find_parent( relay_parent, - para_id: params.para_id, - ancestry_lookback: async_backing_params(relay_parent, ¶ms.relay_client) - .await - .map(|c| c.allowed_ancestry_len as usize) - .unwrap_or(0), - max_depth: PARENT_SEARCH_DEPTH, - ignore_alternative_branches: true, - }; - - let potential_parents = - cumulus_client_consensus_common::find_potential_parents::( - parent_search_params, - &*params.para_backend, - ¶ms.relay_client, - ) - .await; - - let mut potential_parents = match potential_parents { - Err(e) => { - tracing::error!( - target: crate::LOG_TARGET, - ?relay_parent, - err = ?e, - "Could not fetch potential parents to build upon" - ); - - continue - }, - Ok(x) => x, - }; - - let included_block = match potential_parents.iter().find(|x| x.depth == 0) { - None => continue, // also serves as an `is_empty` check. - Some(b) => b.hash, + params.para_id, + &*params.para_backend, + ¶ms.relay_client, + ) + .await + { + Some(value) => value, + None => continue, }; let para_client = &*params.para_client; @@ -292,7 +240,7 @@ where relay_chain_slot_duration = ?params.relay_chain_slot_duration, "Adjusted relay-chain slot to parachain slot" ); - Some(can_build_upon::<_, _, P>( + Some(super::can_build_upon::<_, _, P>( slot_now, timestamp, block_hash, @@ -302,13 +250,6 @@ where )) }; - // Sort by depth, ascending, to choose the longest chain. - // - // If the longest chain has space, build upon that. Otherwise, don't - // build at all. - potential_parents.sort_by_key(|a| a.depth); - let Some(initial_parent) = potential_parents.pop() else { continue }; - // Build in a loop until not allowed. Note that the authorities can change // at any block, so we need to re-claim our slot every time. let mut parent_hash = initial_parent.hash; @@ -435,124 +376,3 @@ where } } } - -// Checks if we own the slot at the given block and whether there -// is space in the unincluded segment. -async fn can_build_upon( - slot: Slot, - timestamp: Timestamp, - parent_hash: Block::Hash, - included_block: Block::Hash, - client: &Client, - keystore: &KeystorePtr, -) -> Option> -where - Client: ProvideRuntimeApi, - Client::Api: AuraApi + AuraUnincludedSegmentApi, - P: Pair, - P::Public: Codec, - P::Signature: Codec, -{ - let runtime_api = client.runtime_api(); - let authorities = runtime_api.authorities(parent_hash).ok()?; - let author_pub = aura_internal::claim_slot::

(slot, &authorities, keystore).await?; - - // Here we lean on the property that building on an empty unincluded segment must always - // be legal. Skipping the runtime API query here allows us to seamlessly run this - // collator against chains which have not yet upgraded their runtime. - if parent_hash != included_block { - if !runtime_api.can_build_upon(parent_hash, included_block, slot).ok()? { - return None - } - } - - Some(SlotClaim::unchecked::

(author_pub, slot, timestamp)) -} - -/// Reads async backing parameters from the relay chain storage at the given relay parent. -async fn async_backing_params( - relay_parent: PHash, - relay_client: &impl RelayChainInterface, -) -> Option { - match load_abridged_host_configuration(relay_parent, relay_client).await { - Ok(Some(config)) => Some(config.async_backing_params), - Ok(None) => { - tracing::error!( - target: crate::LOG_TARGET, - "Active config is missing in relay chain storage", - ); - None - }, - Err(err) => { - tracing::error!( - target: crate::LOG_TARGET, - ?err, - ?relay_parent, - "Failed to read active config from relay chain client", - ); - None - }, - } -} - -// Return all the cores assigned to the para at the provided relay parent. -async fn cores_scheduled_for_para( - relay_parent: PHash, - para_id: ParaId, - overseer_handle: &mut OverseerHandle, - relay_client: &impl RelayChainInterface, -) -> Vec { - // Get `AvailabilityCores` from runtime - let (tx, rx) = oneshot::channel(); - let request = RuntimeApiRequest::AvailabilityCores(tx); - overseer_handle - .send_msg(RuntimeApiMessage::Request(relay_parent, request), "LookaheadCollator") - .await; - - let cores = match rx.await { - Ok(Ok(cores)) => cores, - Ok(Err(error)) => { - tracing::error!( - target: crate::LOG_TARGET, - ?error, - ?relay_parent, - "Failed to query availability cores runtime API", - ); - return Vec::new() - }, - Err(oneshot::Canceled) => { - tracing::error!( - target: crate::LOG_TARGET, - ?relay_parent, - "Sender for availability cores runtime request dropped", - ); - return Vec::new() - }, - }; - - let max_candidate_depth = async_backing_params(relay_parent, relay_client) - .await - .map(|c| c.max_candidate_depth) - .unwrap_or(0); - - cores - .iter() - .enumerate() - .filter_map(|(index, core)| { - let core_para_id = match core { - CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id), - CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core - .next_up_on_available - .as_ref() - .map(|scheduled_core| scheduled_core.para_id), - CoreState::Free | CoreState::Occupied(_) => None, - }; - - if core_para_id == Some(para_id) { - Some(CoreIndex(index as u32)) - } else { - None - } - }) - .collect() -} diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index 0abc034c1ed6..7d430ecdc727 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -20,13 +20,35 @@ //! included parachain block, as well as the [`lookahead`] collator, which prospectively //! builds on parachain blocks which have not yet been included in the relay chain. +use crate::collator::SlotClaim; +use codec::Codec; +use cumulus_client_consensus_common::{ + self as consensus_common, load_abridged_host_configuration, ParentSearchParams, +}; +use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot}; +use cumulus_primitives_core::{relay_chain::Hash as ParaHash, BlockT}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_primitives::{ - Hash as RHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash, + AsyncBackingParams, CoreIndex, CoreState, Hash as RelayHash, Id as ParaId, + OccupiedCoreAssumption, ValidationCodeHash, }; +use sc_consensus_aura::{standalone as aura_internal, AuraApi}; +use sp_api::ProvideRuntimeApi; +use sp_core::Pair; +use sp_keystore::KeystorePtr; +use sp_timestamp::Timestamp; pub mod basic; pub mod lookahead; +pub mod slot_based; + +// This is an arbitrary value which is likely guaranteed to exceed any reasonable +// limit, as it would correspond to 10 non-included blocks. +// +// Since we only search for parent blocks which have already been imported, +// we can guarantee that all imported blocks respect the unincluded segment +// rules specified by the parachain's runtime and thus will never be too deep. +const PARENT_SEARCH_DEPTH: usize = 10; /// Check the `local_validation_code_hash` against the validation code hash in the relay chain /// state. @@ -36,7 +58,7 @@ async fn check_validation_code_or_log( local_validation_code_hash: &ValidationCodeHash, para_id: ParaId, relay_client: &impl RelayChainInterface, - relay_parent: RHash, + relay_parent: RelayHash, ) { let state_validation_code_hash = match relay_client .validation_code_hash(relay_parent, para_id, OccupiedCoreAssumption::Included) @@ -77,3 +99,159 @@ async fn check_validation_code_or_log( }, } } + +/// Reads async backing parameters from the relay chain storage at the given relay parent. +async fn async_backing_params( + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> Option { + match load_abridged_host_configuration(relay_parent, relay_client).await { + Ok(Some(config)) => Some(config.async_backing_params), + Ok(None) => { + tracing::error!( + target: crate::LOG_TARGET, + "Active config is missing in relay chain storage", + ); + None + }, + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?relay_parent, + "Failed to read active config from relay chain client", + ); + None + }, + } +} + +// Return all the cores assigned to the para at the provided relay parent. +async fn cores_scheduled_for_para( + relay_parent: RelayHash, + para_id: ParaId, + relay_client: &impl RelayChainInterface, +) -> Vec { + // Get `AvailabilityCores` from runtime + let cores = match relay_client.availability_cores(relay_parent).await { + Ok(cores) => cores, + Err(error) => { + tracing::error!( + target: crate::LOG_TARGET, + ?error, + ?relay_parent, + "Failed to query availability cores runtime API", + ); + return Vec::new() + }, + }; + + let max_candidate_depth = async_backing_params(relay_parent, relay_client) + .await + .map(|c| c.max_candidate_depth) + .unwrap_or(0); + + cores + .iter() + .enumerate() + .filter_map(|(index, core)| { + let core_para_id = match core { + CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id), + CoreState::Occupied(occupied_core) if max_candidate_depth > 0 => occupied_core + .next_up_on_available + .as_ref() + .map(|scheduled_core| scheduled_core.para_id), + CoreState::Free | CoreState::Occupied(_) => None, + }; + + if core_para_id == Some(para_id) { + Some(CoreIndex(index as u32)) + } else { + None + } + }) + .collect() +} + +// Checks if we own the slot at the given block and whether there +// is space in the unincluded segment. +async fn can_build_upon( + slot: Slot, + timestamp: Timestamp, + parent_hash: Block::Hash, + included_block: Block::Hash, + client: &Client, + keystore: &KeystorePtr, +) -> Option> +where + Client: ProvideRuntimeApi, + Client::Api: AuraApi + AuraUnincludedSegmentApi, + P: Pair, + P::Public: Codec, + P::Signature: Codec, +{ + let runtime_api = client.runtime_api(); + let authorities = runtime_api.authorities(parent_hash).ok()?; + let author_pub = aura_internal::claim_slot::

(slot, &authorities, keystore).await?; + + // Here we lean on the property that building on an empty unincluded segment must always + // be legal. Skipping the runtime API query here allows us to seamlessly run this + // collator against chains which have not yet upgraded their runtime. + if parent_hash != included_block && + !runtime_api.can_build_upon(parent_hash, included_block, slot).ok()? + { + return None + } + + Some(SlotClaim::unchecked::

(author_pub, slot, timestamp)) +} + +/// Use [`cumulus_client_consensus_common::find_potential_parents`] to find parachain blocks that +/// we can build on. Once a list of potential parents is retrieved, return the last one of the +/// longest chain. +async fn find_parent( + relay_parent: ParaHash, + para_id: ParaId, + para_backend: &impl sc_client_api::Backend, + relay_client: &impl RelayChainInterface, +) -> Option<(::Hash, consensus_common::PotentialParent)> +where + Block: BlockT, +{ + let parent_search_params = ParentSearchParams { + relay_parent, + para_id, + ancestry_lookback: crate::collators::async_backing_params(relay_parent, relay_client) + .await + .map_or(0, |params| params.allowed_ancestry_len as usize), + max_depth: PARENT_SEARCH_DEPTH, + ignore_alternative_branches: true, + }; + + let potential_parents = cumulus_client_consensus_common::find_potential_parents::( + parent_search_params, + para_backend, + relay_client, + ) + .await; + + let potential_parents = match potential_parents { + Err(e) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_parent, + err = ?e, + "Could not fetch potential parents to build upon" + ); + + return None + }, + Ok(x) => x, + }; + + let included_block = potential_parents.iter().find(|x| x.depth == 0)?.hash; + potential_parents + .into_iter() + .max_by_key(|a| a.depth) + .map(|parent| (included_block, parent)) +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs new file mode 100644 index 000000000000..1fbc0689da86 --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -0,0 +1,491 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::{Codec, Encode}; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_aura::AuraUnincludedSegmentApi; +use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData}; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_primitives::{ + BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, + OccupiedCoreAssumption, +}; + +use futures::prelude::*; +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_consensus_aura::{AuraApi, Slot, SlotDuration}; +use sp_core::crypto::Pair; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; +use sp_timestamp::Timestamp; +use std::{sync::Arc, time::Duration}; + +use super::CollatorMessage; +use crate::{ + collator::{self as collator_util}, + collators::{check_validation_code_or_log, cores_scheduled_for_para}, + LOG_TARGET, +}; + +/// Parameters for [`run_block_builder`]. +pub struct BuilderTaskParams< + Block: BlockT, + BI, + CIDP, + Client, + Backend, + RelayClient, + CHP, + Proposer, + CS, +> { + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RelayClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The para's ID. + pub para_id: ParaId, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Channel to send built blocks to the collation task. + pub collator_sender: sc_utils::mpsc::TracingUnboundedSender>, + /// Slot duration of the relay chain + pub relay_chain_slot_duration: Duration, + /// Drift every slot by this duration. + /// This is a time quantity that is subtracted from the actual timestamp when computing + /// the time left to enter a new slot. In practice, this *left-shifts* the clock time with the + /// intent to keep our "clock" slightly behind the relay chain one and thus reducing the + /// likelihood of encountering unfavorable notification arrival timings (i.e. we don't want to + /// wait for relay chain notifications because we woke up too early). + pub slot_drift: Duration, +} + +#[derive(Debug)] +struct SlotInfo { + pub timestamp: Timestamp, + pub slot: Slot, + pub slot_duration: SlotDuration, +} + +#[derive(Debug)] +struct SlotTimer { + client: Arc, + drift: Duration, + _marker: std::marker::PhantomData<(Block, Box)>, +} + +/// Returns current duration since Unix epoch. +fn duration_now() -> Duration { + use std::time::SystemTime; + let now = SystemTime::now(); + now.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_else(|e| { + panic!("Current time {:?} is before Unix epoch. Something is wrong: {:?}", now, e) + }) +} + +/// Returns the duration until the next slot from now. +fn time_until_next_slot(slot_duration: Duration, drift: Duration) -> Duration { + let now = duration_now().as_millis() - drift.as_millis(); + + let next_slot = (now + slot_duration.as_millis()) / slot_duration.as_millis(); + let remaining_millis = next_slot * slot_duration.as_millis() - now; + Duration::from_millis(remaining_millis as u64) +} + +impl SlotTimer +where + Block: BlockT, + Client: ProvideRuntimeApi + Send + Sync + 'static + UsageProvider, + Client::Api: AuraApi, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + pub fn new_with_drift(client: Arc, drift: Duration) -> Self { + Self { client, drift, _marker: Default::default() } + } + + /// Returns a future that resolves when the next slot arrives. + pub async fn wait_until_next_slot(&self) -> Result { + let Ok(slot_duration) = crate::slot_duration(&*self.client) else { + tracing::error!(target: crate::LOG_TARGET, "Failed to fetch slot duration from runtime."); + return Err(()) + }; + + let time_until_next_slot = time_until_next_slot(slot_duration.as_duration(), self.drift); + tokio::time::sleep(time_until_next_slot).await; + let timestamp = sp_timestamp::Timestamp::current(); + Ok(SlotInfo { + slot: Slot::from_timestamp(timestamp, slot_duration), + timestamp, + slot_duration, + }) + } +} + +/// Run block-builder. +pub fn run_block_builder( + params: BuilderTaskParams, +) -> impl Future + Send + 'static +where + Block: BlockT, + Client: ProvideRuntimeApi + + UsageProvider + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: + AuraApi + CollectCollationInfo + AuraUnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RelayClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + async move { + tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task."); + let BuilderTaskParams { + relay_client, + create_inherent_data_providers, + para_client, + keystore, + block_import, + para_id, + proposer, + collator_service, + collator_sender, + code_hash_provider, + authoring_duration, + para_backend, + relay_chain_slot_duration, + slot_drift, + } = params; + + let slot_timer = SlotTimer::<_, _, P>::new_with_drift(para_client.clone(), slot_drift); + + let mut collator = { + let params = collator_util::Params { + create_inherent_data_providers, + block_import, + relay_client: relay_client.clone(), + keystore: keystore.clone(), + para_id, + proposer, + collator_service, + }; + + collator_util::Collator::::new(params) + }; + + let mut relay_chain_fetcher = RelayChainCachingFetcher::new(relay_client.clone(), para_id); + + loop { + // We wait here until the next slot arrives. + let Ok(para_slot) = slot_timer.wait_until_next_slot().await else { + return; + }; + + let Some(expected_cores) = + expected_core_count(relay_chain_slot_duration, para_slot.slot_duration) + else { + return + }; + + let Ok(RelayChainData { + relay_parent_header, + max_pov_size, + relay_parent_hash: relay_parent, + scheduled_cores, + }) = relay_chain_fetcher.get_relay_chain_data().await + else { + continue; + }; + + if scheduled_cores.is_empty() { + tracing::debug!(target: LOG_TARGET, "Parachain not scheduled, skipping slot."); + continue; + } + + let core_index_in_scheduled: u64 = *para_slot.slot % expected_cores; + let Some(core_index) = scheduled_cores.get(core_index_in_scheduled as usize) else { + tracing::debug!(target: LOG_TARGET, core_index_in_scheduled, core_len = scheduled_cores.len(), "Para is scheduled, but not enough cores available."); + continue; + }; + + let Some((included_block, parent)) = + crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client) + .await + else { + continue + }; + + let parent_header = parent.header; + let parent_hash = parent.hash; + + // We mainly call this to inform users at genesis if there is a mismatch with the + // on-chain data. + collator.collator_service().check_block_status(parent_hash, &parent_header); + + let slot_claim = match crate::collators::can_build_upon::<_, _, P>( + para_slot.slot, + para_slot.timestamp, + parent_hash, + included_block, + &*para_client, + &keystore, + ) + .await + { + Some(slot) => slot, + None => { + tracing::debug!( + target: crate::LOG_TARGET, + ?core_index, + slot_info = ?para_slot, + unincluded_segment_len = parent.depth, + relay_parent = %relay_parent, + included = %included_block, + parent = %parent_hash, + "Not building block." + ); + continue + }, + }; + + tracing::debug!( + target: crate::LOG_TARGET, + ?core_index, + slot_info = ?para_slot, + unincluded_segment_len = parent.depth, + relay_parent = %relay_parent, + included = %included_block, + parent = %parent_hash, + "Building block." + ); + + let validation_data = PersistedValidationData { + parent_head: parent_header.encode().into(), + relay_parent_number: *relay_parent_header.number(), + relay_parent_storage_root: *relay_parent_header.state_root(), + max_pov_size, + }; + + let (parachain_inherent_data, other_inherent_data) = match collator + .create_inherent_data( + relay_parent, + &validation_data, + parent_hash, + slot_claim.timestamp(), + ) + .await + { + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break + }, + Ok(x) => x, + }; + + let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) { + None => { + tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash"); + break + }, + Some(v) => v, + }; + + check_validation_code_or_log( + &validation_code_hash, + para_id, + &relay_client, + relay_parent, + ) + .await; + + let Ok(Some(candidate)) = collator + .build_block_and_import( + &parent_header, + &slot_claim, + None, + (parachain_inherent_data, other_inherent_data), + authoring_duration, + // Set the block limit to 50% of the maximum PoV size. + // + // TODO: If we got benchmarking that includes the proof size, + // we should be able to use the maximum pov size. + (validation_data.max_pov_size / 2) as usize, + ) + .await + else { + tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot."); + continue; + }; + + let new_block_hash = candidate.block.header().hash(); + + // Announce the newly built block to our peers. + collator.collator_service().announce_block(new_block_hash, None); + + if let Err(err) = collator_sender.unbounded_send(CollatorMessage { + relay_parent, + parent_header, + parachain_candidate: candidate, + validation_code_hash, + core_index: *core_index, + }) { + tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task."); + return + } + } + } +} + +/// Calculate the expected core count based on the slot duration of the relay and parachain. +/// +/// If `slot_duration` is smaller than `relay_chain_slot_duration` that means that we produce more +/// than one parachain block per relay chain block. In order to get these backed, we need multiple +/// cores. This method calculates how many cores we should expect to have scheduled under the +/// assumption that we have a fixed number of cores assigned to our parachain. +fn expected_core_count( + relay_chain_slot_duration: Duration, + slot_duration: SlotDuration, +) -> Option { + let slot_duration_millis = slot_duration.as_millis(); + u64::try_from(relay_chain_slot_duration.as_millis()) + .map_err(|e| tracing::error!("Unable to calculate expected parachain core count: {e}")) + .map(|relay_slot_duration| (relay_slot_duration / slot_duration_millis).max(1)) + .ok() +} + +/// Contains relay chain data necessary for parachain block building. +#[derive(Clone)] +struct RelayChainData { + /// Current relay chain parent header. + pub relay_parent_header: RelayHeader, + /// The cores this para is scheduled on in the context of the relay parent. + pub scheduled_cores: Vec, + /// Maximum configured PoV size on the relay chain. + pub max_pov_size: u32, + /// Current relay chain parent header. + pub relay_parent_hash: RelayHash, +} + +/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block +/// hash. +struct RelayChainCachingFetcher { + relay_client: RI, + para_id: ParaId, + last_data: Option<(RelayHash, RelayChainData)>, +} + +impl RelayChainCachingFetcher +where + RI: RelayChainInterface + Clone + 'static, +{ + pub fn new(relay_client: RI, para_id: ParaId) -> Self { + Self { relay_client, para_id, last_data: None } + } + + /// Fetch required [`RelayChainData`] from the relay chain. + /// If this data has been fetched in the past for the incoming hash, it will reuse + /// cached data. + pub async fn get_relay_chain_data(&mut self) -> Result { + let Ok(relay_parent) = self.relay_client.best_block_hash().await else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash."); + return Err(()) + }; + + match &self.last_data { + Some((last_seen_hash, data)) if *last_seen_hash == relay_parent => { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent."); + Ok(data.clone()) + }, + _ => { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain."); + let data = self.update_for_relay_parent(relay_parent).await?; + self.last_data = Some((relay_parent, data.clone())); + Ok(data) + }, + } + } + + /// Fetch fresh data from the relay chain for the given relay parent hash. + async fn update_for_relay_parent(&self, relay_parent: RelayHash) -> Result { + let scheduled_cores = + cores_scheduled_for_para(relay_parent, self.para_id, &self.relay_client).await; + let Ok(Some(relay_parent_header)) = + self.relay_client.header(BlockId::Hash(relay_parent)).await + else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header."); + return Err(()) + }; + + let max_pov_size = match self + .relay_client + .persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included) + .await + { + Ok(None) => return Err(()), + Ok(Some(pvd)) => pvd.max_pov_size, + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client"); + return Err(()) + }, + }; + + Ok(RelayChainData { + relay_parent_hash: relay_parent, + relay_parent_header, + scheduled_cores, + max_pov_size, + }) + } +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs new file mode 100644 index 000000000000..5b8151f6302c --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/collation_task.rs @@ -0,0 +1,140 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::Encode; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; +use polkadot_overseer::Handle as OverseerHandle; +use polkadot_primitives::{CollatorPair, Id as ParaId}; + +use futures::prelude::*; + +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_runtime::traits::{Block as BlockT, Header}; + +use super::CollatorMessage; + +const LOG_TARGET: &str = "aura::cumulus::collation_task"; + +/// Parameters for the collation task. +pub struct Params { + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The para's ID. + pub para_id: ParaId, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Collator service interface + pub collator_service: CS, + /// Receiver channel for communication with the block builder task. + pub collator_receiver: TracingUnboundedReceiver>, +} + +/// Asynchronously executes the collation task for a parachain. +/// +/// This function initializes the collator subsystems necessary for producing and submitting +/// collations to the relay chain. It listens for new best relay chain block notifications and +/// handles collator messages. If our parachain is scheduled on a core and we have a candidate, +/// the task will build a collation and send it to the relay chain. +pub async fn run_collation_task(mut params: Params) +where + Block: BlockT, + CS: CollatorServiceInterface + Send + Sync + 'static, + RClient: RelayChainInterface + Clone + 'static, +{ + let Ok(mut overseer_handle) = params.relay_client.overseer_handle() else { + tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); + return + }; + + cumulus_client_collator::initialize_collator_subsystems( + &mut overseer_handle, + params.collator_key, + params.para_id, + params.reinitialize, + ) + .await; + + let collator_service = params.collator_service; + while let Some(collator_message) = params.collator_receiver.next().await { + handle_collation_message(collator_message, &collator_service, &mut overseer_handle).await; + } +} + +/// Handle an incoming collation message from the block builder task. +/// This builds the collation from the [`CollatorMessage`] and submits it to +/// the collation-generation subsystem of the relay chain. +async fn handle_collation_message( + message: CollatorMessage, + collator_service: &impl CollatorServiceInterface, + overseer_handle: &mut OverseerHandle, +) { + let CollatorMessage { + parent_header, + parachain_candidate, + validation_code_hash, + relay_parent, + core_index, + } = message; + + let hash = parachain_candidate.block.header().hash(); + let number = *parachain_candidate.block.header().number(); + let (collation, block_data) = + match collator_service.build_collation(&parent_header, hash, parachain_candidate) { + Some(collation) => collation, + None => { + tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation."); + return; + }, + }; + + tracing::info!( + target: LOG_TARGET, + "PoV size {{ header: {:.2}kB, extrinsics: {:.2}kB, storage_proof: {:.2}kB }}", + block_data.header().encoded_size() as f64 / 1024f64, + block_data.extrinsics().encoded_size() as f64 / 1024f64, + block_data.storage_proof().encoded_size() as f64 / 1024f64, + ); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + tracing::debug!(target: LOG_TARGET, ?core_index, %hash, %number, "Submitting collation for core."); + overseer_handle + .send_msg( + CollationGenerationMessage::SubmitCollation(SubmitCollationParams { + relay_parent, + collation, + parent_head: parent_header.encode().into(), + validation_code_hash, + core_index, + result_sender: None, + }), + "SubmitCollation", + ) + .await; +} diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs new file mode 100644 index 000000000000..0fe49d58d25b --- /dev/null +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -0,0 +1,178 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! A collator for Aura that looks ahead of the most recently included parachain block +//! when determining what to build upon. +//! +//! The block building mechanism consists of two parts: +//! 1. A block-builder task that builds parachain blocks at each of our slots. +//! 2. A collator task that transforms the blocks into a collation and submits them to the relay +//! chain. +//! +//! Blocks are built on every parachain slot if there is a core scheduled on the relay chain. At the +//! beginning of each block building loop, we determine how many blocks we expect to build per relay +//! chain block. The collator implementation then expects that we have that many cores scheduled +//! during the relay chain block. After the block is built, the block builder task sends it to +//! the collation task which compresses it and submits it to the collation-generation subsystem. + +use codec::Codec; +use consensus_common::ParachainCandidate; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_aura::AuraUnincludedSegmentApi; +use cumulus_primitives_core::CollectCollationInfo; +use cumulus_relay_chain_interface::RelayChainInterface; +use polkadot_primitives::{ + CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash, +}; + +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sc_utils::mpsc::tracing_unbounded; + +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_consensus_aura::AuraApi; +use sp_core::crypto::Pair; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Member}; + +use std::{sync::Arc, time::Duration}; + +use self::{block_builder_task::run_block_builder, collation_task::run_collation_task}; + +mod block_builder_task; +mod collation_task; + +/// Parameters for [`run`]. +pub struct Params { + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The para's ID. + pub para_id: ParaId, + /// The length of slots in the relay chain. + pub relay_chain_slot_duration: Duration, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Drift slots by a fixed duration. This can be used to create more preferrable authoring + /// timings. + pub slot_drift: Duration, +} + +/// Run aura-based block building and collation task. +pub fn run( + params: Params, +) -> (impl futures::Future, impl futures::Future) +where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + UsageProvider + + Send + + Sync + + 'static, + Client::Api: + AuraApi + CollectCollationInfo + AuraUnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + Clone + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair + 'static, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, +{ + let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100); + let collator_task_params = collation_task::Params { + relay_client: params.relay_client.clone(), + collator_key: params.collator_key, + para_id: params.para_id, + reinitialize: params.reinitialize, + collator_service: params.collator_service.clone(), + collator_receiver: rx, + }; + + let collation_task_fut = run_collation_task::(collator_task_params); + + let block_builder_params = block_builder_task::BuilderTaskParams { + create_inherent_data_providers: params.create_inherent_data_providers, + block_import: params.block_import, + para_client: params.para_client, + para_backend: params.para_backend, + relay_client: params.relay_client, + code_hash_provider: params.code_hash_provider, + keystore: params.keystore, + para_id: params.para_id, + proposer: params.proposer, + collator_service: params.collator_service, + authoring_duration: params.authoring_duration, + collator_sender: tx, + relay_chain_slot_duration: params.relay_chain_slot_duration, + slot_drift: params.slot_drift, + }; + + let block_builder_fut = + run_block_builder::(block_builder_params); + + (collation_task_fut, block_builder_fut) +} + +/// Message to be sent from the block builder to the collation task. +/// +/// Contains all data necessary to submit a collation to the relay chain. +struct CollatorMessage { + /// The hash of the relay chain block that provides the context for the parachain block. + pub relay_parent: RelayHash, + /// The header of the parent block. + pub parent_header: Block::Header, + /// The parachain block candidate. + pub parachain_candidate: ParachainCandidate, + /// The validation code hash at the parent block. + pub validation_code_hash: ValidationCodeHash, + /// Core index that this block should be submitted on + pub core_index: CoreIndex, +} diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 2b0d8290182a..e12750dcc553 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -19,16 +19,13 @@ use polkadot_primitives::{ Block as PBlock, Hash as PHash, Header as PHeader, PersistedValidationData, ValidationCodeHash, }; -use cumulus_primitives_core::{ - relay_chain::{self, BlockId as RBlockId, OccupiedCoreAssumption}, - AbridgedHostConfiguration, ParaId, -}; +use cumulus_primitives_core::{relay_chain, AbridgedHostConfiguration}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface}; -use sc_client_api::{Backend, HeaderBackend}; +use sc_client_api::Backend; use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult}; -use sp_blockchain::Backend as BlockchainBackend; use sp_consensus_slots::Slot; + use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_timestamp::Timestamp; @@ -36,9 +33,12 @@ use std::{sync::Arc, time::Duration}; mod level_monitor; mod parachain_consensus; +mod parent_search; #[cfg(test)] mod tests; +pub use parent_search::*; + pub use parachain_consensus::run_parachain_consensus; use level_monitor::LevelMonitor; @@ -229,196 +229,6 @@ pub trait ParachainBlockImportMarker {} impl ParachainBlockImportMarker for ParachainBlockImport {} -/// Parameters when searching for suitable parents to build on top of. -#[derive(Debug)] -pub struct ParentSearchParams { - /// The relay-parent that is intended to be used. - pub relay_parent: PHash, - /// The ID of the parachain. - pub para_id: ParaId, - /// A limitation on the age of relay parents for parachain blocks that are being - /// considered. This is relative to the `relay_parent` number. - pub ancestry_lookback: usize, - /// How "deep" parents can be relative to the included parachain block at the relay-parent. - /// The included block has depth 0. - pub max_depth: usize, - /// Whether to only ignore "alternative" branches, i.e. branches of the chain - /// which do not contain the block pending availability. - pub ignore_alternative_branches: bool, -} - -/// A potential parent block returned from [`find_potential_parents`] -#[derive(Debug, PartialEq)] -pub struct PotentialParent { - /// The hash of the block. - pub hash: B::Hash, - /// The header of the block. - pub header: B::Header, - /// The depth of the block. - pub depth: usize, - /// Whether the block is the included block, is itself pending on-chain, or descends - /// from the block pending availability. - pub aligned_with_pending: bool, -} - -/// Perform a recursive search through blocks to find potential -/// parent blocks for a new block. -/// -/// This accepts a relay-chain block to be used as an anchor and a maximum search depth, -/// along with some arguments for filtering parachain blocks and performs a recursive search -/// for parachain blocks. The search begins at the last included parachain block and returns -/// a set of [`PotentialParent`]s which could be potential parents of a new block with this -/// relay-parent according to the search parameters. -/// -/// A parachain block is a potential parent if it is either the last included parachain block, the -/// pending parachain block (when `max_depth` >= 1), or all of the following hold: -/// * its parent is a potential parent -/// * its relay-parent is within `ancestry_lookback` of the targeted relay-parent. -/// * its relay-parent is within the same session as the targeted relay-parent. -/// * the block number is within `max_depth` blocks of the included block -pub async fn find_potential_parents( - params: ParentSearchParams, - client: &impl Backend, - relay_client: &impl RelayChainInterface, -) -> Result>, RelayChainError> { - // 1. Build up the ancestry record of the relay chain to compare against. - let rp_ancestry = { - let mut ancestry = Vec::with_capacity(params.ancestry_lookback + 1); - let mut current_rp = params.relay_parent; - let mut required_session = None; - - while ancestry.len() <= params.ancestry_lookback { - let header = match relay_client.header(RBlockId::hash(current_rp)).await? { - None => break, - Some(h) => h, - }; - - let session = relay_client.session_index_for_child(current_rp).await?; - if let Some(required_session) = required_session { - // Respect the relay-chain rule not to cross session boundaries. - if session != required_session { - break - } - } else { - required_session = Some(session); - } - - ancestry.push((current_rp, *header.state_root())); - current_rp = *header.parent_hash(); - - // don't iterate back into the genesis block. - if header.number == 1 { - break - } - } - - ancestry - }; - - let is_hash_in_ancestry = |hash| rp_ancestry.iter().any(|x| x.0 == hash); - let is_root_in_ancestry = |root| rp_ancestry.iter().any(|x| x.1 == root); - - // 2. Get the included and pending availability blocks. - let included_header = relay_client - .persisted_validation_data( - params.relay_parent, - params.para_id, - OccupiedCoreAssumption::TimedOut, - ) - .await?; - - let included_header = match included_header { - Some(pvd) => pvd.parent_head, - None => return Ok(Vec::new()), // this implies the para doesn't exist. - }; - - let pending_header = relay_client - .persisted_validation_data( - params.relay_parent, - params.para_id, - OccupiedCoreAssumption::Included, - ) - .await? - .and_then(|x| if x.parent_head != included_header { Some(x.parent_head) } else { None }); - - let included_header = match B::Header::decode(&mut &included_header.0[..]).ok() { - None => return Ok(Vec::new()), - Some(x) => x, - }; - // Silently swallow if pending block can't decode. - let pending_header = pending_header.and_then(|p| B::Header::decode(&mut &p.0[..]).ok()); - let included_hash = included_header.hash(); - let pending_hash = pending_header.as_ref().map(|hdr| hdr.hash()); - - let mut frontier = vec![PotentialParent:: { - hash: included_hash, - header: included_header, - depth: 0, - aligned_with_pending: true, - }]; - - // Recursive search through descendants of the included block which have acceptable - // relay parents. - let mut potential_parents = Vec::new(); - while let Some(entry) = frontier.pop() { - let is_pending = - entry.depth == 1 && pending_hash.as_ref().map_or(false, |h| &entry.hash == h); - let is_included = entry.depth == 0; - - // note: even if the pending block or included block have a relay parent - // outside of the expected part of the relay chain, they are always allowed - // because they have already been posted on chain. - let is_potential = is_pending || is_included || { - let digest = entry.header.digest(); - cumulus_primitives_core::extract_relay_parent(digest).map_or(false, is_hash_in_ancestry) || - cumulus_primitives_core::rpsr_digest::extract_relay_parent_storage_root(digest) - .map(|(r, _n)| r) - .map_or(false, is_root_in_ancestry) - }; - - let parent_aligned_with_pending = entry.aligned_with_pending; - let child_depth = entry.depth + 1; - let hash = entry.hash; - - if is_potential { - potential_parents.push(entry); - } - - if !is_potential || child_depth > params.max_depth { - continue - } - - // push children onto search frontier. - for child in client.blockchain().children(hash).ok().into_iter().flatten() { - let aligned_with_pending = parent_aligned_with_pending && - if child_depth == 1 { - pending_hash.as_ref().map_or(true, |h| &child == h) - } else { - true - }; - - if params.ignore_alternative_branches && !aligned_with_pending { - continue - } - - let header = match client.blockchain().header(child) { - Ok(Some(h)) => h, - Ok(None) => continue, - Err(_) => continue, - }; - - frontier.push(PotentialParent { - hash: child, - header, - depth: child_depth, - aligned_with_pending, - }); - } - } - - Ok(potential_parents) -} - /// Get the relay-parent slot and timestamp from a header. pub fn relay_slot_and_timestamp( relay_parent_header: &PHeader, diff --git a/cumulus/client/consensus/common/src/parachain_consensus.rs b/cumulus/client/consensus/common/src/parachain_consensus.rs index b4b315bb32be..944917673b11 100644 --- a/cumulus/client/consensus/common/src/parachain_consensus.rs +++ b/cumulus/client/consensus/common/src/parachain_consensus.rs @@ -375,60 +375,61 @@ async fn handle_new_best_parachain_head( target: LOG_TARGET, block_hash = ?hash, "Skipping set new best block, because block is already the best.", - ) - } else { - // Make sure the block is already known or otherwise we skip setting new best. - match parachain.block_status(hash) { - Ok(BlockStatus::InChainWithState) => { - unset_best_header.take(); - tracing::debug!( - target: LOG_TARGET, - ?hash, - "Importing block as new best for parachain.", - ); - import_block_as_new_best(hash, parachain_head, parachain).await; - }, - Ok(BlockStatus::InChainPruned) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - "Trying to set pruned block as new best!", - ); - }, - Ok(BlockStatus::Unknown) => { - *unset_best_header = Some(parachain_head); + ); + return; + } - tracing::debug!( - target: LOG_TARGET, - block_hash = ?hash, - "Parachain block not yet imported, waiting for import to enact as best block.", - ); - - if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { - // Best effort channel to actively encourage block recovery. - // An error here is not fatal; the relay chain continuously re-announces - // the best block, thus we will have other opportunities to retry. - let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; - if let Err(err) = recovery_chan_tx.try_send(req) { - tracing::warn!( - target: LOG_TARGET, - block_hash = ?hash, - error = ?err, - "Unable to notify block recovery subsystem" - ) - } + // Make sure the block is already known or otherwise we skip setting new best. + match parachain.block_status(hash) { + Ok(BlockStatus::InChainWithState) => { + unset_best_header.take(); + tracing::debug!( + target: LOG_TARGET, + included = ?hash, + "Importing block as new best for parachain.", + ); + import_block_as_new_best(hash, parachain_head, parachain).await; + }, + Ok(BlockStatus::InChainPruned) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + "Trying to set pruned block as new best!", + ); + }, + Ok(BlockStatus::Unknown) => { + *unset_best_header = Some(parachain_head); + + tracing::debug!( + target: LOG_TARGET, + block_hash = ?hash, + "Parachain block not yet imported, waiting for import to enact as best block.", + ); + + if let Some(ref mut recovery_chan_tx) = recovery_chan_tx { + // Best effort channel to actively encourage block recovery. + // An error here is not fatal; the relay chain continuously re-announces + // the best block, thus we will have other opportunities to retry. + let req = RecoveryRequest { hash, kind: RecoveryKind::Full }; + if let Err(err) = recovery_chan_tx.try_send(req) { + tracing::warn!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?err, + "Unable to notify block recovery subsystem" + ) } - }, - Err(e) => { - tracing::error!( - target: LOG_TARGET, - block_hash = ?hash, - error = ?e, - "Failed to get block status of block.", - ); - }, - _ => {}, - } + } + }, + Err(e) => { + tracing::error!( + target: LOG_TARGET, + block_hash = ?hash, + error = ?e, + "Failed to get block status of block.", + ); + }, + _ => {}, } } diff --git a/cumulus/client/consensus/common/src/parent_search.rs b/cumulus/client/consensus/common/src/parent_search.rs new file mode 100644 index 000000000000..c371ec62f845 --- /dev/null +++ b/cumulus/client/consensus/common/src/parent_search.rs @@ -0,0 +1,418 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::Decode; +use polkadot_primitives::Hash as RelayHash; + +use cumulus_primitives_core::{ + relay_chain::{BlockId as RBlockId, OccupiedCoreAssumption}, + ParaId, +}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface}; + +use sc_client_api::{Backend, HeaderBackend}; + +use sp_blockchain::{Backend as BlockchainBackend, TreeRoute}; + +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +const PARENT_SEARCH_LOG_TARGET: &str = "consensus::common::find_potential_parents"; + +/// Parameters when searching for suitable parents to build on top of. +#[derive(Debug)] +pub struct ParentSearchParams { + /// The relay-parent that is intended to be used. + pub relay_parent: RelayHash, + /// The ID of the parachain. + pub para_id: ParaId, + /// A limitation on the age of relay parents for parachain blocks that are being + /// considered. This is relative to the `relay_parent` number. + pub ancestry_lookback: usize, + /// How "deep" parents can be relative to the included parachain block at the relay-parent. + /// The included block has depth 0. + pub max_depth: usize, + /// Whether to only ignore "alternative" branches, i.e. branches of the chain + /// which do not contain the block pending availability. + pub ignore_alternative_branches: bool, +} + +/// A potential parent block returned from [`find_potential_parents`] +#[derive(PartialEq)] +pub struct PotentialParent { + /// The hash of the block. + pub hash: B::Hash, + /// The header of the block. + pub header: B::Header, + /// The depth of the block with respect to the included block. + pub depth: usize, + /// Whether the block is the included block, is itself pending on-chain, or descends + /// from the block pending availability. + pub aligned_with_pending: bool, +} + +impl std::fmt::Debug for PotentialParent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PotentialParent") + .field("hash", &self.hash) + .field("depth", &self.depth) + .field("aligned_with_pending", &self.aligned_with_pending) + .field("number", &self.header.number()) + .finish() + } +} + +/// Perform a recursive search through blocks to find potential +/// parent blocks for a new block. +/// +/// This accepts a relay-chain block to be used as an anchor and a maximum search depth, +/// along with some arguments for filtering parachain blocks and performs a recursive search +/// for parachain blocks. The search begins at the last included parachain block and returns +/// a set of [`PotentialParent`]s which could be potential parents of a new block with this +/// relay-parent according to the search parameters. +/// +/// A parachain block is a potential parent if it is either the last included parachain block, the +/// pending parachain block (when `max_depth` >= 1), or all of the following hold: +/// * its parent is a potential parent +/// * its relay-parent is within `ancestry_lookback` of the targeted relay-parent. +/// * its relay-parent is within the same session as the targeted relay-parent. +/// * the block number is within `max_depth` blocks of the included block +pub async fn find_potential_parents( + params: ParentSearchParams, + backend: &impl Backend, + relay_client: &impl RelayChainInterface, +) -> Result>, RelayChainError> { + tracing::trace!("Parent search parameters: {params:?}"); + // Get the included block. + let Some((included_header, included_hash)) = + fetch_included_from_relay_chain(relay_client, backend, params.para_id, params.relay_parent) + .await? + else { + return Ok(Default::default()) + }; + + let only_included = vec![PotentialParent { + hash: included_hash, + header: included_header.clone(), + depth: 0, + aligned_with_pending: true, + }]; + + if params.max_depth == 0 { + return Ok(only_included) + }; + + // Pending header and hash. + let maybe_pending = { + // Fetch the most recent pending header from the relay chain. We use + // `OccupiedCoreAssumption::Included` so the candidate pending availability gets enacted + // before being returned to us. + let pending_header = relay_client + .persisted_validation_data( + params.relay_parent, + params.para_id, + OccupiedCoreAssumption::Included, + ) + .await? + .and_then(|p| B::Header::decode(&mut &p.parent_head.0[..]).ok()) + .filter(|x| x.hash() != included_hash); + + // If the pending block is not locally known, we can't do anything. + if let Some(header) = pending_header { + let pending_hash = header.hash(); + match backend.blockchain().header(pending_hash) { + // We are supposed to ignore branches that don't contain the pending block, but we + // do not know the pending block locally. + Ok(None) | Err(_) if params.ignore_alternative_branches => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %pending_hash, + "Failed to get header for pending block.", + ); + return Ok(Default::default()) + }, + Ok(Some(_)) => Some((header, pending_hash)), + _ => None, + } + } else { + None + } + }; + + let maybe_route_to_last_pending = maybe_pending + .as_ref() + .map(|(_, pending)| { + sp_blockchain::tree_route(backend.blockchain(), included_hash, *pending) + }) + .transpose()?; + + // If we want to ignore alternative branches there is no reason to start + // the parent search at the included block. We can add the included block and + // the path to the pending block to the potential parents directly (limited by max_depth). + let (frontier, potential_parents) = match ( + &maybe_pending, + params.ignore_alternative_branches, + &maybe_route_to_last_pending, + ) { + (Some((pending_header, pending_hash)), true, Some(ref route_to_pending)) => { + let mut potential_parents = only_included; + + // This is a defensive check, should never happen. + if !route_to_pending.retracted().is_empty() { + tracing::warn!(target: PARENT_SEARCH_LOG_TARGET, "Included block not an ancestor of pending block. This should not happen."); + return Ok(Default::default()) + } + + // Add all items on the path included -> pending - 1 to the potential parents, but + // not more than `max_depth`. + let num_parents_on_path = + route_to_pending.enacted().len().saturating_sub(1).min(params.max_depth); + for (num, block) in + route_to_pending.enacted().iter().take(num_parents_on_path).enumerate() + { + let Ok(Some(header)) = backend.blockchain().header(block.hash) else { continue }; + + potential_parents.push(PotentialParent { + hash: block.hash, + header, + depth: 1 + num, + aligned_with_pending: true, + }); + } + + // The search for additional potential parents should now start at the children of + // the pending block. + ( + vec![PotentialParent { + hash: *pending_hash, + header: pending_header.clone(), + depth: route_to_pending.enacted().len(), + aligned_with_pending: true, + }], + potential_parents, + ) + }, + _ => (only_included, Default::default()), + }; + + if potential_parents.len() > params.max_depth { + return Ok(potential_parents); + } + + // Build up the ancestry record of the relay chain to compare against. + let rp_ancestry = + build_relay_parent_ancestry(params.ancestry_lookback, params.relay_parent, relay_client) + .await?; + + Ok(search_child_branches_for_parents( + frontier, + maybe_route_to_last_pending, + included_header, + maybe_pending.map(|(_, hash)| hash), + backend, + params.max_depth, + params.ignore_alternative_branches, + rp_ancestry, + potential_parents, + )) +} + +/// Fetch the included block from the relay chain. +async fn fetch_included_from_relay_chain( + relay_client: &impl RelayChainInterface, + backend: &impl Backend, + para_id: ParaId, + relay_parent: RelayHash, +) -> Result, RelayChainError> { + // Fetch the pending header from the relay chain. We use `OccupiedCoreAssumption::TimedOut` + // so that even if there is a pending candidate, we assume it is timed out and we get the + // included head. + let included_header = relay_client + .persisted_validation_data(relay_parent, para_id, OccupiedCoreAssumption::TimedOut) + .await?; + let included_header = match included_header { + Some(pvd) => pvd.parent_head, + None => return Ok(None), // this implies the para doesn't exist. + }; + + let included_header = match B::Header::decode(&mut &included_header.0[..]).ok() { + None => return Ok(None), + Some(x) => x, + }; + + let included_hash = included_header.hash(); + // If the included block is not locally known, we can't do anything. + match backend.blockchain().header(included_hash) { + Ok(None) => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %included_hash, + "Failed to get header for included block.", + ); + return Ok(None) + }, + Err(e) => { + tracing::warn!( + target: PARENT_SEARCH_LOG_TARGET, + %included_hash, + %e, + "Failed to get header for included block.", + ); + return Ok(None) + }, + _ => {}, + }; + + Ok(Some((included_header, included_hash))) +} + +/// Build an ancestry of relay parents that are acceptable. +/// +/// An acceptable relay parent is one that is no more than `ancestry_lookback` + 1 blocks below the +/// relay parent we want to build on. Parachain blocks anchored on relay parents older than that can +/// not be considered potential parents for block building. They have no chance of still getting +/// included, so our newly build parachain block would also not get included. +/// +/// On success, returns a vector of `(header_hash, state_root)` of the relevant relay chain +/// ancestry blocks. +async fn build_relay_parent_ancestry( + ancestry_lookback: usize, + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> Result, RelayChainError> { + let mut ancestry = Vec::with_capacity(ancestry_lookback + 1); + let mut current_rp = relay_parent; + let mut required_session = None; + while ancestry.len() <= ancestry_lookback { + let Some(header) = relay_client.header(RBlockId::hash(current_rp)).await? else { break }; + + let session = relay_client.session_index_for_child(current_rp).await?; + if required_session.get_or_insert(session) != &session { + // Respect the relay-chain rule not to cross session boundaries. + break; + } + + ancestry.push((current_rp, *header.state_root())); + current_rp = *header.parent_hash(); + + // don't iterate back into the genesis block. + if header.number == 1 { + break + } + } + Ok(ancestry) +} + +/// Start search for child blocks that can be used as parents. +pub fn search_child_branches_for_parents( + mut frontier: Vec>, + maybe_route_to_last_pending: Option>, + included_header: Block::Header, + pending_hash: Option, + backend: &impl Backend, + max_depth: usize, + ignore_alternative_branches: bool, + rp_ancestry: Vec<(RelayHash, RelayHash)>, + mut potential_parents: Vec>, +) -> Vec> { + let included_hash = included_header.hash(); + let is_hash_in_ancestry = |hash| rp_ancestry.iter().any(|x| x.0 == hash); + let is_root_in_ancestry = |root| rp_ancestry.iter().any(|x| x.1 == root); + + // The distance between pending and included block. Is later used to check if a child + // is aligned with pending when it is between pending and included block. + let pending_distance = maybe_route_to_last_pending.as_ref().map(|route| route.enacted().len()); + + // If a block is on the path included -> pending, we consider it `aligned_with_pending`. + let is_child_pending = |hash| { + maybe_route_to_last_pending + .as_ref() + .map_or(true, |route| route.enacted().iter().any(|x| x.hash == hash)) + }; + + tracing::trace!( + target: PARENT_SEARCH_LOG_TARGET, + ?included_hash, + included_num = ?included_header.number(), + ?pending_hash , + ?rp_ancestry, + "Searching relay chain ancestry." + ); + while let Some(entry) = frontier.pop() { + let is_pending = pending_hash.as_ref().map_or(false, |h| &entry.hash == h); + let is_included = included_hash == entry.hash; + + // note: even if the pending block or included block have a relay parent + // outside of the expected part of the relay chain, they are always allowed + // because they have already been posted on chain. + let is_potential = is_pending || is_included || { + let digest = entry.header.digest(); + let is_hash_in_ancestry_check = cumulus_primitives_core::extract_relay_parent(digest) + .map_or(false, is_hash_in_ancestry); + let is_root_in_ancestry_check = + cumulus_primitives_core::rpsr_digest::extract_relay_parent_storage_root(digest) + .map(|(r, _n)| r) + .map_or(false, is_root_in_ancestry); + + is_hash_in_ancestry_check || is_root_in_ancestry_check + }; + + let parent_aligned_with_pending = entry.aligned_with_pending; + let child_depth = entry.depth + 1; + let hash = entry.hash; + + tracing::trace!( + target: PARENT_SEARCH_LOG_TARGET, + ?hash, + is_potential, + is_pending, + is_included, + "Checking potential parent." + ); + + if is_potential { + potential_parents.push(entry); + } + + if !is_potential || child_depth > max_depth { + continue + } + + // push children onto search frontier. + for child in backend.blockchain().children(hash).ok().into_iter().flatten() { + tracing::trace!(target: PARENT_SEARCH_LOG_TARGET, ?child, child_depth, ?pending_distance, "Looking at child."); + + let aligned_with_pending = parent_aligned_with_pending && + (pending_distance.map_or(true, |dist| child_depth > dist) || + is_child_pending(child)); + + if ignore_alternative_branches && !aligned_with_pending { + tracing::trace!(target: PARENT_SEARCH_LOG_TARGET, ?child, "Child is not aligned with pending block."); + continue + } + + let Ok(Some(header)) = backend.blockchain().header(child) else { continue }; + + frontier.push(PotentialParent { + hash: child, + header, + depth: child_depth, + aligned_with_pending, + }); + } + } + + potential_parents +} diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index 2a944bc7f9fa..284fa39ed1e7 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use codec::Encode; use cumulus_client_pov_recovery::RecoveryKind; use cumulus_primitives_core::{ - relay_chain::{self, BlockId}, + relay_chain::{BlockId, BlockNumber, CoreState}, CumulusDigestItem, InboundDownwardMessage, InboundHrmpMessage, }; use cumulus_relay_chain_interface::{ @@ -37,6 +37,7 @@ use futures_timer::Delay; use polkadot_primitives::HeadData; use sc_client_api::{Backend as _, UsageProvider}; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; +use sp_blockchain::Backend as BlockchainBackend; use sp_consensus::{BlockOrigin, BlockStatus}; use sp_version::RuntimeVersion; use std::{ @@ -46,11 +47,11 @@ use std::{ time::Duration, }; -fn relay_block_num_from_hash(hash: &PHash) -> relay_chain::BlockNumber { +fn relay_block_num_from_hash(hash: &PHash) -> BlockNumber { hash.to_low_u64_be() as u32 } -fn relay_hash_from_block_num(block_number: relay_chain::BlockNumber) -> PHash { +fn relay_hash_from_block_num(block_number: BlockNumber) -> PHash { PHash::from_low_u64_be(block_number as u64) } @@ -257,6 +258,13 @@ impl RelayChainInterface for Relaychain { })) } + async fn availability_cores( + &self, + _relay_parent: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test"); + } + async fn version(&self, _: PHash) -> RelayChainResult { unimplemented!("Not needed for test") } @@ -1138,6 +1146,357 @@ fn find_potential_parents_with_max_depth() { } } +#[test] +fn find_potential_parents_unknown_included() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + + let sproof = sproof_with_best_parent(&client); + let included_but_unknown = build_block(&*client, sproof, None, None, Some(relay_parent)); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert_eq!(potential_parents.len(), 0); +} + +#[test] +fn find_potential_parents_unknown_pending() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let sproof = sproof_with_parent_by_hash(&client, included_block.header().hash()); + let pending_but_unknown = build_block( + &*client, + sproof, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert!(potential_parents.is_empty()); +} + +#[test] +fn find_potential_parents_unknown_pending_include_alternative_branches() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let alt_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(included_block.header().hash()), + None, + Some(search_relay_parent), + ); + + tracing::info!(hash = %alt_block.header().hash(), "Alt block."); + let sproof = sproof_with_parent_by_hash(&client, included_block.header().hash()); + let pending_but_unknown = build_block( + &*client, + sproof, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_but_unknown.header().clone()); + } + + // Ignore alternative branch: + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth: NON_INCLUDED_CHAIN_LEN, + ignore_alternative_branches: false, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + let expected_parents: Vec<_> = vec![&included_block, &alt_block]; + assert_eq!(potential_parents.len(), 2); + assert_eq!(expected_parents[0].hash(), potential_parents[0].hash); + assert_eq!(expected_parents[1].hash(), potential_parents[1].hash); +} + +/// Test where there are multiple pending blocks. +#[test] +fn find_potential_parents_aligned_with_late_pending() { + sp_tracing::try_init_simple(); + + const NON_INCLUDED_CHAIN_LEN: usize = 5; + + let backend = Arc::new(Backend::new_test(1000, 1)); + let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build()); + let mut para_import = + ParachainBlockImport::new_with_delayed_best_block(client.clone(), backend.clone()); + + let relay_parent = relay_hash_from_block_num(10); + // Choose different relay parent for alternative chain to get new hashes. + let search_relay_parent = relay_hash_from_block_num(11); + let included_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + None, + None, + Some(relay_parent), + ); + + let in_between_block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(included_block.header().hash()), + None, + Some(relay_parent), + ); + + let pending_block = build_and_import_block_ext( + &client, + BlockOrigin::Own, + true, + &mut para_import, + Some(in_between_block.header().hash()), + None, + Some(relay_parent), + ); + + let relay_chain = Relaychain::new(); + { + let relay_inner = &mut relay_chain.inner.lock().unwrap(); + relay_inner + .relay_chain_hash_to_header + .insert(search_relay_parent, included_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, in_between_block.header().clone()); + relay_inner + .relay_chain_hash_to_header_pending + .insert(search_relay_parent, pending_block.header().clone()); + } + + // Build some blocks on the pending block and on the included block. + // We end up with two sibling chains, one is aligned with the pending block, + // the other is not. + let mut aligned_blocks = Vec::new(); + let mut parent = pending_block.header().hash(); + for _ in 2..NON_INCLUDED_CHAIN_LEN { + let block = build_and_import_block_ext( + &client, + BlockOrigin::Own, + true, + &mut para_import, + Some(parent), + None, + Some(relay_parent), + ); + parent = block.header().hash(); + aligned_blocks.push(block); + } + + let mut alt_blocks = Vec::new(); + let mut parent = included_block.header().hash(); + for _ in 0..NON_INCLUDED_CHAIN_LEN { + let block = build_and_import_block_ext( + &client, + BlockOrigin::NetworkInitialSync, + true, + &mut para_import, + Some(parent), + None, + Some(search_relay_parent), + ); + parent = block.header().hash(); + alt_blocks.push(block); + } + + // Ignore alternative branch: + for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth, + ignore_alternative_branches: true, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + assert_eq!(potential_parents.len(), max_depth + 1); + let expected_parents: Vec<_> = [&included_block, &in_between_block, &pending_block] + .into_iter() + .chain(aligned_blocks.iter()) + .take(max_depth + 1) + .collect(); + + for i in 0..(max_depth + 1) { + let parent = &potential_parents[i]; + let expected = &expected_parents[i]; + + assert_eq!(parent.hash, expected.hash()); + assert_eq!(&parent.header, expected.header()); + assert_eq!(parent.depth, i); + assert!(parent.aligned_with_pending); + } + } + + // Do not ignore: + for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + let potential_parents = block_on(find_potential_parents( + ParentSearchParams { + relay_parent: search_relay_parent, + para_id: ParaId::from(100), + ancestry_lookback: 1, // aligned chain is in ancestry. + max_depth, + ignore_alternative_branches: false, + }, + &*backend, + &relay_chain, + )) + .unwrap(); + + let expected_len = 2 * max_depth + 1; + assert_eq!(potential_parents.len(), expected_len); + let expected_aligned: Vec<_> = [&included_block, &in_between_block, &pending_block] + .into_iter() + .chain(aligned_blocks.iter()) + .take(max_depth + 1) + .collect(); + let expected_alt = alt_blocks.iter().take(max_depth); + + let expected_parents: Vec<_> = + expected_aligned.clone().into_iter().chain(expected_alt).collect(); + // Check correctness. + assert_eq!(expected_parents.len(), expected_len); + + for i in 0..expected_len { + let parent = &potential_parents[i]; + let expected = expected_parents + .iter() + .find(|block| block.header().hash() == parent.hash) + .expect("missing parent"); + + let is_aligned = expected_aligned.contains(&expected); + + assert_eq!(parent.hash, expected.hash()); + assert_eq!(&parent.header, expected.header()); + + assert_eq!(parent.aligned_with_pending, is_aligned); + } + } +} + #[test] fn find_potential_parents_aligned_with_pending() { sp_tracing::try_init_simple(); @@ -1249,6 +1608,7 @@ fn find_potential_parents_aligned_with_pending() { // Do not ignore: for max_depth in 0..=NON_INCLUDED_CHAIN_LEN { + log::info!("Ran with max_depth = {max_depth}"); let potential_parents = block_on(find_potential_parents( ParentSearchParams { relay_parent: search_relay_parent, @@ -1276,6 +1636,7 @@ fn find_potential_parents_aligned_with_pending() { // Check correctness. assert_eq!(expected_parents.len(), expected_len); + potential_parents.iter().for_each(|p| log::info!("result: {:?}", p)); for i in 0..expected_len { let parent = &potential_parents[i]; let expected = expected_parents @@ -1288,6 +1649,12 @@ fn find_potential_parents_aligned_with_pending() { assert_eq!(parent.hash, expected.hash()); assert_eq!(&parent.header, expected.header()); + log::info!( + "Check hash: {:?} expected: {} is: {}", + parent.hash, + is_aligned, + parent.aligned_with_pending, + ); assert_eq!(parent.aligned_with_pending, is_aligned); } } diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index eb0d7f0e01b3..18d121c41d16 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -26,9 +26,10 @@ use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt use parking_lot::Mutex; use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_primitives::{ - CandidateCommitments, CandidateDescriptor, CollatorPair, CommittedCandidateReceipt, - Hash as PHash, HeadData, InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, SessionIndex, SigningContext, ValidationCodeHash, ValidatorId, + BlockNumber, CandidateCommitments, CandidateDescriptor, CollatorPair, + CommittedCandidateReceipt, CoreState, Hash as PHash, HeadData, InboundDownwardMessage, + InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, + SigningContext, ValidationCodeHash, ValidatorId, }; use polkadot_test_client::{ Client as PClient, ClientBlockImportExt, DefaultTestClientBuilderExt, FullBackend as PBackend, @@ -297,6 +298,13 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(header) } + async fn availability_cores( + &self, + _relay_parent: PHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test"); + } + async fn version(&self, _: PHash) -> RelayChainResult { let version = self.data.lock().runtime_version; diff --git a/cumulus/client/parachain-inherent/Cargo.toml b/cumulus/client/parachain-inherent/Cargo.toml index 9d346ce17f56..d81f727b41b9 100644 --- a/cumulus/client/parachain-inherent/Cargo.toml +++ b/cumulus/client/parachain-inherent/Cargo.toml @@ -9,7 +9,6 @@ license = "Apache-2.0" [dependencies] async-trait = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } -scale-info = { features = ["derive"], workspace = true, default-features = true } tracing = { workspace = true, default-features = true } # Substrate diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs index 75bf308ef27a..6f274ed18b6b 100644 --- a/cumulus/client/pov-recovery/src/tests.rs +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -17,7 +17,9 @@ use super::*; use assert_matches::assert_matches; use codec::{Decode, Encode}; -use cumulus_primitives_core::relay_chain::{BlockId, CandidateCommitments, CandidateDescriptor}; +use cumulus_primitives_core::relay_chain::{ + BlockId, CandidateCommitments, CandidateDescriptor, CoreState, +}; use cumulus_relay_chain_interface::{ InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PHash, PHeader, PersistedValidationData, StorageValue, ValidationCodeHash, ValidatorId, @@ -478,6 +480,13 @@ impl RelayChainInterface for Relaychain { async fn header(&self, _: BlockId) -> RelayChainResult> { unimplemented!("Not needed for test"); } + + async fn availability_cores( + &self, + _: PHash, + ) -> RelayChainResult>>> { + unimplemented!("Not needed for test"); + } } fn make_candidate_chain(candidate_number_range: Range) -> Vec { diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 7871623e8447..8f8d666bd143 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -19,9 +19,9 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use async_trait::async_trait; use cumulus_primitives_core::{ relay_chain::{ - runtime_api::ParachainHost, Block as PBlock, BlockId, CommittedCandidateReceipt, - Hash as PHash, Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, - ValidationCodeHash, ValidatorId, + runtime_api::ParachainHost, Block as PBlock, BlockId, BlockNumber, + CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage, + OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -256,6 +256,13 @@ impl RelayChainInterface for RelayChainInProcessInterface { Ok(Box::pin(notifications_stream)) } + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>> { + Ok(self.full_client.runtime_api().availability_cores(relay_parent)?) + } + async fn candidates_pending_availability( &self, hash: PHash, diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index 46e19b40f010..d02035e84e92 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -29,8 +29,8 @@ use sp_api::ApiError; use cumulus_primitives_core::relay_chain::BlockId; pub use cumulus_primitives_core::{ relay_chain::{ - CommittedCandidateReceipt, Hash as PHash, Header as PHeader, InboundHrmpMessage, - OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, + BlockNumber, CommittedCandidateReceipt, CoreState, Hash as PHash, Header as PHeader, + InboundHrmpMessage, OccupiedCoreAssumption, SessionIndex, ValidationCodeHash, ValidatorId, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; @@ -217,6 +217,14 @@ pub trait RelayChainInterface: Send + Sync { /// Get the runtime version of the relay chain. async fn version(&self, relay_parent: PHash) -> RelayChainResult; + + /// Yields information on all availability cores as relevant to the child block. + /// + /// Cores are either free, scheduled or occupied. Free cores can have paras assigned to them. + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>>; } #[async_trait] @@ -337,6 +345,13 @@ where .await } + async fn availability_cores( + &self, + relay_parent: PHash, + ) -> RelayChainResult>> { + (**self).availability_cores(relay_parent).await + } + async fn candidates_pending_availability( &self, block_id: PHash, diff --git a/cumulus/client/relay-chain-minimal-node/Cargo.toml b/cumulus/client/relay-chain-minimal-node/Cargo.toml index 1d89316d400b..95ecadc8bd06 100644 --- a/cumulus/client/relay-chain-minimal-node/Cargo.toml +++ b/cumulus/client/relay-chain-minimal-node/Cargo.toml @@ -17,13 +17,7 @@ polkadot-overseer = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } polkadot-node-network-protocol = { workspace = true, default-features = true } -polkadot-availability-recovery = { workspace = true, default-features = true } -polkadot-collator-protocol = { workspace = true, default-features = true } polkadot-network-bridge = { workspace = true, default-features = true } -polkadot-node-collation-generation = { workspace = true, default-features = true } -polkadot-node-core-runtime-api = { workspace = true, default-features = true } -polkadot-node-core-chain-api = { workspace = true, default-features = true } -polkadot-node-core-prospective-parachains = { workspace = true, default-features = true } polkadot-service = { workspace = true, default-features = true } # substrate deps @@ -51,4 +45,3 @@ array-bytes = { workspace = true, default-features = true } tracing = { workspace = true, default-features = true } async-trait = { workspace = true } futures = { workspace = true } -parking_lot = { workspace = true, default-features = true } diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index bb7bfa5dc322..692a1fb537a8 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -24,7 +24,7 @@ use cumulus_primitives_core::{ InboundDownwardMessage, ParaId, PersistedValidationData, }; use cumulus_relay_chain_interface::{ - PHeader, RelayChainError, RelayChainInterface, RelayChainResult, + BlockNumber, CoreState, PHeader, RelayChainError, RelayChainInterface, RelayChainResult, }; use futures::{FutureExt, Stream, StreamExt}; use polkadot_overseer::Handle; @@ -252,4 +252,11 @@ impl RelayChainInterface for RelayChainRpcInterface { async fn version(&self, relay_parent: RelayHash) -> RelayChainResult { self.rpc_client.runtime_version(relay_parent).await } + + async fn availability_cores( + &self, + relay_parent: RelayHash, + ) -> RelayChainResult>> { + self.rpc_client.parachain_host_availability_cores(relay_parent).await + } } diff --git a/cumulus/pallets/aura-ext/src/consensus_hook.rs b/cumulus/pallets/aura-ext/src/consensus_hook.rs index 592029803391..560d477b2a85 100644 --- a/cumulus/pallets/aura-ext/src/consensus_hook.rs +++ b/cumulus/pallets/aura-ext/src/consensus_hook.rs @@ -65,9 +65,19 @@ where let para_slot_from_relay = Slot::from_timestamp(relay_chain_timestamp.into(), para_slot_duration); - // Perform checks. - assert_eq!(slot, para_slot_from_relay, "slot number mismatch"); - if authored > velocity + 1 { + // Check that we are not too far in the future. Since we expect `V` parachain blocks + // during the relay chain slot, we can allow for `V` parachain slots into the future. + if *slot > *para_slot_from_relay + u64::from(velocity) { + panic!( + "Parachain slot is too far in the future: parachain_slot: {:?}, derived_from_relay_slot: {:?} velocity: {:?}", + slot, + para_slot_from_relay, + velocity + ); + } + + // We need to allow authoring multiple blocks in the same slot. + if slot != para_slot_from_relay && authored > velocity { panic!("authored blocks limit is reached for the slot") } let weight = T::DbWeight::get().reads(1); @@ -113,6 +123,11 @@ impl< return false } + // TODO: This logic needs to be adjusted. + // It checks that we have not authored more than `V + 1` blocks in the slot. + // As a slot however, we take the parachain slot here. Velocity should + // be measured in relation to the relay chain slot. + // https://github.com/paritytech/polkadot-sdk/issues/3967 if last_slot == new_slot { authored_so_far < velocity + 1 } else { diff --git a/cumulus/pallets/aura-ext/src/lib.rs b/cumulus/pallets/aura-ext/src/lib.rs index 7ca84dff7c51..4605dd325bee 100644 --- a/cumulus/pallets/aura-ext/src/lib.rs +++ b/cumulus/pallets/aura-ext/src/lib.rs @@ -83,7 +83,7 @@ pub mod pallet { SlotInfo::::put((new_slot, authored)); - T::DbWeight::get().reads_writes(2, 1) + T::DbWeight::get().reads_writes(4, 2) } } diff --git a/cumulus/polkadot-parachain/Cargo.toml b/cumulus/polkadot-parachain/Cargo.toml index ae5abdcfab6a..7085211dad26 100644 --- a/cumulus/polkadot-parachain/Cargo.toml +++ b/cumulus/polkadot-parachain/Cargo.toml @@ -120,7 +120,7 @@ substrate-build-script-utils = { workspace = true, default-features = true } assert_cmd = { workspace = true } nix = { features = ["signal"], workspace = true } tempfile = { workspace = true } -tokio = { features = ["macros", "parking_lot", "time"], workspace = true, default-features = true } +tokio = { version = "1.32.0", features = ["macros", "parking_lot", "time"] } wait-timeout = { workspace = true } [features] diff --git a/cumulus/polkadot-parachain/src/cli.rs b/cumulus/polkadot-parachain/src/cli.rs index 3f8a2ec0d118..7c01e34f9a03 100644 --- a/cumulus/polkadot-parachain/src/cli.rs +++ b/cumulus/polkadot-parachain/src/cli.rs @@ -73,6 +73,12 @@ pub struct Cli { #[command(flatten)] pub run: cumulus_client_cli::RunCmd, + /// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling. + /// + /// Use with care, this flag is unstable and subject to change. + #[arg(long)] + pub experimental_use_slot_based: bool, + /// Disable automatic hardware benchmarks. /// /// By default these benchmarks are automatically ran at startup and measure diff --git a/cumulus/polkadot-parachain/src/command.rs b/cumulus/polkadot-parachain/src/command.rs index 2a1f20d5c176..323216f300d8 100644 --- a/cumulus/polkadot-parachain/src/command.rs +++ b/cumulus/polkadot-parachain/src/command.rs @@ -690,6 +690,7 @@ pub fn run() -> Result<()> { polkadot_config, collator_options, id, + cli.experimental_use_slot_based, hwbench, ) .await, @@ -699,6 +700,7 @@ pub fn run() -> Result<()> { polkadot_config, collator_options, id, + cli.experimental_use_slot_based, hwbench, ) .await, @@ -713,24 +715,27 @@ async fn start_node>( polkadot_config: sc_service::Configuration, collator_options: cumulus_client_cli::CollatorOptions, id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> Result { match config.chain_spec.runtime()? { - Runtime::AssetHubPolkadot => crate::service::start_asset_hub_lookahead_node::< - AssetHubPolkadotRuntimeApi, - AssetHubPolkadotAuraId, - Network, - >(config, polkadot_config, collator_options, id, hwbench) - .await - .map(|r| r.0) - .map_err(Into::into), + Runtime::AssetHubPolkadot => + crate::service::start_asset_hub_async_backing_node::< + AssetHubPolkadotRuntimeApi, + AssetHubPolkadotAuraId, + Network, + >(config, polkadot_config, collator_options, id, use_experimental_slot_based, hwbench) + .await + .map(|r| r.0) + .map_err(Into::into), Runtime::AssetHub | Runtime::Collectives => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -753,6 +758,7 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -770,11 +776,12 @@ async fn start_node>( chain_spec::bridge_hubs::BridgeHubRuntimeType::Rococo | chain_spec::bridge_hubs::BridgeHubRuntimeType::RococoLocal | chain_spec::bridge_hubs::BridgeHubRuntimeType::RococoDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -793,11 +800,12 @@ async fn start_node>( chain_spec::coretime::CoretimeRuntimeType::Westend | chain_spec::coretime::CoretimeRuntimeType::WestendLocal | chain_spec::coretime::CoretimeRuntimeType::WestendDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -810,17 +818,19 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await .map(|r| r.0) .map_err(Into::into), - Runtime::Glutton => crate::service::start_basic_lookahead_node::( + Runtime::Glutton => crate::service::start_basic_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -838,11 +848,12 @@ async fn start_node>( chain_spec::people::PeopleRuntimeType::Westend | chain_spec::people::PeopleRuntimeType::WestendLocal | chain_spec::people::PeopleRuntimeType::WestendDevelopment => - crate::service::start_generic_aura_lookahead_node::( + crate::service::start_generic_aura_async_backing_node::( config, polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await @@ -857,6 +868,7 @@ async fn start_node>( polkadot_config, collator_options, id, + use_experimental_slot_based, hwbench, ) .await diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 42efe8098b26..0f2aed8ee4d8 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -16,7 +16,10 @@ use cumulus_client_cli::CollatorOptions; use cumulus_client_collator::service::CollatorService; -use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams}; +use cumulus_client_consensus_aura::collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{self as slot_based, Params as SlotBasedParams}, +}; use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport; use cumulus_client_consensus_proposer::Proposer; #[allow(deprecated)] @@ -51,7 +54,6 @@ use sc_consensus::{ }; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::{config::FullNetworkConfiguration, service::traits::NetworkBackend, NetworkBlock}; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::{ApiExt, ConstructRuntimeApi, ProvideRuntimeApi}; @@ -214,7 +216,6 @@ where &TaskManager, Arc, Arc>>, - Arc>, KeystorePtr, Duration, ParaId, @@ -348,7 +349,6 @@ where &task_manager, relay_chain_interface.clone(), transaction_pool, - sync_service.clone(), params.keystore_container.keystore(), relay_chain_slot_duration, para_id, @@ -408,8 +408,14 @@ pub async fn start_rococo_parachain_node>( polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -418,7 +424,7 @@ pub async fn start_rococo_parachain_node>( para_id, build_parachain_rpc_extensions::, build_aura_import_queue, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -580,13 +586,19 @@ where /// Uses the lookahead collator to support async backing. /// /// Start an aura powered parachain node. Some system chains use this. -pub async fn start_generic_aura_lookahead_node>( +pub async fn start_generic_aura_async_backing_node>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -595,7 +607,7 @@ pub async fn start_generic_aura_lookahead_node> para_id, build_parachain_rpc_extensions::, build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -607,11 +619,12 @@ pub async fn start_generic_aura_lookahead_node> /// /// Uses the lookahead collator to support async backing. #[sc_tracing::logging::prefix_logs_with("Parachain")] -pub async fn start_asset_hub_lookahead_node( +pub async fn start_asset_hub_async_backing_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> where @@ -622,15 +635,21 @@ where AuraId: AuraIdT + Sync, Net: NetworkBackend, { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; + start_node_impl::( parachain_config, polkadot_config, collator_options, CollatorSybilResistance::Resistant, // Aura para_id, - build_parachain_rpc_extensions::, + build_parachain_rpc_extensions, build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::, + consensus_starter, hwbench, ) .await @@ -676,7 +695,6 @@ fn start_relay_chain_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>>, - _sync_oracle: Arc>, _keystore: KeystorePtr, _relay_chain_slot_duration: Duration, para_id: ParaId, @@ -747,7 +765,6 @@ fn start_lookahead_aura_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>>, - sync_oracle: Arc>, keystore: KeystorePtr, relay_chain_slot_duration: Duration, para_id: ParaId, @@ -788,7 +805,6 @@ where client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) } }, - sync_oracle, keystore, collator_key, para_id, @@ -802,23 +818,104 @@ where let fut = async move { wait_for_aura(client).await; - aura::run::::Pair, _, _, _, _, _, _, _, _, _>(params).await; + aura::run::::Pair, _, _, _, _, _, _, _, _>(params).await; }; task_manager.spawn_essential_handle().spawn("aura", None, fut); Ok(()) } +/// Start consensus using the lookahead aura collator. +fn start_slot_based_aura_consensus( + client: Arc>, + block_import: ParachainBlockImport, + prometheus_registry: Option<&Registry>, + telemetry: Option, + task_manager: &TaskManager, + relay_chain_interface: Arc, + transaction_pool: Arc>>, + keystore: KeystorePtr, + relay_chain_slot_duration: Duration, + para_id: ParaId, + collator_key: CollatorPair, + _overseer_handle: OverseerHandle, + announce_block: Arc>) + Send + Sync>, + backend: Arc, +) -> Result<(), sc_service::Error> +where + RuntimeApi: ConstructNodeRuntimeApi>, + RuntimeApi::RuntimeApi: AuraRuntimeApi, + AuraId: AuraIdT + Sync, +{ + let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording( + task_manager.spawn_handle(), + client.clone(), + transaction_pool, + prometheus_registry, + telemetry.clone(), + ); + + let proposer = Proposer::new(proposer_factory); + let collator_service = CollatorService::new( + client.clone(), + Arc::new(task_manager.spawn_handle()), + announce_block, + client.clone(), + ); + + let client_for_aura = client.clone(); + let params = SlotBasedParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + slot_drift: Duration::from_secs(1), + }; + + let (collation_future, block_builder_future) = + slot_based::run::::Pair, _, _, _, _, _, _, _, _>(params); + + task_manager.spawn_essential_handle().spawn( + "collation-task", + Some("parachain-block-authoring"), + collation_future, + ); + task_manager.spawn_essential_handle().spawn( + "block-builder-task", + Some("parachain-block-authoring"), + block_builder_future, + ); + Ok(()) +} + /// Start an aura powered parachain node which uses the lookahead collator to support async backing. /// This node is basic in the sense that its runtime api doesn't include common contents such as /// transaction payment. Used for aura glutton. -pub async fn start_basic_lookahead_node>( +pub async fn start_basic_async_backing_node>( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -827,7 +924,7 @@ pub async fn start_basic_lookahead_node>( para_id, |_, _, _, _| Ok(RpcModule::new(())), build_relay_to_aura_import_queue::<_, AuraId>, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await @@ -839,8 +936,14 @@ pub async fn start_contracts_rococo_node>( polkadot_config: Configuration, collator_options: CollatorOptions, para_id: ParaId, + use_experimental_slot_based: bool, hwbench: Option, ) -> sc_service::error::Result<(TaskManager, Arc>)> { + let consensus_starter = if use_experimental_slot_based { + start_slot_based_aura_consensus::<_, AuraId> + } else { + start_lookahead_aura_consensus::<_, AuraId> + }; start_node_impl::( parachain_config, polkadot_config, @@ -849,7 +952,7 @@ pub async fn start_contracts_rococo_node>( para_id, build_contracts_rpc_extensions, build_aura_import_queue, - start_lookahead_aura_consensus::<_, AuraId>, + consensus_starter, hwbench, ) .await diff --git a/cumulus/test/client/src/lib.rs b/cumulus/test/client/src/lib.rs index d233ad269176..f26413e441e7 100644 --- a/cumulus/test/client/src/lib.rs +++ b/cumulus/test/client/src/lib.rs @@ -79,6 +79,7 @@ impl substrate_test_client::GenesisInit for GenesisParameters { cumulus_test_service::chain_spec::get_chain_spec_with_extra_endowed( None, self.endowed_accounts.clone(), + cumulus_test_runtime::WASM_BINARY.expect("WASM binary not compiled!"), ) .build_storage() .expect("Builds test runtime genesis storage") diff --git a/cumulus/test/runtime/Cargo.toml b/cumulus/test/runtime/Cargo.toml index fc0eb3ce742f..d5582f2d2a23 100644 --- a/cumulus/test/runtime/Cargo.toml +++ b/cumulus/test/runtime/Cargo.toml @@ -93,3 +93,4 @@ std = [ "substrate-wasm-builder", ] increment-spec-version = [] +elastic-scaling = [] diff --git a/cumulus/test/runtime/build.rs b/cumulus/test/runtime/build.rs index ebd5c178cba0..bf579f4121e5 100644 --- a/cumulus/test/runtime/build.rs +++ b/cumulus/test/runtime/build.rs @@ -24,6 +24,13 @@ fn main() { .enable_feature("increment-spec-version") .set_file_name("wasm_binary_spec_version_incremented.rs") .build(); + + WasmBuilder::new() + .with_current_project() + .enable_feature("elastic-scaling") + .import_memory() + .set_file_name("wasm_binary_elastic_scaling.rs") + .build(); } #[cfg(not(feature = "std"))] diff --git a/cumulus/test/runtime/src/lib.rs b/cumulus/test/runtime/src/lib.rs index 26c6635e1ad3..97cb02ab779e 100644 --- a/cumulus/test/runtime/src/lib.rs +++ b/cumulus/test/runtime/src/lib.rs @@ -27,6 +27,11 @@ pub mod wasm_spec_version_incremented { include!(concat!(env!("OUT_DIR"), "/wasm_binary_spec_version_incremented.rs")); } +pub mod elastic_scaling { + #[cfg(feature = "std")] + include!(concat!(env!("OUT_DIR"), "/wasm_binary_elastic_scaling.rs")); +} + mod test_pallet; use frame_support::{derive_impl, traits::OnRuntimeUpgrade, PalletId}; use sp_api::{decl_runtime_apis, impl_runtime_apis}; @@ -83,8 +88,23 @@ impl_opaque_keys! { /// The para-id used in this runtime. pub const PARACHAIN_ID: u32 = 100; -const UNINCLUDED_SEGMENT_CAPACITY: u32 = 3; +#[cfg(not(feature = "elastic-scaling"))] +const UNINCLUDED_SEGMENT_CAPACITY: u32 = 4; +#[cfg(not(feature = "elastic-scaling"))] const BLOCK_PROCESSING_VELOCITY: u32 = 1; + +#[cfg(feature = "elastic-scaling")] +const UNINCLUDED_SEGMENT_CAPACITY: u32 = 7; +#[cfg(feature = "elastic-scaling")] +const BLOCK_PROCESSING_VELOCITY: u32 = 4; + +#[cfg(not(feature = "elastic-scaling"))] +pub const MILLISECS_PER_BLOCK: u64 = 6000; +#[cfg(feature = "elastic-scaling")] +pub const MILLISECS_PER_BLOCK: u64 = 2000; + +pub const SLOT_DURATION: u64 = MILLISECS_PER_BLOCK; + const RELAY_CHAIN_SLOT_DURATION_MILLIS: u32 = 6000; // The only difference between the two declarations below is the `spec_version`. With the @@ -126,10 +146,6 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { state_version: 1, }; -pub const MILLISECS_PER_BLOCK: u64 = 6000; - -pub const SLOT_DURATION: u64 = MILLISECS_PER_BLOCK; - pub const EPOCH_DURATION_IN_BLOCKS: u32 = 10 * MINUTES; // These time units are defined in number of blocks. diff --git a/cumulus/test/service/Cargo.toml b/cumulus/test/service/Cargo.toml index c40863b90b54..f766d1236320 100644 --- a/cumulus/test/service/Cargo.toml +++ b/cumulus/test/service/Cargo.toml @@ -92,8 +92,6 @@ pallet-timestamp = { workspace = true, default-features = true } [dev-dependencies] futures = { workspace = true } portpicker = { workspace = true } -rococo-parachain-runtime = { workspace = true } -sp-consensus-grandpa = { workspace = true, default-features = true } sp-authority-discovery = { workspace = true, default-features = true } cumulus-test-client = { workspace = true } @@ -116,7 +114,6 @@ runtime-benchmarks = [ "polkadot-primitives/runtime-benchmarks", "polkadot-service/runtime-benchmarks", "polkadot-test-service/runtime-benchmarks", - "rococo-parachain-runtime/runtime-benchmarks", "sc-service/runtime-benchmarks", "sp-runtime/runtime-benchmarks", ] diff --git a/cumulus/test/service/src/chain_spec.rs b/cumulus/test/service/src/chain_spec.rs index 174d478f2575..ae71028ad486 100644 --- a/cumulus/test/service/src/chain_spec.rs +++ b/cumulus/test/service/src/chain_spec.rs @@ -66,9 +66,10 @@ where pub fn get_chain_spec_with_extra_endowed( id: Option, extra_endowed_accounts: Vec, + code: &[u8], ) -> ChainSpec { ChainSpec::builder( - cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + code, Extensions { para_id: id.unwrap_or(cumulus_test_runtime::PARACHAIN_ID.into()).into() }, ) .with_name("Local Testnet") @@ -83,7 +84,21 @@ pub fn get_chain_spec_with_extra_endowed( /// Get the chain spec for a specific parachain ID. pub fn get_chain_spec(id: Option) -> ChainSpec { - get_chain_spec_with_extra_endowed(id, Default::default()) + get_chain_spec_with_extra_endowed( + id, + Default::default(), + cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + ) +} + +/// Get the chain spec for a specific parachain ID. +pub fn get_elastic_scaling_chain_spec(id: Option) -> ChainSpec { + get_chain_spec_with_extra_endowed( + id, + Default::default(), + cumulus_test_runtime::elastic_scaling::WASM_BINARY + .expect("WASM binary was not built, please build it!"), + ) } /// Local testnet genesis for testing. diff --git a/cumulus/test/service/src/cli.rs b/cumulus/test/service/src/cli.rs index 87d1d4af8a95..37ca27542cbf 100644 --- a/cumulus/test/service/src/cli.rs +++ b/cumulus/test/service/src/cli.rs @@ -50,6 +50,12 @@ pub struct TestCollatorCli { #[arg(long)] pub fail_pov_recovery: bool, + + /// EXPERIMENTAL: Use slot-based collator which can handle elastic scaling. + /// + /// Use with care, this flag is unstable and subject to change. + #[arg(long)] + pub experimental_use_slot_based: bool, } #[derive(Debug, clap::Subcommand)] @@ -253,8 +259,16 @@ impl SubstrateCli for TestCollatorCli { fn load_spec(&self, id: &str) -> std::result::Result, String> { Ok(match id { - "" => - Box::new(cumulus_test_service::get_chain_spec(Some(ParaId::from(2000)))) as Box<_>, + "" => { + tracing::info!("Using default test service chain spec."); + Box::new(cumulus_test_service::get_chain_spec(Some(ParaId::from(2000)))) as Box<_> + }, + "elastic-scaling" => { + tracing::info!("Using elastic-scaling chain spec."); + Box::new(cumulus_test_service::get_elastic_scaling_chain_spec(Some(ParaId::from( + 2100, + )))) as Box<_> + }, path => { let chain_spec = cumulus_test_service::chain_spec::ChainSpec::from_json_file(path.into())?; diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index 6f8b9d19bb29..51cdebbaf54e 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -25,7 +25,10 @@ pub mod chain_spec; use cumulus_client_collator::service::CollatorService; use cumulus_client_consensus_aura::{ - collators::lookahead::{self as aura, Params as AuraParams}, + collators::{ + lookahead::{self as aura, Params as AuraParams}, + slot_based::{self as slot_based, Params as SlotBasedParams}, + }, ImportQueueParams, }; use cumulus_client_consensus_proposer::Proposer; @@ -45,7 +48,7 @@ use cumulus_client_cli::{CollatorOptions, RelayChainMode}; use cumulus_client_consensus_common::{ ParachainBlockImport as TParachainBlockImport, ParachainCandidate, ParachainConsensus, }; -use cumulus_client_pov_recovery::RecoveryHandle; +use cumulus_client_pov_recovery::{RecoveryDelayRange, RecoveryHandle}; #[allow(deprecated)] use cumulus_client_service::old_consensus; use cumulus_client_service::{ @@ -304,7 +307,7 @@ async fn build_relay_chain_interface( /// Start a node with the given parachain `Configuration` and relay chain `Configuration`. /// /// This is the actual implementation that is abstract over the executor and the runtime api. -#[sc_tracing::logging::prefix_logs_with(parachain_config.network.node_name.as_str())] +#[sc_tracing::logging::prefix_logs_with("Parachain")] pub async fn start_node_impl>( parachain_config: Configuration, collator_key: Option, @@ -316,6 +319,7 @@ pub async fn start_node_impl>( consensus: Consensus, collator_options: CollatorOptions, proof_recording_during_import: bool, + use_slot_based_collator: bool, ) -> sc_service::error::Result<( TaskManager, Arc, @@ -409,7 +413,6 @@ where } else { Box::new(overseer_handle.clone()) }; - let is_collator = collator_key.is_some(); let relay_chain_slot_duration = Duration::from_secs(6); start_relay_chain_tasks(StartRelayChainTasksParams { @@ -418,11 +421,11 @@ where para_id, relay_chain_interface: relay_chain_interface.clone(), task_manager: &mut task_manager, - da_recovery_profile: if is_collator { - DARecoveryProfile::Collator - } else { - DARecoveryProfile::FullNode - }, + // Increase speed of recovery for testing purposes. + da_recovery_profile: DARecoveryProfile::Other(RecoveryDelayRange { + min: Duration::from_secs(1), + max: Duration::from_secs(5), + }), import_queue: import_queue_service, relay_chain_slot_duration, recovery_handle, @@ -461,29 +464,72 @@ where ); let client_for_aura = client.clone(); - let params = AuraParams { - create_inherent_data_providers: move |_, ()| async move { Ok(()) }, - block_import, - para_client: client.clone(), - para_backend: backend.clone(), - relay_client: relay_chain_interface, - code_hash_provider: move |block_hash| { - client_for_aura.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) - }, - sync_oracle: sync_service, - keystore, - collator_key, - para_id, - overseer_handle, - relay_chain_slot_duration, - proposer, - collator_service, - authoring_duration: Duration::from_millis(2000), - reinitialize: false, - }; - let fut = aura::run::(params); - task_manager.spawn_essential_handle().spawn("aura", None, fut); + if use_slot_based_collator { + tracing::info!(target: LOG_TARGET, "Starting block authoring with slot based authoring."); + let params = SlotBasedParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura + .code_at(block_hash) + .ok() + .map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + slot_drift: Duration::from_secs(1), + }; + + let (collation_future, block_builer_future) = + slot_based::run::(params); + task_manager.spawn_essential_handle().spawn( + "collation-task", + None, + collation_future, + ); + task_manager.spawn_essential_handle().spawn( + "block-builder-task", + None, + block_builer_future, + ); + } else { + tracing::info!(target: LOG_TARGET, "Starting block authoring with lookahead collator."); + let params = AuraParams { + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import, + para_client: client.clone(), + para_backend: backend.clone(), + relay_client: relay_chain_interface, + code_hash_provider: move |block_hash| { + client_for_aura + .code_at(block_hash) + .ok() + .map(|c| ValidationCode::from(c).hash()) + }, + keystore, + collator_key, + para_id, + overseer_handle, + relay_chain_slot_duration, + proposer, + collator_service, + authoring_duration: Duration::from_millis(2000), + reinitialize: false, + }; + + let fut = aura::run::(params); + task_manager.spawn_essential_handle().spawn("aura", None, fut); + } } } @@ -720,6 +766,7 @@ impl TestNodeBuilder { self.consensus, collator_options, self.record_proof_during_import, + false, ) .await .expect("could not create Cumulus test service"), @@ -735,6 +782,7 @@ impl TestNodeBuilder { self.consensus, collator_options, self.record_proof_during_import, + false, ) .await .expect("could not create Cumulus test service"), @@ -766,8 +814,11 @@ pub fn node_config( let root = base_path.path().join(format!("cumulus_test_service_{}", key)); let role = if is_collator { Role::Authority } else { Role::Full }; let key_seed = key.to_seed(); - let mut spec = - Box::new(chain_spec::get_chain_spec_with_extra_endowed(Some(para_id), endowed_accounts)); + let mut spec = Box::new(chain_spec::get_chain_spec_with_extra_endowed( + Some(para_id), + endowed_accounts, + cumulus_test_runtime::WASM_BINARY.expect("WASM binary was not built, please build it!"), + )); let mut storage = spec.as_storage_builder().build_storage().expect("could not build storage"); diff --git a/cumulus/test/service/src/main.rs b/cumulus/test/service/src/main.rs index 90d37173dd59..9357978b769a 100644 --- a/cumulus/test/service/src/main.rs +++ b/cumulus/test/service/src/main.rs @@ -118,6 +118,7 @@ fn main() -> Result<(), sc_cli::Error> { consensus, collator_options, true, + cli.experimental_use_slot_based, ) .await, sc_network::config::NetworkBackendType::Litep2p => @@ -135,6 +136,7 @@ fn main() -> Result<(), sc_cli::Error> { consensus, collator_options, true, + cli.experimental_use_slot_based, ) .await, } diff --git a/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl b/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl index 49b6d9e94fd1..e1e8442f3050 100644 --- a/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl +++ b/cumulus/zombienet/tests/0003-full_node_catching_up.zndsl @@ -6,3 +6,6 @@ alice: parachain 2000 is registered within 225 seconds dave: reports block height is at least 7 within 250 seconds eve: reports block height is at least 7 within 250 seconds ferdie: reports block height is at least 7 within 250 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +charlie: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl b/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl index 7da8416d0161..b14c15ed5e5b 100644 --- a/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl +++ b/cumulus/zombienet/tests/0006-rpc_collator_builds_blocks.zndsl @@ -13,3 +13,7 @@ two: restart after 1 seconds three: restart after 20 seconds dave: is up dave: reports block height is at least 30 within 200 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +dave: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds +eve: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0008-elastic_authoring.toml b/cumulus/zombienet/tests/0008-elastic_authoring.toml new file mode 100644 index 000000000000..f2e2010a9e45 --- /dev/null +++ b/cumulus/zombienet/tests/0008-elastic_authoring.toml @@ -0,0 +1,50 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params] + max_candidate_depth = 6 + allowed_ancestry_len = 3 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] + max_validators_per_core = 1 + num_cores = 4 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] + max_approval_coalesce_count = 5 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +command = "polkadot" + + [[relaychain.nodes]] + name = "alice" + args = ["" ] + + [[relaychain.node_groups]] + name = "validator" + args = ["-lruntime=debug,parachain=trace" ] + count = 8 + +# Slot based authoring with 3 cores and 2s slot duration +[[parachains]] +id = 2100 +chain = "elastic-scaling" +add_to_genesis = true + + [[parachains.collators]] + name = "collator-elastic" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] + +# Slot based authoring with 1 core and 6s slot duration +[[parachains]] +id = 2000 +add_to_genesis = true + + [[parachains.collators]] + name = "collator-single-core" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] diff --git a/cumulus/zombienet/tests/0008-elastic_authoring.zndsl b/cumulus/zombienet/tests/0008-elastic_authoring.zndsl new file mode 100644 index 000000000000..a06ffd24fefd --- /dev/null +++ b/cumulus/zombienet/tests/0008-elastic_authoring.zndsl @@ -0,0 +1,19 @@ +Description: Slot based authoring for elastic scaling +Network: ./0008-elastic_authoring.toml +Creds: config + +alice: is up +collator-elastic: is up +collator-single-core: is up + + +# configure relay chain +alice: js-script ./assign-core.js with "2100,0" return is 0 within 600 seconds +alice: js-script ./assign-core.js with "2100,1" return is 0 within 600 seconds + +collator-single-core: reports block height is at least 20 within 225 seconds +collator-elastic: reports block height is at least 40 within 225 seconds + +# We want to make sure that none of the consensus hook checks fail, even if the chain makes progress +collator-elastic: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds +collator-single-core: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds diff --git a/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml new file mode 100644 index 000000000000..9b296e8a8b36 --- /dev/null +++ b/cumulus/zombienet/tests/0009-elastic_pov_recovery.toml @@ -0,0 +1,48 @@ +[settings] +timeout = 1000 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.async_backing_params] + max_candidate_depth = 6 + allowed_ancestry_len = 3 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] + max_validators_per_core = 1 + num_cores = 4 + +[relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] + max_approval_coalesce_count = 5 + +[relaychain] +default_image = "{{ZOMBIENET_INTEGRATION_TEST_IMAGE}}" +chain = "rococo-local" +command = "polkadot" + + [[relaychain.nodes]] + name = "alice" + args = ["" ] + + [[relaychain.node_groups]] + name = "validator" + args = ["-lruntime=debug,parachain=trace", "--reserved-only", "--reserved-nodes {{'alice'|zombie('multiAddress')}}"] + count = 8 + +# Slot based authoring with 3 cores and 2s slot duration +[[parachains]] +id = 2100 +chain = "elastic-scaling" +add_to_genesis = true + + # Slot based authoring with 3 cores and 2s slot duration + [[parachains.collators]] + name = "collator-elastic" + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["--disable-block-announcements", "-laura=trace,runtime=info,cumulus-consensus=trace,consensus::common=trace,parachain::collation-generation=trace,parachain::collator-protocol=trace,parachain=debug", "--force-authoring", "--experimental-use-slot-based"] + + # run 'recovery-target' as a parachain full node + [[parachains.collators]] + name = "recovery-target" + validator = false # full node + image = "{{COL_IMAGE}}" + command = "test-parachain" + args = ["-lparachain::availability=trace,sync=debug,parachain=debug,cumulus-pov-recovery=debug,cumulus-consensus=debug", "--disable-block-announcements", "--bootnodes {{'collator-elastic'|zombie('multiAddress')}}", "--in-peers 0", "--out-peers 0", "--", "--reserved-only", "--reserved-nodes {{'alice'|zombie('multiAddress')}}"] diff --git a/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl b/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl new file mode 100644 index 000000000000..3a805078112c --- /dev/null +++ b/cumulus/zombienet/tests/0009-elastic_pov_recovery.zndsl @@ -0,0 +1,19 @@ +Description: Elastic scaling PoV recovery test +Network: ./0009-elastic_pov_recovery.toml +Creds: config + +alice: is up +collator-elastic: is up + +# wait 20 blocks and register parachain +alice: reports block height is at least 20 within 250 seconds + +# configure relay chain +alice: js-script ./assign-core.js with "2100,0" return is 0 within 600 seconds +alice: js-script ./assign-core.js with "2100,1" return is 0 within 600 seconds + +# check block production +collator-elastic: reports block height is at least 40 within 225 seconds +collator-elastic: count of log lines containing "set_validation_data inherent needs to be present in every block" is 0 within 10 seconds + +recovery-target: count of log lines containing "Importing block retrieved using pov_recovery" is greater than 35 within 10 seconds diff --git a/cumulus/zombienet/tests/assign-core.js b/cumulus/zombienet/tests/assign-core.js new file mode 100644 index 000000000000..4179b68b2e3c --- /dev/null +++ b/cumulus/zombienet/tests/assign-core.js @@ -0,0 +1,46 @@ +// Assign a parachain to a core. +// +// First argument should be the parachain id. +// Second argument should be the core. +async function run(nodeName, networkInfo, args) { + const { wsUri, userDefinedTypes } = networkInfo.nodesByName[nodeName]; + const api = await zombie.connect(wsUri, userDefinedTypes); + + let para = Number(args[0]); + let core = Number(args[1]); + console.log(`Assigning para ${para} to core ${core}`); + + await zombie.util.cryptoWaitReady(); + + // Submit transaction with Alice accoung + const keyring = new zombie.Keyring({ type: "sr25519" }); + const alice = keyring.addFromUri("//Alice"); + + // Wait for this transaction to be finalized in a block. + await new Promise(async (resolve, reject) => { + const unsub = await api.tx.sudo + .sudo(api.tx.coretime.assignCore(core, 0, [[{ task: para }, 57600]], null)) + .signAndSend(alice, ({ status, isError }) => { + if (status.isInBlock) { + console.log( + `Transaction included at blockhash ${status.asInBlock}`, + ); + } else if (status.isFinalized) { + console.log( + `Transaction finalized at blockHash ${status.asFinalized}`, + ); + unsub(); + return resolve(); + } else if (isError) { + console.log(`Transaction error`); + reject(`Transaction error`); + } + }); + }); + + + + return 0; +} + +module.exports = { run }; diff --git a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml index 83f5434edddb..611978a33a5f 100644 --- a/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml +++ b/polkadot/zombienet_tests/elastic_scaling/0001-basic-3cores-6s-blocks.toml @@ -7,11 +7,9 @@ timeout = 1000 [relaychain.genesis.runtimeGenesis.patch.configuration.config.scheduler_params] max_validators_per_core = 1 - scheduling_lookahead = 2 num_cores = 3 [relaychain.genesis.runtimeGenesis.patch.configuration.config.approval_voting_params] - needed_approvals = 3 max_approval_coalesce_count = 5 [relaychain] @@ -48,4 +46,4 @@ addToGenesis = true [types.Header] number = "u64" parent_hash = "Hash" -post_state = "Hash" \ No newline at end of file +post_state = "Hash" diff --git a/prdoc/pr_4097.prdoc b/prdoc/pr_4097.prdoc new file mode 100644 index 000000000000..2804a9571c79 --- /dev/null +++ b/prdoc/pr_4097.prdoc @@ -0,0 +1,45 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Introduce experimental slot-based collator + +doc: + - audience: Node Operator + description: | + Introduces an experimental collator that is fit fot elastic-scaling. + It can be activated on `test-parachain` and `polkadot-parachain` binaries via + `--experimental-use-slot-based` flag. The current implementation is MVP status and purely + for testing. Behaviour can change any time and should not be relied upon in environments with + any stability requirements. + +crates: + - name: cumulus-client-consensus-aura + bump: major + - name: cumulus-client-consensus-common + bump: minor + - name: cumulus-client-pov-recovery + bump: none + validate: false + - name: cumulus-pallet-aura-ext + bump: patch + - name: cumulus-relay-chain-interface + bump: major + validate: false + - name: sc-consensus-slots + bump: minor + - name: sc-basic-authorship + bump: patch + - name: cumulus-client-network + bump: none + validate: false + - name: cumulus-relay-chain-inprocess-interface + bump: minor + - name: sc-consensus-aura + bump: patch + - name: cumulus-relay-chain-rpc-interface + bump: minor + - name: polkadot-parachain-bin + bump: patch + - name: polkadot + bump: none + validate: false diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs index 1519c76c42c0..74805488792a 100644 --- a/substrate/client/basic-authorship/src/basic_authorship.rs +++ b/substrate/client/basic-authorship/src/basic_authorship.rs @@ -205,7 +205,11 @@ where ) -> Proposer { let parent_hash = parent_header.hash(); - info!("🙌 Starting consensus session on top of parent {:?}", parent_hash); + info!( + "🙌 Starting consensus session on top of parent {:?} (#{})", + parent_hash, + parent_header.number() + ); let proposer = Proposer::<_, _, _, PR> { spawn_handle: self.spawn_handle.clone(), diff --git a/substrate/client/consensus/aura/src/standalone.rs b/substrate/client/consensus/aura/src/standalone.rs index 0f9b8668d447..c1536d9ef73f 100644 --- a/substrate/client/consensus/aura/src/standalone.rs +++ b/substrate/client/consensus/aura/src/standalone.rs @@ -24,7 +24,7 @@ use log::trace; use codec::Codec; -use sc_client_api::{backend::AuxStore, UsageProvider}; +use sc_client_api::UsageProvider; use sp_api::{Core, ProvideRuntimeApi}; use sp_application_crypto::{AppCrypto, AppPublic}; use sp_blockchain::Result as CResult; @@ -48,7 +48,7 @@ pub fn slot_duration(client: &C) -> CResult where A: Codec, B: BlockT, - C: AuxStore + ProvideRuntimeApi + UsageProvider, + C: ProvideRuntimeApi + UsageProvider, C::Api: AuraApi, { slot_duration_at(client, client.usage_info().chain.best_hash) @@ -59,7 +59,7 @@ pub fn slot_duration_at(client: &C, block_hash: B::Hash) -> CResult, + C: ProvideRuntimeApi, C::Api: AuraApi, { client.runtime_api().slot_duration(block_hash).map_err(|err| err.into()) diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index d9d792005312..7cdf90877dff 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -29,8 +29,8 @@ mod aux_schema; mod slots; pub use aux_schema::{check_equivocation, MAX_SLOT_CAPACITY, PRUNING_BOUND}; -pub use slots::SlotInfo; use slots::Slots; +pub use slots::{time_until_next_slot, SlotInfo}; use futures::{future::Either, Future, TryFutureExt}; use futures_timer::Delay; diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index bf44207acc9c..3e7d4de10553 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -35,7 +35,6 @@ use sc_client_api::Backend; use sc_consensus::ImportQueue; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::NetworkBlock; -use sc_network_sync::SyncingService; use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; @@ -172,7 +171,6 @@ fn start_consensus( task_manager: &TaskManager, relay_chain_interface: Arc, transaction_pool: Arc>, - sync_oracle: Arc>, keystore: KeystorePtr, relay_chain_slot_duration: Duration, para_id: ParaId, @@ -206,7 +204,6 @@ fn start_consensus( code_hash_provider: move |block_hash| { client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash()) }, - sync_oracle, keystore, collator_key, para_id, @@ -217,11 +214,9 @@ fn start_consensus( authoring_duration: Duration::from_millis(2000), reinitialize: false, }; - - let fut = - aura::run::( - params, - ); + let fut = aura::run::( + params, + ); task_manager.spawn_essential_handle().spawn("aura", None, fut); Ok(()) @@ -398,7 +393,6 @@ pub async fn start_parachain_node( &task_manager, relay_chain_interface, transaction_pool, - sync_service, params.keystore_container.keystore(), relay_chain_slot_duration, para_id,