Skip to content

Commit

Permalink
feat(exex): write ahead log
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 18, 2024
1 parent 0746479 commit 213bcea
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true
reth-exex-types.workspace = true
reth-fs-util.workspace = true
reth-metrics.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand All @@ -36,6 +37,7 @@ tokio-util.workspace = true
tokio.workspace = true

## misc
bincode = "1.3"
eyre.workspace = true
metrics.workspace = true

Expand Down
2 changes: 2 additions & 0 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub use event::*;
mod manager;
pub use manager::*;

mod wal;

// Re-export exex types
#[doc(inline)]
pub use reth_exex_types::*;
34 changes: 16 additions & 18 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::sync::{
mpsc::{self, error::SendError, Receiver, UnboundedReceiver, UnboundedSender},
watch,
};
use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
use tokio_util::sync::{PollSender, ReusableBoxFuture};

/// Metrics for an `ExEx`.
#[derive(Metrics)]
Expand Down Expand Up @@ -99,7 +99,7 @@ impl ExExHandle {
&mut self,
cx: &mut Context<'_>,
(notification_id, notification): &(usize, ExExNotification),
) -> Poll<Result<(), PollSendError<ExExNotification>>> {
) -> Poll<eyre::Result<()>> {
if let Some(finished_height) = self.finished_height {
match notification {
ExExNotification::ChainCommitted { new } => {
Expand Down Expand Up @@ -132,24 +132,25 @@ impl ExExHandle {
%notification_id,
"Reserving slot for notification"
);
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => (),
other => return other,
}
ready!(self.sender.poll_reserve(cx)?);

// debug!(
// exex_id = %self.id,
// %notification_id,
// "Appending notification to WAL"
// );
// self.wal.commit(notification)?;

debug!(
exex_id = %self.id,
%notification_id,
"Sending notification"
);
match self.sender.send_item(notification.clone()) {
Ok(()) => {
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);
Poll::Ready(Ok(()))
}
Err(err) => Poll::Ready(Err(err)),
}
self.sender.send_item(notification.clone())?;
self.next_notification_id = notification_id + 1;
self.metrics.notifications_sent_total.increment(1);

Poll::Ready(Ok(()))
}
}

Expand Down Expand Up @@ -647,10 +648,7 @@ impl Future for ExExManager {
.checked_sub(self.min_id)
.expect("exex expected notification ID outside the manager's range");
if let Some(notification) = self.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
// the channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
let _ = exex.send(cx, notification)?;
}
min_id = min_id.min(exex.next_notification_id);
self.exex_handles.push(exex);
Expand Down
255 changes: 255 additions & 0 deletions crates/exex/exex/src/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
#![allow(dead_code)]

use std::{
collections::VecDeque,
fs::File,
io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
path::PathBuf,
};

use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;
use reth_provider::Chain;
use reth_tracing::tracing::debug;

/// The maximum number of blocks to cache in the WAL.
///
/// [`CachedBlock`] has a size of `u64 + u64 + B256` which is 384 bits. 384 bits * 1 million = 48
/// megabytes.
const MAX_CACHED_BLOCKS: usize = 1_000_000;

#[derive(Debug)]
struct CachedBlock {
/// The file offset where the WAL entry is written.
file_offset: u64,
/// The block number and hash of the block.
block: BlockNumHash,
}

#[derive(Debug)]
pub(crate) struct Wal {
path: PathBuf,
file: File,
block_cache: VecDeque<CachedBlock>,
}

