Skip to content

Commit

Permalink
return blocks on revert
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 20, 2024
1 parent 1b79e58 commit 108981f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 24 deletions.
2 changes: 1 addition & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
msrv = "1.81"
too-large-for-stack = 128
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]
doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "WAL", "MessagePack"]
8 changes: 8 additions & 0 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ use std::collections::VecDeque;
use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;

/// The block cache of the WAL. Acts as a FIFO queue with a specified maximum size.
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file offset as the notification in the file. I.e. for each
/// notification, there may be multiple blocks in the cache.
///
/// This cache is needed to avoid walking the file every time we want to find a notification
/// corresponding to a block.
#[derive(Debug)]
pub(super) struct BlockCache {
deque: VecDeque<CachedBlock>,
Expand Down
54 changes: 34 additions & 20 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ use storage::Storage;
/// = 48 megabytes.
const MAX_CACHED_BLOCKS: usize = 1_000_000;

/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx.
///
/// WAL is backed by a binary file represented by [`Storage`] and a block cache represented by
/// [`BlockCache`].
///
/// The expected mode of operation is as follows:
/// 1. On every new canonical chain notification, call [`Wal::commit`].
/// 2. When ExEx is on a wrong fork, rollback the WAL using [`Wal::rollback`]. The caller is
/// expected to create reverts from the removed notifications and backfill the blocks between the
/// returned block and the given rollback block. After that, commit new notifications as usual
/// with [`Wal::commit`].
/// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the
/// WAL.
#[derive(Debug)]
pub(crate) struct Wal {
/// The underlying WAL storage backed by a file.
storage: Storage,
/// The block cache of the WAL. Acts as a FIFO queue with a maximum size of
/// [`MAX_CACHED_BLOCKS`].
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file offset as the notification in the [`Storage`]. I.e. for each
/// notification, there may be multiple blocks in the cache.
///
/// This cache is needed only for convenience, so we can avoid walking the [`Storage`] every
/// time we want to find a notification corresponding to a block.
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: BlockCache,
}

Expand Down Expand Up @@ -121,13 +126,13 @@ impl Wal {
///
/// # Returns
///
/// The block number and hash of the lowest removed block. The caller is expected to backfill
/// the blocks between the returned block and the given `to_block`, if there's any.
/// 1. The block number and hash of the lowest removed block.
/// 2. The notifications that were removed.
#[instrument(target = "exex::wal", skip(self))]
pub(crate) fn rollback(
&mut self,
to_block: BlockNumHash,
) -> eyre::Result<Option<BlockNumHash>> {
) -> eyre::Result<Option<(BlockNumHash, Vec<ExExNotification>)>> {
let mut truncate_to = None;
let mut lowest_removed_block = None;
loop {
Expand Down Expand Up @@ -173,15 +178,17 @@ impl Wal {
truncate_to = Some(block.file_offset);
}

if let Some(truncate_to) = truncate_to {
self.storage.truncate_to_offset(truncate_to)?;
let result = if let Some(truncate_to) = truncate_to {
let removed_notifications = self.storage.truncate_to_offset(truncate_to)?;
debug!(?truncate_to, "Truncated the storage");
Some((lowest_removed_block.expect("qed"), removed_notifications))
} else {
debug!("No blocks were truncated. Block cache was filled.");
}
None
};
self.fill_block_cache(u64::MAX)?;

Ok(lowest_removed_block)
Ok(result)
}

/// Finalizes the WAL to the given block, inclusive.
Expand Down Expand Up @@ -388,12 +395,19 @@ mod tests {

// Now, rollback to block 1 and verify that both the block cache and the storage are
// empty. We expect the rollback to delete the first notification (commit block 0, 1),
// because we can't delete blocks partly from the notification. Additionally, check that
// the block that the rolled back to is the block with number 0.
let rolled_back_to = wal.rollback((blocks[1].number, blocks[1].hash()).into())?;
// because we can't delete blocks partly from the notification, and also the second
// notification (revert block 1). Additionally, check that the block that the rolled
// back to is the block with number 0.
let rollback_result = wal.rollback((blocks[1].number, blocks[1].hash()).into())?;
assert_eq!(wal.block_cache.iter().copied().collect::<Vec<_>>(), vec![]);
assert_eq!(wal.storage.bytes_len()?, 0);
assert_eq!(rolled_back_to, Some((blocks[0].number, blocks[0].hash()).into()));
assert_eq!(
rollback_result,
Some((
(blocks[0].number, blocks[0].hash()).into(),
vec![committed_notification_1.clone(), reverted_notification.clone()]
))
);

// Commit notifications 1 and 2 again
wal.commit(&committed_notification_1)?;
Expand Down
38 changes: 35 additions & 3 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ use std::{

use reth_exex_types::ExExNotification;

/// The underlying WAL storage backed by a file.
///
/// Each notification is written without any delimiters and structured as follows:
/// ```text
/// +--------------------------+----------------------------------+
/// | little endian u32 length | MessagePack-encoded notification |
/// +--------------------------+----------------------------------+
/// ```
/// The length is the length of the MessagePack-encoded notification in bytes.
#[derive(Debug)]
pub(super) struct Storage {
/// The path to the WAL file.
Expand Down Expand Up @@ -73,19 +82,41 @@ impl Storage {
}

/// Truncates the underlying file to the given byte offset (exclusive).
pub(super) fn truncate_to_offset(&self, to_bytes_len: u64) -> eyre::Result<()> {
///
/// # Returns
///
/// Notifications that were removed.
pub(super) fn truncate_to_offset(
&mut self,
to_bytes_len: u64,
) -> eyre::Result<Vec<ExExNotification>> {
let mut removed_notifications = Vec::new();
self.for_each_notification_from_offset(to_bytes_len, |_, notification| {
removed_notifications.push(notification);
Ok(ControlFlow::Continue(()))
})?;

self.file.set_len(to_bytes_len)?;
Ok(())

Ok(removed_notifications)
}

/// Iterates over the notifications in the underlying file, decoding them and calling the
/// provided closure with the length of the notification in bytes and the notification itself.
/// Stops when the closure returns [`ControlFlow::Break`], or the end of the file is reached.
pub(super) fn for_each_notification(
&mut self,
f: impl FnMut(usize, ExExNotification) -> eyre::Result<ControlFlow<()>>,
) -> eyre::Result<()> {
self.for_each_notification_from_offset(0, f)
}

fn for_each_notification_from_offset(
&mut self,
file_offset: u64,
mut f: impl FnMut(usize, ExExNotification) -> eyre::Result<ControlFlow<()>>,
) -> eyre::Result<()> {
self.file.seek(SeekFrom::Start(0))?;
self.file.seek(SeekFrom::Start(file_offset))?;

let mut reader = BufReader::new(&self.file);
loop {
Expand Down Expand Up @@ -115,6 +146,7 @@ impl Storage {

fn write_notification(w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> {
let data = rmp_serde::encode::to_vec(notification)?;
// Write the length of the notification as a u32 in little endian
w.write_all(&(data.len() as u32).to_le_bytes())?;
w.write_all(&data)?;
w.flush()?;
Expand Down

0 comments on commit 108981f

Please sign in to comment.