Skip to content

Commit

Permalink
rpc-v2/tx: Implement transaction_unstable_broadcast and `transactio…
Browse files Browse the repository at this point in the history
…n_unstable_stop` (paritytech#3079)

This PR implements the
[transaction_unstable_broadcast](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_broadcast.md)
and
[transaction_unstable_stop](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_stop.md).


The
[transaction_unstable_broadcast](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_broadcast.md)
submits the provided transaction at the best block of the chain.
If the transaction is dropped or declared invalid, the API tries to
resubmit the transaction at the next available best block.

### Broadcasting 
The broadcasting operation continues until either:

- the user called `transaction_unstable_stop` with the operation ID that
identifies the broadcasting operation
- the transaction state is one of the following: 
  - Finalized: the transaction is part of the chain
- FinalizedTimeout: we have waited for 256 finalized blocks and timedout
  - Usurped the transaction has been replaced in the tx pool
  
The broadcasting retires to submit the transaction when the transaction
state is:
- Invalid: the transaction might become valid at a later time
- Dropped: the transaction pool's capacity is full at the moment, but
might clear when other transactions are finalized/dropped

### Stopping

The `transaction_unstable_broadcast` spawns an abortable future and
tracks the abort handler.
When the
[transaction_unstable_stop](https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/transaction_unstable_stop.md)
is called with a valid operation ID; the abort handler of the
corresponding `transaction_unstable_broadcast` future is called. This
behavior ensures the broadcast future is finishes on the next polling.
When the `transaction_unstable_stop` is called with an invalid operation
ID, an invalid jsonrpc specific error object is returned.


### Testing

This PR adds the testing harness of the transaction API and validates
two basic scenarios:
- transaction enters and exits the transaction pool
- transaction stop returns appropriate values when called with valid and
invalid operation IDs


Closes: paritytech#3039

Note that the API should be enabled after:
paritytech#3084.

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Sebastian Kunert <[email protected]>
  • Loading branch information
lexnv and skunert committed Feb 12, 2024
1 parent aa7d53a commit 803f09e
Show file tree
Hide file tree
Showing 8 changed files with 556 additions and 12 deletions.
3 changes: 3 additions & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = { workspace = true, default-features = true }
futures-util = { version = "0.3.30", default-features = false }
rand = "0.8.5"

[dev-dependencies]
serde_json = "1.0.111"
tokio = { version = "1.22.0", features = ["macros"] }
substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
substrate-test-runtime = { path = "../../test-utils/runtime" }
substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" }
sp-consensus = { path = "../../primitives/consensus/common" }
sp-externalities = { path = "../../primitives/externalities" }
sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { path = "../block-builder" }
sc-service = { path = "../service", features = ["test-helpers"] }
assert_matches = "1.3.0"
pretty_assertions = "1.2.1"
sc-transaction-pool = { path = "../transaction-pool" }
2 changes: 1 addition & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! Methods are prefixed by `chainHead`.

#[cfg(test)]
mod test_utils;
pub mod test_utils;
#[cfg(test)]
mod tests;

Expand Down
27 changes: 25 additions & 2 deletions substrate/client/rpc-spec-v2/src/transaction/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

//! API trait for transactions.

use crate::transaction::event::TransactionEvent;
use jsonrpsee::proc_macros::rpc;
use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sp_core::Bytes;

#[rpc(client, server)]
Expand All @@ -28,10 +28,33 @@ pub trait TransactionApi<Hash: Clone> {
///
/// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on
/// transaction life cycle.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "transactionWatch_unstable_submitAndWatch" => "transactionWatch_unstable_watchEvent",
unsubscribe = "transactionWatch_unstable_unwatch",
item = TransactionEvent<Hash>,
)]
fn submit_and_watch(&self, bytes: Bytes);
}

#[rpc(client, server)]
pub trait TransactionBroadcastApi {
/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast")]
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;

/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop")]
fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
}
27 changes: 27 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! Errors are interpreted as transaction events for subscriptions.

use crate::transaction::event::{TransactionError, TransactionEvent};
use jsonrpsee::types::error::ErrorObject;
use sc_transaction_pool_api::error::Error as PoolError;
use sp_runtime::transaction_validity::InvalidTransaction;

Expand Down Expand Up @@ -98,3 +99,29 @@ impl<Hash> From<Error> for TransactionEvent<Hash> {
}
}
}

/// TransactionBroadcast error.
#[derive(Debug, thiserror::Error)]
pub enum ErrorBroadcast {
/// The provided operation ID is invalid.
#[error("Invalid operation id")]
InvalidOperationID,
}

/// General purpose errors, as defined in
/// <https://www.jsonrpc.org/specification#error_object>.
pub mod json_rpc_spec {
/// Invalid parameter error.
pub const INVALID_PARAM_ERROR: i32 = -32602;
}

