Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): only delete static file if last_block is on a previous static file #11029

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
10 changes: 8 additions & 2 deletions crates/static-file/types/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ impl StaticFileSegment {
pub const fn is_receipts(&self) -> bool {
matches!(self, Self::Receipts)
}

/// Returns `true` if the segment is `StaticFileSegment::Receipts` or
/// `StaticFileSegment::Transactions`.
pub const fn is_tx_based(&self) -> bool {
matches!(self, Self::Receipts | Self::Transactions)
}
}

/// A segment header that contains information common to all segments. Used for storage.
Expand Down Expand Up @@ -239,7 +245,7 @@ impl SegmentHeader {
match self.segment {
StaticFileSegment::Headers => {
if let Some(range) = &mut self.block_range {
if num > range.end {
if num > range.end - range.start {
self.block_range = None;
} else {
range.end = range.end.saturating_sub(num);
Expand All @@ -248,7 +254,7 @@ impl SegmentHeader {
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
if let Some(range) = &mut self.tx_range {
if num > range.end {
if num > range.end - range.start {
self.tx_range = None;
} else {
range.end = range.end.saturating_sub(num);
Expand Down
25 changes: 16 additions & 9 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,16 @@ impl StaticFileProvider {
})
.or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
}
} else if tx_index.get(&segment).map(|index| index.len()) == Some(1) {
// Only happens if we unwind all the txs/receipts from the first static file.
// Should only happen in test scenarios.
if jar.user_header().expected_block_start() == 0 &&
matches!(
segment,
StaticFileSegment::Receipts | StaticFileSegment::Transactions
)
{
} else if segment.is_tx_based() {
// The unwinded file has no more transactions/receipts. However, the highest
// block is within this files' block range. We only retain
// entries with block ranges before the current one.
tx_index.entry(segment).and_modify(|index| {
index.retain(|_, block_range| block_range.start() < fixed_range.start());
});

// If the index is empty, just remove it.
if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
tx_index.remove(&segment);
}
}
Expand Down Expand Up @@ -1141,6 +1142,12 @@ impl StaticFileProvider {
pub fn path(&self) -> &Path {
&self.path
}

#[cfg(any(test, feature = "test-utils"))]
/// Returns `static_files` transaction index
pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
&self.static_files_tx_index
}
}

/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
Expand Down
107 changes: 88 additions & 19 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ mod tests {
use reth_db_api::transaction::DbTxMut;
use reth_primitives::{
static_file::{find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE},
BlockHash, Header, Receipt, TransactionSignedNoHash,
BlockHash, Header, Receipt, TransactionSignedNoHash, TxNumber,
};
use reth_storage_api::{ReceiptProvider, TransactionsProvider};
use reth_testing_utils::generators::{self, random_header_range};
use std::{fmt::Debug, fs, ops::Range, path::Path};

Expand Down Expand Up @@ -204,6 +205,14 @@ mod tests {
"block mismatch",
)?;

if let Some(id) = expected_tip {
assert_eyre(
sf_rw.header_by_number(id)?.map(|h| h.number),
expected_tip,
"header mismatch",
)?;
}

// Validate the number of files remaining in the directory
assert_eyre(
fs::read_dir(static_dir)?.count(),
Expand Down Expand Up @@ -304,17 +313,22 @@ mod tests {
mut tx_count: u64,
next_tx_num: &mut u64,
) {
let mut receipt = Receipt::default();
let mut tx = TransactionSignedNoHash::default();

for block in block_range.clone() {
writer.increment_block(block).unwrap();

// Append transaction/receipt if there's still a transaction count to append
if tx_count > 0 {
if segment.is_receipts() {
writer.append_receipt(*next_tx_num, &Receipt::default()).unwrap();
// Used as ID for validation
receipt.cumulative_gas_used = *next_tx_num;
writer.append_receipt(*next_tx_num, &receipt).unwrap();
} else {
writer
.append_transaction(*next_tx_num, &TransactionSignedNoHash::default())
.unwrap();
// Used as ID for validation
tx.transaction.set_nonce(*next_tx_num);
writer.append_transaction(*next_tx_num, &tx).unwrap();
}
*next_tx_num += 1;
tx_count -= 1;
Expand Down Expand Up @@ -376,25 +390,36 @@ mod tests {
expected_tx_range.as_ref()
);
});

// Ensure transaction index
let tx_index = sf_rw.tx_index().read();
let expected_tx_index =
vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
assert_eq!(
tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
(!expected_tx_index.is_empty()).then_some(expected_tx_index),
"tx index mismatch",
);
}

#[test]
#[ignore]
fn test_tx_based_truncation() {
let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
let blocks_per_file = 10; // Number of blocks per file
let files_per_range = 3; // Number of files per range (data/conf/offset files)
let file_set_count = 3; // Number of sets of files to create
let initial_file_count = files_per_range * file_set_count + 1; // Includes lockfile

#[allow(clippy::too_many_arguments)]
fn prune_and_validate(
sf_rw: &StaticFileProvider,
static_dir: impl AsRef<Path>,
segment: StaticFileSegment,
prune_count: u64,
last_block: u64,
expected_tx_tip: u64,
expected_tx_tip: Option<u64>,
expected_file_count: i32,
expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
) -> eyre::Result<()> {
let mut writer = sf_rw.latest_writer(segment)?;

Expand All @@ -412,18 +437,41 @@ mod tests {
Some(last_block),
"block mismatch",
)?;
assert_eyre(
sf_rw.get_highest_static_file_tx(segment),
Some(expected_tx_tip),
"tx mismatch",
)?;
assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;

// Verify that transactions and receipts are returned correctly. Uses
// cumulative_gas_used & nonce as ids.
if let Some(id) = expected_tx_tip {
if segment.is_receipts() {
assert_eyre(
expected_tx_tip,
sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
"tx mismatch",
)?;
} else {
assert_eyre(
expected_tx_tip,
sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
"tx mismatch",
)?;
}
}

// Ensure the file count has reduced as expected
assert_eyre(
fs::read_dir(static_dir)?.count(),
expected_file_count as usize,
"file count mismatch",
)?;

// Ensure that the inner tx index (max_tx -> block range) is as expected
let tx_index = sf_rw.tx_index().read();
assert_eyre(
tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
(!expected_tx_index.is_empty()).then_some(expected_tx_index),
"tx index mismatch",
)?;

Ok(())
}

Expand All @@ -442,26 +490,46 @@ mod tests {
let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();

// Test cases
// [prune_count, last_block, expected_tx_tip, expected_file_count)
// [prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
let test_cases = vec![
// Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
// It ensures that the file is not deleted even though there are no rows, since the
// `last_block` which is passed to the prune method is the first
// block of the range.
(1, blocks_per_file * 2, highest_tx - 1, initial_file_count),
(
1,
blocks_per_file * 2,
Some(highest_tx - 1),
initial_file_count,
vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
),
// Case 1: 10..=19 has no txs. There are no txes in the whole block range, but want
// to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
// are deleted.
(0, blocks_per_file - 1, highest_tx - 1, files_per_range + 1), // includes lockfile
(
0,
blocks_per_file - 1,
Some(highest_tx - 1),
files_per_range + 1, // includes lockfile
vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
),
// Case 2: Prune most txs up to block 1.
(7, 1, 1, files_per_range + 1),
(
highest_tx - 1,
1,
Some(0),
files_per_range + 1,
vec![(0, SegmentRangeInclusive::new(0, 1))],
),
// Case 3: Prune remaining tx and ensure that file is not deleted.
(1, 0, 0, files_per_range + 1),
(1, 0, None, files_per_range + 1, vec![]),
];

// Loop through test cases
for (case, (prune_count, last_block, expected_tx_tip, expected_file_count)) in
test_cases.into_iter().enumerate()
for (
case,
(prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
) in test_cases.into_iter().enumerate()
{
prune_and_validate(
&sf_rw,
Expand All @@ -471,6 +539,7 @@ mod tests {
last_block,
expected_tx_tip,
expected_file_count,
expected_tx_index,
)
.map_err(|err| eyre::eyre!("Test case {case}: {err}"))
.unwrap();
Expand Down
12 changes: 10 additions & 2 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,9 @@ impl StaticFileProviderRW {
/// Commits to the configuration file at the end.
fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
let mut remaining_rows = num_rows;
let segment = self.writer.user_header().segment();
while remaining_rows > 0 {
let len = match self.writer.user_header().segment() {
let len = match segment {
StaticFileSegment::Headers => {
self.writer.user_header().block_len().unwrap_or_default()
}
Expand All @@ -396,7 +397,14 @@ impl StaticFileProviderRW {
// delete the whole file and go to the next static file
let block_start = self.writer.user_header().expected_block_start();

if block_start != 0 {
// We only delete the file if it's NOT the first static file AND:
// * it's a Header segment OR
// * it's a tx-based segment AND `last_block` is lower than the first block of this
// file's block range. Otherwise, having no rows simply means that this block
// range has no transactions, but the file should remain.
if block_start != 0 &&
(segment.is_headers() || last_block.is_some_and(|b| b < block_start))
{
self.delete_current_and_open_previous()?;
} else {
// Update `SegmentHeader`
Expand Down
Loading