Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc-v2/tx: Implement transaction_unstable_broadcast and transaction_unstable_stop #3079

Merged
merged 40 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1e52e10
rpc-v2/tx: Add broadcast method
lexnv Jan 24, 2024
e795285
rpc-v2/tx: Submit the transaction to pool until finalized or usurped
lexnv Jan 24, 2024
c1e9396
rpc-v2/tx: Generate random operation ID
lexnv Jan 24, 2024
1a8ba25
rpc-v2/tx: Add api for `transaction_unstable_stop`
lexnv Jan 24, 2024
3df160b
rpc-v2/tx: Make the brodcasting future abortable
lexnv Jan 24, 2024
badd863
rpc-v2/tx: Stop the brodcasting by aborting the future
lexnv Jan 24, 2024
7f788aa
rpc-v2/tx: Use best block stream for broadcasting the tx
lexnv Jan 25, 2024
eef2751
rpc-v2/tx: Implement `Stream::last` for best block imports
lexnv Jan 25, 2024
96a80b0
rpc-v2/tx: Simplify tx trait bounds
lexnv Jan 25, 2024
4e72ce7
rpc-v2/chainHead: Make test util crate public
lexnv Jan 25, 2024
70c7d08
tx/tests: Check brodcasted transaction enters pool
lexnv Jan 25, 2024
0cd5a80
rpc-v2/tx: Split transactionWatch and transactionBroadcast
lexnv Jan 25, 2024
69ea4d2
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx
lexnv Jan 25, 2024
bacbf8f
tx/tests: Rename tx rpc server to reflect refactoring changes
lexnv Jan 25, 2024
da4f247
tx/tests: Improve future testability by extracting a setup helper
lexnv Jan 25, 2024
442a20f
rpc-v2/tx: Add error for broadcast_stop
lexnv Jan 25, 2024
10cced3
tx/tests: Check broadcast stop
lexnv Jan 25, 2024
9f70d44
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx
lexnv Jan 26, 2024
909e72f
rpc-v2/tx: Comment typo
lexnv Jan 26, 2024
ea93a25
prdoc: Add pr doc
lexnv Jan 26, 2024
9fdc472
tx/tests: Add license
lexnv Jan 26, 2024
f8ea133
rpc-v2/tx: Fix docs
lexnv Jan 26, 2024
be82fea
Merge branch 'master' into lexnv/broadcast-tx
lexnv Jan 29, 2024
e817f49
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx
lexnv Jan 30, 2024
21932cb
rpc-v2/tx: Replace mut rand with rand::thread_rng
lexnv Jan 31, 2024
e7781a9
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx
lexnv Jan 31, 2024
015c87b
rpc-v2/tx: Wrapper for the last available element of a stream
lexnv Feb 2, 2024
301f3bf
tx/tests: Check `last_stream_element` returns the last element
lexnv Feb 2, 2024
f174bf6
rpc-v2/tx: Continue broadcast on recoverable errors
lexnv Feb 2, 2024
d28e197
rpc-v2/tx: Make broadcast and stop methods non-blocking
lexnv Feb 2, 2024
03b97b4
tx/tests: Ensure invalid tx does not enter the pool
lexnv Feb 2, 2024
58331ae
tx/tests: Move invalid tx stop to a dedicated test
lexnv Feb 2, 2024
a27e387
rpc-v2/tx: Adjust comment wrt TransactionStatus::FinalityTimeout
lexnv Feb 5, 2024
a0deac0
rpc-v2/tx: Remove generic hash from the transaction broadcast API
lexnv Feb 8, 2024
22c1a6b
rpc-v2/tx: Clean internal state on dropping the future broadcast
lexnv Feb 8, 2024
9fb01af
rpc-v2/tx: Remove unpin requirements
lexnv Feb 8, 2024
68ab17d
tx/tests: Implement future executor for knowing when the tx finishes
lexnv Feb 8, 2024
ea83c6a
Update substrate/client/rpc-spec-v2/src/transaction/transaction_broad…
lexnv Feb 8, 2024
f11b6c4
Merge remote-tracking branch 'origin/master' into lexnv/broadcast-tx
lexnv Feb 12, 2024
55d9eef
rpc-v2/tx: Use tx error wrappers
lexnv Feb 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions prdoc/pr_3079.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Implement transaction_unstable_broadcast and transaction_unstable_stop

