From 509d6acd2b545681214950075a9ec8f6ef3b9731 Mon Sep 17 00:00:00 2001 From: Tyera Date: Tue, 10 Oct 2023 10:40:36 -0600 Subject: [PATCH] Remove primary index from Blockstore special-column keys (#33419) * Add helper trait for column key deprecation * Add WriteBatch::delete_raw * Add ProtobufColumn::get_raw_protobuf_or_bincode * Add ColumnIndexDeprecation iterator methods * Impl ColumnIndexDeprecation for TransactionStatus (doesn't build) * Update TransactionStatus put * Update TransactionStatus purge_exact * Fix read_transaction_status * Fix get_transaction_status_with_counter * Fix test_all_empty_or_min (builds except tests) * Fix test_get_rooted_block * Fix test_persist_transaction_status * Fix test_get_transaction_status * Fix test_get_rooted_transaction * Fix test_get_complete_transaction * Fix test_lowest_cleanup_slot_and_special_cfs * Fix test_map_transactions_to_statuses * Fix test_transaction_status_protobuf_backward_compatability * Fix test_special_columns_empty * Delete test_transaction_status_index * Delete test_purge_transaction_status * Ignore some tests until both special columns are dealt with (all build) * Impl ColumnIndexDeprecation for AddressSignatures (doesn't build) * Add BlockstoreError variant * Update AddressSignatures put * Remove unneeded active_transaction_status_index column lock * Update AddressSignatures purge_exact * Fix find_address_signatures_for_slot methods * Fix get_block_signatures methods * Fix get_confirmed_signatures_for_address2 * Remove unused method * Fix test_all_empty_or_min moar (builds except tests) * Fix tests (all build) * Fix test_get_confirmed_signatures_for_address * Fix test_lowest_cleanup_slot_and_special_cfs moar * Unignore tests (builds except tests) * Fix test_purge_transaction_status_exact * Fix test_purge_front_of_ledger * Fix test_purge_special_columns_compaction_filter (all build) * Move some test-harness stuff around * Add test cases for purge_special_columns_with_old_data * Add test_read_transaction_status_with_old_data * Add test_get_transaction_status_with_old_data * Review comments * Move rev of block-signatures into helper * Improve deprecated_key impls * iter_filtered -> iter_current_index_filtered * Add comment to explain why use the smallest (index, Signature) to seed the iterator * Impl ColumnIndexDeprecation for TransactionMemos (doesn't build) * Update TransactionMemos put * Add LedgerColumn::get_raw * Fix read_transaction_memos * Add TransactionMemos to purge_special_columns_exact * Add TransactionMemos to compaction filter * Take find_address_signatures out of service * Remove faulty delete_new_column_key logic * Simplify comments --- ledger/src/blockstore.rs | 1009 +++++++++------------ ledger/src/blockstore/blockstore_purge.rs | 861 +++++------------- ledger/src/blockstore_db.rs | 348 ++++++- rpc/src/transaction_status_service.rs | 3 +- 4 files changed, 955 insertions(+), 1266 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c23dc240d79f7f..4ea608d3471c26 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -6,8 +6,8 @@ use { crate::{ ancestor_iterator::AncestorIterator, blockstore_db::{ - columns as cf, Column, Database, IteratorDirection, IteratorMode, LedgerColumn, Result, - WriteBatch, + columns as cf, Column, ColumnIndexDeprecation, Database, IteratorDirection, + IteratorMode, LedgerColumn, Result, WriteBatch, }, blockstore_meta::*, blockstore_options::{ @@ -74,7 +74,7 @@ use { rc::Rc, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, RwLock, RwLockWriteGuard, + Arc, Mutex, RwLock, }, }, tempfile::{Builder, TempDir}, @@ -2210,40 +2210,40 @@ impl Blockstore { } } - fn get_primary_index_to_write( - &self, - slot: Slot, - // take WriteGuard to require critical section semantics at call site - w_active_transaction_status_index: &RwLockWriteGuard, - ) -> Result { - let i = **w_active_transaction_status_index; - let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap(); - if slot > index_meta.max_slot { - assert!(!index_meta.frozen); - index_meta.max_slot = slot; - self.transaction_status_index_cf.put(i, &index_meta)?; - } - Ok(i) - } - - pub fn read_transaction_status( + fn read_deprecated_transaction_status( &self, index: (Signature, Slot), ) -> Result> { let (signature, slot) = index; let result = self .transaction_status_cf - .get_protobuf_or_bincode::((0, signature, slot))?; + .get_raw_protobuf_or_bincode::( + &cf::TransactionStatus::deprecated_key((0, signature, slot)), + )?; if result.is_none() { Ok(self .transaction_status_cf - .get_protobuf_or_bincode::((1, signature, slot))? + .get_raw_protobuf_or_bincode::( + &cf::TransactionStatus::deprecated_key((1, signature, slot)), + )? .and_then(|meta| meta.try_into().ok())) } else { Ok(result.and_then(|meta| meta.try_into().ok())) } } + pub fn read_transaction_status( + &self, + index: (Signature, Slot), + ) -> Result> { + let result = self.transaction_status_cf.get_protobuf(index)?; + if result.is_none() { + self.read_deprecated_transaction_status(index) + } else { + Ok(result.and_then(|meta| meta.try_into().ok())) + } + } + pub fn write_transaction_status( &self, slot: Slot, @@ -2251,37 +2251,49 @@ impl Blockstore { writable_keys: Vec<&Pubkey>, readonly_keys: Vec<&Pubkey>, status: TransactionStatusMeta, + transaction_index: usize, ) -> Result<()> { let status = status.into(); - // This write lock prevents interleaving issues with the transaction_status_index_cf by gating - // writes to that column - let w_active_transaction_status_index = - self.active_transaction_status_index.write().unwrap(); - let primary_index = - self.get_primary_index_to_write(slot, &w_active_transaction_status_index)?; + let transaction_index = u32::try_from(transaction_index) + .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; self.transaction_status_cf - .put_protobuf((primary_index, signature, slot), &status)?; + .put_protobuf((signature, slot), &status)?; for address in writable_keys { self.address_signatures_cf.put( - (primary_index, *address, slot, signature), + (*address, slot, transaction_index, signature), &AddressSignatureMeta { writeable: true }, )?; } for address in readonly_keys { self.address_signatures_cf.put( - (primary_index, *address, slot, signature), + (*address, slot, transaction_index, signature), &AddressSignatureMeta { writeable: false }, )?; } Ok(()) } - pub fn read_transaction_memos(&self, signature: Signature) -> Result> { - self.transaction_memos_cf.get(signature) + pub fn read_transaction_memos( + &self, + signature: Signature, + slot: Slot, + ) -> Result> { + let memos = self.transaction_memos_cf.get((signature, slot))?; + if memos.is_none() { + self.transaction_memos_cf + .get_raw(&cf::TransactionMemos::deprecated_key(signature)) + } else { + Ok(memos) + } } - pub fn write_transaction_memos(&self, signature: &Signature, memos: String) -> Result<()> { - self.transaction_memos_cf.put(*signature, &memos) + pub fn write_transaction_memos( + &self, + signature: &Signature, + slot: Slot, + memos: String, + ) -> Result<()> { + self.transaction_memos_cf.put((*signature, slot), &memos) } /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock @@ -2328,15 +2340,40 @@ impl Blockstore { let (lock, _) = self.ensure_lowest_cleanup_slot(); let first_available_block = self.get_first_available_block()?; + let iterator = + self.transaction_status_cf + .iter_current_index_filtered(IteratorMode::From( + (signature, first_available_block), + IteratorDirection::Forward, + ))?; + + for ((sig, slot), _data) in iterator { + counter += 1; + if sig != signature { + break; + } + if !self.is_root(slot) && !confirmed_unrooted_slots.contains(&slot) { + continue; + } + let status = self + .transaction_status_cf + .get_protobuf((signature, slot))? + .and_then(|status| status.try_into().ok()) + .map(|status| (slot, status)); + return Ok((status, counter)); + } + for transaction_status_cf_primary_index in 0..=1 { - let index_iterator = self.transaction_status_cf.iter(IteratorMode::From( - ( - transaction_status_cf_primary_index, - signature, - first_available_block, - ), - IteratorDirection::Forward, - ))?; + let index_iterator = + self.transaction_status_cf + .iter_deprecated_index_filtered(IteratorMode::From( + ( + transaction_status_cf_primary_index, + signature, + first_available_block, + ), + IteratorDirection::Forward, + ))?; for ((i, sig, slot), _data) in index_iterator { counter += 1; if i != transaction_status_cf_primary_index || sig != signature { @@ -2347,7 +2384,9 @@ impl Blockstore { } let status = self .transaction_status_cf - .get_protobuf_or_bincode::((i, sig, slot))? + .get_raw_protobuf_or_bincode::( + &cf::TransactionStatus::deprecated_key((i, signature, slot)), + )? .and_then(|status| status.try_into().ok()) .map(|status| (slot, status)); return Ok((status, counter)); @@ -2463,50 +2502,19 @@ impl Blockstore { .find(|transaction| transaction.signatures[0] == signature)) } - // Returns all rooted signatures for an address, ordered by slot that the transaction was - // processed in. Within each slot the transactions will be ordered by signature, and NOT by - // the order in which the transactions exist in the block - // - // DEPRECATED + // DEPRECATED and decommissioned + // This method always returns an empty Vec fn find_address_signatures( &self, - pubkey: Pubkey, - start_slot: Slot, - end_slot: Slot, + _pubkey: Pubkey, + _start_slot: Slot, + _end_slot: Slot, ) -> Result> { - let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot(); - let mut signatures: Vec<(Slot, Signature)> = vec![]; - if end_slot < lowest_available_slot { - return Ok(signatures); - } - for transaction_status_cf_primary_index in 0..=1 { - let index_iterator = self.address_signatures_cf.iter(IteratorMode::From( - ( - transaction_status_cf_primary_index, - pubkey, - start_slot.max(lowest_available_slot), - Signature::default(), - ), - IteratorDirection::Forward, - ))?; - for ((i, address, slot, signature), _) in index_iterator { - if i != transaction_status_cf_primary_index || slot > end_slot || address != pubkey - { - break; - } - if self.is_root(slot) { - signatures.push((slot, signature)); - } - } - } - drop(lock); - signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1))); - Ok(signatures) + Ok(vec![]) } // Returns all signatures for an address in a particular slot, regardless of whether that slot - // has been rooted. The transactions will be ordered by signature, and NOT by the order in - // which the transactions exist in the block + // has been rooted. The transactions will be ordered by their occurrence in the block fn find_address_signatures_for_slot( &self, pubkey: Pubkey, @@ -2517,32 +2525,29 @@ impl Blockstore { if slot < lowest_available_slot { return Ok(signatures); } - for transaction_status_cf_primary_index in 0..=1 { - let index_iterator = self.address_signatures_cf.iter(IteratorMode::From( - ( - transaction_status_cf_primary_index, - pubkey, - slot, - Signature::default(), - ), - IteratorDirection::Forward, - ))?; - for ((i, address, transaction_slot, signature), _) in index_iterator { - if i != transaction_status_cf_primary_index - || transaction_slot > slot - || address != pubkey - { - break; - } - signatures.push((slot, signature)); + let index_iterator = + self.address_signatures_cf + .iter_current_index_filtered(IteratorMode::From( + ( + pubkey, + slot.max(lowest_available_slot), + 0, + Signature::default(), + ), + IteratorDirection::Forward, + ))?; + for ((address, transaction_slot, _transaction_index, signature), _) in index_iterator { + if transaction_slot > slot || address != pubkey { + break; } + signatures.push((slot, signature)); } drop(lock); - signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1))); Ok(signatures) } - // DEPRECATED + // DEPRECATED and decommissioned + // This method always returns an empty Vec pub fn get_confirmed_signatures_for_address( &self, pubkey: Pubkey, @@ -2557,7 +2562,7 @@ impl Blockstore { .map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect()) } - fn get_sorted_block_signatures(&self, slot: Slot) -> Result> { + fn get_block_signatures_rev(&self, slot: Slot) -> Result> { let block = self.get_complete_block(slot, false).map_err(|err| { BlockstoreError::Io(IoError::new( ErrorKind::Other, @@ -2565,10 +2570,10 @@ impl Blockstore { )) })?; - // Load all signatures for the block - let mut slot_signatures: Vec<_> = block + Ok(block .transactions .into_iter() + .rev() .filter_map(|transaction_with_meta| { transaction_with_meta .transaction @@ -2576,14 +2581,7 @@ impl Blockstore { .into_iter() .next() }) - .collect(); - - // Reverse sort signatures as a way to entire a stable ordering within a slot, as - // the AddressSignatures column is ordered by signatures within a slot, - // not by block ordering - slot_signatures.sort_unstable_by(|a, b| b.cmp(a)); - - Ok(slot_signatures) + .collect()) } pub fn get_confirmed_signatures_for_address2( @@ -2616,7 +2614,7 @@ impl Blockstore { match transaction_status { None => return Ok(SignatureInfosForAddress::default()), Some((slot, _)) => { - let mut slot_signatures = self.get_sorted_block_signatures(slot)?; + let mut slot_signatures = self.get_block_signatures_rev(slot)?; if let Some(pos) = slot_signatures.iter().position(|&x| x == before) { slot_signatures.truncate(pos + 1); } @@ -2643,7 +2641,7 @@ impl Blockstore { match transaction_status { None => (first_available_block, HashSet::new()), Some((slot, _)) => { - let mut slot_signatures = self.get_sorted_block_signatures(slot)?; + let mut slot_signatures = self.get_block_signatures_rev(slot)?; if let Some(pos) = slot_signatures.iter().position(|&x| x == until) { slot_signatures = slot_signatures.split_off(pos); } @@ -2673,65 +2671,26 @@ impl Blockstore { } get_initial_slot_timer.stop(); - // Check the active_transaction_status_index to see if it contains slot. If so, start with - // that index, as it will contain higher slots - let starting_primary_index = *self.active_transaction_status_index.read().unwrap(); - let next_primary_index = u64::from(starting_primary_index == 0); - let next_max_slot = self - .transaction_status_index_cf - .get(next_primary_index)? - .unwrap() - .max_slot; - - let mut starting_primary_index_iter_timer = Measure::start("starting_primary_index_iter"); - if slot > next_max_slot { - let mut starting_iterator = self.address_signatures_cf.iter(IteratorMode::From( - (starting_primary_index, address, slot, Signature::default()), - IteratorDirection::Reverse, - ))?; - - // Iterate through starting_iterator until limit is reached - while address_signatures.len() < limit { - if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() { - if slot == next_max_slot || slot < lowest_slot { - break; - } - if i == starting_primary_index && key_address == address { - if self.is_root(slot) || confirmed_unrooted_slots.contains(&slot) { - address_signatures.push((slot, signature)); - } - continue; - } - } - break; - } - - // Handle slots that cross primary indexes - if next_max_slot >= lowest_slot { - let mut signatures = - self.find_address_signatures_for_slot(address, next_max_slot)?; - signatures.reverse(); - address_signatures.append(&mut signatures); - } - } - starting_primary_index_iter_timer.stop(); - - // Iterate through next_iterator until limit is reached - let mut next_primary_index_iter_timer = Measure::start("next_primary_index_iter_timer"); - let mut next_iterator = self.address_signatures_cf.iter(IteratorMode::From( - (next_primary_index, address, slot, Signature::default()), - IteratorDirection::Reverse, - ))?; + let mut address_signatures_iter_timer = Measure::start("iter_timer"); + let mut iterator = + self.address_signatures_cf + .iter_current_index_filtered(IteratorMode::From( + // Ragardless of whether a `before` signature is provided, the latest relevant + // `slot` is queried directly with the `find_address_signatures_for_slot()` + // call above. Thus, this iterator starts at the lowest entry of `address, + // slot` and iterates backwards to continue reporting the next earliest + // signatures. + (address, slot, 0, Signature::default()), + IteratorDirection::Reverse, + ))?; + + // Iterate until limit is reached while address_signatures.len() < limit { - if let Some(((i, key_address, slot, signature), _)) = next_iterator.next() { - // Skip next_max_slot, which is already included - if slot == next_max_slot { - continue; - } + if let Some(((key_address, slot, _transaction_index, signature), _)) = iterator.next() { if slot < lowest_slot { break; } - if i == next_primary_index && key_address == address { + if key_address == address { if self.is_root(slot) || confirmed_unrooted_slots.contains(&slot) { address_signatures.push((slot, signature)); } @@ -2740,7 +2699,8 @@ impl Blockstore { } break; } - next_primary_index_iter_timer.stop(); + address_signatures_iter_timer.stop(); + let mut address_signatures: Vec<(Slot, Signature)> = address_signatures .into_iter() .filter(|(_, signature)| !until_excluded_signatures.contains(signature)) @@ -2754,7 +2714,7 @@ impl Blockstore { let transaction_status = self.get_transaction_status(signature, &confirmed_unrooted_slots)?; let err = transaction_status.and_then(|(_slot, status)| status.status.err()); - let memo = self.read_transaction_memos(signature)?; + let memo = self.read_transaction_memos(signature, slot)?; let block_time = self.get_block_time(slot)?; infos.push(ConfirmedTransactionStatusWithSignature { signature, @@ -2779,13 +2739,8 @@ impl Blockstore { i64 ), ( - "starting_primary_index_iter_us", - starting_primary_index_iter_timer.as_us() as i64, - i64 - ), - ( - "next_primary_index_iter_us", - next_primary_index_iter_timer.as_us() as i64, + "address_signatures_iter_us", + address_signatures_iter_timer.as_us() as i64, i64 ), ( @@ -4479,18 +4434,14 @@ pub fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) { .iter::(IteratorMode::Start) .unwrap() .next() - .map(|((primary_index, _, slot), _)| { - slot >= min_slot || (primary_index == 2 && slot == 0) - }) + .map(|((_, slot), _)| slot >= min_slot || slot == 0) .unwrap_or(true) & blockstore .db .iter::(IteratorMode::Start) .unwrap() .next() - .map(|((primary_index, _, slot, _), _)| { - slot >= min_slot || (primary_index == 2 && slot == 0) - }) + .map(|((_, slot, _, _), _)| slot >= min_slot || slot == 0) .unwrap_or(true) & blockstore .db @@ -7204,7 +7155,7 @@ pub mod tests { .into(); blockstore .transaction_status_cf - .put_protobuf((0, signature, slot), &status) + .put_protobuf((signature, slot), &status) .unwrap(); let status = TransactionStatusMeta { status: Ok(()), @@ -7223,7 +7174,7 @@ pub mod tests { .into(); blockstore .transaction_status_cf - .put_protobuf((0, signature, slot + 1), &status) + .put_protobuf((signature, slot + 1), &status) .unwrap(); let status = TransactionStatusMeta { status: Ok(()), @@ -7242,7 +7193,7 @@ pub mod tests { .into(); blockstore .transaction_status_cf - .put_protobuf((0, signature, slot + 2), &status) + .put_protobuf((signature, slot + 2), &status) .unwrap(); VersionedTransactionWithStatusMeta { transaction, @@ -7383,7 +7334,7 @@ pub mod tests { // result not found assert!(transaction_status_cf - .get_protobuf_or_bincode::((0, Signature::default(), 0)) + .get_protobuf((Signature::default(), 0)) .unwrap() .is_none()); @@ -7404,7 +7355,7 @@ pub mod tests { } .into(); assert!(transaction_status_cf - .put_protobuf((0, Signature::default(), 0), &status,) + .put_protobuf((Signature::default(), 0), &status) .is_ok()); // result found @@ -7422,7 +7373,7 @@ pub mod tests { return_data, compute_units_consumed, } = transaction_status_cf - .get_protobuf_or_bincode::((0, Signature::default(), 0)) + .get_protobuf((Signature::default(), 0)) .unwrap() .unwrap() .try_into() @@ -7457,7 +7408,7 @@ pub mod tests { } .into(); assert!(transaction_status_cf - .put_protobuf((0, Signature::from([2u8; 64]), 9), &status,) + .put_protobuf((Signature::from([2u8; 64]), 9), &status,) .is_ok()); // result found @@ -7475,11 +7426,7 @@ pub mod tests { return_data, compute_units_consumed, } = transaction_status_cf - .get_protobuf_or_bincode::(( - 0, - Signature::from([2u8; 64]), - 9, - )) + .get_protobuf((Signature::from([2u8; 64]), 9)) .unwrap() .unwrap() .try_into() @@ -7501,208 +7448,73 @@ pub mod tests { } #[test] - #[allow(clippy::cognitive_complexity)] - fn test_transaction_status_index() { + fn test_read_transaction_status_with_old_data() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let signature = Signature::from([1; 64]); - let transaction_status_index_cf = &blockstore.transaction_status_index_cf; - let slot0 = 10; - - // Primary index column is initialized on Blockstore::open - assert!(transaction_status_index_cf.get(0).unwrap().is_some()); - assert!(transaction_status_index_cf.get(1).unwrap().is_some()); + let index0_slot = 2; + blockstore + .write_deprecated_transaction_status( + 0, + index0_slot, + signature, + vec![&Pubkey::new_unique()], + vec![&Pubkey::new_unique()], + TransactionStatusMeta { + fee: index0_slot * 1_000, + ..TransactionStatusMeta::default() + }, + ) + .unwrap(); - for _ in 0..5 { - let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::()); - blockstore - .write_transaction_status( - slot0, - Signature::from(random_bytes), - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), - ) - .unwrap(); - } + let index1_slot = 1; + blockstore + .write_deprecated_transaction_status( + 1, + index1_slot, + signature, + vec![&Pubkey::new_unique()], + vec![&Pubkey::new_unique()], + TransactionStatusMeta { + fee: index1_slot * 1_000, + ..TransactionStatusMeta::default() + }, + ) + .unwrap(); - // New statuses bump index 0 max_slot - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: slot0, - frozen: false, - } - ); - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta::default() - ); + let slot = 3; + blockstore + .write_transaction_status( + slot, + signature, + vec![&Pubkey::new_unique()], + vec![&Pubkey::new_unique()], + TransactionStatusMeta { + fee: slot * 1_000, + ..TransactionStatusMeta::default() + }, + 0, + ) + .unwrap(); - let first_status_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() - .unwrap() - .0; - assert_eq!(first_status_entry.0, 0); - assert_eq!(first_status_entry.2, slot0); - let first_address_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() + let meta = blockstore + .read_transaction_status((signature, slot)) .unwrap() - .0; - assert_eq!(first_address_entry.0, 0); - assert_eq!(first_address_entry.2, slot0); - - blockstore.run_purge(0, 8, PurgeType::PrimaryIndex).unwrap(); - // First successful prune freezes index 0 - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: slot0, - frozen: true, - } - ); - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta::default() - ); - - let slot1 = 20; - for _ in 0..5 { - let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::()); - blockstore - .write_transaction_status( - slot1, - Signature::from(random_bytes), - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), - ) - .unwrap(); - } - - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: slot0, - frozen: true, - } - ); - // Index 0 is frozen, so new statuses bump index 1 max_slot - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: slot1, - frozen: false, - } - ); + .unwrap(); + assert_eq!(meta.fee, slot * 1000); - // Index 0 statuses and address records still exist - let first_status_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() - .unwrap() - .0; - assert_eq!(first_status_entry.0, 0); - assert_eq!(first_status_entry.2, 10); - let first_address_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() - .unwrap() - .0; - assert_eq!(first_address_entry.0, 0); - assert_eq!(first_address_entry.2, slot0); - // New statuses and address records are stored in index 1 - let index1_first_status_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(1), - IteratorDirection::Forward, - )) - .unwrap() - .next() - .unwrap() - .0; - assert_eq!(index1_first_status_entry.0, 1); - assert_eq!(index1_first_status_entry.2, slot1); - let index1_first_address_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(1), - IteratorDirection::Forward, - )) - .unwrap() - .next() + let meta = blockstore + .read_transaction_status((signature, index0_slot)) .unwrap() - .0; - assert_eq!(index1_first_address_entry.0, 1); - assert_eq!(index1_first_address_entry.2, slot1); - - blockstore - .run_purge(0, 18, PurgeType::PrimaryIndex) .unwrap(); - // Successful prune toggles TransactionStatusIndex - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: false, - } - ); - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: slot1, - frozen: true, - } - ); + assert_eq!(meta.fee, index0_slot * 1000); - // Index 0 has been pruned, so first status and address entries are now index 1 - let first_status_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() - .unwrap() - .0; - assert_eq!(first_status_entry.0, 1); - assert_eq!(first_status_entry.2, slot1); - let first_address_entry = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap() - .next() + let meta = blockstore + .read_transaction_status((signature, index1_slot)) .unwrap() - .0; - assert_eq!(first_address_entry.0, 1); - assert_eq!(first_address_entry.2, slot1); + .unwrap(); + assert_eq!(meta.fee, index1_slot * 1000); } #[test] @@ -7755,56 +7567,45 @@ pub mod tests { blockstore.set_roots([0, 2].iter()).unwrap(); - // Initialize index 0, including: - // signature2 in non-root and root, - // signature4 in non-root, + // Initialize statuses: + // signature2 in skipped slot and root, + // signature4 in skipped slot, // signature5 in skipped slot and non-root, // signature6 in skipped slot, + // signature5 extra entries transaction_status_cf - .put_protobuf((0, signature2, 1), &status) - .unwrap(); - - transaction_status_cf - .put_protobuf((0, signature2, 2), &status) - .unwrap(); - - transaction_status_cf - .put_protobuf((0, signature4, 1), &status) + .put_protobuf((signature2, 1), &status) .unwrap(); transaction_status_cf - .put_protobuf((0, signature5, 1), &status) + .put_protobuf((signature2, 2), &status) .unwrap(); transaction_status_cf - .put_protobuf((0, signature5, 3), &status) + .put_protobuf((signature4, 1), &status) .unwrap(); transaction_status_cf - .put_protobuf((0, signature6, 1), &status) + .put_protobuf((signature5, 1), &status) .unwrap(); - // Initialize index 1, including: - // signature4 in root, - // signature6 in non-root, - // signature5 extra entries transaction_status_cf - .put_protobuf((1, signature4, 2), &status) + .put_protobuf((signature5, 3), &status) .unwrap(); transaction_status_cf - .put_protobuf((1, signature5, 4), &status) + .put_protobuf((signature6, 1), &status) .unwrap(); transaction_status_cf - .put_protobuf((1, signature5, 5), &status) + .put_protobuf((signature5, 5), &status) .unwrap(); transaction_status_cf - .put_protobuf((1, signature6, 3), &status) + .put_protobuf((signature6, 3), &status) .unwrap(); - // Signature exists, root found in index 0 + // Signature exists, root found if let (Some((slot, _status)), counter) = blockstore .get_transaction_status_with_counter(signature2, &[].into()) .unwrap() @@ -7822,30 +7623,26 @@ pub mod tests { assert_eq!(counter, 2); } - // Signature exists, root found in index 1 - if let (Some((slot, _status)), counter) = blockstore + // Signature exists in skipped slot, no root found + let (status, counter) = blockstore .get_transaction_status_with_counter(signature4, &[].into()) - .unwrap() - { - assert_eq!(slot, 2); - assert_eq!(counter, 3); - } + .unwrap(); + assert_eq!(status, None); + assert_eq!(counter, 2); - // Signature exists, root found although not required, in index 1 - if let (Some((slot, _status)), counter) = blockstore + // Signature exists in skipped slot, no non-root found + let (status, counter) = blockstore .get_transaction_status_with_counter(signature4, &[3].into()) - .unwrap() - { - assert_eq!(slot, 2); - assert_eq!(counter, 3); - } + .unwrap(); + assert_eq!(status, None); + assert_eq!(counter, 2); // Signature exists, no root found let (status, counter) = blockstore .get_transaction_status_with_counter(signature5, &[].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 6); + assert_eq!(counter, 4); // Signature exists, root not required if let (Some((slot, _status)), counter) = blockstore @@ -7861,38 +7658,163 @@ pub mod tests { .get_transaction_status_with_counter(signature1, &[].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 2); + assert_eq!(counter, 1); let (status, counter) = blockstore .get_transaction_status_with_counter(signature1, &[3].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 2); + assert_eq!(counter, 1); // Signature does not exist, between existing entries let (status, counter) = blockstore .get_transaction_status_with_counter(signature3, &[].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 2); + assert_eq!(counter, 1); let (status, counter) = blockstore .get_transaction_status_with_counter(signature3, &[3].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 2); + assert_eq!(counter, 1); // Signature does not exist, larger than existing entries let (status, counter) = blockstore .get_transaction_status_with_counter(signature7, &[].into()) .unwrap(); assert_eq!(status, None); - assert_eq!(counter, 1); + assert_eq!(counter, 0); let (status, counter) = blockstore .get_transaction_status_with_counter(signature7, &[3].into()) .unwrap(); assert_eq!(status, None); + assert_eq!(counter, 0); + } + + #[test] + fn test_get_transaction_status_with_old_data() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + let transaction_status_cf = &blockstore.transaction_status_cf; + + let pre_balances_vec = vec![1, 2, 3]; + let post_balances_vec = vec![3, 2, 1]; + let status = TransactionStatusMeta { + status: solana_sdk::transaction::Result::<()>::Ok(()), + fee: 42u64, + pre_balances: pre_balances_vec, + post_balances: post_balances_vec, + inner_instructions: Some(vec![]), + log_messages: Some(vec![]), + pre_token_balances: Some(vec![]), + post_token_balances: Some(vec![]), + rewards: Some(vec![]), + loaded_addresses: LoadedAddresses::default(), + return_data: Some(TransactionReturnData::default()), + compute_units_consumed: Some(42u64), + } + .into(); + + let signature1 = Signature::from([1u8; 64]); + let signature2 = Signature::from([2u8; 64]); + let signature3 = Signature::from([3u8; 64]); + let signature4 = Signature::from([4u8; 64]); + let signature5 = Signature::from([5u8; 64]); + let signature6 = Signature::from([6u8; 64]); + + // Insert slots with fork + // 0 (root) + // / \ + // 1 | + // 2 (root) + // / | + // 3 | + // 4 (root) + // | + // 5 + let meta0 = SlotMeta::new(0, Some(0)); + blockstore.meta_cf.put(0, &meta0).unwrap(); + let meta1 = SlotMeta::new(1, Some(0)); + blockstore.meta_cf.put(1, &meta1).unwrap(); + let meta2 = SlotMeta::new(2, Some(0)); + blockstore.meta_cf.put(2, &meta2).unwrap(); + let meta3 = SlotMeta::new(3, Some(2)); + blockstore.meta_cf.put(3, &meta3).unwrap(); + let meta4 = SlotMeta::new(4, Some(2)); + blockstore.meta_cf.put(4, &meta4).unwrap(); + let meta5 = SlotMeta::new(5, Some(4)); + blockstore.meta_cf.put(5, &meta5).unwrap(); + + blockstore.set_roots([0, 2, 4].iter()).unwrap(); + + // Initialize statuses: + // signature1 in skipped slot and root (2), both index 1 + // signature2 in skipped slot and root (4), both index 0 + // signature3 in root + // signature4 in non-root, + // signature5 extra entries + transaction_status_cf + .put_deprecated_protobuf((1, signature1, 1), &status) + .unwrap(); + + transaction_status_cf + .put_deprecated_protobuf((1, signature1, 2), &status) + .unwrap(); + + transaction_status_cf + .put_deprecated_protobuf((0, signature2, 3), &status) + .unwrap(); + + transaction_status_cf + .put_deprecated_protobuf((0, signature2, 4), &status) + .unwrap(); + + transaction_status_cf + .put_protobuf((signature3, 4), &status) + .unwrap(); + + transaction_status_cf + .put_protobuf((signature4, 5), &status) + .unwrap(); + + transaction_status_cf + .put_protobuf((signature5, 5), &status) + .unwrap(); + + // Signature exists, root found in index 1 + if let (Some((slot, _status)), counter) = blockstore + .get_transaction_status_with_counter(signature1, &[].into()) + .unwrap() + { + assert_eq!(slot, 2); + assert_eq!(counter, 4); + } + + // Signature exists, root found in index 0 + if let (Some((slot, _status)), counter) = blockstore + .get_transaction_status_with_counter(signature2, &[].into()) + .unwrap() + { + assert_eq!(slot, 4); + assert_eq!(counter, 3); + } + + // Signature exists + if let (Some((slot, _status)), counter) = blockstore + .get_transaction_status_with_counter(signature3, &[].into()) + .unwrap() + { + assert_eq!(slot, 4); + assert_eq!(counter, 1); + } + + // Signature does not exist + let (status, counter) = blockstore + .get_transaction_status_with_counter(signature6, &[].into()) + .unwrap(); + assert_eq!(status, None); assert_eq!(counter, 1); } @@ -7940,11 +7862,11 @@ pub mod tests { let lowest_available_slot = lowest_cleanup_slot + 1; transaction_status_cf - .put_protobuf((0, signature1, lowest_cleanup_slot), &status) + .put_protobuf((signature1, lowest_cleanup_slot), &status) .unwrap(); transaction_status_cf - .put_protobuf((0, signature2, lowest_available_slot), &status) + .put_protobuf((signature2, lowest_available_slot), &status) .unwrap(); let address0 = solana_sdk::pubkey::new_rand(); @@ -7956,6 +7878,7 @@ pub mod tests { vec![&address0], vec![], TransactionStatusMeta::default(), + 0, ) .unwrap(); blockstore @@ -7965,6 +7888,7 @@ pub mod tests { vec![&address1], vec![], TransactionStatusMeta::default(), + 0, ) .unwrap(); @@ -7979,10 +7903,6 @@ pub mod tests { .find_address_signatures_for_slot(address0, lowest_cleanup_slot) .unwrap() .is_empty(), - blockstore - .find_address_signatures(address0, lowest_cleanup_slot, lowest_cleanup_slot) - .unwrap() - .is_empty(), ) }; @@ -7997,17 +7917,13 @@ pub mod tests { .find_address_signatures_for_slot(address1, lowest_available_slot) .unwrap() .is_empty(), - !blockstore - .find_address_signatures(address1, lowest_available_slot, lowest_available_slot) - .unwrap() - .is_empty(), ); - assert_eq!(are_existing_always, (true, true, true)); + assert_eq!(are_existing_always, (true, true)); }; let are_missing = check_for_missing(); // should never be missing before the conditional compaction & simulation... - assert_eq!(are_missing, (false, false, false)); + assert_eq!(are_missing, (false, false)); assert_existing_always(); if simulate_ledger_cleanup_service { @@ -8019,10 +7935,10 @@ pub mod tests { if simulate_ledger_cleanup_service { // ... when either simulation (or both) is effective, we should observe to be missing // consistently - assert_eq!(are_missing, (true, true, true)); + assert_eq!(are_missing, (true, true)); } else { // ... otherwise, we should observe to be existing... - assert_eq!(are_missing, (false, false, false)); + assert_eq!(are_missing, (false, false)); } assert_existing_always(); } @@ -8099,7 +8015,7 @@ pub mod tests { .into(); blockstore .transaction_status_cf - .put_protobuf((0, signature, slot), &status) + .put_protobuf((signature, slot), &status) .unwrap(); VersionedTransactionWithStatusMeta { transaction, @@ -8143,11 +8059,13 @@ pub mod tests { ); } - blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap(); + blockstore + .run_purge(0, slot, PurgeType::CompactionFilter) + .unwrap(); *blockstore.lowest_cleanup_slot.write().unwrap() = slot; for VersionedTransactionWithStatusMeta { transaction, .. } in expected_transactions { let signature = transaction.signatures[0]; - assert_eq!(blockstore.get_rooted_transaction(signature).unwrap(), None,); + assert_eq!(blockstore.get_rooted_transaction(signature).unwrap(), None); assert_eq!( blockstore .get_complete_transaction(signature, slot + 1) @@ -8219,7 +8137,7 @@ pub mod tests { .into(); blockstore .transaction_status_cf - .put_protobuf((0, signature, slot), &status) + .put_protobuf((signature, slot), &status) .unwrap(); VersionedTransactionWithStatusMeta { transaction, @@ -8256,7 +8174,9 @@ pub mod tests { assert_eq!(blockstore.get_rooted_transaction(signature).unwrap(), None); } - blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap(); + blockstore + .run_purge(0, slot, PurgeType::CompactionFilter) + .unwrap(); *blockstore.lowest_cleanup_slot.write().unwrap() = slot; for VersionedTransactionWithStatusMeta { transaction, .. } in expected_transactions { let signature = transaction.signatures[0]; @@ -8284,138 +8204,32 @@ pub mod tests { ); } - #[test] - fn test_get_confirmed_signatures_for_address() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - let address0 = solana_sdk::pubkey::new_rand(); - let address1 = solana_sdk::pubkey::new_rand(); - - let slot0 = 10; - for x in 1..5 { - let signature = Signature::from([x; 64]); - blockstore - .write_transaction_status( - slot0, - signature, - vec![&address0], - vec![&address1], - TransactionStatusMeta::default(), - ) - .unwrap(); - } - let slot1 = 20; - for x in 5..9 { - let signature = Signature::from([x; 64]); - blockstore - .write_transaction_status( - slot1, - signature, - vec![&address0], - vec![&address1], - TransactionStatusMeta::default(), - ) - .unwrap(); - } - blockstore.set_roots([slot0, slot1].iter()).unwrap(); - - let all0 = blockstore - .get_confirmed_signatures_for_address(address0, 0, 50) - .unwrap(); - assert_eq!(all0.len(), 8); - for x in 1..9 { - let expected_signature = Signature::from([x; 64]); - assert_eq!(all0[x as usize - 1], expected_signature); - } - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 20, 50) - .unwrap() - .len(), - 4 - ); - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 0, 10) - .unwrap() - .len(), - 4 - ); - assert!(blockstore - .get_confirmed_signatures_for_address(address0, 1, 5) - .unwrap() - .is_empty()); - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 1, 15) - .unwrap() - .len(), - 4 - ); - - let all1 = blockstore - .get_confirmed_signatures_for_address(address1, 0, 50) - .unwrap(); - assert_eq!(all1.len(), 8); - for x in 1..9 { - let expected_signature = Signature::from([x; 64]); - assert_eq!(all1[x as usize - 1], expected_signature); - } - - // Purge index 0 - blockstore - .run_purge(0, 10, PurgeType::PrimaryIndex) - .unwrap(); - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 0, 50) - .unwrap() - .len(), - 4 - ); - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 20, 50) - .unwrap() - .len(), - 4 - ); - assert!(blockstore - .get_confirmed_signatures_for_address(address0, 0, 10) - .unwrap() - .is_empty()); - assert!(blockstore - .get_confirmed_signatures_for_address(address0, 1, 5) - .unwrap() - .is_empty()); - assert_eq!( - blockstore - .get_confirmed_signatures_for_address(address0, 1, 25) - .unwrap() - .len(), - 4 - ); - - // Test sort, regardless of entry order or signature value - for slot in (21..25).rev() { - let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::()); - let signature = Signature::from(random_bytes); - blockstore - .write_transaction_status( - slot, - signature, - vec![&address0], - vec![&address1], - TransactionStatusMeta::default(), - ) - .unwrap(); - } - blockstore.set_roots([21, 22, 23, 24].iter()).unwrap(); - let mut past_slot = 0; - for (slot, _) in blockstore.find_address_signatures(address0, 1, 25).unwrap() { - assert!(slot >= past_slot); - past_slot = slot; + impl Blockstore { + pub(crate) fn write_deprecated_transaction_status( + &self, + primary_index: u64, + slot: Slot, + signature: Signature, + writable_keys: Vec<&Pubkey>, + readonly_keys: Vec<&Pubkey>, + status: TransactionStatusMeta, + ) -> Result<()> { + let status = status.into(); + self.transaction_status_cf + .put_deprecated_protobuf((primary_index, signature, slot), &status)?; + for address in writable_keys { + self.address_signatures_cf.put_deprecated( + (primary_index, *address, slot, signature), + &AddressSignatureMeta { writeable: true }, + )?; + } + for address in readonly_keys { + self.address_signatures_cf.put_deprecated( + (primary_index, *address, slot, signature), + &AddressSignatureMeta { writeable: false }, + )?; + } + Ok(()) } } @@ -8437,6 +8251,7 @@ pub mod tests { vec![&address0], vec![&address1], TransactionStatusMeta::default(), + x as usize, ) .unwrap(); } @@ -8450,6 +8265,7 @@ pub mod tests { vec![&address0], vec![&address1], TransactionStatusMeta::default(), + x as usize, ) .unwrap(); } @@ -8462,6 +8278,7 @@ pub mod tests { vec![&address0], vec![&address1], TransactionStatusMeta::default(), + x as usize, ) .unwrap(); } @@ -8475,6 +8292,7 @@ pub mod tests { vec![&address0], vec![&address1], TransactionStatusMeta::default(), + x as usize, ) .unwrap(); } @@ -8547,6 +8365,7 @@ pub mod tests { ); blockstore.insert_shreds(shreds, None, false).unwrap(); + let mut counter = 0; for entry in entries.into_iter() { for transaction in entry.transactions { assert_eq!(transaction.signatures.len(), 1); @@ -8557,8 +8376,10 @@ pub mod tests { transaction.message.static_account_keys().iter().collect(), vec![], TransactionStatusMeta::default(), + counter, ) .unwrap(); + counter += 1; } } } @@ -8572,6 +8393,7 @@ pub mod tests { entries_to_test_shreds(&entries, slot, 8, true, 0, /*merkle_variant:*/ true); blockstore.insert_shreds(shreds, None, false).unwrap(); + let mut counter = 0; for entry in entries.into_iter() { for transaction in entry.transactions { assert_eq!(transaction.signatures.len(), 1); @@ -8582,8 +8404,10 @@ pub mod tests { transaction.message.static_account_keys().iter().collect(), vec![], TransactionStatusMeta::default(), + counter, ) .unwrap(); + counter += 1; } } } @@ -8710,8 +8534,7 @@ pub mod tests { assert_eq!(results[2], all0[i + 2]); } - // Ensure that the signatures within a slot are reverse ordered by signature - // (current limitation of the .get_confirmed_signatures_for_address2()) + // Ensure that the signatures within a slot are reverse ordered by occurrence in block for i in (0..all1.len()).step_by(2) { let results = blockstore .get_confirmed_signatures_for_address2( @@ -8729,7 +8552,6 @@ pub mod tests { .infos; assert_eq!(results.len(), 2); assert_eq!(results[0].slot, results[1].slot); - assert!(results[0].signature >= results[1].signature); assert_eq!(results[0], all1[i]); assert_eq!(results[1], all1[i + 1]); } @@ -8885,8 +8707,7 @@ pub mod tests { assert_eq!(results[1], all0[i + 1]); } - // Ensure that the signatures within a slot are reverse ordered by signature - // (current limitation of the .get_confirmed_signatures_for_address2()) + // Ensure that the signatures within a slot are reverse ordered by occurrence in block for i in (0..all1.len()).step_by(2) { let results = blockstore .get_confirmed_signatures_for_address2( @@ -8904,7 +8725,6 @@ pub mod tests { .infos; assert_eq!(results.len(), 2); assert_eq!(results[0].slot, results[1].slot); - assert!(results[0].signature >= results[1].signature); assert_eq!(results[0], all1[i]); assert_eq!(results[1], all1[i + 1]); } @@ -8939,7 +8759,7 @@ pub mod tests { // Remove signature blockstore .address_signatures_cf - .delete((0, address0, 2, all0[0].signature)) + .delete((address0, 2, 0, all0[0].signature)) .unwrap(); let sig_infos = blockstore .get_confirmed_signatures_for_address2( @@ -9005,7 +8825,7 @@ pub mod tests { } .into(); transaction_status_cf - .put_protobuf((0, transaction.signatures[0], slot), &status) + .put_protobuf((transaction.signatures[0], slot), &status) .unwrap(); transactions.push(transaction.into()); } @@ -9698,6 +9518,10 @@ pub mod tests { } } + // This test is probably superfluous, since it is highly unlikely that bincode-format + // TransactionStatus entries exist in any current ledger. They certainly exist in historical + // ledger archives, but typically those require contemporaraneous software for other reasons. + // However, we are persisting the test since the apis still exist in `blockstore_db`. #[test] fn test_transaction_status_protobuf_backward_compatability() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -9755,13 +9579,13 @@ pub mod tests { let data = serialize(&deprecated_status).unwrap(); blockstore .transaction_status_cf - .put_bytes((0, Signature::default(), slot), &data) + .put_bytes((Signature::default(), slot), &data) .unwrap(); } for slot in 2..4 { blockstore .transaction_status_cf - .put_protobuf((0, Signature::default(), slot), &protobuf_status) + .put_protobuf((Signature::default(), slot), &protobuf_status) .unwrap(); } for slot in 0..4 { @@ -9769,7 +9593,6 @@ pub mod tests { blockstore .transaction_status_cf .get_protobuf_or_bincode::(( - 0, Signature::default(), slot )) diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index f7e8aab3db3ad7..5c1644aaa032a0 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -1,4 +1,7 @@ -use {super::*, solana_sdk::message::AccountKeys, std::time::Instant}; +use { + super::*, crate::blockstore_db::ColumnIndexDeprecation, solana_sdk::message::AccountKeys, + std::time::Instant, +}; #[derive(Default)] pub struct PurgeStats { @@ -391,18 +394,28 @@ impl Blockstore { for slot in from_slot..=to_slot { let primary_indexes = slot_indexes(slot); - if primary_indexes.is_empty() { - continue; - } let slot_entries = self.get_any_valid_slot_entries(slot, 0); let transactions = slot_entries .into_iter() .flat_map(|entry| entry.transactions); - for transaction in transactions { + for (i, transaction) in transactions.enumerate() { if let Some(&signature) = transaction.signatures.get(0) { + batch.delete::((signature, slot))?; + batch.delete::((signature, slot))?; + if !primary_indexes.is_empty() { + batch.delete_raw::( + &cf::TransactionMemos::deprecated_key(signature), + )?; + } for primary_index in &primary_indexes { - batch.delete::((*primary_index, signature, slot))?; + batch.delete_raw::( + &cf::TransactionStatus::deprecated_key(( + *primary_index, + signature, + slot, + )), + )?; } let meta = self.read_transaction_status((signature, slot))?; @@ -412,14 +425,24 @@ impl Blockstore { loaded_addresses.as_ref(), ); + let transaction_index = + u32::try_from(i).map_err(|_| BlockstoreError::TransactionIndexOverflow)?; for pubkey in account_keys.iter() { + batch.delete::(( + *pubkey, + slot, + transaction_index, + signature, + ))?; for primary_index in &primary_indexes { - batch.delete::(( - *primary_index, - *pubkey, - slot, - signature, - ))?; + batch.delete_raw::( + &cf::AddressSignatures::deprecated_key(( + *primary_index, + *pubkey, + slot, + signature, + )), + )?; } } } @@ -482,6 +505,7 @@ pub mod tests { message::Message, transaction::Transaction, }, + test_case::test_case, }; #[test] @@ -525,226 +549,39 @@ pub mod tests { vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], TransactionStatusMeta::default(), - ) - .unwrap(); - } - // Purge to freeze index 0 - blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); - - for x in max_slot..2 * max_slot { - let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::()); - blockstore - .write_transaction_status( - x, - Signature::from(random_bytes), - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), + 0, ) .unwrap(); } // Purging range outside of TransactionStatus max slots should not affect TransactionStatus data - blockstore.run_purge(20, 30, PurgeType::Exact).unwrap(); + blockstore.run_purge(10, 20, PurgeType::Exact).unwrap(); - let mut status_entry_iterator = blockstore + let status_entries: Vec<_> = blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); + .iter::(IteratorMode::Start) + .unwrap() + .collect(); + assert_eq!(status_entries.len(), 10); } - #[test] - #[allow(clippy::cognitive_complexity)] - fn test_purge_transaction_status() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - let transaction_status_index_cf = &blockstore.transaction_status_index_cf; - let slot = 10; - for _ in 0..5 { - let random_bytes: [u8; 64] = std::array::from_fn(|_| rand::random::()); - blockstore - .write_transaction_status( - slot, - Signature::from(random_bytes), - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), - ) - .unwrap(); - } - // Purge to freeze index 0 - blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - (0, Pubkey::default(), 0, Signature::default()), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - drop(status_entry_iterator); - drop(address_transactions_iterator); - - // Low purge should not affect state - blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - drop(status_entry_iterator); - drop(address_transactions_iterator); - - // Test boundary conditions: < slot should not purge statuses; <= slot should - blockstore.run_purge(0, 9, PurgeType::PrimaryIndex).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - drop(status_entry_iterator); - drop(address_transactions_iterator); - - blockstore - .run_purge(0, 10, PurgeType::PrimaryIndex) - .unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - assert!(status_entry_iterator.next().is_none()); - let mut address_transactions_iterator = blockstore + fn clear_and_repopulate_transaction_statuses_for_test(blockstore: &Blockstore, max_slot: u64) { + blockstore.run_purge(0, max_slot, PurgeType::Exact).unwrap(); + let mut iter = blockstore .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) + .iter::(IteratorMode::Start) .unwrap(); - assert!(address_transactions_iterator.next().is_none()); + assert_eq!(iter.next(), None); - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: false, - } - ); - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: true, - } - ); + populate_transaction_statuses_for_test(blockstore, 0, max_slot); } - fn clear_and_repopulate_transaction_statuses_for_test( + fn populate_transaction_statuses_for_test( blockstore: &Blockstore, - index0_max_slot: u64, - index1_max_slot: u64, + min_slot: u64, + max_slot: u64, ) { - assert!(index1_max_slot > index0_max_slot); - let mut write_batch = blockstore.db.batch().unwrap(); - blockstore - .run_purge(0, index1_max_slot, PurgeType::PrimaryIndex) - .unwrap(); - blockstore - .db - .delete_range_cf::(&mut write_batch, 0, 2) - .unwrap(); - blockstore - .db - .delete_range_cf::(&mut write_batch, 0, 2) - .unwrap(); - blockstore.db.write(write_batch).unwrap(); - blockstore.initialize_transaction_status_index().unwrap(); - *blockstore.active_transaction_status_index.write().unwrap() = 0; - - for x in 0..index0_max_slot { + for x in min_slot..=max_slot { let entries = make_slot_entries_with_transactions(1); let shreds = entries_to_test_shreds( &entries, @@ -770,67 +607,19 @@ pub mod tests { vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], TransactionStatusMeta::default(), + 0, ) .unwrap(); } + } - // Add slot that crosses primary indexes - let entries = make_slot_entries_with_transactions(2); - let shreds = entries_to_test_shreds( - &entries, - index0_max_slot, // slot - index0_max_slot.saturating_sub(1), // parent_slot - true, // is_full_slot - 0, // version - true, // merkle_variant - ); - blockstore.insert_shreds(shreds, None, false).unwrap(); - let signatures = entries - .iter() - .filter(|entry| !entry.is_tick()) - .cloned() - .flat_map(|entry| entry.transactions) - .map(|transaction| transaction.signatures[0]) - .collect::>(); - let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); - blockstore - .write_transaction_status( - index0_max_slot, - signatures[0], - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), - ) - .unwrap(); - - // Freeze index 0 - let mut write_batch = blockstore.db.batch().unwrap(); - let mut w_active_transaction_status_index = - blockstore.active_transaction_status_index.write().unwrap(); - blockstore - .toggle_transaction_status_index( - &mut write_batch, - &mut w_active_transaction_status_index, - index0_max_slot + 1, - ) - .unwrap(); - drop(w_active_transaction_status_index); - blockstore.db.write(write_batch).unwrap(); - - let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); - blockstore - .write_transaction_status( - index0_max_slot, - signatures[1], - vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], - vec![&Pubkey::try_from(&random_bytes[32..]).unwrap()], - TransactionStatusMeta::default(), - ) - .unwrap(); - - // Note: index0_max_slot exists in both indexes - - for x in index0_max_slot + 1..index1_max_slot + 1 { + fn populate_deprecated_transaction_statuses_for_test( + blockstore: &Blockstore, + primary_index: u64, + min_slot: u64, + max_slot: u64, + ) { + for x in min_slot..=max_slot { let entries = make_slot_entries_with_transactions(1); let shreds = entries_to_test_shreds( &entries, @@ -841,7 +630,7 @@ pub mod tests { true, // merkle_variant ); blockstore.insert_shreds(shreds, None, false).unwrap(); - let signature: Signature = entries + let signature = entries .iter() .filter(|entry| !entry.is_tick()) .cloned() @@ -850,7 +639,8 @@ pub mod tests { .collect::>()[0]; let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); blockstore - .write_transaction_status( + .write_deprecated_transaction_status( + primary_index, x, signature, vec![&Pubkey::try_from(&random_bytes[..32]).unwrap()], @@ -859,28 +649,6 @@ pub mod tests { ) .unwrap(); } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index0_max_slot, - frozen: true, - } - ); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); } #[test] @@ -914,6 +682,7 @@ pub mod tests { transaction.message.static_account_keys().iter().collect(), vec![], TransactionStatusMeta::default(), + 0, ) .unwrap(); } @@ -937,359 +706,216 @@ pub mod tests { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - let index0_max_slot = 9; - let index1_max_slot = 19; + let max_slot = 9; // Test purge outside bounds - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore.run_purge(20, 22, PurgeType::Exact).unwrap(); + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); + blockstore.run_purge(10, 12, PurgeType::Exact).unwrap(); let mut status_entry_iterator = blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) + .iter::(IteratorMode::Start) .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index0_max_slot, - frozen: true, - } - ); - for _ in 0..index0_max_slot + 1 { + for _ in 0..max_slot + 1 { let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); - for _ in index0_max_slot + 1..index1_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 1); + assert!(entry.1 <= max_slot || entry.1 > 0); } + assert_eq!(status_entry_iterator.next(), None); drop(status_entry_iterator); - // Test purge inside index 0 - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); + // Test purge inside written range + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); blockstore.run_purge(2, 4, PurgeType::Exact).unwrap(); let mut status_entry_iterator = blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) + .iter::(IteratorMode::Start) .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index0_max_slot, - frozen: true, - } - ); for _ in 0..7 { // 7 entries remaining let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert!(entry.2 < 2 || entry.2 > 4); - } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); - for _ in index0_max_slot + 1..index1_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 1); + assert!(entry.1 < 2 || entry.1 > 4); } + assert_eq!(status_entry_iterator.next(), None); drop(status_entry_iterator); - // Test purge inside index 0 at upper boundary - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); + // Purge up to but not including max_slot + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); blockstore - .run_purge(7, index0_max_slot, PurgeType::Exact) + .run_purge(0, max_slot - 1, PurgeType::Exact) .unwrap(); let mut status_entry_iterator = blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) + .iter::(IteratorMode::Start) .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 6, - frozen: true, - } - ); - for _ in 0..7 { - // 7 entries remaining - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert!(entry.2 < 7); - } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); - for _ in index0_max_slot + 1..index1_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 1); - } + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.1, 9); + assert_eq!(status_entry_iterator.next(), None); drop(status_entry_iterator); - // Test purge inside index 1 at lower boundary - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore.run_purge(10, 12, PurgeType::Exact).unwrap(); + // Test purge all + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); + blockstore.run_purge(0, 22, PurgeType::Exact).unwrap(); let mut status_entry_iterator = blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) + .iter::(IteratorMode::Start) .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index0_max_slot, - frozen: true, - } - ); - for _ in 0..index0_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); - for _ in 13..index1_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 1); - assert!(entry.2 == index0_max_slot || entry.2 > 12); - } - drop(status_entry_iterator); + assert_eq!(status_entry_iterator.next(), None); + } - // Test purge across index boundaries - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore.run_purge(7, 12, PurgeType::Exact).unwrap(); + fn get_index_bounds(blockstore: &Blockstore) -> (Box<[u8]>, Box<[u8]>) { + let first_index = { + let mut status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + status_entry_iterator.next().unwrap().unwrap().0 + }; + let last_index = { + let mut status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::End); + status_entry_iterator.next().unwrap().unwrap().0 + }; + (first_index, last_index) + } - let mut status_entry_iterator = blockstore + fn purge_exact(blockstore: &Blockstore, oldest_slot: Slot) { + blockstore + .run_purge(0, oldest_slot - 1, PurgeType::Exact) + .unwrap(); + } + + fn purge_compaction_filter(blockstore: &Blockstore, oldest_slot: Slot) { + let (first_index, last_index) = get_index_bounds(blockstore); + blockstore.db.set_oldest_slot(oldest_slot); + blockstore .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) + .compact_range_cf::(&first_index, &last_index); + } + + #[test_case(purge_exact; "exact")] + #[test_case(purge_compaction_filter; "compaction_filter")] + fn test_purge_special_columns_with_old_data(purge: impl Fn(&Blockstore, Slot)) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + populate_deprecated_transaction_statuses_for_test(&blockstore, 0, 0, 4); + populate_deprecated_transaction_statuses_for_test(&blockstore, 1, 5, 9); + populate_transaction_statuses_for_test(&blockstore, 10, 14); + + let mut index0 = blockstore + .transaction_status_index_cf + .get(0) + .unwrap() .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 6, - frozen: true, - } - ); - for _ in 0..7 { - // 7 entries remaining - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert!(entry.2 < 7); - } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index1_max_slot, - frozen: false, - } - ); - for _ in 13..index1_max_slot + 1 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 1); - assert!(entry.2 > 12); + index0.frozen = true; + index0.max_slot = 4; + blockstore + .transaction_status_index_cf + .put(0, &index0) + .unwrap(); + let mut index1 = blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(); + index1.frozen = false; + index1.max_slot = 9; + blockstore + .transaction_status_index_cf + .put(1, &index1) + .unwrap(); + + let statuses: Vec<_> = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start) + .collect(); + assert_eq!(statuses.len(), 15); + + // Delete some of primary-index 0 + let oldest_slot = 3; + purge(&blockstore, oldest_slot); + let status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for entry in status_entry_iterator { + let (key, _value) = entry.unwrap(); + let (_signature, slot) = ::index(&key); + assert!(slot >= oldest_slot); + count += 1; } - drop(status_entry_iterator); + assert_eq!(count, 12); - // Test purge include complete index 1 - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore.run_purge(7, 22, PurgeType::Exact).unwrap(); + // Delete the rest of primary-index 0 + let oldest_slot = 5; + purge(&blockstore, oldest_slot); + let status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for entry in status_entry_iterator { + let (key, _value) = entry.unwrap(); + let (_signature, slot) = ::index(&key); + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 10); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 6, - frozen: true, - } - ); - for _ in 0..7 { - // 7 entries remaining - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert!(entry.2 < 7); + // Delete some of primary-index 1 + let oldest_slot = 8; + purge(&blockstore, oldest_slot); + let status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for entry in status_entry_iterator { + let (key, _value) = entry.unwrap(); + let (_signature, slot) = ::index(&key); + assert!(slot >= oldest_slot); + count += 1; } - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 6, - frozen: false, - } - ); - drop(status_entry_iterator); + assert_eq!(count, 7); - // Purge up to but not including index0_max_slot - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore - .run_purge(0, index0_max_slot - 1, PurgeType::Exact) - .unwrap(); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: index0_max_slot, - frozen: true, - } - ); + // Delete the rest of primary-index 1 + let oldest_slot = 10; + purge(&blockstore, oldest_slot); + let status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for entry in status_entry_iterator { + let (key, _value) = entry.unwrap(); + let (_signature, slot) = ::index(&key); + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 5); - // Test purge all - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); - blockstore.run_purge(0, 22, PurgeType::Exact).unwrap(); + // Delete some of new-style entries + let oldest_slot = 13; + purge(&blockstore, oldest_slot); + let status_entry_iterator = blockstore + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); + let mut count = 0; + for entry in status_entry_iterator { + let (key, _value) = entry.unwrap(); + let (_signature, slot) = ::index(&key); + assert!(slot >= oldest_slot); + count += 1; + } + assert_eq!(count, 2); + // Delete the rest of the new-style entries + let oldest_slot = 20; + purge(&blockstore, oldest_slot); let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); + .transaction_status_cf + .iterator_cf_raw_key(IteratorMode::Start); assert!(status_entry_iterator.next().is_none()); - - assert_eq!( - blockstore - .transaction_status_index_cf - .get(0) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: true, - } - ); - assert_eq!( - blockstore - .transaction_status_index_cf - .get(1) - .unwrap() - .unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: false, - } - ); } #[test] @@ -1326,16 +952,9 @@ pub mod tests { fn test_purge_special_columns_compaction_filter() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - let index0_max_slot = 9; - let index1_max_slot = 19; - // includes slot 0, and slot 9 has 2 transactions - let num_total_transactions = index1_max_slot + 2; - - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); + let max_slot = 19; + + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); let first_index = { let mut status_entry_iterator = blockstore .db @@ -1363,17 +982,13 @@ pub mod tests { .iter::(IteratorMode::Start) .unwrap(); let mut count = 0; - for ((_primary_index, _signature, slot), _value) in status_entry_iterator { + for ((_signature, slot), _value) in status_entry_iterator { assert!(slot >= oldest_slot); count += 1; } - assert_eq!(count, num_total_transactions - oldest_slot); + assert_eq!(count, max_slot - (oldest_slot - 1)); - clear_and_repopulate_transaction_statuses_for_test( - &blockstore, - index0_max_slot, - index1_max_slot, - ); + clear_and_repopulate_transaction_statuses_for_test(&blockstore, max_slot); let first_index = { let mut status_entry_iterator = blockstore .db @@ -1401,10 +1016,10 @@ pub mod tests { .iter::(IteratorMode::Start) .unwrap(); let mut count = 0; - for ((_primary_index, _signature, slot), _value) in status_entry_iterator { + for ((_signature, slot), _value) in status_entry_iterator { assert!(slot >= oldest_slot); count += 1; } - assert_eq!(count, num_total_transactions - oldest_slot - 1); // Extra transaction in slot 9 + assert_eq!(count, max_slot - (oldest_slot - 1)); } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 3fd33fa12acea3..f9c87ce397d434 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -146,6 +146,8 @@ pub enum BlockstoreError { UnsupportedTransactionVersion, #[error("missing transaction metadata")] MissingTransactionMetadata, + #[error("transaction-index overflow")] + TransactionIndexOverflow, } pub type Result = std::result::Result; @@ -270,14 +272,14 @@ pub mod columns { #[derive(Debug)] /// The transaction status column /// - /// * index type: `(u64, `[`Signature`]`, `[`Slot`])` + /// * index type: `(`[`Signature`]`, `[`Slot`])` /// * value type: [`generated::TransactionStatusMeta`] pub struct TransactionStatus; #[derive(Debug)] /// The address signatures column /// - /// * index type: `(u64, `[`Pubkey`]`, `[`Slot`]`, `[`Signature`]`)` + /// * index type: `(`[`Pubkey`]`, `[`Slot`]`, u32, `[`Signature`]`)` /// * value type: [`blockstore_meta::AddressSignatureMeta`] pub struct AddressSignatures; @@ -614,6 +616,23 @@ impl Rocks { self.db.iterator_cf(cf, iterator_mode) } + fn iterator_cf_raw_key( + &self, + cf: &ColumnFamily, + iterator_mode: IteratorMode>, + ) -> DBIterator { + let start_key; + let iterator_mode = match iterator_mode { + IteratorMode::From(start_from, direction) => { + start_key = start_from; + RocksIteratorMode::From(&start_key, direction) + } + IteratorMode::Start => RocksIteratorMode::Start, + IteratorMode::End => RocksIteratorMode::End, + }; + self.db.iterator_cf(cf, iterator_mode) + } + fn raw_iterator_cf(&self, cf: &ColumnFamily) -> DBRawIterator { self.db.raw_iterator_cf(cf) } @@ -677,6 +696,9 @@ pub trait Column { fn key(index: Self::Index) -> Vec; fn index(key: &[u8]) -> Self::Index; + // This trait method is primarily used by `Database::delete_range_cf()`, and is therefore only + // relevant for columns keyed by Slot: ie. SlotColumns and columns that feature a Slot as the + // first item in the key. fn as_index(slot: Slot) -> Self::Index; fn slot(index: Self::Index) -> Slot; } @@ -739,34 +761,58 @@ impl Column for T { } } +pub enum IndexError { + UnpackError, +} + +/// Helper trait to transition primary indexes out from the columns that are using them. +pub trait ColumnIndexDeprecation: Column { + const DEPRECATED_INDEX_LEN: usize; + const CURRENT_INDEX_LEN: usize; + type DeprecatedIndex; + + fn deprecated_key(index: Self::DeprecatedIndex) -> Vec; + fn try_deprecated_index(key: &[u8]) -> std::result::Result; + + fn try_current_index(key: &[u8]) -> std::result::Result; + fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index; + + fn index(key: &[u8]) -> Self::Index { + if let Ok(index) = Self::try_current_index(key) { + index + } else if let Ok(index) = Self::try_deprecated_index(key) { + Self::convert_index(index) + } else { + // Way back in the day, we broke the TransactionStatus column key. This fallback + // preserves the existing logic for ancient keys, but realistically should never be + // executed. + Self::as_index(0) + } + } +} + impl Column for columns::TransactionStatus { - type Index = (u64, Signature, Slot); + type Index = (Signature, Slot); - fn key((index, signature, slot): (u64, Signature, Slot)) -> Vec { - let mut key = vec![0; 8 + 64 + 8]; // size_of u64 + size_of Signature + size_of Slot - BigEndian::write_u64(&mut key[0..8], index); - key[8..72].copy_from_slice(&signature.as_ref()[0..64]); - BigEndian::write_u64(&mut key[72..80], slot); + fn key((signature, slot): Self::Index) -> Vec { + let mut key = vec![0; Self::CURRENT_INDEX_LEN]; + key[0..64].copy_from_slice(&signature.as_ref()[0..64]); + BigEndian::write_u64(&mut key[64..72], slot); key } - fn index(key: &[u8]) -> (u64, Signature, Slot) { - if key.len() != 80 { - Self::as_index(0) - } else { - let index = BigEndian::read_u64(&key[0..8]); - let signature = Signature::try_from(&key[8..72]).unwrap(); - let slot = BigEndian::read_u64(&key[72..80]); - (index, signature, slot) - } + fn index(key: &[u8]) -> (Signature, Slot) { + ::index(key) } fn slot(index: Self::Index) -> Slot { - index.2 + index.1 } - fn as_index(index: u64) -> Self::Index { - (index, Signature::default(), 0) + // The TransactionStatus column is not keyed by slot so this method is meaningless + // See Column::as_index() declaration for more details + fn as_index(_index: u64) -> Self::Index { + (Signature::default(), 0) } } impl ColumnName for columns::TransactionStatus { @@ -776,63 +822,171 @@ impl ProtobufColumn for columns::TransactionStatus { type Type = generated::TransactionStatusMeta; } -impl Column for columns::AddressSignatures { - type Index = (u64, Pubkey, Slot, Signature); +impl ColumnIndexDeprecation for columns::TransactionStatus { + const DEPRECATED_INDEX_LEN: usize = 80; + const CURRENT_INDEX_LEN: usize = 72; + type DeprecatedIndex = (u64, Signature, Slot); - fn key((index, pubkey, slot, signature): (u64, Pubkey, Slot, Signature)) -> Vec { - let mut key = vec![0; 8 + 32 + 8 + 64]; // size_of u64 + size_of Pubkey + size_of Slot + size_of Signature + fn deprecated_key((index, signature, slot): Self::DeprecatedIndex) -> Vec { + let mut key = vec![0; Self::DEPRECATED_INDEX_LEN]; BigEndian::write_u64(&mut key[0..8], index); - key[8..40].copy_from_slice(&pubkey.as_ref()[0..32]); - BigEndian::write_u64(&mut key[40..48], slot); - key[48..112].copy_from_slice(&signature.as_ref()[0..64]); + key[8..72].copy_from_slice(&signature.as_ref()[0..64]); + BigEndian::write_u64(&mut key[72..80], slot); key } - fn index(key: &[u8]) -> (u64, Pubkey, Slot, Signature) { - let index = BigEndian::read_u64(&key[0..8]); - let pubkey = Pubkey::try_from(&key[8..40]).unwrap(); - let slot = BigEndian::read_u64(&key[40..48]); - let signature = Signature::try_from(&key[48..112]).unwrap(); - (index, pubkey, slot, signature) + fn try_deprecated_index(key: &[u8]) -> std::result::Result { + if key.len() != Self::DEPRECATED_INDEX_LEN { + return Err(IndexError::UnpackError); + } + let primary_index = BigEndian::read_u64(&key[0..8]); + let signature = Signature::try_from(&key[8..72]).unwrap(); + let slot = BigEndian::read_u64(&key[72..80]); + Ok((primary_index, signature, slot)) + } + + fn try_current_index(key: &[u8]) -> std::result::Result { + if key.len() != Self::CURRENT_INDEX_LEN { + return Err(IndexError::UnpackError); + } + let signature = Signature::try_from(&key[0..64]).unwrap(); + let slot = BigEndian::read_u64(&key[64..72]); + Ok((signature, slot)) + } + + fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index { + let (_primary_index, signature, slot) = deprecated_index; + (signature, slot) + } +} + +impl Column for columns::AddressSignatures { + type Index = (Pubkey, Slot, u32, Signature); + + fn key((pubkey, slot, transaction_index, signature): Self::Index) -> Vec { + let mut key = vec![0; Self::CURRENT_INDEX_LEN]; + key[0..32].copy_from_slice(&pubkey.as_ref()[0..32]); + BigEndian::write_u64(&mut key[32..40], slot); + BigEndian::write_u32(&mut key[40..44], transaction_index); + key[44..108].copy_from_slice(&signature.as_ref()[0..64]); + key + } + + fn index(key: &[u8]) -> Self::Index { + ::index(key) } fn slot(index: Self::Index) -> Slot { - index.2 + index.1 } - fn as_index(index: u64) -> Self::Index { - (index, Pubkey::default(), 0, Signature::default()) + // The AddressSignatures column is not keyed by slot so this method is meaningless + // See Column::as_index() declaration for more details + fn as_index(_index: u64) -> Self::Index { + (Pubkey::default(), 0, 0, Signature::default()) } } impl ColumnName for columns::AddressSignatures { const NAME: &'static str = ADDRESS_SIGNATURES_CF; } +impl ColumnIndexDeprecation for columns::AddressSignatures { + const DEPRECATED_INDEX_LEN: usize = 112; + const CURRENT_INDEX_LEN: usize = 108; + type DeprecatedIndex = (u64, Pubkey, Slot, Signature); + + fn deprecated_key((primary_index, pubkey, slot, signature): Self::DeprecatedIndex) -> Vec { + let mut key = vec![0; Self::DEPRECATED_INDEX_LEN]; + BigEndian::write_u64(&mut key[0..8], primary_index); + key[8..40].clone_from_slice(&pubkey.as_ref()[0..32]); + BigEndian::write_u64(&mut key[40..48], slot); + key[48..112].clone_from_slice(&signature.as_ref()[0..64]); + key + } + + fn try_deprecated_index(key: &[u8]) -> std::result::Result { + if key.len() != Self::DEPRECATED_INDEX_LEN { + return Err(IndexError::UnpackError); + } + let primary_index = BigEndian::read_u64(&key[0..8]); + let pubkey = Pubkey::try_from(&key[8..40]).unwrap(); + let slot = BigEndian::read_u64(&key[40..48]); + let signature = Signature::try_from(&key[48..112]).unwrap(); + Ok((primary_index, pubkey, slot, signature)) + } + + fn try_current_index(key: &[u8]) -> std::result::Result { + if key.len() != Self::CURRENT_INDEX_LEN { + return Err(IndexError::UnpackError); + } + let pubkey = Pubkey::try_from(&key[0..32]).unwrap(); + let slot = BigEndian::read_u64(&key[32..40]); + let transaction_index = BigEndian::read_u32(&key[40..44]); + let signature = Signature::try_from(&key[44..108]).unwrap(); + Ok((pubkey, slot, transaction_index, signature)) + } + + fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index { + let (_primary_index, pubkey, slot, signature) = deprecated_index; + (pubkey, slot, 0, signature) + } +} + impl Column for columns::TransactionMemos { - type Index = Signature; + type Index = (Signature, Slot); - fn key(signature: Signature) -> Vec { - let mut key = vec![0; 64]; // size_of Signature + fn key((signature, slot): Self::Index) -> Vec { + let mut key = vec![0; Self::CURRENT_INDEX_LEN]; key[0..64].copy_from_slice(&signature.as_ref()[0..64]); + BigEndian::write_u64(&mut key[64..72], slot); key } - fn index(key: &[u8]) -> Signature { - Signature::try_from(&key[..64]).unwrap() + fn index(key: &[u8]) -> Self::Index { + ::index(key) } - fn slot(_index: Self::Index) -> Slot { - unimplemented!() + fn slot(index: Self::Index) -> Slot { + index.1 } - fn as_index(_index: u64) -> Self::Index { - Signature::default() + fn as_index(index: u64) -> Self::Index { + (Signature::default(), index) } } impl ColumnName for columns::TransactionMemos { const NAME: &'static str = TRANSACTION_MEMOS_CF; } +impl ColumnIndexDeprecation for columns::TransactionMemos { + const DEPRECATED_INDEX_LEN: usize = 64; + const CURRENT_INDEX_LEN: usize = 72; + type DeprecatedIndex = Signature; + + fn deprecated_key(signature: Self::DeprecatedIndex) -> Vec { + let mut key = vec![0; Self::DEPRECATED_INDEX_LEN]; + key[0..64].copy_from_slice(&signature.as_ref()[0..64]); + key + } + + fn try_deprecated_index(key: &[u8]) -> std::result::Result { + Signature::try_from(&key[..64]).map_err(|_| IndexError::UnpackError) + } + + fn try_current_index(key: &[u8]) -> std::result::Result { + if key.len() != Self::CURRENT_INDEX_LEN { + return Err(IndexError::UnpackError); + } + let signature = Signature::try_from(&key[0..64]).unwrap(); + let slot = BigEndian::read_u64(&key[64..72]); + Ok((signature, slot)) + } + + fn convert_index(deprecated_index: Self::DeprecatedIndex) -> Self::Index { + (deprecated_index, 0) + } +} + impl Column for columns::TransactionStatusIndex { type Index = u64; @@ -1456,12 +1610,16 @@ where } pub fn get(&self, key: C::Index) -> Result> { + self.get_raw(&C::key(key)) + } + + pub fn get_raw(&self, key: &[u8]) -> Result> { let mut result = Ok(None); let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.read_perf_status, ); - if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), &C::key(key))? { + if let Some(pinnable_slice) = self.backend.get_pinned_cf(self.handle(), key)? { let value = deserialize(pinnable_slice.as_ref())?; result = Ok(Some(value)) } @@ -1507,12 +1665,19 @@ where pub fn get_protobuf_or_bincode>( &self, key: C::Index, + ) -> Result> { + self.get_raw_protobuf_or_bincode::(&C::key(key)) + } + + pub(crate) fn get_raw_protobuf_or_bincode>( + &self, + key: &[u8], ) -> Result> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.read_perf_status, ); - let result = self.backend.get_pinned_cf(self.handle(), &C::key(key)); + let result = self.backend.get_pinned_cf(self.handle(), key); if let Some(op_start_instant) = is_perf_enabled { report_rocksdb_read_perf( C::NAME, @@ -1577,6 +1742,45 @@ where } } +impl LedgerColumn +where + C: ColumnIndexDeprecation + ColumnName, +{ + pub(crate) fn iter_current_index_filtered( + &self, + iterator_mode: IteratorMode, + ) -> Result)> + '_> { + let cf = self.handle(); + let iter = self.backend.iterator_cf::(cf, iterator_mode); + Ok(iter.filter_map(|pair| { + let (key, value) = pair.unwrap(); + C::try_current_index(&key).ok().map(|index| (index, value)) + })) + } + + pub(crate) fn iter_deprecated_index_filtered( + &self, + iterator_mode: IteratorMode, + ) -> Result)> + '_> { + let cf = self.handle(); + let iterator_mode_raw_key = match iterator_mode { + IteratorMode::Start => IteratorMode::Start, + IteratorMode::End => IteratorMode::End, + IteratorMode::From(start_from, direction) => { + let raw_key = C::deprecated_key(start_from); + IteratorMode::From(raw_key, direction) + } + }; + let iter = self.backend.iterator_cf_raw_key(cf, iterator_mode_raw_key); + Ok(iter.filter_map(|pair| { + let (key, value) = pair.unwrap(); + C::try_deprecated_index(&key) + .ok() + .map(|index| (index, value)) + })) + } +} + impl<'a> WriteBatch<'a> { pub fn put_bytes(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { self.write_batch @@ -1585,7 +1789,11 @@ impl<'a> WriteBatch<'a> { } pub fn delete(&mut self, key: C::Index) -> Result<()> { - self.write_batch.delete_cf(self.get_cf::(), C::key(key)); + self.delete_raw::(&C::key(key)) + } + + pub(crate) fn delete_raw(&mut self, key: &[u8]) -> Result<()> { + self.write_batch.delete_cf(self.get_cf::(), key); Ok(()) } @@ -1882,7 +2090,9 @@ fn should_enable_cf_compaction(cf_name: &str) -> bool { // completed on a given range or file. matches!( cf_name, - columns::TransactionStatus::NAME | columns::AddressSignatures::NAME + columns::TransactionStatus::NAME + | columns::TransactionMemos::NAME + | columns::AddressSignatures::NAME ) } @@ -1976,4 +2186,44 @@ pub mod tests { }); assert!(!should_enable_cf_compaction("something else")); } + + impl LedgerColumn + where + C: ColumnIndexDeprecation + ProtobufColumn + ColumnName, + { + pub fn put_deprecated_protobuf( + &self, + key: C::DeprecatedIndex, + value: &C::Type, + ) -> Result<()> { + let mut buf = Vec::with_capacity(value.encoded_len()); + value.encode(&mut buf)?; + self.backend + .put_cf(self.handle(), &C::deprecated_key(key), &buf) + } + } + + impl LedgerColumn + where + C: ColumnIndexDeprecation + TypedColumn + ColumnName, + { + pub fn put_deprecated(&self, key: C::DeprecatedIndex, value: &C::Type) -> Result<()> { + let serialized_value = serialize(value)?; + self.backend + .put_cf(self.handle(), &C::deprecated_key(key), &serialized_value) + } + } + + impl LedgerColumn + where + C: ColumnIndexDeprecation + ColumnName, + { + pub(crate) fn iterator_cf_raw_key( + &self, + iterator_mode: IteratorMode>, + ) -> DBIterator { + let cf = self.handle(); + self.backend.iterator_cf_raw_key(cf, iterator_mode) + } + } } diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index c4c619a1a32568..ccc4364891dd75 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -188,7 +188,7 @@ impl TransactionStatusService { if enable_rpc_transaction_history { if let Some(memos) = extract_and_fmt_memos(transaction.message()) { blockstore - .write_transaction_memos(transaction.signature(), memos) + .write_transaction_memos(transaction.signature(), slot, memos) .expect("Expect database write to succeed: TransactionMemos"); } @@ -199,6 +199,7 @@ impl TransactionStatusService { tx_account_locks.writable, tx_account_locks.readonly, transaction_status_meta, + transaction_index, ) .expect("Expect database write to succeed: TransactionStatus"); }