Skip to content

Commit

Permalink
Refactor retry logic in relayer::channel::handshake (#990)
Browse files Browse the repository at this point in the history
* Refactor retry logic in relayer channel handshake

* Fix formatting
  • Loading branch information
soareschen authored Jun 7, 2021
1 parent dc6e840 commit c375c71
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 114 deletions.
255 changes: 147 additions & 108 deletions relayer/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use prost_types::Any;
use serde::Serialize;
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error};
use tracing::{debug, error, info};

use ibc::events::IbcEvent;
use ibc::ics04_channel::channel::{ChannelEnd, Counterparty, IdentifiedChannelEnd, Order, State};
Expand All @@ -25,7 +25,22 @@ use crate::foreign_client::{ForeignClient, ForeignClientError};
use crate::object::Channel as WorkerChannelObject;
use crate::supervisor::Error as WorkerChannelError;

const MAX_RETRIES: usize = 5;
use crate::util::retry::{retry_count, retry_with_index};

mod retry_strategy {
use crate::util::retry::clamp_total;
use retry::delay::Fibonacci;
use std::time::Duration;

// Default parameters for the retrying mechanism
const MAX_DELAY: Duration = Duration::from_secs(60); // 1 minute
const MAX_TOTAL_DELAY: Duration = Duration::from_secs(10 * 60); // 10 minutes
const INITIAL_DELAY: Duration = Duration::from_secs(1); // 1 second

pub fn default() -> impl Iterator<Item = Duration> {
clamp_total(Fibonacci::from(INITIAL_DELAY), MAX_DELAY, MAX_TOTAL_DELAY)
}
}

use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination};
use crate::util::retry::RetryResult;
Expand Down Expand Up @@ -328,125 +343,149 @@ impl Channel {
}
}

