Skip to content

Commit

Permalink
more observables
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Oct 3, 2024
1 parent 4364288 commit 3c88e86
Show file tree
Hide file tree
Showing 12 changed files with 385 additions and 6 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ members = [
"crates/sui-bridge",
"crates/sui-bridge-cli",
"crates/sui-bridge-indexer",
"crates/sui-bridge-watchdog",
"crates/sui-cluster-test",
"crates/sui-config",
"crates/sui-core",
Expand Down Expand Up @@ -617,6 +618,7 @@ sui-archival = { path = "crates/sui-archival" }
sui-authority-aggregation = { path = "crates/sui-authority-aggregation" }
sui-benchmark = { path = "crates/sui-benchmark" }
sui-bridge = { path = "crates/sui-bridge" }
sui-bridge-watchdog = { path = "crates/sui-bridge-watchdog" }
sui-cluster-test = { path = "crates/sui-cluster-test" }
sui-config = { path = "crates/sui-config" }
sui-core = { path = "crates/sui-core" }
Expand Down
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ backoff.workspace = true
sui-config.workspace = true
tempfile.workspace = true
sui-indexer-builder.workspace = true
sui-bridge-watchdog.workspace = true

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
Expand Down
65 changes: 61 additions & 4 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use anyhow::Result;
use clap::*;
use ethers::providers::{Http, Provider};
use ethers::types::Address as EthAddress;
use prometheus::Registry;
use std::collections::HashSet;
use std::env;
use std::net::IpAddr;
Expand All @@ -13,10 +14,12 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use sui_bridge::metered_eth_provider::{new_metered_eth_provider, MeteredEthHttpProvier};
use sui_bridge::sui_client::SuiBridgeClient;
use sui_bridge::utils::get_eth_contract_addresses;
use sui_bridge_indexer::eth_bridge_indexer::EthFinalizedSyncDatasource;
use sui_bridge_indexer::eth_bridge_indexer::EthSubscriptionDatasource;
use sui_config::Config;
use tokio::task::JoinHandle;
use tracing::info;

Expand All @@ -34,7 +37,10 @@ use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_config::Config;
use sui_bridge_watchdog::{
eth_bridge_status::EthBridgeStatus, eth_vault_balance::EthVaultBalance,
metrics::WatchdogMetrics, sui_bridge_status::SuiBridgeStatus, BridgeWatchDog,
};
use sui_data_ingestion_core::DataIngestionMetrics;
use sui_indexer_builder::indexer_builder::{BackfillStrategy, IndexerBuilder};
use sui_indexer_builder::progress::{
Expand Down Expand Up @@ -101,7 +107,7 @@ async fn main() -> Result<()> {
)
.await?,
);

let eth_bridge_proxy_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let mut tasks = vec![];
if Some(true) == config.disable_eth {
info!("Eth indexer is disabled");
Expand Down Expand Up @@ -174,11 +180,12 @@ async fn main() -> Result<()> {
.await?,
);
let sui_checkpoint_datasource = SuiCheckpointDatasource::new(
config.remote_store_url,
config.remote_store_url.clone(),
sui_client,
config.concurrency as usize,
config
.checkpoints_path
.clone()
.map(|p| p.into())
.unwrap_or(tempfile::tempdir()?.into_path()),
config.sui_bridge_genesis_checkpoint,
Expand All @@ -196,11 +203,61 @@ async fn main() -> Result<()> {
.build();
tasks.push(spawn_logged_monitored_task!(indexer.start()));

let sui_bridge_client =
Arc::new(SuiBridgeClient::new(&config.sui_rpc_url, bridge_metrics.clone()).await?);
start_watchdog(
config,
eth_bridge_proxy_address,
sui_bridge_client,
&registry,
bridge_metrics.clone(),
)
.await?;

// Wait for tasks in `tasks` to finish. Return when anyone of them returns an error.
futures::future::try_join_all(tasks).await?;
unreachable!("Indexer tasks finished unexpectedly");
}

async fn start_watchdog(
config: IndexerConfig,
eth_bridge_proxy_address: EthAddress,
sui_client: Arc<SuiBridgeClient>,
registry: &Registry,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<()> {
let watchdog_metrics = WatchdogMetrics::new(registry);
let eth_provider =
Arc::new(new_metered_eth_provider(&config.eth_rpc_url, bridge_metrics.clone()).unwrap());
let (_committee_address, _limiter_address, vault_address, _config_address, weth_address) =
get_eth_contract_addresses(eth_bridge_proxy_address, &eth_provider).await?;

let eth_vault_balance = EthVaultBalance::new(
eth_provider.clone(),
vault_address,
weth_address,
watchdog_metrics.eth_vault_balance.clone(),
);

let eth_bridge_status = EthBridgeStatus::new(
eth_provider,
eth_bridge_proxy_address,
watchdog_metrics.eth_bridge_paused.clone(),
);

let sui_bridge_status =
SuiBridgeStatus::new(sui_client, watchdog_metrics.sui_bridge_paused.clone());

BridgeWatchDog::new(vec![
Arc::new(eth_vault_balance),
Arc::new(eth_bridge_status),
Arc::new(sui_bridge_status),
])
.run()
.await;
Ok(())
}

#[allow(unused)]
async fn start_processing_sui_checkpoints_by_querying_txns(
sui_rpc_url: String,
Expand Down
18 changes: 18 additions & 0 deletions crates/sui-bridge-watchdog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "sui-bridge-watchdog"
version = "0.1.0"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
sui-bridge.workspace = true
mysten-metrics.workspace = true
prometheus.workspace = true
anyhow.workspace = true
futures.workspace = true
async-trait.workspace = true
ethers = { version = "2.0" }
tracing.workspace = true
tokio = { workspace = true, features = ["full"] }
58 changes: 58 additions & 0 deletions crates/sui-bridge-watchdog/src/eth_bridge_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! The EthBridgeStatus observable monitors whether the Eth Bridge is paused.

use crate::Observable;
use async_trait::async_trait;
use ethers::providers::Provider;
use ethers::types::Address as EthAddress;
use prometheus::IntGauge;
use std::sync::Arc;
use sui_bridge::abi::EthSuiBridge;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use tokio::time::Duration;
use tracing::{error, info};

pub struct EthBridgeStatus {
bridge_contract: EthSuiBridge<Provider<MeteredEthHttpProvier>>,
metric: IntGauge,
}

impl EthBridgeStatus {
pub fn new(
provider: Arc<Provider<MeteredEthHttpProvier>>,
bridge_address: EthAddress,
metric: IntGauge,
) -> Self {
let bridge_contract = EthSuiBridge::new(bridge_address, provider.clone());
Self {
bridge_contract,
metric,
}
}
}

#[async_trait]
impl Observable for EthBridgeStatus {
fn name(&self) -> &str {
"EthBridgeStatus"
}

async fn observe_and_report(&self) {
let status = self.bridge_contract.paused().call().await;
match status {
Ok(status) => {
self.metric.set(status as i64);
info!("Eth Bridge Status: {:?}", status);
}
Err(e) => {
error!("Error getting eth bridge status: {:?}", e);
}
}
}

fn interval(&self) -> Duration {
Duration::from_secs(10)
}
}
73 changes: 73 additions & 0 deletions crates/sui-bridge-watchdog/src/eth_vault_balance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::Observable;
use async_trait::async_trait;
use ethers::providers::Provider;
use ethers::types::{Address as EthAddress, U256};
use prometheus::IntGauge;
use std::sync::Arc;
use sui_bridge::abi::EthERC20;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use tokio::time::Duration;
use tracing::{error, info};

pub struct EthVaultBalance {
coin_contract: EthERC20<Provider<MeteredEthHttpProvier>>,
vault_address: EthAddress,
ten_zeros: U256,
metric: IntGauge,
}

impl EthVaultBalance {
pub fn new(
provider: Arc<Provider<MeteredEthHttpProvier>>,
vault_address: EthAddress,
coin_address: EthAddress, // for now this only support one coin which is WETH
metric: IntGauge,
) -> Self {
let ten_zeros = U256::from(10_u64.pow(10));
let coin_contract = EthERC20::new(coin_address, provider);
Self {
coin_contract,
vault_address,
ten_zeros,
metric,
}
}
}

#[async_trait]
impl Observable for EthVaultBalance {
fn name(&self) -> &str {
"EthVaultBalance"
}

async fn observe_and_report(&self) {
match self
.coin_contract
.balance_of(self.vault_address)
.call()
.await
{
Ok(balance) => {
// Why downcasting is safe:
// 1. On Ethereum we only take the first 8 decimals into account,
// meaning the trailing 10 digits can be ignored
// 2. i64::MAX is 9_223_372_036_854_775_807, with 8 decimal places is
// 92_233_720_368. We likely won't see any balance higher than this
// in the next 12 months.
let balance = (balance / self.ten_zeros).as_u64() as i64;
self.metric.set(balance);
info!("Eth Vault Balance: {:?}", balance);
}
Err(e) => {
error!("Error getting balance from vault: {:?}", e);
}
}
}

fn interval(&self) -> Duration {
Duration::from_secs(10)
}
}
62 changes: 62 additions & 0 deletions crates/sui-bridge-watchdog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! The BridgeWatchDog module is responsible for monitoring the health
//! of the bridge by periodically running various observables and
//! reporting the results.

use anyhow::Result;
use async_trait::async_trait;
use mysten_metrics::spawn_logged_monitored_task;
use std::sync::Arc;
use tokio::time::Duration;
use tokio::time::MissedTickBehavior;
use tracing::{error_span, info, Instrument};

pub mod eth_bridge_status;
pub mod eth_vault_balance;
pub mod metrics;
pub mod sui_bridge_status;

pub struct BridgeWatchDog {
observables: Vec<Arc<dyn Observable + Send + Sync>>,
}

impl BridgeWatchDog {
pub fn new(observables: Vec<Arc<dyn Observable + Send + Sync>>) -> Self {
Self { observables }
}

pub async fn run(self) {
let mut handles = vec![];
for observable in self.observables.into_iter() {
let handle = spawn_logged_monitored_task!(Self::run_observable(observable));
handles.push(handle);
}
// Return when any task returns an error or all tasks exit.
futures::future::try_join_all(handles).await.unwrap();
unreachable!("watch dog tasks should not exit");
}

async fn run_observable(observable: Arc<dyn Observable + Send + Sync>) -> Result<()> {
let mut interval = tokio::time::interval(observable.interval());
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
let name = observable.name();
let span = error_span!("observable", name);
loop {
info!("Running observable {}", name);
observable
.observe_and_report()
.instrument(span.clone())
.await;
interval.tick().await;
}
}
}

#[async_trait]
pub trait Observable {
fn name(&self) -> &str;
async fn observe_and_report(&self);
fn interval(&self) -> Duration;
}
Loading

0 comments on commit 3c88e86

Please sign in to comment.