Skip to content

Commit

Permalink
Merge pull request #284 from jjyr/allow-disable-cache
Browse files Browse the repository at this point in the history
feat: allow disable txs_verify_cache
  • Loading branch information
zhangsoledad authored Feb 19, 2019
2 parents a64d940 + 6ffc323 commit 3b7a161
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 150 deletions.
55 changes: 28 additions & 27 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ckb_shared::error::SharedError;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::{ChainProvider, ChainState, Shared};
use ckb_shared::txo_set::TxoSetDiff;
use ckb_verification::{verify_transactions, BlockVerifier, Verifier};
use ckb_verification::{BlockVerifier, TransactionsVerifier, Verifier};
use crossbeam_channel::{self, select, Receiver, Sender};
use faketime::unix_time_as_millis;
use fnv::{FnvHashMap, FnvHashSet};
Expand Down Expand Up @@ -455,37 +455,38 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
push_new(b, &mut new_inputs, &mut new_outputs);
}

let max_cycles = self.shared.consensus().max_block_cycles();
let mut txs_cycles = self.shared.txs_cycles().write();
let mut txs_cache = self.shared.txs_verify_cache().write();
// The verify function
let mut verify =
|b, new_inputs: &FnvHashSet<OutPoint>, new_outputs: &FnvHashMap<H256, usize>| -> bool {
verify_transactions(b, max_cycles, &mut *txs_cycles, |op| {
self.shared.cell_at(op, |op| {
if new_inputs.contains(op) {
Some(true)
} else if let Some(x) = new_outputs.get(&op.hash) {
if op.index < (*x as u32) {
Some(false)
} else {
Some(true)
}
} else if old_outputs.contains(&op.hash) {
None
} else {
chain_state
.is_spent(op)
.map(|x| x && !old_inputs.contains(op))
}
})
})
.is_ok()
};
let txs_verifier = TransactionsVerifier::new(self.shared.consensus().max_block_cycles());

