Skip to content

Commit

Permalink
Make listener not fail on decode errors and db errors
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelrahmanElawady committed Mar 25, 2024
1 parent 602adca commit d164d7a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 153 deletions.
2 changes: 1 addition & 1 deletion proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ message Envelope {
bytes cipher = 14;
}

optional string relays = 17;
repeated string relays = 17;
}
7 changes: 4 additions & 3 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,10 @@ async fn app(args: Args) -> Result<()> {

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
if let Err(e) = l.listen().await {
log::error!("failed to listen to events: {:#}", e);
}
l.listen()
.await
.context("failed to listen to chain events")
.unwrap();
});

r.start(&args.listen).await.unwrap();
Expand Down
105 changes: 0 additions & 105 deletions src/events/events.rs

This file was deleted.

110 changes: 108 additions & 2 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,109 @@
mod events;
use std::{collections::LinkedList, time::Duration};

pub use events::Listener;
use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};

#[derive(Clone)]
pub struct Listener<C>
where
C: Cache<Twin>,
{
cache: C,
api: OnlineClient<PolkadotConfig>,
substrate_urls: LinkedList<String>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> {
let mut urls = LinkedList::from_iter(substrate_urls);

let api = Self::connect(&mut urls).await?;

cache.flush().await?;
Ok(Listener {
api,
cache,
substrate_urls: urls,
})
}

async fn connect(urls: &mut LinkedList<String>) -> Result<OnlineClient<PolkadotConfig>> {
let trials = urls.len() * 2;
for _ in 0..trials {
let url = match urls.front() {
Some(url) => url,
None => anyhow::bail!("substrate urls list is empty"),
};

match OnlineClient::<PolkadotConfig>::from_url(url).await {
Ok(client) => return Ok(client),
Err(err) => {
log::error!(
"failed to create substrate client with url \"{}\": {}",
url,
err
);
}
}

if let Some(front) = urls.pop_front() {
urls.push_back(front);
}
}

anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
match self.handle_events().await {

Check warning on line 71 in src/events/mod.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

you seem to be trying to use `match` for destructuring a single pattern. Consider using `if let`
Err(err) => {
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
} else {
log::error!("error listening to events {}", err);
}
}
Ok(_) => {}
}
}
}

async fn handle_events(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = match evt {
Err(err) => {
log::error!("failed to decode event {}", err);
continue;
}
Ok(e) => e,
};
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}
}
Ok(())
}
}
61 changes: 21 additions & 40 deletions src/relay/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use prometheus::TextEncoder;
use protobuf::Message as ProtoMessage;
use std::fmt::Display;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand Down Expand Up @@ -204,27 +203,9 @@ async fn federation<D: TwinDB, R: RateLimiter>(
let envelope =
Envelope::parse_from_bytes(&body).map_err(|err| HttpError::BadRequest(err.to_string()))?;

if let Some(relays) = &envelope.relays {
let mut twin = data
.twins
.get_twin(envelope.source.twin)
.await
.map_err(|err| HttpError::FailedToGetTwin(err.to_string()))?
.ok_or_else(|| HttpError::TwinNotFound(envelope.source.twin))?;
let envelope_relays = match RelayDomains::from_str(relays) {
Ok(r) => r,
Err(_) => return Err(HttpError::BadRequest("invalid relays".to_string())),
};
if let Some(twin_relays) = twin.relay {
if twin_relays != envelope_relays {
twin.relay = Some(envelope_relays);
data.twins
.set_twin(twin)
.await
.map_err(|err| HttpError::FailedToSetTwin(err.to_string()))?;
}
}
}
update_cache_relays(&envelope, &data.twins)
.await
.map_err(|err| HttpError::FailedToSetTwin(err.to_string()))?;
let dst: StreamID = (&envelope.destination).into();
data.switch.send(&dst, &body).await?;

Expand All @@ -234,6 +215,23 @@ async fn federation<D: TwinDB, R: RateLimiter>(
.map_err(HttpError::Http)
}

async fn update_cache_relays(envelope: &Envelope, twin_db: &impl TwinDB) -> Result<()> {
if envelope.relays.len() == 0 {

Check warning on line 219 in src/relay/api.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

length comparison to zero
return Ok(());
}
let twin = twin_db
.get_twin(envelope.source.twin)
.await?
.ok_or_else(|| anyhow::Error::msg("unknown twin source"))?;
let envelope_relays = RelayDomains::new(&envelope.relays);
if let Some(twin_relays) = twin.relay.clone() {
if twin_relays != envelope_relays {
twin_db.set_twin(twin).await?;
}
}
Ok(())
}

type Writer = SplitSink<WebSocketStream<Upgraded>, Message>;

pub(crate) struct RelayHook {
Expand Down Expand Up @@ -331,24 +329,7 @@ impl<M: Metrics, D: TwinDB> Stream<M, D> {
.await?
.ok_or_else(|| anyhow::Error::msg("unknown twin destination"))?;

if let Some(relays) = &envelope.relays {
let mut twin = self
.twins
.get_twin(envelope.source.twin)
.await?
.ok_or_else(|| anyhow::Error::msg("unknown twin source"))?;

let envelope_relays = match RelayDomains::from_str(relays) {
Ok(r) => r,
Err(_) => anyhow::bail!("invalid relays"),
};
if let Some(twin_relays) = twin.relay {
if twin_relays != envelope_relays {
twin.relay = Some(envelope_relays);
self.twins.set_twin(twin.clone()).await?;
}
}
}
update_cache_relays(&envelope, &self.twins).await?;

Check warning on line 332 in src/relay/api.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

this expression creates a reference which is immediately dereferenced by the compiler

if !twin
.relay
Expand Down
4 changes: 2 additions & 2 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ impl Challengeable for Envelope {
hash.write_all(data)?;
}
}
if let Some(ref relays) = self.relays {
write!(hash, "{}", relays)?;
for relay in &self.relays {
write!(hash, "{}", relay)?;
}

Ok(())
Expand Down

0 comments on commit d164d7a

Please sign in to comment.