Skip to content

Commit

Permalink
improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 20, 2024
1 parent b9a219e commit 614892d
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 46 deletions.
29 changes: 22 additions & 7 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use crate::{
codec,
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{BlockStream, BlockStreamMapper, FirehoseCursor};
use graph::blockchain::block_stream::{
BlockStream, BlockStreamError, BlockStreamMapper, FirehoseCursor,
};

pub struct Chain {
logger_factory: LoggerFactory,
Expand Down Expand Up @@ -259,10 +261,17 @@ pub struct FirehoseMapper {

#[async_trait]
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<codec::Block>, Error> {
fn decode_block(
&self,
output: Option<&[u8]>,
) -> Result<Option<codec::Block>, BlockStreamError> {
let block = match output {
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("Arweave mapper is expected to always have a block"),
None => {
return Err(anyhow::anyhow!(
"Arweave mapper is expected to always have a block"
))?
}
};

Ok(Some(block))
Expand All @@ -272,18 +281,19 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
&self,
logger: &Logger,
block: codec::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
self.adapter
.triggers_in_block(logger, block, self.filter.as_ref())
.await
.map_err(BlockStreamError::from)
}
async fn handle_substreams_block(
&self,
_logger: &Logger,
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
unimplemented!()
}
}
Expand Down Expand Up @@ -319,12 +329,17 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(&any_block.value.as_ref()))?.unwrap();
let block = self
.decode_block(Some(&any_block.value.as_ref()))
.map_err(Error::from)?
.unwrap();

use ForkStep::*;
match step {
StepNew => Ok(BlockStreamEvent::ProcessBlock(
self.block_with_triggers(&logger, block).await?,
self.block_with_triggers(&logger, block)
.await
.map_err(Error::from)?,
FirehoseCursor::from(response.cursor.clone()),
)),

Expand Down
30 changes: 22 additions & 8 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use std::sync::Arc;

use graph::blockchain::block_stream::{BlockStreamMapper, FirehoseCursor};
use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, FirehoseCursor};
use graph::blockchain::client::ChainClient;
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
use graph::cheap_clone::CheapClone;
Expand Down Expand Up @@ -332,10 +332,18 @@ pub struct FirehoseMapper {

#[async_trait]
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<crate::Block>, Error> {
fn decode_block(
&self,
output: Option<&[u8]>,
) -> Result<Option<crate::Block>, BlockStreamError> {
let block = match output {
Some(block) => crate::Block::decode(block)?,
None => anyhow::bail!("cosmos mapper is expected to always have a block"),
Some(block) => crate::Block::decode(block)
.map_err(|err| BlockStreamError::from(Error::from(err)))?,
None => {
return Err(BlockStreamError::from(anyhow::anyhow!(
"cosmos mapper is expected to always have a block"
)))
}
};

Ok(Some(block))
Expand All @@ -345,10 +353,11 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
&self,
logger: &Logger,
block: crate::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
self.adapter
.triggers_in_block(logger, block, self.filter.as_ref())
.await
.map_err(BlockStreamError::from)
}

async fn handle_substreams_block(
Expand All @@ -357,7 +366,7 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
unimplemented!()
}
}
Expand Down Expand Up @@ -393,11 +402,16 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
// define a slimmed down struct that would decode only a few fields and ignore all the rest.
// unwrap: Input cannot be None so output will be error or block.
let block = self.decode_block(Some(any_block.value.as_ref()))?.unwrap();
let block = self
.decode_block(Some(any_block.value.as_ref()))
.map_err(Error::from)?
.unwrap();

match step {
ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
self.block_with_triggers(logger, block).await?,
self.block_with_triggers(logger, block)
.await
.map_err(Error::from)?,
FirehoseCursor::from(response.cursor.clone()),
)),

Expand Down
18 changes: 13 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::{
SubgraphEthRpcMetrics, TriggerFilter, ENV_VARS,
};
use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor,
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
};

/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
Expand Down Expand Up @@ -745,10 +745,17 @@ pub struct FirehoseMapper {

#[async_trait]
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<BlockFinality>, Error> {
fn decode_block(
&self,
output: Option<&[u8]>,
) -> Result<Option<BlockFinality>, BlockStreamError> {
let block = match output {
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("ethereum mapper is expected to always have a block"),
None => {
return Err(BlockStreamError::from(anyhow::anyhow!(
"ethereum mapper is expected to always have a block"
)))
}
};

// See comment(437a9f17-67cc-478f-80a3-804fe554b227) ethereum_block.calls is always Some even if calls
Expand All @@ -762,10 +769,11 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
&self,
logger: &Logger,
block: BlockFinality,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
self.adapter
.triggers_in_block(logger, block, &self.filter)
.await
.map_err(BlockStreamError::from)
}

async fn handle_substreams_block(
Expand All @@ -774,7 +782,7 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
_clock: Clock,
_cursor: FirehoseCursor,
_block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
unimplemented!()
}
}
Expand Down
19 changes: 14 additions & 5 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{
data_source::{DataSource, UnresolvedDataSource},
};
use graph::blockchain::block_stream::{
BlockStream, BlockStreamBuilder, BlockStreamMapper, FirehoseCursor,
BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor,
};

const NEAR_FILTER_MODULE_NAME: &str = "near_filter";
Expand Down Expand Up @@ -408,10 +408,18 @@ pub struct FirehoseMapper {

#[async_trait]
impl BlockStreamMapper<Chain> for FirehoseMapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<codec::Block>, Error> {
fn decode_block(
&self,
output: Option<&[u8]>,
) -> Result<Option<codec::Block>, BlockStreamError> {
let block = match output {
Some(block) => codec::Block::decode(block)?,
None => anyhow::bail!("near mapper is expected to always have a block"),
None => {
return Err(anyhow::anyhow!(
"near mapper is expected to always have a block"
))
.map_err(BlockStreamError::from)
}
};

Ok(Some(block))
Expand All @@ -421,10 +429,11 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
&self,
logger: &Logger,
block: codec::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
self.adapter
.triggers_in_block(logger, block, self.filter.as_ref())
.await
.map_err(BlockStreamError::from)
}

async fn handle_substreams_block(
Expand All @@ -433,7 +442,7 @@ impl BlockStreamMapper<Chain> for FirehoseMapper {
_clock: Clock,
cursor: FirehoseCursor,
message: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
let BlockAndReceipts {
block,
outcome,
Expand Down
30 changes: 16 additions & 14 deletions chain/substreams/src/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::str::FromStr;
use crate::codec::{entity_change, EntityChanges};
use anyhow::{anyhow, Error};
use graph::blockchain::block_stream::{
BlockStreamEvent, BlockStreamMapper, BlockWithTriggers, FirehoseCursor, SubstreamsError,
BlockStreamError, BlockStreamEvent, BlockStreamMapper, BlockWithTriggers, FirehoseCursor,
SubstreamsError,
};
use graph::blockchain::BlockTime;
use graph::data::store::scalar::Bytes;
Expand All @@ -29,15 +30,18 @@ pub struct WasmBlockMapper {

#[async_trait]
impl BlockStreamMapper<Chain> for WasmBlockMapper {
fn decode_block(&self, _output: Option<&[u8]>) -> Result<Option<crate::Block>, Error> {
fn decode_block(
&self,
_output: Option<&[u8]>,
) -> Result<Option<crate::Block>, BlockStreamError> {
unreachable!("WasmBlockMapper does not do block decoding")
}

async fn block_with_triggers(
&self,
_logger: &Logger,
_block: crate::Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
unreachable!("WasmBlockMapper does not do trigger decoding")
}

Expand All @@ -47,7 +51,7 @@ impl BlockStreamMapper<Chain> for WasmBlockMapper {
clock: Clock,
cursor: FirehoseCursor,
block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
let Clock {
id,
number,
Expand All @@ -56,7 +60,7 @@ impl BlockStreamMapper<Chain> for WasmBlockMapper {

let block_ptr = BlockPtr {
hash: BlockHash::from(id.into_bytes()),
number: BlockNumber::from(TryInto::<i32>::try_into(number)?),
number: BlockNumber::from(TryInto::<i32>::try_into(number).map_err(Error::from)?),
};

let block_data = block.into_boxed_slice();
Expand All @@ -71,7 +75,7 @@ impl BlockStreamMapper<Chain> for WasmBlockMapper {
);
return Err(anyhow!(
"Substream block is missing a timestamp at cursor {cursor}, block number {number}"
));
)).map_err(BlockStreamError::from);
}
Some(ts) => BlockTime::since_epoch(ts.seconds, ts.nanos as u32),
};
Expand Down Expand Up @@ -100,7 +104,7 @@ pub struct Mapper {

#[async_trait]
impl BlockStreamMapper<Chain> for Mapper {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<Block>, Error> {
fn decode_block(&self, output: Option<&[u8]>) -> Result<Option<Block>, BlockStreamError> {
let changes: EntityChanges = match output {
Some(msg) => Message::decode(msg).map_err(SubstreamsError::DecodingError)?,
None => EntityChanges {
Expand Down Expand Up @@ -130,7 +134,7 @@ impl BlockStreamMapper<Chain> for Mapper {
&self,
logger: &Logger,
block: Block,
) -> Result<BlockWithTriggers<Chain>, Error> {
) -> Result<BlockWithTriggers<Chain>, BlockStreamError> {
let mut triggers = vec![];
if block.changes.entity_changes.len() >= 1 {
triggers.push(TriggerData {});
Expand All @@ -145,9 +149,9 @@ impl BlockStreamMapper<Chain> for Mapper {
clock: Clock,
cursor: FirehoseCursor,
block: Vec<u8>,
) -> Result<BlockStreamEvent<Chain>, Error> {
let block_number: BlockNumber = clock.number.try_into()?;
let block_hash = clock.id.as_bytes().to_vec().try_into()?;
) -> Result<BlockStreamEvent<Chain>, BlockStreamError> {
let block_number: BlockNumber = clock.number.try_into().map_err(Error::from)?;
let block_hash = clock.id.as_bytes().to_vec().into();

let block = self
.decode_block(Some(&block))?
Expand Down Expand Up @@ -215,9 +219,7 @@ fn parse_changes(
.entry(Word::from(field.name.as_str()))
.or_insert(Value::Null) = value;
}
let entity = schema
.make_entity(parsed_data)
.map_err(anyhow::Error::from)?;
let entity = schema.make_entity(parsed_data)?;

ParsedChanges::Upsert { key, entity }
}
Expand Down
Loading

0 comments on commit 614892d

Please sign in to comment.