Skip to content

Commit

Permalink
refactor verify worker clean up resume_loop
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 17, 2024
1 parent 1607a98 commit 9e339fa
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 138 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion tx-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ ckb-stop-handler = { path = "../util/stop-handler", version = "= 0.114.0-pre" }
ckb-app-config = { path = "../util/app-config", version = "= 0.114.0-pre" }
ckb-network = { path = "../network", version = "= 0.114.0-pre" }
ckb-channel = { path = "../util/channel", version = "= 0.114.0-pre" }
ckb-traits = { path = "../traits", version = "= 0.114.0-pre" }
ckb-db = { path = "../db", version = "= 0.114.0-pre" }
ckb-script = { path = "../script", version = "= 0.114.0-pre" }

Expand Down
105 changes: 105 additions & 0 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::callback::Callbacks;
use crate::component::entry::TxEntry;
use crate::component::orphan::Entry as OrphanEntry;
use crate::component::pool_map::Status;
use crate::component::verify_queue::Entry;
use crate::error::Reject;
use crate::pool::TxPool;
use crate::service::{BlockAssemblerMessage, TxPoolService, TxVerificationResult};
Expand All @@ -16,6 +17,7 @@ use ckb_jsonrpc_types::BlockTemplate;
use ckb_logger::Level::Trace;
use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
use ckb_network::PeerIndex;
use ckb_script::{ChunkCommand, VerifyResult};
use ckb_snapshot::Snapshot;
use ckb_store::data_loader_wrapper::AsDataLoader;
use ckb_store::ChainStore;
Expand All @@ -30,10 +32,12 @@ use ckb_verification::{
ContextualTransactionVerifier, DaoScriptSizeVerifier, TimeRelativeTransactionVerifier,
TxVerifyEnv,
};
use ckb_verification::{ContextualWithoutScriptTransactionVerifier, ScriptVerifier};
use std::collections::HashSet;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::block_in_place;

const DELAY_LIMIT: usize = 1_500 * 21; // 1_500 per block, 21 blocks
Expand Down Expand Up @@ -718,6 +722,107 @@ impl TxPoolService {
Some((Ok(ProcessResult::Completed(completed)), submit_snapshot))
}

pub(crate) async fn run_verify_tx(
&mut self,
entry: Entry,
command_rx: &mut watch::Receiver<ChunkCommand>,
) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
let Entry { tx, remote } = entry;
let tx_hash = tx.hash();

let (ret, snapshot) = self.pre_check(&tx).await;
let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);
let cached = self.fetch_tx_verify_cache(&tx_hash).await;
let tip_header = snapshot.tip_header();
let consensus = snapshot.cloned_consensus();

let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header));

if let Some(ref completed) = cached {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
snapshot.as_data_loader(),
Arc::clone(&tx_env),
)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification);
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, snapshot) = self.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);
return Some((Ok(completed), snapshot));
}

let cloned_snapshot = Arc::clone(&snapshot);
let data_loader = cloned_snapshot.as_data_loader();
let ret = ContextualWithoutScriptTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
data_loader.clone(),
Arc::clone(&tx_env),
)
.verify()
.and_then(|result| {
DaoScriptSizeVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
data_loader.clone(),
)
.verify()?;
Ok(result)
})
.map_err(Reject::Verification);
let fee = try_or_return_with_snapshot!(ret, snapshot);

let max_cycles = if let Some((declared_cycle, _peer)) = remote {
declared_cycle
} else {
consensus.max_block_cycles()
};

let ret = {
let script_verifier =
ScriptVerifier::new(Arc::clone(&rtx), data_loader, consensus, tx_env);
script_verifier
.resumable_verify_with_signal(max_cycles, command_rx)
.await
.map_err(Reject::Verification)
};

let state = try_or_return_with_snapshot!(ret, snapshot);
let completed: Completed = match state {
VerifyResult::Completed(cycles) => Completed { cycles, fee },
_ => {
panic!("not expected");
}
};
if let Some((declared_cycle, _peer)) = remote {
if declared_cycle != completed.cycles {
return Some((
Err(Reject::DeclaredWrongCycles(
declared_cycle,
completed.cycles,
)),
snapshot,
));
}
}
// verify passed
let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, snapshot) = self.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);

self.notify_block_assembler(status).await;

let mut guard = self.txs_verify_cache.write().await;
guard.put(tx_hash, completed);

Some((Ok(completed), snapshot))
}