impl Wal {
/// Creates a new instance of [`Wal`].
pub(crate) fn new(directory: PathBuf) -> Self {
let path = directory.join("latest.wal");
let file = File::create(&path).unwrap();

let mut wal = Self { path, file, block_cache: VecDeque::new() };
wal.fill_block_cache(u64::MAX).unwrap();

wal
}

/// Fills the block cache with the notifications from the WAL file, up to the given offset in
/// bytes, not inclusive.
fn fill_block_cache(&mut self, to_offset: u64) -> eyre::Result<()> {
let reader = BufReader::new(&self.file);
let mut file_offset = 0;
self.block_cache = VecDeque::new();
for line in reader.split(b'\n') {
let line = line.unwrap();
let chain: Chain = bincode::deserialize(&line).unwrap();
for block in chain.blocks().values() {
self.block_cache.push_back(CachedBlock {
file_offset,
block: (block.number, block.hash()).into(),
});
if self.block_cache.len() > MAX_CACHED_BLOCKS {
self.block_cache.pop_front();
}
}

file_offset += line.len() as u64 + 1;
if file_offset >= to_offset {
break
}
}

Ok(())
}

/// Commits the notification to WAL. If the notification contains a
/// reverted chain, the WAL is truncated.
pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> {
if let Some(reverted_chain) = notification.reverted_chain() {
let mut truncate_to = None;
let mut reverted_blocks = reverted_chain.blocks().values().rev();
loop {
let Some(block) = self.block_cache.pop_back() else {
self.fill_block_cache(truncate_to.unwrap_or(u64::MAX))?;
if self.block_cache.is_empty() {
break
}
continue
};

let Some(reverted_block) = reverted_blocks.next() else { break };

if reverted_block.number != block.block.number ||
reverted_block.hash() != block.block.hash
{
return Err(eyre::eyre!("inconsistent WAL block cache entry"))
}

truncate_to = Some(block.file_offset);
}

if let Some(truncate_to) = truncate_to {
self.file.set_len(truncate_to)?;
}
}

if let Some(committed_chain) = notification.committed_chain() {
let data = bincode::serialize(&notification)?;

let file_offset = self.file.metadata()?.len();
self.file.write_all(&data)?;
self.file.write_all(b"\n")?;
self.file.flush()?;

for block in committed_chain.blocks().values() {
self.block_cache.push_back(CachedBlock {
file_offset,
block: (block.number, block.hash()).into(),
});
if self.block_cache.len() > MAX_CACHED_BLOCKS {
self.block_cache.pop_front();
}
}
}

Ok(())
}

/// Rollbacks the WAL to the given block, inclusive. Returns the block number and hash of the
/// lowest removed block.
pub(crate) fn rollback(
&mut self,
to_block: BlockNumHash,
) -> eyre::Result<Option<BlockNumHash>> {
let mut truncate_to = None;
let mut lowest_removed_block = None;
loop {
let Some(block) = self.block_cache.pop_back() else {
self.fill_block_cache(truncate_to.unwrap_or(u64::MAX))?;
if self.block_cache.is_empty() {
break
}
continue
};

if block.block.number == to_block.number {
if block.block.hash != to_block.hash {
return Err(eyre::eyre!("block hash mismatch in WAL"))
}

truncate_to = Some(block.file_offset);

self.file.seek(SeekFrom::Start(block.file_offset))?;
let notification: ExExNotification = bincode::deserialize_from(&self.file)?;
lowest_removed_block = notification
.committed_chain()
.as_ref()
.map(|chain| chain.first())
.map(|block| (block.number, block.hash()).into());

break
}

truncate_to = Some(block.file_offset);
}

if let Some(truncate_to) = truncate_to {
self.file.set_len(truncate_to)?;
}

Ok(lowest_removed_block)
}

/// Finalizes the WAL to the given block, inclusive.
pub(crate) fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> {
// Find an offset where the unfinalized blocks start. If the notificatin includes both
// finalized and non-finalized blocks, it will not be truncated and the offset will
// include it.

// First, walk cache to find the offset of the notification with the finalized block.
let mut unfinalized_from_offset = None;
while let Some(cached_block) = self.block_cache.pop_front() {
if cached_block.block.number == to_block.number {
if cached_block.block.hash != to_block.hash {
return Err(eyre::eyre!("block hash mismatch in WAL"))
}

unfinalized_from_offset = Some(self.block_cache.front().map_or_else(
|| std::io::Result::Ok(self.file.metadata()?.len()),
|block| std::io::Result::Ok(block.file_offset),
)?);
break
}
}

// If the finalized block is not found in cache, we need to walk the whole file.
if unfinalized_from_offset.is_none() {
let mut offset = 0;
for data in BufReader::new(&self.file).split(b'\n') {
let data = data?;
let notification: ExExNotification = bincode::deserialize(&data)?;
if let Some(committed_chain) = notification.committed_chain() {
let finalized_block = committed_chain.blocks().get(&to_block.number);

if let Some(finalized_block) = finalized_block {
if finalized_block.hash() != to_block.hash {
return Err(eyre::eyre!("block hash mismatch in WAL"))
}

if committed_chain.blocks().len() == 1 {
unfinalized_from_offset = Some(offset + data.len() as u64 + 1);
} else {
unfinalized_from_offset = Some(offset);
}

break
}
}

offset += data.len() as u64 + 1;
}
}

// If the finalized block is still not found, we can't do anything and just return.
let Some(unfinalized_from_offset) = unfinalized_from_offset else {
debug!(target: "exex::wal", ?to_block, "Could not find the finalized block in WAL");
return Ok(())
};

let old_file_path = self.path.with_extension("tmp");
reth_fs_util::rename(&self.path, &old_file_path)?;

let mut old_file = File::open(&old_file_path)?;
old_file.seek(SeekFrom::Start(unfinalized_from_offset))?;
let mut new_file = File::create(&self.path)?;

loop {
let mut buffer = [0; 4096];
let read = old_file.read(&mut buffer)?;
new_file.write_all(&buffer[..read])?;

if read < 1024 {
break
}
}

reth_fs_util::remove_file(old_file_path)?;

self.file = new_file;

self.fill_block_cache(u64::MAX)?;

Ok(())
}
}
7 changes: 7 additions & 0 deletions crates/node/core/src/dirs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,13 @@ impl<D> ChainPath<D> {
pub fn invalid_block_hooks(&self) -> PathBuf {
self.data_dir().join("invalid_block_hooks")
}

/// Returns the path to the ExEx write ahead logs directory for this chain.
///
/// `<DIR>/<CHAIN_ID>/exex-wals`
pub fn exex_wals(&self) -> PathBuf {
self.data_dir().join("exex-wals")
}
}

impl<D> AsRef<Path> for ChainPath<D> {
Expand Down

0 comments on commit 213bcea

Please sign in to comment.