PruningOptions -> RetentionPolicies
Make life easier: the config requires `epochs_to_keep`, with optional per-table overrides; otherwise, no retention policy is applied at all.

The pruning config is now typed with `strum`. `pruner.rs` defines the prunable tables, and parsing the TOML file into the Rust config errors if it names any variants not supported by the indexer code. Additionally, there's a check to make sure that the prunable tables actually exist in the db.
wlmyng committed Oct 2, 2024
1 parent b22cb24 commit 2482cda
Showing 21 changed files with 671 additions and 67 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock


5 changes: 3 additions & 2 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -14,6 +14,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
pub use sui_indexer::config::RetentionPolicies;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::indexer_store::IndexerStore;
@@ -151,7 +152,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
pub async fn serve_executor(
executor: Arc<dyn RestStateReader + Send + Sync>,
snapshot_config: Option<SnapshotLagConfig>,
epochs_to_keep: Option<u64>,
retention_policies: Option<RetentionPolicies>,
data_ingestion_path: PathBuf,
) -> ExecutorCluster {
let database = TempDb::new().unwrap();
@@ -181,7 +182,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle) = start_test_indexer_impl(
db_url,
format!("http://{}", executor_server_url),
ReaderWriterConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
ReaderWriterConfig::writer_mode(snapshot_config.clone(), retention_policies),
Some(data_ingestion_path),
cancellation_token.clone(),
)
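Call sites in tests change in the same way. A minimal sketch of the new call shape, assuming the `executor` and `data_ingestion_path` values provided by this file's harness:

```rust
// Sketch: a 5-epoch default retention; `new_with_default_retention_only` also
// applies the built-in 2-epoch override for `objects_history` (see config.rs).
let retention = RetentionPolicies::new_with_default_retention_only(5);

let cluster = serve_executor(
    executor,
    None, // default SnapshotLagConfig
    Some(retention),
    data_ingestion_path,
)
.await;
```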
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
@@ -31,11 +31,14 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
url.workspace = true

8 changes: 7 additions & 1 deletion crates/sui-indexer/README.md
@@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-networ

Start a local network using the `sui` binary:
```sh
cargo run --bin sui -- start --with-faucet --force-regenesis
cargo run --bin sui -- start --with-faucet --force-regenesis
```

If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:
@@ -124,3 +124,9 @@ Note that you need an existing database for this to work. Using the DATABASE_URL
# Change the RPC_CLIENT_URL to http://0.0.0.0:9000 to run indexer against local validator & fullnode
cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --fullnode-sync-worker --reset-db
```

### Extending the indexer

To add a new table, run `diesel migration generate your_table_name`, and modify the newly created `up.sql` and `down.sql` files.

Apply the migration with `diesel migration run`, then run `./scripts/generate_indexer_schema.sh` to update the `schema.rs` file.
1 change: 1 addition & 0 deletions (new watermarks migration: `down.sql`)
@@ -0,0 +1 @@
DROP TABLE IF EXISTS watermarks;
32 changes: 32 additions & 0 deletions (new watermarks migration: `up.sql`)
@@ -0,0 +1,32 @@
CREATE TABLE watermarks
(
-- The table governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`.
entity TEXT NOT NULL,
-- Inclusive upper bound epoch for which this entity has data. The committer updates this
-- field. The pruner uses it for per-entity epoch-level retention, which is mostly useful for
-- pruning unpartitioned tables.
epoch_hi BIGINT NOT NULL,
-- Inclusive lower bound epoch for which this entity has data. The pruner updates this field,
-- and uses it in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly
-- useful for pruning unpartitioned tables.
epoch_lo BIGINT NOT NULL,
-- Inclusive upper bound checkpoint for which this entity has data. The committer updates this
-- field. All data of this entity in the checkpoint must be persisted before advancing this
-- watermark. The committer or ingestion task refers to this during disaster recovery.
checkpoint_hi BIGINT NOT NULL,
-- Inclusive high watermark that the committer advances. For `checkpoints`, this represents
-- the checkpoint sequence number; for `transactions`, the transaction sequence number; etc.
reader_hi BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Data before this watermark is considered
-- pruned by a reader. The underlying data may still exist in the db instance.
reader_lo BIGINT NOT NULL,
-- Updated to the database's current timestamp when the pruner sees that some data needs to be
-- dropped. The pruner uses this column to decide whether to prune immediately, or to wait long
-- enough for all in-flight reads to complete or time out before acting on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark has
-- been truly pruned from the db and should no longer exist. When recovering from a crash, the
-- pruner consults this column to determine where to continue.
pruned_lo BIGINT,
PRIMARY KEY (entity)
);
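To make the epoch-bound semantics concrete: with a retention of `epochs_to_keep` epochs and the committer at `epoch_hi`, the pruner moves `epoch_lo` up so that exactly the last `epochs_to_keep` epochs remain visible. A small sketch; the helper is hypothetical, not the indexer's actual pruner code:

```rust
/// Inclusive lower bound that keeps the last `epochs_to_keep` epochs ending at
/// `epoch_hi`, saturating at epoch 0. Hypothetical helper for illustration.
fn new_epoch_lo(epoch_hi: u64, epochs_to_keep: u64) -> u64 {
    // `epochs_to_keep > 0` is enforced when the retention config is parsed.
    epoch_hi.saturating_sub(epochs_to_keep - 1)
}

fn main() {
    // With the 2-epoch `objects_history` override and the committer at epoch 10,
    // epochs 9 and 10 are retained.
    assert_eq!(new_epoch_lo(10, 2), 9);
    // With the 30-epoch default, nothing is pruned until epoch 30 is reached.
    assert_eq!(new_epoch_lo(10, 30), 0);
}
```

The split between `reader_lo` and `pruned_lo` is what allows pruning to happen in two phases: the pruner first advances the reader-visible low watermark, waits (via `timestamp_ms`) for in-flight reads to complete or time out, and only then physically deletes rows and advances `pruned_lo`.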
146 changes: 142 additions & 4 deletions crates/sui-indexer/src/config.rs
@@ -1,14 +1,21 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use std::{net::SocketAddr, path::PathBuf};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use sui_json_rpc::name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;

/// The primary purpose of objects_history is to serve consistency queries.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
const DEFAULT_EPOCHS_TO_KEEP: u64 = 30;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
@@ -208,8 +215,97 @@ pub enum Command {

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
#[arg(long, env = "EPOCHS_TO_KEEP")]
pub epochs_to_keep: Option<u64>,
/// Path to TOML file containing configuration for retention policies.
#[arg(long)]
pub config: Option<PathBuf>,
}

/// Default retention policy and overrides for the pruner. Instantiated only if a config file is
/// provided via `PruningOptions` on indexer start. Any table not named in the file inherits the
/// default retention policy, `epochs_to_keep`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicies {
/// Default retention policy for all tables.
pub epochs_to_keep: u64,
/// Mapping of retention policies for specific tables which will override the default policy
/// specified by `epochs_to_keep`.
#[serde(default)]
pub overrides: HashMap<PrunableTable, u64>,
}

impl RetentionPolicies {
pub fn new(epochs_to_keep: u64, overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();

Self {
epochs_to_keep,
overrides,
}
}

/// Create a new `RetentionPolicies` with only the default retention specified and the default
/// override for `objects_history`.
pub fn new_with_default_retention_only(epochs_to_keep: u64) -> Self {
let mut overrides = HashMap::new();
overrides.insert(
PrunableTable::ObjectsHistory,
OBJECTS_HISTORY_EPOCHS_TO_KEEP,
);

Self {
epochs_to_keep,
overrides,
}
}

/// Create a new `RetentionPolicies` with only the overrides specified and a default retention
/// of 30 epochs.
pub fn new_with_overrides_only(overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();
Self {
overrides,
epochs_to_keep: DEFAULT_EPOCHS_TO_KEEP,
}
}
}

impl PruningOptions {
/// If a path to the retention policies config has been provided, attempt to parse and return
/// it. Otherwise, pruning is not enabled for the indexer, so return `None`.
pub fn retention_policies(self) -> Option<RetentionPolicies> {
let Some(path) = self.config else {
return None;
};

let contents =
std::fs::read_to_string(&path).expect("Failed to read retention policies file");
let policies = toml::de::from_str::<RetentionPolicies>(&contents)
.expect("Failed to parse into RetentionPolicies struct");

assert!(
policies.epochs_to_keep > 0,
"Default retention must be greater than 0"
);
assert!(
policies.overrides.values().all(|&retention| retention > 0),
"All retention overrides must be greater than 0"
);
Some(policies)
}
}

#[derive(Args, Debug, Clone)]
@@ -354,4 +450,46 @@ mod test {
// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}

#[test]
fn test_valid_pruning_config_file() {
let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;

let opts = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();
assert_eq!(opts.epochs_to_keep, 5);
assert_eq!(
opts.overrides.get(&PrunableTable::ObjectsHistory),
Some(&10)
);
assert_eq!(opts.overrides.get(&PrunableTable::Transactions), Some(&20));
assert_eq!(opts.overrides.len(), 2);
}

#[test]
fn test_invalid_pruning_config_file() {
let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
invalid_table = 30
"#;

let result = toml::from_str::<RetentionPolicies>(toml_str);
assert!(result.is_err(), "Expected an error, but parsing succeeded");

if let Err(e) = result {
assert!(
e.to_string().contains("unknown variant `invalid_table`"),
"Error message doesn't mention the invalid table"
);
}
}
}
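Putting the pieces together, the round trip from a TOML policy file to a `RetentionPolicies` value looks roughly like this; a sketch reusing the shapes from the tests above, with the inline TOML standing in for the file passed via `--config`:

```rust
use std::collections::HashMap;

use sui_indexer::config::RetentionPolicies;

fn main() {
    // Same shape as a file passed via `--config`.
    let toml_str = r#"
        epochs_to_keep = 30
        [overrides]
        objects_history = 2
    "#;
    let policies: RetentionPolicies = toml::de::from_str(toml_str).unwrap();
    assert_eq!(policies.epochs_to_keep, 30);

    // The programmatic equivalent. Note the asymmetry: TOML parsing rejects
    // unknown table names outright, while `RetentionPolicies::new` silently
    // drops any key that fails `PrunableTable::from_str`.
    let overrides = HashMap::from([("objects_history".to_string(), 2u64)]);
    let same = RetentionPolicies::new(30, overrides);
    assert_eq!(same.epochs_to_keep, policies.epochs_to_keep);
}
```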
57 changes: 56 additions & 1 deletion crates/sui-indexer/src/db.rs
@@ -6,10 +6,11 @@ use crate::errors::IndexerError;
use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::prelude::QueryableByName;
use diesel::table;
use diesel::QueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;
use tracing::info;

@@ -134,6 +135,60 @@ async fn check_db_migration_consistency_impl(
)))
}

/// Checks that the tables named in `RetentionPolicies.overrides`, if any, actually exist in the db.
pub async fn check_retention_policy_overrides_valid(
conn: &mut Connection<'_>,
overrides: &HashMap<String, u64>,
) -> Result<(), IndexerError> {
info!("Starting compatibility check");

use diesel_async::RunQueryDsl;

let select_parent_tables = r#"
SELECT c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
WHERE c.relkind IN ('r', 'p') -- 'r' for regular tables, 'p' for partitioned tables
AND n.nspname = 'public'
AND (
pt.partrelid IS NOT NULL -- This is a partitioned (parent) table
OR NOT EXISTS ( -- This is not a partition (child table)
SELECT 1
FROM pg_inherits i
WHERE i.inhrelid = c.oid
)
);
"#;

#[derive(QueryableByName)]
struct TableName {
#[diesel(sql_type = diesel::sql_types::Text)]
table_name: String,
}

let result: Vec<TableName> = diesel::sql_query(select_parent_tables)
.load(conn)
.await
.map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?;

// Check that each key in overrides exists in result
let map_keys: HashSet<&String> = overrides.keys().collect();
let existing_tables: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();

for key in map_keys {
if !existing_tables.contains(key) {
return Err(IndexerError::GenericError(format!(
"Invalid retention policy override provided for table {}: does not exist in the database",
key
)));
}
}

info!("Compatibility check passed");
Ok(())
}

pub use setup_postgres::{reset_database, run_migrations};

pub mod setup_postgres {
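A sketch of how the new override check might be wired into indexer startup, before the pruner is constructed. The surrounding function is hypothetical, and it assumes `Connection` and the check are importable from `sui_indexer::db`:

```rust
use std::collections::HashMap;

use sui_indexer::db::{check_retention_policy_overrides_valid, Connection};
use sui_indexer::errors::IndexerError;

/// Hypothetical startup hook: fail fast if a retention override names a table
/// that does not exist in the database.
async fn validate_overrides(
    conn: &mut Connection<'_>,
    overrides: &HashMap<String, u64>,
) -> Result<(), IndexerError> {
    check_retention_policy_overrides_valid(conn, overrides).await
}
```

Note that the check takes raw `String` table names rather than the typed `PrunableTable` map, so it validates the override keys as the user wrote them.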
