diff --git a/miner/src/client.rs b/miner/src/client.rs index e50175164b..669f12349b 100644 --- a/miner/src/client.rs +++ b/miner/src/client.rs @@ -1,6 +1,5 @@ use crate::{MinerConfig, Work}; use ckb_core::block::Block; -use ckb_util::RwLockUpgradableReadGuard; use crossbeam_channel::Sender; use futures::sync::{mpsc, oneshot}; use hyper::error::Error as HyperError; @@ -153,10 +152,9 @@ impl Client { let mut updated = false; match self.get_block_template().wait() { Ok(new) => { - let work = self.current_work.upgradable_read(); + let mut work = self.current_work.lock(); if work.as_ref().map_or(true, |old| old.work_id != new.work_id) { - let mut write_guard = RwLockUpgradableReadGuard::upgrade(work); - *write_guard = Some(new); + *work = Some(new); updated = true; let _ = self.new_work.send(()); } diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 5dc78b0ec0..2048dc71ed 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -7,8 +7,8 @@ pub use crate::block_assembler::{BlockAssembler, BlockAssemblerController}; pub use crate::client::Client; pub use crate::config::{BlockAssemblerConfig, MinerConfig}; pub use crate::miner::Miner; -use ckb_util::RwLock; +use ckb_util::Mutex; use jsonrpc_types::BlockTemplate; use std::sync::Arc; -pub type Work = Arc>>; +pub type Work = Arc>>; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index e16e304ac8..43640bd981 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -40,7 +40,7 @@ impl Miner { } fn mine(&self) -> Option<(String, Block)> { - if let Some(template) = self.current_work.read().clone() { + if let Some(template) = { self.current_work.lock().clone() } { let BlockTemplate { version, difficulty, diff --git a/src/cli/miner.rs b/src/cli/miner.rs index a2cacf4f70..82420bb66e 100644 --- a/src/cli/miner.rs +++ b/src/cli/miner.rs @@ -1,7 +1,7 @@ use crate::helper::{require_path_exists, to_absolute_path}; use ckb_chain_spec::ChainSpec; use ckb_miner::{Client, Miner, MinerConfig}; -use ckb_util::RwLock; +use ckb_util::Mutex; use clap::ArgMatches; use crossbeam_channel::unbounded; use dir::Directories; @@ -63,7 +63,7 @@ pub fn miner(matches: &ArgMatches) { let (new_work_tx, new_work_rx) = unbounded(); - let work = Arc::new(RwLock::new(None)); + let work = Arc::new(Mutex::new(None)); let client = Client::new(Arc::clone(&work), new_work_tx, config.miner); diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index f76cd0091a..82dc70459b 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -37,7 +37,7 @@ where .relayer .state .pending_compact_blocks - .write() + .lock() .remove(&hash) { let transactions: Vec = diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 32e268dafa..3d4fb1288d 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -5,9 +5,10 @@ use ckb_protocol::{CompactBlock as FbsCompactBlock, RelayMessage}; use ckb_shared::block_median_time_context::BlockMedianTimeContext; use ckb_shared::index::ChainIndex; use ckb_shared::shared::ChainProvider; -use ckb_util::RwLockUpgradableReadGuard; +use ckb_shared::shared::Shared; use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier}; use flatbuffers::FlatBufferBuilder; +use fnv::FnvHashMap; use numext_fixed_hash::H256; use std::sync::Arc; @@ -39,49 +40,52 @@ where pub fn execute(self) { let compact_block: CompactBlock = (*self.message).into(); let block_hash = compact_block.header.hash(); - let pending_compact_blocks = self.relayer.state.pending_compact_blocks.upgradable_read(); - if pending_compact_blocks.get(&block_hash).is_none() - && self.relayer.get_block(&block_hash).is_none() + let mut missing_indexes: Vec = Vec::new(); { - let resolver = - HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone()); - let header_verifier = HeaderVerifier::new( - CompactBlockMedianTimeView { - relayer: self.relayer, - }, - Arc::clone(&self.relayer.shared.consensus().pow_engine()), - ); - - if header_verifier.verify(&resolver).is_ok() { - self.relayer - .request_proposal_txs(self.nc, self.peer, &compact_block); + let mut pending_compact_blocks = self.relayer.state.pending_compact_blocks.lock(); + if pending_compact_blocks.get(&block_hash).is_none() + && self.relayer.get_block(&block_hash).is_none() + { + let resolver = + HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone()); + let header_verifier = HeaderVerifier::new( + CompactBlockMedianTimeView { + pending_compact_blocks: &pending_compact_blocks, + shared: &self.relayer.shared, + }, + Arc::clone(&self.relayer.shared.consensus().pow_engine()), + ); - match self.relayer.reconstruct_block(&compact_block, Vec::new()) { - Ok(block) => self - .relayer - .accept_block(self.nc, self.peer, &Arc::new(block)), - Err(missing_indexes) => { - { - let mut write_guard = - RwLockUpgradableReadGuard::upgrade(pending_compact_blocks); - write_guard.insert(block_hash.clone(), compact_block.clone()); + if header_verifier.verify(&resolver).is_ok() { + self.relayer + .request_proposal_txs(self.nc, self.peer, &compact_block); + match self.relayer.reconstruct_block(&compact_block, Vec::new()) { + Ok(block) => { + self.relayer + .accept_block(self.nc, self.peer, &Arc::new(block)) + } + Err(missing) => { + missing_indexes = missing; + pending_compact_blocks + .insert(block_hash.clone(), compact_block.clone()); } - - let fbb = &mut FlatBufferBuilder::new(); - let message = RelayMessage::build_get_block_transactions( - fbb, - &block_hash, - &missing_indexes - .into_iter() - .map(|i| i as u32) - .collect::>(), - ); - fbb.finish(message, None); - let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); } } } } + if !missing_indexes.is_empty() { + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_get_block_transactions( + fbb, + &block_hash, + &missing_indexes + .into_iter() + .map(|i| i as u32) + .collect::>(), + ); + fbb.finish(message, None); + let _ = self.nc.send(self.peer, fbb.finished_data().to_vec()); + } } } @@ -89,7 +93,8 @@ struct CompactBlockMedianTimeView<'a, CI> where CI: ChainIndex + 'static, { - relayer: &'a Relayer, + pending_compact_blocks: &'a FnvHashMap, + shared: &'a Shared, } impl<'a, CI> ::std::clone::Clone for CompactBlockMedianTimeView<'a, CI> @@ -98,7 +103,8 @@ where { fn clone(&self) -> Self { CompactBlockMedianTimeView { - relayer: self.relayer, + pending_compact_blocks: self.pending_compact_blocks, + shared: self.shared, } } } @@ -108,32 +114,26 @@ where CI: ChainIndex + 'static, { fn block_count(&self) -> u32 { - self.relayer.shared.consensus().median_time_block_count() as u32 + self.shared.consensus().median_time_block_count() as u32 } + fn timestamp(&self, hash: &H256) -> Option { - self.relayer - .state - .pending_compact_blocks - .read() + self.pending_compact_blocks .get(hash) .map(|cb| cb.header.timestamp()) .or_else(|| { - self.relayer - .shared + self.shared .block_header(hash) .map(|header| header.timestamp()) }) } + fn parent_hash(&self, hash: &H256) -> Option { - self.relayer - .state - .pending_compact_blocks - .read() + self.pending_compact_blocks .get(hash) .map(|cb| cb.header.parent_hash().to_owned()) .or_else(|| { - self.relayer - .shared + self.shared .block_header(hash) .map(|header| header.parent_hash().to_owned()) }) diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 64cae65318..c51e27653b 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -24,7 +24,7 @@ use ckb_pool::txs_pool::TransactionPoolController; use ckb_protocol::{short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload}; use ckb_shared::index::ChainIndex; use ckb_shared::shared::{ChainProvider, Shared}; -use ckb_util::{Mutex, RwLock}; +use ckb_util::Mutex; use flatbuffers::{get_root, FlatBufferBuilder}; use fnv::{FnvHashMap, FnvHashSet}; use log::{debug, info}; @@ -289,7 +289,7 @@ where #[derive(Default)] pub struct RelayState { - pub pending_compact_blocks: RwLock>, + pub pending_compact_blocks: Mutex>, pub inflight_proposals: Mutex>, pub pending_proposals_request: Mutex>>, } diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index cbefd4ab27..40e1f8eaf5 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -8,7 +8,7 @@ use ckb_core::header::Header; use ckb_network::PeerIndex; use ckb_shared::index::ChainIndex; use ckb_shared::shared::ChainProvider; -use ckb_util::{try_option, RwLockUpgradableReadGuard}; +use ckb_util::try_option; use faketime::unix_time_as_millis; use log::debug; use numext_fixed_hash::H256; @@ -75,11 +75,7 @@ where } pub fn last_common_header(&self, best: &HeaderView) -> Option
{ - let guard = self - .synchronizer - .peers - .last_common_headers - .upgradable_read(); + let mut guard = self.synchronizer.peers.last_common_headers.write(); let last_common_header = try_option!(guard.get(&self.peer).cloned().or_else(|| { if best.number() < self.tip_header.number() { @@ -95,8 +91,7 @@ where .last_common_ancestor(&last_common_header, &best.inner())?; if fixed_last_common_header != last_common_header { - let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard); - write_guard + guard .entry(self.peer) .and_modify(|last_common_header| { *last_common_header = fixed_last_common_header.clone() diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index be81d87df6..6c588cec5b 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -30,7 +30,7 @@ use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, Severity, T use ckb_protocol::{SyncMessage, SyncPayload}; use ckb_shared::index::ChainIndex; use ckb_shared::shared::{ChainProvider, Shared}; -use ckb_util::{try_option, RwLock, RwLockUpgradableReadGuard}; +use ckb_util::{try_option, Mutex, RwLock}; use faketime::unix_time_as_millis; use flatbuffers::{get_root, FlatBufferBuilder}; use log::{debug, info, warn}; @@ -66,7 +66,7 @@ bitflags! { } } -pub type BlockStatusMap = Arc>>; +pub type BlockStatusMap = Arc>>; pub type BlockHeaderMap = Arc>>; pub struct Synchronizer { @@ -128,7 +128,7 @@ impl Synchronizer { peers: Arc::new(Peers::default()), orphan_block_pool: Arc::new(OrphanBlockPool::with_capacity(orphan_block_limit)), best_known_header: Arc::new(RwLock::new(best_known_header)), - status_map: Arc::new(RwLock::new(HashMap::new())), + status_map: Arc::new(Mutex::new(HashMap::new())), header_map: Arc::new(RwLock::new(HashMap::new())), n_sync: Arc::new(AtomicUsize::new(0)), outbound_peers_with_protect: Arc::new(AtomicUsize::new(0)), @@ -167,13 +167,12 @@ impl Synchronizer { } pub fn get_block_status(&self, hash: &H256) -> BlockStatus { - let guard = self.status_map.upgradable_read(); + let mut guard = self.status_map.lock(); match guard.get(hash).cloned() { Some(s) => s, None => { if self.shared.block_header(hash).is_some() { - let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard); - write_guard.insert(hash.clone(), BlockStatus::BLOCK_HAVE_MASK); + guard.insert(hash.clone(), BlockStatus::BLOCK_HAVE_MASK); BlockStatus::BLOCK_HAVE_MASK } else { BlockStatus::UNKNOWN @@ -187,7 +186,7 @@ impl Synchronizer { } pub fn insert_block_status(&self, hash: H256, status: BlockStatus) { - self.status_map.write().insert(hash, status); + self.status_map.lock().insert(hash, status); } pub fn best_known_header(&self) -> HeaderView { @@ -209,7 +208,7 @@ impl Synchronizer { pub fn mark_block_stored(&self, hash: H256) { self.status_map - .write() + .lock() .entry(hash) .and_modify(|status| *status = BlockStatus::BLOCK_HAVE_MASK) .or_insert_with(|| BlockStatus::BLOCK_HAVE_MASK); diff --git a/util/src/lib.rs b/util/src/lib.rs index 2c2f0cac59..fdaa9c30c3 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -1,9 +1,6 @@ mod unstable; -pub use parking_lot::{ - Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, - RwLockWriteGuard, -}; +pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; /// Helper macro for reducing boilerplate code for matching `Option` together /// with early return.