diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 054072ffefae53..b1a9d5931e6cda 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -4,7 +4,7 @@ use super::compatibility_check::check_all_tables; use super::exchange_rates_task::TriggerExchangeRatesTask; use super::system_package_task::SystemPackageTask; -use super::watermark_task::{Watermark, WatermarkLock, WatermarkTask}; +use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask}; use crate::config::{ ConnectionConfig, ServiceConfig, Version, MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD, @@ -14,6 +14,7 @@ use crate::data::{DataLoader, Db}; use crate::extensions::directive_checker::DirectiveChecker; use crate::metrics::Metrics; use crate::mutation::Mutation; +use crate::types::chain_identifier::ChainIdentifier; use crate::types::datatype::IMoveDatatype; use crate::types::move_object::IMoveObject; use crate::types::object::IObject; @@ -353,6 +354,7 @@ impl ServerBuilder { )) .layer(axum::extract::Extension(schema)) .layer(axum::extract::Extension(watermark_task.lock())) + .layer(axum::extract::Extension(watermark_task.chain_id_lock())) .layer(Self::cors()?); Ok(Server { @@ -519,6 +521,7 @@ async fn graphql_handler( ConnectInfo(addr): ConnectInfo, schema: Extension, Extension(watermark_lock): Extension, + Extension(chain_identifier_lock): Extension, headers: HeaderMap, req: GraphQLRequest, ) -> (axum::http::Extensions, GraphQLResponse) { @@ -532,6 +535,7 @@ async fn graphql_handler( req.data.insert(addr); req.data.insert(Watermark::new(watermark_lock).await); + req.data.insert(chain_identifier_lock.read().await); let result = schema.execute(req).await; @@ -678,6 +682,7 @@ pub mod tests { use std::sync::Arc; use std::time::Duration; use sui_sdk::{wallet_context::WalletContext, SuiClient}; + use sui_types::digests::get_mainnet_chain_identifier; use sui_types::transaction::TransactionData; use uuid::Uuid; @@ -704,6 +709,7 @@ pub mod tests { service_config.limits.clone(), metrics.clone(), ); + let loader = DataLoader::new(db.clone()); let pg_conn_pool = PgManager::new(reader); let cancellation_token = CancellationToken::new(); let watermark = Watermark { @@ -720,11 +726,13 @@ pub mod tests { ); ServerBuilder::new(state) .context_data(db) + .context_data(loader) .context_data(pg_conn_pool) .context_data(service_config) .context_data(query_id()) .context_data(ip_address()) .context_data(watermark) + .context_data(ChainIdentifier::from(get_mainnet_chain_identifier())) .context_data(metrics) } @@ -790,7 +798,7 @@ pub mod tests { schema.execute(query).await } - let query = "{ chainIdentifier }"; + let query = r#"{ checkpoint(id: {sequenceNumber: 0 }) { digest }}"#; let timeout = Duration::from_millis(1000); let delay = Duration::from_millis(100); let sui_client = wallet.get_client().await.unwrap(); diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index eae0827fd4006b..0f4d94532476a6 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -4,6 +4,7 @@ use crate::data::{Db, DbConnection, QueryExecutor}; use crate::error::Error; use crate::metrics::Metrics; +use crate::types::chain_identifier::ChainIdentifier; use async_graphql::ServerError; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; use std::mem; @@ -11,6 +12,7 @@ use std::sync::Arc; use std::time::Duration; use sui_indexer::schema::checkpoints; use tokio::sync::{watch, RwLock}; +use tokio::time::Interval; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -19,6 +21,7 @@ use tracing::{error, info}; pub(crate) struct WatermarkTask { /// Thread-safe watermark that avoids writer starvation watermark: WatermarkLock, + chain_identifier: ChainIdentifierLock, db: Db, metrics: Metrics, sleep: Duration, @@ -27,6 +30,9 @@ pub(crate) struct WatermarkTask { receiver: watch::Receiver, } +#[derive(Clone, Default)] +pub(crate) struct ChainIdentifierLock(pub(crate) Arc>); + pub(crate) type WatermarkLock = Arc>; /// Watermark used by GraphQL queries to ensure cross-query consistency and flag epoch-boundary @@ -53,6 +59,7 @@ impl WatermarkTask { Self { watermark: Default::default(), + chain_identifier: Default::default(), db, metrics, sleep, @@ -63,18 +70,23 @@ impl WatermarkTask { } pub(crate) async fn run(&self) { + let mut interval = tokio::time::interval(self.sleep); + // We start the task by first finding & setting the chain identifier + // so that it can be used in all requests. + self.get_and_cache_chain_identifier(&mut interval).await; + loop { tokio::select! { _ = self.cancel.cancelled() => { info!("Shutdown signal received, terminating watermark update task"); return; }, - _ = tokio::time::sleep(self.sleep) => { + _ = interval.tick() => { let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { - error!("{}", e); + error!("Failed to fetch chain identifier: {}", e); self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]); continue; } @@ -100,10 +112,42 @@ impl WatermarkTask { self.watermark.clone() } + pub(crate) fn chain_id_lock(&self) -> ChainIdentifierLock { + self.chain_identifier.clone() + } + /// Receiver for subscribing to epoch changes. pub(crate) fn epoch_receiver(&self) -> watch::Receiver { self.receiver.clone() } + + // Fetch the chain identifier (once) from the database and cache it. + async fn get_and_cache_chain_identifier(&self, interval: &mut Interval) { + loop { + tokio::select! { + _ = self.cancel.cancelled() => { + info!("Shutdown signal received, terminating attempt to get chain identifier"); + return; + }, + _ = interval.tick() => { + // we only set the chain_identifier once. + let chain = match ChainIdentifier::query(&self.db).await { + Ok(Some(chain)) => chain, + Ok(None) => continue, + Err(e) => { + error!("{}", e); + self.metrics.inc_errors(&[ServerError::new(e.to_string(), None)]); + continue; + } + }; + + let mut chain_id_lock = self.chain_identifier.0.write().await; + *chain_id_lock = chain.into(); + return; + } + } + } + } } impl Watermark { @@ -139,3 +183,10 @@ impl Watermark { })) } } + +impl ChainIdentifierLock { + pub(crate) async fn read(&self) -> ChainIdentifier { + let w = self.0.read().await; + w.0.into() + } +} diff --git a/crates/sui-graphql-rpc/src/types/chain_identifier.rs b/crates/sui-graphql-rpc/src/types/chain_identifier.rs index 50fb3d8adaf81a..0565845b45d23c 100644 --- a/crates/sui-graphql-rpc/src/types/chain_identifier.rs +++ b/crates/sui-graphql-rpc/src/types/chain_identifier.rs @@ -6,27 +6,34 @@ use crate::{ error::Error, }; use async_graphql::*; -use diesel::QueryDsl; +use diesel::{OptionalExtension, QueryDsl}; use sui_indexer::schema::chain_identifier; use sui_types::{ digests::ChainIdentifier as NativeChainIdentifier, messages_checkpoint::CheckpointDigest, }; -pub(crate) struct ChainIdentifier; +#[derive(Clone, Copy, Debug, Default)] +pub(crate) struct ChainIdentifier(pub(crate) Option); impl ChainIdentifier { /// Query the Chain Identifier from the DB. - pub(crate) async fn query(db: &Db) -> Result { + pub(crate) async fn query(db: &Db) -> Result, Error> { use chain_identifier::dsl; - let digest_bytes = db + let Some(digest_bytes) = db .execute(move |conn| { conn.first(move || dsl::chain_identifier.select(dsl::checkpoint_digest)) + .optional() }) .await - .map_err(|e| Error::Internal(format!("Failed to fetch genesis digest: {e}")))?; + .map_err(|e| Error::Internal(format!("Failed to fetch genesis digest: {e}")))? + else { + return Ok(None); + }; - Self::from_bytes(digest_bytes) + let native_identifier = Self::from_bytes(digest_bytes)?; + + Ok(Some(native_identifier)) } /// Treat `bytes` as a checkpoint digest and extract a chain identifier from it. @@ -36,3 +43,15 @@ impl ChainIdentifier { Ok(NativeChainIdentifier::from(genesis_digest)) } } + +impl From> for ChainIdentifier { + fn from(chain_identifier: Option) -> Self { + Self(chain_identifier) + } +} + +impl From for ChainIdentifier { + fn from(chain_identifier: NativeChainIdentifier) -> Self { + Self(Some(chain_identifier)) + } +} diff --git a/crates/sui-graphql-rpc/src/types/query.rs b/crates/sui-graphql-rpc/src/types/query.rs index f403fbf8657b5e..5b65c696c83f5e 100644 --- a/crates/sui-graphql-rpc/src/types/query.rs +++ b/crates/sui-graphql-rpc/src/types/query.rs @@ -55,10 +55,18 @@ impl Query { /// First four bytes of the network's genesis checkpoint digest (uniquely identifies the /// network). async fn chain_identifier(&self, ctx: &Context<'_>) -> Result { - Ok(ChainIdentifier::query(ctx.data_unchecked()) - .await - .extend()? - .to_string()) + // we want to panic if the chain identifier is missing, as there's something wrong with + // the service. + let chain_id: ChainIdentifier = *ctx.data_unchecked(); + + if let Some(id) = chain_id.0 { + Ok(id.to_string()) + } else { + Err(Error::Internal( + "Chain identifier not initialized.".to_string(), + )) + .extend() + } } /// Range of checkpoints that the RPC has data available for (for data