doc:
- audience: Node Dev
description: |
A new RPC class is added to handle transactions. The `transaction_unstable_broadcast` broadcasts
the provided transaction to the peers of the node, until the `transaction_unstable_stop` is called.
The APIs are marked as unstable and subject to change in the future.
To know if the transaction was added to the chain, users can decode the bodies of announced finalized blocks.
This is a low-level approach for `transactionWatch_unstable_submitAndWatch`.

crates: [ ]
3 changes: 3 additions & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = "0.4.17"
futures-util = { version = "0.3.30", default-features = false }
rand = "0.8.5"

[dev-dependencies]
serde_json = "1.0.111"
tokio = { version = "1.22.0", features = ["macros"] }
substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
substrate-test-runtime = { path = "../../test-utils/runtime" }
substrate-test-runtime-transaction-pool = { path = "../../test-utils/runtime/transaction-pool" }
sp-consensus = { path = "../../primitives/consensus/common" }
sp-externalities = { path = "../../primitives/externalities" }
sp-maybe-compressed-blob = { path = "../../primitives/maybe-compressed-blob" }
sc-block-builder = { path = "../block-builder" }
sc-service = { path = "../service", features = ["test-helpers"] }
assert_matches = "1.3.0"
pretty_assertions = "1.2.1"
sc-transaction-pool = { path = "../transaction-pool" }
2 changes: 1 addition & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
//! Methods are prefixed by `chainHead`.

#[cfg(test)]
mod test_utils;
pub mod test_utils;
#[cfg(test)]
mod tests;

Expand Down
27 changes: 25 additions & 2 deletions substrate/client/rpc-spec-v2/src/transaction/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

//! API trait for transactions.

use crate::transaction::event::TransactionEvent;
use jsonrpsee::proc_macros::rpc;
use crate::transaction::{error::ErrorBroadcast, event::TransactionEvent};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use sp_core::Bytes;

#[rpc(client, server)]
Expand All @@ -28,10 +28,33 @@ pub trait TransactionApi<Hash: Clone> {
///
/// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on
/// transaction life cycle.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[subscription(
name = "transactionWatch_unstable_submitAndWatch" => "transactionWatch_unstable_watchEvent",
unsubscribe = "transactionWatch_unstable_unwatch",
item = TransactionEvent<Hash>,
)]
fn submit_and_watch(&self, bytes: Bytes);
}

#[rpc(client, server)]
pub trait TransactionBroadcastApi<Hash: Clone> {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast")]
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;

/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop")]
fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
lexnv marked this conversation as resolved.
Show resolved Hide resolved
}
27 changes: 27 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! Errors are interpreted as transaction events for subscriptions.

use crate::transaction::event::{TransactionError, TransactionEvent};
use jsonrpsee::types::error::ErrorObject;
use sc_transaction_pool_api::error::Error as PoolError;
use sp_runtime::transaction_validity::InvalidTransaction;

Expand Down Expand Up @@ -98,3 +99,29 @@ impl<Hash> From<Error> for TransactionEvent<Hash> {
}
}
}

/// TransactionBroadcast error.
#[derive(Debug, thiserror::Error)]
pub enum ErrorBroadcast {
/// The provided operation ID is invalid.
#[error("Invalid operation id")]
InvalidOperationID,
}

/// General purpose errors, as defined in
/// <https://www.jsonrpc.org/specification#error_object>.
pub mod json_rpc_spec {
/// Invalid parameter error.
pub const INVALID_PARAM_ERROR: i32 = -32602;
}

impl From<ErrorBroadcast> for ErrorObject<'static> {
fn from(e: ErrorBroadcast) -> Self {
let msg = e.to_string();

match e {
ErrorBroadcast::InvalidOperationID =>
ErrorObject::owned(json_rpc_spec::INVALID_PARAM_ERROR, msg, None::<()>),
}
}
}
7 changes: 6 additions & 1 deletion substrate/client/rpc-spec-v2/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
//!
//! Methods are prefixed by `transaction`.

#[cfg(test)]
mod tests;

pub mod api;
pub mod error;
pub mod event;
pub mod transaction;
pub mod transaction_broadcast;

