Skip to content

Commit

Permalink
use LayeredMap + incremental hash in place of smt+jmt
Browse files Browse the repository at this point in the history
temp
  • Loading branch information
msmouse committed Jun 28, 2024
1 parent 3d4d746 commit e22f336
Show file tree
Hide file tree
Showing 19 changed files with 630 additions and 315 deletions.
308 changes: 304 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/aptos-crypto/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ impl HashValue {
hash[Self::LENGTH - bytes.len()..].copy_from_slice(&bytes[..]);
Self::new(hash)
}

/// hack
pub fn into_inner(self) -> [u8; Self::LENGTH] {
self.hash
}
}

impl ser::Serialize for HashValue {
Expand Down
10 changes: 5 additions & 5 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use aptos_types::{
epoch_state::EpochState,
jwks::OBSERVED_JWK_UPDATED_MOVE_TYPE_TAG,
ledger_info::LedgerInfoWithSignatures,
proof::{AccumulatorExtensionProof, SparseMerkleProofExt},
proof::AccumulatorExtensionProof,
state_store::{state_key::StateKey, state_value::StateValue},
transaction::{
block_epilogue::{BlockEndInfo, BlockEpiloguePayload},
Expand Down Expand Up @@ -526,11 +526,11 @@ impl StateComputeResult {
}

pub struct ProofReader {
proofs: HashMap<HashValue, SparseMerkleProofExt>,
proofs: HashMap<HashValue, Option<HashValue>>,
}

impl ProofReader {
pub fn new(proofs: HashMap<HashValue, SparseMerkleProofExt>) -> Self {
pub fn new(proofs: HashMap<HashValue, Option<HashValue>>) -> Self {
ProofReader { proofs }
}

Expand All @@ -540,8 +540,8 @@ impl ProofReader {
}

impl ProofRead for ProofReader {
fn get_proof(&self, key: HashValue) -> Option<&SparseMerkleProofExt> {
self.proofs.get(&key)
fn get_proof(&self, key: HashValue) -> Option<HashValue> {
*self.proofs.get(&key).unwrap()
}
}

Expand Down
6 changes: 5 additions & 1 deletion experimental/storage/layered-map/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,13 @@ impl<K: ArcAsyncDrop, V: ArcAsyncDrop> MapLayer<K, V> {
self.inner.log_layer(name)
}

fn is_family(&self, other: &Self) -> bool {
pub fn is_family(&self, other: &Self) -> bool {
self.inner.family == other.inner.family
}

pub fn layer(&self) -> u64 {
self.inner.layer
}
}

#[derive(Clone, Debug)]
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ aptos-types = { workspace = true }
arc-swap = { workspace = true }
arr_macro = { workspace = true }
bcs = { workspace = true }
bitvec = { workspace = true }
byteorder = { workspace = true }
claims = { workspace = true }
clap = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl DbWriter for AptosDB {
&transaction_infos,
&events,
wsets,
Option::Some((
Some((
&mut ledger_db_batch,
&mut sharded_kv_batch,
&state_kv_metadata_batch,
Expand Down
2 changes: 2 additions & 0 deletions storage/aptosdb/src/state_merkle_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)]

use crate::{
common::NUM_STATE_SHARDS,
db_options::{gen_state_merkle_cfds, state_merkle_db_column_families},
Expand Down
3 changes: 1 addition & 2 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use aptos_logger::info;
use aptos_schemadb::SchemaBatch;
use aptos_scratchpad::{SmtAncestors, SparseMerkleTree};
use aptos_storage_interface::{
async_proof_fetcher::AsyncProofFetcher,
cached_state_view::{CachedStateView, ShardedStateCache},
db_ensure as ensure,
state_delta::StateDelta,
Expand Down Expand Up @@ -503,7 +502,7 @@ impl StateStore {
StateViewId::Miscellaneous,
snapshot,
speculative_state,
Arc::new(AsyncProofFetcher::new(state_db.clone())),
state_db.clone(),
);
let write_sets = state_db
.ledger_db
Expand Down
35 changes: 27 additions & 8 deletions storage/aptosdb/src/state_store/state_merkle_batch_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@

use crate::{
metrics::{LATEST_SNAPSHOT_VERSION, OTHER_TIMERS_SECONDS},
pruner::PrunerManager,
schema::jellyfish_merkle_node::JellyfishMerkleNodeSchema,
state_merkle_db::Node,
state_store::{buffered_state::CommitMessage, StateDb},
};
use anyhow::{anyhow, ensure, Result};
use aptos_crypto::HashValue;
use aptos_jellyfish_merkle::node_type::NodeKey;
use aptos_logger::{info, trace};
use aptos_metrics_core::TimerHelper;
use aptos_schemadb::SchemaBatch;
use aptos_scratchpad::SmtAncestors;
use aptos_storage_interface::state_delta::StateDelta;
use aptos_types::state_store::state_value::StateValue;
use aptos_types::state_store::{state_key::StateKey, state_value::StateValue};
use std::sync::{mpsc::Receiver, Arc};

pub struct StateMerkleBatch {
pub top_levels_batch: SchemaBatch,
pub batches_for_shards: Vec<SchemaBatch>,
// pub top_levels_batch: SchemaBatch,
// pub batches_for_shards: Vec<SchemaBatch>,
pub root_hash: HashValue,
pub state_delta: Arc<StateDelta>,
}
Expand Down Expand Up @@ -52,8 +51,8 @@ impl StateMerkleBatchCommitter {
match msg {
CommitMessage::Data(state_merkle_batch) => {
let StateMerkleBatch {
top_levels_batch,
batches_for_shards,
// top_levels_batch,
// batches_for_shards,
root_hash,
state_delta,
} = state_merkle_batch;
Expand All @@ -62,10 +61,25 @@ impl StateMerkleBatchCommitter {
.current_version
.expect("Current version should not be None");

// commit jellyfish merkle nodes
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["commit_jellyfish_merkle_nodes"])
.start_timer();

self.state_db
.state_merkle_db
.metadata_db()
.put::<JellyfishMerkleNodeSchema>(
&NodeKey::new_empty_path(current_version),
&Node::new_leaf(
root_hash,
root_hash,
(StateKey::raw(b"root"), current_version),
),
)
.unwrap();

/*
// commit jellyfish merkle nodes
self.state_db
.state_merkle_db
.commit(current_version, top_levels_batch, batches_for_shards)
Expand All @@ -79,19 +93,22 @@ impl StateMerkleBatchCommitter {
cache.maybe_evict_version(self.state_db.state_merkle_db.lru_cache())
});
}
*/
info!(
version = current_version,
base_version = state_delta.base_version,
root_hash = root_hash,
"State snapshot committed."
);
LATEST_SNAPSHOT_VERSION.set(current_version as i64);
/*
self.state_db
.state_merkle_pruner
.maybe_set_pruner_target_db_version(current_version);
self.state_db
.epoch_snapshot_pruner
.maybe_set_pruner_target_db_version(current_version);
*/

self.check_usage_consistency(&state_delta).unwrap();

Expand All @@ -117,6 +134,7 @@ impl StateMerkleBatchCommitter {
.ok_or_else(|| anyhow!("Committing without version."))?;

let usage_from_ledger_db = self.state_db.ledger_db.metadata_db().get_usage(version)?;
/*
let leaf_count_from_jmt = self
.state_db
.state_merkle_db
Expand All @@ -131,6 +149,7 @@ impl StateMerkleBatchCommitter {
usage_from_ledger_db.items(),
leaf_count_from_jmt,
);
*/

let usage_from_smt = state_delta.current.usage();
if !usage_from_smt.is_untracked() {
Expand Down
14 changes: 7 additions & 7 deletions storage/aptosdb/src/state_store/state_snapshot_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@
//! This file defines the state snapshot committer running in background thread within StateStore.

use crate::{
metrics::OTHER_TIMERS_SECONDS,
state_store::{
buffered_state::CommitMessage,
state_merkle_batch_committer::{StateMerkleBatch, StateMerkleBatchCommitter},
StateDb,
},
versioned_node_cache::VersionedNodeCache,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_logger::trace;
use aptos_scratchpad::SmtAncestors;
use aptos_storage_interface::{jmt_update_refs, jmt_updates, state_delta::StateDelta, Result};
use aptos_storage_interface::state_delta::StateDelta;
use aptos_types::state_store::state_value::StateValue;
use rayon::prelude::*;
use static_assertions::const_assert;
use std::{
sync::{
Expand All @@ -29,6 +26,7 @@ use std::{
};

pub(crate) struct StateSnapshotCommitter {
#[allow(dead_code)]
state_db: Arc<StateDb>,
state_snapshot_commit_receiver: Receiver<CommitMessage<Arc<StateDelta>>>,
state_merkle_batch_commit_sender: SyncSender<CommitMessage<StateMerkleBatch>>,
Expand Down Expand Up @@ -74,6 +72,7 @@ impl StateSnapshotCommitter {
while let Ok(msg) = self.state_snapshot_commit_receiver.recv() {
match msg {
CommitMessage::Data(delta_to_commit) => {
/*
let version = delta_to_commit.current_version.expect("Cannot be empty");
let base_version = delta_to_commit.base_version;
let previous_epoch_ending_version = self
Expand Down Expand Up @@ -138,12 +137,13 @@ impl StateSnapshotCommitter {
)
.expect("Error calculating StateMerkleBatch for top levels.")
};
*/

self.state_merkle_batch_commit_sender
.send(CommitMessage::Data(StateMerkleBatch {
top_levels_batch,
batches_for_shards,
root_hash,
// top_levels_batch,
// batches_for_shards,
root_hash: delta_to_commit.current.root_hash(),
state_delta: delta_to_commit,
}))
.unwrap();
Expand Down
54 changes: 7 additions & 47 deletions storage/aptosdb/src/versioned_node_cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{lru_node_cache::LruNodeCache, metrics::OTHER_TIMERS_SECONDS, state_merkle_db::Node};
use crate::{lru_node_cache::LruNodeCache, state_merkle_db::Node};
use aptos_infallible::RwLock;
use aptos_jellyfish_merkle::node_type::NodeKey;
use aptos_types::transaction::Version;
use rayon::prelude::*;
use std::{
collections::{HashMap, VecDeque},
fmt,
Expand All @@ -30,6 +29,7 @@ impl fmt::Debug for VersionedNodeCache {
}

impl VersionedNodeCache {
#[allow(dead_code)]
pub(crate) const NUM_VERSIONS_TO_CACHE: usize = 2;

pub fn new() -> Self {
Expand All @@ -38,53 +38,13 @@ impl VersionedNodeCache {
}
}

pub fn add_version(&self, version: Version, nodes: NodeCache) {
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["version_cache_add"])
.start_timer();

let mut locked = self.inner.write();
if !locked.is_empty() {
let (last_version, _) = locked.back().unwrap();
assert!(
*last_version < version,
"Updating older version. {} vs latest:{} ",
version,
*last_version,
);
}
locked.push_back((version, Arc::new(nodes)));
pub fn add_version(&self, _version: Version, _nodes: NodeCache) {
unimplemented!();
}

pub fn maybe_evict_version(&self, lru_cache: &LruNodeCache) {
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["version_cache_evict"])
.start_timer();

let to_evict = {
let locked = self.inner.read();
if locked.len() > Self::NUM_VERSIONS_TO_CACHE {
locked
.front()
.map(|(version, cache)| (*version, cache.clone()))
} else {
None
}
};

if let Some((version, cache)) = to_evict {
cache
.iter()
.collect::<Vec<_>>()
.into_par_iter()
.with_min_len(100)
.for_each(|(node_key, node)| {
lru_cache.put(node_key.clone(), node.clone());
});

let evicted = self.inner.write().pop_front();
assert_eq!(evicted, Some((version, cache)));
}
#[allow(dead_code)]
pub fn maybe_evict_version(&self, _lru_cache: &LruNodeCache) {
unimplemented!();
}

pub fn get_version(&self, version: Version) -> Option<Arc<NodeCache>> {
Expand Down
7 changes: 7 additions & 0 deletions storage/scratchpad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ rust-version = { workspace = true }

[dependencies]
aptos-crypto = { workspace = true }
aptos-crypto-derive = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-experimental-layered-map = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-infallible = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-types = { workspace = true }
bitvec = { workspace = true }
bcs = { workspace = true }
criterion = { workspace = true, optional = true }
itertools = { workspace = true }
once_cell = { workspace = true }
proptest = { workspace = true, optional = true }
rayon = { workspace = true }
thiserror = { workspace = true }

fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto.git" }
bincode = "1.3.3"
serde = { version = "1.0.197", features = ["derive"] }

[dev-dependencies]
aptos-types = { workspace = true, features = ["fuzzing"] }
bitvec = { workspace = true }
Expand Down
Loading

0 comments on commit e22f336

Please sign in to comment.