diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index e637ee82413..ad1e25bf63a 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -606,5 +606,15 @@ pub trait StatusStore: Send + Sync + 'static { &self, subgraph_id: &DeploymentHash, block_number: BlockNumber, + fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError>; } + +#[async_trait] +pub trait BlockPtrForNumber: Send + Sync { + async fn block_ptr_for_number( + &self, + network: String, + number: BlockNumber, + ) -> Result, Error>; +} diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 70ba4fe2d17..5aae51fc762 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -5,7 +5,7 @@ use graph::data::query::Trace; use web3::types::Address; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; -use graph::components::store::{BlockStore, EntityType, Store}; +use graph::components::store::{BlockPtrForNumber, BlockStore, EntityType, Store}; use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; use graph::data::subgraph::status; @@ -15,6 +15,9 @@ use graph_graphql::prelude::{a, ExecutionContext, Resolver}; use crate::auth::PoiProtection; +/// Timeout for calls to fetch the block from JSON-RPC or Firehose. +const BLOCK_HASH_FROM_NUMBER_TIMEOUT: Duration = Duration::from_secs(10); + #[derive(Clone, Debug)] struct PublicProofOfIndexingRequest { pub deployment: DeploymentHash, @@ -212,62 +215,10 @@ impl IndexNodeResolver { .get_required::("blockNumber") .expect("Valid blockNumber required"); - macro_rules! try_resolve_for_chain { - ( $typ:path ) => { - let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok(); - - if let Some(blockchain) = blockchain { - debug!( - self.logger, - "Fetching block hash from number"; - "network" => &network, - "block_number" => block_number, - ); - - let block_ptr_res = blockchain - .block_pointer_from_number(&self.logger, block_number) - .await; - - if let Err(e) = block_ptr_res { - warn!( - self.logger, - "Failed to fetch block hash from number"; - "network" => &network, - "chain" => <$typ as Blockchain>::KIND.to_string(), - "block_number" => block_number, - "error" => e.to_string(), - ); - return Ok(r::Value::Null); - } - - let block_ptr = block_ptr_res.unwrap(); - return Ok(r::Value::String(block_ptr.hash_hex())); - } - }; + match self.block_ptr_for_number(network, block_number).await? { + Some(block_ptr) => Ok(r::Value::String(block_ptr.hash_hex())), + None => Ok(r::Value::Null), } - - // Ugly, but we can't get back an object trait from the `BlockchainMap`, - // so this seems like the next best thing. - try_resolve_for_chain!(graph_chain_ethereum::Chain); - try_resolve_for_chain!(graph_chain_arweave::Chain); - try_resolve_for_chain!(graph_chain_cosmos::Chain); - try_resolve_for_chain!(graph_chain_near::Chain); - - // If you're adding support for a new chain and this `match` clause just - // gave you a compiler error, then this message is for you! You need to - // add a new `try_resolve!` macro invocation above for your new chain - // type. - match BlockchainKind::Ethereum { - // Note: we don't actually care about substreams here. - BlockchainKind::Substreams - | BlockchainKind::Arweave - | BlockchainKind::Ethereum - | BlockchainKind::Cosmos - | BlockchainKind::Near => (), - } - - // The given network does not exist. - Ok(r::Value::Null) } async fn resolve_cached_ethereum_calls( @@ -405,7 +356,7 @@ impl IndexNodeResolver { Ok(poi) } - fn resolve_public_proofs_of_indexing( + async fn resolve_public_proofs_of_indexing( &self, field: &a::Field, ) -> Result { @@ -420,41 +371,41 @@ impl IndexNodeResolver { return Err(QueryExecutionError::TooExpensive); } - Ok(r::Value::List( - requests - .into_iter() - .map(|request| { - match futures::executor::block_on( - self.store.get_public_proof_of_indexing( - &request.deployment, - request.block_number, - ), - ) { - Ok(Some(poi)) => (Some(poi), request), - Ok(None) => (None, request), - Err(e) => { - error!( - self.logger, - "Failed to query public proof of indexing"; - "subgraph" => &request.deployment, - "block" => format!("{}", request.block_number), - "error" => format!("{:?}", e) - ); - (None, request) - } - } - }) - .map(|(poi_result, request)| PublicProofOfIndexingResult { + let mut public_poi_results = vec![]; + for request in requests { + let (poi_result, request) = match self + .store + .get_public_proof_of_indexing(&request.deployment, request.block_number, self) + .await + { + Ok(Some(poi)) => (Some(poi), request), + Ok(None) => (None, request), + Err(e) => { + error!( + self.logger, + "Failed to query public proof of indexing"; + "subgraph" => &request.deployment, + "block" => format!("{}", request.block_number), + "error" => format!("{:?}", e) + ); + (None, request) + } + }; + + public_poi_results.push( + PublicProofOfIndexingResult { deployment: request.deployment, block: match poi_result { Some((ref block, _)) => block.clone(), None => PartialBlockPtr::from(request.block_number), }, proof_of_indexing: poi_result.map(|(_, poi)| poi), - }) - .map(IntoValue::into_value) - .collect(), - )) + } + .into_value(), + ) + } + + Ok(r::Value::List(public_poi_results)) } fn resolve_indexing_status_for_version( @@ -517,6 +468,85 @@ impl IndexNodeResolver { .collect(), )) } + + async fn block_ptr_for_number( + &self, + network: String, + block_number: BlockNumber, + ) -> Result, QueryExecutionError> { + macro_rules! try_resolve_for_chain { + ( $typ:path ) => { + let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok(); + + if let Some(blockchain) = blockchain { + debug!( + self.logger, + "Fetching block hash from number"; + "network" => &network, + "block_number" => block_number, + ); + + let block_ptr_res = tokio::time::timeout(BLOCK_HASH_FROM_NUMBER_TIMEOUT, blockchain + .block_pointer_from_number(&self.logger, block_number) + .map_err(Error::from)) + .await + .map_err(Error::from) + .and_then(|x| x); + + if let Err(e) = block_ptr_res { + warn!( + self.logger, + "Failed to fetch block hash from number"; + "network" => &network, + "chain" => <$typ as Blockchain>::KIND.to_string(), + "block_number" => block_number, + "error" => e.to_string(), + ); + return Ok(None); + } + + let block_ptr = block_ptr_res.unwrap(); + return Ok(Some(block_ptr)); + } + }; + } + + // Ugly, but we can't get back an object trait from the `BlockchainMap`, + // so this seems like the next best thing. + try_resolve_for_chain!(graph_chain_ethereum::Chain); + try_resolve_for_chain!(graph_chain_arweave::Chain); + try_resolve_for_chain!(graph_chain_cosmos::Chain); + try_resolve_for_chain!(graph_chain_near::Chain); + + // If you're adding support for a new chain and this `match` clause just + // gave you a compiler error, then this message is for you! You need to + // add a new `try_resolve!` macro invocation above for your new chain + // type. + match BlockchainKind::Ethereum { + // Note: we don't actually care about substreams here. + BlockchainKind::Substreams + | BlockchainKind::Arweave + | BlockchainKind::Ethereum + | BlockchainKind::Cosmos + | BlockchainKind::Near => (), + } + + // The given network does not exist. + Ok(None) + } +} + +#[async_trait] +impl BlockPtrForNumber for IndexNodeResolver { + async fn block_ptr_for_number( + &self, + network: String, + block_number: BlockNumber, + ) -> Result, Error> { + self.block_ptr_for_number(network, block_number) + .map_err(Error::from) + .await + } } fn entity_changes_to_graphql(entity_changes: Vec) -> r::Value { @@ -643,7 +673,7 @@ impl Resolver for IndexNodeResolver { // The top-level `publicProofsOfIndexing` field (None, "PublicProofOfIndexingResult", "publicProofsOfIndexing") => { - self.resolve_public_proofs_of_indexing(field) + self.resolve_public_proofs_of_indexing(field).await } // Resolve fields of `Object` values (e.g. the `chains` field of `ChainIndexingStatus`) diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 07fea71829d..b9baf4a3941 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -5,7 +5,8 @@ use graph::{ components::{ server::index_node::VersionInfo, store::{ - BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore, Store as StoreTrait, + BlockPtrForNumber, BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore, + Store as StoreTrait, }, }, constraint_violation, @@ -155,9 +156,15 @@ impl StatusStore for Store { &self, subgraph_id: &DeploymentHash, block_number: BlockNumber, + fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError> { self.subgraph_store - .get_public_proof_of_indexing(subgraph_id, block_number, self.block_store().clone()) + .get_public_proof_of_indexing( + subgraph_id, + block_number, + self.block_store().clone(), + fetch_block_ptr, + ) .await } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 433a8616e79..cd98606db2c 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -16,8 +16,8 @@ use graph::{ components::{ server::index_node::VersionInfo, store::{ - self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, PruneReporter, - PruneRequest, SubgraphFork, + self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, + PruneReporter, PruneRequest, SubgraphFork, }, }, constraint_violation, @@ -243,9 +243,10 @@ impl SubgraphStore { id: &DeploymentHash, block_number: BlockNumber, block_store: Arc, + fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError> { self.inner - .get_public_proof_of_indexing(id, block_number, block_store) + .get_public_proof_of_indexing(id, block_number, block_store, fetch_block_ptr) .await } @@ -990,24 +991,33 @@ impl SubgraphStoreInner { id: &DeploymentHash, block_number: BlockNumber, block_store: Arc, + fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result, StoreError> { let (store, site) = self.store(id)?; - let chain_store = match block_store.chain_store(&site.network) { - Some(chain_store) => chain_store, - None => return Ok(None), + let block_hash = { + let chain_store = match block_store.chain_store(&site.network) { + Some(chain_store) => chain_store, + None => return Ok(None), + }; + let mut hashes = chain_store.block_hashes_by_block_number(block_number)?; + + // If we have multiple versions of this block using any of them could introduce + // non-determinism because we don't know which one is the right one + if hashes.len() == 1 { + hashes.pop().unwrap() + } else { + match fetch_block_ptr + .block_ptr_for_number(site.network.clone(), block_number) + .await + .ok() + .flatten() + { + None => return Ok(None), + Some(block_ptr) => block_ptr.hash, + } + } }; - let mut hashes = chain_store.block_hashes_by_block_number(block_number)?; - - // If we don't have this block or we have multiple versions of this block - // and using any of them could introduce non-deterministic because we don't - // know which one is the right one -> return no block hash - if hashes.is_empty() || hashes.len() > 1 { - return Ok(None); - } - - // This `unwrap` is safe to do now - let block_hash = hashes.pop().unwrap(); let block_for_poi_query = BlockPtr::new(block_hash.clone(), block_number); let indexer = Some(Address::zero());