From be001ebddcc064e1473ffe758a9718ee43b83151 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Wed, 2 Dec 2020 22:30:09 +0100 Subject: [PATCH] Changes for the tendermint dep rebase (#437) * Changes for the tendermint dep rebase * error format * Changes for integration with tm pr#706 * remove event_monitor.rs file --- Cargo.toml | 3 +- modules/src/ics07_tendermint/header.rs | 2 +- relayer/src/chain/cosmos.rs | 30 +---- relayer/src/event/monitor.rs | 8 +- relayer/src/event_monitor.rs | 156 ------------------------- 5 files changed, 10 insertions(+), 189 deletions(-) delete mode 100644 relayer/src/event_monitor.rs diff --git a/Cargo.toml b/Cargo.toml index 39e6021cfb..549ac13f08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,5 @@ exclude = [ tendermint = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } tendermint-rpc = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } tendermint-proto = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } -tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } \ No newline at end of file +tendermint-light-client = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } +tendermint-testgen = { git = "https://github.com/informalsystems/tendermint-rs", branch = "romac/skip-verif" } \ No newline at end of file diff --git a/modules/src/ics07_tendermint/header.rs b/modules/src/ics07_tendermint/header.rs index 1014fe4498..9be97b1f9a 100644 --- a/modules/src/ics07_tendermint/header.rs +++ b/modules/src/ics07_tendermint/header.rs @@ -132,7 +132,7 @@ pub mod test_util { 281_815_u64.try_into().unwrap(), ); - let vs = ValidatorSet::new(vec![v1], Some(v1), 281_815_u64.try_into().unwrap()); + let vs = ValidatorSet::new(vec![v1], Some(v1), None); Header { signed_header: shdr, diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index de5f09d5c7..d80dafef52 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -1,4 +1,4 @@ -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::future::Future; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -19,7 +19,7 @@ use tendermint::account::Id as AccountId; use tendermint::block::Height; use tendermint::consensus::Params; -use tendermint_light_client::types::{LightBlock as TMLightBlock, ValidatorSet}; +use tendermint_light_client::types::LightBlock as TMLightBlock; use tendermint_rpc::Client; use tendermint_rpc::HttpClient; @@ -376,8 +376,8 @@ impl Chain for CosmosSDKChain { Ok(TMHeader { trusted_height, signed_header: target_light_block.signed_header.clone(), - validator_set: fix_validator_set(&target_light_block)?, - trusted_validator_set: fix_validator_set(&trusted_light_block)?, + validator_set: target_light_block.validators, + trusted_validator_set: trusted_light_block.validators, }) } @@ -411,28 +411,6 @@ impl Chain for CosmosSDKChain { } } -fn fix_validator_set(light_block: &TMLightBlock) -> Result { - let validators = light_block.validators.validators(); - // Get the proposer. - let proposer = validators - .iter() - .find(|v| v.address == light_block.signed_header.header.proposer_address) - .ok_or(Kind::EmptyResponseValue)?; - - let voting_power: u64 = validators.iter().map(|v| v.voting_power.value()).sum(); - - // Create the validator set with the proposer from the header. - // This is required by IBC on-chain validation. - let validator_set = ValidatorSet::new( - validators.clone(), - Some(*proposer), - voting_power - .try_into() - .map_err(|e| Kind::EmptyResponseValue.context(e))?, - ); - Ok(validator_set) -} - /// Perform a generic `abci_query`, and return the corresponding deserialized response data. async fn abci_query( chain: &CosmosSDKChain, diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 673d4eeab4..561b9d9f4e 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -134,11 +134,9 @@ impl EventMonitor { // Shut down previous client debug!("Gracefully shutting down previous client"); - self.rt - .lock() - .map_err(|_| Kind::PoisonedMutex)? - .block_on(websocket_client.close()) - .map_err(|e| format!("Failed to close previous WebSocket client: {}", e))?; + if let Err(e) = websocket_client.close() { + error!("Previous websocket client closing failure {}", e); + } self.rt .lock() diff --git a/relayer/src/event_monitor.rs b/relayer/src/event_monitor.rs deleted file mode 100644 index a2a7e0c62a..0000000000 --- a/relayer/src/event_monitor.rs +++ /dev/null @@ -1,156 +0,0 @@ -use ibc::events::IBCEvent; -use tendermint::{chain, net, Error as TMError}; -use tendermint_rpc::{ - query::EventType, query::Query, Subscription, SubscriptionClient, WebSocketClient, -}; -use tokio::stream::StreamExt; -use tokio::sync::mpsc::Sender; - -use futures::{stream::select_all, Stream}; -use tokio::task::JoinHandle; -use tracing::{debug, error, info}; - -type SubscriptionResult = Result; -type SubscriptionStream = dyn Stream + Send + Sync + Unpin; - -/// Connect to a TM node, receive push events over a websocket and filter them for the -/// event handler. -pub struct EventMonitor { - chain_id: chain::Id, - /// WebSocket to collect events from - websocket_client: WebSocketClient, - /// Async task handle for the WebSocket client's driver - websocket_driver_handle: JoinHandle>, - /// Channel to handler where the monitor for this chain sends the events - channel_to_handler: Sender<(chain::Id, Vec)>, - /// Node Address - node_addr: net::Address, - /// Queries - event_queries: Vec, - /// All subscriptions combined in a single stream - subscriptions: Box, -} - -impl EventMonitor { - /// Create an event monitor, connect to a node and subscribe to queries. - pub async fn create( - chain_id: chain::Id, - rpc_addr: net::Address, - channel_to_handler: Sender<(chain::Id, Vec)>, - ) -> Result> { - let (websocket_client, websocket_driver) = WebSocketClient::new(rpc_addr.clone()).await?; - let websocket_driver_handle = tokio::spawn(async move { websocket_driver.run().await }); - - // TODO: move them to config file(?) - let event_queries = vec![Query::from(EventType::Tx), Query::from(EventType::NewBlock)]; - - Ok(EventMonitor { - chain_id, - websocket_client, - websocket_driver_handle, - channel_to_handler, - node_addr: rpc_addr.clone(), - event_queries, - subscriptions: Box::new(futures::stream::empty()), - }) - } - - /// Clear the current subscriptions, and subscribe again to all queries. - pub async fn subscribe(&mut self) -> Result<(), Box> { - let mut subscriptions = vec![]; - - for query in &self.event_queries { - let subscription = self.websocket_client.subscribe(query.clone()).await?; - subscriptions.push(subscription); - } - - self.subscriptions = Box::new(select_all(subscriptions)); - - Ok(()) - } - - /// Event monitor loop - pub async fn run(&mut self) { - info!(chain.id = % self.chain_id, "running listener for"); - - loop { - match self.collect_events().await { - Ok(..) => continue, - Err(err) => { - debug!("Web socket error: {}", err); - - // Try to reconnect - let (mut websocket_client, websocket_driver) = - WebSocketClient::new(self.node_addr.clone()) - .await - .unwrap_or_else(|e| { - debug!("Error on reconnection: {}", e); - panic!("Abort on failed reconnection"); - }); - let mut websocket_driver_handle = - tokio::spawn(async move { websocket_driver.run().await }); - - // Swap the new client with the previous one which failed, - // so that we can shut the latter down gracefully. - std::mem::swap(&mut self.websocket_client, &mut websocket_client); - std::mem::swap( - &mut self.websocket_driver_handle, - &mut websocket_driver_handle, - ); - - debug!("Reconnected"); - - // Shut down previous client - debug!("Gracefully shutting down previous client"); - websocket_client.close().await.unwrap_or_else(|e| { - error!("Failed to close previous WebSocket client: {}", e); - }); - websocket_driver_handle - .await - .unwrap_or_else(|e| { - Err(tendermint_rpc::Error::client_internal_error(format!( - "failed to terminate previous WebSocket client driver: {}", - e - ))) - }) - .unwrap_or_else(|e| { - error!("Previous WebSocket client driver failed with error: {}", e); - }); - - // Try to resubscribe - if let Err(err) = self.subscribe().await { - debug!("Error on recreating subscriptions: {}", err); - panic!("Abort during reconnection"); - }; - } - } - } - } - - /// Collect the IBC events from the subscriptions - pub async fn collect_events(&mut self) -> Result<(), TMError> { - tokio::select! { - Some(event) = self.subscriptions.next() => { - match event { - Ok(event) => { - match ibc::events::get_all_events(event.clone()) { - Ok(ibc_events) => { - self.channel_to_handler - .send((self.chain_id.clone(), ibc_events)) - .await? - }, - Err(err) => { - error!("Error {} when extracting IBC events from {:?}: ", err, event); - } - } - } - Err(err) => { - error!("Error on collecting events from subscriptions: {}", err); - } - } - }, - } - - Ok(()) - } -} \ No newline at end of file