Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): write ahead log #10995

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
213bcea
feat(exex): write ahead log
shekhirin Sep 18, 2024
2381ea8
more comments, minor improvements
shekhirin Sep 18, 2024
aaa11a4
do not destroy old file on finalization until the very end
shekhirin Sep 19, 2024
d0a666d
more comments
shekhirin Sep 19, 2024
d537c4d
deserialize into notification
shekhirin Sep 19, 2024
b0965a0
include serde feature
shekhirin Sep 19, 2024
c809d89
spell
shekhirin Sep 19, 2024
d45da30
clarify why cache
shekhirin Sep 19, 2024
9d4a450
logggggggs
shekhirin Sep 19, 2024
75f2572
more comments (i need to start writing tests)
shekhirin Sep 19, 2024
117afef
oof a lot of changes but i fixed the format and added tests
shekhirin Sep 19, 2024
b7c71d9
add dedicated functions for read/write
shekhirin Sep 19, 2024
992d5ce
reorganize tests
shekhirin Sep 19, 2024
a7f3173
finalization test
shekhirin Sep 19, 2024
7a1603c
dedup file creation
shekhirin Sep 19, 2024
857ccc1
fill the block cache on rollback, also test rollback
shekhirin Sep 19, 2024
8272b85
remove unused deps
shekhirin Sep 19, 2024
ce63845
add comments to tests
shekhirin Sep 19, 2024
bee7beb
adjust the default tx count for random block
shekhirin Sep 19, 2024
b05bb87
separate wal module
shekhirin Sep 20, 2024
68ce3b4
revert two unrelated changes
shekhirin Sep 20, 2024
d80c196
add a test for eviction
shekhirin Sep 20, 2024
fce46d1
fix read/write in finalize
shekhirin Sep 20, 2024
2f390f0
move storage to a separate module
shekhirin Sep 20, 2024
fa17c65
comments
shekhirin Sep 20, 2024
23ca02c
WAL file -> storage
shekhirin Sep 20, 2024
1b79e58
oops forgot to add storage.rs
shekhirin Sep 20, 2024
108981f
return blocks on revert
shekhirin Sep 20, 2024
10c8a11
wip rework
shekhirin Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
msrv = "1.81"
too-large-for-stack = 128
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "WAL", "MessagePack"]
6 changes: 5 additions & 1 deletion crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ workspace = true
reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true
reth-exex-types.workspace = true
reth-exex-types = { workspace = true, features = ["serde"] }
reth-fs-util.workspace = true
reth-metrics.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand All @@ -38,6 +39,8 @@ tokio.workspace = true
## misc
eyre.workspace = true
metrics.workspace = true
rmp-serde = "1.3"
tracing.workspace = true

[dev-dependencies]
reth-blockchain-tree.workspace = true
Expand All @@ -50,6 +53,7 @@ reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true

secp256k1.workspace = true
tempfile.workspace = true

[features]
default = []
Expand Down
2 changes: 2 additions & 0 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub use event::*;
mod manager;
pub use manager::*;

mod wal;

// Re-export exex types
#[doc(inline)]
pub use reth_exex_types::*;
153 changes: 153 additions & 0 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::collections::VecDeque;

use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;

/// The block cache of the WAL. Acts as a FIFO queue with a specified maximum size.
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file offset as the notification in the file. I.e. for each
/// notification, there may be multiple blocks in the cache.
///
/// This cache is needed to avoid walking the file every time we want to find a notification
/// corresponding to a block.
#[derive(Debug)]
pub(super) struct BlockCache {
deque: VecDeque<CachedBlock>,
max_capacity: usize,
}

impl BlockCache {
/// Creates a new instance of [`BlockCache`] with the given maximum capacity.
pub(super) fn new(max_capacity: usize) -> Self {
Self { deque: VecDeque::with_capacity(max_capacity), max_capacity }
}

/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.deque.is_empty()
}

/// Returns a front-to-back iterator.
pub(super) fn iter(&self) -> std::collections::vec_deque::Iter<'_, CachedBlock> {
self.deque.iter()
}

/// Provides a reference to the first block from the cache, or `None` if the cache is
/// empty.
pub(super) fn front(&self) -> Option<&CachedBlock> {
self.deque.front()
}

/// Provides a reference to the last block from the cache, or `None` if the cache is
/// empty.
pub(super) fn back(&self) -> Option<&CachedBlock> {
self.deque.back()
}

/// Removes the first block from the cache and returns it, or `None` if
/// the cache is empty.
pub(super) fn pop_front(&mut self) -> Option<CachedBlock> {
self.deque.pop_front()
}

/// Removes the last block from the cache and returns it, or `None` if
/// the cache is empty.
pub(super) fn pop_back(&mut self) -> Option<CachedBlock> {
self.deque.pop_back()
}

/// Appends a block to the back of the cache.
///
/// If the cache is full, the oldest block is removed from the front.
pub(super) fn push_back(&mut self, block: CachedBlock) {
self.deque.push_back(block);
if self.deque.len() > self.max_capacity {
self.deque.pop_front();
}
}

/// Clears the cache, removing all blocks
pub(super) fn clear(&mut self) {
self.deque.clear();
}

/// Inserts the blocks from the notification into the cache with the given file offset.
///
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
pub(super) fn insert_notification_blocks_with_offset(
&mut self,
notification: &ExExNotification,
file_offset: u64,
) {
let reverted_chain = notification.reverted_chain();
let committed_chain = notification.committed_chain();

if let Some(reverted_chain) = reverted_chain {
for block in reverted_chain.blocks().values() {
self.push_back(CachedBlock {
file_offset,
action: CachedBlockAction::Revert,
block: (block.number, block.hash()).into(),
});
}
}

if let Some(committed_chain) = committed_chain {
for block in committed_chain.blocks().values() {
self.push_back(CachedBlock {
file_offset,
action: CachedBlockAction::Commit,
block: (block.number, block.hash()).into(),
});
}
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct CachedBlock {
/// The file offset where the WAL entry is written.
pub(super) file_offset: u64,
pub(super) action: CachedBlockAction,
/// The block number and hash of the block.
pub(super) block: BlockNumHash,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CachedBlockAction {
Commit,
Revert,
}

impl CachedBlockAction {
pub(super) const fn is_commit(&self) -> bool {
matches!(self, Self::Commit)
}
}

#[cfg(test)]
mod tests {
use crate::wal::cache::{BlockCache, CachedBlock, CachedBlockAction};

#[test]
fn test_eviction() {
let mut cache = BlockCache::new(1);

let cached_block_1 = CachedBlock {
file_offset: 0,
action: CachedBlockAction::Commit,
block: Default::default(),
};
cache.push_back(cached_block_1);

let cached_block_2 = CachedBlock {
file_offset: 1,
action: CachedBlockAction::Revert,
block: Default::default(),
};
cache.push_back(cached_block_2);

assert_eq!(cache.iter().collect::<Vec<_>>(), vec![&cached_block_2]);
}
}
Loading
Loading