Skip to content

Commit

Permalink
Enable checkpoints/effects pruning
Browse files Browse the repository at this point in the history
  • Loading branch information
andll committed Oct 3, 2024
1 parent 0bda70e commit 3a542fb
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 72 deletions.
138 changes: 66 additions & 72 deletions crates/sui-core/src/authority/authority_store_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,8 @@ impl AuthorityStorePruner {
object_id, min_version, max_version
);
let start_range = ObjectKey(object_id, min_version);
let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
let delete: Vec<_> = perpetual_db.objects.range_iter(start_range..=end_range)
.map(|(k, _)|k)
.collect();
wb.delete_batch(&perpetual_db.objects, delete)?;
let end_range = ObjectKey(object_id, max_version);
wb.schedule_delete_range_inclusive(&perpetual_db.objects, &start_range, &end_range)?;
}

// When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
Expand Down Expand Up @@ -244,73 +241,70 @@ impl AuthorityStorePruner {
effects_to_prune: &Vec<TransactionEffects>,
metrics: Arc<AuthorityStorePruningMetrics>,
) -> anyhow::Result<()> {
return Ok(());
// let _scope = monitored_scope("EffectsLivePruner");
//
// let mut perpetual_batch = perpetual_db.objects.batch();
// let transactions: Vec<_> = checkpoint_content_to_prune
// .iter()
// .flat_map(|content| content.iter().map(|tx| tx.transaction))
// .collect();
//
// perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
// perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
// perpetual_batch.delete_batch(
// &perpetual_db.executed_transactions_to_checkpoint,
// transactions,
// )?;
//
// let mut effect_digests = vec![];
// for effects in effects_to_prune {
// let effects_digest = effects.digest();
// debug!("Pruning effects {:?}", effects_digest);
// effect_digests.push(effects_digest);
//
// if let Some(event_digest) = effects.events_digest() {
// if let Some(next_digest) = event_digest.next_lexicographical() {
// perpetual_batch.schedule_delete_range(
// &perpetual_db.events,
// &(*event_digest, 0),
// &(next_digest, 0),
// )?;
// }
// }
// }
// perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
//
// let mut checkpoints_batch = checkpoint_db.certified_checkpoints.batch();
//
// let checkpoint_content_digests =
// checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
// checkpoints_batch.delete_batch(
// &checkpoint_db.checkpoint_content,
// checkpoint_content_digests.clone(),
// )?;
// checkpoints_batch.delete_batch(
// &checkpoint_db.checkpoint_sequence_by_contents_digest,
// checkpoint_content_digests,
// )?;
//
// checkpoints_batch
// .delete_batch(&checkpoint_db.checkpoint_by_digest, checkpoints_to_prune)?;
//
// checkpoints_batch.insert_batch(
// &checkpoint_db.watermarks,
// [(
// &CheckpointWatermark::HighestPruned,
// &(checkpoint_number, CheckpointDigest::random()),
// )],
// )?;
//
// if let Some(rest_index) = rest_index {
// rest_index.prune(&checkpoint_content_to_prune)?;
// }
// perpetual_batch.write()?;
// checkpoints_batch.write()?;
// metrics
// .last_pruned_effects_checkpoint
// .set(checkpoint_number as i64);
// Ok(())
let _scope = monitored_scope("EffectsLivePruner");

let mut perpetual_batch = perpetual_db.objects.batch();
let transactions: Vec<_> = checkpoint_content_to_prune
.iter()
.flat_map(|content| content.iter().map(|tx| tx.transaction))
.collect();

perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
perpetual_batch.delete_batch(
&perpetual_db.executed_transactions_to_checkpoint,
transactions,
)?;

let mut effect_digests = vec![];
for effects in effects_to_prune {
let effects_digest = effects.digest();
debug!("Pruning effects {:?}", effects_digest);
effect_digests.push(effects_digest);

if let Some(event_digest) = effects.events_digest() {
perpetual_batch.schedule_delete_range_inclusive(
&perpetual_db.events,
&(*event_digest, 0),
&(*event_digest, usize::MAX),
)?;
}
}
perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

let mut checkpoints_batch = checkpoint_db.certified_checkpoints.batch();

let checkpoint_content_digests =
checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
checkpoints_batch.delete_batch(
&checkpoint_db.checkpoint_content,
checkpoint_content_digests.clone(),
)?;
checkpoints_batch.delete_batch(
&checkpoint_db.checkpoint_sequence_by_contents_digest,
checkpoint_content_digests,
)?;

checkpoints_batch
.delete_batch(&checkpoint_db.checkpoint_by_digest, checkpoints_to_prune)?;

checkpoints_batch.insert_batch(
&checkpoint_db.watermarks,
[(
&CheckpointWatermark::HighestPruned,
&(checkpoint_number, CheckpointDigest::random()),
)],
)?;

if let Some(rest_index) = rest_index {
rest_index.prune(&checkpoint_content_to_prune)?;
}
perpetual_batch.write()?;
checkpoints_batch.write()?;
metrics
.last_pruned_effects_checkpoint
.set(checkpoint_number as i64);
Ok(())
}

/// Prunes old data based on effects from all checkpoints from epochs eligible for pruning
Expand Down
13 changes: 13 additions & 0 deletions crates/typed-store/src/tidehunter/th_db_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ impl ThDbBatch {
.write_batch(self.batch)
.map_err(typed_store_error_from_db_error)
}

/// Schedules deletion of every key in the inclusive range `[from, to]` of `db`
/// as part of this batch.
///
/// Implemented by scanning the range and adding each found key to the batch's
/// delete set, so the whole key range is materialized into a `Vec` up front —
/// suitable for ranges of modest size. (NOTE(review): if the underlying store
/// grows a native range-delete primitive, this should use it instead — confirm.)
///
/// # Errors
/// Returns `TypedStoreError` if scheduling the deletes into the batch fails.
pub fn schedule_delete_range_inclusive<K, V>(
    &mut self,
    db: &ThDbMap<K, V>,
    from: &K,
    to: &K,
) -> Result<(), TypedStoreError>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    // Collect keys first so the immutable borrow of `db` ends before we
    // mutate the batch.
    let keys_to_delete: Vec<_> = db.range_iter(from..=to).map(|(key, _value)| key).collect();
    // `db` is already a reference; passing it directly avoids a needless `&&` borrow.
    self.delete_batch(db, keys_to_delete)?;
    Ok(())
}
}

impl<'a, K, V> Map<'a, K, V> for ThDbMap<K, V>
Expand Down

0 comments on commit 3a542fb

Please sign in to comment.