Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: revamp trie updates #9239

Merged
merged 8 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
use reth_tasks::TaskExecutor;
use reth_trie::{updates::TrieKey, StateRoot};
use reth_trie::StateRoot;
use std::{path::PathBuf, sync::Arc};
use tracing::*;

Expand Down Expand Up @@ -188,15 +188,16 @@ impl Command {
// Compare updates
let mut in_mem_mismatched = Vec::new();
let mut incremental_mismatched = Vec::new();
let mut in_mem_updates_iter = in_memory_updates.into_iter().peekable();
let mut incremental_updates_iter = incremental_trie_updates.into_iter().peekable();
let mut in_mem_updates_iter = in_memory_updates.account_nodes_ref().iter().peekable();
let mut incremental_updates_iter =
incremental_trie_updates.account_nodes_ref().iter().peekable();

while in_mem_updates_iter.peek().is_some() || incremental_updates_iter.peek().is_some() {
match (in_mem_updates_iter.next(), incremental_updates_iter.next()) {
(Some(in_mem), Some(incr)) => {
similar_asserts::assert_eq!(in_mem.0, incr.0, "Nibbles don't match");
if in_mem.1 != incr.1 &&
matches!(in_mem.0, TrieKey::AccountNode(ref nibbles) if nibbles.len() > self.skip_node_depth.unwrap_or_default())
in_mem.0.len() > self.skip_node_depth.unwrap_or_default()
{
in_mem_mismatched.push(in_mem);
incremental_mismatched.push(incr);
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl<DB: Database> Persistence<DB> {
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(hashed_state.clone()).write_to_db(provider_rw.tx_ref())?;
trie_updates.flush(provider_rw.tx_ref())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

// update history indices
Expand Down
6 changes: 5 additions & 1 deletion crates/stages/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let offset = transitions.len() as u64;

db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| Ok(updates.flush(tx)?)).unwrap();
db.commit(|tx| {
updates.write_to_database(tx)?;
Ok(())
})
.unwrap();

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand Down
8 changes: 4 additions & 4 deletions crates/stages/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

let checkpoint = MerkleCheckpoint::new(
to_block,
Expand All @@ -237,7 +237,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

entities_checkpoint.processed += hashed_entries_walked as u64;

Expand All @@ -252,7 +252,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
provider.count_entries::<tables::HashedStorages>()?)
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;

// Validation passed, apply unwind changes to the database.
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

// TODO(alexey): update entities checkpoint
} else {
Expand Down
17 changes: 6 additions & 11 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,19 +464,17 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
.root_with_progress()?
{
StateRootProgress::Progress(state, _, updates) => {
let updates_len = updates.len();
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
last_account_key = %state.last_account_key,
updates_len,
updated_len,
total_flushed_updates,
"Flushing trie updates"
);

intermediate_state = Some(*state);
updates.flush(tx)?;

total_flushed_updates += updates_len;

if total_flushed_updates % SOFT_LIMIT_COUNT_FLUSHED_UPDATES == 0 {
info!(target: "reth::cli",
Expand All @@ -486,15 +484,12 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
}
}
StateRootProgress::Complete(root, _, updates) => {
let updates_len = updates.len();

updates.flush(tx)?;

total_flushed_updates += updates_len;
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
%root,
updates_len = updates_len,
updated_len,
total_flushed_updates,
"State root has been computed"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ mod tests {
}

let (_, updates) = StateRoot::from_tx(tx).root_with_updates().unwrap();
updates.flush(tx).unwrap();
updates.write_to_database(tx).unwrap();
})
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2396,7 +2396,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
block_hash: end_block_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);

Expand Down Expand Up @@ -2592,7 +2592,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
block_hash: parent_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}

// get blocks
Expand Down Expand Up @@ -2793,7 +2793,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(&self.tx)?;
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);

Expand Down
2 changes: 1 addition & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
HashedStateChanges(db_state).write_to_db(provider_rw.tx_ref()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.flush(provider_rw.tx_ref()).unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();
provider_rw.commit().unwrap();
}

Expand Down
4 changes: 2 additions & 2 deletions crates/trie/parallel/src/async_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
Expand All @@ -179,7 +179,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
Expand Down
4 changes: 2 additions & 2 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
Expand All @@ -161,7 +161,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
Expand Down
3 changes: 2 additions & 1 deletion crates/trie/trie/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use reth_db_api::transaction::DbTx;
use reth_execution_errors::{StateRootError, StorageRootError};
use reth_primitives::{constants::EMPTY_ROOT_HASH, keccak256, Address, B256};
use reth_trie_common::{proof::ProofRetainer, AccountProof, StorageProof, TrieAccount};

/// A struct for generating merkle proofs.
///
/// Proof generator adds the target address and slots to the prefix set, enables the proof retainer
Expand Down Expand Up @@ -226,7 +227,7 @@ mod tests {
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
updates.flush(provider.tx_mut())?;
updates.write_to_database(provider.tx_mut())?;

provider.commit()?;

Expand Down
Loading
Loading