diff --git a/docs/architecture/adr-008-event-subscription.md b/docs/architecture/adr-008-event-subscription.md new file mode 100644 index 000000000..40e391d3d --- /dev/null +++ b/docs/architecture/adr-008-event-subscription.md @@ -0,0 +1,410 @@ +# ADR 008: RPC Client Event Subscription Mechanism + +## Changelog + +* 2020-07-23: Initial draft + +## Context + +The [Tendermint Light Client](../../light-client/) is one of many important +applications that will make use of the [RPC client](../../rpc/) to query +Tendermint full nodes for information relating to the blockchain. + +Tendermint servers (e.g. full nodes) provide an [event +subscription][tm-event-subs] RPC endpoint to allow clients to receive +notifications of specific events as they happen (currently via a WebSockets +connection). We need to expose this subscription mechanism through the RPC client. + +In order to achieve this, we need: + +1. An ergonomic interface for the RPC client that allows developers to subscribe + to events produced by specific queries. Specifically, this interface must: + 1. Offer **subscription** functionality, where: + 1. A single **subscription** takes place to the **events** produced by a + **query** (the [PEG] for which, at the time of this writing, is located + [here][query-peg]). + 2. Callers must be able to create multiple distinct subscriptions. + 3. A subscription must offer an interface to allow for iteration over + incoming events only relevant to that subscription (i.e. it should + produce an **event stream**). + 4. It must be possible to, from outside of the RPC client, receive events + from multiple subscriptions' event streams concurrently without + interfering with/blocking each other. + 5. It must be possible to, from outside of the RPC client, handle + subscriptions' event streams without blocking other RPC operations. + 2. Offer the ability to **unsubscribe** from specific queries (i.e. to + terminate specific subscriptions). +2. An appropriate concurrency model for drivers of the transport layer that + allows the transport layer to operate independently of consumers of + subscriptions. This is so that consumers don't block transport layer + activities and vice-versa. + +## Decision + +### Assumptions + +* All blocking operations that deal with I/O must be `async`. +* We will not be ["de-asyncifying" the RPC][issue-318] and will rather, in a + future ADR, propose a synchronous architecture as well should we need one. + +### Proposed Entities and Relationships + +The entities in the diagram below are described in the following subsections. + +![](assets/rpc-client-erd.png) + +### `Event` + +In terms of the subscription interface, this is ultimately what the end user is +most interested in obtaining. The `Event` type's structure is dictated by the +Tendermint RPC: + +```rust +pub struct Event { + /// The query that produced the event. + pub query: String, + /// The data associated with the event (determines its `EventType`). + pub data: EventData, + /// Event type and attributes map. + pub events: Option>>, +} + +pub enum EventData { + NewBlock { + block: Option, + result_begin_block: Option, + result_end_block: Option, + }, + Tx { + tx_result: TxResult, + }, + // ... +} +``` + +### `Subscription` + +A `Subscription` here is envisaged as an entity that implements the +[Stream][futures-stream] trait, allowing its owner to asynchronously iterate +through all of the relevant incoming events. Use of such a subscription should +be as simple as: + +```rust +while let Some(result_event) = subscription.next().await { + match result_event { + Ok(event) => { /* handle event */ }, + Err(e) => { /* terminate subscription and report error */ }, + } +} + +// Terminate the subscription (i.e. unsubscribe and consume it). +// Since a `Subscription` could be moved to and consumed in any asynchronous +// context (and a distinct context to the original client entity that created +// it), it would make sense that **unsubscribing** should be accessible from +// that same context. +subscription.terminate().await.unwrap(); +``` + +Once [asynchronous destructors][async-drop] are available in Rust, the +`terminate` method should no longer be necessary. + +For efficient routing of events to `Subscription`s, each `Subscription` should +have some kind of unique identifier associated with it (a `SubscriptionId`). +Each `Subscription` relates only to a single [`Query`](#query). Therefore, its +publicly accessible fields may resemble the following: + +```rust +pub struct Subscription { + pub id: SubscriptionId, + pub query: Query, + // ... other fields to help facilitate inter-task comms ... +} +``` + +#### Buffering of Events + +Since the rate at which events could be produced by the remote RPC endpoint may +differ from the rate at which the client process them, we need to buffer +incoming events in a `Subscription`. + +Under the hood, a `Subscription` is envisaged to make use of some kind of +**unbounded channel** to buffer incoming `Event`s, such as that provided by +[Tokio's `mpsc`][tokio-mpsc]. We don't propose the use of bounded channels yet +since they complicate the concurrency model significantly: we would need to +cater for cases where we encounter full channels and provide for conventional or +application-specific ways of dealing with those full channels (e.g. back-off, or +back-pressure). + +#### Managing Multiple Simultaneous Subscriptions + +There may come instances where clients would want to initiate multiple +subscriptions to different event types and consume them all from the same +context. Since the `Subscription` struct implements the +[`Stream`][futures-stream] trait, all of the [stream-related +functionality][futures-stream-mod] should enhance the ergonomics of working with +`Subscription`s. + +For example, if you wanted to iterate through two subscriptions at the same +time, processing events in the order in which they are received by the client: + +```rust +use futures::stream::select_all; + +// `subs1` and `subs2` are `Subscription`s: +while let Some(res) = select_all(vec![subs1, subs2]).next().await { + match res { + Ok(event) => { /* handle event */ }, + Err(e) => { /* handle error */ }, + } +} +``` + +### Client Model + +Users of the Tendermint RPC library may or may not want access to subscription +functionality. Since such functionality comes with additional overhead in terms +of resource usage and asynchronous task management, it would be optimal to +provide two separate client traits: one that only implements non-subscription +functionality, and one that only implements subscription functionality (where +clients could either implement one or both traits). + +The interfaces of the two types of clients are envisaged as follows. + +#### `Client` + +This type of client would allow for interaction with all RPC endpoints except +those pertaining to subscription management. In our current implementation, this +client would only interact via the HTTP RPC endpoints (the `HttpClient` in the +entity diagram above). + +**Note**: All `async` traits are facilitated by the use of [async-trait]. + +```rust +pub type Result = std::result::Result; + +#[async_trait] +pub trait Client: ClosableClient { + /// `/abci_info`: get information about the ABCI application. + async fn abci_info(&self) -> Result; + + /// `/abci_query`: query the ABCI application + async fn abci_query( + &self, + path: Option, + data: V, + height: Option, + prove: bool, + ) -> Result + where + V: Into> + Send; + + /// ... + + /// Perform a general request against the RPC endpoint + async fn perform(&self, request: R) -> Result + where + R: Request; +} +``` + +#### `SubscriptionClient` + +A `SubscriptionClient` would be one that only provides access to subscription +functionality. In our current implementation, this client would interact with a +WebSocket connection to provide subscription functionality (the +`WebSocketSubscriptionClient` in the entity diagram above). + +```rust +#[async_trait] +pub trait SubscriptionClient: ClosableClient { + /// `/subscribe`: subscribe to receive events produced by the given query. + async fn subscribe(&mut self, query: String) -> Result; +} +``` + +#### `ClosableClient` + +This is not really a client in and of itself, but a trait that both the `Client` +and `SubscriptionClient` need to implement. The reason for this common trait is +that, depending on the transport layer, both types of clients may need a `close` +method to gracefully terminate the client. An example of this would be when we +implement a `WebSocketClient` that implements both the `Client` and +`SubscriptionClient` traits simultaneously. + +```rust +#[async_trait] +pub trait ClosableClient { + /// Attempt to gracefully terminate the client. + async fn close(self) -> Result<()>; +} +``` + +### Client Implementations + +We envisage 2 distinct client implementations at this point: + +* `HttpClient`, which only implements [`Client`](#client) (over HTTP). +* `WebSocketSubscriptionClient`, which only implements + [`SubscriptionClient`](#subscriptionclient) (over a WebSocket connection). + +#### Handle-Driver Concurrency Model + +Depending on the underlying transport, a client may need a **transport driver** +running in an asynchronous context. As in the example of a WebSocket connection, +the rate at which one interacts with the WebSocket connection may differ to the +rate at which one interacts with `Event`s being produced by a `Subscription`, so +they necessarily need to run concurrently in different contexts. + +Implementation of such a transport driver is transport-specific. Short-lived +request/response interactions (such as HTTP) would not require such a transport +driver, whereas a WebSocket connection would. + +In cases where a driver is necessary, the client implementation would have to +become a **handle** to the driver, facilitating communication with it across +asynchronous tasks. + +### `Query` + +It is proposed that, using a *builder pattern*, we implement a subscription +`Query` interface that implements the full [query PEG][query-peg] provided by +the Go implementation of the RPC client. This would allow for compile-time +validation of queries. + +The desired interface for constructing a query would look as follows: + +```rust +// tm.event='NewBlock' +let query = Query::from(EventType::NewBlock); + +// tm.event='Tx' AND tx.hash='XYZ' +let query = Query::from(EventType::Tx).and_eq("tx.hash", "XYZ"); + +// tm.event='Tx' AND tx.height=5 +let query = Query::from(EventType::Tx).and_eq("tx.height", 5); +``` + +This interface could be implemented along the following lines. + +```rust +// Query would only have constructors that either specified an event type +// (corresponding to a `tm.event='eventtype'` query) or a condition. There must +// be no constructor that allows for construction of an empty query. +pub struct Query { + // A query can only match zero or one type of event. + event_type: Option, + // A query can contain zero or more conditions. + conditions: Vec, +} + +impl From for Query { + fn from(event_type: EventType) -> Self { + Self { + event_type: Some(event_type), + conditions: Vec::new(), + } + } +} + +impl Query { + // An example of a constructor for `Operation::Eq`. + pub fn eq(key: &str, value: impl Into) -> Self { + Self { + event_type: None, + conditions: vec![Condition::new(key, Operation::Eq(value.into()))], + } + } + + // ... + + // Allows for a simple builder pattern. + pub fn and_eq(mut self, key: &str, value: impl Into) -> Self { + self.conditions.push(Condition::new(key, Operation::Eq(value.into()))); + self + } + + // ... +} + +// Derived from https://github.com/tendermint/tendermint/blob/master/types/events.go +pub enum EventType { + NewBlock, + NewBlockHeader, + NewEvidence, + Tx, + ValidatorSetUpdates, +} + +pub struct Condition { + key: String, + op: Operation, +} + +pub enum Operation { + Eq(Operand), + Lt(Operand), + Lte(Operand), + Gt(Operand), + Gte(Operand), + Contains(Operand), + Exists, +} + +// According to https://docs.tendermint.com/master/rpc/#/Websocket/subscribe, +// an operand can be a string, number, date or time. We differentiate here +// between integer and floating point numbers. +// +// It would be most useful to implement `From` traits for each of the different +// operand types to the `Operand` enum, as this would improve ergonomics. +pub enum Operand { + String(String), + Integer(i64), + Float(f64), + Date(chrono::Date), + DateTime(chrono::DateTime), +} +``` + +## Status + +Proposed + +## Consequences + +### Positive + +* Provides relatively intuitive developer ergonomics (`Subscription` iteration + to produce `Event`s). +* Mocking client functionality is relatively easy, allowing for a greater + variety of testing (including simulating transport-level failures). + +### Negative + +* Requires an additional concurrent, potentially long-running `async` task to be + concerned about (partially mitigated by the [handle-driver concurrency + model](#handle-driver-concurrency-model)). + +### Neutral + +None + +## References + +* [\#313](https://github.com/informalsystems/tendermint-rs/issues/313) +* [\#311](https://github.com/informalsystems/tendermint-rs/issues/311) +* [\#458][pr-458] +* [Tendermint RPC subscription endpoint][tm-event-subs] + +[tm-event-subs]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe +[client]: https://github.com/informalsystems/tendermint-rs/blob/06ed36eaf7a74c0357b86d1d7450a2fec52ed6a0/rpc/src/client.rs#L20 +[query-peg]: https://github.com/tendermint/tendermint/blob/98c595312af02037843b8fe74f0ee0625665448e/libs/pubsub/query/query.peg +[tm-go-query]: https://github.com/tendermint/tendermint/blob/98c595312af02037843b8fe74f0ee0625665448e/libs/pubsub/pubsub.go#L64 +[PEG]: https://en.wikipedia.org/wiki/Parsing_expression_grammar +[futures-stream]: https://docs.rs/futures/*/futures/stream/trait.Stream.html +[pr-458]: https://github.com/informalsystems/tendermint-rs/pull/458 +[issue-318]: https://github.com/informalsystems/tendermint-rs/issues/318 +[tokio-sync]: https://docs.rs/tokio/*/tokio/sync/index.html +[async-trait]: https://docs.rs/async-trait/*/async_trait/index.html +[async-drop]: https://internals.rust-lang.org/t/asynchronous-destructors/11127 +[tokio-mpsc]: https://docs.rs/tokio/*/tokio/sync/mpsc/index.html +[futures-stream-mod]: https://docs.rs/futures/*/futures/stream/index.html + diff --git a/docs/architecture/assets/rpc-client-erd.graphml b/docs/architecture/assets/rpc-client-erd.graphml new file mode 100644 index 000000000..b136cd350 --- /dev/null +++ b/docs/architecture/assets/rpc-client-erd.graphml @@ -0,0 +1,424 @@ + + + + + + + + + + + + + + + + + + + + + + + Client + + + + + + + + + + + SubscriptionClient + + + + + + + + + + + HttpClient + + + + + + + + + + + WebSocketSubscriptionClient + + + + + + + + + + + futures::stream::Stream + + + + + + + + + + + Subscription + + + + + + + + + + + Event + + + + + + + + + + + SubscriptionRouter + + + + + + + + + + + WebSocketSubscriptionDriver + + + + + + + + + + + + + + Legend + + + + + + + + + + Folder 1 + + + + + + + + + + + + + + + + Public Concrete +Type + + + + + + + + + + + Trait + + + + + + + + + + + Private Concrete +Type (Implementation +Detail) + + + + + + + + + + + + + Query + + + + + + + + + + + EventType + + + + + + + + + + + Condition + + + + + + + + + + + Operation + + + + + + + + + + + Operand + + + + + + + + + + + SubscriptionId + + + + + + + + + + + Implements + + + + + + + + + + + Implements + + + + + + + + + + + Implements + + + + + + + + + + + + + + + Produces + + + + + + + + + + + Produces + + + + + + + + + + + + + Has/ +Uses + + + + + + + + + + + Has/ +Uses + + + + + + + + + + + Uses + + + + + + + + + + + Has + + + + + + + + + + + + + + + Has + + + + + + + + + + + + + + Has + + + + + + + + + + + Has + + + + + + + + + + + Has + + + + + + + + + + + + + Has + + + + + + + + + diff --git a/docs/architecture/assets/rpc-client-erd.png b/docs/architecture/assets/rpc-client-erd.png new file mode 100644 index 000000000..3bfb5b8c5 Binary files /dev/null and b/docs/architecture/assets/rpc-client-erd.png differ