Skip to content

Commit

Permalink
fix: remove use of upgradable reads (#310)
Browse files Browse the repository at this point in the history
Upgradable reads block plain reads, this behavior does not match the docs
  • Loading branch information
zhangsoledad authored and doitian committed Mar 6, 2019
1 parent 9d2efa8 commit f9e7f97
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 84 deletions.
6 changes: 2 additions & 4 deletions miner/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{MinerConfig, Work};
use ckb_core::block::Block;
use ckb_util::RwLockUpgradableReadGuard;
use crossbeam_channel::Sender;
use futures::sync::{mpsc, oneshot};
use hyper::error::Error as HyperError;
Expand Down Expand Up @@ -153,10 +152,9 @@ impl Client {
let mut updated = false;
match self.get_block_template().wait() {
Ok(new) => {
let work = self.current_work.upgradable_read();
let mut work = self.current_work.lock();
if work.as_ref().map_or(true, |old| old.work_id != new.work_id) {
let mut write_guard = RwLockUpgradableReadGuard::upgrade(work);
*write_guard = Some(new);
*work = Some(new);
updated = true;
let _ = self.new_work.send(());
}
Expand Down
4 changes: 2 additions & 2 deletions miner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub use crate::block_assembler::{BlockAssembler, BlockAssemblerController};
pub use crate::client::Client;
pub use crate::config::{BlockAssemblerConfig, MinerConfig};
pub use crate::miner::Miner;
use ckb_util::RwLock;
use ckb_util::Mutex;
use jsonrpc_types::BlockTemplate;
use std::sync::Arc;

pub type Work = Arc<RwLock<Option<BlockTemplate>>>;
pub type Work = Arc<Mutex<Option<BlockTemplate>>>;
2 changes: 1 addition & 1 deletion miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Miner {
}

fn mine(&self) -> Option<(String, Block)> {
if let Some(template) = self.current_work.read().clone() {
if let Some(template) = { self.current_work.lock().clone() } {
let BlockTemplate {
version,
difficulty,
Expand Down
4 changes: 2 additions & 2 deletions src/cli/miner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::helper::{require_path_exists, to_absolute_path};
use ckb_chain_spec::ChainSpec;
use ckb_miner::{Client, Miner, MinerConfig};
use ckb_util::RwLock;
use ckb_util::Mutex;
use clap::ArgMatches;
use crossbeam_channel::unbounded;
use dir::Directories;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub fn miner(matches: &ArgMatches) {

let (new_work_tx, new_work_rx) = unbounded();

let work = Arc::new(RwLock::new(None));
let work = Arc::new(Mutex::new(None));

let client = Client::new(Arc::clone(&work), new_work_tx, config.miner);

Expand Down
2 changes: 1 addition & 1 deletion sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
.relayer
.state
.pending_compact_blocks
.write()
.lock()
.remove(&hash)
{
let transactions: Vec<Transaction> =
Expand Down
104 changes: 52 additions & 52 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use ckb_protocol::{CompactBlock as FbsCompactBlock, RelayMessage};
use ckb_shared::block_median_time_context::BlockMedianTimeContext;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::ChainProvider;
use ckb_util::RwLockUpgradableReadGuard;
use ckb_shared::shared::Shared;
use ckb_verification::{HeaderResolverWrapper, HeaderVerifier, Verifier};
use flatbuffers::FlatBufferBuilder;
use fnv::FnvHashMap;
use numext_fixed_hash::H256;
use std::sync::Arc;

Expand Down Expand Up @@ -39,57 +40,61 @@ where
pub fn execute(self) {
let compact_block: CompactBlock = (*self.message).into();
let block_hash = compact_block.header.hash();
let pending_compact_blocks = self.relayer.state.pending_compact_blocks.upgradable_read();
if pending_compact_blocks.get(&block_hash).is_none()
&& self.relayer.get_block(&block_hash).is_none()
let mut missing_indexes: Vec<usize> = Vec::new();
{
let resolver =
HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone());
let header_verifier = HeaderVerifier::new(
CompactBlockMedianTimeView {
relayer: self.relayer,
},
Arc::clone(&self.relayer.shared.consensus().pow_engine()),
);

if header_verifier.verify(&resolver).is_ok() {
self.relayer
.request_proposal_txs(self.nc, self.peer, &compact_block);
let mut pending_compact_blocks = self.relayer.state.pending_compact_blocks.lock();
if pending_compact_blocks.get(&block_hash).is_none()
&& self.relayer.get_block(&block_hash).is_none()
{
let resolver =
HeaderResolverWrapper::new(&compact_block.header, self.relayer.shared.clone());
let header_verifier = HeaderVerifier::new(
CompactBlockMedianTimeView {
pending_compact_blocks: &pending_compact_blocks,
shared: &self.relayer.shared,
},
Arc::clone(&self.relayer.shared.consensus().pow_engine()),
);

match self.relayer.reconstruct_block(&compact_block, Vec::new()) {
Ok(block) => self
.relayer
.accept_block(self.nc, self.peer, &Arc::new(block)),
Err(missing_indexes) => {
{
let mut write_guard =
RwLockUpgradableReadGuard::upgrade(pending_compact_blocks);
write_guard.insert(block_hash.clone(), compact_block.clone());
if header_verifier.verify(&resolver).is_ok() {
self.relayer
.request_proposal_txs(self.nc, self.peer, &compact_block);
match self.relayer.reconstruct_block(&compact_block, Vec::new()) {
Ok(block) => {
self.relayer
.accept_block(self.nc, self.peer, &Arc::new(block))
}
Err(missing) => {
missing_indexes = missing;
pending_compact_blocks
.insert(block_hash.clone(), compact_block.clone());
}

let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_get_block_transactions(
fbb,
&block_hash,
&missing_indexes
.into_iter()
.map(|i| i as u32)
.collect::<Vec<_>>(),
);
fbb.finish(message, None);
let _ = self.nc.send(self.peer, fbb.finished_data().to_vec());
}
}
}
}
if !missing_indexes.is_empty() {
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_get_block_transactions(
fbb,
&block_hash,
&missing_indexes
.into_iter()
.map(|i| i as u32)
.collect::<Vec<_>>(),
);
fbb.finish(message, None);
let _ = self.nc.send(self.peer, fbb.finished_data().to_vec());
}
}
}

struct CompactBlockMedianTimeView<'a, CI>
where
CI: ChainIndex + 'static,
{
relayer: &'a Relayer<CI>,
pending_compact_blocks: &'a FnvHashMap<H256, CompactBlock>,
shared: &'a Shared<CI>,
}

impl<'a, CI> ::std::clone::Clone for CompactBlockMedianTimeView<'a, CI>
Expand All @@ -98,7 +103,8 @@ where
{
fn clone(&self) -> Self {
CompactBlockMedianTimeView {
relayer: self.relayer,
pending_compact_blocks: self.pending_compact_blocks,
shared: self.shared,
}
}
}
Expand All @@ -108,32 +114,26 @@ where
CI: ChainIndex + 'static,
{
fn block_count(&self) -> u32 {
self.relayer.shared.consensus().median_time_block_count() as u32
self.shared.consensus().median_time_block_count() as u32
}

fn timestamp(&self, hash: &H256) -> Option<u64> {
self.relayer
.state
.pending_compact_blocks
.read()
self.pending_compact_blocks
.get(hash)
.map(|cb| cb.header.timestamp())
.or_else(|| {
self.relayer
.shared
self.shared
.block_header(hash)
.map(|header| header.timestamp())
})
}

fn parent_hash(&self, hash: &H256) -> Option<H256> {
self.relayer
.state
.pending_compact_blocks
.read()
self.pending_compact_blocks
.get(hash)
.map(|cb| cb.header.parent_hash().to_owned())
.or_else(|| {
self.relayer
.shared
self.shared
.block_header(hash)
.map(|header| header.parent_hash().to_owned())
})
Expand Down
4 changes: 2 additions & 2 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use ckb_pool::txs_pool::TransactionPoolController;
use ckb_protocol::{short_transaction_id, short_transaction_id_keys, RelayMessage, RelayPayload};
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::{ChainProvider, Shared};
use ckb_util::{Mutex, RwLock};
use ckb_util::Mutex;
use flatbuffers::{get_root, FlatBufferBuilder};
use fnv::{FnvHashMap, FnvHashSet};
use log::{debug, info};
Expand Down Expand Up @@ -289,7 +289,7 @@ where

#[derive(Default)]
pub struct RelayState {
pub pending_compact_blocks: RwLock<FnvHashMap<H256, CompactBlock>>,
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
}
11 changes: 3 additions & 8 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ckb_core::header::Header;
use ckb_network::PeerIndex;
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::ChainProvider;
use ckb_util::{try_option, RwLockUpgradableReadGuard};
use ckb_util::try_option;
use faketime::unix_time_as_millis;
use log::debug;
use numext_fixed_hash::H256;
Expand Down Expand Up @@ -75,11 +75,7 @@ where
}

pub fn last_common_header(&self, best: &HeaderView) -> Option<Header> {
let guard = self
.synchronizer
.peers
.last_common_headers
.upgradable_read();
let mut guard = self.synchronizer.peers.last_common_headers.write();

let last_common_header = try_option!(guard.get(&self.peer).cloned().or_else(|| {
if best.number() < self.tip_header.number() {
Expand All @@ -95,8 +91,7 @@ where
.last_common_ancestor(&last_common_header, &best.inner())?;

if fixed_last_common_header != last_common_header {
let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard);
write_guard
guard
.entry(self.peer)
.and_modify(|last_common_header| {
*last_common_header = fixed_last_common_header.clone()
Expand Down
15 changes: 7 additions & 8 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ckb_network::{CKBProtocolContext, CKBProtocolHandler, PeerIndex, Severity, T
use ckb_protocol::{SyncMessage, SyncPayload};
use ckb_shared::index::ChainIndex;
use ckb_shared::shared::{ChainProvider, Shared};
use ckb_util::{try_option, RwLock, RwLockUpgradableReadGuard};
use ckb_util::{try_option, Mutex, RwLock};
use faketime::unix_time_as_millis;
use flatbuffers::{get_root, FlatBufferBuilder};
use log::{debug, info, warn};
Expand Down Expand Up @@ -66,7 +66,7 @@ bitflags! {
}
}

pub type BlockStatusMap = Arc<RwLock<HashMap<H256, BlockStatus>>>;
pub type BlockStatusMap = Arc<Mutex<HashMap<H256, BlockStatus>>>;
pub type BlockHeaderMap = Arc<RwLock<HashMap<H256, HeaderView>>>;

pub struct Synchronizer<CI: ChainIndex> {
Expand Down Expand Up @@ -128,7 +128,7 @@ impl<CI: ChainIndex> Synchronizer<CI> {
peers: Arc::new(Peers::default()),
orphan_block_pool: Arc::new(OrphanBlockPool::with_capacity(orphan_block_limit)),
best_known_header: Arc::new(RwLock::new(best_known_header)),
status_map: Arc::new(RwLock::new(HashMap::new())),
status_map: Arc::new(Mutex::new(HashMap::new())),
header_map: Arc::new(RwLock::new(HashMap::new())),
n_sync: Arc::new(AtomicUsize::new(0)),
outbound_peers_with_protect: Arc::new(AtomicUsize::new(0)),
Expand Down Expand Up @@ -167,13 +167,12 @@ impl<CI: ChainIndex> Synchronizer<CI> {
}

pub fn get_block_status(&self, hash: &H256) -> BlockStatus {
let guard = self.status_map.upgradable_read();
let mut guard = self.status_map.lock();
match guard.get(hash).cloned() {
Some(s) => s,
None => {
if self.shared.block_header(hash).is_some() {
let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard);
write_guard.insert(hash.clone(), BlockStatus::BLOCK_HAVE_MASK);
guard.insert(hash.clone(), BlockStatus::BLOCK_HAVE_MASK);
BlockStatus::BLOCK_HAVE_MASK
} else {
BlockStatus::UNKNOWN
Expand All @@ -187,7 +186,7 @@ impl<CI: ChainIndex> Synchronizer<CI> {
}

pub fn insert_block_status(&self, hash: H256, status: BlockStatus) {
self.status_map.write().insert(hash, status);
self.status_map.lock().insert(hash, status);
}

pub fn best_known_header(&self) -> HeaderView {
Expand All @@ -209,7 +208,7 @@ impl<CI: ChainIndex> Synchronizer<CI> {

pub fn mark_block_stored(&self, hash: H256) {
self.status_map
.write()
.lock()
.entry(hash)
.and_modify(|status| *status = BlockStatus::BLOCK_HAVE_MASK)
.or_insert_with(|| BlockStatus::BLOCK_HAVE_MASK);
Expand Down
5 changes: 1 addition & 4 deletions util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
mod unstable;

pub use parking_lot::{
Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard,
RwLockWriteGuard,
};
pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};

/// Helper macro for reducing boilerplate code for matching `Option` together
/// with early return.
Expand Down

0 comments on commit f9e7f97

Please sign in to comment.