Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
revalidation queue updates
Browse files Browse the repository at this point in the history
  • Loading branch information
NikVolf committed Feb 26, 2020
1 parent bad1280 commit a96183e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 19 deletions.
3 changes: 3 additions & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#![warn(missing_docs)]
#![warn(unused_extern_crates)]
#![recursion_limit="256"]

mod api;
pub mod error;
Expand Down Expand Up @@ -345,6 +346,8 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
if let Err(e) = pool.prune_known(&id, &hashes) {
log::error!("Cannot prune known in the pool {:?}!", e);
}

revalidation_queue.notify_pruned(hashes);
}

if next_action.resubmit {
Expand Down
82 changes: 63 additions & 19 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ pub const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(5);

const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;

/// Payload from queue to worker.
struct WorkerPayload<Api: ChainApi> {
at: NumberFor<Api>,
transactions: Vec<ExHash<Api>>,
/// Background worker notification.
enum WorkerNotify<Api: ChainApi> {
Add {
at: NumberFor<Api>,
transactions: Vec<ExHash<Api>>,
},
Remove {
transactions: Vec<ExHash<Api>>,
},
}

/// Async revalidation worker.
Expand Down Expand Up @@ -89,6 +94,7 @@ async fn batch_revalidate<Api: ChainApi>(
log::trace!(target: "txpool", "[{:?}]: Unknown during revalidation: {:?}", ext_hash, err);
},
Ok(Ok(validity)) => {
log::debug!(target: "txpool", "[{:?}]: Valid during revalidation, will be resubmitted.", ext_hash);
revalidated.insert(
ext_hash.clone(),
ValidatedTransaction::valid_at(
Expand All @@ -113,7 +119,10 @@ async fn batch_revalidate<Api: ChainApi>(
}

pool.validated_pool().remove_invalid(&invalid_hashes);
pool.resubmit(revalidated);

if revalidated.len() > 0 {
pool.resubmit(revalidated);
}
}

impl<Api: ChainApi> RevalidationWorker<Api> {
Expand Down Expand Up @@ -163,11 +172,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
queued_exts
}

fn push(&mut self, worker_payload: WorkerPayload<Api>) {
// we don't add something that already scheduled for revalidation
let transactions = worker_payload.transactions;
let block_number = worker_payload.at;

fn push(&mut self, block_number: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
for ext_hash in transactions {
// we don't add something that already scheduled for revalidation
if self.members.contains_key(&ext_hash) { continue; }
Expand All @@ -183,12 +188,26 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
}
}

fn remove(&mut self, transactions: Vec<ExHash<Api>>) {
for ext_hash in transactions {
if let Some(block_number) = self.members.remove(&ext_hash) {
if let Some(block_record) = self.block_ordered.get_mut(&block_number) {
block_record.remove(&ext_hash);
}
}
}
}

fn len(&self) -> usize {
self.block_ordered.iter().map(|b| b.1.len()).sum()
}

/// Background worker main loop.
///
/// It does two things: periodically tries to process some transactions
/// from the queue and also accepts messages to enqueue some more
/// transactions from the pool.
pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver<WorkerPayload<Api>>) {
pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver<WorkerNotify<Api>>) {
let interval = interval(BACKGROUND_REVALIDATION_INTERVAL).fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
Expand All @@ -198,13 +217,29 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
futures::select! {
_ = interval.next() => {
let next_batch = this.prepare_batch();
let batch_len = next_batch.len();
batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
if batch_len > 0 || this.len() > 0 {
log::debug!(
target: "txpool",
"Revalidated {} transactions. Left in the queue for revalidation: {}.",
batch_len,
this.len(),
);
}
},
workload = from_queue.next() => {
match workload {
Some(worker_payload) => {
this.best_block = worker_payload.at;
this.push(worker_payload);
notification = from_queue.next() => {
match notification {
Some(notification) => {
match notification {
WorkerNotify::Add { transactions, at } => {
this.best_block = at;
this.push(at, transactions);
},
WorkerNotify::Remove { transactions } => {
this.remove(transactions);
},
}
continue;
},
// R.I.P. worker!
Expand All @@ -224,7 +259,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
pub struct RevalidationQueue<Api: ChainApi> {
pool: Arc<Pool<Api>>,
api: Arc<Api>,
background: Option<mpsc::UnboundedSender<WorkerPayload<Api>>>,
background: Option<mpsc::UnboundedSender<WorkerNotify<Api>>>,
}

impl<Api: ChainApi> RevalidationQueue<Api>
Expand Down Expand Up @@ -265,7 +300,7 @@ where
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
if let Err(e) = to_worker.unbounded_send(WorkerNotify::Add { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
return;
Expand All @@ -275,6 +310,15 @@ where
batch_revalidate(pool, api, at, transactions).await
}
}

/// Notify that some transactinos are no longer required to be revalidated.
pub fn notify_pruned(&self, transactions: Vec<ExHash<Api>>) {
if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerNotify::Remove { transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -310,4 +354,4 @@ mod tests {
// number of ready
assert_eq!(pool.validated_pool().status().ready, 1);
}
}
}
21 changes: 21 additions & 0 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,27 @@ fn should_not_retain_invalid_hashes_from_retracted() {
assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_not_resubmit_pruned_transaction_back_to_ready() {
let xt = uxt(Alice, 209);

let (pool, _guard) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(1, vec![xt.clone()]);

let event = block_event(1);

block_on(pool.maintain(event));

// maintenance is in background
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_push_watchers_during_maintaince() {
fn alice_uxt(nonce: u64) -> Extrinsic {
Expand Down

0 comments on commit a96183e

Please sign in to comment.