Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: handle ping messages in WebSocket listener #458

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 80 additions & 27 deletions rpc/src/client/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
// TODO(ismail): document fields or re-use the abci types
#![allow(missing_docs)]

use async_tungstenite::{tokio::connect_async, tokio::TokioAdapter, tungstenite::Message};
use async_tungstenite::{
tokio::connect_async, tokio::TokioAdapter, tungstenite::protocol::frame::coding::CloseCode,
tungstenite::protocol::CloseFrame, tungstenite::Error as tungsteniteError,
tungstenite::Message,
};
use futures::prelude::*;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::HashMap;
use std::error::Error as stdError;
use tokio::net::TcpStream;
Expand All @@ -25,12 +30,12 @@ use crate::{endpoint::subscribe, Error as RPCError};
pub enum EventSubscription {
/// Subscribe to all transactions
TransactionSubscription,
///Subscribe to all blocks
/// Subscribe to all blocks
BlockSubscription,
}

impl EventSubscription {
///Convert the query enum to a string
/// Convert the query enum to a string
pub fn as_str(&self) -> &str {
match self {
EventSubscription::TransactionSubscription => "tm.event='Tx'",
Expand Down Expand Up @@ -83,33 +88,81 @@ impl EventListener {
Ok(())
}

/// Get the next event from the websocket
pub async fn get_event(&mut self) -> Result<Option<ResultEvent>, RPCError> {
let msg = self
.socket
.next()
.await
.ok_or_else(|| RPCError::websocket_error("web socket closed"))??;
/// Get the next event from the websocket. Automatically handles websocket
/// protocol details, like responding to ping messages, so it can either
/// produce events or errors.
pub async fn get_event(&mut self) -> Result<ResultEvent, RPCError> {
Copy link
Member

@liamsi liamsi Jul 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be much nicer if the subscribe would actually return events (a stream of events) for that particular subscription. Then, the even-listener would also run a "infinite" loop where it would capture control messages but it would route only the relevant events to the particular subscriptions.
But that could probably be done in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds like a much more intuitive developer interface. I'll make a note of that to work on that in a future PR 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just came to me now: this sounds like what you've recommended on #313, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with this design is, that it hinges on the caller to continuously drive the websocket event machinery. Likely expected to be run in a loop. Which can lead to subtle bugs, when we imagine the caller not calling get_event in time and the required Pong message to keep the connection alive is never fired. For a user to understand why the connection was closed will be hard.

IMHO we need to rework this so the event loop is running continuously and acts more like a stream. We can still build a synchronous API on top of it, if that is desired.

loop {
let msg = self
.socket
.next()
.await
.ok_or_else(|| RPCError::websocket_error("web socket closed"))??;

if let Ok(result_event) = serde_json::from_str::<WrappedResultEvent>(&msg.to_string()) {
// if we get an rpc error here, we will bubble it up:
return Ok(Some(result_event.into_result()?));
match msg {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This match block could benefit from the different arms be in their own methods. Not a huge thing tho.

Message::Text(msg_text) => {
match serde_json::from_str::<WrappedResultEvent>(msg_text.as_str()) {
// if we get an rpc error here, we will bubble it up:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't add much to understand the flow of the program.

Ok(result_event) => return Ok(result_event.into_result()?),
Err(e) => {
return Err(RPCError::new(
Code::Other(-1),
Some(format!(
"failed to decode incoming message as an event: {}",
e
)),
))
}
}
}
Message::Ping(_) => {
self.socket
.send(Message::Pong(Vec::new()))
Copy link
Member

@liamsi liamsi Jul 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try that against a running tendermint node? Not sure any of these will be fired in the integration test below 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did run it against a Tendermint node, and the pings did come through (I used dbg! statements to show how we send pong responses, but subsequently removed the dbg! statements to clean things up a little).

We definitely need more extensive testing here, but I'm still debating exactly how to go about doing that. Will definitely be looking into more extensive testing in a future PR.

.await
.map_err(|e| {
RPCError::websocket_error(format!(
"failed to send pong response: {}",
e
))
})?
}
Message::Pong(_) => (),
Message::Close(_) => return Err(RPCError::websocket_error("web socket closed")),
Message::Binary(_) => {
thanethomson marked this conversation as resolved.
Show resolved Hide resolved
return Err(RPCError::websocket_error(
"received unexpected binary websocket message",
))
}
}
}
dbg!("We did not receive a valid JSONRPC wrapped ResultEvent!");
if serde_json::from_str::<String>(&msg.to_string()).is_ok() {
// FIXME(ismail): Until this is a proper websocket client
// (or the endpoint moved to grpc in tendermint), we accept whatever was read here
// dbg! it out and return None below.
dbg!("Instead of JSONRPC wrapped ResultEvent, we got:");
dbg!(&msg.to_string());
return Ok(None);
}

/// Attempts to gracefully close the websocket connection and consumes the
/// listener.
pub async fn close(mut self) -> Result<(), RPCError> {
let _ = self
.socket
.close(Some(CloseFrame {
code: CloseCode::Normal,
reason: Cow::from("client closed connection"),
}))
.await
.map_err(|e| {
RPCError::websocket_error(format!("failed to close web socket connection: {}", e))
})?;
// try to gracefully close the connection
thanethomson marked this conversation as resolved.
Show resolved Hide resolved
match self.socket.next().await {
Some(r) => match r {
// we didn't get the connection closed message we wanted, so force connection closure
Ok(_) => Ok(()),
Err(e) => match e {
// this is what we want
thanethomson marked this conversation as resolved.
Show resolved Hide resolved
tungsteniteError::ConnectionClosed | tungsteniteError::AlreadyClosed => Ok(()),
_ => Err(RPCError::websocket_error(e.to_string())),
},
},
None => Ok(()),
}
dbg!("received neither event nor generic string message:");
dbg!(&msg.to_string());
Err(RPCError::new(
Code::Other(-1),
Some("received neither event nor generic string message".to_string()),
))
}
}

Expand Down
11 changes: 6 additions & 5 deletions tendermint/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,10 @@ mod rpc {
// client.subscribe("tm.event='NewBlock'".to_owned()).await.unwrap();

// Loop here is helpful when debugging parsing of JSON events
// loop{
let maybe_result_event = client.get_event().await.unwrap();
dbg!(&maybe_result_event);
// }
let result_event = maybe_result_event.expect("unexpected msg read");
//loop{
//for _ in 0..5 {
let result_event = client.get_event().await.unwrap();
dbg!(&result_event);
match result_event.data {
event_listener::TMEventData::EventDataNewBlock(nb) => {
dbg!("got EventDataNewBlock: {:?}", nb);
Expand All @@ -175,5 +174,7 @@ mod rpc {
panic!("got a GenericJSONEvent: {:?}", v);
}
}
//}
client.close().await.unwrap();
}
}