diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index b827c78948a..65019d0724b 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -35,7 +35,9 @@ use crate::{ codec, data_source::{DataSource, UnresolvedDataSource}, }; -use graph::blockchain::block_stream::{BlockStream, BlockStreamMapper, FirehoseCursor}; +use graph::blockchain::block_stream::{ + BlockStream, BlockStreamError, BlockStreamMapper, FirehoseCursor, +}; pub struct Chain { logger_factory: LoggerFactory, @@ -259,10 +261,17 @@ pub struct FirehoseMapper { #[async_trait] impl BlockStreamMapper for FirehoseMapper { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error> { + fn decode_block( + &self, + output: Option<&[u8]>, + ) -> Result, BlockStreamError> { let block = match output { Some(block) => codec::Block::decode(block)?, - None => anyhow::bail!("Arweave mapper is expected to always have a block"), + None => { + return Err(anyhow::anyhow!( + "Arweave mapper is expected to always have a block" + ))? + } }; Ok(Some(block)) @@ -272,10 +281,11 @@ impl BlockStreamMapper for FirehoseMapper { &self, logger: &Logger, block: codec::Block, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { self.adapter .triggers_in_block(logger, block, self.filter.as_ref()) .await + .map_err(BlockStreamError::from) } async fn handle_substreams_block( &self, @@ -283,7 +293,7 @@ impl BlockStreamMapper for FirehoseMapper { _clock: Clock, _cursor: FirehoseCursor, _block: Vec, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { unimplemented!() } } @@ -319,12 +329,17 @@ impl FirehoseMapperTrait for FirehoseMapper { // Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe // define a slimmed down stuct that would decode only a few fields and ignore all the rest. // unwrap: Input cannot be None so output will be error or block. - let block = self.decode_block(Some(&any_block.value.as_ref()))?.unwrap(); + let block = self + .decode_block(Some(&any_block.value.as_ref())) + .map_err(Error::from)? + .unwrap(); use ForkStep::*; match step { StepNew => Ok(BlockStreamEvent::ProcessBlock( - self.block_with_triggers(&logger, block).await?, + self.block_with_triggers(&logger, block) + .await + .map_err(Error::from)?, FirehoseCursor::from(response.cursor.clone()), )), diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 5cc1a6f9b1b..6a493a144d4 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -5,7 +5,7 @@ use graph::prelude::MetricsRegistry; use graph::substreams::Clock; use std::sync::Arc; -use graph::blockchain::block_stream::{BlockStreamMapper, FirehoseCursor}; +use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, FirehoseCursor}; use graph::blockchain::client::ChainClient; use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter}; use graph::cheap_clone::CheapClone; @@ -332,10 +332,15 @@ pub struct FirehoseMapper { #[async_trait] impl BlockStreamMapper for FirehoseMapper { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error> { + fn decode_block( + &self, + output: Option<&[u8]>, + ) -> Result, BlockStreamError> { let block = match output { Some(block) => crate::Block::decode(block)?, - None => anyhow::bail!("cosmos mapper is expected to always have a block"), + None => Err(anyhow::anyhow!( + "cosmos mapper is expected to always have a block" + ))?, }; Ok(Some(block)) @@ -345,10 +350,11 @@ impl BlockStreamMapper for FirehoseMapper { &self, logger: &Logger, block: crate::Block, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { self.adapter .triggers_in_block(logger, block, self.filter.as_ref()) .await + .map_err(BlockStreamError::from) } async fn handle_substreams_block( @@ -357,7 +363,7 @@ impl BlockStreamMapper for FirehoseMapper { _clock: Clock, _cursor: FirehoseCursor, _block: Vec, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { unimplemented!() } } @@ -393,11 +399,16 @@ impl FirehoseMapperTrait for FirehoseMapper { // Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe // define a slimmed down struct that would decode only a few fields and ignore all the rest. // unwrap: Input cannot be None so output will be error or block. - let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap(); + let block = self + .decode_block(Some(any_block.value.as_ref())) + .map_err(Error::from)? + .unwrap(); match step { ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock( - self.block_with_triggers(logger, block).await?, + self.block_with_triggers(logger, block) + .await + .map_err(Error::from)?, FirehoseCursor::from(response.cursor.clone()), )), diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 32697c15dd2..0754e7e2146 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -55,7 +55,7 @@ use crate::{ SubgraphEthRpcMetrics, TriggerFilter, ENV_VARS, }; use graph::blockchain::block_stream::{ - BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor, + BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor, }; /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 @@ -745,10 +745,15 @@ pub struct FirehoseMapper { #[async_trait] impl BlockStreamMapper for FirehoseMapper { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error> { + fn decode_block( + &self, + output: Option<&[u8]>, + ) -> Result, BlockStreamError> { let block = match output { Some(block) => codec::Block::decode(block)?, - None => anyhow::bail!("ethereum mapper is expected to always have a block"), + None => Err(anyhow::anyhow!( + "ethereum mapper is expected to always have a block" + ))?, }; // See comment(437a9f17-67cc-478f-80a3-804fe554b227) ethereum_block.calls is always Some even if calls @@ -762,10 +767,11 @@ impl BlockStreamMapper for FirehoseMapper { &self, logger: &Logger, block: BlockFinality, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { self.adapter .triggers_in_block(logger, block, &self.filter) .await + .map_err(BlockStreamError::from) } async fn handle_substreams_block( @@ -774,7 +780,7 @@ impl BlockStreamMapper for FirehoseMapper { _clock: Clock, _cursor: FirehoseCursor, _block: Vec, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { unimplemented!() } } diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 88fe69fcf6e..135350a1deb 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -40,7 +40,7 @@ use crate::{ data_source::{DataSource, UnresolvedDataSource}, }; use graph::blockchain::block_stream::{ - BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor, + BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor, }; const NEAR_FILTER_MODULE_NAME: &str = "near_filter"; @@ -408,10 +408,18 @@ pub struct FirehoseMapper { #[async_trait] impl BlockStreamMapper for FirehoseMapper { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error> { + fn decode_block( + &self, + output: Option<&[u8]>, + ) -> Result, BlockStreamError> { let block = match output { Some(block) => codec::Block::decode(block)?, - None => anyhow::bail!("near mapper is expected to always have a block"), + None => { + return Err(anyhow::anyhow!( + "near mapper is expected to always have a block" + )) + .map_err(BlockStreamError::from) + } }; Ok(Some(block)) @@ -421,10 +429,11 @@ impl BlockStreamMapper for FirehoseMapper { &self, logger: &Logger, block: codec::Block, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { self.adapter .triggers_in_block(logger, block, self.filter.as_ref()) .await + .map_err(BlockStreamError::from) } async fn handle_substreams_block( @@ -433,7 +442,7 @@ impl BlockStreamMapper for FirehoseMapper { _clock: Clock, cursor: FirehoseCursor, message: Vec, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { let BlockAndReceipts { block, outcome, diff --git a/chain/substreams/src/mapper.rs b/chain/substreams/src/mapper.rs index f08fedb462e..6e28ff2c5b3 100644 --- a/chain/substreams/src/mapper.rs +++ b/chain/substreams/src/mapper.rs @@ -4,7 +4,8 @@ use std::str::FromStr; use crate::codec::{entity_change, EntityChanges}; use anyhow::{anyhow, Error}; use graph::blockchain::block_stream::{ - BlockStreamEvent, BlockStreamMapper, BlockWithTriggers, FirehoseCursor, SubstreamsError, + BlockStreamError, BlockStreamEvent, BlockStreamMapper, BlockWithTriggers, FirehoseCursor, + SubstreamsError, }; use graph::blockchain::BlockTime; use graph::data::store::scalar::Bytes; @@ -29,7 +30,10 @@ pub struct WasmBlockMapper { #[async_trait] impl BlockStreamMapper for WasmBlockMapper { - fn decode_block(&self, _output: Option<&[u8]>) -> Result, Error> { + fn decode_block( + &self, + _output: Option<&[u8]>, + ) -> Result, BlockStreamError> { unreachable!("WasmBlockMapper does not do block decoding") } @@ -37,7 +41,7 @@ impl BlockStreamMapper for WasmBlockMapper { &self, _logger: &Logger, _block: crate::Block, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { unreachable!("WasmBlockMapper does not do trigger decoding") } @@ -47,7 +51,7 @@ impl BlockStreamMapper for WasmBlockMapper { clock: Clock, cursor: FirehoseCursor, block: Vec, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { let Clock { id, number, @@ -56,7 +60,7 @@ impl BlockStreamMapper for WasmBlockMapper { let block_ptr = BlockPtr { hash: BlockHash::from(id.into_bytes()), - number: BlockNumber::from(TryInto::::try_into(number)?), + number: BlockNumber::from(TryInto::::try_into(number).map_err(Error::from)?), }; let block_data = block.into_boxed_slice(); @@ -71,7 +75,7 @@ impl BlockStreamMapper for WasmBlockMapper { ); return Err(anyhow!( "Substream block is missing a timestamp at cursor {cursor}, block number {number}" - )); + )).map_err(BlockStreamError::from); } Some(ts) => BlockTime::since_epoch(ts.seconds, ts.nanos as u32), }; @@ -100,7 +104,7 @@ pub struct Mapper { #[async_trait] impl BlockStreamMapper for Mapper { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error> { + fn decode_block(&self, output: Option<&[u8]>) -> Result, BlockStreamError> { let changes: EntityChanges = match output { Some(msg) => Message::decode(msg).map_err(SubstreamsError::DecodingError)?, None => EntityChanges { @@ -130,7 +134,7 @@ impl BlockStreamMapper for Mapper { &self, logger: &Logger, block: Block, - ) -> Result, Error> { + ) -> Result, BlockStreamError> { let mut triggers = vec![]; if block.changes.entity_changes.len() >= 1 { triggers.push(TriggerData {}); @@ -145,9 +149,9 @@ impl BlockStreamMapper for Mapper { clock: Clock, cursor: FirehoseCursor, block: Vec, - ) -> Result, Error> { - let block_number: BlockNumber = clock.number.try_into()?; - let block_hash = clock.id.as_bytes().to_vec().try_into()?; + ) -> Result, BlockStreamError> { + let block_number: BlockNumber = clock.number.try_into().map_err(Error::from)?; + let block_hash = clock.id.as_bytes().to_vec().into(); let block = self .decode_block(Some(&block))? @@ -168,7 +172,7 @@ impl BlockStreamMapper for Mapper { fn parse_changes( changes: &EntityChanges, schema: &InputSchema, -) -> anyhow::Result> { +) -> Result, SubstreamsError> { let mut parsed_changes = vec![]; for entity_change in changes.entity_changes.iter() { let mut parsed_data: HashMap = HashMap::default(); @@ -215,9 +219,7 @@ fn parse_changes( .entry(Word::from(field.name.as_str())) .or_insert(Value::Null) = value; } - let entity = schema - .make_entity(parsed_data) - .map_err(anyhow::Error::from)?; + let entity = schema.make_entity(parsed_data)?; ParsedChanges::Upsert { key, entity } } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index bd6da6fc6c0..d774469bcf3 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -341,13 +341,13 @@ pub trait FirehoseMapper: Send + Sync { #[async_trait] pub trait BlockStreamMapper: Send + Sync { - fn decode_block(&self, output: Option<&[u8]>) -> Result, Error>; + fn decode_block(&self, output: Option<&[u8]>) -> Result, BlockStreamError>; async fn block_with_triggers( &self, logger: &Logger, block: C::Block, - ) -> Result, Error>; + ) -> Result, BlockStreamError>; async fn handle_substreams_block( &self, @@ -355,14 +355,14 @@ pub trait BlockStreamMapper: Send + Sync { clock: Clock, cursor: FirehoseCursor, block: Vec, - ) -> Result, Error>; + ) -> Result, BlockStreamError>; async fn to_block_stream_event( &self, logger: &mut Logger, message: Option, log_data: &mut SubstreamsLogData, - ) -> Result>, SubstreamsError> { + ) -> Result>, BlockStreamError> { match message { Some(SubstreamsMessage::Session(session_init)) => { info!( @@ -376,7 +376,7 @@ pub trait BlockStreamMapper: Send + Sync { Some(SubstreamsMessage::BlockUndoSignal(undo)) => { let valid_block = match undo.last_valid_block { Some(clock) => clock, - None => return Err(SubstreamsError::InvalidUndoError), + None => return Err(BlockStreamError::from(SubstreamsError::InvalidUndoError)), }; let valid_ptr = BlockPtr { hash: valid_block.id.trim_start_matches("0x").try_into()?, @@ -406,7 +406,7 @@ pub trait BlockStreamMapper: Send + Sync { let clock = match clock { Some(clock) => clock, - None => return Err(SubstreamsError::MissingClockError), + None => return Err(BlockStreamError::from(SubstreamsError::MissingClockError)), }; let value = match module_output.map_output { @@ -457,6 +457,15 @@ pub enum FirehoseError { UnknownError(#[from] anyhow::Error), } +impl From for FirehoseError { + fn from(value: BlockStreamError) -> Self { + match value { + BlockStreamError::ProtobufDecodingError(e) => FirehoseError::DecodingError(e), + e => FirehoseError::UnknownError(anyhow!(e.to_string())), + } + } +} + #[derive(Error, Debug)] pub enum SubstreamsError { #[error("response is missing the clock information")] @@ -465,12 +474,15 @@ pub enum SubstreamsError { #[error("invalid undo message")] InvalidUndoError, + #[error("entity validation failed {0}")] + EntityValidationError(#[from] crate::data::store::EntityValidationError), + /// We were unable to decode the received block payload into the chain specific Block struct (e.g. chain_ethereum::pb::Block) #[error("received gRPC block payload cannot be decoded: {0}")] DecodingError(#[from] prost::DecodeError), /// Some unknown error occurred - #[error("unknown error")] + #[error("unknown error {0}")] UnknownError(#[from] anyhow::Error), #[error("multiple module output error")] @@ -483,11 +495,32 @@ pub enum SubstreamsError { UnexpectedStoreDeltaOutput, } +impl SubstreamsError { + pub fn is_deterministic(&self) -> bool { + use SubstreamsError::*; + + match self { + EntityValidationError(_) => true, + MissingClockError + | InvalidUndoError + | DecodingError(_) + | UnknownError(_) + | MultipleModuleOutputError + | ModuleOutputNotPresentOrUnexpected + | UnexpectedStoreDeltaOutput => false, + } + } +} + #[derive(Debug, Error)] pub enum BlockStreamError { - #[error("block stream error")] + #[error("Failed to decode protobuf {0}")] + ProtobufDecodingError(#[from] prost::DecodeError), + #[error("substreams error: {0}")] + SubstreamsError(#[from] SubstreamsError), + #[error("block stream error {0}")] Unknown(#[from] anyhow::Error), - #[error("block stream fatal error")] + #[error("block stream fatal error {0}")] Fatal(String), } diff --git a/graph/src/blockchain/substreams_block_stream.rs b/graph/src/blockchain/substreams_block_stream.rs index 91451e27eaa..33d9dca1ed3 100644 --- a/graph/src/blockchain/substreams_block_stream.rs +++ b/graph/src/blockchain/substreams_block_stream.rs @@ -237,9 +237,12 @@ fn stream_blocks>( } } }, - Err(BlockStreamError::Fatal(msg)) => { - Err(BlockStreamError::Fatal(msg))? - } + Err(BlockStreamError::SubstreamsError(e)) if e.is_deterministic() => + Err(BlockStreamError::Fatal(e.to_string()))?, + + Err(BlockStreamError::Fatal(msg)) => + Err(BlockStreamError::Fatal(msg))?, + Err(err) => { info!(&logger, "received err"); @@ -308,7 +311,7 @@ async fn process_substreams_response>( match mapper .to_block_stream_event(logger, response.message, log_data) .await - .context("Mapping message to BlockStreamEvent failed")? + .map_err(BlockStreamError::from)? { Some(event) => { let cursor = match &event {