Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listen to chain events and update twin on requests #194

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ url = "2.3.1"
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
futures-util = "0.3.25"
jwt = "0.16"
subxt = "0.28.0"
subxt = { version = "0.28.0", features = ["substrate-compat"]}
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
itertools = "0.11"

# for static build
Expand Down
3 changes: 3 additions & 0 deletions _tests/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ impl TwinDB for InMemoryDB {
) -> anyhow::Result<Option<u32>> {
unimplemented!()
}
async fn set_twin(&self, twin: Twin) -> anyhow::Result<()> {
unimplemented!()
}
}

fn new_message(
Expand Down
Binary file added artifacts/network.scale
Binary file not shown.
2 changes: 2 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ message Envelope {
bytes plain = 13;
bytes cipher = 14;
}

repeated string relays = 17;
}
11 changes: 4 additions & 7 deletions src/bins/rmb-peer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

use anyhow::{Context, Result};
use clap::{builder::ArgAction, Args, Parser};
Expand Down Expand Up @@ -151,12 +150,10 @@ async fn app(args: Params) -> Result<()> {

// cache is a little bit tricky because while it improves performance it
// makes changes to twin data takes at least 5 min before they are detected
let db = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(60)),
)
.await
.context("cannot create substrate twin db object")?;
let db =
SubstrateTwinDB::<RedisCache>::new(args.substrate, RedisCache::new(pool.clone(), "twin"))
.await
.context("cannot create substrate twin db object")?;

let id = db
.get_twin_with_account(signer.account())
Expand Down
23 changes: 15 additions & 8 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use clap::{builder::ArgAction, Parser};
use rmb::cache::RedisCache;
use rmb::events;
use rmb::redis;
use rmb::relay::{
self,
Expand Down Expand Up @@ -142,14 +143,11 @@ async fn app(args: Args) -> Result<()> {
.await
.context("failed to initialize redis pool")?;

// we use 6 hours cache for twin information because twin id will not change anyway
// and we only need twin public key for validation only.
let twins = SubstrateTwinDB::<RedisCache>::new(
args.substrate,
RedisCache::new(pool.clone(), "twin", Duration::from_secs(args.cache * 60)),
)
.await
.context("cannot create substrate twin db object")?;
let redis_cache = RedisCache::new(pool.clone(), "twin");

let twins = SubstrateTwinDB::<RedisCache>::new(args.substrate.clone(), redis_cache.clone())
.await
.context("cannot create substrate twin db object")?;

let max_users = args.workers as usize * args.user_per_worker as usize;
let opt = relay::SwitchOptions::new(pool.clone())
Expand All @@ -175,6 +173,15 @@ async fn app(args: Args) -> Result<()> {
let r = relay::Relay::new(&args.domain, twins, opt, federation, limiter, ranker)
.await
.unwrap();

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
l.listen()
.await
.context("failed to listen to chain events")
.unwrap();
});

r.start(&args.listen).await.unwrap();
Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions src/cache/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ where
Some(v) => Ok(Some(v.clone())),
}
}
async fn flush(&self) -> Result<()> {
let mut mem = self.mem.write().await;
mem.clear();

Ok(())
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::marker::{Send, Sync};
pub trait Cache<T>: Send + Sync + 'static {
async fn set<S: ToString + Send + Sync>(&self, id: S, obj: T) -> Result<()>;
async fn get<S: ToString + Send + Sync>(&self, id: S) -> Result<Option<T>>;
async fn flush(&self) -> Result<()>;
}

#[async_trait]
Expand All @@ -31,6 +32,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
match self {
Some(cache) => cache.flush().await,
None => Ok(()),
}
}
}

#[derive(Clone, Copy)]
Expand All @@ -47,4 +54,7 @@ where
async fn get<S: ToString + Send + Sync>(&self, _id: S) -> Result<Option<T>> {
Ok(None)
}
async fn flush(&self) -> Result<()> {
Ok(())
}
}
34 changes: 16 additions & 18 deletions src/cache/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::time::Duration;

use super::Cache;

use anyhow::{Context, Result};
Expand All @@ -22,19 +20,13 @@ use serde::{de::DeserializeOwned, Serialize};
pub struct RedisCache {
pool: Pool<RedisConnectionManager>,
prefix: String,
ttl: Duration,
}