/// Executes the channel handshake protocol (ICS004)
fn handshake(&mut self) -> Result<(), ChannelError> {
let done = '🥳';
fn do_chan_open_init_and_send(&mut self) -> Result<(), ChannelError> {
let event = self
.flipped()
.build_chan_open_init_and_send()
.map_err(|e| {
error!("Failed ChanInit {:?}: {:?}", self.a_side, e);
e
})?;

let a_chain = self.src_chain().clone();
let b_chain = self.dst_chain().clone();

// Try chanOpenInit on a_chain
let mut counter = 0;
let mut init_success = false;
while counter < MAX_RETRIES {
counter += 1;
match self.flipped().build_chan_open_init_and_send() {
Err(e) => {
error!("Failed ChanInit {:?}: {:?}", self.a_side, e);
continue;
}
Ok(event) => {
self.a_side.channel_id = Some(extract_channel_id(&event)?.clone());
println!("{} {} => {:#?}\n", done, a_chain.id(), event);
init_success = true;
break;
}
}
}
info!("done {} => {:#?}\n", self.src_chain().id(), event);

// Check that the channel was created on a_chain
if !init_success {
return Err(ChannelError::Failed(format!(
let channel_id = extract_channel_id(&event)?;
self.a_side.channel_id = Some(channel_id.clone());
info!("successfully opened init channel");

Ok(())
}

// Check that the channel was created on a_chain
fn do_chan_open_init_and_send_with_retry(&mut self) -> Result<(), ChannelError> {
retry_with_index(retry_strategy::default(), |_| {
self.do_chan_open_init_and_send()
})
.map_err(|err| {
error!("failed to open channel after {} retries", err);
ChannelError::Failed(format!(
"Failed to finish channel open init in {} iterations for {:?}",
MAX_RETRIES, self
)));
};
retry_count(&err),
self
))
})?;

// Try chanOpenTry on b_chain
counter = 0;
let mut try_success = false;
while counter < MAX_RETRIES {
counter += 1;
match self.build_chan_open_try_and_send() {
Err(e) => {
error!("Failed ChanTry {:?}: {:?}", self.b_side, e);
continue;
}
Ok(event) => {
self.b_side.channel_id = Some(extract_channel_id(&event)?.clone());
println!("{} {} => {:#?}\n", done, b_chain.id(), event);
try_success = true;
break;
}
}
}
Ok(())
}

if !try_success {
return Err(ChannelError::Failed(format!(
fn do_chan_open_try_and_send(&mut self) -> Result<(), ChannelError> {
let event = self.build_chan_open_try_and_send().map_err(|e| {
error!("Failed ChanTry {:?}: {:?}", self.b_side, e);
e
})?;

let channel_id = extract_channel_id(&event)?;
self.b_side.channel_id = Some(channel_id.clone());

println!("done {} => {:#?}\n", self.dst_chain().id(), event);
Ok(())
}

fn do_chan_open_try_and_send_with_retry(&mut self) -> Result<(), ChannelError> {
retry_with_index(retry_strategy::default(), |_| {
self.do_chan_open_try_and_send()
})
.map_err(|err| {
error!("failed to open channel after {} retries", err);
ChannelError::Failed(format!(
"Failed to finish channel open try in {} iterations for {:?}",
MAX_RETRIES, self
)));
};
retry_count(&err),
self
))
})?;

Ok(())
}

fn do_channel_handshake(&mut self) -> Result<(), ChannelError> {
let src_channel_id = self
.src_channel_id()
.ok_or(ChannelError::MissingLocalChannelId)?;

let dst_channel_id = self
.dst_channel_id()
.ok_or(ChannelError::MissingCounterpartyChannelId)?;

counter = 0;
while counter < MAX_RETRIES {
counter += 1;

let src_channel_id = self
.src_channel_id()
.ok_or(ChannelError::MissingLocalChannelId)?;
let dst_channel_id = self
.dst_channel_id()
.ok_or(ChannelError::MissingCounterpartyChannelId)?;
// Continue loop if query error
let a_channel =
a_chain.query_channel(&self.src_port_id(), &src_channel_id, Height::zero());
if a_channel.is_err() {
continue;
// Continue loop if query error
let a_channel = self
.src_chain()
.query_channel(&self.src_port_id(), src_channel_id, Height::zero())
.map_err(|_| ChannelError::Failed("Failed to query source chain".into()))?;

let b_channel = self
.dst_chain()
.query_channel(&self.dst_port_id(), dst_channel_id, Height::zero())
.map_err(|_| ChannelError::Failed("Failed to query destination chain".into()))?;

match (a_channel.state(), b_channel.state()) {
(State::Init, State::TryOpen) | (State::TryOpen, State::TryOpen) => {
let event = self.flipped().build_chan_open_ack_and_send().map_err(|e| {
error!("Failed ChanAck {:?}: {}", self.a_side, e);
e
})?;

info!("done {} => {:#?}\n", self.src_chain().id(), event);
}
let b_channel =
b_chain.query_channel(&self.dst_port_id(), &dst_channel_id, Height::zero());
if b_channel.is_err() {
continue;
(State::Open, State::TryOpen) => {
let event = self.build_chan_open_confirm_and_send().map_err(|e| {
error!("Failed ChanConfirm {:?}: {}", self.b_side, e);
e
})?;

info!("done {} => {:#?}\n", self.dst_chain().id(), event);
}
(State::TryOpen, State::Open) => {
// Confirm to a_chain
let event = self
.flipped()
.build_chan_open_confirm_and_send()
.map_err(|e| {
error!("Failed ChanConfirm {:?}: {}", self.a_side, e);
e
})?;

match (a_channel.unwrap().state(), b_channel.unwrap().state()) {
(State::Init, State::TryOpen) | (State::TryOpen, State::TryOpen) => {
// Ack to a_chain
match self.flipped().build_chan_open_ack_and_send() {
Err(e) => error!("Failed ChanAck {:?}: {}", self.a_side, e),
Ok(event) => println!("{} {} => {:#?}\n", done, a_chain.id(), event),
}
}
(State::Open, State::TryOpen) => {
// Confirm to b_chain
match self.build_chan_open_confirm_and_send() {
Err(e) => error!("Failed ChanConfirm {:?}: {}", self.b_side, e),
Ok(event) => println!("{} {} => {:#?}\n", done, b_chain.id(), event),
}
}
(State::TryOpen, State::Open) => {
// Confirm to a_chain
match self.flipped().build_chan_open_confirm_and_send() {
Err(e) => error!("Failed ChanConfirm {:?}: {}", self.a_side, e),
Ok(event) => println!("{} {} => {:#?}\n", done, a_chain.id(), event),
}
}
(State::Open, State::Open) => {
println!(
"{} {} {} Channel handshake finished for {:#?}\n",
done, done, done, self
);
return Ok(());
}
_ => {} // TODO channel close
info!("done {} => {:#?}\n", self.src_chain().id(), event)
}
(State::Open, State::Open) => {
info!("Channel handshake finished for {:#?}\n", self);
}
_ => { /* TODO channel close */ }
}

Err(ChannelError::Failed(format!(
"Failed to finish channel handshake in {} iterations for {:?}",
MAX_RETRIES, self
)))
info!("successfully opened channel");
Ok(())
}

fn do_channel_handshake_with_retry(&mut self) -> Result<(), ChannelError> {
retry_with_index(retry_strategy::default(), |_| self.do_channel_handshake()).map_err(
|err| {
error!("failed to open channel after {} retries", err);
ChannelError::Failed(format!(
"Failed to finish channel handshake in {} iterations for {:?}",
retry_count(&err),
self
))
},
)?;

Ok(())
}

/// Executes the channel handshake protocol (ICS004)
fn handshake(&mut self) -> Result<(), ChannelError> {
self.do_chan_open_init_and_send_with_retry()?;
self.do_chan_open_try_and_send_with_retry()?;
self.do_channel_handshake_with_retry()
}

pub fn counterparty_state(&self) -> Result<State, ChannelError> {
Expand Down
10 changes: 5 additions & 5 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tendermint_rpc::{
use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId};

use crate::util::{
retry::{retry_with_index, RetryResult},
retry::{retry_count, retry_with_index, RetryResult},
stream::group_while,
};

Expand Down Expand Up @@ -259,17 +259,17 @@ impl EventMonitor {
/// See the [`retry`](https://docs.rs/retry) crate and the
/// [`crate::util::retry`] module for more information.
fn restart(&mut self) {
let result = retry_with_index(retry_strategy::default(), |index| {
let result = retry_with_index(retry_strategy::default(), |_| {
// Try to reconnect
if let Err(e) = self.try_reconnect() {
trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e);
return RetryResult::Retry(index);
return RetryResult::Retry(());
}

// Try to resubscribe
if let Err(e) = self.try_resubscribe() {
trace!(chain.id = %self.chain_id, "error when reconnecting: {}", e);
return RetryResult::Retry(index);
return RetryResult::Retry(());
}

RetryResult::Ok(())
Expand All @@ -284,7 +284,7 @@ impl EventMonitor {
Err(retries) => error!(
chain.id = %self.chain_id,
"failed to reconnect to {} after {} retries",
self.node_addr, retries
self.node_addr, retry_count(&retries)
),
}
}
Expand Down
13 changes: 12 additions & 1 deletion relayer/src/util/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,20 @@ use std::time::Duration;

pub use retry::{
delay::{Fibonacci, Fixed},
retry_with_index, OperationResult as RetryResult,
retry_with_index, Error as RetryError, OperationResult as RetryResult,
};

pub fn retry_count<E>(err: &RetryError<E>) -> u64 {
match err {
RetryError::Operation {
tries,
total_delay: _,
error: _,
} => *tries,
_ => 0,
}
}

#[derive(Copy, Clone, Debug)]
pub struct ConstantGrowth {
delay: Duration,
Expand Down

0 comments on commit c375c71

Please sign in to comment.