rpc: handle ping messages in WebSocket listener #458
Conversation
Closes #311.

Adds responding to WebSocket ping messages during the attempt to fetch the next event from the event stream. I debated whether to wrap the fetching of the next message in a finite or an infinite loop, but a finite loop would just push that same responsibility further up the stack. This approach lets us do away with the optionality of a `ResultEvent` and simplifies the method signature.

Also, I added a `close` method for the listener to allow one to gracefully close the connection (this avoids those ugly `websocket: close 1006 (abnormal closure): unexpected EOF` messages in the Tendermint logs).

Signed-off-by: Thane Thomson <[email protected]>
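A minimal sketch of what such a graceful close can look like, assuming an async-tungstenite-style socket (the `close` name matches the PR; the generic signature and error handling here are illustrative):

```rust
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use async_tungstenite::tungstenite::protocol::CloseFrame;
use async_tungstenite::tungstenite::{Error, Message};
use futures::{Sink, SinkExt};

/// Send a normal-closure frame so the peer (e.g. Tendermint) logs a clean
/// close instead of `websocket: close 1006 (abnormal closure)`.
async fn close<S>(mut socket: S) -> Result<(), Error>
where
    S: Sink<Message, Error = Error> + Unpin,
{
    socket
        .send(Message::Close(Some(CloseFrame {
            code: CloseCode::Normal,
            reason: "client closed connection".into(),
        })))
        .await
}
```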
}
Message::Ping(_) => {
    self.socket
        .send(Message::Pong(Vec::new()))
Did you try that against a running tendermint node? Not sure any of these will be fired in the integration test below 🤔
I did run it against a Tendermint node, and the pings did come through (I used `dbg!` statements to show how we send pong responses, but subsequently removed the `dbg!` statements to clean things up a little).
We definitely need more extensive testing here, but I'm still debating exactly how to go about doing that. Will definitely be looking into more extensive testing in a future PR.
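One small follow-up thought on the handler above: RFC 6455 (§5.5.3) expects a pong sent in response to a ping to carry the same application data as the ping, whereas the snippet replies with an empty payload. A hypothetical variant that echoes the payload:

```rust
use async_tungstenite::tungstenite::{Error, Message};
use futures::{Sink, SinkExt};

/// Answer a ping by echoing its payload, as RFC 6455 expects of the pong's
/// application data.
async fn pong<S>(socket: &mut S, ping_payload: Vec<u8>) -> Result<(), Error>
where
    S: Sink<Message, Error = Error> + Unpin,
{
    socket.send(Message::Pong(ping_payload)).await
}
```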
/// Get the next event from the websocket. Automatically handles websocket
/// protocol details, like responding to ping messages, so it can either
/// produce events or errors.
pub async fn get_event(&mut self) -> Result<ResultEvent, RPCError> {
I think it would be much nicer if `subscribe` would actually return events (a stream of events) for that particular subscription. Then the event listener would also run an "infinite" loop where it captures control messages but routes only the relevant events to the particular subscriptions.

But that could probably be done in a separate PR.
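To make the suggestion concrete, a rough sketch of that shape, assuming tokio and tokio-stream as dependencies; `Subscription`, `EventDriver`, and the `ResultEvent` fields are illustrative stand-ins, not this crate's actual types:

```rust
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug, Clone)]
pub struct ResultEvent {
    pub query: String,
    pub data: String,
}

/// A subscription is just a stream of events for one query.
pub type Subscription = ReceiverStream<ResultEvent>;

/// The driver owns the WebSocket connection and routes messages: control
/// frames (ping/close) are handled internally, while events are forwarded
/// to the subscriptions whose query matches.
pub struct EventDriver {
    subscriptions: Vec<(String, mpsc::Sender<ResultEvent>)>,
}

impl EventDriver {
    pub fn new() -> Self {
        Self { subscriptions: Vec::new() }
    }

    /// Register a subscription and hand back the receiving stream.
    pub fn subscribe(&mut self, query: impl Into<String>) -> Subscription {
        let (tx, rx) = mpsc::channel(32);
        self.subscriptions.push((query.into(), tx));
        ReceiverStream::new(rx)
    }

    /// Route an incoming event to matching subscriptions. In a real driver
    /// this would be called from the continuously running socket loop.
    pub async fn route(&self, event: ResultEvent) {
        for (query, tx) in &self.subscriptions {
            if *query == event.query {
                let _ = tx.send(event.clone()).await;
            }
        }
    }
}
```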
Yes, that sounds like a much more intuitive developer interface. I'll make a note to work on that in a future PR 👍
Just came to me now: this sounds like what you've recommended on #313, right?
The issue with this design is that it hinges on the caller to continuously drive the WebSocket event machinery, likely by running it in a loop. That can lead to subtle bugs: imagine the caller not calling `get_event` in time, so the `Pong` message required to keep the connection alive is never fired. For a user, understanding why the connection was closed will be hard.

IMHO we need to rework this so the event loop runs continuously and acts more like a stream. We can still build a synchronous API on top of it, if that is desired.
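A sketch of what that continuously running driver could look like, generic over the socket so the snippet stays self-contained (the ping/close handling mirrors this PR; the channel-based forwarding is an assumption):

```rust
use async_tungstenite::tungstenite::{Error, Message};
use futures::{Sink, SinkExt, Stream, StreamExt};
use tokio::sync::mpsc;

/// Drive the socket until it closes, answering pings immediately and
/// forwarding event JSON to consumers. Because this runs as its own task
/// (e.g. via `tokio::spawn`), pongs go out on time no matter how often
/// consumers poll.
async fn drive<S>(mut socket: S, event_tx: mpsc::Sender<String>)
where
    S: Stream<Item = Result<Message, Error>> + Sink<Message, Error = Error> + Unpin,
{
    while let Some(Ok(msg)) = socket.next().await {
        match msg {
            Message::Ping(data) => {
                // Keepalive is handled here, not by the caller.
                let _ = socket.send(Message::Pong(data)).await;
            }
            // Deserialization into ResultEvent elided; forward the raw JSON.
            Message::Text(json) => {
                let _ = event_tx.send(json).await;
            }
            Message::Close(_) => break,
            _ => {}
        }
    }
}
```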
Thanks for taking care of this @thanethomson 🙏
Dope! The structural improvements to the protocol handling are really good.

I left a larger design concern inline. I also think we should not go forward without the added functionality being tested, i.e. the `close` method. Testing can't be a concern deferred to a follow-up that sits on the backlog. A simple first setup could be to start a client against an event listener and test that ping as well as close work as expected; a sketch of such a test follows.
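A hypothetical version of that smoke test, using tokio-tungstenite for the in-process server side; `EventListener::connect`, `get_event`, and `close` stand in for this PR's listener API:

```rust
use std::time::Duration;

use futures::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message;

#[tokio::test]
async fn listener_answers_ping_and_closes_cleanly() {
    let server = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = server.local_addr().unwrap();

    // Server side: fire a ping, expect a pong back, then a clean close
    // handshake rather than an abrupt EOF.
    let server_task = tokio::spawn(async move {
        let (stream, _) = server.accept().await.unwrap();
        let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
        ws.send(Message::Ping(Vec::new())).await.unwrap();
        assert!(matches!(ws.next().await.unwrap().unwrap(), Message::Pong(_)));
        assert!(matches!(ws.next().await.unwrap().unwrap(), Message::Close(_)));
    });

    let mut listener = EventListener::connect(&format!("ws://{}", addr))
        .await
        .unwrap();
    // No events will arrive; get_event only needs to drive the socket far
    // enough to answer the ping, so bound it with a timeout.
    let _ = tokio::time::timeout(Duration::from_millis(100), listener.get_event()).await;
    listener.close().await.unwrap();
    server_task.await.unwrap();
}
```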
match msg {
    Message::Text(msg_text) => {
        match serde_json::from_str::<WrappedResultEvent>(msg_text.as_str()) {
            // if we get an rpc error here, we will bubble it up:
This comment doesn't add much to understanding the flow of the program.
if let Ok(result_event) = serde_json::from_str::<WrappedResultEvent>(&msg.to_string()) {
    // if we get an rpc error here, we will bubble it up:
    return Ok(Some(result_event.into_result()?));
match msg {
This match block could benefit from the different arms being in their own methods. Not a huge thing tho.
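For illustration, one hypothetical shape of that refactor, with stand-in types so the sketch stays self-contained:

```rust
use async_tungstenite::tungstenite::Message;

type Event = String; // stand-in for ResultEvent
type Error = String; // stand-in for RPCError

struct Listener;

impl Listener {
    /// The match stays a thin dispatcher; each arm gets its own method.
    fn handle_message(&mut self, msg: Message) -> Result<Option<Event>, Error> {
        match msg {
            Message::Text(text) => self.handle_text_msg(text),
            Message::Ping(data) => self.handle_ping(data),
            _ => Ok(None),
        }
    }

    fn handle_text_msg(&mut self, text: String) -> Result<Option<Event>, Error> {
        // JSON-RPC deserialization would live here.
        Ok(Some(text))
    }

    fn handle_ping(&mut self, _payload: Vec<u8>) -> Result<Option<Event>, Error> {
        // The pong reply would live here.
        Ok(None)
    }
}
```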
Co-authored-by: Alexander Simmerl <[email protected]>
Totally agree on the need to restructure the design here 👍 Since nothing else in this codebase is using the …

I'll start working on an ADR today!