diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index f36d75982484d..3ed0348db120f 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -117,8 +117,14 @@ impl Datasource for EthSubscriptionDatasource { } } _ = interval.tick() => { - let latest_block = eth_ws_client.get_block_number().await?.as_u64(); - progress_metric.set(latest_block as i64); + let Ok(Ok(block_num)) = retry_with_max_elapsed_time!( + eth_ws_client.get_block_number(), + Duration::from_secs(30000) + ) else { + tracing::error!("Failed to get block number"); + continue; + }; + progress_metric.set(block_num.as_u64() as i64); } } } diff --git a/crates/sui-bridge-watchdog/src/eth_vault_balance.rs b/crates/sui-bridge-watchdog/src/eth_vault_balance.rs index 81bc0f4c44b9d..dfc359e0cb393 100644 --- a/crates/sui-bridge-watchdog/src/eth_vault_balance.rs +++ b/crates/sui-bridge-watchdog/src/eth_vault_balance.rs @@ -12,6 +12,8 @@ use sui_bridge::metered_eth_provider::MeteredEthHttpProvier; use tokio::time::Duration; use tracing::{error, info}; +const TEN_ZEROS: u64 = 10_u64.pow(10); + pub struct EthVaultBalance { coin_contract: EthERC20>, vault_address: EthAddress, @@ -26,7 +28,7 @@ impl EthVaultBalance { coin_address: EthAddress, // for now this only support one coin which is WETH metric: IntGauge, ) -> Self { - let ten_zeros = U256::from(10_u64.pow(10)); + let ten_zeros = U256::from(TEN_ZEROS); let coin_contract = EthERC20::new(coin_address, provider); Self { coin_contract, diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index 3f612c0187d1a..b89ff975ae562 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -422,7 +422,13 @@ pub trait Datasource: Sync + Send { let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx) .ready_chunks(ingestion_batch_size); let mut last_saved_checkpoint = None; - while let Some(batch) = stream.next().await { + loop { + let batch_option = stream.next().await; + if batch_option.is_none() { + tracing::error!(task_name, "Data stream ended unexpectedly"); + break; + } + let batch = batch_option.unwrap(); let mut max_height = 0; let mut heights = vec![]; let mut data = vec![]; @@ -521,7 +527,9 @@ pub trait Datasource: Sync + Send { if let Some(m) = &remaining_checkpoints_metric { m.set(0) } - join_handle.await? + join_handle.await?.tap_err(|err| { + tracing::error!(task_name, "Data retrieval task failed: {:?}", err); + }) } async fn start_data_retrieval(