pub use api::TransactionApiServer;
pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
pub use event::{
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
TransactionEvent,
};
pub use transaction::Transaction;
pub use transaction_broadcast::TransactionBroadcast;
175 changes: 175 additions & 0 deletions substrate/client/rpc-spec-v2/src/transaction/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use super::*;
use crate::{
chain_head::test_utils::ChainHeadMockClient, hex_string,
transaction::TransactionBroadcast as RpcTransactionBroadcast,
};
use assert_matches::assert_matches;
use codec::Encode;
use futures::Future;
use jsonrpsee::{core::error::Error, rpc_params, RpcModule};
use sc_transaction_pool::*;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
use sp_core::testing::TaskExecutor;
use std::{pin::Pin, sync::Arc, time::Duration};
use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client};
use substrate_test_runtime_transaction_pool::{uxt, TestApi};

type Block = substrate_test_runtime_client::runtime::Block;

/// Initial Alice account nonce.
const ALICE_NONCE: u64 = 209;

fn create_basic_pool_with_genesis(
test_api: Arc<TestApi>,
) -> (BasicPool<TestApi, Block>, Pin<Box<dyn Future<Output = ()> + Send>>) {
let genesis_hash = {
test_api
.chain()
.read()
.block_by_number
.get(&0)
.map(|blocks| blocks[0].0.header.hash())
.expect("there is block 0. qed")
};
BasicPool::new_test(test_api, genesis_hash, genesis_hash)
}

fn maintained_pool() -> (BasicPool<TestApi, Block>, Arc<TestApi>, futures::executor::ThreadPool) {
let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE));
let (pool, background_task) = create_basic_pool_with_genesis(api.clone());

let thread_pool = futures::executor::ThreadPool::new().unwrap();
thread_pool.spawn_ok(background_task);
(pool, api, thread_pool)
}

fn setup_api() -> (
Arc<TestApi>,
Arc<BasicPool<TestApi, Block>>,
Arc<ChainHeadMockClient<Client<Backend>>>,
RpcModule<
TransactionBroadcast<BasicPool<TestApi, Block>, ChainHeadMockClient<Client<Backend>>>,
>,
) {
let (pool, api, _) = maintained_pool();
let pool = Arc::new(pool);

let builder = TestClientBuilder::new();
let client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));

let tx_api = RpcTransactionBroadcast::new(
client_mock.clone(),
pool.clone(),
Arc::new(TaskExecutor::default()),
)
.into_rpc();

(api, pool, client_mock, tx_api)
}

#[tokio::test]
async fn tx_broadcast_enters_pool() {
let (api, pool, client_mock, tx_api) = setup_api();

// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();

// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;

// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.

// TODO: Improve testability by extending the `transaction_unstable_broadcast` with
// a middleware trait that intercepts the transaction status for testing.
let mut num_retries = 12;
while num_retries > 0 && pool.status().ready != 1 {
tokio::time::sleep(Duration::from_secs(5)).await;
num_retries -= 1;
}
assert_eq!(1, pool.status().ready);
assert_eq!(uxt.encode().len(), pool.status().ready_bytes);

// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();

// Announce block 2 to the pool.
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.maintain(event).await;

assert_eq!(0, pool.status().ready);

// Stop call can still be made.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
}

#[tokio::test]
async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api) = setup_api();

// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
);

assert_eq!(0, pool.status().ready);

// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();

assert_eq!(0, pool.status().ready);

// Ensure stop can be called, the tx was decoded and the broadcast future terminated.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
}

#[tokio::test]
async fn tx_invalid_stop() {
let (_, _, _, tx_api) = setup_api();

// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
Error::Call(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a separate independent test, ie that calling unstable_stop with an invalid ID will result in an error?

Copy link
Contributor Author

@lexnv lexnv Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move this to another test; to not fail the tx_broadcast_invalid_tx if somehow we accept stop(invalid). That sounds good, just wanted to make sure I got this right

}
13 changes: 5 additions & 8 deletions substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,18 @@ use crate::{
},
SubscriptionTaskExecutor,
};
use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, types::error::ErrorObject, PendingSubscriptionSink};
use sc_rpc::utils::pipe_from_stream;
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
};
use std::sync::Arc;

use sc_rpc::utils::pipe_from_stream;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::traits::Block as BlockT;

use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use std::sync::Arc;

/// An API for transaction RPC calls.
pub struct Transaction<Pool, Client> {
Expand Down Expand Up @@ -82,7 +79,7 @@ where
Pool: TransactionPool + Sync + Send + 'static,
Pool::Hash: Unpin,
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
{
fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
let client = self.client.clone();
Expand Down
Loading
Loading