Skip to content

Commit

Permalink
Add retry connections to chain
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrahmanElawady committed Mar 24, 2024
1 parent fae3a21 commit 602adca
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 13 deletions.
4 changes: 1 addition & 3 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ async fn app(args: Args) -> Result<()> {

let redis_cache = RedisCache::new(pool.clone(), "twin");

redis_cache.flush().await?;

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
.await
.context("cannot create substrate twin db object")?;
Expand Down Expand Up @@ -176,7 +174,7 @@ async fn app(args: Args) -> Result<()> {
.await
.unwrap();

let l = events::Listener::new(args.substrate[0].as_str(), redis_cache).await?;
let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
if let Err(e) = l.listen().await {
log::error!("failed to listen to events: {:#}", e);
Expand Down
6 changes: 6 additions & 0 deletions src/cache/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ where
Some(v) => Ok(Some(v.clone())),
}
}
async fn flush(&self) -> Result<()> {
let mut mem = self.mem.write().await;
mem.clear();

Ok(())
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::marker::{Send, Sync};
pub trait Cache<T>: Send + Sync + 'static {
async fn set<S: ToString + Send + Sync>(&self, id: S, obj: T) -> Result<()>;
async fn get<S: ToString + Send + Sync>(&self, id: S) -> Result<Option<T>>;
async fn flush(&self) -> Result<()>;
}

#[async_trait]
Expand All @@ -31,6 +32,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
match self {
Some(cache) => cache.flush().await,
None => Ok(()),
}
}
}

#[derive(Clone, Copy)]
Expand All @@ -47,4 +54,7 @@ where
async fn get<S: ToString + Send + Sync>(&self, _id: S) -> Result<Option<T>> {
Ok(None)
}
async fn flush(&self) -> Result<()> {
Ok(())
}
}
12 changes: 6 additions & 6 deletions src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ impl RedisCache {

Ok(conn)
}
pub async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -83,6 +77,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}

#[cfg(test)]
Expand Down
70 changes: 66 additions & 4 deletions src/events/events.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::LinkedList;

use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
Expand All @@ -11,17 +13,77 @@ where
{
cache: C,
api: OnlineClient<PolkadotConfig>,
substrate_urls: LinkedList<String>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(url: &str, cache: C) -> Result<Self> {
let api = OnlineClient::<PolkadotConfig>::from_url(url).await?;
Ok(Listener { api, cache })
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> {
let mut urls = LinkedList::new();
for url in substrate_urls {
urls.push_back(url);
}

let api = Self::connect(&mut urls).await?;

cache.flush().await?;
Ok(Listener {
api,
cache,
substrate_urls: urls,
})
}

async fn connect(urls: &mut LinkedList<String>) -> Result<OnlineClient<PolkadotConfig>> {
let trials = urls.len() * 2;
for _ in 0..trials {
let url = match urls.front() {
Some(url) => url,
None => anyhow::bail!("substrate urls list is empty"),
};

match OnlineClient::<PolkadotConfig>::from_url(url).await {
Ok(client) => return Ok(client),
Err(err) => {
log::error!(
"failed to create substrate client with url \"{}\": {}",
url,
err
);
}
}

if let Some(front) = urls.pop_front() {
urls.push_back(front);
}
}

anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
self.cache.flush().await?;
match self.handle_events().await {
Err(err) => {
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
} else {
return Err(err);
}
}
Ok(_) => {
// reconnect here too?
self.api = Self::connect(&mut self.substrate_urls).await?;
}
}
}
}
pub async fn listen(&self) -> Result<()> {

async fn handle_events(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
Expand Down

0 comments on commit 602adca

Please sign in to comment.