diff --git a/Cargo.lock b/Cargo.lock index 5172c3d02861d..4a6a3a4ff780c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8775,10 +8775,19 @@ dependencies = [ name = "sc-rpc-spec-v2" version = "0.10.0-dev" dependencies = [ + "futures", "hex", "jsonrpsee", + "parity-scale-codec", "sc-chain-spec", + "sc-transaction-pool-api", + "serde", "serde_json", + "sp-api", + "sp-blockchain", + "sp-core", + "sp-runtime", + "thiserror", "tokio", ] @@ -8830,6 +8839,7 @@ dependencies = [ "sc-offchain", "sc-rpc", "sc-rpc-server", + "sc-rpc-spec-v2", "sc-sysinfo", "sc-telemetry", "sc-tracing", @@ -9050,6 +9060,7 @@ dependencies = [ "futures", "log", "serde", + "serde_json", "sp-blockchain", "sp-runtime", "thiserror", diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 12dec7464e6d0..885d415eb50d2 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -16,7 +16,17 @@ targets = ["x86_64-unknown-linux-gnu"] jsonrpsee = { version = "0.15.1", features = ["server", "macros"] } # Internal chain structures for "chain_spec". sc-chain-spec = { version = "4.0.0-dev", path = "../chain-spec" } +# Pool for submitting extrinsics required by "transaction" +sc-transaction-pool-api = { version = "4.0.0-dev", path = "../transaction-pool/api" } +sp-core = { version = "6.0.0", path = "../../primitives/core" } +sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } +sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } +sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } +codec = { package = "parity-scale-codec", version = "3.0.0" } +thiserror = "1.0" +serde = "1.0" hex = "0.4" +futures = "0.3.21" [dev-dependencies] serde_json = "1.0" diff --git a/client/rpc-spec-v2/src/lib.rs b/client/rpc-spec-v2/src/lib.rs index 297fda13172d6..f4b9d2f95bf97 100644 --- a/client/rpc-spec-v2/src/lib.rs +++ b/client/rpc-spec-v2/src/lib.rs @@ -24,3 +24,7 @@ #![deny(unused_crate_dependencies)] pub mod chain_spec; +pub mod transaction; + +/// Task executor that is being used by RPC subscriptions. +pub type SubscriptionTaskExecutor = std::sync::Arc; diff --git a/client/rpc-spec-v2/src/transaction/api.rs b/client/rpc-spec-v2/src/transaction/api.rs new file mode 100644 index 0000000000000..2f0c799f1cc19 --- /dev/null +++ b/client/rpc-spec-v2/src/transaction/api.rs @@ -0,0 +1,37 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API trait for transactions. + +use crate::transaction::event::TransactionEvent; +use jsonrpsee::proc_macros::rpc; +use sp_core::Bytes; + +#[rpc(client, server)] +pub trait TransactionApi { + /// Submit an extrinsic to watch. + /// + /// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on + /// transaction life cycle. + #[subscription( + name = "transaction_unstable_submitAndWatch" => "transaction_unstable_submitExtrinsic", + unsubscribe = "transaction_unstable_unwatch", + item = TransactionEvent, + )] + fn submit_and_watch(&self, bytes: Bytes); +} diff --git a/client/rpc-spec-v2/src/transaction/error.rs b/client/rpc-spec-v2/src/transaction/error.rs new file mode 100644 index 0000000000000..72a5959992f9e --- /dev/null +++ b/client/rpc-spec-v2/src/transaction/error.rs @@ -0,0 +1,100 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transaction RPC errors. +//! +//! Errors are interpreted as transaction events for subscriptions. + +use crate::transaction::event::{TransactionError, TransactionEvent}; +use sc_transaction_pool_api::error::Error as PoolError; +use sp_runtime::transaction_validity::InvalidTransaction; + +/// Transaction RPC errors. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Transaction pool error. + #[error("Transaction pool error: {}", .0)] + Pool(#[from] PoolError), + /// Verification error. + #[error("Extrinsic verification error: {}", .0)] + Verification(Box), +} + +impl From for TransactionEvent { + fn from(e: Error) -> Self { + match e { + Error::Verification(e) => TransactionEvent::Invalid(TransactionError { + error: format!("Verification error: {}", e), + }), + Error::Pool(PoolError::InvalidTransaction(InvalidTransaction::Custom(e))) => + TransactionEvent::Invalid(TransactionError { + error: format!("Invalid transaction with custom error: {}", e), + }), + Error::Pool(PoolError::InvalidTransaction(e)) => { + let msg: &str = e.into(); + TransactionEvent::Invalid(TransactionError { + error: format!("Invalid transaction: {}", msg), + }) + }, + Error::Pool(PoolError::UnknownTransaction(e)) => { + let msg: &str = e.into(); + TransactionEvent::Invalid(TransactionError { + error: format!("Unknown transaction validity: {}", msg), + }) + }, + Error::Pool(PoolError::TemporarilyBanned) => + TransactionEvent::Invalid(TransactionError { + error: "Transaction is temporarily banned".into(), + }), + Error::Pool(PoolError::AlreadyImported(_)) => + TransactionEvent::Invalid(TransactionError { + error: "Transaction is already imported".into(), + }), + Error::Pool(PoolError::TooLowPriority { old, new }) => + TransactionEvent::Invalid(TransactionError { + error: format!( + "The priority of the transactin is too low (pool {} > current {})", + old, new + ), + }), + Error::Pool(PoolError::CycleDetected) => TransactionEvent::Invalid(TransactionError { + error: "The transaction contains a cyclic dependency".into(), + }), + Error::Pool(PoolError::ImmediatelyDropped) => + TransactionEvent::Invalid(TransactionError { + error: "The transaction could not enter the pool because of the limit".into(), + }), + Error::Pool(PoolError::Unactionable) => TransactionEvent::Invalid(TransactionError { + error: "Transaction cannot be propagated and the local node does not author blocks" + .into(), + }), + Error::Pool(PoolError::NoTagsProvided) => TransactionEvent::Invalid(TransactionError { + error: "Transaction does not provide any tags, so the pool cannot identify it" + .into(), + }), + Error::Pool(PoolError::InvalidBlockId(_)) => + TransactionEvent::Invalid(TransactionError { + error: "The provided block ID is not valid".into(), + }), + Error::Pool(PoolError::RejectedFutureTransaction) => + TransactionEvent::Invalid(TransactionError { + error: "The pool is not accepting future transactions".into(), + }), + } + } +} diff --git a/client/rpc-spec-v2/src/transaction/event.rs b/client/rpc-spec-v2/src/transaction/event.rs new file mode 100644 index 0000000000000..3c75eaff10fd4 --- /dev/null +++ b/client/rpc-spec-v2/src/transaction/event.rs @@ -0,0 +1,353 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! The transaction's event returned as json compatible object. + +use serde::{Deserialize, Serialize}; + +/// The transaction was broadcasted to a number of peers. +/// +/// # Note +/// +/// The RPC does not guarantee that the peers have received the +/// transaction. +/// +/// When the number of peers is zero, the event guarantees that +/// shutting down the local node will lead to the transaction +/// not being included in the chain. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionBroadcasted { + /// The number of peers the transaction was broadcasted to. + #[serde(with = "as_string")] + pub num_peers: usize, +} + +/// The transaction was included in a block of the chain. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionBlock { + /// The hash of the block the transaction was included into. + pub hash: Hash, + /// The index (zero-based) of the transaction within the body of the block. + #[serde(with = "as_string")] + pub index: usize, +} + +/// The transaction could not be processed due to an error. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionError { + /// Reason of the error. + pub error: String, +} + +/// The transaction was dropped because of exceeding limits. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransactionDropped { + /// True if the transaction was broadcasted to other peers and + /// may still be included in the block. + pub broadcasted: bool, + /// Reason of the event. + pub error: String, +} + +/// Possible transaction status events. +/// +/// The status events can be grouped based on their kinds as: +/// +/// 1. Runtime validated the transaction: +/// - `Validated` +/// +/// 2. Inside the `Ready` queue: +/// - `Broadcast` +/// +/// 3. Leaving the pool: +/// - `BestChainBlockIncluded` +/// - `Invalid` +/// +/// 4. Block finalized: +/// - `Finalized` +/// +/// 5. At any time: +/// - `Dropped` +/// - `Error` +/// +/// The subscription's stream is considered finished whenever the following events are +/// received: `Finalized`, `Error`, `Invalid` or `Dropped`. However, the user is allowed +/// to unsubscribe at any moment. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +// We need to manually specify the trait bounds for the `Hash` trait to ensure `into` and +// `from` still work. +#[serde(bound( + serialize = "Hash: Serialize + Clone", + deserialize = "Hash: Deserialize<'de> + Clone" +))] +#[serde(into = "TransactionEventIR", from = "TransactionEventIR")] +pub enum TransactionEvent { + /// The transaction was validated by the runtime. + Validated, + /// The transaction was broadcasted to a number of peers. + Broadcasted(TransactionBroadcasted), + /// The transaction was included in a best block of the chain. + /// + /// # Note + /// + /// This may contain `None` if the block is no longer a best + /// block of the chain. + BestChainBlockIncluded(Option>), + /// The transaction was included in a finalized block. + Finalized(TransactionBlock), + /// The transaction could not be processed due to an error. + Error(TransactionError), + /// The transaction is marked as invalid. + Invalid(TransactionError), + /// The client was not capable of keeping track of this transaction. + Dropped(TransactionDropped), +} + +/// Intermediate representation (IR) for the transaction events +/// that handles block events only. +/// +/// The block events require a JSON compatible interpretation similar to: +/// +/// ```json +/// { event: "EVENT", block: { hash: "0xFF", index: 0 } } +/// ``` +/// +/// This IR is introduced to circumvent that the block events need to +/// be serialized/deserialized with "tag" and "content", while other +/// events only require "tag". +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event", content = "block")] +enum TransactionEventBlockIR { + /// The transaction was included in the best block of the chain. + BestChainBlockIncluded(Option>), + /// The transaction was included in a finalized block of the chain. + Finalized(TransactionBlock), +} + +/// Intermediate representation (IR) for the transaction events +/// that handles non-block events only. +/// +/// The non-block events require a JSON compatible interpretation similar to: +/// +/// ```json +/// { event: "EVENT", num_peers: 0 } +/// ``` +/// +/// This IR is introduced to circumvent that the block events need to +/// be serialized/deserialized with "tag" and "content", while other +/// events only require "tag". +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "event")] +enum TransactionEventNonBlockIR { + Validated, + Broadcasted(TransactionBroadcasted), + Error(TransactionError), + Invalid(TransactionError), + Dropped(TransactionDropped), +} + +/// Intermediate representation (IR) used for serialization/deserialization of the +/// [`TransactionEvent`] in a JSON compatible format. +/// +/// Serde cannot mix `#[serde(tag = "event")]` with `#[serde(tag = "event", content = "block")]` +/// for specific enum variants. Therefore, this IR is introduced to circumvent this +/// restriction, while exposing a simplified [`TransactionEvent`] for users of the +/// rust ecosystem. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(bound(serialize = "Hash: Serialize", deserialize = "Hash: Deserialize<'de>"))] +#[serde(rename_all = "camelCase")] +#[serde(untagged)] +enum TransactionEventIR { + Block(TransactionEventBlockIR), + NonBlock(TransactionEventNonBlockIR), +} + +impl From> for TransactionEventIR { + fn from(value: TransactionEvent) -> Self { + match value { + TransactionEvent::Validated => + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated), + TransactionEvent::Broadcasted(event) => + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted(event)), + TransactionEvent::BestChainBlockIncluded(event) => + TransactionEventIR::Block(TransactionEventBlockIR::BestChainBlockIncluded(event)), + TransactionEvent::Finalized(event) => + TransactionEventIR::Block(TransactionEventBlockIR::Finalized(event)), + TransactionEvent::Error(event) => + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Error(event)), + TransactionEvent::Invalid(event) => + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Invalid(event)), + TransactionEvent::Dropped(event) => + TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Dropped(event)), + } + } +} + +impl From> for TransactionEvent { + fn from(value: TransactionEventIR) -> Self { + match value { + TransactionEventIR::NonBlock(status) => match status { + TransactionEventNonBlockIR::Validated => TransactionEvent::Validated, + TransactionEventNonBlockIR::Broadcasted(event) => + TransactionEvent::Broadcasted(event), + TransactionEventNonBlockIR::Error(event) => TransactionEvent::Error(event), + TransactionEventNonBlockIR::Invalid(event) => TransactionEvent::Invalid(event), + TransactionEventNonBlockIR::Dropped(event) => TransactionEvent::Dropped(event), + }, + TransactionEventIR::Block(block) => match block { + TransactionEventBlockIR::Finalized(event) => TransactionEvent::Finalized(event), + TransactionEventBlockIR::BestChainBlockIncluded(event) => + TransactionEvent::BestChainBlockIncluded(event), + }, + } + } +} + +/// Serialize and deserialize helper as string. +mod as_string { + use super::*; + use serde::{Deserializer, Serializer}; + + pub fn serialize(data: &usize, serializer: S) -> Result { + data.to_string().serialize(serializer) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + String::deserialize(deserializer)? + .parse() + .map_err(|e| serde::de::Error::custom(format!("Parsing failed: {}", e))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sp_core::H256; + + #[test] + fn validated_event() { + let event: TransactionEvent<()> = TransactionEvent::Validated; + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"validated"}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn broadcasted_event() { + let event: TransactionEvent<()> = + TransactionEvent::Broadcasted(TransactionBroadcasted { num_peers: 2 }); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"broadcasted","numPeers":"2"}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn best_chain_event() { + let event: TransactionEvent<()> = TransactionEvent::BestChainBlockIncluded(None); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"bestChainBlockIncluded","block":null}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + + let event: TransactionEvent = + TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { + hash: H256::from_low_u64_be(1), + index: 2, + })); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"bestChainBlockIncluded","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"2"}}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn finalized_event() { + let event: TransactionEvent = TransactionEvent::Finalized(TransactionBlock { + hash: H256::from_low_u64_be(1), + index: 10, + }); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"finalized","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"10"}}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn error_event() { + let event: TransactionEvent<()> = + TransactionEvent::Error(TransactionError { error: "abc".to_string() }); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"error","error":"abc"}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn invalid_event() { + let event: TransactionEvent<()> = + TransactionEvent::Invalid(TransactionError { error: "abc".to_string() }); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"invalid","error":"abc"}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } + + #[test] + fn dropped_event() { + let event: TransactionEvent<()> = TransactionEvent::Dropped(TransactionDropped { + broadcasted: true, + error: "abc".to_string(), + }); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"event":"dropped","broadcasted":true,"error":"abc"}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, event); + } +} diff --git a/client/rpc-spec-v2/src/transaction/mod.rs b/client/rpc-spec-v2/src/transaction/mod.rs new file mode 100644 index 0000000000000..bb983894a428c --- /dev/null +++ b/client/rpc-spec-v2/src/transaction/mod.rs @@ -0,0 +1,38 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Substrate transaction API. +//! +//! The transaction methods allow submitting a transaction and subscribing to +//! its status updates generated by the chain. +//! +//! # Note +//! +//! Methods are prefixed by `transaction`. + +pub mod api; +pub mod error; +pub mod event; +pub mod transaction; + +pub use api::TransactionApiServer; +pub use event::{ + TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError, + TransactionEvent, +}; +pub use transaction::Transaction; diff --git a/client/rpc-spec-v2/src/transaction/transaction.rs b/client/rpc-spec-v2/src/transaction/transaction.rs new file mode 100644 index 0000000000000..e2cf736dff17a --- /dev/null +++ b/client/rpc-spec-v2/src/transaction/transaction.rs @@ -0,0 +1,208 @@ +// This file is part of Substrate. + +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API implementation for submitting transactions. + +use crate::{ + transaction::{ + api::TransactionApiServer, + error::Error, + event::{ + TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError, + TransactionEvent, + }, + }, + SubscriptionTaskExecutor, +}; +use jsonrpsee::{ + core::async_trait, + types::{ + error::{CallError, ErrorObject}, + SubscriptionResult, + }, + SubscriptionSink, +}; +use sc_transaction_pool_api::{ + error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, + TransactionStatus, +}; +use std::sync::Arc; + +use sp_api::ProvideRuntimeApi; +use sp_blockchain::HeaderBackend; +use sp_core::Bytes; +use sp_runtime::{generic, traits::Block as BlockT}; + +use codec::Decode; +use futures::{FutureExt, StreamExt, TryFutureExt}; + +/// An API for transaction RPC calls. +pub struct Transaction { + /// Substrate client. + client: Arc, + /// Transactions pool. + pool: Arc, + /// Executor to spawn subscriptions. + executor: SubscriptionTaskExecutor, +} + +impl Transaction { + /// Creates a new [`Transaction`]. + pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { + Transaction { client, pool, executor } + } +} + +/// Currently we treat all RPC transactions as externals. +/// +/// Possibly in the future we could allow opt-in for special treatment +/// of such transactions, so that the block authors can inject +/// some unique transactions via RPC and have them included in the pool. +const TX_SOURCE: TransactionSource = TransactionSource::External; + +/// Extrinsic has an invalid format. +/// +/// # Note +/// +/// This is similar to the old `author` API error code. +const BAD_FORMAT: i32 = 1001; + +#[async_trait] +impl TransactionApiServer> for Transaction +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Hash: Unpin, + ::Hash: Unpin, + Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, +{ + fn submit_and_watch(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult { + // This is the only place where the RPC server can return an error for this + // subscription. Other defects must be signaled as events to the sink. + let decoded_extrinsic = match TransactionFor::::decode(&mut &xt[..]) { + Ok(decoded_extrinsic) => decoded_extrinsic, + Err(e) => { + let err = CallError::Custom(ErrorObject::owned( + BAD_FORMAT, + format!("Extrinsic has invalid format: {}", e), + None::<()>, + )); + let _ = sink.reject(err); + return Ok(()) + }, + }; + + let best_block_hash = self.client.info().best_hash; + + let submit = self + .pool + .submit_and_watch( + &generic::BlockId::hash(best_block_hash), + TX_SOURCE, + decoded_extrinsic, + ) + .map_err(|e| { + e.into_pool_error() + .map(Error::from) + .unwrap_or_else(|e| Error::Verification(Box::new(e))) + }); + + let fut = async move { + match submit.await { + Ok(stream) => { + let mut state = TransactionState::new(); + let stream = + stream.filter_map(|event| async move { state.handle_event(event) }); + sink.pipe_from_stream(stream.boxed()).await; + }, + Err(err) => { + // We have not created an `Watcher` for the tx. Make sure the + // error is still propagated as an event. + let event: TransactionEvent<::Hash> = err.into(); + sink.pipe_from_stream(futures::stream::once(async { event }).boxed()).await; + }, + }; + }; + + self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); + Ok(()) + } +} + +/// The transaction's state that needs to be preserved between +/// multiple events generated by the transaction-pool. +/// +/// # Note +/// +/// In the future, the RPC server can submit only the last event when multiple +/// identical events happen in a row. +#[derive(Clone, Copy)] +struct TransactionState { + /// True if the transaction was previously broadcasted. + broadcasted: bool, +} + +impl TransactionState { + /// Construct a new [`TransactionState`]. + pub fn new() -> Self { + TransactionState { broadcasted: false } + } + + /// Handle events generated by the transaction-pool and convert them + /// to the new API expected state. + #[inline] + pub fn handle_event( + &mut self, + event: TransactionStatus, + ) -> Option> { + match event { + TransactionStatus::Ready | TransactionStatus::Future => + Some(TransactionEvent::::Validated), + TransactionStatus::Broadcast(peers) => { + // Set the broadcasted flag once if we submitted the transaction to + // at least one peer. + self.broadcasted = self.broadcasted || !peers.is_empty(); + + Some(TransactionEvent::Broadcasted(TransactionBroadcasted { + num_peers: peers.len(), + })) + }, + TransactionStatus::InBlock((hash, index)) => + Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { + hash, + index, + }))), + TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)), + TransactionStatus::FinalityTimeout(_) => + Some(TransactionEvent::Dropped(TransactionDropped { + broadcasted: self.broadcasted, + error: "Maximum number of finality watchers has been reached".into(), + })), + TransactionStatus::Finalized((hash, index)) => + Some(TransactionEvent::Finalized(TransactionBlock { hash, index })), + TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError { + error: "Extrinsic was rendered invalid by another extrinsic".into(), + })), + TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError { + error: "Extrinsic dropped from the pool due to exceeding limits".into(), + })), + TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError { + error: "Extrinsic marked as invalid".into(), + })), + } + } +} diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 3c574ef13c8e6..2b528d4ca1e0a 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -68,6 +68,7 @@ sc-transaction-pool-api = { version = "4.0.0-dev", path = "../transaction-pool/a sp-transaction-storage-proof = { version = "4.0.0-dev", path = "../../primitives/transaction-storage-proof" } sc-rpc-server = { version = "4.0.0-dev", path = "../rpc-servers" } sc-rpc = { version = "4.0.0-dev", path = "../rpc" } +sc-rpc-spec-v2 = { version = "0.10.0-dev", path = "../rpc-spec-v2" } sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sp-block-builder = { version = "4.0.0-dev", path = "../../primitives/block-builder" } sc-informant = { version = "0.10.0-dev", path = "../informant" } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5a2f4cf978b41..0e16cb144dc5e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -56,6 +56,7 @@ use sc_rpc::{ system::SystemApiServer, DenyUnsafe, SubscriptionTaskExecutor, }; +use sc_rpc_spec_v2::transaction::TransactionApiServer; use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO}; use sc_transaction_pool_api::MaintainedTransactionPool; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; @@ -665,6 +666,13 @@ where (chain, state, child_state) }; + let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new( + client.clone(), + transaction_pool.clone(), + task_executor.clone(), + ) + .into_rpc(); + let author = sc_rpc::author::Author::new( client.clone(), transaction_pool, @@ -682,6 +690,10 @@ where rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?; } + // Part of the RPC v2 spec. + rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?; + + // Part of the old RPC spec. rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?; rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?; rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?; diff --git a/client/transaction-pool/api/Cargo.toml b/client/transaction-pool/api/Cargo.toml index d34ffe512b023..1ab0f32bc8bad 100644 --- a/client/transaction-pool/api/Cargo.toml +++ b/client/transaction-pool/api/Cargo.toml @@ -15,3 +15,6 @@ serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sp-runtime = { version = "6.0.0", default-features = false, path = "../../../primitives/runtime" } + +[dev-dependencies] +serde_json = "1.0" diff --git a/client/transaction-pool/api/src/lib.rs b/client/transaction-pool/api/src/lib.rs index 0ebb8f9d4cd9c..c0a94516ffc97 100644 --- a/client/transaction-pool/api/src/lib.rs +++ b/client/transaction-pool/api/src/lib.rs @@ -108,15 +108,18 @@ pub enum TransactionStatus { Ready, /// The transaction has been broadcast to the given peers. Broadcast(Vec), - /// Transaction has been included in block with given hash. - InBlock(BlockHash), + /// Transaction has been included in block with given hash + /// at the given position. + #[serde(with = "v1_compatible")] + InBlock((BlockHash, TxIndex)), /// The block this transaction was included in has been retracted. Retracted(BlockHash), /// Maximum number of finality watchers has been reached, /// old watchers are being removed. FinalityTimeout(BlockHash), - /// Transaction has been finalized by a finality-gadget, e.g GRANDPA - Finalized(BlockHash), + /// Transaction has been finalized by a finality-gadget, e.g GRANDPA. + #[serde(with = "v1_compatible")] + Finalized((BlockHash, TxIndex)), /// Transaction has been replaced in the pool, by another transaction /// that provides the same tags. (e.g. same (sender, nonce)). Usurped(Hash), @@ -143,6 +146,8 @@ pub type TransactionFor

= <

::Block as BlockT>::Extrinsi pub type TransactionStatusStreamFor

= TransactionStatusStream, BlockHash

>; /// Transaction type for a local pool. pub type LocalTransactionFor

= <

::Block as BlockT>::Extrinsic; +/// Transaction's index within the block in which it was included. +pub type TxIndex = usize; /// Typical future type used in transaction pool api. pub type PoolFuture = std::pin::Pin> + Send>>; @@ -362,3 +367,52 @@ impl OffchainSubmitTransaction for TP }) } } + +/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec. +mod v1_compatible { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(data: &(H, usize), serializer: S) -> Result + where + S: Serializer, + H: Serialize, + { + let (hash, _) = data; + serde::Serialize::serialize(&hash, serializer) + } + + pub fn deserialize<'de, D, H>(deserializer: D) -> Result<(H, usize), D::Error> + where + D: Deserializer<'de>, + H: Deserialize<'de>, + { + let hash: H = serde::Deserialize::deserialize(deserializer)?; + Ok((hash, 0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tx_status_compatibility() { + let event: TransactionStatus = TransactionStatus::InBlock((1, 2)); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"inBlock":1}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionStatus = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, TransactionStatus::InBlock((1, 0))); + + let event: TransactionStatus = TransactionStatus::Finalized((1, 2)); + let ser = serde_json::to_string(&event).unwrap(); + + let exp = r#"{"finalized":1}"#; + assert_eq!(ser, exp); + + let event_dec: TransactionStatus = serde_json::from_str(exp).unwrap(); + assert_eq!(event_dec, TransactionStatus::Finalized((1, 0))); + } +} diff --git a/client/transaction-pool/src/graph/listener.rs b/client/transaction-pool/src/graph/listener.rs index d4f42b32fdbb8..776749abf2d5d 100644 --- a/client/transaction-pool/src/graph/listener.rs +++ b/client/transaction-pool/src/graph/listener.rs @@ -104,13 +104,18 @@ impl Listener { /// Transaction was pruned from the pool. pub fn pruned(&mut self, block_hash: BlockHash, tx: &H) { debug!(target: "txpool", "[{:?}] Pruned at {:?}", tx, block_hash); - self.fire(tx, |s| s.in_block(block_hash)); - self.finality_watchers.entry(block_hash).or_insert(vec![]).push(tx.clone()); + // Get the transactions included in the given block hash. + let txs = self.finality_watchers.entry(block_hash).or_insert(vec![]); + txs.push(tx.clone()); + // Current transaction is the last one included. + let tx_index = txs.len() - 1; + + self.fire(tx, |watcher| watcher.in_block(block_hash, tx_index)); while self.finality_watchers.len() > MAX_FINALITY_WATCHERS { if let Some((hash, txs)) = self.finality_watchers.pop_front() { for tx in txs { - self.fire(&tx, |s| s.finality_timeout(hash)); + self.fire(&tx, |watcher| watcher.finality_timeout(hash)); } } } @@ -120,7 +125,7 @@ impl Listener { pub fn retracted(&mut self, block_hash: BlockHash) { if let Some(hashes) = self.finality_watchers.remove(&block_hash) { for hash in hashes { - self.fire(&hash, |s| s.retracted(block_hash)) + self.fire(&hash, |watcher| watcher.retracted(block_hash)) } } } @@ -128,9 +133,9 @@ impl Listener { /// Notify all watchers that transactions have been finalized pub fn finalized(&mut self, block_hash: BlockHash) { if let Some(hashes) = self.finality_watchers.remove(&block_hash) { - for hash in hashes { + for (tx_index, hash) in hashes.into_iter().enumerate() { log::debug!(target: "txpool", "[{:?}] Sent finalization event (block {:?})", hash, block_hash); - self.fire(&hash, |s| s.finalized(block_hash)) + self.fire(&hash, |watcher| watcher.finalized(block_hash, tx_index)) } } } diff --git a/client/transaction-pool/src/graph/pool.rs b/client/transaction-pool/src/graph/pool.rs index 19acbddbe7843..108ae791e37b3 100644 --- a/client/transaction-pool/src/graph/pool.rs +++ b/client/transaction-pool/src/graph/pool.rs @@ -770,7 +770,7 @@ mod tests { assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!( stream.next(), - Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())), + Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))), ); } @@ -803,7 +803,7 @@ mod tests { assert_eq!(stream.next(), Some(TransactionStatus::Ready)); assert_eq!( stream.next(), - Some(TransactionStatus::InBlock(H256::from_low_u64_be(2).into())), + Some(TransactionStatus::InBlock((H256::from_low_u64_be(2).into(), 0))), ); } diff --git a/client/transaction-pool/src/graph/watcher.rs b/client/transaction-pool/src/graph/watcher.rs index 8cd78cfc78240..0613300c8684b 100644 --- a/client/transaction-pool/src/graph/watcher.rs +++ b/client/transaction-pool/src/graph/watcher.rs @@ -84,13 +84,13 @@ impl Sender { } /// Extrinsic has been included in block with given hash. - pub fn in_block(&mut self, hash: BH) { - self.send(TransactionStatus::InBlock(hash)); + pub fn in_block(&mut self, hash: BH, index: usize) { + self.send(TransactionStatus::InBlock((hash, index))); } /// Extrinsic has been finalized by a finality gadget. - pub fn finalized(&mut self, hash: BH) { - self.send(TransactionStatus::Finalized(hash)); + pub fn finalized(&mut self, hash: BH, index: usize) { + self.send(TransactionStatus::Finalized((hash, index))); self.is_finalized = true; } diff --git a/client/transaction-pool/tests/pool.rs b/client/transaction-pool/tests/pool.rs index f04a27cf81e1d..be75523c1230f 100644 --- a/client/transaction-pool/tests/pool.rs +++ b/client/transaction-pool/tests/pool.rs @@ -328,7 +328,7 @@ fn should_revalidate_across_many_blocks() { block_on( watcher1 - .take_while(|s| future::ready(*s != TransactionStatus::InBlock(block_hash))) + .take_while(|s| future::ready(*s != TransactionStatus::InBlock((block_hash, 0)))) .collect::>(), ); @@ -398,24 +398,24 @@ fn should_push_watchers_during_maintenance() { futures::executor::block_on_stream(watcher0).collect::>(), vec![ TransactionStatus::Ready, - TransactionStatus::InBlock(header_hash), - TransactionStatus::Finalized(header_hash) + TransactionStatus::InBlock((header_hash, 0)), + TransactionStatus::Finalized((header_hash, 0)) ], ); assert_eq!( futures::executor::block_on_stream(watcher1).collect::>(), vec![ TransactionStatus::Ready, - TransactionStatus::InBlock(header_hash), - TransactionStatus::Finalized(header_hash) + TransactionStatus::InBlock((header_hash, 1)), + TransactionStatus::Finalized((header_hash, 1)) ], ); assert_eq!( futures::executor::block_on_stream(watcher2).collect::>(), vec![ TransactionStatus::Ready, - TransactionStatus::InBlock(header_hash), - TransactionStatus::Finalized(header_hash) + TransactionStatus::InBlock((header_hash, 2)), + TransactionStatus::Finalized((header_hash, 2)) ], ); } @@ -450,8 +450,8 @@ fn finalization() { let mut stream = futures::executor::block_on_stream(watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(header.hash()))); - assert_eq!(stream.next(), Some(TransactionStatus::Finalized(header.hash()))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((header.hash(), 0)))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized((header.hash(), 0)))); assert_eq!(stream.next(), None); } @@ -573,30 +573,31 @@ fn fork_aware_finalization() { for (canon_watcher, h) in canon_watchers { let mut stream = futures::executor::block_on_stream(canon_watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(h))); - assert_eq!(stream.next(), Some(TransactionStatus::Finalized(h))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((h, 0)))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized((h, 0)))); assert_eq!(stream.next(), None); } { let mut stream = futures::executor::block_on_stream(from_dave_watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(c2))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((c2, 0)))); assert_eq!(stream.next(), Some(TransactionStatus::Retracted(c2))); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(e1))); - assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((e1, 0)))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized((e1, 0)))); assert_eq!(stream.next(), None); } { let mut stream = futures::executor::block_on_stream(from_bob_watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(d2))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((d2, 0)))); assert_eq!(stream.next(), Some(TransactionStatus::Retracted(d2))); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(e1))); - assert_eq!(stream.next(), Some(TransactionStatus::Finalized(e1))); + // In block e1 we submitted: [dave, bob] xts in this order. + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((e1, 1)))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized((e1, 1)))); assert_eq!(stream.next(), None); } } @@ -646,10 +647,10 @@ fn prune_and_retract_tx_at_same_time() { { let mut stream = futures::executor::block_on_stream(watcher); assert_eq!(stream.next(), Some(TransactionStatus::Ready)); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(b1))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((b1, 0)))); assert_eq!(stream.next(), Some(TransactionStatus::Retracted(b1))); - assert_eq!(stream.next(), Some(TransactionStatus::InBlock(b2))); - assert_eq!(stream.next(), Some(TransactionStatus::Finalized(b2))); + assert_eq!(stream.next(), Some(TransactionStatus::InBlock((b2, 0)))); + assert_eq!(stream.next(), Some(TransactionStatus::Finalized((b2, 0)))); assert_eq!(stream.next(), None); } }