Skip to content

Commit

Permalink
Cache chain identifier and use ticks instead of sleep for watermark task
Browse files Browse the repository at this point in the history
  • Loading branch information
manolisliolios committed Aug 27, 2024
1 parent e1b9af4 commit 1dd99f9
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 14 deletions.
12 changes: 10 additions & 2 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::compatibility_check::check_all_tables;
use super::exchange_rates_task::TriggerExchangeRatesTask;
use super::system_package_task::SystemPackageTask;
use super::watermark_task::{Watermark, WatermarkLock, WatermarkTask};
use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask};
use crate::config::{
ConnectionConfig, ServiceConfig, Version, MAX_CONCURRENT_REQUESTS,
RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
Expand All @@ -14,6 +14,7 @@ use crate::data::{DataLoader, Db};
use crate::extensions::directive_checker::DirectiveChecker;
use crate::metrics::Metrics;
use crate::mutation::Mutation;
use crate::types::chain_identifier::ChainIdentifier;
use crate::types::datatype::IMoveDatatype;
use crate::types::move_object::IMoveObject;
use crate::types::object::IObject;
Expand Down Expand Up @@ -353,6 +354,7 @@ impl ServerBuilder {
))
.layer(axum::extract::Extension(schema))
.layer(axum::extract::Extension(watermark_task.lock()))
.layer(axum::extract::Extension(watermark_task.chain_id_lock()))
.layer(Self::cors()?);

