Skip to content

Commit

Permalink
some cleanups. remove pruner_lo from watermarks table since it isnt b…
Browse files Browse the repository at this point in the history
…eing written to right now
  • Loading branch information
wlmyng committed Oct 16, 2024
1 parent 856a957 commit d353774
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,5 @@ CREATE TABLE watermarks
-- be dropped. The pruner uses this column to determine whether to prune or wait long enough
-- that all in-flight reads complete or timeout before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Updated and used by the pruner. Data up to and excluding this watermark can be immediately
-- dropped. Data between this and `reader_lo` can be pruned after a delay.
pruner_lo BIGINT,
PRIMARY KEY (entity)
);
30 changes: 10 additions & 20 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use tracing::{error, info};

use crate::config::RetentionConfig;
use crate::errors::IndexerError;
use crate::models::watermarks::PrunableWatermark;
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
Expand Down Expand Up @@ -104,15 +103,6 @@ impl PrunableTable {
PrunableTable::PrunerCpWatermark => cp,
}
}

pub fn select_pruner_lo(&self, epoch_lo: u64, reader_lo: u64) -> u64 {
match self {
PrunableTable::ObjectsHistory => epoch_lo,
PrunableTable::Transactions => epoch_lo,
PrunableTable::Events => epoch_lo,
_ => reader_lo,
}
}
}

impl Pruner {
Expand Down Expand Up @@ -261,7 +251,7 @@ async fn update_watermarks_lower_bounds(
retention_policies: &HashMap<PrunableTable, u64>,
cancel: &CancellationToken,
) -> IndexerResult<()> {
let (watermarks, latest_db_timestamp) = store.get_watermarks().await?;
let (watermarks, _) = store.get_watermarks().await?;
let mut lower_bound_updates = vec![];

for watermark in watermarks.iter() {
Expand All @@ -270,21 +260,21 @@ async fn update_watermarks_lower_bounds(
return Ok(());
}

let Some(watermark) = PrunableWatermark::new(watermark.clone(), latest_db_timestamp) else {
let Some(prunable_table) = watermark.entity() else {
continue;
};

let Some(epochs_to_keep) = retention_policies.get(&watermark.entity) else {
let Some(epochs_to_keep) = retention_policies.get(&prunable_table) else {
error!(
"No retention policy found for prunable table {}",
prunable_table
);
continue;
};

if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi_inclusive {
let new_epoch_lo = watermark
.epoch_hi_inclusive
.saturating_sub(epochs_to_keep - 1);

lower_bound_updates.push((watermark, new_epoch_lo));
}
if let Some(new_epoch_lo) = watermark.new_epoch_lo(*epochs_to_keep) {
lower_bound_updates.push((prunable_table, new_epoch_lo));
};
}

if !lower_bound_updates.is_empty() {
Expand Down
85 changes: 14 additions & 71 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,70 +36,6 @@ pub struct StoredWatermark {
/// be dropped. The pruner uses this column to determine whether to prune or wait long enough
/// that all in-flight reads complete or timeout before it acts on an updated watermark.
pub timestamp_ms: i64,
/// Updated and used by the pruner. Data up to and excluding this watermark can be immediately
/// dropped. Data between this and `reader_lo` can be pruned after a delay.
pub pruner_lo: Option<i64>,
}

#[derive(Debug)]
pub struct PrunableWatermark {
pub entity: PrunableTable,
pub epoch_hi_inclusive: u64,
pub epoch_lo: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi_inclusive: u64,
pub reader_lo: u64,
/// Timestamp when the watermark's lower bound was last updated.
pub timestamp_ms: i64,
/// Latest timestamp read from db.
pub current_timestamp_ms: i64,
/// Data at and below `pruned_lo` is considered pruned by the pruner.
pub pruner_lo: Option<u64>,
}

impl PrunableWatermark {
pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Option<Self> {
let entity = PrunableTable::from_str(&stored.entity).ok()?;

Some(PrunableWatermark {
entity,
epoch_hi_inclusive: stored.epoch_hi_inclusive as u64,
epoch_lo: stored.epoch_lo as u64,
checkpoint_hi_inclusive: stored.checkpoint_hi_inclusive as u64,
tx_hi_inclusive: stored.tx_hi_inclusive as u64,
reader_lo: stored.reader_lo as u64,
timestamp_ms: stored.timestamp_ms,
current_timestamp_ms: latest_db_timestamp,
pruner_lo: stored.pruner_lo.map(|lo| lo as u64),
})
}

pub fn update(&mut self, new_epoch_lo: u64, new_reader_lo: u64) {
self.pruner_lo = Some(match self.entity {
PrunableTable::ObjectsHistory => self.epoch_lo,
PrunableTable::Transactions => self.epoch_lo,
PrunableTable::Events => self.epoch_lo,
_ => self.reader_lo,
});

self.epoch_lo = new_epoch_lo;
self.reader_lo = new_reader_lo;
}

/// Represents the exclusive upper bound of data that can be pruned immediately.
pub fn immediately_prunable_hi(&self) -> Option<u64> {
self.pruner_lo
}

/// Represents the lower bound of data that can be pruned after a delay.
pub fn delayed_prunable_lo(&self) -> Option<u64> {
self.pruner_lo
}

/// The new `pruner_lo` is the current reader_lo, or epoch_lo for epoch-partitioned tables.
pub fn new_pruner_lo(&self) -> u64 {
self.entity.select_pruner_lo(self.epoch_lo, self.reader_lo)
}
}

impl StoredWatermark {
Expand All @@ -113,18 +49,25 @@ impl StoredWatermark {
}
}

pub fn from_lower_bound_update(
entity: &str,
epoch_lo: u64,
reader_lo: u64,
pruner_lo: u64,
) -> Self {
pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_lo: epoch_lo as i64,
reader_lo: reader_lo as i64,
pruner_lo: Some(pruner_lo as i64),
..StoredWatermark::default()
}
}

pub fn entity(&self) -> Option<PrunableTable> {
PrunableTable::from_str(&self.entity).ok()
}

/// Determine whether to set a new epoch lower bound based on the retention policy.
pub fn new_epoch_lo(&self, retention: u64) -> Option<u64> {
if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 {
Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1))
} else {
None
}
}
}
1 change: 0 additions & 1 deletion crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ diesel::table! {
tx_hi_inclusive -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruner_lo -> Nullable<Int8>,
}
}

