From 851071470841fb49d5ab3fe7d4f43c6cef5e489f Mon Sep 17 00:00:00 2001 From: Tsvetomir Dimitrov Date: Tue, 30 May 2023 12:55:26 +0300 Subject: [PATCH] `RollingSessionWindow` cleanup (#7204) * Replace `RollingSessionWindow` with `RuntimeInfo` - initial commit * Fix tests in import * Fix the rest of the tests * Remove dead code * Fix todos * Simplify session caching * Comments for `SessionInfoProvider` * Separate `SessionInfoProvider` from `State` * `cache_session_info_for_head` becomes freestanding function * Remove unneeded `mut` usage * fn session_info -> fn get_session_info() to avoid name clashes. The function also tries to initialize `SessionInfoProvider` * Fix SessionInfo retrieval * Code cleanup * Don't wrap `SessionInfoProvider` in an `Option` * Remove `earliest_session()` * Remove pre-caching -> wip * Fix some tests and code cleanup * Fix all tests * Fixes in tests * Fix comments, variable names and small style changes * Fix a warning * impl From for NonZeroUsize * Fix logging for `get_session_info` - remove redundant logs and decrease log level to DEBUG * Code review feedback * Storage migration removing `COL_SESSION_WINDOW_DATA` from parachains db * Remove `col_session_data` usages * Storage migration clearing columns w/o removing them * Remove session data column usages from `approval-voting` and `dispute-coordinator` tests * Add some test cases from `RollingSessionWindow` to `dispute-coordinator` tests * Fix formatting in initialized.rs * Fix a corner case in `SessionInfo` caching for `dispute-coordinator` * Remove `RollingSessionWindow` ;( * Revert "Fix formatting in initialized.rs" This reverts commit 0f94664ec9f3a7e3737a30291195990e1e7065fc. * v2 to v3 migration drops `COL_DISPUTE_COORDINATOR_DATA` instead of clearing it * Fix `NUM_COLUMNS` in `approval-voting` * Use `columns::v3::NUM_COLUMNS` when opening db * Update node/service/src/parachains_db/upgrade.rs Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> * Don't write in `COL_DISPUTE_COORDINATOR_DATA` for `test_rocksdb_migrate_2_to_3` * Fix `NUM+COLUMNS` in approval_voting * Fix formatting * Fix columns usage * Clarification comments about the different db versions --------- Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> --- .../approval-voting/src/approval_db/v1/mod.rs | 8 +- .../src/approval_db/v1/tests.rs | 6 +- node/core/approval-voting/src/import.rs | 6 +- node/core/approval-voting/src/lib.rs | 13 +- node/core/approval-voting/src/tests.rs | 9 +- node/core/dispute-coordinator/src/db/v1.rs | 10 +- .../dispute-coordinator/src/initialized.rs | 13 +- node/core/dispute-coordinator/src/lib.rs | 7 +- node/core/dispute-coordinator/src/tests.rs | 180 +- node/service/src/lib.rs | 3 - node/service/src/parachains_db/mod.rs | 38 +- node/service/src/parachains_db/upgrade.rs | 123 +- node/subsystem-util/src/lib.rs | 2 - .../src/rolling_session_window.rs | 1532 ----------------- 14 files changed, 339 insertions(+), 1611 deletions(-) delete mode 100644 node/subsystem-util/src/rolling_session_window.rs diff --git a/node/core/approval-voting/src/approval_db/v1/mod.rs b/node/core/approval-voting/src/approval_db/v1/mod.rs index d2a13ad54550..c31389269d2e 100644 --- a/node/core/approval-voting/src/approval_db/v1/mod.rs +++ b/node/core/approval-voting/src/approval_db/v1/mod.rs @@ -15,6 +15,12 @@ // along with Polkadot. If not, see . //! Version 1 of the DB schema. +//! +//! Note that the version here differs from the actual version of the parachains +//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`). +//! The code in this module implements the way approval voting works with +//! its data in the database. Any breaking changes here will still +//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`). use parity_scale_codec::{Decode, Encode}; use polkadot_node_primitives::approval::{AssignmentCert, DelayTranche}; @@ -154,8 +160,6 @@ pub type Bitfield = BitVec; pub struct Config { /// The column family in the database where data is stored. pub col_approval_data: u32, - /// The column of the database where rolling session window data is stored. - pub col_session_data: u32, } /// Details pertaining to our assignment on a block. diff --git a/node/core/approval-voting/src/approval_db/v1/tests.rs b/node/core/approval-voting/src/approval_db/v1/tests.rs index 0d30cc8c0cdc..07d8242b772e 100644 --- a/node/core/approval-voting/src/approval_db/v1/tests.rs +++ b/node/core/approval-voting/src/approval_db/v1/tests.rs @@ -28,12 +28,10 @@ use std::{collections::HashMap, sync::Arc}; use ::test_helpers::{dummy_candidate_receipt, dummy_candidate_receipt_bad_sig, dummy_hash}; const DATA_COL: u32 = 0; -const SESSION_DATA_COL: u32 = 1; -const NUM_COLUMNS: u32 = 2; +const NUM_COLUMNS: u32 = 1; -const TEST_CONFIG: Config = - Config { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; +const TEST_CONFIG: Config = Config { col_approval_data: DATA_COL }; fn make_db() -> (DbBackend, Arc) { let db = kvdb_memorydb::create(NUM_COLUMNS); diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index 1ea2687a0246..e33caed49c5f 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -609,12 +609,10 @@ pub(crate) mod tests { use crate::{approval_db::v1::Config as DatabaseConfig, criteria, BlockEntry}; const DATA_COL: u32 = 0; - const SESSION_DATA_COL: u32 = 1; - const NUM_COLUMNS: u32 = 2; + const NUM_COLUMNS: u32 = 1; - const TEST_CONFIG: DatabaseConfig = - DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; + const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL }; #[derive(Default)] struct MockClock; diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 18b8746ca317..f5e888c7c538 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -116,8 +116,6 @@ const LOG_TARGET: &str = "parachain::approval-voting"; pub struct Config { /// The column family in the DB where approval-voting data is stored. pub col_approval_data: u32, - /// The of the DB where rolling session info is stored. - pub col_session_data: u32, /// The slot duration of the consensus algorithm, in milliseconds. Should be evenly /// divisible by 500. pub slot_duration_millis: u64, @@ -357,10 +355,7 @@ impl ApprovalVotingSubsystem { keystore, slot_duration_millis: config.slot_duration_millis, db, - db_config: DatabaseConfig { - col_approval_data: config.col_approval_data, - col_session_data: config.col_session_data, - }, + db_config: DatabaseConfig { col_approval_data: config.col_approval_data }, mode: Mode::Syncing(sync_oracle), metrics, } @@ -369,10 +364,8 @@ impl ApprovalVotingSubsystem { /// Revert to the block corresponding to the specified `hash`. /// The operation is not allowed for blocks older than the last finalized one. pub fn revert_to(&self, hash: Hash) -> Result<(), SubsystemError> { - let config = approval_db::v1::Config { - col_approval_data: self.db_config.col_approval_data, - col_session_data: self.db_config.col_session_data, - }; + let config = + approval_db::v1::Config { col_approval_data: self.db_config.col_approval_data }; let mut backend = approval_db::v1::DbBackend::new(self.db.clone(), config); let mut overlay = OverlayedBackend::new(&backend); diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index d7e19a8c09f3..f58e60c6a487 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -14,8 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::tests::test_constants::TEST_CONFIG; - use super::*; use polkadot_node_primitives::{ approval::{ @@ -115,12 +113,10 @@ fn make_sync_oracle(val: bool) -> (Box, TestSyncOracleHan pub mod test_constants { use crate::approval_db::v1::Config as DatabaseConfig; const DATA_COL: u32 = 0; - const SESSION_DATA_COL: u32 = 1; - pub(crate) const NUM_COLUMNS: u32 = 2; + pub(crate) const NUM_COLUMNS: u32 = 1; - pub(crate) const TEST_CONFIG: DatabaseConfig = - DatabaseConfig { col_approval_data: DATA_COL, col_session_data: SESSION_DATA_COL }; + pub(crate) const TEST_CONFIG: DatabaseConfig = DatabaseConfig { col_approval_data: DATA_COL }; } struct MockSupportsParachains; @@ -493,7 +489,6 @@ fn test_harness>( Config { col_approval_data: test_constants::TEST_CONFIG.col_approval_data, slot_duration_millis: SLOT_DURATION_MILLIS, - col_session_data: TEST_CONFIG.col_session_data, }, Arc::new(db), Arc::new(keystore), diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index aa67781ddd25..2d14f5151003 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -15,6 +15,12 @@ // along with Polkadot. If not, see . //! `V1` database for the dispute coordinator. +//! +//! Note that the version here differs from the actual version of the parachains +//! database (check `CURRENT_VERSION` in `node/service/src/parachains_db/upgrade.rs`). +//! The code in this module implements the way dispute coordinator works with +//! the dispute data in the database. Any breaking changes here will still +//! require a db migration (check `node/service/src/parachains_db/upgrade.rs`). use polkadot_node_primitives::DisputeStatus; use polkadot_node_subsystem_util::database::{DBTransaction, Database}; @@ -206,8 +212,6 @@ fn candidate_votes_session_prefix(session: SessionIndex) -> [u8; 15 + 4] { pub struct ColumnConfiguration { /// The column in the key-value DB where data is stored. pub col_dispute_data: u32, - /// The column in the key-value DB where session data is stored. - pub col_session_data: u32, } /// Tracked votes on candidates, for the purposes of dispute resolution. @@ -378,7 +382,7 @@ mod tests { let db = kvdb_memorydb::create(1); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]); let store = Arc::new(db); - let config = ColumnConfiguration { col_dispute_data: 0, col_session_data: 1 }; + let config = ColumnConfiguration { col_dispute_data: 0 }; DbBackend::new(store, config, Metrics::default()) } diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 81134a43a3a0..1b90a9d865e1 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -305,13 +305,12 @@ impl Initialized { Ok(session_idx) if self.gaps_in_cache || session_idx > self.highest_session_seen => { - // If error has occurred during last session caching - fetch the whole window - // Otherwise - cache only the new sessions - let lower_bound = if self.gaps_in_cache { - session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) - } else { - self.highest_session_seen + 1 - }; + // Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps in + // cache and we are not missing too many `SessionInfo`s + let mut lower_bound = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1); + if !self.gaps_in_cache && self.highest_session_seen > lower_bound { + lower_bound = self.highest_session_seen + 1 + } // There is a new session. Perform a dummy fetch to cache it. for idx in lower_bound..=session_idx { diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 7379b392f312..02bb6ef9ecda 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -127,16 +127,11 @@ pub struct DisputeCoordinatorSubsystem { pub struct Config { /// The data column in the store to use for dispute data. pub col_dispute_data: u32, - /// The data column in the store to use for session data. - pub col_session_data: u32, } impl Config { fn column_config(&self) -> db::v1::ColumnConfiguration { - db::v1::ColumnConfiguration { - col_dispute_data: self.col_dispute_data, - col_session_data: self.col_session_data, - } + db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data } } } diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 7d3b87f3c228..ceeac351e8b8 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -33,6 +33,7 @@ use polkadot_node_subsystem_util::database::Database; use polkadot_node_primitives::{ DisputeMessage, DisputeStatus, SignedDisputeStatement, SignedFullStatement, Statement, + DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::{ @@ -214,9 +215,9 @@ impl Default for TestState { make_keystore(vec![Sr25519Keyring::Alice.to_seed()].into_iter()).into(); let db = kvdb_memorydb::create(1); - let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); + let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[0]); let db = Arc::new(db); - let config = Config { col_dispute_data: 0, col_session_data: 1 }; + let config = Config { col_dispute_data: 0 }; let genesis_header = Header { parent_hash: Hash::zero(), @@ -330,9 +331,11 @@ impl TestState { assert_eq!(h, block_hash); let _ = tx.send(Ok(session)); + let first_expected_session = session.saturating_sub(DISPUTE_WINDOW.get() - 1); + // Queries for session caching - see `handle_startup` if self.known_session.is_none() { - for i in 0..=session { + for i in first_expected_session..=session { assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( @@ -3393,3 +3396,174 @@ fn informs_chain_selection_when_dispute_concluded_against() { }) }); } + +// On startup `SessionInfo` cache should be populated +#[test] +fn session_info_caching_on_startup_works() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + test_state + }) + }); +} + +// Underflow means that no more than `DISPUTE_WINDOW` sessions should be fetched on startup +#[test] +fn session_info_caching_doesnt_underflow() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = DISPUTE_WINDOW.get() + 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + test_state + }) + }); +} + +// Cached `SessionInfo` shouldn't be re-requested from the runtime +#[test] +fn session_info_is_requested_only_once() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session).await; + + // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session, + 3, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + // This leaf activation should fetch `SessionInfo` because the session is new + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session + 1, + 4, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(session_index, 2); + let _ = tx.send(Ok(Some(test_state.session_info()))); + } + ); + test_state + }) + }); +} + +// Big jump means the new session we see with a leaf update is at least a `DISPUTE_WINDOW` bigger than +// the already known one. In this case The whole `DISPUTE_WINDOW` should be fetched. +#[test] +fn session_info_big_jump_works() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session_on_startup = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await; + + // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session_on_startup, + 3, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() + 10; + // This leaf activation should cache all missing `SessionInfo`s + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session_after_jump, + 4, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + let first_expected_session = + session_after_jump.saturating_sub(DISPUTE_WINDOW.get() - 1); + for expected_idx in first_expected_session..=session_after_jump { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(session_index, expected_idx); + let _ = tx.send(Ok(Some(test_state.session_info()))); + } + ); + } + test_state + }) + }); +} + +// Small jump means the new session we see with a leaf update is at less than last known one + `DISPUTE_WINDOW`. In this +// case fetching should start from last known one + 1. +#[test] +fn session_info_small_jump_works() { + test_harness(|mut test_state, mut virtual_overseer| { + Box::pin(async move { + let session_on_startup = 1; + + test_state.handle_resume_sync(&mut virtual_overseer, session_on_startup).await; + + // This leaf activation shouldn't fetch `SessionInfo` because the session is already cached + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session_on_startup, + 3, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + let session_after_jump = session_on_startup + DISPUTE_WINDOW.get() - 1; + // This leaf activation should cache all missing `SessionInfo`s + test_state + .activate_leaf_at_session( + &mut virtual_overseer, + session_after_jump, + 4, + vec![make_candidate_included_event(make_valid_candidate_receipt())], + ) + .await; + + let first_expected_session = session_on_startup + 1; + for expected_idx in first_expected_session..=session_after_jump { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(session_index, tx), + )) => { + assert_eq!(session_index, expected_idx); + let _ = tx.send(Ok(Some(test_state.session_info()))); + } + ); + } + test_state + }) + }); +} diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index e55f2456160b..c29933732388 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -896,7 +896,6 @@ where let approval_voting_config = ApprovalVotingConfig { col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, - col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, slot_duration_millis: slot_duration.as_millis() as u64, }; @@ -920,7 +919,6 @@ where let dispute_coordinator_config = DisputeCoordinatorConfig { col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, - col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, }; let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { @@ -1512,7 +1510,6 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> { let config = approval_voting_subsystem::Config { col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, - col_session_data: parachains_db::REAL_COLUMNS.col_session_window_data, slot_duration_millis: Default::default(), }; diff --git a/node/service/src/parachains_db/mod.rs b/node/service/src/parachains_db/mod.rs index 918aecd25e76..519afbe0ccd1 100644 --- a/node/service/src/parachains_db/mod.rs +++ b/node/service/src/parachains_db/mod.rs @@ -36,12 +36,18 @@ pub(crate) mod columns { pub mod v2 { pub const NUM_COLUMNS: u32 = 6; + + #[cfg(test)] + pub const COL_SESSION_WINDOW_DATA: u32 = 5; + } + + pub mod v3 { + pub const NUM_COLUMNS: u32 = 5; pub const COL_AVAILABILITY_DATA: u32 = 0; pub const COL_AVAILABILITY_META: u32 = 1; pub const COL_APPROVAL_DATA: u32 = 2; pub const COL_CHAIN_SELECTION_DATA: u32 = 3; pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; - pub const COL_SESSION_WINDOW_DATA: u32 = 5; pub const ORDERED_COL: &[u32] = &[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA]; @@ -62,19 +68,16 @@ pub struct ColumnsConfig { pub col_chain_selection_data: u32, /// The column used by dispute coordinator for data. pub col_dispute_coordinator_data: u32, - /// The column used for session window data. - pub col_session_window_data: u32, } /// The real columns used by the parachains DB. #[cfg(any(test, feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { - col_availability_data: columns::v2::COL_AVAILABILITY_DATA, - col_availability_meta: columns::v2::COL_AVAILABILITY_META, - col_approval_data: columns::v2::COL_APPROVAL_DATA, - col_chain_selection_data: columns::v2::COL_CHAIN_SELECTION_DATA, - col_dispute_coordinator_data: columns::v2::COL_DISPUTE_COORDINATOR_DATA, - col_session_window_data: columns::v2::COL_SESSION_WINDOW_DATA, + col_availability_data: columns::v3::COL_AVAILABILITY_DATA, + col_availability_meta: columns::v3::COL_AVAILABILITY_META, + col_approval_data: columns::v3::COL_APPROVAL_DATA, + col_chain_selection_data: columns::v3::COL_CHAIN_SELECTION_DATA, + col_dispute_coordinator_data: columns::v3::COL_DISPUTE_COORDINATOR_DATA, }; #[derive(PartialEq)] @@ -122,20 +125,17 @@ pub fn open_creating_rocksdb( let path = root.join("parachains").join("db"); - let mut db_config = DatabaseConfig::with_columns(columns::v2::NUM_COLUMNS); + let mut db_config = DatabaseConfig::with_columns(columns::v3::NUM_COLUMNS); let _ = db_config .memory_budget - .insert(columns::v2::COL_AVAILABILITY_DATA, cache_sizes.availability_data); - let _ = db_config - .memory_budget - .insert(columns::v2::COL_AVAILABILITY_META, cache_sizes.availability_meta); + .insert(columns::v3::COL_AVAILABILITY_DATA, cache_sizes.availability_data); let _ = db_config .memory_budget - .insert(columns::v2::COL_APPROVAL_DATA, cache_sizes.approval_data); + .insert(columns::v3::COL_AVAILABILITY_META, cache_sizes.availability_meta); let _ = db_config .memory_budget - .insert(columns::v2::COL_SESSION_WINDOW_DATA, cache_sizes.session_data); + .insert(columns::v3::COL_APPROVAL_DATA, cache_sizes.approval_data); let path_str = path .to_str() @@ -146,7 +146,7 @@ pub fn open_creating_rocksdb( let db = Database::open(&db_config, &path_str)?; let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new( db, - columns::v2::ORDERED_COL, + columns::v3::ORDERED_COL, ); Ok(Arc::new(db)) @@ -166,12 +166,12 @@ pub fn open_creating_paritydb( std::fs::create_dir_all(&path_str)?; upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB)?; - let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_2_config(&path)) + let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_3_config(&path)) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( db, - columns::v2::ORDERED_COL, + columns::v3::ORDERED_COL, ); Ok(Arc::new(db)) } diff --git a/node/service/src/parachains_db/upgrade.rs b/node/service/src/parachains_db/upgrade.rs index c52bd21c0573..6041a093ef9b 100644 --- a/node/service/src/parachains_db/upgrade.rs +++ b/node/service/src/parachains_db/upgrade.rs @@ -28,7 +28,7 @@ type Version = u32; const VERSION_FILE_NAME: &'static str = "parachain_db_version"; /// Current db version. -const CURRENT_VERSION: Version = 2; +const CURRENT_VERSION: Version = 3; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -58,6 +58,8 @@ pub(crate) fn try_upgrade_db(db_path: &Path, db_kind: DatabaseKind) -> Result<() Some(0) => migrate_from_version_0_to_1(db_path, db_kind)?, // 1 -> 2 migration Some(1) => migrate_from_version_1_to_2(db_path, db_kind)?, + // 2 -> 3 migration + Some(2) => migrate_from_version_2_to_3(db_path, db_kind)?, // Already at current version, do nothing. Some(CURRENT_VERSION) => (), // This is an arbitrary future version, we don't handle it. @@ -127,6 +129,18 @@ fn migrate_from_version_1_to_2(path: &Path, db_kind: DatabaseKind) -> Result<(), }) } +fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<(), Error> { + gum::info!(target: LOG_TARGET, "Migrating parachains db from version 2 to version 3 ..."); + match db_kind { + DatabaseKind::ParityDB => paritydb_migrate_from_version_2_to_3(path), + DatabaseKind::RocksDB => rocksdb_migrate_from_version_2_to_3(path), + } + .and_then(|result| { + gum::info!(target: LOG_TARGET, "Migration complete! "); + Ok(result) + }) +} + /// Migration from version 0 to version 1: /// * the number of columns has changed from 3 to 5; fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { @@ -160,6 +174,20 @@ fn rocksdb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> { Ok(()) } +fn rocksdb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> { + use kvdb_rocksdb::{Database, DatabaseConfig}; + + let db_path = path + .to_str() + .ok_or_else(|| super::other_io_error("Invalid database path".into()))?; + let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS); + let mut db = Database::open(&db_cfg, db_path)?; + + db.remove_last_column()?; + + Ok(()) +} + // This currently clears columns which had their configs altered between versions. // The columns to be changed are constrained by the `allowed_columns` vector. fn paritydb_fix_columns( @@ -221,7 +249,7 @@ fn paritydb_fix_columns( pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); - for i in columns::v2::ORDERED_COL { + for i in columns::v3::ORDERED_COL { options.columns[*i as usize].btree_index = true; } @@ -232,7 +260,18 @@ pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options { pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8); - for i in columns::v2::ORDERED_COL { + for i in columns::v3::ORDERED_COL { + options.columns[*i as usize].btree_index = true; + } + + options +} + +/// Database configuration for version 3. +pub(crate) fn paritydb_version_3_config(path: &Path) -> parity_db::Options { + let mut options = + parity_db::Options::with_columns(&path, super::columns::v3::NUM_COLUMNS as u8); + for i in columns::v3::ORDERED_COL { options.columns[*i as usize].btree_index = true; } @@ -244,8 +283,8 @@ pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options { pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8); - options.columns[super::columns::v2::COL_AVAILABILITY_META as usize].btree_index = true; - options.columns[super::columns::v2::COL_CHAIN_SELECTION_DATA as usize].btree_index = true; + options.columns[super::columns::v3::COL_AVAILABILITY_META as usize].btree_index = true; + options.columns[super::columns::v3::COL_CHAIN_SELECTION_DATA as usize].btree_index = true; options } @@ -260,7 +299,7 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> { paritydb_fix_columns( path, paritydb_version_1_config(path), - vec![super::columns::v2::COL_DISPUTE_COORDINATOR_DATA], + vec![super::columns::v3::COL_DISPUTE_COORDINATOR_DATA], )?; Ok(()) @@ -278,9 +317,20 @@ fn paritydb_migrate_from_version_1_to_2(path: &Path) -> Result<(), Error> { Ok(()) } +/// Migration from version 2 to version 3: +/// - drop the column used by `RollingSessionWindow` +fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result<(), Error> { + parity_db::Db::drop_last_column(&mut paritydb_version_2_config(path)) + .map_err(|e| other_io_error(format!("Error removing COL_SESSION_WINDOW_DATA {:?}", e)))?; + Ok(()) +} + #[cfg(test)] mod tests { - use super::{columns::v2::*, *}; + use super::{ + columns::{v2::COL_SESSION_WINDOW_DATA, v3::*}, + *, + }; #[test] fn test_paritydb_migrate_0_to_1() { @@ -375,7 +425,7 @@ mod tests { // We need to properly set db version for upgrade to work. fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version"); { - let db = DbAdapter::new(db, columns::v2::ORDERED_COL); + let db = DbAdapter::new(db, columns::v3::ORDERED_COL); db.write(DBTransaction { ops: vec![DBOp::Insert { col: COL_DISPUTE_COORDINATOR_DATA, @@ -393,7 +443,7 @@ mod tests { assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS); - let db = DbAdapter::new(db, columns::v2::ORDERED_COL); + let db = DbAdapter::new(db, columns::v3::ORDERED_COL); assert_eq!( db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(), @@ -416,4 +466,59 @@ mod tests { Some("0xdeadb00b".as_bytes().to_vec()) ); } + + #[test] + fn test_paritydb_migrate_2_to_3() { + use parity_db::Db; + + let db_dir = tempfile::tempdir().unwrap(); + let path = db_dir.path(); + let test_key = b"1337"; + + // We need to properly set db version for upgrade to work. + fs::write(version_file_path(path), "2").expect("Failed to write DB version"); + + { + let db = Db::open_or_create(&paritydb_version_2_config(&path)).unwrap(); + + // Write some dummy data + db.commit(vec![( + COL_SESSION_WINDOW_DATA as u8, + test_key.to_vec(), + Some(b"0xdeadb00b".to_vec()), + )]) + .unwrap(); + + assert_eq!(db.num_columns(), columns::v2::NUM_COLUMNS as u8); + } + + try_upgrade_db(&path, DatabaseKind::ParityDB).unwrap(); + + let db = Db::open(&paritydb_version_3_config(&path)).unwrap(); + + assert_eq!(db.num_columns(), columns::v3::NUM_COLUMNS as u8); + } + + #[test] + fn test_rocksdb_migrate_2_to_3() { + use kvdb_rocksdb::{Database, DatabaseConfig}; + + let db_dir = tempfile::tempdir().unwrap(); + let db_path = db_dir.path().to_str().unwrap(); + let db_cfg = DatabaseConfig::with_columns(super::columns::v2::NUM_COLUMNS); + { + let db = Database::open(&db_cfg, db_path).unwrap(); + assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS as u32); + } + + // We need to properly set db version for upgrade to work. + fs::write(version_file_path(db_dir.path()), "2").expect("Failed to write DB version"); + + try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB).unwrap(); + + let db_cfg = DatabaseConfig::with_columns(super::columns::v3::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path).unwrap(); + + assert_eq!(db.num_columns(), super::columns::v3::NUM_COLUMNS); + } } diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index 6c16cf396c40..1444bc0a2bf1 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -65,8 +65,6 @@ pub mod reexports { pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext}; } -/// A rolling session window cache. -pub mod rolling_session_window; /// Convenient and efficient runtime info access. pub mod runtime; diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs deleted file mode 100644 index 18364491849a..000000000000 --- a/node/subsystem-util/src/rolling_session_window.rs +++ /dev/null @@ -1,1532 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! A rolling window of sessions and cached session info, updated by the state of newly imported blocks. -//! -//! This is useful for consensus components which need to stay up-to-date about recent sessions but don't -//! care about the state of particular blocks. - -use super::database::{DBTransaction, Database}; -use kvdb::{DBKey, DBOp}; - -use parity_scale_codec::{Decode, Encode}; -pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize}; -use polkadot_primitives::{BlockNumber, Hash, SessionIndex, SessionInfo}; -use std::sync::Arc; - -use futures::channel::oneshot; -use polkadot_node_subsystem::{ - errors::{ChainApiError, RuntimeApiError}, - messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest}, - overseer, -}; - -// The window size is equal to the `approval-voting` and `dispute-coordinator` constants that -// have been obsoleted. -const SESSION_WINDOW_SIZE: SessionWindowSize = new_session_window_size!(6); -const LOG_TARGET: &str = "parachain::rolling-session-window"; -const STORED_ROLLING_SESSION_WINDOW: &[u8] = b"Rolling_session_window"; - -/// Sessions unavailable in state to cache. -#[derive(Debug, Clone, thiserror::Error)] -pub enum SessionsUnavailableReason { - /// Runtime API subsystem was unavailable. - #[error(transparent)] - RuntimeApiUnavailable(#[from] oneshot::Canceled), - /// The runtime API itself returned an error. - #[error(transparent)] - RuntimeApi(#[from] RuntimeApiError), - /// The chain API itself returned an error. - #[error(transparent)] - ChainApi(#[from] ChainApiError), - /// Missing session info from runtime API for given `SessionIndex`. - #[error("Missing session index {0:?}")] - Missing(SessionIndex), - /// Missing last finalized block number. - #[error("Missing last finalized block number")] - MissingLastFinalizedBlock, - /// Missing last finalized block hash. - #[error("Missing last finalized block hash")] - MissingLastFinalizedBlockHash(BlockNumber), -} - -/// Information about the sessions being fetched. -#[derive(Debug, Clone)] -pub struct SessionsUnavailableInfo { - /// The desired window start. - pub window_start: SessionIndex, - /// The desired window end. - pub window_end: SessionIndex, - /// The block hash whose state the sessions were meant to be drawn from. - pub block_hash: Hash, -} - -/// Sessions were unavailable to fetch from the state for some reason. -#[derive(Debug, thiserror::Error, Clone)] -#[error("Sessions unavailable: {kind:?}, info: {info:?}")] -pub struct SessionsUnavailable { - /// The error kind. - #[source] - kind: SessionsUnavailableReason, - /// The info about the session window, if any. - info: Option, -} - -/// An indicated update of the rolling session window. -#[derive(Debug, PartialEq, Clone)] -pub enum SessionWindowUpdate { - /// The session window was just advanced from one range to a new one. - Advanced { - /// The previous start of the window (inclusive). - prev_window_start: SessionIndex, - /// The previous end of the window (inclusive). - prev_window_end: SessionIndex, - /// The new start of the window (inclusive). - new_window_start: SessionIndex, - /// The new end of the window (inclusive). - new_window_end: SessionIndex, - }, - /// The session window was unchanged. - Unchanged, -} - -/// A structure to store rolling session database parameters. -#[derive(Clone)] -pub struct DatabaseParams { - /// Database reference. - pub db: Arc, - /// The column which stores the rolling session info. - pub db_column: u32, -} -/// A rolling window of sessions and cached session info. -pub struct RollingSessionWindow { - earliest_session: SessionIndex, - session_info: Vec, - window_size: SessionWindowSize, - // The option is just to enable some approval-voting tests to force feed sessions - // in the window without dealing with the DB. - db_params: Option, -} - -/// The rolling session data we persist in the database. -#[derive(Encode, Decode, Default)] -struct StoredWindow { - earliest_session: SessionIndex, - session_info: Vec, -} - -impl RollingSessionWindow { - /// Initialize a new session info cache with the given window size. - /// Invariant: The database always contains the earliest session. Then, - /// we can always extend the session info vector using chain state. - pub async fn new( - mut sender: Sender, - block_hash: Hash, - db_params: DatabaseParams, - ) -> Result - where - Sender: overseer::SubsystemSender - + overseer::SubsystemSender, - { - // At first, determine session window start using the chain state. - let session_index = get_session_index_for_child(&mut sender, block_hash).await?; - let earliest_non_finalized_block_session = - Self::earliest_non_finalized_block_session(&mut sender).await?; - - // This will increase the session window to cover the full unfinalized chain. - let on_chain_window_start = std::cmp::min( - session_index.saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - earliest_non_finalized_block_session, - ); - - // Fetch session information from DB. - let maybe_stored_window = Self::db_load(db_params.clone()); - - // Get the DB stored sessions and recompute window start based on DB data. - let (mut window_start, stored_sessions) = - if let Some(mut stored_window) = maybe_stored_window { - // Check if DB is ancient. - if earliest_non_finalized_block_session > - stored_window.earliest_session + stored_window.session_info.len() as u32 - { - // If ancient, we scrap it and fetch from chain state. - stored_window.session_info.clear(); - } - - // The session window might extend beyond the last finalized block, but that's fine as we'll prune it at - // next update. - let window_start = if stored_window.session_info.len() > 0 { - // If there is at least one entry in db, we always take the DB as source of truth. - stored_window.earliest_session - } else { - on_chain_window_start - }; - - (window_start, stored_window.session_info) - } else { - (on_chain_window_start, Vec::new()) - }; - - // Compute the amount of sessions missing from the window that will be fetched from chain state. - let sessions_missing_count = session_index - .saturating_sub(window_start) - .saturating_add(1) - .saturating_sub(stored_sessions.len() as u32); - - // Extend from chain state. - let sessions = if sessions_missing_count > 0 { - match extend_sessions_from_chain_state( - stored_sessions, - &mut sender, - block_hash, - &mut window_start, - session_index, - ) - .await - { - Err(kind) => Err(SessionsUnavailable { - kind, - info: Some(SessionsUnavailableInfo { - window_start, - window_end: session_index, - block_hash, - }), - }), - Ok(sessions) => Ok(sessions), - }? - } else { - // There are no new sessions to be fetched from chain state. - stored_sessions - }; - - Ok(Self { - earliest_session: window_start, - session_info: sessions, - window_size: SESSION_WINDOW_SIZE, - db_params: Some(db_params), - }) - } - - // Load session information from the parachains db. - fn db_load(db_params: DatabaseParams) -> Option { - match db_params.db.get(db_params.db_column, STORED_ROLLING_SESSION_WINDOW).ok()? { - None => None, - Some(raw) => { - let maybe_decoded = StoredWindow::decode(&mut &raw[..]).map(Some); - match maybe_decoded { - Ok(decoded) => decoded, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - ?err, - "Failed decoding db entry; will start with onchain session infos and self-heal DB entry on next update." - ); - None - }, - } - }, - } - } - - // Saves/Updates all sessions in the database. - // TODO: https://github.com/paritytech/polkadot/issues/6144 - fn db_save(&mut self, stored_window: StoredWindow) { - if let Some(db_params) = self.db_params.as_ref() { - match db_params.db.write(DBTransaction { - ops: vec![DBOp::Insert { - col: db_params.db_column, - key: DBKey::from_slice(STORED_ROLLING_SESSION_WINDOW), - value: stored_window.encode(), - }], - }) { - Ok(_) => {}, - Err(err) => { - gum::warn!(target: LOG_TARGET, ?err, "Failed writing db entry"); - }, - } - } - } - - /// Initialize a new session info cache with the given window size and - /// initial data. - /// This is only used in `approval voting` tests. - pub fn with_session_info( - earliest_session: SessionIndex, - session_info: Vec, - ) -> Self { - RollingSessionWindow { - earliest_session, - session_info, - window_size: SESSION_WINDOW_SIZE, - db_params: None, - } - } - - /// Access the session info for the given session index, if stored within the window. - pub fn session_info(&self, index: SessionIndex) -> Option<&SessionInfo> { - if index < self.earliest_session { - None - } else { - self.session_info.get((index - self.earliest_session) as usize) - } - } - - /// Access the index of the earliest session. - pub fn earliest_session(&self) -> SessionIndex { - self.earliest_session - } - - /// Access the index of the latest session. - pub fn latest_session(&self) -> SessionIndex { - self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1) - } - - /// Returns `true` if `session_index` is contained in the window. - pub fn contains(&self, session_index: SessionIndex) -> bool { - session_index >= self.earliest_session() && session_index <= self.latest_session() - } - - async fn earliest_non_finalized_block_session( - sender: &mut Sender, - ) -> Result - where - Sender: overseer::SubsystemSender - + overseer::SubsystemSender, - { - let last_finalized_height = { - let (tx, rx) = oneshot::channel(); - sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; - match rx.await { - Ok(Ok(number)) => number, - Ok(Err(e)) => - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::ChainApi(e), - info: None, - }), - Err(err) => { - gum::warn!( - target: LOG_TARGET, - ?err, - "Failed fetching last finalized block number" - ); - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::MissingLastFinalizedBlock, - info: None, - }) - }, - } - }; - - let (tx, rx) = oneshot::channel(); - // We want to get the session index for the child of the last finalized block. - sender - .send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, tx)) - .await; - let last_finalized_hash_parent = match rx.await { - Ok(Ok(maybe_hash)) => maybe_hash, - Ok(Err(e)) => - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::ChainApi(e), - info: None, - }), - Err(err) => { - gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash"); - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash( - last_finalized_height, - ), - info: None, - }) - }, - }; - - // Get the session in which the last finalized block was authored. - if let Some(last_finalized_hash_parent) = last_finalized_hash_parent { - let session = - match get_session_index_for_child(sender, last_finalized_hash_parent).await { - Ok(session_index) => session_index, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - ?err, - ?last_finalized_hash_parent, - "Failed fetching session index" - ); - return Err(err) - }, - }; - - Ok(session) - } else { - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash( - last_finalized_height, - ), - info: None, - }) - } - } - - /// When inspecting a new import notification, updates the session info cache to match - /// the session of the imported block's child. - /// - /// this only needs to be called on heads where we are directly notified about import, as sessions do - /// not change often and import notifications are expected to be typically increasing in session number. - /// - /// some backwards drift in session index is acceptable. - pub async fn cache_session_info_for_head( - &mut self, - sender: &mut Sender, - block_hash: Hash, - ) -> Result - where - Sender: overseer::SubsystemSender - + overseer::SubsystemSender, - { - let session_index = get_session_index_for_child(sender, block_hash).await?; - let latest = self.latest_session(); - - // Either cached or ancient. - if session_index <= latest { - return Ok(SessionWindowUpdate::Unchanged) - } - - let earliest_non_finalized_block_session = - Self::earliest_non_finalized_block_session(sender).await?; - - let old_window_start = self.earliest_session; - let old_window_end = latest; - - // Ensure we keep sessions up to last finalized block by adjusting the window start. - // This will increase the session window to cover the full unfinalized chain. - let window_start = std::cmp::min( - session_index.saturating_sub(self.window_size.get() - 1), - earliest_non_finalized_block_session, - ); - - // Never look back past earliest session, since if sessions beyond were not needed or available - // in the past remains valid for the future (window only advances forward). - let mut window_start = std::cmp::max(window_start, self.earliest_session); - - let mut sessions = self.session_info.clone(); - let sessions_out_of_window = window_start.saturating_sub(old_window_start) as usize; - - let sessions = if sessions_out_of_window < sessions.len() { - // Drop sessions based on how much the window advanced. - sessions.split_off((window_start as usize).saturating_sub(old_window_start as usize)) - } else { - // Window has jumped such that we need to fetch all sessions from on chain. - Vec::new() - }; - - match extend_sessions_from_chain_state( - sessions, - sender, - block_hash, - &mut window_start, - session_index, - ) - .await - { - Err(kind) => Err(SessionsUnavailable { - kind, - info: Some(SessionsUnavailableInfo { - window_start, - window_end: session_index, - block_hash, - }), - }), - Ok(s) => { - let update = SessionWindowUpdate::Advanced { - prev_window_start: old_window_start, - prev_window_end: old_window_end, - new_window_start: window_start, - new_window_end: session_index, - }; - - self.session_info = s; - - // we need to account for this case: - // window_start ................................... session_index - // old_window_start ........... latest - let new_earliest = std::cmp::max(window_start, old_window_start); - self.earliest_session = new_earliest; - - // Update current window in DB. - self.db_save(StoredWindow { - earliest_session: self.earliest_session, - session_info: self.session_info.clone(), - }); - Ok(update) - }, - } - } -} - -// Returns the session index expected at any child of the `parent` block. -// -// Note: We could use `RuntimeInfo::get_session_index_for_child` here but it's -// cleaner to just call the runtime API directly without needing to create an instance -// of `RuntimeInfo`. -async fn get_session_index_for_child( - sender: &mut impl overseer::SubsystemSender, - block_hash: Hash, -) -> Result { - let (s_tx, s_rx) = oneshot::channel(); - - // We're requesting session index of a child to populate the cache in advance. - sender - .send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) - .await; - - match s_rx.await { - Ok(Ok(s)) => Ok(s), - Ok(Err(e)) => - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::RuntimeApi(e), - info: None, - }), - Err(e) => - return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::RuntimeApiUnavailable(e), - info: None, - }), - } -} - -/// Attempts to extend db stored sessions with sessions missing between `start` and up to `end_inclusive`. -/// Runtime session info fetching errors are ignored if that doesn't create a gap in the window. -async fn extend_sessions_from_chain_state( - stored_sessions: Vec, - sender: &mut impl overseer::SubsystemSender, - block_hash: Hash, - window_start: &mut SessionIndex, - end_inclusive: SessionIndex, -) -> Result, SessionsUnavailableReason> { - // Start from the db sessions. - let mut sessions = stored_sessions; - // We allow session fetch failures only if we won't create a gap in the window by doing so. - // If `allow_failure` is set to true here, fetching errors are ignored until we get a first session. - let mut allow_failure = sessions.is_empty(); - - let start = *window_start + sessions.len() as u32; - - for i in start..=end_inclusive { - let (tx, rx) = oneshot::channel(); - sender - .send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::SessionInfo(i, tx), - )) - .await; - - match rx.await { - Ok(Ok(Some(session_info))) => { - // We do not allow failure anymore after having at least 1 session in window. - allow_failure = false; - sessions.push(session_info); - }, - Ok(Ok(None)) if !allow_failure => return Err(SessionsUnavailableReason::Missing(i)), - Ok(Ok(None)) => { - // Handle `allow_failure` true. - // If we didn't get the session, we advance window start. - *window_start += 1; - gum::debug!( - target: LOG_TARGET, - session = ?i, - "Session info missing from runtime." - ); - }, - Ok(Err(e)) if !allow_failure => return Err(SessionsUnavailableReason::RuntimeApi(e)), - Err(canceled) if !allow_failure => - return Err(SessionsUnavailableReason::RuntimeApiUnavailable(canceled)), - Ok(Err(err)) => { - // Handle `allow_failure` true. - // If we didn't get the session, we advance window start. - *window_start += 1; - gum::debug!( - target: LOG_TARGET, - session = ?i, - ?err, - "Error while fetching session information." - ); - }, - Err(err) => { - // Handle `allow_failure` true. - // If we didn't get the session, we advance window start. - *window_start += 1; - gum::debug!( - target: LOG_TARGET, - session = ?i, - ?err, - "Channel error while fetching session information." - ); - }, - }; - } - - Ok(sessions) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::database::kvdb_impl::DbAdapter; - use assert_matches::assert_matches; - use polkadot_node_subsystem::{ - messages::{AllMessages, AvailabilityRecoveryMessage}, - SubsystemContext, - }; - use polkadot_node_subsystem_test_helpers::make_subsystem_context; - use polkadot_primitives::Header; - use sp_core::testing::TaskExecutor; - - const SESSION_DATA_COL: u32 = 0; - - const NUM_COLUMNS: u32 = 1; - - fn dummy_db_params() -> DatabaseParams { - let db = kvdb_memorydb::create(NUM_COLUMNS); - let db = DbAdapter::new(db, &[]); - let db: Arc = Arc::new(db); - DatabaseParams { db, db_column: SESSION_DATA_COL } - } - - fn dummy_session_info(index: SessionIndex) -> SessionInfo { - SessionInfo { - validators: Default::default(), - discovery_keys: Vec::new(), - assignment_keys: Vec::new(), - validator_groups: Default::default(), - n_cores: index as _, - zeroth_delay_tranche_width: index as _, - relay_vrf_modulo_samples: index as _, - n_delay_tranches: index as _, - no_show_slots: index as _, - needed_approvals: index as _, - active_validator_indices: Vec::new(), - dispute_period: 6, - random_seed: [0u8; 32], - } - } - - fn cache_session_info_test( - expected_start_session: SessionIndex, - session: SessionIndex, - window: Option, - expect_requests_from: SessionIndex, - db_params: Option, - ) -> RollingSessionWindow { - let db_params = db_params.unwrap_or(dummy_db_params()); - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 5, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = - make_subsystem_context::(pool.clone()); - - let hash = header.hash(); - - let sender = ctx.sender(); - - let test_fut = { - Box::pin(async move { - let window = match window { - None => - RollingSessionWindow::new(sender.clone(), hash, db_params).await.unwrap(), - Some(mut window) => { - window.cache_session_info_for_head(sender, hash).await.unwrap(); - window - }, - }; - assert_eq!(window.earliest_session, expected_start_session); - assert_eq!( - window.session_info, - (expected_start_session..=session).map(dummy_session_info).collect::>(), - ); - - window - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header.number); - let _ = s_tx.send(Ok(Some(finalized_header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header.hash()); - let _ = s_tx.send(Ok(session)); - } - ); - - for i in expect_requests_from..=session { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - let _ = s_tx.send(Ok(Some(dummy_session_info(i)))); - } - ); - } - }); - - let (window, _) = futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - window - } - - #[test] - fn cache_session_info_start_empty_db() { - let db_params = dummy_db_params(); - - let window = cache_session_info_test( - (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 10, - None, - (10 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - Some(db_params.clone()), - ); - - let window = cache_session_info_test( - (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 11, - Some(window), - 11, - None, - ); - assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize); - - cache_session_info_test( - (11 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 12, - None, - 12, - Some(db_params), - ); - } - - #[test] - fn cache_session_info_first_early() { - cache_session_info_test(0, 1, None, 0, None); - } - - #[test] - fn cache_session_info_does_not_underflow() { - let window = RollingSessionWindow { - earliest_session: 1, - session_info: vec![dummy_session_info(1)], - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - cache_session_info_test(1, 2, Some(window), 2, None); - } - - #[test] - fn cache_session_window_contains() { - let window = RollingSessionWindow { - earliest_session: 10, - session_info: vec![dummy_session_info(1)], - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - assert!(!window.contains(0)); - assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get())); - assert!(!window.contains(11)); - assert!(!window.contains(10 + SESSION_WINDOW_SIZE.get() - 1)); - } - - #[test] - fn cache_session_info_first_late() { - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 100, - None, - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - None, - ); - } - - #[test] - fn cache_session_info_jump() { - let window = RollingSessionWindow { - earliest_session: 50, - session_info: vec![ - dummy_session_info(50), - dummy_session_info(51), - dummy_session_info(52), - ], - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 100, - Some(window), - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - None, - ); - } - - #[test] - fn cache_session_info_roll_full() { - let start = 99 - (SESSION_WINDOW_SIZE.get() - 1); - let window = RollingSessionWindow { - earliest_session: start, - session_info: (start..=99).map(dummy_session_info).collect(), - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 100, - Some(window), - 100, // should only make one request. - None, - ); - } - - #[test] - fn cache_session_info_roll_many_full_db() { - let db_params = dummy_db_params(); - let start = 97 - (SESSION_WINDOW_SIZE.get() - 1); - let window = RollingSessionWindow { - earliest_session: start, - session_info: (start..=97).map(dummy_session_info).collect(), - window_size: SESSION_WINDOW_SIZE, - db_params: Some(db_params.clone()), - }; - - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 100, - Some(window), - 98, - None, - ); - - // We expect the session to be populated from DB, and only fetch 101 from on chain. - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 101, - None, - 101, - Some(db_params.clone()), - ); - - // Session warps in the future. - let window = cache_session_info_test(195, 200, None, 195, Some(db_params)); - - assert_eq!(window.session_info.len(), SESSION_WINDOW_SIZE.get() as usize); - } - - #[test] - fn cache_session_info_roll_many_full() { - let start = 97 - (SESSION_WINDOW_SIZE.get() - 1); - let window = RollingSessionWindow { - earliest_session: start, - session_info: (start..=97).map(dummy_session_info).collect(), - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - cache_session_info_test( - (100 as SessionIndex).saturating_sub(SESSION_WINDOW_SIZE.get() - 1), - 100, - Some(window), - 98, - None, - ); - } - - #[test] - fn cache_session_info_roll_early() { - let start = 0; - let window = RollingSessionWindow { - earliest_session: start, - session_info: (0..=1).map(dummy_session_info).collect(), - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - cache_session_info_test( - 0, - 2, - Some(window), - 2, // should only make one request. - None, - ); - } - - #[test] - fn cache_session_info_roll_many_early() { - let start = 0; - let window = RollingSessionWindow { - earliest_session: start, - session_info: (0..=1).map(dummy_session_info).collect(), - window_size: SESSION_WINDOW_SIZE, - db_params: Some(dummy_db_params()), - }; - - let actual_window_size = window.session_info.len() as u32; - - cache_session_info_test(0, 3, Some(window), actual_window_size, None); - } - - #[test] - fn db_load_works() { - // Session index of the tip of our fake test chain. - let session: SessionIndex = 100; - let genesis_session: SessionIndex = 0; - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 5, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header_clone = finalized_header.clone(); - - let hash: sp_core::H256 = header.hash(); - let db_params = dummy_db_params(); - let db_params_clone = db_params.clone(); - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let test_fut = { - let sender = ctx.sender().clone(); - Box::pin(async move { - let mut rsw = - RollingSessionWindow::new(sender.clone(), hash, db_params_clone).await.unwrap(); - - let session_info = rsw.session_info.clone(); - let earliest_session = rsw.earliest_session(); - - assert_eq!(earliest_session, 0); - assert_eq!(session_info.len(), 101); - - rsw.db_save(StoredWindow { earliest_session, session_info }); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header.number); - let _ = s_tx.send(Ok(Some(finalized_header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header.hash()); - let _ = s_tx.send(Ok(0)); - } - ); - - // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. - for i in genesis_session..=session { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - let _ = s_tx.send(Ok(Some(dummy_session_info(i)))); - } - ); - } - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let test_fut = { - Box::pin(async move { - let sender = ctx.sender().clone(); - let res = RollingSessionWindow::new(sender, hash, db_params).await; - let rsw = res.unwrap(); - assert_eq!(rsw.earliest_session, 0); - assert_eq!(rsw.session_info.len(), 101); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header_clone.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header_clone.number); - let _ = s_tx.send(Ok(Some(finalized_header_clone.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header_clone.hash()); - let _ = s_tx.send(Ok(0)); - } - ); - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn cache_session_fails_for_gap_in_window() { - // Session index of the tip of our fake test chain. - let session: SessionIndex = 100; - let genesis_session: SessionIndex = 0; - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 5, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let hash = header.hash(); - - let test_fut = { - let sender = ctx.sender().clone(); - Box::pin(async move { - let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; - - assert!(res.is_err()); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header.number); - let _ = s_tx.send(Ok(Some(finalized_header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header.hash()); - let _ = s_tx.send(Ok(0)); - } - ); - - // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. - // First 50 sessions are missing. - for i in genesis_session..=50 { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - let _ = s_tx.send(Ok(None)); - } - ); - } - // next 10 sessions are present - for i in 51..=60 { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - let _ = s_tx.send(Ok(Some(dummy_session_info(i)))); - } - ); - } - // gap of 1 session - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(61, j); - let _ = s_tx.send(Ok(None)); - } - ); - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn any_session_stretch_with_failure_allowed_for_unfinalized_chain() { - // Session index of the tip of our fake test chain. - let session: SessionIndex = 100; - let genesis_session: SessionIndex = 0; - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 5, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let hash = header.hash(); - - let test_fut = { - let sender = ctx.sender().clone(); - Box::pin(async move { - let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; - assert!(res.is_ok()); - let rsw = res.unwrap(); - // Since first 50 sessions are missing the earliest should be 50. - assert_eq!(rsw.earliest_session, 50); - assert_eq!(rsw.session_info.len(), 51); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header.number); - let _ = s_tx.send(Ok(Some(finalized_header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header.hash()); - let _ = s_tx.send(Ok(0)); - } - ); - - // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. - // We also test if failure is allowed for 50 first missing sessions. - for i in genesis_session..=session { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - - let _ = s_tx.send(Ok(if i < 50 { - None - } else { - Some(dummy_session_info(i)) - })); - } - ); - } - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn any_session_unavailable_for_caching_means_no_change() { - let session: SessionIndex = 6; - let start_session = session.saturating_sub(SESSION_WINDOW_SIZE.get() - 1); - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 5, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let finalized_header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let hash = header.hash(); - - let test_fut = { - let sender = ctx.sender().clone(); - Box::pin(async move { - let res = RollingSessionWindow::new(sender, hash, dummy_db_params()).await; - assert!(res.is_err()); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(finalized_header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, finalized_header.number); - let _ = s_tx.send(Ok(Some(finalized_header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, finalized_header.hash()); - let _ = s_tx.send(Ok(session)); - } - ); - - for i in start_session..=session { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(j, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(i, j); - - let _ = s_tx.send(Ok(if i == session { - None - } else { - Some(dummy_session_info(i)) - })); - } - ); - } - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } - - #[test] - fn request_session_info_for_genesis() { - let session: SessionIndex = 0; - - let header = Header { - digest: Default::default(), - extrinsics_root: Default::default(), - number: 0, - state_root: Default::default(), - parent_hash: Default::default(), - }; - - let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); - - let hash = header.hash(); - - let test_fut = { - Box::pin(async move { - let sender = ctx.sender().clone(); - let window = - RollingSessionWindow::new(sender, hash, dummy_db_params()).await.unwrap(); - - assert_eq!(window.earliest_session, session); - assert_eq!(window.session_info, vec![dummy_session_info(session)]); - }) - }; - - let aux_fut = Box::pin(async move { - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, hash); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(header.number)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - block_number, - s_tx, - )) => { - assert_eq!(block_number, header.number); - let _ = s_tx.send(Ok(Some(header.hash()))); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, header.hash()); - let _ = s_tx.send(Ok(session)); - } - ); - - assert_matches!( - handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(s, s_tx), - )) => { - assert_eq!(h, hash); - assert_eq!(s, session); - - let _ = s_tx.send(Ok(Some(dummy_session_info(s)))); - } - ); - }); - - futures::executor::block_on(futures::future::join(test_fut, aux_fut)); - } -}