From 0be6b47140ca4169b11b37d4d58b1ad7be1cb9e0 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 22 Aug 2023 16:44:40 +1000 Subject: [PATCH 01/11] Support per slot state diffs --- beacon_node/store/src/errors.rs | 2 +- beacon_node/store/src/hdiff.rs | 58 ++++++++++++---------- beacon_node/store/src/hot_cold_store.rs | 65 ++++++++++--------------- 3 files changed, 60 insertions(+), 65 deletions(-) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 9d7834e4fc2..2a170b6f0f2 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -41,7 +41,7 @@ pub enum Error { }, MissingStateRoot(Slot), MissingState(Hash256), - MissingSnapshot(Epoch), + MissingSnapshot(Slot), MissingDiff(Epoch), NoBaseStateFound(Hash256), BlockReplayError(BlockReplayError), diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index a1421ee3bb9..7d3ff471154 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::io::{Read, Write}; -use types::{BeaconState, ChainSpec, Epoch, EthSpec, VList}; +use types::{BeaconState, ChainSpec, EthSpec, Slot, VList}; use zstd::{Decoder, Encoder}; #[derive(Debug)] @@ -30,7 +30,7 @@ pub struct HierarchyModuli { #[derive(Debug, PartialEq, Eq)] pub enum StorageStrategy { Nothing, - DiffFrom(Epoch), + DiffFrom(Slot), Snapshot, } @@ -198,7 +198,7 @@ impl XorDiff { impl Default for HierarchyConfig { fn default() -> Self { HierarchyConfig { - exponents: vec![0, 4, 6, 8, 11, 13, 16], + exponents: vec![5, 9, 11, 13, 16, 18, 21], } } } @@ -226,30 +226,36 @@ impl HierarchyConfig { } impl HierarchyModuli { - pub fn storage_strategy(&self, epoch: Epoch) -> Result { + pub fn storage_strategy(&self, slot: Slot) -> Result { let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?; - if epoch % last == 0 { + if slot % last == 0 { return 
Ok(StorageStrategy::Snapshot); } - let diff_from = self.moduli.iter().rev().find_map(|&n| { - (epoch % n == 0).then(|| { - // Diff from the previous state. - (epoch - 1) / n * n - }) - }); + let diff_from = self + .moduli + .iter() + .rev() + .tuple_windows() + .find_map(|(&n_big, &n_small)| { + (slot % n_small == 0).then(|| { + eprintln!("state at slot {slot} is divis by {n_small}"); + // Diff from the previous layer. + dbg!(slot / n_big * n_big) + }) + }); Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom)) } - /// Return the smallest epoch greater than or equal to `epoch` at which a full snapshot should + /// Return the smallest slot greater than or equal to `slot` at which a full snapshot should /// be stored. - pub fn next_snapshot_epoch(&self, epoch: Epoch) -> Result { + pub fn next_snapshot_slot(&self, slot: Slot) -> Result { let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?; - if epoch % last == 0 { - Ok(epoch) + if slot % last == 0 { + Ok(slot) } else { - Ok((epoch / last + 1) * last) + Ok((slot / last + 1) * last) } } } @@ -266,9 +272,9 @@ mod tests { let moduli = config.to_moduli().unwrap(); // Full snapshots at multiples of 2^16. 
- let snapshot_freq = Epoch::new(1 << 16); + let snapshot_freq = Slot::new(1 << 16); assert_eq!( - moduli.storage_strategy(Epoch::new(0)).unwrap(), + moduli.storage_strategy(Slot::new(0)).unwrap(), StorageStrategy::Snapshot ); assert_eq!( @@ -281,7 +287,7 @@ mod tests { ); // For the first layer of diffs - let first_layer = Epoch::new(1 << 13); + let first_layer = Slot::new(1 << 13); assert_eq!( moduli.storage_strategy(first_layer * 2).unwrap(), StorageStrategy::DiffFrom(first_layer) @@ -289,31 +295,31 @@ mod tests { } #[test] - fn next_snapshot_epoch() { + fn next_snapshot_slot() { let config = HierarchyConfig::default(); config.validate().unwrap(); let moduli = config.to_moduli().unwrap(); - let snapshot_freq = Epoch::new(1 << 16); + let snapshot_freq = Slot::new(1 << 16); assert_eq!( - moduli.next_snapshot_epoch(snapshot_freq).unwrap(), + moduli.next_snapshot_slot(snapshot_freq).unwrap(), snapshot_freq ); assert_eq!( - moduli.next_snapshot_epoch(snapshot_freq + 1).unwrap(), + moduli.next_snapshot_slot(snapshot_freq + 1).unwrap(), snapshot_freq * 2 ); assert_eq!( - moduli.next_snapshot_epoch(snapshot_freq * 2 - 1).unwrap(), + moduli.next_snapshot_slot(snapshot_freq * 2 - 1).unwrap(), snapshot_freq * 2 ); assert_eq!( - moduli.next_snapshot_epoch(snapshot_freq * 2).unwrap(), + moduli.next_snapshot_slot(snapshot_freq * 2).unwrap(), snapshot_freq * 2 ); assert_eq!( - moduli.next_snapshot_epoch(snapshot_freq * 100).unwrap(), + moduli.next_snapshot_slot(snapshot_freq * 100).unwrap(), snapshot_freq * 100 ); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 7321722a746..bc68cd2d8c2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -79,7 +79,7 @@ pub struct HotColdDB, Cold: ItemStore> { #[allow(dead_code)] historic_state_cache: Mutex>>, /// Cache of hierarchical diff buffers. - diff_buffer_cache: Mutex>, + diff_buffer_cache: Mutex>, // Cache of hierarchical diffs. 
// FIXME(sproul): see if this is necessary /// Chain spec. @@ -113,7 +113,7 @@ pub enum HotColdDBError { MissingPrevState(Hash256), MissingSplitState(Hash256, Slot), MissingStateDiff(Hash256), - MissingHDiff(Epoch), + MissingHDiff(Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, @@ -1407,8 +1407,8 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); } - let epoch = state.current_epoch(); - match self.hierarchy.storage_strategy(epoch)? { + let slot = state.slot(); + match self.hierarchy.storage_strategy(slot)? { StorageStrategy::Nothing => { debug!( self.log, @@ -1431,6 +1431,7 @@ impl, Cold: ItemStore> HotColdDB self.log, "Storing cold state"; "strategy" => "diff", + "from_slot" => from, "slot" => state.slot(), ); self.store_cold_state_as_diff(state, from, ops)?; @@ -1462,13 +1463,10 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - pub fn load_cold_state_bytes_as_snapshot( - &self, - epoch: Epoch, - ) -> Result>, Error> { + pub fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result>, Error> { match self.cold_db.get_bytes( DBColumn::BeaconStateSnapshot.into(), - &epoch.as_u64().to_be_bytes(), + &slot.as_u64().to_be_bytes(), )? { Some(bytes) => { let mut ssz_bytes = @@ -1483,12 +1481,9 @@ impl, Cold: ItemStore> HotColdDB } } - pub fn load_cold_state_as_snapshot( - &self, - epoch: Epoch, - ) -> Result>, Error> { + pub fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result>, Error> { Ok(self - .load_cold_state_bytes_as_snapshot(epoch)? + .load_cold_state_bytes_as_snapshot(slot)? .map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec)) .transpose()?) } @@ -1496,18 +1491,18 @@ impl, Cold: ItemStore> HotColdDB pub fn store_cold_state_as_diff( &self, state: &BeaconState, - from_epoch: Epoch, + from_slot: Slot, ops: &mut Vec, ) -> Result<(), Error> { // Load diff base state bytes. 
- let base_buffer = self.load_hdiff_buffer_for_epoch(from_epoch)?; + let base_buffer = self.load_hdiff_buffer_for_slot(from_slot)?; let target_buffer = HDiffBuffer::from_state(state.clone()); let diff = HDiff::compute(&base_buffer, &target_buffer)?; let diff_bytes = diff.as_ssz_bytes(); let key = get_key_for_col( DBColumn::BeaconStateDiff.into(), - &state.current_epoch().as_u64().to_be_bytes(), + &state.slot().as_u64().to_be_bytes(), ); ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes)); Ok(()) @@ -1527,9 +1522,7 @@ impl, Cold: ItemStore> HotColdDB /// /// Will reconstruct the state if it lies between restore points. pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result>, Error> { - let epoch = slot.epoch(E::slots_per_epoch()); - - let hdiff_buffer = self.load_hdiff_buffer_for_epoch(epoch)?; + let hdiff_buffer = self.load_hdiff_buffer_for_slot(slot)?; let base_state = hdiff_buffer.into_state(&self.spec)?; if base_state.slot() == slot { @@ -1548,23 +1541,23 @@ impl, Cold: ItemStore> HotColdDB .map(Some) } - fn load_hdiff_for_epoch(&self, epoch: Epoch) -> Result { + fn load_hdiff_for_slot(&self, slot: Slot) -> Result { self.cold_db .get_bytes( DBColumn::BeaconStateDiff.into(), - &epoch.as_u64().to_be_bytes(), + &slot.as_u64().to_be_bytes(), )? .map(|bytes| HDiff::from_ssz_bytes(&bytes)) - .ok_or(HotColdDBError::MissingHDiff(epoch))? + .ok_or(HotColdDBError::MissingHDiff(slot))? .map_err(Into::into) } - fn load_hdiff_buffer_for_epoch(&self, epoch: Epoch) -> Result { - if let Some(buffer) = self.diff_buffer_cache.lock().get(&epoch) { + fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result { + if let Some(buffer) = self.diff_buffer_cache.lock().get(&slot) { debug!( self.log, "Hit diff buffer cache"; - "epoch" => epoch + "slot" => slot ); return Ok(buffer.clone()); } @@ -1572,29 +1565,29 @@ impl, Cold: ItemStore> HotColdDB // Load buffer for the previous state. // This amount of recursion (<10 levels) should be OK. 
let t = std::time::Instant::now(); - let mut buffer = match self.hierarchy.storage_strategy(epoch)? { + let mut buffer = match self.hierarchy.storage_strategy(slot)? { // Base case. StorageStrategy::Snapshot => { let state = self - .load_cold_state_as_snapshot(epoch)? - .ok_or(Error::MissingSnapshot(epoch))?; + .load_cold_state_as_snapshot(slot)? + .ok_or(Error::MissingSnapshot(slot))?; return Ok(HDiffBuffer::from_state(state)); } // Recursive case. - StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_epoch(from)?, + StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, StorageStrategy::Nothing => unreachable!("FIXME(sproul)"), }; // Load diff and apply it to buffer. - let diff = self.load_hdiff_for_epoch(epoch)?; + let diff = self.load_hdiff_for_slot(slot)?; diff.apply(&mut buffer)?; - self.diff_buffer_cache.lock().put(epoch, buffer.clone()); + self.diff_buffer_cache.lock().put(slot, buffer.clone()); debug!( self.log, "Added diff buffer to cache"; "load_time_ms" => t.elapsed().as_millis(), - "epoch" => epoch + "slot" => slot ); Ok(buffer) @@ -1741,14 +1734,10 @@ impl, Cold: ItemStore> HotColdDB /// Initialise the anchor info for checkpoint sync starting from `block`. pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result { let anchor_slot = block.slot(); - let anchor_epoch = anchor_slot.epoch(E::slots_per_epoch()); // Set the `state_upper_limit` to the slot of the *next* checkpoint. // See `get_state_upper_limit` for rationale. - let next_snapshot_slot = self - .hierarchy - .next_snapshot_epoch(anchor_epoch)? - .start_slot(E::slots_per_epoch()); + let next_snapshot_slot = self.hierarchy.next_snapshot_slot(anchor_slot)?; let anchor_info = AnchorInfo { anchor_slot, oldest_block_slot: anchor_slot, From 251c9eda759025ea73951839494cc1db5330c55e Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 23 Aug 2023 16:56:43 +1000 Subject: [PATCH 02/11] Store HierarchyConfig on disk. Support storing hdiffs at per slot level. 
--- beacon_node/store/src/config.rs | 4 ++-- beacon_node/store/src/hdiff.rs | 32 ++++++++++++------------- beacon_node/store/src/hot_cold_store.rs | 4 ---- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index f701a03aae0..3afea5ef52c 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -48,7 +48,7 @@ pub struct StoreConfig { // FIXME(sproul): schema migration, add hdiff pub struct OnDiskStoreConfig { pub linear_blocks: bool, - pub linear_restore_points: bool, + pub hierarchy_config: HierarchyConfig, } #[derive(Debug, Clone)] @@ -80,7 +80,7 @@ impl StoreConfig { pub fn as_disk_config(&self) -> OnDiskStoreConfig { OnDiskStoreConfig { linear_blocks: self.linear_blocks, - linear_restore_points: self.linear_restore_points, + hierarchy_config: self.hierarchy_config.clone(), } } diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 7d3ff471154..7c321e8f22e 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -11,13 +11,13 @@ use zstd::{Decoder, Encoder}; #[derive(Debug)] pub enum Error { InvalidHierarchy, - XorDeletionsNotSupported, + U64DiffDeletionsNotSupported, UnableToComputeDiff, UnableToApplyDiff, Compression(std::io::Error), } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] pub struct HierarchyConfig { exponents: Vec, } @@ -45,7 +45,7 @@ pub struct HDiffBuffer { #[derive(Debug, Encode, Decode)] pub struct HDiff { state_diff: BytesDiff, - balances_diff: XorDiff, + balances_diff: CompressedU64Diff, } #[derive(Debug, Encode, Decode)] @@ -54,7 +54,7 @@ pub struct BytesDiff { } #[derive(Debug, Encode, Decode)] -pub struct XorDiff { +pub struct CompressedU64Diff { bytes: Vec, } @@ -78,7 +78,7 @@ impl HDiffBuffer { impl HDiff { pub fn compute(source: &HDiffBuffer, target: &HDiffBuffer) -> Result { let 
state_diff = BytesDiff::compute(&source.state, &target.state)?; - let balances_diff = XorDiff::compute(&source.balances, &target.balances)?; + let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances)?; Ok(Self { state_diff, @@ -138,10 +138,10 @@ impl BytesDiff { } } -impl XorDiff { +impl CompressedU64Diff { pub fn compute(xs: &[u64], ys: &[u64]) -> Result { if xs.len() > ys.len() { - return Err(Error::XorDeletionsNotSupported); + return Err(Error::U64DiffDeletionsNotSupported); } let uncompressed_bytes: Vec = ys @@ -164,7 +164,7 @@ impl XorDiff { .map_err(Error::Compression)?; encoder.finish().map_err(Error::Compression)?; - Ok(XorDiff { + Ok(CompressedU64Diff { bytes: compressed_bytes, }) } @@ -198,7 +198,7 @@ impl XorDiff { impl Default for HierarchyConfig { fn default() -> Self { HierarchyConfig { - exponents: vec![5, 9, 11, 13, 16, 18, 21], + exponents: vec![4, 9, 11, 13, 16, 18, 21], } } } @@ -325,7 +325,7 @@ mod tests { } #[test] - fn xor_vs_bytes_diff() { + fn compressed_u64_vs_bytes_diff() { let x_values = vec![99u64, 55, 123, 6834857, 0, 12]; let y_values = vec![98u64, 55, 312, 1, 1, 2, 4, 5]; @@ -335,12 +335,12 @@ mod tests { let x_bytes = to_bytes(&x_values); let y_bytes = to_bytes(&y_values); - let xor_diff = XorDiff::compute(&x_values, &y_values).unwrap(); + let u64_diff = CompressedU64Diff::compute(&x_values, &y_values).unwrap(); - let mut y_from_xor = x_values; - xor_diff.apply(&mut y_from_xor).unwrap(); + let mut y_from_u64_diff = x_values; + u64_diff.apply(&mut y_from_u64_diff).unwrap(); - assert_eq!(y_values, y_from_xor); + assert_eq!(y_values, y_from_u64_diff); let bytes_diff = BytesDiff::compute(&x_bytes, &y_bytes).unwrap(); @@ -349,7 +349,7 @@ mod tests { assert_eq!(y_bytes, y_from_bytes); - // XOR diff wins by more than a factor of 3 - assert!(xor_diff.bytes.len() < 3 * bytes_diff.bytes.len()); + // U64 diff wins by more than a factor of 3 + assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len()); } } diff --git 
a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index bc68cd2d8c2..6c675a91072 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1403,10 +1403,6 @@ impl, Cold: ItemStore> HotColdDB ) -> Result<(), Error> { self.store_cold_state_summary(state_root, state.slot(), ops)?; - if state.slot() % E::slots_per_epoch() != 0 { - return Ok(()); - } - let slot = state.slot(); match self.hierarchy.storage_strategy(slot)? { StorageStrategy::Nothing => { From ed7f6b0f5b6cdc2c0e25287b73ddfcd202d20c9c Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 23 Aug 2023 17:01:04 +1000 Subject: [PATCH 03/11] Revert HierarchyConfig change for testing. --- beacon_node/store/src/hdiff.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 7c321e8f22e..1ae70a3ddf8 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -198,7 +198,7 @@ impl CompressedU64Diff { impl Default for HierarchyConfig { fn default() -> Self { HierarchyConfig { - exponents: vec![4, 9, 11, 13, 16, 18, 21], + exponents: vec![5, 9, 11, 13, 16, 18, 21], } } } From 4dfc47afd76d33a689a5894200e02b5f4f4dbee6 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 28 Aug 2023 23:05:15 +1000 Subject: [PATCH 04/11] Add validity check for the hierarchy config when opening the DB. 
--- beacon_node/beacon_chain/tests/store_tests.rs | 46 ++++++++++-- beacon_node/store/src/config.rs | 71 +++++++++++++++++-- beacon_node/store/src/hdiff.rs | 2 +- 3 files changed, 109 insertions(+), 10 deletions(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b5e444e809d..b17b908292f 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -22,8 +22,9 @@ use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; use store::{ + config::StoreConfigError, iter::{BlockRootsIterator, StateRootsIterator}, - HotColdDB, LevelDB, StoreConfig, + Error as StoreError, HotColdDB, LevelDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; use types::test_utils::{SeedableRng, XorShiftRng}; @@ -49,13 +50,19 @@ fn get_store_with_spec( db_path: &TempDir, spec: ChainSpec, ) -> Arc, LevelDB>> { + let config = StoreConfig::default(); + try_get_store_with_spec_and_config(db_path, spec, config).expect("disk store should initialize") +} + +fn try_get_store_with_spec_and_config( + db_path: &TempDir, + spec: ChainSpec, + config: StoreConfig, +) -> Result, LevelDB>>, StoreError> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); - let config = StoreConfig::default(); let log = test_logger(); - HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log) - .expect("disk store should initialize") } fn get_harness( @@ -2481,6 +2488,37 @@ async fn revert_minority_fork_on_resume() { assert_eq!(heads.len(), 1); } +#[tokio::test] +async fn should_not_initialize_incompatible_store_config() { + let validator_count = 16; + let spec = MinimalEthSpec::default_spec(); + let db_path = tempdir().unwrap(); + let store_config = StoreConfig::default(); + let store = try_get_store_with_spec_and_config(&db_path, spec.clone(), store_config.clone()) + .expect("disk store should initialize"); + let harness = 
BeaconChainHarness::builder(MinimalEthSpec) + .spec(spec.clone()) + .deterministic_keypairs(validator_count) + .fresh_disk_store(store) + .build(); + + // Resume from disk with a different store config. + drop(harness); + let different_store_config = StoreConfig { + linear_blocks: !store_config.linear_blocks, + ..store_config + }; + let maybe_err = + try_get_store_with_spec_and_config(&db_path, spec.clone(), different_store_config).err(); + + assert!(matches!( + maybe_err, + Some(StoreError::ConfigError( + StoreConfigError::IncompatibleStoreConfig { .. } + )) + )); +} + // This test checks whether the schema downgrade from the latest version to some minimum supported // version is correct. This is the easiest schema test to write without historic versions of // Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 3afea5ef52c..18878e95e6c 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -45,7 +45,7 @@ pub struct StoreConfig { /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. 
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] -// FIXME(sproul): schema migration, add hdiff +// FIXME(sproul): schema migration pub struct OnDiskStoreConfig { pub linear_blocks: bool, pub hierarchy_config: HierarchyConfig, @@ -53,8 +53,17 @@ pub struct OnDiskStoreConfig { #[derive(Debug, Clone)] pub enum StoreConfigError { - MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 }, - InvalidCompressionLevel { level: i32 }, + MismatchedSlotsPerRestorePoint { + config: u64, + on_disk: u64, + }, + InvalidCompressionLevel { + level: i32, + }, + IncompatibleStoreConfig { + config: OnDiskStoreConfig, + on_disk: OnDiskStoreConfig, + }, } impl Default for StoreConfig { @@ -86,9 +95,15 @@ impl StoreConfig { pub fn check_compatibility( &self, - _on_disk_config: &OnDiskStoreConfig, + on_disk_config: &OnDiskStoreConfig, ) -> Result<(), StoreConfigError> { - // FIXME(sproul): TODO + let db_config = self.as_disk_config(); + if db_config.ne(on_disk_config) { + return Err(StoreConfigError::IncompatibleStoreConfig { + config: db_config, + on_disk: on_disk_config.clone(), + }); + } Ok(()) } @@ -146,3 +161,49 @@ impl StoreItem for OnDiskStoreConfig { Ok(Self::from_ssz_bytes(bytes)?) 
} } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn check_compatibility_ok() { + let store_config = StoreConfig { + linear_blocks: true, + ..Default::default() + }; + let on_disk_config = OnDiskStoreConfig { + linear_blocks: true, + hierarchy_config: store_config.hierarchy_config.clone(), + }; + assert!(store_config.check_compatibility(&on_disk_config).is_ok()); + } + + #[test] + fn check_compatibility_linear_blocks_mismatch() { + let store_config = StoreConfig { + linear_blocks: true, + ..Default::default() + }; + let on_disk_config = OnDiskStoreConfig { + linear_blocks: false, + hierarchy_config: store_config.hierarchy_config.clone(), + }; + assert!(store_config.check_compatibility(&on_disk_config).is_err()); + } + + #[test] + fn check_compatibility_hierarchy_config_incompatible() { + let store_config = StoreConfig { + linear_blocks: true, + ..Default::default() + }; + let on_disk_config = OnDiskStoreConfig { + linear_blocks: true, + hierarchy_config: HierarchyConfig { + exponents: vec![5, 8, 11, 13, 16, 18, 21], + }, + }; + assert!(store_config.check_compatibility(&on_disk_config).is_err()); + } +} diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index 7c321e8f22e..ba3f02f52a3 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -19,7 +19,7 @@ pub enum Error { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)] pub struct HierarchyConfig { - exponents: Vec, + pub exponents: Vec, } #[derive(Debug)] From b74fe77b0057a6991b66abf77903b817c4787fbe Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Aug 2023 00:18:03 +1000 Subject: [PATCH 05/11] Update HDiff tests. 
--- beacon_node/store/src/hdiff.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index c9e21c4ae02..ffe58ac5052 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -240,9 +240,8 @@ impl HierarchyModuli { .tuple_windows() .find_map(|(&n_big, &n_small)| { (slot % n_small == 0).then(|| { - eprintln!("state at slot {slot} is divis by {n_small}"); // Diff from the previous layer. - dbg!(slot / n_big * n_big) + slot / n_big * n_big }) }); Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom)) @@ -271,8 +270,8 @@ mod tests { let moduli = config.to_moduli().unwrap(); - // Full snapshots at multiples of 2^16. - let snapshot_freq = Slot::new(1 << 16); + // Full snapshots at multiples of 2^21. + let snapshot_freq = Slot::new(1 << 21); assert_eq!( moduli.storage_strategy(Slot::new(0)).unwrap(), StorageStrategy::Snapshot @@ -286,11 +285,11 @@ mod tests { StorageStrategy::Snapshot ); - // For the first layer of diffs - let first_layer = Slot::new(1 << 13); + // Diffs should be from the previous layer (the snapshot in this case), and not the previous diff in the same layer. + let first_layer = Slot::new(1 << 18); assert_eq!( moduli.storage_strategy(first_layer * 2).unwrap(), - StorageStrategy::DiffFrom(first_layer) + StorageStrategy::DiffFrom(Slot::new(0)) ); } @@ -300,7 +299,7 @@ mod tests { config.validate().unwrap(); let moduli = config.to_moduli().unwrap(); - let snapshot_freq = Slot::new(1 << 16); + let snapshot_freq = Slot::new(1 << 21); assert_eq!( moduli.next_snapshot_slot(snapshot_freq).unwrap(), From 44c259de96b7179a572bd8b00be7187d1969ec6b Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Aug 2023 14:21:20 +1000 Subject: [PATCH 06/11] Fix `get_cold_state` panic when the diff for the slot isn't stored. 
--- beacon_node/store/src/hdiff.rs | 14 ++++++++++++-- beacon_node/store/src/hot_cold_store.rs | 22 +++++++++++++--------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index ffe58ac5052..bf6ce976ae1 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -29,7 +29,7 @@ pub struct HierarchyModuli { #[derive(Debug, PartialEq, Eq)] pub enum StorageStrategy { - Nothing, + ReplayFrom(Slot), DiffFrom(Slot), Snapshot, } @@ -228,6 +228,12 @@ impl HierarchyConfig { impl HierarchyModuli { pub fn storage_strategy(&self, slot: Slot) -> Result { let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?; + let first = self + .moduli + .first() + .copied() + .ok_or(Error::InvalidHierarchy)?; + let replay_from = slot / first * first; if slot % last == 0 { return Ok(StorageStrategy::Snapshot); @@ -244,7 +250,11 @@ impl HierarchyModuli { slot / n_big * n_big }) }); - Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom)) + + Ok(diff_from.map_or( + StorageStrategy::ReplayFrom(replay_from), + StorageStrategy::DiffFrom, + )) } /// Return the smallest slot greater than or equal to `slot` at which a full snapshot should diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6c675a91072..30c104a3efb 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1405,11 +1405,12 @@ impl, Cold: ItemStore> HotColdDB let slot = state.slot(); match self.hierarchy.storage_strategy(slot)? { - StorageStrategy::Nothing => { + StorageStrategy::ReplayFrom(from) => { debug!( self.log, "Storing cold state"; "strategy" => "replay", + "from_slot" => from, "slot" => state.slot(), ); } @@ -1491,7 +1492,7 @@ impl, Cold: ItemStore> HotColdDB ops: &mut Vec, ) -> Result<(), Error> { // Load diff base state bytes. 
- let base_buffer = self.load_hdiff_buffer_for_slot(from_slot)?; + let (_, base_buffer) = self.load_hdiff_buffer_for_slot(from_slot)?; let target_buffer = HDiffBuffer::from_state(state.clone()); let diff = HDiff::compute(&base_buffer, &target_buffer)?; let diff_bytes = diff.as_ssz_bytes(); @@ -1518,8 +1519,9 @@ impl, Cold: ItemStore> HotColdDB /// /// Will reconstruct the state if it lies between restore points. pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result>, Error> { - let hdiff_buffer = self.load_hdiff_buffer_for_slot(slot)?; + let (base_slot, hdiff_buffer) = self.load_hdiff_buffer_for_slot(slot)?; let base_state = hdiff_buffer.into_state(&self.spec)?; + debug_assert_eq!(base_slot, base_state.slot()); if base_state.slot() == slot { return Ok(Some(base_state)); @@ -1548,30 +1550,32 @@ impl, Cold: ItemStore> HotColdDB .map_err(Into::into) } - fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result { + /// Returns `HDiffBuffer` for the specified slot, or `HDiffBuffer` for the `ReplayFrom` slot if + /// the diff for the specified slot is not stored. + fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, HDiffBuffer), Error> { if let Some(buffer) = self.diff_buffer_cache.lock().get(&slot) { debug!( self.log, "Hit diff buffer cache"; "slot" => slot ); - return Ok(buffer.clone()); + return Ok((slot, buffer.clone())); } // Load buffer for the previous state. // This amount of recursion (<10 levels) should be OK. let t = std::time::Instant::now(); - let mut buffer = match self.hierarchy.storage_strategy(slot)? { + let (slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { // Base case. StorageStrategy::Snapshot => { let state = self .load_cold_state_as_snapshot(slot)? .ok_or(Error::MissingSnapshot(slot))?; - return Ok(HDiffBuffer::from_state(state)); + return Ok((slot, HDiffBuffer::from_state(state))); } // Recursive case. 
StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, - StorageStrategy::Nothing => unreachable!("FIXME(sproul)"), + StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from), }; // Load diff and apply it to buffer. @@ -1586,7 +1590,7 @@ impl, Cold: ItemStore> HotColdDB "slot" => slot ); - Ok(buffer) + Ok((slot, buffer)) } /// Load cold blocks between `start_slot` and `end_slot` inclusive. From 6b62570b43194429923ac1fbc9073d96e90d2534 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Aug 2023 15:26:40 +1000 Subject: [PATCH 07/11] Use slots instead of epochs for storing snapshots in freezer DB. --- beacon_node/store/src/hot_cold_store.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 30c104a3efb..12f666bb936 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1451,10 +1451,9 @@ impl, Cold: ItemStore> HotColdDB encoder.write_all(&bytes).map_err(Error::Compression)?; encoder.finish().map_err(Error::Compression)?; - let epoch = state.current_epoch(); let key = get_key_for_col( DBColumn::BeaconStateSnapshot.into(), - &epoch.as_u64().to_be_bytes(), + &state.slot().as_u64().to_be_bytes(), ); ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value)); Ok(()) @@ -1565,7 +1564,7 @@ impl, Cold: ItemStore> HotColdDB // Load buffer for the previous state. // This amount of recursion (<10 levels) should be OK. let t = std::time::Instant::now(); - let (slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { + let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? { // Base case. 
StorageStrategy::Snapshot => { let state = self From 757580e5266aee24280720978906622da9cb1159 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Aug 2023 17:06:48 +1000 Subject: [PATCH 08/11] Add snapshot buffer to `diff_buffer_cache` instead of loading it from db every time. --- beacon_node/store/src/hot_cold_store.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 12f666bb936..eeaab378d42 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1570,7 +1570,17 @@ impl, Cold: ItemStore> HotColdDB let state = self .load_cold_state_as_snapshot(slot)? .ok_or(Error::MissingSnapshot(slot))?; - return Ok((slot, HDiffBuffer::from_state(state))); + let buffer = HDiffBuffer::from_state(state); + + self.diff_buffer_cache.lock().put(slot, buffer.clone()); + debug!( + self.log, + "Added diff buffer to cache"; + "load_time_ms" => t.elapsed().as_millis(), + "slot" => slot + ); + + return Ok((slot, buffer)); } // Recursive case. StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?, From 544d6129bfce5568b4b0714bda9ba7635b39f54f Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 29 Aug 2023 17:49:22 +1000 Subject: [PATCH 09/11] Add `hierarchy-exponents` cli flag to beacon node. 
--- beacon_node/beacon_chain/tests/store_tests.rs | 8 ++++- beacon_node/src/cli.rs | 15 ++++++++++ beacon_node/src/config.rs | 19 ++++++++++++ lighthouse/tests/beacon_node.rs | 30 +++++++++++++++++++ 4 files changed, 71 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b17b908292f..48174f813b5 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2489,6 +2489,12 @@ async fn revert_minority_fork_on_resume() { } #[tokio::test] +// #[ignore] +// FIXME(jimmy): Ignoring this now as the test is flaky :/ It intermittently fails with an IO error +// "..cold_db/LOCK file held by another process". +// There seems to be some race condition between dropping the lock file and and re-opening the db. +// There's a higher chance this test would fail when the entire test suite is run. Maybe it isn't +// fast enough at dropping the cold_db LOCK file before the test attempts to open it again. async fn should_not_initialize_incompatible_store_config() { let validator_count = 16; let spec = MinimalEthSpec::default_spec(); @@ -2509,7 +2515,7 @@ async fn should_not_initialize_incompatible_store_config() { ..store_config }; let maybe_err = - try_get_store_with_spec_and_config(&db_path, spec.clone(), different_store_config).err(); + try_get_store_with_spec_and_config(&db_path, spec, different_store_config).err(); assert!(matches!( maybe_err, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 738680286b4..5d0073f9684 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -533,6 +533,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { [default: 8192 (mainnet) or 64 (minimal)]") .takes_value(true) ) + .arg( + Arg::with_name("hierarchy-exponents") + .long("hierarchy-exponents") + .value_name("EXPONENTS") + .help("Specifies the frequency for storing full state snapshots and hierarchical \ + diffs in the freezer DB. 
Accepts a comma-separated list of ascending \ + exponents. Each exponent defines an interval for storing diffs to the layer \ + above. The last exponent defines the interval for full snapshots. \ + For example, a config of '4,8,12' would store a full snapshot every \ + 4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and second-level \ + diffs every 16 (2^4) slots. \ + Cannot be changed after initialization. \ + [default: 5,9,11,13,16,18,21]") + .takes_value(true) + ) .arg( Arg::with_name("epochs-per-migration") .long("epochs-per-migration") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 48914b16a72..ad7a7c1aaaf 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -23,6 +23,7 @@ use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; +use store::hdiff::HierarchyConfig; use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN}; /// Gets the fully-initialized global client. @@ -422,6 +423,24 @@ pub fn get_config( client_config.store.epochs_per_state_diff = epochs_per_state_diff; } + if let Some(hierarchy_exponents) = + clap_utils::parse_optional::(cli_args, "hierarchy-exponents")? + { + let exponents = hierarchy_exponents + .split(',') + .map(|s| { + s.parse() + .map_err(|e| format!("invalid hierarchy-exponents: {e:?}")) + }) + .collect::, _>>()?; + + if exponents.windows(2).any(|w| w[0] >= w[1]) { + return Err("hierarchy-exponents must be in ascending order".to_string()); + } + + client_config.store.hierarchy_config = HierarchyConfig { exponents }; + } + if let Some(epochs_per_migration) = clap_utils::parse_optional(cli_args, "epochs-per-migration")? 
{ diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 6cba35b5edf..3f151142168 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -15,6 +15,7 @@ use std::process::Command; use std::str::FromStr; use std::string::ToString; use std::time::Duration; +use store::hdiff::HierarchyConfig; use tempfile::TempDir; use types::{ Address, Checkpoint, Epoch, ExecutionBlockHash, ForkName, Hash256, MainnetEthSpec, @@ -446,6 +447,35 @@ fn eth1_cache_follow_distance_manual() { assert_eq!(config.eth1.cache_follow_distance(), 128); }); } +#[test] +fn hierarchy_exponents_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.store.hierarchy_config, HierarchyConfig::default()); + }); +} +#[test] +fn hierarchy_exponents_valid() { + CommandLineTest::new() + .flag("hierarchy-exponents", Some("3,6,9,12")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.store.hierarchy_config, + HierarchyConfig { + exponents: vec![3, 6, 9, 12] + } + ); + }); +} +#[test] +#[should_panic] +fn hierarchy_exponents_invalid_order() { + CommandLineTest::new() + .flag("hierarchy-exponents", Some("7,6,9,12")) + .run_with_zero_port(); +} // Tests for Bellatrix flags. fn run_merge_execution_endpoints_flag_test(flag: &str) { From f40ef807dbd2e076297ce5f1dc20fda20d787830 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 30 Aug 2023 00:42:41 +1000 Subject: [PATCH 10/11] Add test for `StorageStrategy::ReplayFrom` and ignore a flaky test. 
--- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- beacon_node/store/src/hdiff.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 48174f813b5..bcc301d9c74 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2489,7 +2489,7 @@ async fn revert_minority_fork_on_resume() { } #[tokio::test] -// #[ignore] +#[ignore] // FIXME(jimmy): Ignoring this now as the test is flaky :/ It intermittently fails with an IO error // "..cold_db/LOCK file held by another process". // There seems to be some race condition between dropping the lock file and and re-opening the db. diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs index bf6ce976ae1..cdb88d37680 100644 --- a/beacon_node/store/src/hdiff.rs +++ b/beacon_node/store/src/hdiff.rs @@ -301,6 +301,12 @@ mod tests { moduli.storage_strategy(first_layer * 2).unwrap(), StorageStrategy::DiffFrom(Slot::new(0)) ); + + let replay_strategy_slot = first_layer + 1; + assert_eq!( + moduli.storage_strategy(replay_strategy_slot).unwrap(), + StorageStrategy::ReplayFrom(first_layer) + ); } #[test] From 36a93e50770866ddc04ca8534a27c4e20953d5aa Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 30 Aug 2023 11:37:53 +1000 Subject: [PATCH 11/11] Lower the hierarchy_config exponents in tests for more frequent snapshots and fix an issue where hdiff wasn't stored unless it's an epoch boundary slot.
--- beacon_node/beacon_chain/tests/store_tests.rs | 9 ++++++++- beacon_node/store/src/hot_cold_store.rs | 12 ++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index bcc301d9c74..30c8869585e 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -21,6 +21,7 @@ use std::collections::HashSet; use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; +use store::hdiff::HierarchyConfig; use store::{ config::StoreConfigError, iter::{BlockRootsIterator, StateRootsIterator}, @@ -50,7 +51,13 @@ fn get_store_with_spec( db_path: &TempDir, spec: ChainSpec, ) -> Arc, LevelDB>> { - let config = StoreConfig::default(); + let config = StoreConfig { + // More frequent snapshots and hdiffs in tests for testing + hierarchy_config: HierarchyConfig { + exponents: vec![1, 3, 5], + }, + ..Default::default() + }; try_get_store_with_spec_and_config(db_path, spec, config).expect("disk store should initialize") } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index eeaab378d42..68703a4d262 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2217,15 +2217,19 @@ pub fn migrate_database, Cold: ItemStore>( let mut cold_db_ops: Vec = Vec::new(); - if slot % E::slots_per_epoch() == 0 { + // Only store the cold state if it's on a diff boundary + if matches!( + store.hierarchy.storage_strategy(slot)?, + StorageStrategy::ReplayFrom(..) + ) { + // Store slot -> state_root and state_root -> slot mappings. + store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?; + } else { let state: BeaconState = store .get_hot_state(&state_root)? 
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; - } else { - // Store slot -> state_root and state_root -> slot mappings. - store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?; } // There are data dependencies between calls to `store_cold_state()` that prevent us from