Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

prune finalized transactions from the pool #127

Merged
merged 1 commit into from
Apr 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions polkadot/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use runtime_io::with_externalities;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash};
use primitives::hashing;
use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
Expand All @@ -71,7 +71,6 @@ pub use config::{Configuration, Role, ChainSpec};

type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;


/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
Expand All @@ -87,7 +86,7 @@ struct TransactionPoolAdapter {
}

impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,
Err(e) => {
Expand All @@ -104,7 +103,7 @@ impl network::TransactionPool for TransactionPoolAdapter {
}).collect()
}

fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
Expand Down Expand Up @@ -299,11 +298,14 @@ impl Service {

let thread_client = client.clone();
let thread_network = network.clone();
let thread_txpool = transaction_pool.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*thread_client, &*thread_txpool, notification.hash);

Ok(())
});
if let Err(e) = core.run(events) {
Expand Down Expand Up @@ -336,6 +338,23 @@ impl Service {
}
}

fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}

/// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) {
let id = BlockId::Hash(hash);
match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
Ok(None) => warn!("Missing imported block {:?}", hash),
Err(e) => warn!("Failed to fetch block: {:?}", e),
}
}

impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();
Expand Down
6 changes: 3 additions & 3 deletions substrate/client/src/block_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::vec::Vec;
use codec::{Joiner, Slicable};
use state_machine::{self, CodeExecutor};
use primitives::{Header, Block};
use primitives::block::{Id as BlockId, Transaction};
use primitives::block::{Id as BlockId, Extrinsic};
use {backend, error, Client};
use triehash::ordered_trie_root;

Expand All @@ -31,7 +31,7 @@ pub struct BlockBuilder<B, E> where
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{
header: Header,
transactions: Vec<Transaction>,
transactions: Vec<Extrinsic>,
executor: E,
state: B::State,
changes: state_machine::OverlayedChanges,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<B, E> BlockBuilder<B, E> where
/// Push a transaction onto the block's list of transactions. This will ensure the transaction
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Transaction) -> error::Result<()> {
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> {
let output = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction",
&vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
Expand Down
8 changes: 4 additions & 4 deletions substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
use codec::Slicable;
use keyring::Keyring;
use {primitives, genesis};
use primitives::block::Transaction as PrimitiveTransaction;
use primitives::block::Extrinsic as PrimitiveExtrinsic;
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use test_runtime::{UncheckedTransaction, Transaction};
use test_runtime;
Expand Down Expand Up @@ -559,12 +559,12 @@ mod tests {
}

trait Signable {
fn signed(self) -> PrimitiveTransaction;
fn signed(self) -> PrimitiveExtrinsic;
}
impl Signable for Transaction {
fn signed(self) -> PrimitiveTransaction {
fn signed(self) -> PrimitiveExtrinsic {
let signature = Keyring::from_raw_public(self.from.clone()).unwrap().sign(&self.encode());
PrimitiveTransaction::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
PrimitiveExtrinsic::decode(&mut UncheckedTransaction { signature, tx: self }.encode().as_ref()).unwrap()
}
}

Expand Down
8 changes: 4 additions & 4 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::time;
use parking_lot::{RwLock, Mutex};
use futures::sync::oneshot;
use serde_json;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::{Hash, blake2_256};
use runtime_support::Hashable;
use network::{PeerId, NodeId};
Expand Down Expand Up @@ -82,7 +82,7 @@ struct Peer {
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_transactions: HashSet<TransactionHash>,
known_transactions: HashSet<ExtrinsicHash>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<HeaderHash>,
/// Request counter,
Expand Down Expand Up @@ -443,7 +443,7 @@ impl Protocol {
}

/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
Expand Down Expand Up @@ -513,7 +513,7 @@ impl Protocol {
}
}

pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
BTreeMap::new()
}

Expand Down
12 changes: 6 additions & 6 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::sync::{oneshot, mpsc};
use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_devp2p::{NetworkService};
use primitives::block::{TransactionHash, Header, HeaderHash};
use primitives::block::{ExtrinsicHash, Header, HeaderHash};
use primitives::Hash;
use core_io::{TimerToken};
use io::NetSyncIo;
Expand Down Expand Up @@ -66,15 +66,15 @@ pub trait SyncProvider: Send + Sync {
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats>;
}

/// Transaction pool interface
pub trait TransactionPool: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)>;
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)>;
/// Import a transction into the pool.
fn import(&self, transaction: &[u8]) -> Option<TransactionHash>;
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash>;
}