impl From<ErrorBroadcast> for ErrorObject<'static> {
fn from(e: ErrorBroadcast) -> Self {
let msg = e.to_string();

match e {
ErrorBroadcast::InvalidOperationID =>
ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>),
}
}
}
7 changes: 6 additions & 1 deletion substrate/client/rpc-spec-v2/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
//!
//! Methods are prefixed by `transaction`.

#[cfg(test)]
mod tests;

pub mod api;
pub mod error;
pub mod event;
pub mod transaction;
pub mod transaction_broadcast;

pub use api::TransactionApiServer;
pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
pub use event::{
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
TransactionEvent,
};
pub use transaction::Transaction;
pub use transaction_broadcast::TransactionBroadcast;
238 changes: 238 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::*;
use crate::{
chain_head::test_utils::ChainHeadMockClient, hex_string,
transaction::TransactionBroadcast as RpcTransactionBroadcast,
};
use assert_matches::assert_matches;
use codec::Encode;
use futures::Future;
use jsonrpsee::{core::error::Error, rpc_params, RpcModule};
use sc_transaction_pool::*;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::{pin::Pin, sync::Arc, time::Duration};
use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client};
use substrate_test_runtime_transaction_pool::{uxt, TestApi};
use tokio::sync::mpsc;

type Block = substrate_test_runtime_client::runtime::Block;

/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
#[derive(Clone)]
struct TaskExecutorBroadcast {
executor: TaskExecutor,
sender: mpsc::UnboundedSender<()>,
}

/// The channel that receives events when the broadcast futures are dropped.
type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;

impl TaskExecutorBroadcast {
/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
/// are dropped.
fn new() -> (Self, TaskExecutorRecv) {
let (sender, recv) = mpsc::unbounded_channel();

(Self { executor: TaskExecutor::new(), sender }, recv)
}
}

impl SpawnNamed for TaskExecutorBroadcast {
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let future = Box::pin(async move {
future.await;
let _ = sender.send(());
});

self.executor.spawn(name, group, future)
}

fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let future = Box::pin(async move {
future.await;
let _ = sender.send(());
});

self.executor.spawn_blocking(name, group, future)
}
}

/// Initial Alice account nonce.
const ALICE_NONCE: u64 = 209;

fn create_basic_pool_with_genesis(
test_api: Arc<TestApi>,
) -> (BasicPool<TestApi, Block>, Pin<Box<dyn Future<Output = ()> + Send>>) {
let genesis_hash = {
test_api
.chain()
.read()
.block_by_number
.get(&0)
.map(|blocks| blocks[0].0.header.hash())
.expect("there is block 0. qed")
};
BasicPool::new_test(test_api, genesis_hash, genesis_hash)
}

fn maintained_pool() -> (BasicPool<TestApi, Block>, Arc<TestApi>, futures::executor::ThreadPool) {
let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE));
let (pool, background_task) = create_basic_pool_with_genesis(api.clone());

let thread_pool = futures::executor::ThreadPool::new().unwrap();
thread_pool.spawn_ok(background_task);
(pool, api, thread_pool)
}

fn setup_api() -> (
Arc<TestApi>,
Arc<BasicPool<TestApi, Block>>,
Arc<ChainHeadMockClient<Client<Backend>>>,
RpcModule<
TransactionBroadcast<BasicPool<TestApi, Block>, ChainHeadMockClient<Client<Backend>>>,
>,
TaskExecutorRecv,
) {
let (pool, api, _) = maintained_pool();
let pool = Arc::new(pool);

let builder = TestClientBuilder::new();
let client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));

let (task_executor, executor_recv) = TaskExecutorBroadcast::new();

let tx_api =
RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
.into_rpc();

(api, pool, client_mock, tx_api, executor_recv)
}

#[tokio::test]
async fn tx_broadcast_enters_pool() {
let (api, pool, client_mock, tx_api, _) = setup_api();

// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.

// TODO: Improve testability by extending the `transaction_unstable_broadcast` with
// a middleware trait that intercepts the transaction status for testing.
let mut num_retries = 12;
while num_retries > 0 && pool.status().ready != 1 {
tokio::time::sleep(Duration::from_secs(5)).await;
num_retries -= 1;
}
assert_eq!(1, pool.status().ready);
assert_eq!(uxt.encode().len(), pool.status().ready_bytes);

// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();

// Announce block 2 to the pool.
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.maintain(event).await;

assert_eq!(0, pool.status().ready);

// Stop call can still be made.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
}

#[tokio::test]
async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api, mut exec_recv) = setup_api();

// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
);

assert_eq!(0, pool.status().ready);

// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();

assert_eq!(0, pool.status().ready);

// Await the broadcast future to exit.
// Without this we'd be subject to races, where we try to call the stop before the tx is
// dropped.
exec_recv.recv().await.unwrap();

// The broadcast future was dropped, and the operation is no longer active.
// When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}

#[tokio::test]
async fn tx_invalid_stop() {
let (_, _, _, tx_api, _) = setup_api();

// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
Loading

0 comments on commit 803f09e

Please sign in to comment.