Skip to content

Commit

Permalink
PruningOptions -> RetentionPolicies
Browse files Browse the repository at this point in the history
make life easier, requires epochs_to_keep with optional overrides. otherwise, no retention policy at all

strumming the pruning config. pruner.rs file will define prunable tables. the toml file when parsed into Rust config will warn if it tries to name any variants not supported by indexer code. additionally, there's also a check to make sure that prunable tables actually exist in the db
  • Loading branch information
wlmyng committed Oct 4, 2024
1 parent 2e97e71 commit 1bbe93b
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 62 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_indexer::config::PruningOptions;
pub use sui_indexer::config::RetentionPolicies;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
Expand Down Expand Up @@ -151,7 +151,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
pub async fn serve_executor(
executor: Arc<dyn RestStateReader + Send + Sync>,
snapshot_config: Option<SnapshotLagConfig>,
epochs_to_keep: Option<u64>,
retention_policies: Option<RetentionPolicies>,
data_ingestion_path: PathBuf,
) -> ExecutorCluster {
let database = TempDb::new().unwrap();
Expand Down Expand Up @@ -184,7 +184,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
Some(snapshot_config.clone()),
Some(PruningOptions { epochs_to_keep }),
retention_policies,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
)
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
178 changes: 174 additions & 4 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use std::{net::SocketAddr, path::PathBuf};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use strum::IntoEnumIterator;
use sui_json_rpc::name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;

/// The primary purpose of objects_history is to serve consistency query.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
Expand Down Expand Up @@ -208,8 +214,93 @@ pub enum Command {

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
#[arg(long, env = "EPOCHS_TO_KEEP")]
pub epochs_to_keep: Option<u64>,
/// Path to TOML file containing configuration for retention policies.
#[arg(long)]
pub pruning_config_path: Option<PathBuf>,
}

/// Represents the default retention policy and overrides for prunable tables. When `finalize` is
/// called, the `policies` field is updated with the default retention policy for all tables that do
/// not have an override specified. Instantiated only if `PruningOptions` is provided on indexer
/// start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicies {
/// Default retention policy for all tables.
pub epochs_to_keep: u64,
/// In Rust, a mapping of all `PrunableTable` variants to their respective retention policies.
/// From a TOML file, a user only needs to explicitly list tables that should override the
/// default retention.
#[serde(default, rename = "overrides")]
pub policies: HashMap<PrunableTable, u64>,
}

impl PruningOptions {
/// Load default retention policy and overrides from file.
pub fn load_from_file(&self) -> Option<RetentionPolicies> {
let Some(config_path) = self.pruning_config_path.as_ref() else {
return None;
};

let contents = std::fs::read_to_string(&config_path)
.expect("Failed to read default retention policy and overrides from file");
let retention_with_overrides = toml::de::from_str::<RetentionPolicies>(&contents)
.expect("Failed to parse into RetentionPolicies struct");

let default_retention = retention_with_overrides.epochs_to_keep;

assert!(
default_retention > 0,
"Default retention must be greater than 0"
);
assert!(
retention_with_overrides
.policies
.values()
.all(|&policy| policy > 0),
"All retention overrides must be greater than 0"
);

Some(retention_with_overrides)
}
}

impl RetentionPolicies {
/// Create a new `RetentionPolicies` with the specified default retention and overrides. Call
/// `finalize()` on the instance to update the `policies` field with the default retention
/// policy for all tables that do not have an override specified.
pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
Self {
epochs_to_keep,
policies: overrides,
}
}

/// Create a new `RetentionPolicies` with only the default retention specified and the default
/// override for `objects_history`. Call `finalize()` on the instance to update the `policies`
/// field with the default retention policy for all tables that do not have an override
/// specified.
pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
let mut overrides = HashMap::new();
overrides.insert(
PrunableTable::ObjectsHistory,
OBJECTS_HISTORY_EPOCHS_TO_KEEP,
);

Self::new(epochs_to_keep, overrides)
}

/// Updates the `policies` field with the default retention policy for all tables that do not
/// have an override specified.
pub fn finalize(mut self) -> Self {
for table in PrunableTable::iter() {
self.policies.entry(table).or_insert(self.epochs_to_keep);
}
self
}

