Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
revalidation queue fix
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf committed Feb 26, 2020
1 parent bad1280 commit 5de7e1f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 2 deletions.
1 change: 1 addition & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Substrate transaction pool implementation.

#![recursion_limit="256"]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]

Expand Down
34 changes: 32 additions & 2 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ async fn batch_revalidate<Api: ChainApi>(
}

pool.validated_pool().remove_invalid(&invalid_hashes);
pool.resubmit(revalidated);
if revalidated.len() > 0 {
pool.resubmit(revalidated);
}
}

impl<Api: ChainApi> RevalidationWorker<Api> {
Expand Down Expand Up @@ -149,6 +151,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
} else {
for xt in &to_queue {
extrinsics.remove(xt);
self.members.remove(xt);
}
}
left -= to_queue.len();
Expand All @@ -163,14 +166,26 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
queued_exts
}

fn len(&self) -> usize {
self.block_ordered.iter().map(|b| b.1.len()).sum()
}

fn push(&mut self, worker_payload: WorkerPayload<Api>) {
// we don't add something that already scheduled for revalidation
let transactions = worker_payload.transactions;
let block_number = worker_payload.at;

for ext_hash in transactions {
// we don't add something that already scheduled for revalidation
if self.members.contains_key(&ext_hash) { continue; }
if self.members.contains_key(&ext_hash) {
log::debug!(
target: "txpool",
"[{:?}] Skipped adding for revalidation: Already there.",
ext_hash,
);

continue;
}

self.block_ordered.entry(block_number)
.and_modify(|value| { value.insert(ext_hash.clone()); })
Expand Down Expand Up @@ -198,7 +213,18 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
futures::select! {
_ = interval.next() => {
let next_batch = this.prepare_batch();
let batch_len = next_batch.len();

batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;

if batch_len > 0 || this.len() > 0 {
log::debug!(
target: "txpool",
"Revalidated {} transactions. Left in the queue for revalidation: {}.",
batch_len,
this.len(),
);
}
},
workload = from_queue.next() => {
match workload {
Expand Down Expand Up @@ -264,6 +290,10 @@ where
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Added {} transactions to revalidation queue", transactions.len());
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
Expand Down
28 changes: 28 additions & 0 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,34 @@ fn should_not_retain_invalid_hashes_from_retracted() {
assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_transaction_multiple_times() {
let xt = uxt(Alice, 209);

let (pool, _guard) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(1, vec![xt.clone()]);

// maintenance is in background
block_on(pool.maintain(block_event(1)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(2, vec![]);
pool.api.add_invalid(&xt);

// maintenance is in background
block_on(pool.maintain(block_event(2)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_push_watchers_during_maintaince() {
fn alice_uxt(nonce: u64) -> Extrinsic {
Expand Down

0 comments on commit 5de7e1f

Please sign in to comment.