Ok(Server {
Expand Down Expand Up @@ -519,6 +521,7 @@ async fn graphql_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
schema: Extension<SuiGraphQLSchema>,
Extension(watermark_lock): Extension<WatermarkLock>,
Extension(chain_identifier_lock): Extension<ChainIdentifierLock>,
headers: HeaderMap,
req: GraphQLRequest,
) -> (axum::http::Extensions, GraphQLResponse) {
Expand All @@ -532,6 +535,7 @@ async fn graphql_handler(
req.data.insert(addr);

req.data.insert(Watermark::new(watermark_lock).await);
req.data.insert(chain_identifier_lock.read().await);

let result = schema.execute(req).await;

Expand Down Expand Up @@ -678,6 +682,7 @@ pub mod tests {
use std::sync::Arc;
use std::time::Duration;
use sui_sdk::{wallet_context::WalletContext, SuiClient};
use sui_types::digests::get_mainnet_chain_identifier;
use sui_types::transaction::TransactionData;
use uuid::Uuid;

Expand All @@ -704,6 +709,7 @@ pub mod tests {
service_config.limits.clone(),
metrics.clone(),
);
let loader = DataLoader::new(db.clone());
let pg_conn_pool = PgManager::new(reader);
let cancellation_token = CancellationToken::new();
let watermark = Watermark {
Expand All @@ -720,11 +726,13 @@ pub mod tests {
);
ServerBuilder::new(state)
.context_data(db)
.context_data(loader)
.context_data(pg_conn_pool)
.context_data(service_config)
.context_data(query_id())
.context_data(ip_address())
.context_data(watermark)
.context_data(ChainIdentifier::from(get_mainnet_chain_identifier()))
.context_data(metrics)
}

Expand Down Expand Up @@ -790,7 +798,7 @@ pub mod tests {
schema.execute(query).await
}

let query = "{ chainIdentifier }";
let query = r#"{ checkpoint(id: {sequenceNumber: 0 }) { digest }}"#;
let timeout = Duration::from_millis(1000);
let delay = Duration::from_millis(100);
let sui_client = wallet.get_client().await.unwrap();
Expand Down
55 changes: 53 additions & 2 deletions crates/sui-graphql-rpc/src/server/watermark_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
use crate::data::{Db, DbConnection, QueryExecutor};
use crate::error::Error;
use crate::metrics::Metrics;
use crate::types::chain_identifier::ChainIdentifier;
use async_graphql::ServerError;
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use std::mem;
use std::sync::Arc;
use std::time::Duration;
use sui_indexer::schema::checkpoints;
use tokio::sync::{watch, RwLock};
use tokio::time::Interval;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

Expand All @@ -19,6 +21,7 @@ use tracing::{error, info};
pub(crate) struct WatermarkTask {
/// Thread-safe watermark that avoids writer starvation
watermark: WatermarkLock,
chain_identifier: ChainIdentifierLock,
db: Db,
metrics: Metrics,
sleep: Duration,
Expand All @@ -27,6 +30,9 @@ pub(crate) struct WatermarkTask {
receiver: watch::Receiver<u64>,
}

#[derive(Clone, Default)]
pub(crate) struct ChainIdentifierLock(pub(crate) Arc<RwLock<ChainIdentifier>>);

pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;

/// Watermark used by GraphQL queries to ensure cross-query consistency and flag epoch-boundary
Expand All @@ -53,6 +59,7 @@ impl WatermarkTask {

Self {
watermark: Default::default(),
chain_identifier: Default::default(),
db,
metrics,
sleep,
Expand All @@ -63,18 +70,23 @@ impl WatermarkTask {
}

pub(crate) async fn run(&self) {
let mut interval = tokio::time::interval(self.sleep);
// We start the task by first finding & setting the chain identifier
// so that it can be used in all requests.
self.get_and_cache_chain_identifier(&mut interval).await;

loop {
tokio::select! {
_ = self.cancel.cancelled() => {
info!("Shutdown signal received, terminating watermark update task");
return;
},
_ = tokio::time::sleep(self.sleep) => {
_ = interval.tick() => {
let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
Ok(Some(watermark)) => watermark,
Ok(None) => continue,
Err(e) => {
error!("{}", e);
error!("Failed to fetch chain identifier: {}", e);
self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
continue;
}
Expand All @@ -100,10 +112,42 @@ impl WatermarkTask {
self.watermark.clone()
}

pub(crate) fn chain_id_lock(&self) -> ChainIdentifierLock {
self.chain_identifier.clone()
}

/// Receiver for subscribing to epoch changes.
pub(crate) fn epoch_receiver(&self) -> watch::Receiver<u64> {
self.receiver.clone()
}

// Fetch the chain identifier (once) from the database and cache it.
async fn get_and_cache_chain_identifier(&self, interval: &mut Interval) {
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
info!("Shutdown signal received, terminating attempt to get chain identifier");
return;
},
_ = interval.tick() => {
// we only set the chain_identifier once.
let chain = match ChainIdentifier::query(&self.db).await {
Ok(Some(chain)) => chain,
Ok(None) => continue,
Err(e) => {
error!("{}", e);
self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]);
continue;
}
};

let mut chain_id_lock = self.chain_identifier.0.write().await;
*chain_id_lock = chain.into();
return;
}
}
}
}
}

impl Watermark {
Expand Down Expand Up @@ -139,3 +183,10 @@ impl Watermark {
}))
}
}

impl ChainIdentifierLock {
pub(crate) async fn read(&self) -> ChainIdentifier {
let w = self.0.read().await;
w.0.into()
}
}
31 changes: 25 additions & 6 deletions crates/sui-graphql-rpc/src/types/chain_identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,34 @@ use crate::{
error::Error,
};
use async_graphql::*;
use diesel::QueryDsl;
use diesel::{OptionalExtension, QueryDsl};
use sui_indexer::schema::chain_identifier;
use sui_types::{
digests::ChainIdentifier as NativeChainIdentifier, messages_checkpoint::CheckpointDigest,
};

pub(crate) struct ChainIdentifier;
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct ChainIdentifier(pub(crate) Option<NativeChainIdentifier>);

impl ChainIdentifier {
/// Query the Chain Identifier from the DB.
pub(crate) async fn query(db: &Db) -> Result<NativeChainIdentifier, Error> {
pub(crate) async fn query(db: &Db) -> Result<Option<NativeChainIdentifier>, Error> {
use chain_identifier::dsl;

let digest_bytes = db
let Some(digest_bytes) = db
.execute(move |conn| {
conn.first(move || dsl::chain_identifier.select(dsl::checkpoint_digest))
.optional()
})
.await
.map_err(|e| Error::Internal(format!("Failed to fetch genesis digest: {e}")))?;
.map_err(|e| Error::Internal(format!("Failed to fetch genesis digest: {e}")))?
else {
return Ok(None);
};

Self::from_bytes(digest_bytes)
let native_identifier = Self::from_bytes(digest_bytes)?;

Ok(Some(native_identifier))
}

/// Treat `bytes` as a checkpoint digest and extract a chain identifier from it.
Expand All @@ -36,3 +43,15 @@ impl ChainIdentifier {
Ok(NativeChainIdentifier::from(genesis_digest))
}
}

impl From<Option<NativeChainIdentifier>> for ChainIdentifier {
fn from(chain_identifier: Option<NativeChainIdentifier>) -> Self {
Self(chain_identifier)
}
}

impl From<NativeChainIdentifier> for ChainIdentifier {
fn from(chain_identifier: NativeChainIdentifier) -> Self {
Self(Some(chain_identifier))
}
}
16 changes: 12 additions & 4 deletions crates/sui-graphql-rpc/src/types/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,18 @@ impl Query {
/// First four bytes of the network's genesis checkpoint digest (uniquely identifies the
/// network).
async fn chain_identifier(&self, ctx: &Context<'_>) -> Result<String> {
Ok(ChainIdentifier::query(ctx.data_unchecked())
.await
.extend()?
.to_string())
// we want to panic if the chain identifier is missing, as there's something wrong with
// the service.
let chain_id: ChainIdentifier = *ctx.data_unchecked();

if let Some(id) = chain_id.0 {
Ok(id.to_string())
} else {
Err(Error::Internal(
"Chain identifier not initialized.".to_string(),
))
.extend()
}
}

/// Range of checkpoints that the RPC has data available for (for data
Expand Down

0 comments on commit 1dd99f9

Please sign in to comment.