pub fn get(&self, table: &PrunableTable) -> Option<u64> {
self.policies.get(table).copied()
}
}

#[derive(Args, Debug, Clone)]
Expand Down Expand Up @@ -290,7 +381,9 @@ impl Default for RestoreConfig {
#[cfg(test)]
mod test {
use super::*;
use std::io::Write;
use tap::Pipe;
use tempfile::NamedTempFile;

fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
where
Expand Down Expand Up @@ -354,4 +447,81 @@ mod test {
// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}

#[test]
fn test_valid_pruning_config_file() {
let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;

let opts = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();
assert_eq!(opts.epochs_to_keep, 5);
assert_eq!(opts.policies.get(&PrunableTable::ObjectsHistory), Some(&10));
assert_eq!(opts.policies.get(&PrunableTable::Transactions), Some(&20));
assert_eq!(opts.policies.len(), 2);
}

#[test]
fn test_pruning_options_from_file() {
let mut temp_file = NamedTempFile::new().unwrap();
let toml_content = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;
temp_file.write_all(toml_content.as_bytes()).unwrap();
let temp_path: PathBuf = temp_file.path().to_path_buf();
let pruning_options = PruningOptions {
pruning_config_path: Some(temp_path.clone()),
};
let mut retention_policies = pruning_options.load_from_file().unwrap();

// Assert the parsed values
assert_eq!(retention_policies.epochs_to_keep, 5);
assert_eq!(
retention_policies.get(&PrunableTable::ObjectsHistory),
Some(10)
);
assert_eq!(
retention_policies.get(&PrunableTable::Transactions),
Some(20)
);
assert_eq!(retention_policies.policies.len(), 2);

retention_policies = retention_policies.finalize();

assert!(
retention_policies.policies.len() > 2,
"Expected more than 2 policies, but got {}",
retention_policies.policies.len()
);
}

#[test]
fn test_invalid_pruning_config_file() {
let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
invalid_table = 30
"#;

let result = toml::from_str::<RetentionPolicies>(toml_str);
assert!(result.is_err(), "Expected an error, but parsing succeeded");

if let Err(e) = result {
assert!(
e.to_string().contains("unknown variant `invalid_table`"),
"Error message doesn't mention the invalid table"
);
}
}
}
54 changes: 53 additions & 1 deletion crates/sui-indexer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@

use crate::database::Connection;
use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::prelude::QueryableByName;
use diesel::table;
use diesel::QueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashSet};
use std::time::Duration;
use strum::IntoEnumIterator;
use tracing::info;

table! {
Expand Down Expand Up @@ -134,6 +137,55 @@ async fn check_db_migration_consistency_impl(
)))
}

/// Check that prunable tables exist in the database.
pub async fn check_prunable_tables_valid(conn: &mut Connection<'_>) -> Result<(), IndexerError> {
info!("Starting compatibility check");

use diesel_async::RunQueryDsl;

let select_parent_tables = r#"
SELECT c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
WHERE c.relkind IN ('r', 'p') -- 'r' for regular tables, 'p' for partitioned tables
AND n.nspname = 'public'
AND (
pt.partrelid IS NOT NULL -- This is a partitioned (parent) table
OR NOT EXISTS ( -- This is not a partition (child table)
SELECT 1
FROM pg_inherits i
WHERE i.inhrelid = c.oid
)
);
"#;

#[derive(QueryableByName)]
struct TableName {
#[diesel(sql_type = diesel::sql_types::Text)]
table_name: String,
}

let result: Vec<TableName> = diesel::sql_query(select_parent_tables)
.load(conn)
.await
.map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?;

let parent_tables_from_db: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();

for key in PrunableTable::iter() {
if !parent_tables_from_db.contains(key.as_ref()) {
return Err(IndexerError::GenericError(format!(
"Invalid retention policy override provided for table {}: does not exist in the database",
key
)));
}
}

info!("Compatibility check passed");
Ok(())
}

pub use setup_postgres::{reset_database, run_migrations};

pub mod setup_postgres {
Expand Down
Loading

0 comments on commit 1bbe93b

Please sign in to comment.