-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Listen to chain events and update twin on requests #194
Conversation
still missing reconnecting to chain urls for rpc errors in a round robin way |
Connection error handling was added and tested with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general this looks good but i have really few comments below.
proto/types.proto
Outdated
@@ -71,4 +71,6 @@ message Envelope { | |||
bytes plain = 13; | |||
bytes cipher = 14; | |||
} | |||
|
|||
optional string relays = 17; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this is just a single string, not a list of strings? it's better to send it as a []string imho
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we gonna re-introduced this again why not reuse the old federation
field?
is that to keep compatibility with old peers that may interrupt it differently?
Also it is expected to have some seconds delay after switching the relay before it can receive messages.
it won't be bad as DNS propagation time which can take anywhere from a few hours up to 48 hours to propagate a new domain name for a website worldwide :D
so even if we didn't go with this path (adding the relays filed) we still just fine, how often node would switch its relays given that we are storing on-chain multiple relays per twin? is that even supported in zos nodes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it is currently used to set the relay of twin destination in rmb-sdk-go
here. so it would break compatibility with older versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also for the delay part, for clients that are using this new feature there should be no delay as cache will be updated with the requests made. for clients that are not using this field and changing relays there should be a delay up to 6 seconds until new blocks are produced.
This is already an improvement to the currently used relay warmer that adds a delay up to 10 minutes and previously before warmer it could take up to 60 seconds.
src/bins/rmb-relay.rs
Outdated
|
||
let mut l = events::Listener::new(args.substrate, redis_cache).await?; | ||
tokio::spawn(async move { | ||
if let Err(e) = l.listen().await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see a problem here. If the listener failed for any reason (and returned an error here) It's not gonna be good for the system that the relay keeps running otherwise it will rely on out-dated date forever.
So what we need to do is:
- Events listener failure is fatal and the entire system should then exit with an error
- Event system never fails, it need to retry forever
- If failure is persisted for sometime relay then should return errors to sender that they can't grantee deliver to destinations
I checked the listener code and it seems there are many points where the listener can return an error. for example cache.flush() but what if redis was temporary unavailable or restarting for some reason. This will cause the system to continue working but with a cache that will never be updated.
IMHO the best approach is to make the listener infailable. This can be accomplished but basically never give up on errors
src/events/events.rs
Outdated
C: Cache<Twin> + Clone, | ||
{ | ||
pub async fn new(substrate_urls: Vec<String>, cache: C) -> Result<Self> { | ||
let mut urls = LinkedList::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sure you can create a linked list directly from a Vec
will probably be something like
LinkedList::from_iter(substrate_urls)
just saying
src/events/events.rs
Outdated
while let Some(block) = blocks_sub.next().await { | ||
let events = block?.events().await?; | ||
for evt in events.iter() { | ||
let evt = evt?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do u assume this is a connection error here. Although it can be encoding error (most probably) this mean that you are flushing the entire db, and reconnecting if you receive an bad event type
I am wondering if we better log this error and continue instead ? what do you think
src/events/mod.rs
Outdated
@@ -0,0 +1,3 @@ | |||
mod events; | |||
|
|||
pub use events::Listener; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check the warning from clippy
i think since you don't need to use submodule instead just move the Listener to mod.rs
directly
src/relay/api.rs
Outdated
.await | ||
.map_err(|err| HttpError::FailedToGetTwin(err.to_string()))? | ||
.ok_or_else(|| HttpError::TwinNotFound(envelope.source.twin))?; | ||
let envelope_relays = match RelayDomains::from_str(relays) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check comments ont he proto buf schema
src/relay/api.rs
Outdated
@@ -309,6 +331,25 @@ impl<M: Metrics, D: TwinDB> Stream<M, D> { | |||
.await? | |||
.ok_or_else(|| anyhow::Error::msg("unknown twin destination"))?; | |||
|
|||
if let Some(relays) = &envelope.relays { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the same code is repeating here and in federation as well. I mean this code is in the same file so I am sure you can create a function that u can reuse
441e730
to
319af11
Compare
Description
This PR utilize listening to chain events to update relay cache with the latest twin information. For
TwinStored
andTwinUpdated
events, the event listener get twin information from the event and update local cache. For twins that are neither update nor created after starting events listener they will be fetched from the chain for the first time a request is made but will remain in cache forever until aTwinUpdated
is emitted then its data will be updated too.The time difference between clients calling
update_twin
and the blocks finalizing led to response failures for up to 6 seconds. So, to mitigate this delay a new optionalrelays
field was introduced to Envelope type. If set it will be considered the source of truth for which relays the twin is using and relays will update local cache based on that (even for federating messages).This removes all delay for twin updates (if using new field in envelope) and the need for cache warmer.
Issues