Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
prune finalized transactions from the pool (#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier authored and gavofyork committed Apr 15, 2018
1 parent dda9e01 commit b4fef81
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 48 deletions.
29 changes: 24 additions & 5 deletions polkadot/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use runtime_io::with_externalities;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash};
use primitives::hashing;
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
Expand All @@ -71,7 +71,6 @@ pub use config::{Configuration, Role, ChainSpec};

type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;


/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
Expand All @@ -87,7 +86,7 @@ struct TransactionPoolAdapter {
}

impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,
Err(e) => {
Expand All @@ -104,7 +103,7 @@ impl network::TransactionPool for TransactionPoolAdapter {
}).collect()
}

fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
Expand Down Expand Up @@ -299,11 +298,14 @@ impl Service {

let thread_client = client.clone();
let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);

Ok(())
});
if let Err(e) = core.run(events) {
Expand Down Expand Up @@ -336,6 +338,23 @@ impl Service {
}
}

fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}

/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) {
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
}
}

impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::vec::Vec;
use codec::{Joiner, Slicable};
use state_machine::{self, CodeExecutor};
use primitives::{Header, Block};
use primitives::block::{Id as BlockId, Transaction};
use primitives::block::{Id as BlockId, Extrinsic};
use {backend, error, Client};
use triehash::ordered_trie_root;

Expand All @@ -31,7 +31,7 @@ pub struct BlockBuilder<B, E> where
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{
header: Header,
transactions: Vec<Transaction>,
transactions: Vec<Extrinsic>,
executor: E,
state: B::State,
changes: state_machine::OverlayedChanges,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<B, E> BlockBuilder<B, E> where
/// Push a transaction onto the block's list of transactions. This will ensure the transaction
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Transaction) -> error::Result<()> {
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> {
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
&vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
Expand Down
8 changes: 4 additions & 4 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
use codec::Slicable;
use keyring::Keyring;
use {primitives, genesis};
use primitives::block::Transaction as PrimitiveTransaction;
use primitives::block::Extrinsic as PrimitiveExtrinsic;
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use test_runtime::{UncheckedTransaction, Transaction};
use test_runtime;
Expand Down Expand Up @@ -559,12 +559,12 @@ mod tests {
}

trait Signable {
fn signed(self) -> PrimitiveTransaction;
fn signed(self) -> PrimitiveExtrinsic;
}
impl Signable for Transaction {
fn signed(self) -> PrimitiveTransaction {
fn signed(self) -> PrimitiveExtrinsic {
let signature = Keyring::from_raw_public(self.from.clone()).unwrap().sign(&self.encode());
PrimitiveTransaction::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
PrimitiveExtrinsic::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
}
}

Expand Down
8 changes: 4 additions & 4 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time;
use parking_lot::{RwLock, Mutex};
use futures::sync::oneshot;
use serde_json;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::{Hash, blake2_256};
use runtime_support::Hashable;
use network::{PeerId, NodeId};
Expand Down Expand Up @@ -82,7 +82,7 @@ struct Peer {
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_transactions: HashSet<TransactionHash>,
known_transactions: HashSet<ExtrinsicHash>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<HeaderHash>,
/// Request counter,
Expand Down Expand Up @@ -443,7 +443,7 @@ impl Protocol {
}

/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
Expand Down Expand Up @@ -513,7 +513,7 @@ impl Protocol {
}
}

pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
BTreeMap::new()
}

Expand Down
12 changes: 6 additions & 6 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService};
use primitives::block::{TransactionHash, Header, HeaderHash};
use primitives::block::{ExtrinsicHash, Header, HeaderHash};
use primitives::Hash;
use core_io::{TimerToken};
use io::NetSyncIo;
Expand Down Expand Up @@ -66,15 +66,15 @@ pub trait SyncProvider: Send + Sync {
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats>;
}

/// Transaction pool interface
pub trait TransactionPool: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)>;
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)>;
/// Import a transction into the pool.
fn import(&self, transaction: &[u8]) -> Option<TransactionHash>;
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash>;
}

/// ConsensusService
Expand Down Expand Up @@ -161,7 +161,7 @@ impl Service {
}

/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
});
Expand Down Expand Up @@ -223,7 +223,7 @@ impl SyncProvider for Service {
self.network.external_url()
}

fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
self.handler.protocol.transactions_stats()
}
}
Expand Down
8 changes: 4 additions & 4 deletions substrate/network/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, ExtrinsicHash};
use primitives;
use executor;
use io::SyncIo;
Expand Down Expand Up @@ -206,7 +206,7 @@ impl Peer {
nonce: nonce,
};
let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode());
let tx = primitives::block::Transaction::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
let tx = primitives::block::Extrinsic::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
builder.push(tx).unwrap();
nonce = nonce + 1;
});
Expand All @@ -219,11 +219,11 @@ impl Peer {
struct EmptyTransactionPool;

impl TransactionPool for EmptyTransactionPool {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
Vec::new()
}

fn import(&self, _transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, _transaction: &[u8]) -> Option<ExtrinsicHash> {
None
}
}
Expand Down
27 changes: 6 additions & 21 deletions substrate/primitives/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,8 @@ pub type Number = u64;
/// Hash used to refer to a block hash.
pub type HeaderHash = Hash;

/// Hash used to refer to a transaction hash.
pub type TransactionHash = Hash;

/// Simple generic transaction type.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
pub struct Transaction(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);

impl Slicable for Transaction {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Vec::<u8>::decode(input).map(Transaction)
}

fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
self.0.using_encoded(f)
}
}
/// Hash used to refer to an extrinsic.
pub type ExtrinsicHash = Hash;

/// Simple generic extrinsic type.
#[derive(PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -126,11 +111,11 @@ pub mod generic {
}
}

/// The body of a block is just a bunch of transactions.
pub type Body = Vec<Transaction>;
/// The body of a block is just a bunch of extrinsics.
pub type Body = Vec<Extrinsic>;
/// The header and body of a concrete, but unspecialised, block. Used by substrate to represent a
/// block some fields of which the runtime alone knows how to interpret (e.g. the transactions).
pub type Block = generic::Block<Transaction>;
pub type Block = generic::Block<Extrinsic>;

/// A substrate chain block header.
// TODO: split out into light-client-specific fields and runtime-specific fields.
Expand Down Expand Up @@ -270,7 +255,7 @@ mod tests {
fn test_block_encoding() {
let block = Block {
header: Header::from_block_number(12),
transactions: vec![Transaction(vec!(4))],
transactions: vec![Extrinsic(vec!(4))],
};

assert_eq!(block.encode(), vec![
Expand Down
2 changes: 1 addition & 1 deletion substrate/runtime/primitives/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub trait Block {
}

impl Block for substrate_primitives::Block {
type Extrinsic = substrate_primitives::block::Transaction;
type Extrinsic = substrate_primitives::block::Extrinsic;
type Header = substrate_primitives::Header;
fn header(&self) -> &Self::Header {
&self.header
Expand Down

0 comments on commit b4fef81

Please sign in to comment.