From c375c719d8828b63b2ed0b8f8d1f96db99410138 Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Mon, 7 Jun 2021 12:53:41 +0200 Subject: [PATCH] Refactor retry logic in relayer::channel::handshake (#990) * Refactor retry logic in relayer channel handshake * Fix formatting --- relayer/src/channel.rs | 255 ++++++++++++++++++++--------------- relayer/src/event/monitor.rs | 10 +- relayer/src/util/retry.rs | 13 +- 3 files changed, 164 insertions(+), 114 deletions(-) diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 481d8d9c8f..75bff12ad9 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -4,7 +4,7 @@ use prost_types::Any; use serde::Serialize; use std::time::Duration; use thiserror::Error; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use ibc::events::IbcEvent; use ibc::ics04_channel::channel::{ChannelEnd, Counterparty, IdentifiedChannelEnd, Order, State}; @@ -25,7 +25,22 @@ use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::object::Channel as WorkerChannelObject; use crate::supervisor::Error as WorkerChannelError; -const MAX_RETRIES: usize = 5; +use crate::util::retry::{retry_count, retry_with_index}; + +mod retry_strategy { + use crate::util::retry::clamp_total; + use retry::delay::Fibonacci; + use std::time::Duration; + + // Default parameters for the retrying mechanism + const MAX_DELAY: Duration = Duration::from_secs(60); // 1 minute + const MAX_TOTAL_DELAY: Duration = Duration::from_secs(10 * 60); // 10 minutes + const INITIAL_DELAY: Duration = Duration::from_secs(1); // 1 second + + pub fn default() -> impl Iterator { + clamp_total(Fibonacci::from(INITIAL_DELAY), MAX_DELAY, MAX_TOTAL_DELAY) + } +} use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination}; use crate::util::retry::RetryResult; @@ -328,125 +343,149 @@ impl Channel { } } - /// Executes the channel handshake protocol (ICS004) - fn handshake(&mut self) -> Result<(), ChannelError> { - let done = '🥳'; + fn do_chan_open_init_and_send(&mut self) -> Result<(), ChannelError> { + let event = self + .flipped() + .build_chan_open_init_and_send() + .map_err(|e| { + error!("Failed ChanInit {:?}: {:?}", self.a_side, e); + e + })?; - let a_chain = self.src_chain().clone(); - let b_chain = self.dst_chain().clone(); - - // Try chanOpenInit on a_chain - let mut counter = 0; - let mut init_success = false; - while counter < MAX_RETRIES { - counter += 1; - match self.flipped().build_chan_open_init_and_send() { - Err(e) => { - error!("Failed ChanInit {:?}: {:?}", self.a_side, e); - continue; - } - Ok(event) => { - self.a_side.channel_id = Some(extract_channel_id(&event)?.clone()); - println!("{} {} => {:#?}\n", done, a_chain.id(), event); - init_success = true; - break; - } - } - } + info!("done {} => {:#?}\n", self.src_chain().id(), event); - // Check that the channel was created on a_chain - if !init_success { - return Err(ChannelError::Failed(format!( + let channel_id = extract_channel_id(&event)?; + self.a_side.channel_id = Some(channel_id.clone()); + info!("successfully opened init channel"); + + Ok(()) + } + + // Check that the channel was created on a_chain + fn do_chan_open_init_and_send_with_retry(&mut self) -> Result<(), ChannelError> { + retry_with_index(retry_strategy::default(), |_| { + self.do_chan_open_init_and_send() + }) + .map_err(|err| { + error!("failed to open channel after {} retries", err); + ChannelError::Failed(format!( "Failed to finish channel open init in {} iterations for {:?}", - MAX_RETRIES, self - ))); - }; + retry_count(&err), + self + )) + })?; - // Try chanOpenTry on b_chain - counter = 0; - let mut try_success = false; - while counter < MAX_RETRIES { - counter += 1; - match self.build_chan_open_try_and_send() { - Err(e) => { - error!("Failed ChanTry {:?}: {:?}", self.b_side, e); - continue; - } - Ok(event) => { - self.b_side.channel_id = Some(extract_channel_id(&event)?.clone()); - println!("{} {} => {:#?}\n", done, b_chain.id(), event); - try_success = true; - break; - } - } - } + Ok(()) + } - if !try_success { - return Err(ChannelError::Failed(format!( + fn do_chan_open_try_and_send(&mut self) -> Result<(), ChannelError> { + let event = self.build_chan_open_try_and_send().map_err(|e| { + error!("Failed ChanTry {:?}: {:?}", self.b_side, e); + e + })?; + + let channel_id = extract_channel_id(&event)?; + self.b_side.channel_id = Some(channel_id.clone()); + + println!("done {} => {:#?}\n", self.dst_chain().id(), event); + Ok(()) + } + + fn do_chan_open_try_and_send_with_retry(&mut self) -> Result<(), ChannelError> { + retry_with_index(retry_strategy::default(), |_| { + self.do_chan_open_try_and_send() + }) + .map_err(|err| { + error!("failed to open channel after {} retries", err); + ChannelError::Failed(format!( "Failed to finish channel open try in {} iterations for {:?}", - MAX_RETRIES, self - ))); - }; + retry_count(&err), + self + )) + })?; + + Ok(()) + } + + fn do_channel_handshake(&mut self) -> Result<(), ChannelError> { + let src_channel_id = self + .src_channel_id() + .ok_or(ChannelError::MissingLocalChannelId)?; + + let dst_channel_id = self + .dst_channel_id() + .ok_or(ChannelError::MissingCounterpartyChannelId)?; - counter = 0; - while counter < MAX_RETRIES { - counter += 1; - - let src_channel_id = self - .src_channel_id() - .ok_or(ChannelError::MissingLocalChannelId)?; - let dst_channel_id = self - .dst_channel_id() - .ok_or(ChannelError::MissingCounterpartyChannelId)?; - // Continue loop if query error - let a_channel = - a_chain.query_channel(&self.src_port_id(), &src_channel_id, Height::zero()); - if a_channel.is_err() { - continue; + // Continue loop if query error + let a_channel = self + .src_chain() + .query_channel(&self.src_port_id(), src_channel_id, Height::zero()) + .map_err(|_| ChannelError::Failed("Failed to query source chain".into()))?; + + let b_channel = self + .dst_chain() + .query_channel(&self.dst_port_id(), dst_channel_id, Height::zero()) + .map_err(|_| ChannelError::Failed("Failed to query destination chain".into()))?; + + match (a_channel.state(), b_channel.state()) { + (State::Init, State::TryOpen) | (State::TryOpen, State::TryOpen) => { + let event = self.flipped().build_chan_open_ack_and_send().map_err(|e| { + error!("Failed ChanAck {:?}: {}", self.a_side, e); + e + })?; + + info!("done {} => {:#?}\n", self.src_chain().id(), event); } - let b_channel = - b_chain.query_channel(&self.dst_port_id(), &dst_channel_id, Height::zero()); - if b_channel.is_err() { - continue; + (State::Open, State::TryOpen) => { + let event = self.build_chan_open_confirm_and_send().map_err(|e| { + error!("Failed ChanConfirm {:?}: {}", self.b_side, e); + e + })?; + + info!("done {} => {:#?}\n", self.dst_chain().id(), event); } + (State::TryOpen, State::Open) => { + // Confirm to a_chain + let event = self + .flipped() + .build_chan_open_confirm_and_send() + .map_err(|e| { + error!("Failed ChanConfirm {:?}: {}", self.a_side, e); + e + })?; - match (a_channel.unwrap().state(), b_channel.unwrap().state()) { - (State::Init, State::TryOpen) | (State::TryOpen, State::TryOpen) => { - // Ack to a_chain - match self.flipped().build_chan_open_ack_and_send() { - Err(e) => error!("Failed ChanAck {:?}: {}", self.a_side, e), - Ok(event) => println!("{} {} => {:#?}\n", done, a_chain.id(), event), - } - } - (State::Open, State::TryOpen) => { - // Confirm to b_chain - match self.build_chan_open_confirm_and_send() { - Err(e) => error!("Failed ChanConfirm {:?}: {}", self.b_side, e), - Ok(event) => println!("{} {} => {:#?}\n", done, b_chain.id(), event), - } - } - (State::TryOpen, State::Open) => { - // Confirm to a_chain - match self.flipped().build_chan_open_confirm_and_send() { - Err(e) => error!("Failed ChanConfirm {:?}: {}", self.a_side, e), - Ok(event) => println!("{} {} => {:#?}\n", done, a_chain.id(), event), - } - } - (State::Open, State::Open) => { - println!( - "{} {} {} Channel handshake finished for {:#?}\n", - done, done, done, self - ); - return Ok(()); - } - _ => {} // TODO channel close + info!("done {} => {:#?}\n", self.src_chain().id(), event) + } + (State::Open, State::Open) => { + info!("Channel handshake finished for {:#?}\n", self); } + _ => { /* TODO channel close */ } } - Err(ChannelError::Failed(format!( - "Failed to finish channel handshake in {} iterations for {:?}", - MAX_RETRIES, self - ))) + info!("successfully opened channel"); + Ok(()) + } + + fn do_channel_handshake_with_retry(&mut self) -> Result<(), ChannelError> { + retry_with_index(retry_strategy::default(), |_| self.do_channel_handshake()).map_err( + |err| { + error!("failed to open channel after {} retries", err); + ChannelError::Failed(format!( + "Failed to finish channel handshake in {} iterations for {:?}", + retry_count(&err), + self + )) + }, + )?; + + Ok(()) + } + + /// Executes the channel handshake protocol (ICS004) + fn handshake(&mut self) -> Result<(), ChannelError> { + self.do_chan_open_init_and_send_with_retry()?; + self.do_chan_open_try_and_send_with_retry()?; + self.do_channel_handshake_with_retry() } pub fn counterparty_state(&self) -> Result { diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 77a5126ceb..b243341a3c 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -21,7 +21,7 @@ use tendermint_rpc::{ use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId}; use crate::util::{ - retry::{retry_with_index, RetryResult}, + retry::{retry_count, retry_with_index, RetryResult}, stream::group_while, }; @@ -259,17 +259,17 @@ impl EventMonitor { /// See the [`retry`](https://docs.rs/retry) crate and the /// [`crate::util::retry`] module for more information. fn restart(&mut self) { - let result = retry_with_index(retry_strategy::default(), |index| { + let result = retry_with_index(retry_strategy::default(), |_| { // Try to reconnect if let Err(e) = self.try_reconnect() { trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); - return RetryResult::Retry(index); + return RetryResult::Retry(()); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e); - return RetryResult::Retry(index); + return RetryResult::Retry(()); } RetryResult::Ok(()) @@ -284,7 +284,7 @@ impl EventMonitor { Err(retries) => error!( chain.id = %self.chain_id, "failed to reconnect to {} after {} retries", - self.node_addr, retries + self.node_addr, retry_count(&retries) ), } } diff --git a/relayer/src/util/retry.rs b/relayer/src/util/retry.rs index f63e09e8fd..77e52fb008 100644 --- a/relayer/src/util/retry.rs +++ b/relayer/src/util/retry.rs @@ -2,9 +2,20 @@ use std::time::Duration; pub use retry::{ delay::{Fibonacci, Fixed}, - retry_with_index, OperationResult as RetryResult, + retry_with_index, Error as RetryError, OperationResult as RetryResult, }; +pub fn retry_count(err: &RetryError) -> u64 { + match err { + RetryError::Operation { + tries, + total_delay: _, + error: _, + } => *tries, + _ => 0, + } +} + #[derive(Copy, Clone, Debug)] pub struct ConstantGrowth { delay: Duration,