impl RedisCache {
pub fn new<P: Into<String>>(
pool: Pool<RedisConnectionManager>,
prefix: P,
ttl: Duration,
) -> Self {
pub fn new<P: Into<String>>(pool: Pool<RedisConnectionManager>, prefix: P) -> Self {
Self {
pool,
prefix: prefix.into(),
ttl,
}
}

Expand All @@ -57,22 +49,23 @@ where
async fn set<S: ToString + Send + Sync>(&self, key: S, obj: T) -> Result<()> {
let mut conn = self.get_connection().await?;
let obj = serde_json::to_vec(&obj).context("unable to serialize twin object for redis")?;
let key = format!("{}.{}", self.prefix, key.to_string());
cmd("SET")
.arg(key)
cmd("HSET")
.arg(&self.prefix)
.arg(key.to_string())
.arg(obj)
.arg("EX")
.arg(self.ttl.as_secs())
.query_async(&mut *conn)
.await?;

Ok(())
}
async fn get<S: ToString + Send + Sync>(&self, key: S) -> Result<Option<T>> {
let mut conn = self.get_connection().await?;
let key = format!("{}.{}", self.prefix, key.to_string());

let ret: Option<Vec<u8>> = cmd("GET").arg(key).query_async(&mut *conn).await?;
let ret: Option<Vec<u8>> = cmd("HGET")
.arg(&self.prefix)
.arg(key.to_string())
.query_async(&mut *conn)
.await?;

match ret {
Some(val) => {
Expand All @@ -84,6 +77,12 @@ where
None => Ok(None),
}
}
async fn flush(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
cmd("DEL").arg(&self.prefix).query_async(&mut *conn).await?;

Ok(())
}
}

#[cfg(test)]
Expand All @@ -93,7 +92,6 @@ mod tests {
use super::*;

const PREFIX: &str = "twin";
const TTL: u64 = 20;

async fn create_redis_cache() -> RedisCache {
let manager = RedisConnectionManager::new("redis://127.0.0.1/")
Expand All @@ -105,7 +103,7 @@ mod tests {
.context("unable to build pool or redis connection manager")
.unwrap();

RedisCache::new(pool, PREFIX, Duration::from_secs(TTL))
RedisCache::new(pool, PREFIX)
}

#[tokio::test]
Expand Down
105 changes: 105 additions & 0 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{collections::LinkedList, time::Duration};

use crate::{cache::Cache, tfchain::tfchain, twin::Twin};
use anyhow::Result;
use futures::StreamExt;
use log;
use subxt::{OnlineClient, PolkadotConfig};

#[derive(Clone)]
pub struct Listener<C>
where
C: Cache<Twin>,
{
cache: C,
api: OnlineClient<PolkadotConfig>,
substrate_urls: LinkedList<String>,
}

impl<C> Listener<C>
where
C: Cache<Twin> + Clone,
{
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> {
let mut urls = LinkedList::from_iter(substrate_urls);

let api = Self::connect(&mut urls).await?;

cache.flush().await?;
Ok(Listener {
api,
cache,
substrate_urls: urls,
})
}

async fn connect(urls: &mut LinkedList<String>) -> Result<OnlineClient<PolkadotConfig>> {
let trials = urls.len() * 2;
for _ in 0..trials {
let url = match urls.front() {
Some(url) => url,
None => anyhow::bail!("substrate urls list is empty"),
};

match OnlineClient::<PolkadotConfig>::from_url(url).await {
Ok(client) => return Ok(client),
Err(err) => {
log::error!(
"failed to create substrate client with url \"{}\": {}",
url,
err
);
}
}

if let Some(front) = urls.pop_front() {
urls.push_back(front);
}
}

anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
log::error!("failed to flush redis cache {}", err);
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
if let Err(err) = self.handle_events().await {
log::error!("error listening to events {}", err);
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
}
}
}

async fn handle_events(&self) -> Result<()> {
log::info!("started chain events listener");
let mut blocks_sub = self.api.blocks().subscribe_finalized().await?;
while let Some(block) = blocks_sub.next().await {
let events = block?.events().await?;
for evt in events.iter() {
let evt = match evt {
Err(err) => {
log::error!("failed to decode event {}", err);
continue;
}
Ok(e) => e,
};
if let Ok(Some(twin)) = evt.as_event::<tfchain::tfgrid_module::events::TwinStored>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
} else if let Ok(Some(twin)) =
evt.as_event::<tfchain::tfgrid_module::events::TwinUpdated>()
{
self.cache.set(twin.0.id, twin.0.into()).await?;
}
}
}
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ extern crate anyhow;
extern crate mime;

pub mod cache;
pub mod events;
pub mod identity;
pub mod peer;
pub mod redis;
pub mod relay;
pub mod tfchain;
pub mod token;
pub mod twin;
pub mod types;
Loading
Loading