diff --git a/Cargo.lock b/Cargo.lock index b7a98679c90..8e79977357e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,7 +1547,6 @@ dependencies = [ "ckb-stop-handler", "ckb-store", "ckb-systemtime", - "ckb-traits", "ckb-types", "ckb-util", "ckb-verification", diff --git a/tx-pool/Cargo.toml b/tx-pool/Cargo.toml index e65e0545911..9ee7279a611 100644 --- a/tx-pool/Cargo.toml +++ b/tx-pool/Cargo.toml @@ -33,7 +33,6 @@ ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.114.0-pre" } ckb-app-config = { path = "../util/app-config", version = "= 0.114.0-pre" } ckb-network = { path = "../network", version = "= 0.114.0-pre" } ckb-channel = { path = "../util/channel", version = "= 0.114.0-pre" } -ckb-traits = { path = "../traits", version = "= 0.114.0-pre" } ckb-db = { path = "../db", version = "= 0.114.0-pre" } ckb-script = { path = "../script", version = "= 0.114.0-pre" } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index eeff0c8221e..2a4064fb1d5 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -2,6 +2,7 @@ use crate::callback::Callbacks; use crate::component::entry::TxEntry; use crate::component::orphan::Entry as OrphanEntry; use crate::component::pool_map::Status; +use crate::component::verify_queue::Entry; use crate::error::Reject; use crate::pool::TxPool; use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult}; @@ -16,6 +17,7 @@ use ckb_jsonrpc_types::BlockTemplate; use ckb_logger::Level::Trace; use ckb_logger::{debug, error, info, log_enabled_target, trace_target}; use ckb_network::PeerIndex; +use ckb_script::{ChunkCommand, VerifyResult}; use ckb_snapshot::Snapshot; use ckb_store::data_loader_wrapper::AsDataLoader; use ckb_store::ChainStore; @@ -30,10 +32,12 @@ use ckb_verification::{ ContextualTransactionVerifier, DaoScriptSizeVerifier, TimeRelativeTransactionVerifier, TxVerifyEnv, }; +use ckb_verification::{ContextualWithoutScriptTransactionVerifier, ScriptVerifier}; use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; +use tokio::sync::watch; use tokio::task::block_in_place; const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks @@ -718,6 +722,107 @@ impl TxPoolService { Some((Ok(ProcessResult::Completed(completed)), submit_snapshot)) } + pub(crate) async fn run_verify_tx( + &mut self, + entry: Entry, + command_rx: &mut watch::Receiver, + ) -> Option<(Result, Arc)> { + let Entry { tx, remote } = entry; + let tx_hash = tx.hash(); + + let (ret, snapshot) = self.pre_check(&tx).await; + let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot); + let cached = self.fetch_tx_verify_cache(&tx_hash).await; + let tip_header = snapshot.tip_header(); + let consensus = snapshot.cloned_consensus(); + + let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header)); + + if let Some(ref completed) = cached { + let ret = TimeRelativeTransactionVerifier::new( + Arc::clone(&rtx), + Arc::clone(&consensus), + snapshot.as_data_loader(), + Arc::clone(&tx_env), + ) + .verify() + .map(|_| *completed) + .map_err(Reject::Verification); + let completed = try_or_return_with_snapshot!(ret, snapshot); + + let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); + let (ret, snapshot) = self.submit_entry(tip_hash, entry, status).await; + try_or_return_with_snapshot!(ret, snapshot); + return Some((Ok(completed), snapshot)); + } + + let cloned_snapshot = Arc::clone(&snapshot); + let data_loader = cloned_snapshot.as_data_loader(); + let ret = ContextualWithoutScriptTransactionVerifier::new( + Arc::clone(&rtx), + Arc::clone(&consensus), + data_loader.clone(), + Arc::clone(&tx_env), + ) + .verify() + .and_then(|result| { + DaoScriptSizeVerifier::new( + Arc::clone(&rtx), + Arc::clone(&consensus), + data_loader.clone(), + ) + .verify()?; + Ok(result) + }) + .map_err(Reject::Verification); + let fee = try_or_return_with_snapshot!(ret, snapshot); + + let max_cycles = if let Some((declared_cycle, _peer)) = remote { + declared_cycle + } else { + consensus.max_block_cycles() + }; + + let ret = { + let script_verifier = + ScriptVerifier::new(Arc::clone(&rtx), data_loader, consensus, tx_env); + script_verifier + .resumable_verify_with_signal(max_cycles, command_rx) + .await + .map_err(Reject::Verification) + }; + + let state = try_or_return_with_snapshot!(ret, snapshot); + let completed: Completed = match state { + VerifyResult::Completed(cycles) => Completed { cycles, fee }, + _ => { + panic!("not expected"); + } + }; + if let Some((declared_cycle, _peer)) = remote { + if declared_cycle != completed.cycles { + return Some(( + Err(Reject::DeclaredWrongCycles( + declared_cycle, + completed.cycles, + )), + snapshot, + )); + } + } + // verify passed + let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); + let (ret, snapshot) = self.submit_entry(tip_hash, entry, status).await; + try_or_return_with_snapshot!(ret, snapshot); + + self.notify_block_assembler(status).await; + + let mut guard = self.txs_verify_cache.write().await; + guard.put(tx_hash, completed); + + Some((Ok(completed), snapshot)) + } + pub(crate) async fn enqueue_suspended_tx( &self, tx: TransactionView, diff --git a/tx-pool/src/verify_mgr.rs b/tx-pool/src/verify_mgr.rs index 2f53419440a..c7addcfc51e 100644 --- a/tx-pool/src/verify_mgr.rs +++ b/tx-pool/src/verify_mgr.rs @@ -1,20 +1,9 @@ extern crate num_cpus; -use crate::component::entry::TxEntry; -use crate::component::verify_queue::{Entry, VerifyQueue}; -use crate::try_or_return_with_snapshot; -use crate::{error::Reject, service::TxPoolService}; -use ckb_chain_spec::consensus::Consensus; +use crate::component::verify_queue::VerifyQueue; +use crate::service::TxPoolService; use ckb_logger::info; -use ckb_script::{ChunkCommand, VerifyResult}; -use ckb_snapshot::Snapshot; +use ckb_script::ChunkCommand; use ckb_stop_handler::CancellationToken; -use ckb_store::data_loader_wrapper::AsDataLoader; -use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider}; -use ckb_types::core::{cell::ResolvedTransaction, Cycle}; -use ckb_verification::{ - cache::Completed, ContextualWithoutScriptTransactionVerifier, DaoScriptSizeVerifier, - ScriptVerifier, TimeRelativeTransactionVerifier, TxVerifyEnv, -}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{watch, RwLock}; @@ -98,7 +87,8 @@ impl Worker { }; let (res, snapshot) = self - .run_verify_tx(entry.clone()) + .service + .run_verify_tx(entry.clone(), &mut self.command_rx) .await .expect("run_verify_tx failed"); @@ -115,127 +105,6 @@ impl Worker { } } } - - async fn run_verify_tx( - &mut self, - entry: Entry, - ) -> Option<(Result, Arc)> { - let Entry { tx, remote } = entry; - let tx_hash = tx.hash(); - - let (ret, snapshot) = self.service.pre_check(&tx).await; - let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot); - - let cached = self.service.fetch_tx_verify_cache(&tx_hash).await; - - let tip_header = snapshot.tip_header(); - let consensus = snapshot.cloned_consensus(); - - let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header)); - - if let Some(ref completed) = cached { - let ret = TimeRelativeTransactionVerifier::new( - Arc::clone(&rtx), - Arc::clone(&consensus), - snapshot.as_data_loader(), - Arc::clone(&tx_env), - ) - .verify() - .map(|_| *completed) - .map_err(Reject::Verification); - let completed = try_or_return_with_snapshot!(ret, snapshot); - - let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, snapshot) = self.service.submit_entry(tip_hash, entry, status).await; - try_or_return_with_snapshot!(ret, snapshot); - return Some((Ok(completed), snapshot)); - } - - let cloned_snapshot = Arc::clone(&snapshot); - let data_loader = cloned_snapshot.as_data_loader(); - let ret = ContextualWithoutScriptTransactionVerifier::new( - Arc::clone(&rtx), - Arc::clone(&consensus), - data_loader.clone(), - Arc::clone(&tx_env), - ) - .verify() - .and_then(|result| { - DaoScriptSizeVerifier::new( - Arc::clone(&rtx), - Arc::clone(&consensus), - data_loader.clone(), - ) - .verify()?; - Ok(result) - }) - .map_err(Reject::Verification); - let fee = try_or_return_with_snapshot!(ret, snapshot); - - let max_cycles = if let Some((declared_cycle, _peer)) = remote { - declared_cycle - } else { - consensus.max_block_cycles() - }; - - let ret = self - .loop_resume( - Arc::clone(&rtx), - data_loader, - max_cycles, - Arc::clone(&consensus), - Arc::clone(&tx_env), - ) - .await; - let state = try_or_return_with_snapshot!(ret, snapshot); - - let completed: Completed = match state { - VerifyResult::Completed(cycles) => Completed { cycles, fee }, - _ => { - panic!("not expected"); - } - }; - if let Some((declared_cycle, _peer)) = remote { - if declared_cycle != completed.cycles { - return Some(( - Err(Reject::DeclaredWrongCycles( - declared_cycle, - completed.cycles, - )), - snapshot, - )); - } - } - // verify passed - let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size); - let (ret, snapshot) = self.service.submit_entry(tip_hash, entry, status).await; - try_or_return_with_snapshot!(ret, snapshot); - - self.service.notify_block_assembler(status).await; - - let mut guard = self.service.txs_verify_cache.write().await; - guard.put(tx_hash, completed); - - Some((Ok(completed), snapshot)) - } - - async fn loop_resume< - DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static, - >( - &mut self, - rtx: Arc, - data_loader: DL, - max_cycles: Cycle, - consensus: Arc, - tx_env: Arc, - ) -> Result { - let script_verifier = ScriptVerifier::new(rtx, data_loader, consensus, tx_env); - let res = script_verifier - .resumable_verify_with_signal(max_cycles, &mut self.command_rx) - .await - .map_err(Reject::Verification)?; - Ok(res) - } } pub(crate) struct VerifyMgr {