let mut found_error = false;
// verify transaction
for (ext, b) in fork.open_exts.iter_mut().zip(fork.new_blocks.iter()).rev() {
if !found_error || skip_verify || verify(b, &new_inputs, &new_outputs) {
let cell_resolver = |op: &OutPoint| {
self.shared.cell_at(op, |op| {
if new_inputs.contains(op) {
Some(true)
} else if let Some(x) = new_outputs.get(&op.hash) {
if op.index < (*x as u32) {
Some(false)
} else {
Some(true)
}
} else if old_outputs.contains(&op.hash) {
None
} else {
chain_state
.is_spent(op)
.map(|x| x && !old_inputs.contains(op))
}
})
};
if !found_error
|| skip_verify
|| txs_verifier
.verify(&mut *txs_cache, b, cell_resolver)
.is_ok()
{
push_new(b, &mut new_inputs, &mut new_outputs);
ext.valid = Some(true);
} else {
Expand Down
2 changes: 1 addition & 1 deletion miner/src/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<CI: ChainIndex + 'static> BlockAssembler<CI> {
TransactionTemplate {
hash: tx.transaction.hash(),
required,
cycles: Some(tx.cycles),
cycles: tx.cycles,
depends,
data: (&tx.transaction).into(),
}
Expand Down
2 changes: 1 addition & 1 deletion nodes_template/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@
"block_assembler": {
"type_hash": "0x0da2fe99fe549e082d4ed483c2e968a89ea8d11aabf5d79e5cbf06522de6e674"
},
"cycles_cache_size": 100000
"txs_verify_cache_size": 100000
}
8 changes: 2 additions & 6 deletions pool/src/tests/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use ckb_core::extras::BlockExt;
use ckb_core::header::HeaderBuilder;
use ckb_core::script::Script;
use ckb_core::transaction::*;
use ckb_core::Cycle;
use ckb_db::memorydb::MemoryKeyValueDB;
use ckb_notify::{ForkBlocks, MsgSwitchFork, NotifyService};
use ckb_shared::index::ChainIndex;
Expand Down Expand Up @@ -208,10 +207,7 @@ pub fn test_cellbase_spent() {
.output(CellOutput::new(50000, Vec::new(), H256::default(), None))
.build();

match pool
.service
.add_to_pool(PoolEntry::new(valid_tx, 0, Cycle::default()))
{
match pool.service.add_to_pool(PoolEntry::new(valid_tx, 0, None)) {
Ok(_) => {}
Err(err) => panic!(
"Unexpected error while adding a valid transaction: {:?}",
Expand Down Expand Up @@ -767,7 +763,7 @@ fn test_transaction_with_capacity(
.outputs(outputs)
.build();

PoolEntry::new(tx, 0, Cycle::default())
PoolEntry::new(tx, 0, None)
}

// Since the main point here is to test pool functionality, not scripting
Expand Down
83 changes: 46 additions & 37 deletions pool/src/txs_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,25 +361,17 @@ where
}

//readd txs
let mut txs_cycles = self.shared.txs_cycles().write();
let mut txs_cache = self.shared.txs_verify_cache().write();
for tx in b.commit_transactions().iter().rev() {
if tx.is_cellbase() {
continue;
}
let tx_hash = tx.hash();
let cycles = match txs_cycles.get(&tx_hash).cloned() {
Some(cycles) => cycles,
None => {
let rtx = self.resolve_transaction(&tx);
// TODO: remove unwrap, remove transactions that depend on it.
let cycles = TransactionVerifier::new(&rtx)
.verify(self.shared.consensus().max_block_cycles())
.map_err(PoolError::InvalidTx)
.unwrap();
txs_cycles.insert(tx_hash, cycles);
cycles
}
};
let rtx = self.resolve_transaction(&tx);
// TODO: remove unwrap, remove transactions that depend on it.
let cycles = self
.verify_transaction(&rtx, &mut txs_cache)
.map_err(PoolError::InvalidTx)
.unwrap();
self.pool.readd_transaction(tx, cycles);
}
}
Expand Down Expand Up @@ -448,7 +440,7 @@ where
&mut self,
tx: Transaction,
) -> Result<InsertionResult, PoolError> {
let tx = PoolEntry::new(tx, 0, Cycle::default());
let tx = PoolEntry::new(tx, 0, None);
match { self.proposed.insert(tx) } {
TxStage::Mineable(x) => self.add_to_pool(x),
TxStage::Unknown(x) => {
Expand All @@ -464,7 +456,7 @@ where
tx: Transaction,
) -> Result<InsertionResult, PoolError> {
let tx_hash = tx.hash();
let tx = PoolEntry::new(tx, 0, Cycle::default());
let tx = PoolEntry::new(tx, 0, None);
match { self.proposed.insert(tx) } {
TxStage::Mineable(x) => self.add_to_pool(x),
TxStage::Unknown(x) => {
Expand Down Expand Up @@ -510,15 +502,24 @@ where
self.pool.get_mineable_transactions(self.pool.size())
}

fn verify_transaction(&self, rtx: &ResolvedTransaction) -> Result<Cycle, TransactionError> {
let mut txs_cycles = self.shared.txs_cycles().write();
fn verify_transaction(
&self,
rtx: &ResolvedTransaction,
txs_cache: &mut Option<LruCache<H256, Cycle>>,
) -> Result<Cycle, TransactionError> {
let tx_hash = rtx.transaction.hash();
match txs_cycles.get(&tx_hash).cloned() {
match txs_cache
.as_ref()
.and_then(|cache| cache.get(&tx_hash).cloned())
{
Some(cycles) => Ok(cycles),
None => {
let cycles = TransactionVerifier::new(&rtx)
.verify(self.shared.consensus().max_block_cycles())?;
txs_cycles.insert(tx_hash, cycles);
// write cache
txs_cache
.as_mut()
.and_then(|cache| cache.insert(tx_hash, cycles));
Ok(cycles)
}
}
Expand Down Expand Up @@ -570,12 +571,14 @@ where
}
}

if unknowns.is_empty() && pe.cycles != Cycle::default() {
if unknowns.is_empty() && pe.cycles.is_none() {
// TODO: Parallel

let mut txs_cache = self.shared.txs_verify_cache().write();
let cycles = self
.verify_transaction(&rtx)
.verify_transaction(&rtx, &mut txs_cache)
.map_err(PoolError::InvalidTx)?;
pe.cycles = cycles;
pe.cycles = Some(cycles);
}
}

Expand Down Expand Up @@ -604,31 +607,37 @@ where
pub(crate) fn reconcile_orphan(&mut self, tx: &Transaction) {
let pes = self.orphan.reconcile_transaction(tx);

let mut txs_cache = self.shared.txs_verify_cache().write();
for mut pe in pes {
let rs = if pe.cycles == Cycle::default() {
let rtx = self.resolve_transaction(&pe.transaction);
self.verify_transaction(&rtx)
} else {
Ok(pe.cycles)
let verify_result = match pe.cycles {
Some(cycles) => Ok(cycles),
None => {
let rtx = self.resolve_transaction(&pe.transaction);
self.verify_transaction(&rtx, &mut txs_cache)
}
};

if self.config.trace_enable() {
self.trace.add_commit(
&tx.hash(),
format!(
"removed from orphan, prepare add to commit, verify result {:?}",
rs
verify_result
),
);
}

if let Ok(cycles) = rs {
pe.cycles = cycles;
self.last_txs_updated_at
.store(unix_time_as_millis() as usize, Ordering::SeqCst);
self.pool.add_transaction(pe);
} else if rs == Err(TransactionError::DoubleSpent) {
self.cache.insert(pe.transaction.proposal_short_id(), pe);
match verify_result {
Ok(cycles) => {
pe.cycles = Some(cycles);
self.last_txs_updated_at
.store(unix_time_as_millis() as usize, Ordering::SeqCst);
self.pool.add_transaction(pe);
}
Err(TransactionError::DoubleSpent) => {
self.cache.insert(pe.transaction.proposal_short_id(), pe);
}
_ => (),
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions pool/src/txs_pool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ pub struct PoolEntry {
/// Bytes size
pub bytes_size: usize,
/// Cycles
pub cycles: Cycle,
pub cycles: Option<Cycle>,
}

impl PoolEntry {
/// Create new transaction pool entry
pub fn new(tx: Transaction, count: usize, cycles: Cycle) -> PoolEntry {
pub fn new(tx: Transaction, count: usize, cycles: Option<Cycle>) -> PoolEntry {
PoolEntry {
bytes_size: tx.occupied_capacity(),
transaction: tx,
Expand Down Expand Up @@ -353,7 +353,7 @@ impl Pool {

self.vertices.insert_front(
tx.proposal_short_id(),
PoolEntry::new(tx.clone(), 0, cycles),
PoolEntry::new(tx.clone(), 0, Some(cycles)),
);

for i in inputs {
Expand Down Expand Up @@ -863,7 +863,7 @@ mod tests {
)
.build();

PoolEntry::new(tx, 0, Cycle::default())
PoolEntry::new(tx, 0, None)
}

#[test]
Expand Down
1 change: 0 additions & 1 deletion shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,3 @@ pub const COLUMN_EXT: Col = Some(7);
pub const COLUMN_BLOCK_TRANSACTION_ADDRESSES: Col = Some(9);
pub const COLUMN_BLOCK_TRANSACTION_IDS: Col = Some(10);
pub const COLUMN_BLOCK_PROPOSAL_IDS: Col = Some(11);
pub const COLUMN_TRANSACTION_CYCLES: Col = Some(12);
Loading

0 comments on commit 3b7a161

Please sign in to comment.