Expand Down
5 changes: 3 additions & 2 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use async_trait::async_trait;
use strum::IntoEnumIterator;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{CommitterWatermark, EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{StoredDeletedObject, StoredObject};
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
use crate::models::watermarks::StoredWatermark;
use crate::types::{
EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
};
Expand Down Expand Up @@ -131,7 +132,7 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
/// bounds.
async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<(PrunableWatermark, u64)>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError>;

/// Load all watermark entries from the store, and the latest timestamp from the db.
Expand Down
15 changes: 7 additions & 8 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use sui_storage::object_store::util::put;
use crate::config::UploadOptions;
use crate::database::ConnectionPool;
use crate::errors::{Context, IndexerError};
use crate::handlers::pruner::PrunableTable;
use crate::handlers::TransactionObjectChangesToCommit;
use crate::handlers::{CommitterWatermark, EpochToCommit};
use crate::metrics::IndexerMetrics;
Expand All @@ -45,7 +46,7 @@ use crate::models::objects::{
};
use crate::models::packages::StoredPackage;
use crate::models::transactions::StoredTransaction;
use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
use crate::models::watermarks::StoredWatermark;
use crate::schema::{
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
Expand Down Expand Up @@ -1584,15 +1585,15 @@ impl PgIndexerStore {

async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<(PrunableWatermark, u64)>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?;
let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
.into_iter()
.map(|(watermark, epoch)| {
.map(|(table, epoch)| {
let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
IndexerError::PersistentStorageDataCorruptionError(format!(
"Epoch {} not found in epoch mapping",
Expand All @@ -1601,10 +1602,9 @@ impl PgIndexerStore {
})?;

Ok(StoredWatermark::from_lower_bound_update(
watermark.entity.as_ref(),
table.as_ref(),
epoch,
watermark.entity.select_reader_lo(*checkpoint, *tx),
watermark.new_pruner_lo(),
table.select_reader_lo(*checkpoint, *tx),
))
})
.collect();
Expand All @@ -1627,7 +1627,6 @@ impl PgIndexerStore {
.set((
watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
watermarks::pruner_lo.eq(excluded(watermarks::pruner_lo)),
watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
"(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
)),
Expand Down Expand Up @@ -2387,7 +2386,7 @@ impl IndexerStore for PgIndexerStore {

async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<(PrunableWatermark, u64)>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError> {
self.update_watermarks_lower_bound(watermarks).await
}
Expand Down

0 comments on commit d353774

Please sign in to comment.