diff --git a/substrate/client/rpc-spec-v2/Cargo.toml b/substrate/client/rpc-spec-v2/Cargo.toml index 12a02e0b45083..1b7870764dc36 100644 --- a/substrate/client/rpc-spec-v2/Cargo.toml +++ b/substrate/client/rpc-spec-v2/Cargo.toml @@ -41,12 +41,14 @@ tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = { workspace = true, default-features = true } futures-util = { version = "0.3.30", default-features = false } +rand = "0.8.5" [dev-dependencies] serde_json = "1.0.111" tokio = { version = "1.22.0", features = ["macros"] } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } substrate-test-runtime = { path = "../../test-utils/runtime" } +substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" } sp-consensus = { path = "../../primitives/consensus/common" } sp-externalities = { path = "../../primitives/externalities" } sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" } @@ -54,3 +56,4 @@ sc-block-builder = { path = "../block-builder" } sc-service = { path = "../service", features = ["test-helpers"] } assert_matches = "1.3.0" pretty_assertions = "1.2.1" +sc-transaction-pool = { path = "../transaction-pool" } diff --git a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs index 4cbbd00f64f31..c9fe19aca2b18 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/mod.rs @@ -23,7 +23,7 @@ //! Methods are prefixed by `chainHead`. #[cfg(test)] -mod test_utils; +pub mod test_utils; #[cfg(test)] mod tests; diff --git a/substrate/client/rpc-spec-v2/src/transaction/api.rs b/substrate/client/rpc-spec-v2/src/transaction/api.rs index 53c83b662a35f..33af9c9533388 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/api.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/api.rs @@ -18,8 +18,8 @@ //! API trait for transactions. -use crate::transaction::event::TransactionEvent; -use jsonrpsee::proc_macros::rpc; +use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent}; +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use sp_core::Bytes; #[rpc(client, server)] @@ -28,6 +28,10 @@ pub trait TransactionApi { /// /// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on /// transaction life cycle. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. #[subscription( name = "transactionWatch_unstable_submitAndWatch" => "transactionWatch_unstable_watchEvent", unsubscribe = "transactionWatch_unstable_unwatch", @@ -35,3 +39,22 @@ pub trait TransactionApi { )] fn submit_and_watch(&self, bytes: Bytes); } + +#[rpc(client, server)] +pub trait TransactionBroadcastApi { + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "transaction_unstable_broadcast")] + fn broadcast(&self, bytes: Bytes) -> RpcResult>; + + /// Broadcast an extrinsic to the chain. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "transaction_unstable_stop")] + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>; +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/error.rs b/substrate/client/rpc-spec-v2/src/transaction/error.rs index d2de07afd5955..116977af66001 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/error.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/error.rs @@ -21,6 +21,7 @@ //! Errors are interpreted as transaction events for subscriptions. use crate::transaction::event::{TransactionError, TransactionEvent}; +use jsonrpsee::types::error::ErrorObject; use sc_transaction_pool_api::error::Error as PoolError; use sp_runtime::transaction_validity::InvalidTransaction; @@ -98,3 +99,29 @@ impl From for TransactionEvent { } } } + +/// TransactionBroadcast error. +#[derive(Debug, thiserror::Error)] +pub enum ErrorBroadcast { + /// The provided operation ID is invalid. + #[error("Invalid operation id")] + InvalidOperationID, +} + +/// General purpose errors, as defined in +/// . +pub mod json_rpc_spec { + /// Invalid parameter error. + pub const INVALID_PARAM_ERROR: i32 = -32602; +} + +impl From for ErrorObject<'static> { + fn from(e: ErrorBroadcast) -> Self { + let msg = e.to_string(); + + match e { + ErrorBroadcast::InvalidOperationID => + ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>), + } + } +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/mod.rs b/substrate/client/rpc-spec-v2/src/transaction/mod.rs index 212912ba1c728..74268a5372a37 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/mod.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/mod.rs @@ -25,14 +25,19 @@ //! //! Methods are prefixed by `transaction`. +#[cfg(test)] +mod tests; + pub mod api; pub mod error; pub mod event; pub mod transaction; +pub mod transaction_broadcast; -pub use api::TransactionApiServer; +pub use api::{TransactionApiServer, TransactionBroadcastApiServer}; pub use event::{ TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError, TransactionEvent, }; pub use transaction::Transaction; +pub use transaction_broadcast::TransactionBroadcast; diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests.rs b/substrate/client/rpc-spec-v2/src/transaction/tests.rs new file mode 100644 index 0000000000000..45477494768ae --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/tests.rs @@ -0,0 +1,238 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::*; +use crate::{ + chain_head::test_utils::ChainHeadMockClient, hex_string, + transaction::TransactionBroadcast as RpcTransactionBroadcast, +}; +use assert_matches::assert_matches; +use codec::Encode; +use futures::Future; +use jsonrpsee::{core::error::Error, rpc_params, RpcModule}; +use sc_transaction_pool::*; +use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool}; +use sp_core::{testing::TaskExecutor, traits::SpawnNamed}; +use std::{pin::Pin, sync::Arc, time::Duration}; +use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client}; +use substrate_test_runtime_transaction_pool::{uxt, TestApi}; +use tokio::sync::mpsc; + +type Block = substrate_test_runtime_client::runtime::Block; + +/// Wrap the `TaskExecutor` to know when the broadcast future is dropped. +#[derive(Clone)] +struct TaskExecutorBroadcast { + executor: TaskExecutor, + sender: mpsc::UnboundedSender<()>, +} + +/// The channel that receives events when the broadcast futures are dropped. +type TaskExecutorRecv = mpsc::UnboundedReceiver<()>; + +impl TaskExecutorBroadcast { + /// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures + /// are dropped. + fn new() -> (Self, TaskExecutorRecv) { + let (sender, recv) = mpsc::unbounded_channel(); + + (Self { executor: TaskExecutor::new(), sender }, recv) + } +} + +impl SpawnNamed for TaskExecutorBroadcast { + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn(name, group, future) + } + + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + let sender = self.sender.clone(); + let future = Box::pin(async move { + future.await; + let _ = sender.send(()); + }); + + self.executor.spawn_blocking(name, group, future) + } +} + +/// Initial Alice account nonce. +const ALICE_NONCE: u64 = 209; + +fn create_basic_pool_with_genesis( + test_api: Arc, +) -> (BasicPool, Pin + Send>>) { + let genesis_hash = { + test_api + .chain() + .read() + .block_by_number + .get(&0) + .map(|blocks| blocks[0].0.header.hash()) + .expect("there is block 0. qed") + }; + BasicPool::new_test(test_api, genesis_hash, genesis_hash) +} + +fn maintained_pool() -> (BasicPool, Arc, futures::executor::ThreadPool) { + let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE)); + let (pool, background_task) = create_basic_pool_with_genesis(api.clone()); + + let thread_pool = futures::executor::ThreadPool::new().unwrap(); + thread_pool.spawn_ok(background_task); + (pool, api, thread_pool) +} + +fn setup_api() -> ( + Arc, + Arc>, + Arc>>, + RpcModule< + TransactionBroadcast, ChainHeadMockClient>>, + >, + TaskExecutorRecv, +) { + let (pool, api, _) = maintained_pool(); + let pool = Arc::new(pool); + + let builder = TestClientBuilder::new(); + let client = Arc::new(builder.build()); + let client_mock = Arc::new(ChainHeadMockClient::new(client.clone())); + + let (task_executor, executor_recv) = TaskExecutorBroadcast::new(); + + let tx_api = + RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor)) + .into_rpc(); + + (api, pool, client_mock, tx_api, executor_recv) +} + +#[tokio::test] +async fn tx_broadcast_enters_pool() { + let (api, pool, client_mock, tx_api, _) = setup_api(); + + // Start at block 1. + let block_1_header = api.push_block(1, vec![], true); + + let uxt = uxt(Alice, ALICE_NONCE); + let xt = hex_string(&uxt.encode()); + + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + // Announce block 1 to `transaction_unstable_broadcast`. + client_mock.trigger_import_stream(block_1_header).await; + + // Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool. + + // TODO: Improve testability by extending the `transaction_unstable_broadcast` with + // a middleware trait that intercepts the transaction status for testing. + let mut num_retries = 12; + while num_retries > 0 && pool.status().ready != 1 { + tokio::time::sleep(Duration::from_secs(5)).await; + num_retries -= 1; + } + assert_eq!(1, pool.status().ready); + assert_eq!(uxt.encode().len(), pool.status().ready_bytes); + + // Import block 2 with the transaction included. + let block_2_header = api.push_block(2, vec![uxt.clone()], true); + let block_2 = block_2_header.hash(); + + // Announce block 2 to the pool. + let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None }; + pool.maintain(event).await; + + assert_eq!(0, pool.status().ready); + + // Stop call can still be made. + let _: () = tx_api + .call("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap(); +} + +#[tokio::test] +async fn tx_broadcast_invalid_tx() { + let (_, pool, _, tx_api, mut exec_recv) = setup_api(); + + // Invalid parameters. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params" + ); + + assert_eq!(0, pool.status().ready); + + // Invalid transaction that cannot be decoded. The broadcast silently exits. + let xt = "0xdeadbeef"; + let operation_id: String = + tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap(); + + assert_eq!(0, pool.status().ready); + + // Await the broadcast future to exit. + // Without this we'd be subject to races, where we try to call the stop before the tx is + // dropped. + exec_recv.recv().await.unwrap(); + + // The broadcast future was dropped, and the operation is no longer active. + // When the operation is not active, either from the tx being finalized or a + // terminal error; the stop method should return an error. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); +} + +#[tokio::test] +async fn tx_invalid_stop() { + let (_, _, _, tx_api, _) = setup_api(); + + // Make an invalid stop call. + let err = tx_api + .call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"]) + .await + .unwrap_err(); + assert_matches!(err, + Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id" + ); +} diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs index b2cfa36c9c99f..17889b3bad2a5 100644 --- a/substrate/client/rpc-spec-v2/src/transaction/transaction.rs +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction.rs @@ -29,21 +29,18 @@ use crate::{ }, SubscriptionTaskExecutor, }; +use codec::Decode; +use futures::{StreamExt, TryFutureExt}; use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink}; +use sc_rpc::utils::pipe_from_stream; use sc_transaction_pool_api::{ error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource, TransactionStatus, }; -use std::sync::Arc; - -use sc_rpc::utils::pipe_from_stream; -use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_core::Bytes; use sp_runtime::traits::Block as BlockT; - -use codec::Decode; -use futures::{StreamExt, TryFutureExt}; +use std::sync::Arc; /// An API for transaction RPC calls. pub struct Transaction { @@ -82,7 +79,7 @@ where Pool: TransactionPool + Sync + Send + 'static, Pool::Hash: Unpin, ::Hash: Unpin, - Client: HeaderBackend + ProvideRuntimeApi + Send + Sync + 'static, + Client: HeaderBackend + Send + Sync + 'static, { fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) { let client = self.client.clone(); diff --git a/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs new file mode 100644 index 0000000000000..92c838261874a --- /dev/null +++ b/substrate/client/rpc-spec-v2/src/transaction/transaction_broadcast.rs @@ -0,0 +1,251 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! API implementation for broadcasting transactions. + +use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor}; +use codec::Decode; +use futures::{FutureExt, Stream, StreamExt}; +use futures_util::stream::AbortHandle; +use jsonrpsee::core::{async_trait, RpcResult}; +use parking_lot::RwLock; +use rand::{distributions::Alphanumeric, Rng}; +use sc_client_api::BlockchainEvents; +use sc_transaction_pool_api::{ + error::IntoPoolError, TransactionFor, TransactionPool, TransactionSource, +}; +use sp_blockchain::HeaderBackend; +use sp_core::Bytes; +use sp_runtime::traits::Block as BlockT; +use std::{collections::HashMap, sync::Arc}; + +use super::error::ErrorBroadcast; + +/// An API for transaction RPC calls. +pub struct TransactionBroadcast { + /// Substrate client. + client: Arc, + /// Transactions pool. + pool: Arc, + /// Executor to spawn subscriptions. + executor: SubscriptionTaskExecutor, + /// The brodcast operation IDs. + broadcast_ids: Arc>>, +} + +/// The state of a broadcast operation. +struct BroadcastState { + /// Handle to abort the running future that broadcasts the transaction. + handle: AbortHandle, +} + +impl TransactionBroadcast { + /// Creates a new [`TransactionBroadcast`]. + pub fn new(client: Arc, pool: Arc, executor: SubscriptionTaskExecutor) -> Self { + TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } + } + + /// Generate an unique operation ID for the `transaction_broadcast` RPC method. + pub fn generate_unique_id(&self) -> String { + let generate_operation_id = || { + // The length of the operation ID. + const OPERATION_ID_LEN: usize = 16; + + rand::thread_rng() + .sample_iter(Alphanumeric) + .take(OPERATION_ID_LEN) + .map(char::from) + .collect::() + }; + + let mut id = generate_operation_id(); + + let broadcast_ids = self.broadcast_ids.read(); + + while broadcast_ids.contains_key(&id) { + id = generate_operation_id(); + } + + id + } +} + +/// Currently we treat all RPC transactions as externals. +/// +/// Possibly in the future we could allow opt-in for special treatment +/// of such transactions, so that the block authors can inject +/// some unique transactions via RPC and have them included in the pool. +const TX_SOURCE: TransactionSource = TransactionSource::External; + +#[async_trait] +impl TransactionBroadcastApiServer for TransactionBroadcast +where + Pool: TransactionPool + Sync + Send + 'static, + Pool::Error: IntoPoolError, + ::Hash: Unpin, + Client: HeaderBackend + BlockchainEvents + Send + Sync + 'static, +{ + fn broadcast(&self, bytes: Bytes) -> RpcResult> { + let pool = self.pool.clone(); + + // The unique ID of this operation. + let id = self.generate_unique_id(); + + let mut best_block_import_stream = + Box::pin(self.client.import_notification_stream().filter_map( + |notification| async move { notification.is_new_best.then_some(notification.hash) }, + )); + + let broadcast_transaction_fut = async move { + // There is nothing we could do with an extrinsic of invalid format. + let Ok(decoded_extrinsic) = TransactionFor::::decode(&mut &bytes[..]) else { + return; + }; + + // Flag to determine if the we should broadcast the transaction again. + let mut is_done = false; + + while !is_done { + // Wait for the last block to become available. + let Some(best_block_hash) = + last_stream_element(&mut best_block_import_stream).await + else { + return; + }; + + let mut stream = match pool + .submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic.clone()) + .await + { + Ok(stream) => stream, + // The transaction was not included to the pool. + Err(e) => { + let Ok(pool_err) = e.into_pool_error() else { return }; + + if pool_err.is_retriable() { + // Try to resubmit the transaction at a later block for + // recoverable errors. + continue + } else { + return; + } + }, + }; + + while let Some(event) = stream.next().await { + // Check if the transaction could be submitted again + // at a later time. + if event.is_retriable() { + break; + } + + // Stop if this is the final event of the transaction stream + // and the event is not retriable. + if event.is_final() { + is_done = true; + break; + } + } + } + }; + + // Convert the future into an abortable future, for easily terminating it from the + // `transaction_stop` method. + let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); + let broadcast_ids = self.broadcast_ids.clone(); + let drop_id = id.clone(); + // The future expected by the executor must be `Future` instead of + // `Future>`. + let fut = fut.map(move |_| { + // Remove the entry from the broadcast IDs map. + broadcast_ids.write().remove(&drop_id); + }); + + // Keep track of this entry and the abortable handle. + { + let mut broadcast_ids = self.broadcast_ids.write(); + broadcast_ids.insert(id.clone(), BroadcastState { handle }); + } + + sc_rpc::utils::spawn_subscription_task(&self.executor, fut); + + Ok(Some(id)) + } + + fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> { + let mut broadcast_ids = self.broadcast_ids.write(); + + let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else { + return Err(ErrorBroadcast::InvalidOperationID) + }; + + broadcast_state.handle.abort(); + + Ok(()) + } +} + +/// Returns the last element of the providided stream, or `None` if the stream is closed. +async fn last_stream_element(stream: &mut S) -> Option +where + S: Stream + Unpin, +{ + let Some(mut element) = stream.next().await else { return None }; + + // We are effectively polling the stream for the last available item at this time. + // The `now_or_never` returns `None` if the stream is `Pending`. + // + // If the stream contains `Hash0x1 Hash0x2 Hash0x3 Hash0x4`, we want only `Hash0x4`. + while let Some(next) = stream.next().now_or_never() { + let Some(next) = next else { + // Nothing to do if the stream terminated. + return None + }; + element = next; + } + + Some(element) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio_stream::wrappers::ReceiverStream; + + #[tokio::test] + async fn check_last_stream_element() { + let (tx, rx) = tokio::sync::mpsc::channel(16); + + let mut stream = ReceiverStream::new(rx); + // Check the stream with one element queued. + tx.send(1).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(1)); + + // Check the stream with multiple elements. + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + assert_eq!(last_stream_element(&mut stream).await, Some(3)); + + // Drop the stream with some elements + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + drop(tx); + assert_eq!(last_stream_element(&mut stream).await, None); + } +}