Add transactions from retracted blocks back to the pool (paritytech#3562)

* Add transactions from retracted blocks back to the pool

* Line width

* Reverse retracted
arkpar authored and Demi-Marie committed Sep 17, 2019
1 parent 018de16 commit 7642c0b
Showing 7 changed files with 118 additions and 34 deletions.
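
Taken together, the diff threads the hashes of retracted blocks from block import through to transaction-pool maintenance, and re-submits their extrinsics with a new `force` flag that skips the pool's temporary ban. Below is a minimal, self-contained sketch of that mechanism; `Pool`, `Block`, `submit_at` and `maintain` here are toy stand-ins for illustration, not the Substrate types changed in this commit.

// Toy model of the mechanism added in this commit (assumed simplification,
// not the real TransactionPool/Client API): a pool that temporarily bans
// pruned transactions, and a maintenance step that force-submits the
// extrinsics of retracted blocks so they re-enter the pool after a re-org.

use std::collections::HashSet;

type Extrinsic = String;
type ExHash = u64;

fn ex_hash(xt: &Extrinsic) -> ExHash {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut h = DefaultHasher::new();
    xt.hash(&mut h);
    h.finish()
}

struct Block {
    extrinsics: Vec<Extrinsic>,
}

#[derive(Default)]
struct Pool {
    ready: Vec<Extrinsic>,
    banned: HashSet<ExHash>,
}

impl Pool {
    // Mirrors the new `Pool::submit_at(at, xts, force)` signature: when
    // `force` is true the "temporarily banned" check is skipped, which is
    // what lets just-pruned transactions from retracted blocks come back.
    fn submit_at<I: IntoIterator<Item = Extrinsic>>(&mut self, xts: I, force: bool) {
        for xt in xts {
            if !force && self.banned.contains(&ex_hash(&xt)) {
                continue; // temporarily banned: rejected on the normal path
            }
            self.ready.push(xt);
        }
    }

    // Pruning removes included transactions and bans their hashes for a
    // while so they are not immediately re-imported.
    fn prune(&mut self, block: &Block) {
        for xt in &block.extrinsics {
            self.banned.insert(ex_hash(xt));
            self.ready.retain(|r| r != xt);
        }
    }
}

// Mirrors the new `maintain_transaction_pool(id, client, pool, retracted)`:
// every retracted block's extrinsics are re-submitted with `force = true`.
fn maintain(pool: &mut Pool, retracted: &[Block]) {
    for block in retracted {
        pool.submit_at(block.extrinsics.iter().cloned(), true);
    }
}

fn main() {
    let mut pool = Pool::default();
    let tx = String::from("transfer Alice -> Bob, 5");
    pool.submit_at(vec![tx.clone()], false);

    // Block B1 includes the transaction; pruning removes and bans it.
    let b1 = Block { extrinsics: vec![tx.clone()] };
    pool.prune(&b1);
    assert!(pool.ready.is_empty());

    // A fork becomes best and B1 is retracted: its extrinsics go back into
    // the pool even though their hashes are still banned.
    maintain(&mut pool, &[b1]);
    assert_eq!(pool.ready.len(), 1);
    println!("retracted transaction is back in the pool");
}

Running the sketch ends with the retracted transaction back in the ready queue, which is the behaviour the new should_add_reverted_transactions_to_the_pool test below asserts against the real types.
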
2 changes: 1 addition & 1 deletion core/basic-authorship/src/basic_authorship.rs
@@ -250,7 +250,7 @@ mod tests {
let chain_api = transaction_pool::ChainApi::new(client.clone());
let txpool = Arc::new(TransactionPool::new(Default::default(), chain_api));

txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)]).unwrap();
txpool.submit_at(&BlockId::number(0), vec![extrinsic(0), extrinsic(1)], false).unwrap();

let mut proposer_factory = ProposerFactory {
client: client.clone(),
19 changes: 10 additions & 9 deletions core/client/src/backend.rs
@@ -35,22 +35,23 @@ pub type StorageCollection = Vec<(Vec<u8>, Option<Vec<u8>>)>;
/// In memory arrays of storage values for multiple child tries.
pub type ChildStorageCollection = Vec<(Vec<u8>, StorageCollection)>;

pub(crate) struct ImportSummary<Block: BlockT> {
pub(crate) hash: Block::Hash,
pub(crate) origin: BlockOrigin,
pub(crate) header: Block::Header,
pub(crate) is_new_best: bool,
pub(crate) storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
pub(crate) retracted: Vec<Block::Hash>,
}

/// Import operation wrapper
pub struct ClientImportOperation<
Block: BlockT,
H: Hasher<Out=Block::Hash>,
B: Backend<Block, H>,
> {
pub(crate) op: B::BlockImportOperation,
pub(crate) notify_imported: Option<(
Block::Hash,
BlockOrigin,
Block::Header,
bool,
Option<(
StorageCollection,
ChildStorageCollection,
)>)>,
pub(crate) notify_imported: Option<ImportSummary<Block>>,
pub(crate) notify_finalized: Vec<Block::Hash>,
}

49 changes: 30 additions & 19 deletions core/client/src/client.rs
@@ -62,7 +62,7 @@ use crate::{
},
backend::{
self, BlockImportOperation, PrunableStateChangesTrieStorage,
ClientImportOperation, Finalizer,
ClientImportOperation, Finalizer, ImportSummary,
},
blockchain::{
self, Info as ChainInfo, Backend as ChainBackend,
@@ -199,6 +199,8 @@ pub struct BlockImportNotification<Block: BlockT> {
pub header: Block::Header,
/// Is this the new best block.
pub is_new_best: bool,
/// List of retracted blocks ordered by block number.
pub retracted: Vec<Block::Hash>,
}

/// Summary of a finalized block.
@@ -968,6 +970,17 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
crate::backend::NewBlockState::Normal
};

let retracted = if is_new_best {
let route_from_best = crate::blockchain::tree_route(
|id| self.header(&id)?.ok_or_else(|| Error::UnknownBlock(format!("{:?}", id))),
BlockId::Hash(info.best_hash),
BlockId::Hash(parent_hash),
)?;
route_from_best.retracted().iter().rev().map(|e| e.hash.clone()).collect()
} else {
Vec::default()
};

trace!("Imported {}, (#{}), best={}, origin={:?}", hash, import_headers.post().number(), is_new_best, origin);

operation.op.set_block_data(
@@ -995,7 +1008,14 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
operation.notify_finalized.push(hash);
}

operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes));
operation.notify_imported = Some(ImportSummary {
hash,
origin,
header: import_headers.into_post(),
is_new_best,
storage_changes,
retracted,
})
}

Ok(ImportResult::imported(is_new_best))
@@ -1167,33 +1187,24 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where

fn notify_imported(
&self,
notify_import: (
Block::Hash, BlockOrigin,
Block::Header,
bool,
Option<(
Vec<(Vec<u8>, Option<Vec<u8>>)>,
Vec<(Vec<u8>, Vec<(Vec<u8>, Option<Vec<u8>>)>)>,
)
>),
notify_import: ImportSummary<Block>,
) -> error::Result<()> {
let (hash, origin, header, is_new_best, storage_changes) = notify_import;

if let Some(storage_changes) = storage_changes {
if let Some(storage_changes) = notify_import.storage_changes {
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
self.storage_notifications.lock()
.trigger(
&hash,
&notify_import.hash,
storage_changes.0.into_iter(),
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
);
}

let notification = BlockImportNotification::<Block> {
hash,
origin,
header,
is_new_best,
hash: notify_import.hash,
origin: notify_import.origin,
header: notify_import.header,
is_new_best: notify_import.is_new_best,
retracted: notify_import.retracted,
};

self.import_notification_sinks.lock()
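
The `retracted` list used above is computed from the tree route between the previous best block and the new block's parent, and then reversed (the "Reverse retracted" item in the squashed commit message) so the notification lists hashes in ascending block-number order. A small self-contained sketch of that ordering, with a toy parent map standing in for the client's header lookups and for `blockchain::tree_route`:

// Sketch (assumed simplification of the tree-route logic, not the real API)
// of how the `retracted` hashes in `BlockImportNotification` are ordered:
// walk from the old best block back to the common ancestor with the new
// chain, then reverse so the list goes from lowest to highest block number.

use std::collections::{HashMap, HashSet};

type Hash = u32;

// `parent[&h]` is the parent hash of block `h`; the genesis maps to itself.
fn retracted_between(parent: &HashMap<Hash, Hash>, old_best: Hash, new_parent: Hash) -> Vec<Hash> {
    // Ancestors of the new chain's tip, used to spot the common ancestor.
    let mut on_new_chain = HashSet::new();
    let mut cur = new_parent;
    loop {
        on_new_chain.insert(cur);
        let p = parent[&cur];
        if p == cur {
            break;
        }
        cur = p;
    }

    // Blocks on the old best chain that are not on the new chain are retracted.
    let mut retracted = Vec::new();
    let mut cur = old_best;
    while !on_new_chain.contains(&cur) {
        retracted.push(cur);
        cur = parent[&cur];
    }

    // The walk yields the highest block first; reversing matches the
    // "ordered by block number" documentation on the new field.
    retracted.reverse();
    retracted
}

fn main() {
    // 1 - 2 - 3      (old best = 3)
    //  \
    //   4 - 5        (new parent = 5)
    let parent: HashMap<Hash, Hash> =
        [(1, 1), (2, 1), (3, 2), (4, 1), (5, 4)].into_iter().collect();
    assert_eq!(retracted_between(&parent, 3, 5), vec![2, 3]);
}

In the example, importing a block on top of 5 retracts blocks 2 and 3, and they are reported lowest block number first.
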
1 change: 1 addition & 0 deletions core/finality-grandpa/src/until_imported.rs
@@ -472,6 +472,7 @@ mod tests {
origin: BlockOrigin::File,
header,
is_new_best: false,
retracted: vec![],
}).unwrap();
}
}
70 changes: 69 additions & 1 deletion core/service/src/builder.rs
@@ -871,7 +871,7 @@ ServiceBuilder<
dht_event_tx,
))
},
|h, c, tx| maintain_transaction_pool(h, c, tx),
|h, c, tx, r| maintain_transaction_pool(h, c, tx, r),
|n, o, p, ns, v| offchain_workers(n, o, p, ns, v),
|c, ssb, si, te, tp, ext, ks| start_rpc(&rpc_builder, c, ssb, si, te, tp, ext, ks),
)
@@ -924,6 +924,7 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
id: &BlockId<Block>,
client: &Client<Backend, Executor, Block, Api>,
transaction_pool: &TransactionPool<PoolApi>,
retracted: &[Block::Hash],
) -> error::Result<()> where
Block: BlockT<Hash = <Blake2Hasher as primitives::Hasher>::Out>,
Backend: client::backend::Backend<Block, Blake2Hasher>,
@@ -932,6 +933,16 @@ pub(crate) fn maintain_transaction_pool<Api, Backend, Block, Executor, PoolApi>(
Executor: client::CallExecutor<Block, Blake2Hasher>,
PoolApi: txpool::ChainApi<Hash = Block::Hash, Block = Block>,
{
// Put transactions from retracted blocks back into the pool.
for r in retracted {
if let Some(block) = client.block(&BlockId::hash(*r))? {
let extrinsics = block.block.extrinsics();
if let Err(e) = transaction_pool.submit_at(id, extrinsics.iter().cloned(), true) {
warn!("Error re-submitting transactions: {:?}", e);
}
}
}

// Avoid calling into runtime if there is nothing to prune from the pool anyway.
if transaction_pool.status().is_empty() {
return Ok(())
@@ -1007,10 +1018,67 @@ mod tests {
&id,
&client,
&pool,
&[]
).unwrap();

// then
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);
}

#[test]
fn should_add_reverted_transactions_to_the_pool() {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let pool = TransactionPool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()));
let transaction = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
}.into_signed_tx();
let best = longest_chain.best_chain().unwrap();

// store the transaction in the pool
pool.submit_one(&BlockId::hash(best.hash()), transaction.clone()).unwrap();

// import the block
let mut builder = client.new_block(Default::default()).unwrap();
builder.push(transaction.clone()).unwrap();
let block = builder.bake().unwrap();
let block1_hash = block.header().hash();
let id = BlockId::hash(block1_hash.clone());
client.import(BlockOrigin::Own, block).unwrap();

// fire notification - this should clean up the queue
assert_eq!(pool.status().ready, 1);
maintain_transaction_pool(
&id,
&client,
&pool,
&[]
).unwrap();

// then
assert_eq!(pool.status().ready, 0);
assert_eq!(pool.status().future, 0);

// import second block
let builder = client.new_block_at(&BlockId::hash(best.hash()), Default::default()).unwrap();
let block = builder.bake().unwrap();
let id = BlockId::hash(block.header().hash());
client.import(BlockOrigin::Own, block).unwrap();

// fire notification - this should add the transaction back to the pool.
maintain_transaction_pool(
&id,
&client,
&pool,
&[block1_hash]
).unwrap();

// then
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
}
}
1 change: 1 addition & 0 deletions core/service/src/lib.rs
@@ -238,6 +238,7 @@ macro_rules! new_impl {
&BlockId::hash(notification.hash),
&*client,
&*txpool,
&notification.retracted,
).map_err(|e| warn!("Pool error processing new block: {:?}", e))?;
}

10 changes: 6 additions & 4 deletions core/transaction-pool/graph/src/pool.rs
@@ -114,7 +114,9 @@ pub struct Pool<B: ChainApi> {

impl<B: ChainApi> Pool<B> {
/// Imports a bunch of unverified extrinsics to the pool
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T, force: bool)
-> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error>
where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
let block_number = self.api.block_id_to_number(at)?
@@ -124,7 +126,7 @@ impl<B: ChainApi> Pool<B> {
.into_iter()
.map(|xt| -> Result<_, B::Error> {
let (hash, bytes) = self.api.hash_and_length(&xt);
if self.rotator.is_banned(&hash) {
if !force && self.rotator.is_banned(&hash) {
return Err(error::Error::TemporarilyBanned.into())
}

@@ -207,7 +209,7 @@ impl<B: ChainApi> Pool<B> {

/// Imports one unverified extrinsic to the pool
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
Ok(self.submit_at(at, ::std::iter::once(xt), false)?.pop().expect("One extrinsic passed; one result returned; qed")?)
}

/// Import a single extrinsic and starts to watch their progress in the pool.
@@ -306,7 +308,7 @@ impl<B: ChainApi> Pool<B> {
// try to re-submit pruned transactions since some of them might be still valid.
// note that `known_imported_hashes` will be rejected here due to temporary ban.
let hashes = status.pruned.iter().map(|tx| tx.hash.clone()).collect::<Vec<_>>();
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()))?;
let results = self.submit_at(at, status.pruned.into_iter().map(|tx| tx.data.clone()), false)?;

// Collect the hashes of transactions that now became invalid (meaning that they are successfully pruned).
let hashes = results.into_iter().enumerate().filter_map(|(idx, r)| match r.map_err(error::IntoPoolError::into_pool_error) {