/// ConsensusService
Expand Down Expand Up @@ -161,7 +161,7 @@ impl Service {
}

/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec<u8>)]) {
pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec<u8>)]) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
});
Expand Down Expand Up @@ -223,7 +223,7 @@ impl SyncProvider for Service {
self.network.external_url()
}

fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
self.handler.protocol.transactions_stats()
}
}
Expand Down
8 changes: 4 additions & 4 deletions substrate/network/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, TransactionHash};
use primitives::block::{Id as BlockId, ExtrinsicHash};
use primitives;
use executor;
use io::SyncIo;
Expand Down Expand Up @@ -206,7 +206,7 @@ impl Peer {
nonce: nonce,
};
let signature = Keyring::from_raw_public(tx.from.clone()).unwrap().sign(&tx.encode());
let tx = primitives::block::Transaction::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
let tx = primitives::block::Extrinsic::decode(&mut test_runtime::UncheckedTransaction { signature, tx: tx }.encode().as_ref()).unwrap();
builder.push(tx).unwrap();
nonce = nonce + 1;
});
Expand All @@ -219,11 +219,11 @@ impl Peer {
struct EmptyTransactionPool;

impl TransactionPool for EmptyTransactionPool {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
Vec::new()
}

fn import(&self, _transaction: &[u8]) -> Option<TransactionHash> {
fn import(&self, _transaction: &[u8]) -> Option<ExtrinsicHash> {
None
}
}
Expand Down
27 changes: 6 additions & 21 deletions substrate/primitives/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,8 @@ pub type Number = u64;
/// Hash used to refer to a block hash.
pub type HeaderHash = Hash;

/// Hash used to refer to a transaction hash.
pub type TransactionHash = Hash;

/// Simple generic transaction type.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
pub struct Transaction(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);

impl Slicable for Transaction {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Vec::<u8>::decode(input).map(Transaction)
}

fn using_encoded<R, F: FnOnce(&[u8]) -> R>(&self, f: F) -> R {
self.0.using_encoded(f)
}
}
/// Hash used to refer to an extrinsic.
pub type ExtrinsicHash = Hash;

/// Simple generic extrinsic type.
#[derive(PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -126,11 +111,11 @@ pub mod generic {
}
}

/// The body of a block is just a bunch of transactions.
pub type Body = Vec<Transaction>;
/// The body of a block is just a bunch of extrinsics.
pub type Body = Vec<Extrinsic>;
/// The header and body of a concrete, but unspecialised, block. Used by substrate to represent a
/// block some fields of which the runtime alone knows how to interpret (e.g. the transactions).
pub type Block = generic::Block<Transaction>;
pub type Block = generic::Block<Extrinsic>;

/// A substrate chain block header.
// TODO: split out into light-client-specific fields and runtime-specific fields.
Expand Down Expand Up @@ -270,7 +255,7 @@ mod tests {
fn test_block_encoding() {
let block = Block {
header: Header::from_block_number(12),
transactions: vec![Transaction(vec!(4))],
transactions: vec![Extrinsic(vec!(4))],
};

assert_eq!(block.encode(), vec![
Expand Down
2 changes: 1 addition & 1 deletion substrate/runtime/primitives/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ pub trait Block {
}

impl Block for substrate_primitives::Block {
type Extrinsic = substrate_primitives::block::Transaction;
type Extrinsic = substrate_primitives::block::Extrinsic;
type Header = substrate_primitives::Header;
fn header(&self) -> &Self::Header {
&self.header
Expand Down