pub(crate) async fn enqueue_suspended_tx(
&self,
tx: TransactionView,
Expand Down
141 changes: 5 additions & 136 deletions tx-pool/src/verify_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
extern crate num_cpus;
use crate::component::entry::TxEntry;
use crate::component::verify_queue::{Entry, VerifyQueue};
use crate::try_or_return_with_snapshot;
use crate::{error::Reject, service::TxPoolService};
use ckb_chain_spec::consensus::Consensus;
use crate::component::verify_queue::VerifyQueue;
use crate::service::TxPoolService;
use ckb_logger::info;
use ckb_script::{ChunkCommand, VerifyResult};
use ckb_snapshot::Snapshot;
use ckb_script::ChunkCommand;
use ckb_stop_handler::CancellationToken;
use ckb_store::data_loader_wrapper::AsDataLoader;
use ckb_traits::{CellDataProvider, ExtensionProvider, HeaderProvider};
use ckb_types::core::{cell::ResolvedTransaction, Cycle};
use ckb_verification::{
cache::Completed, ContextualWithoutScriptTransactionVerifier, DaoScriptSizeVerifier,
ScriptVerifier, TimeRelativeTransactionVerifier, TxVerifyEnv,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{watch, RwLock};
Expand Down Expand Up @@ -98,7 +87,8 @@ impl Worker {
};

let (res, snapshot) = self
.run_verify_tx(entry.clone())
.service
.run_verify_tx(entry.clone(), &mut self.command_rx)
.await
.expect("run_verify_tx failed");

Expand All @@ -115,127 +105,6 @@ impl Worker {
}
}
}

async fn run_verify_tx(
&mut self,
entry: Entry,
) -> Option<(Result<Completed, Reject>, Arc<Snapshot>)> {
let Entry { tx, remote } = entry;
let tx_hash = tx.hash();

let (ret, snapshot) = self.service.pre_check(&tx).await;
let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

let cached = self.service.fetch_tx_verify_cache(&tx_hash).await;

let tip_header = snapshot.tip_header();
let consensus = snapshot.cloned_consensus();

let tx_env = Arc::new(TxVerifyEnv::new_submit(tip_header));

if let Some(ref completed) = cached {
let ret = TimeRelativeTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
snapshot.as_data_loader(),
Arc::clone(&tx_env),
)
.verify()
.map(|_| *completed)
.map_err(Reject::Verification);
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, snapshot) = self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);
return Some((Ok(completed), snapshot));
}

let cloned_snapshot = Arc::clone(&snapshot);
let data_loader = cloned_snapshot.as_data_loader();
let ret = ContextualWithoutScriptTransactionVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
data_loader.clone(),
Arc::clone(&tx_env),
)
.verify()
.and_then(|result| {
DaoScriptSizeVerifier::new(
Arc::clone(&rtx),
Arc::clone(&consensus),
data_loader.clone(),
)
.verify()?;
Ok(result)
})
.map_err(Reject::Verification);
let fee = try_or_return_with_snapshot!(ret, snapshot);

let max_cycles = if let Some((declared_cycle, _peer)) = remote {
declared_cycle
} else {
consensus.max_block_cycles()
};

let ret = self
.loop_resume(
Arc::clone(&rtx),
data_loader,
max_cycles,
Arc::clone(&consensus),
Arc::clone(&tx_env),
)
.await;
let state = try_or_return_with_snapshot!(ret, snapshot);

let completed: Completed = match state {
VerifyResult::Completed(cycles) => Completed { cycles, fee },
_ => {
panic!("not expected");
}
};
if let Some((declared_cycle, _peer)) = remote {
if declared_cycle != completed.cycles {
return Some((
Err(Reject::DeclaredWrongCycles(
declared_cycle,
completed.cycles,
)),
snapshot,
));
}
}
// verify passed
let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, snapshot) = self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);

self.service.notify_block_assembler(status).await;

let mut guard = self.service.txs_verify_cache.write().await;
guard.put(tx_hash, completed);

Some((Ok(completed), snapshot))
}

async fn loop_resume<
DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + Clone + 'static,
>(
&mut self,
rtx: Arc<ResolvedTransaction>,
data_loader: DL,
max_cycles: Cycle,
consensus: Arc<Consensus>,
tx_env: Arc<TxVerifyEnv>,
) -> Result<VerifyResult, Reject> {
let script_verifier = ScriptVerifier::new(rtx, data_loader, consensus, tx_env);
let res = script_verifier
.resumable_verify_with_signal(max_cycles, &mut self.command_rx)
.await
.map_err(Reject::Verification)?;
Ok(res)
}
}

pub(crate) struct VerifyMgr {
Expand Down

0 comments on commit 9e339fa

Please sign in to comment.