Fix null public pois #4768

Merged · 3 commits · Jul 20, 2023 · showing changes from all commits
10 changes: 10 additions & 0 deletions graph/src/components/store/traits.rs
@@ -606,5 +606,15 @@ pub trait StatusStore: Send + Sync + 'static {
&self,
subgraph_id: &DeploymentHash,
block_number: BlockNumber,
fetch_block_ptr: &dyn BlockPtrForNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
}

#[async_trait]
pub trait BlockPtrForNumber: Send + Sync {
async fn block_ptr_for_number(
&self,
network: String,
number: BlockNumber,
) -> Result<Option<BlockPtr>, Error>;
}
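
For context, the new `BlockPtrForNumber` trait is what lets the store call back into a component that can talk to the chain. Below is a minimal sketch of an implementor — a hypothetical test double that always reports the block as unknown. The import paths are assumptions about the `graph` crate layout and are not taken from this diff.

use async_trait::async_trait;
use graph::blockchain::BlockPtr;                  // assumed path
use graph::components::store::BlockPtrForNumber;  // trait introduced above
use graph::prelude::{BlockNumber, Error};         // assumed re-exports

/// Hypothetical test double: never knows the canonical block.
struct NoopBlockPtrFetcher;

#[async_trait]
impl BlockPtrForNumber for NoopBlockPtrFetcher {
    async fn block_ptr_for_number(
        &self,
        _network: String,
        _number: BlockNumber,
    ) -> Result<Option<BlockPtr>, Error> {
        // Returning Ok(None) tells the store the block cannot be resolved,
        // so `get_public_proof_of_indexing` falls back to returning None.
        Ok(None)
    }
}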
204 changes: 117 additions & 87 deletions server/index-node/src/resolver.rs
@@ -5,7 +5,7 @@ use graph::data::query::Trace;
use web3::types::Address;

use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
use graph::components::store::{BlockStore, EntityType, Store};
use graph::components::store::{BlockPtrForNumber, BlockStore, EntityType, Store};
use graph::components::versions::VERSIONS;
use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap};
use graph::data::subgraph::status;
@@ -15,6 +15,9 @@ use graph_graphql::prelude::{a, ExecutionContext, Resolver};

use crate::auth::PoiProtection;

/// Timeout for calls to fetch the block from JSON-RPC or Firehose.
const BLOCK_HASH_FROM_NUMBER_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Clone, Debug)]
struct PublicProofOfIndexingRequest {
pub deployment: DeploymentHash,
@@ -212,62 +215,10 @@ impl<S: Store> IndexNodeResolver<S> {
.get_required::<BlockNumber>("blockNumber")
.expect("Valid blockNumber required");

macro_rules! try_resolve_for_chain {
( $typ:path ) => {
let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok();

if let Some(blockchain) = blockchain {
debug!(
self.logger,
"Fetching block hash from number";
"network" => &network,
"block_number" => block_number,
);

let block_ptr_res = blockchain
.block_pointer_from_number(&self.logger, block_number)
.await;

if let Err(e) = block_ptr_res {
warn!(
self.logger,
"Failed to fetch block hash from number";
"network" => &network,
"chain" => <$typ as Blockchain>::KIND.to_string(),
"block_number" => block_number,
"error" => e.to_string(),
);
return Ok(r::Value::Null);
}

let block_ptr = block_ptr_res.unwrap();
return Ok(r::Value::String(block_ptr.hash_hex()));
}
};
match self.block_ptr_for_number(network, block_number).await? {
Some(block_ptr) => Ok(r::Value::String(block_ptr.hash_hex())),
None => Ok(r::Value::Null),
}

// Ugly, but we can't get back an object trait from the `BlockchainMap`,
// so this seems like the next best thing.
try_resolve_for_chain!(graph_chain_ethereum::Chain);
try_resolve_for_chain!(graph_chain_arweave::Chain);
try_resolve_for_chain!(graph_chain_cosmos::Chain);
try_resolve_for_chain!(graph_chain_near::Chain);

// If you're adding support for a new chain and this `match` clause just
// gave you a compiler error, then this message is for you! You need to
// add a new `try_resolve!` macro invocation above for your new chain
// type.
match BlockchainKind::Ethereum {
// Note: we don't actually care about substreams here.
BlockchainKind::Substreams
| BlockchainKind::Arweave
| BlockchainKind::Ethereum
| BlockchainKind::Cosmos
| BlockchainKind::Near => (),
}

// The given network does not exist.
Ok(r::Value::Null)
}

async fn resolve_cached_ethereum_calls(
@@ -405,7 +356,7 @@ impl<S: Store> IndexNodeResolver<S> {
Ok(poi)
}

fn resolve_public_proofs_of_indexing(
async fn resolve_public_proofs_of_indexing(
&self,
field: &a::Field,
) -> Result<r::Value, QueryExecutionError> {
@@ -420,41 +371,41 @@ impl<S: Store> IndexNodeResolver<S> {
return Err(QueryExecutionError::TooExpensive);
}

Ok(r::Value::List(
requests
.into_iter()
.map(|request| {
match futures::executor::block_on(
self.store.get_public_proof_of_indexing(
&request.deployment,
request.block_number,
),
) {
Ok(Some(poi)) => (Some(poi), request),
Ok(None) => (None, request),
Err(e) => {
error!(
self.logger,
"Failed to query public proof of indexing";
"subgraph" => &request.deployment,
"block" => format!("{}", request.block_number),
"error" => format!("{:?}", e)
);
(None, request)
}
}
})
.map(|(poi_result, request)| PublicProofOfIndexingResult {
let mut public_poi_results = vec![];
for request in requests {
let (poi_result, request) = match self
.store
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
.await
{
Ok(Some(poi)) => (Some(poi), request),
Ok(None) => (None, request),
Err(e) => {
error!(
self.logger,
"Failed to query public proof of indexing";
"subgraph" => &request.deployment,
"block" => format!("{}", request.block_number),
"error" => format!("{:?}", e)
);
(None, request)
}
};

public_poi_results.push(
PublicProofOfIndexingResult {
deployment: request.deployment,
block: match poi_result {
Some((ref block, _)) => block.clone(),
None => PartialBlockPtr::from(request.block_number),
},
proof_of_indexing: poi_result.map(|(_, poi)| poi),
})
.map(IntoValue::into_value)
.collect(),
))
}
.into_value(),
)
}

Ok(r::Value::List(public_poi_results))
}

fn resolve_indexing_status_for_version(
@@ -517,6 +468,85 @@ impl<S: Store> IndexNodeResolver<S> {
.collect(),
))
}

async fn block_ptr_for_number(
&self,
network: String,
block_number: BlockNumber,
) -> Result<Option<BlockPtr>, QueryExecutionError> {
macro_rules! try_resolve_for_chain {
( $typ:path ) => {
let blockchain = self.blockchain_map.get::<$typ>(network.to_string()).ok();

if let Some(blockchain) = blockchain {
debug!(
self.logger,
"Fetching block hash from number";
"network" => &network,
"block_number" => block_number,
);

let block_ptr_res = tokio::time::timeout(BLOCK_HASH_FROM_NUMBER_TIMEOUT, blockchain
.block_pointer_from_number(&self.logger, block_number)
.map_err(Error::from))
.await
.map_err(Error::from)
.and_then(|x| x);

if let Err(e) = block_ptr_res {
warn!(
self.logger,
"Failed to fetch block hash from number";
"network" => &network,
"chain" => <$typ as Blockchain>::KIND.to_string(),
"block_number" => block_number,
"error" => e.to_string(),
);
return Ok(None);
}

let block_ptr = block_ptr_res.unwrap();
return Ok(Some(block_ptr));
}
};
}

// Ugly, but we can't get back an object trait from the `BlockchainMap`,
// so this seems like the next best thing.
try_resolve_for_chain!(graph_chain_ethereum::Chain);
try_resolve_for_chain!(graph_chain_arweave::Chain);
try_resolve_for_chain!(graph_chain_cosmos::Chain);
try_resolve_for_chain!(graph_chain_near::Chain);

// If you're adding support for a new chain and this `match` clause just
// gave you a compiler error, then this message is for you! You need to
// add a new `try_resolve!` macro invocation above for your new chain
// type.
match BlockchainKind::Ethereum {
// Note: we don't actually care about substreams here.
BlockchainKind::Substreams
| BlockchainKind::Arweave
| BlockchainKind::Ethereum
| BlockchainKind::Cosmos
| BlockchainKind::Near => (),
}

// The given network does not exist.
Ok(None)
}
}

#[async_trait]
impl<S: Store> BlockPtrForNumber for IndexNodeResolver<S> {
async fn block_ptr_for_number(
&self,
network: String,
block_number: BlockNumber,
) -> Result<Option<BlockPtr>, Error> {
self.block_ptr_for_number(network, block_number)
.map_err(Error::from)
.await
}
}

fn entity_changes_to_graphql(entity_changes: Vec<EntityOperation>) -> r::Value {
@@ -643,7 +673,7 @@ impl<S: Store> Resolver for IndexNodeResolver<S> {

// The top-level `publicProofsOfIndexing` field
(None, "PublicProofOfIndexingResult", "publicProofsOfIndexing") => {
self.resolve_public_proofs_of_indexing(field)
self.resolve_public_proofs_of_indexing(field).await
}

// Resolve fields of `Object` values (e.g. the `chains` field of `ChainIndexingStatus`)
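
The `block_ptr_for_number` helper above wraps the chain call in `tokio::time::timeout`, which produces a nested `Result` (timeout error on the outside, RPC/Firehose error on the inside) that is then flattened with `map_err` and `and_then`. Here is a standalone sketch of that pattern, assuming `anyhow::Error` as the crate's `Error` alias and using a dummy async function in place of `block_pointer_from_number`:

use std::time::Duration;
use anyhow::Error;

// Stand-in for a fallible chain call such as `block_pointer_from_number`.
async fn fetch_block_hash() -> Result<String, Error> {
    Ok("0xabc".to_string())
}

async fn fetch_with_timeout() -> Result<String, Error> {
    // `timeout` yields Result<Result<String, Error>, Elapsed>; converting the
    // outer timeout error into `Error` and flattening with `and_then`
    // collapses both failure modes into a single Result.
    tokio::time::timeout(Duration::from_secs(10), fetch_block_hash())
        .await
        .map_err(Error::from)
        .and_then(|inner| inner)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    println!("{}", fetch_with_timeout().await?);
    Ok(())
}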
11 changes: 9 additions & 2 deletions store/postgres/src/store.rs
@@ -5,7 +5,8 @@ use graph::{
components::{
server::index_node::VersionInfo,
store::{
BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore, Store as StoreTrait,
BlockPtrForNumber, BlockStore as BlockStoreTrait, QueryStoreManager, StatusStore,
Store as StoreTrait,
},
},
constraint_violation,
@@ -155,9 +156,15 @@ impl StatusStore for Store {
&self,
subgraph_id: &DeploymentHash,
block_number: BlockNumber,
fetch_block_ptr: &dyn BlockPtrForNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
self.subgraph_store
.get_public_proof_of_indexing(subgraph_id, block_number, self.block_store().clone())
.get_public_proof_of_indexing(
subgraph_id,
block_number,
self.block_store().clone(),
fetch_block_ptr,
)
.await
}
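
The `StatusStore` change above only threads the new `fetch_block_ptr` argument through to the subgraph store. A hedged sketch of what a call site might look like, assuming the `graph` re-exports named below and treating the helper function itself as purely illustrative:

use graph::components::store::{BlockPtrForNumber, StatusStore, StoreError}; // assumed paths
use graph::prelude::{BlockNumber, DeploymentHash, PartialBlockPtr};         // assumed re-exports

// Illustrative helper, not part of this PR.
async fn public_poi_for<S: StatusStore>(
    store: &S,
    resolver: &dyn BlockPtrForNumber,
    deployment: &DeploymentHash,
    block_number: BlockNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
    // `None` now means neither the local chain store nor the chain client
    // could pin down a single canonical hash for `block_number`.
    store
        .get_public_proof_of_indexing(deployment, block_number, resolver)
        .await
}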

44 changes: 27 additions & 17 deletions store/postgres/src/subgraph_store.rs
@@ -16,8 +16,8 @@ use graph::{
components::{
server::index_node::VersionInfo,
store::{
self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, PruneReporter,
PruneRequest, SubgraphFork,
self, BlockPtrForNumber, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait,
PruneReporter, PruneRequest, SubgraphFork,
},
},
constraint_violation,
@@ -243,9 +243,10 @@ impl SubgraphStore {
id: &DeploymentHash,
block_number: BlockNumber,
block_store: Arc<impl BlockStore>,
fetch_block_ptr: &dyn BlockPtrForNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
self.inner
.get_public_proof_of_indexing(id, block_number, block_store)
.get_public_proof_of_indexing(id, block_number, block_store, fetch_block_ptr)
.await
}

Expand Down Expand Up @@ -990,24 +991,33 @@ impl SubgraphStoreInner {
id: &DeploymentHash,
block_number: BlockNumber,
block_store: Arc<impl BlockStore>,
fetch_block_ptr: &dyn BlockPtrForNumber,
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
let (store, site) = self.store(id)?;

let chain_store = match block_store.chain_store(&site.network) {
Some(chain_store) => chain_store,
None => return Ok(None),
let block_hash = {
let chain_store = match block_store.chain_store(&site.network) {
Some(chain_store) => chain_store,
None => return Ok(None),
};
let mut hashes = chain_store.block_hashes_by_block_number(block_number)?;

// If we have multiple versions of this block using any of them could introduce
// non-determinism because we don't know which one is the right one
if hashes.len() == 1 {
hashes.pop().unwrap()
} else {
match fetch_block_ptr
.block_ptr_for_number(site.network.clone(), block_number)
.await
.ok()
.flatten()
{
None => return Ok(None),
Some(block_ptr) => block_ptr.hash,
}
}
};
let mut hashes = chain_store.block_hashes_by_block_number(block_number)?;

// If we don't have this block or we have multiple versions of this block
// and using any of them could introduce non-deterministic because we don't
// know which one is the right one -> return no block hash
if hashes.is_empty() || hashes.len() > 1 {
return Ok(None);
}

// This `unwrap` is safe to do now
let block_hash = hashes.pop().unwrap();

let block_for_poi_query = BlockPtr::new(block_hash.clone(), block_number);
let indexer = Some(Address::zero());
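
The heart of the fix is the hash-selection logic above: use the locally stored hash only when exactly one candidate exists, otherwise ask the chain for the canonical block pointer instead of returning null. A simplified, self-contained sketch of that decision, with a closure standing in for `BlockPtrForNumber` and `[u8; 32]` standing in for the block hash type (both substitutions are illustrative):

use anyhow::Error;

async fn pick_block_hash<F, Fut>(
    mut local_hashes: Vec<[u8; 32]>,
    fetch_canonical: F,
) -> Result<Option<[u8; 32]>, Error>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = Result<Option<[u8; 32]>, Error>>,
{
    if local_hashes.len() == 1 {
        // Exactly one locally stored candidate: unambiguous, use it.
        return Ok(local_hashes.pop());
    }
    // Zero or multiple candidates: defer to the chain client rather than
    // guessing between conflicting versions of the block.
    fetch_canonical().await
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let hash = pick_block_hash(vec![[0u8; 32]], || async {
        Ok::<Option<[u8; 32]>, Error>(None)
    })
    .await?;
    assert!(hash.is_some()); // the single local candidate wins
    Ok(())
}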