diff --git a/Cargo.lock b/Cargo.lock index c1d10b8f36..c899d7b957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,11 +485,11 @@ name = "ckb-network" version = "0.1.0" dependencies = [ "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "ckb-util 0.1.0", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/chain/src/chain.rs b/chain/src/chain.rs index dfa507b372..a20acf6ec7 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -1,4 +1,4 @@ -use bigint::H256; +use bigint::{H256, U256}; use chain_spec::consensus::Consensus; use channel::{self, Receiver, Sender}; use ckb_notify::{ForkBlocks, NotifyController, NotifyService}; @@ -16,6 +16,7 @@ use std::cmp; use std::sync::Arc; use std::thread::{self, JoinHandle}; use time::now_ms; +use util::RwLockUpgradableReadGuard; use verification::{BlockVerifier, Verifier}; pub struct ChainService { @@ -129,8 +130,14 @@ impl ChainService { fn insert_block(&self, block: &Block) -> Result { let mut new_best_block = false; + let mut output_root = H256::zero(); + let mut total_difficulty = U256::zero(); + let mut old_cumulative_blks = Vec::new(); let mut new_cumulative_blks = Vec::new(); + + let tip_header = self.shared.tip_header().upgradable_read(); + let tip_number = tip_header.number(); self.shared.store().save_with_batch(|batch| { let root = self.check_transactions(batch, block)?; let parent_ext = self @@ -150,41 +157,50 @@ impl ChainService { self.shared.store().insert_output_root(batch, block.header().hash(), root); self.shared.store().insert_block_ext(batch, &block.header().hash(), &ext); - { - debug!(target: "chain", "acquire lock"); - let mut tip_header = self.shared.tip_header().write(); - let current_total_difficulty = tip_header.total_difficulty(); - debug!( - "difficulty diff = {}; current = {}, cannon = {}", - cannon_total_difficulty.low_u64() as i64 - - current_total_difficulty.low_u64() as i64, - current_total_difficulty, - cannon_total_difficulty, - ); + let current_total_difficulty = tip_header.total_difficulty(); + debug!( + "difficulty diff = {}; current = {}, cannon = {}", + cannon_total_difficulty.low_u64() as i64 + - current_total_difficulty.low_u64() as i64, + current_total_difficulty, + cannon_total_difficulty, + ); - if cannon_total_difficulty > current_total_difficulty - || (current_total_difficulty == cannon_total_difficulty - && block.header().hash() < tip_header.hash()) - { - debug!(target: "chain", "new best block found: {} => {}", block.header().number(), block.header().hash()); - new_best_block = true; - let new_tip_header = TipHeader::new( - block.header().clone(), - cannon_total_difficulty, - root, - ); - - self.update_index(batch, tip_header.number(), block, &mut old_cumulative_blks, &mut new_cumulative_blks); - // TODO: Move out - *tip_header = new_tip_header; - self.shared.store().insert_tip_header(batch, &block.header()); - self.shared.store().rebuild_tree(root); - } - debug!(target: "chain", "lock release"); + if 
cannon_total_difficulty > current_total_difficulty + || (current_total_difficulty == cannon_total_difficulty + && block.header().hash() < tip_header.hash()) + { + debug!(target: "chain", "new best block found: {} => {}", block.header().number(), block.header().hash()); + new_best_block = true; + output_root = root; + total_difficulty = cannon_total_difficulty; } Ok(()) })?; + if new_best_block { + debug!(target: "chain", "update index"); + let mut guard = RwLockUpgradableReadGuard::upgrade(tip_header); + let new_tip_header = + TipHeader::new(block.header().clone(), total_difficulty, output_root); + self.shared.store().save_with_batch(|batch| { + self.update_index( + batch, + tip_number, + block, + &mut old_cumulative_blks, + &mut new_cumulative_blks, + ); + self.shared + .store() + .insert_tip_header(batch, &block.header()); + self.shared.store().rebuild_tree(output_root); + Ok(()) + })?; + *guard = new_tip_header; + debug!(target: "chain", "update index release"); + } + Ok(BlockInsertionResult { new_best_block, fork_blks: ForkBlocks::new(old_cumulative_blks, new_cumulative_blks), @@ -220,11 +236,11 @@ impl ChainService { old_cumulative_blks: &mut Vec, new_cumulative_blks: &mut Vec, ) { - let mut number = block.header().number() - 1; + let mut number = block.header().number(); // The old fork may longer than new fork - if number < tip_number { - for n in number..tip_number + 1 { + if tip_number >= number { + for n in number..=tip_number { let hash = self.shared.block_hash(n).unwrap(); let old_block = self.shared.block(&hash).unwrap(); self.shared.store().delete_block_hash(batch, n); @@ -253,7 +269,7 @@ impl ChainService { } let mut hash = block.header().parent_hash(); - + number -= 1; loop { if let Some(old_hash) = self.shared.block_hash(number) { if old_hash == hash { diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 643e7bba1f..b5b4822932 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -19,6 +19,7 @@ extern crate ckb_verification as verification; extern crate log; #[macro_use] extern crate crossbeam_channel as channel; +extern crate ckb_util as util; #[cfg(test)] extern crate rand; diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 5efd2c7bae..7b5836fdff 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -117,7 +117,6 @@ impl MinerService { let mut new_transactions_counter = 0; let mut nonce: u64 = thread_rng().gen(); loop { - debug!(target: "miner", "mining {}", nonce); loop { select! 
{ recv(self.new_tx_receiver, msg) => match msg { diff --git a/network/Cargo.toml b/network/Cargo.toml index b25bd6676b..63ff0fea3b 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -9,7 +9,7 @@ rand = "0.5" fnv = "1.0" serde = "1.0" serde_derive = "1.0" -parking_lot = "0.6" +ckb-util = { path = "../util" } unsigned-varint = {git = "https://github.com/paritytech/unsigned-varint", features = ["codec"]} log = "0.4.5" bytes = "0.4.9" diff --git a/network/src/ckb_protocol.rs b/network/src/ckb_protocol.rs index 24a91392e3..e60b5ced23 100644 --- a/network/src/ckb_protocol.rs +++ b/network/src/ckb_protocol.rs @@ -104,7 +104,7 @@ where Ok(result) => result, Err(err) => { return { - error!("failed to upgrade ckb_protocol"); + error!(target: "network", "failed to upgrade ckb_protocol"); future::err(IoError::new(IoErrorKind::Other, err)) } } @@ -118,7 +118,7 @@ where outgoing_msg_channel, incoming_stream, }; - trace!("success to upgrade ckb_protocol"); + trace!(target: "network", "success to upgrade ckb_protocol"); future::ok((out, remote_addr)) } diff --git a/network/src/ckb_protocol_handler.rs b/network/src/ckb_protocol_handler.rs index a80248b9ee..45a4e84f4d 100644 --- a/network/src/ckb_protocol_handler.rs +++ b/network/src/ckb_protocol_handler.rs @@ -1,9 +1,9 @@ use super::errors::{Error, ErrorKind}; use super::{Network, SessionInfo, Timer}; use super::{PeerIndex, ProtocolId, TimerToken}; -use parking_lot::Mutex; use std::sync::Arc; use std::time::Duration; +use util::Mutex; #[derive(Clone, Debug)] pub enum Severity<'a> { @@ -83,7 +83,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { // report peer behaviour fn report_peer(&self, peer_index: PeerIndex, reason: Severity) { // TODO combinate this interface with peer score - info!("report peer {} reason: {:?}", peer_index, reason); + info!(target: "network", "report peer {} reason: {:?}", peer_index, reason); self.disconnect(peer_index); } // ban peer diff --git a/network/src/ckb_service.rs b/network/src/ckb_service.rs index 89687ce2f1..f3c348d1e6 100644 --- a/network/src/ckb_service.rs +++ b/network/src/ckb_service.rs @@ -46,6 +46,7 @@ impl CKBService { }; if protocol_connec.state() == UniqueConnecState::Full { error!( + target: "network", "we already connected peer {:?} with {:?}, stop handling", peer_id, protocol_id ); @@ -99,6 +100,7 @@ impl CKBService { let protocol_id = protocol_id; move |val| { info!( + target: "network", "Disconnect! 
peer {:?} protocol_id {:?} reason {:?}", peer_id, protocol_id, val ); @@ -122,6 +124,7 @@ impl CKBService { }; info!( + target: "network", "Connected to peer {:?} with protocol_id {:?} version {}", peer_id, protocol_id, protocol_version ); diff --git a/network/src/discovery_service.rs b/network/src/discovery_service.rs index 0d7197ead2..fc7199c8b3 100644 --- a/network/src/discovery_service.rs +++ b/network/src/discovery_service.rs @@ -8,7 +8,6 @@ use libp2p::core::{upgrade, MuxedTransport, PeerId}; use libp2p::core::{Endpoint, Multiaddr, UniqueConnec}; use libp2p::core::{PublicKey, SwarmController}; use libp2p::{kad, Transport}; -use parking_lot::Mutex; use peer_store::Status; use protocol::Protocol; use protocol_service::ProtocolService; @@ -25,6 +24,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Interval; use tokio::timer::Timeout; use transport::TransportOutput; +use util::Mutex; pub struct DiscoveryService { timeout: Duration, @@ -111,7 +111,7 @@ impl ProtocolService for DiscoveryService { let kad_upgrade = self.kad_upgrade.clone(); // dial kad peer move |peer_id| { - debug!("Initialize kad search peers from peer {:?}", peer_id); + debug!(target: "network", "Initialize kad search peers from peer {:?}", peer_id); Self::dial_kad_peer( Arc::clone(&kad_manage), kad_upgrade.clone(), @@ -223,7 +223,7 @@ impl DiscoveryService { _endpoint: Endpoint, kademlia_stream: Box + Send>, ) -> Result + Send>, IoError> { - debug!("client_addr is {:?}", client_addr); + debug!(target: "network", "client_addr is {:?}", client_addr); //let peer_id = match convert_addr_into_peer_id(client_addr) { // Some(peer_id) => peer_id, // None => { @@ -247,7 +247,7 @@ impl DiscoveryService { Timeout::new(next_future, timeout) .map_err({ move |err| { - info!("kad timeout error {:?}", err.description()); + info!(target: "network", "kad timeout error {:?}", err.description()); IoError::new( IoErrorKind::Other, format!("discovery request timeout {:?}", err.description()), @@ -278,6 +278,7 @@ impl DiscoveryService { let peer_id = peer_id.clone(); move |val| { trace!( + target: "network", "Kad connection closed when handling peer {:?} reason: {:?}", peer_id, val @@ -314,7 +315,8 @@ impl DiscoveryService { let network = Arc::clone(&network); move |peer_id| { if peer_id == *kad_system.local_peer_id() { - info!( + debug!( + target: "network", "response self address to kad {:?}", network.listened_addresses.read().clone() ); @@ -336,7 +338,8 @@ impl DiscoveryService { Status::Connected => kad::KadConnectionType::Connected, _ => kad::KadConnectionType::NotConnected, }; - info!( + debug!( + target: "network", "response other address to kad {:?} {:?}", peer_id, multiaddrs.clone() @@ -404,6 +407,7 @@ impl DiscoveryService { Some(Some(addr)) => addr.to_owned(), _ => { debug!( + target: "network", "dial kad error, can't find dial address for peer_id {:?}", peer_id ); diff --git a/network/src/identify_service.rs b/network/src/identify_service.rs index f149e2de9f..ee2b5df76a 100644 --- a/network/src/identify_service.rs +++ b/network/src/identify_service.rs @@ -2,7 +2,7 @@ use super::Network; use super::PeerId; -use futures::future::{self, lazy, Future}; +use futures::future::{self, Future}; use futures::Stream; use libp2p::core::Multiaddr; use libp2p::core::SwarmController; @@ -51,6 +51,7 @@ impl IdentifyService { }) } None => error!( + target: "network", "can't find peer_id {:?} during process identify info", peer_id ), @@ -61,13 +62,14 @@ impl IdentifyService { for original_address in 
network.original_listened_addresses.read().iter() { let transport = libp2p::tcp::TcpConfig::new(); trace!( + target: "network", "try get address use original_address {:?} and observed_address {:?}", original_address, observed_addr ); // get an external addrs for our node if let Some(mut ext_addr) = transport.nat_traversal(original_address, &observed_addr) { - debug!("get new external address {:?}", ext_addr); + debug!(target: "network", "get new external address {:?}", ext_addr); let mut listened_addresses = network.listened_addresses.write(); if !listened_addresses.iter().any(|a| a == &ext_addr) { listened_addresses.push(ext_addr.clone()); @@ -182,7 +184,7 @@ where Instant::now() + Duration::from_secs(5), self.identify_interval, ).map_err(|err| { - debug!("identify periodic error {:?}", err); + debug!(target: "network", "identify periodic error {:?}", err); IoError::new( IoErrorKind::Other, format!("identify periodic error {:?}", err), @@ -200,6 +202,7 @@ where } } trace!( + target: "network", "request identify to peer {:?} {:?}", peer_id, peer.remote_addresses @@ -210,15 +213,16 @@ where let _ = swarm_controller.dial(addr.clone(), transport.clone()); } else { error!( + target: "network", "error when prepare identify : can't find addresses for peer {:?}", peer_id ); } } - Box::new(lazy(|| future::ok(()))) as Box + Send> + Ok(()) } }).then(|err| { - warn!("Identify service stopped, reason: {:?}", err); + warn!(target: "network", "Identify service stopped, reason: {:?}", err); err }); Box::new(periodic_identify_future) as Box + Send> diff --git a/network/src/lib.rs b/network/src/lib.rs index e95290d21b..d5e1d4873b 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -8,9 +8,9 @@ extern crate unsigned_varint; #[macro_use] extern crate log; extern crate fnv; -extern crate parking_lot; #[macro_use] extern crate serde_derive; +extern crate ckb_util as util; mod ckb_protocol; mod ckb_protocol_handler; diff --git a/network/src/memory_peer_store.rs b/network/src/memory_peer_store.rs index 834b186fb3..95a75b7ee2 100644 --- a/network/src/memory_peer_store.rs +++ b/network/src/memory_peer_store.rs @@ -113,6 +113,7 @@ impl PeerStore for MemoryPeerStore { } fn peers_to_attempt<'a>(&'a self) -> Box + 'a> { trace!( + target: "network", "try fetch attempt peers from {:?}", self.peers.iter().collect::>() ); diff --git a/network/src/network.rs b/network/src/network.rs index 6fd2392dfd..fa0269fe99 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -19,7 +19,6 @@ use libp2p::core::{PublicKey, SwarmController}; use libp2p::{self, identify, kad, secio, Transport, TransportTimeout}; use memory_peer_store::MemoryPeerStore; use outgoing_service::OutgoingService; -use parking_lot::{Mutex, RwLock}; use peer_store::{Behaviour, PeerStore}; use peers_registry::PeerIdentifyInfo; use peers_registry::PeersRegistry; @@ -35,6 +34,7 @@ use std::usize; use timer_service::TimerService; use tokio::io::{AsyncRead, AsyncWrite}; use transport::{new_transport, TransportOutput}; +use util::{Mutex, RwLock}; const WAIT_LOCK_TIMEOUT: u64 = 3; const KBUCKETS_TIMEOUT: u64 = 600; @@ -272,6 +272,7 @@ impl Network { C: Send + 'static, { trace!( + target: "network", "prepare open protocol {:?} to {:?}", protocol.base_name(), addr @@ -326,6 +327,7 @@ impl Network { }); trace!( + target: "network", "Opening connection to {:?} addr {} with protocol {:?}", expected_peer_id, addr, @@ -389,7 +391,7 @@ impl Network { move |err, addr| { let mut peer_store = network.peer_store().write(); peer_store.report_address(&addr, 
Behaviour::FailedToConnect); - trace!("Failed to connect to peer {}, error: {:?}", addr, err); + trace!(target: "network", "Failed to connect to peer {}, error: {:?}", addr, err); err } }); @@ -398,9 +400,9 @@ impl Network { let local_peer_id = local_peer_id.clone(); move |(peer_id, stream), _endpoint, remote_addr_fut| { remote_addr_fut.and_then(move |remote_addr| { - trace!("connection from {:?}", remote_addr); + trace!(target: "network", "connection from {:?}", remote_addr); if peer_id == local_peer_id { - trace!("connect to self, disconnect"); + trace!(target: "network", "connect to self, disconnect"); return Err(IoErrorKind::ConnectionRefused.into()); } let out = TransportOutput { @@ -533,7 +535,11 @@ impl Network { for addr in &config.listen_addresses { match swarm_controller.listen_on(addr.clone()) { Ok(listen_address) => { - info!("Listen on address {}", listen_address.clone()); + info!( + target: "network", + "Listen on address: {}", + network.to_external_url(&listen_address) + ); network .original_listened_addresses .write() @@ -541,6 +547,7 @@ impl Network { } Err(err) => { warn!( + target: "network", "listen on address {} failed, due to error: {}", addr.clone(), err @@ -567,7 +574,7 @@ impl Network { } // dial bootnodes for (peer_id, addr) in peer_store.bootnodes().take(max_outgoing) { - debug!("dial bootnode {:?} {:?}", peer_id, addr); + debug!(target: "network", "dial bootnode {:?} {:?}", peer_id, addr); network.dial_to_peer( basic_transport.clone(), addr, @@ -613,12 +620,12 @@ impl Network { let network = Arc::clone(&network); move |_| { let mut peers_registry = network.peers_registry().write(); - debug!("drop all connections..."); + debug!(target: "network", "drop all connections..."); peers_registry.drop_all(); Ok(()) } }).map_err(|(err, _, _)| { - debug!("network exit, error {:?}", err); + debug!(target: "network", "network exit, error {:?}", err); err }); let service_futures = diff --git a/network/src/network_config.rs b/network/src/network_config.rs index 0e543ae4a6..118b4aca10 100644 --- a/network/src/network_config.rs +++ b/network/src/network_config.rs @@ -63,7 +63,7 @@ impl NetworkConfig { } pub fn generate_random_key(&mut self) -> Result { - info!("Generate random key"); + info!(target: "network", "Generate random key"); let mut key: [u8; 32] = [0; 32]; rand::rngs::EntropyRng::new().fill(&mut key); self.secret_key = Some(Bytes::from(key.to_vec())); @@ -78,7 +78,7 @@ impl NetworkConfig { pub fn write_secret_key_to_file(&mut self) -> Result<(), IoError> { if let Some(ref secret_key_path) = self.secret_key_path { if let Some(secret_key) = self.secret_key.clone() { - info!("write random secret key to {}", secret_key_path); + info!(target: "network", "write random secret key to {}", secret_key_path); return fs::OpenOptions::new() .create(true) .write(true) diff --git a/network/src/network_service.rs b/network/src/network_service.rs index ad4a3dbbd8..f66bd9383f 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -7,7 +7,6 @@ use futures::future::Future; use futures::sync::oneshot; use libp2p::core::PeerId; use network::Network; -use parking_lot::RwLock; use peer_store::PeerStore; use peers_registry::{PeerConnection, PeersRegistry}; use std::boxed::Box; @@ -15,6 +14,7 @@ use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::sync::Arc; use std::thread; use tokio::runtime::current_thread; +use util::RwLock; pub struct NetworkService { network: Arc, @@ -74,11 +74,16 @@ impl NetworkService { let network = Arc::clone(&network); let 
config = config.clone(); move || { + info!( + target: "network", + "network peer_id {:?}", + network.local_public_key().clone().into_peer_id() + ); let network_future = Network::build_network_future(network, &config, close_rx).unwrap(); init_tx.send(()).unwrap(); match current_thread::block_on_all(network_future) { - Ok(_) => info!("network service exit"), + Ok(_) => info!(target: "network", "network service exit"), Err(err) => panic!("network service exit unexpected {}", err), } } @@ -99,11 +104,11 @@ impl NetworkService { // This method will not wait for the server stopped, you should use server_future or // thread_handle to achieve that. fn shutdown(&mut self) -> Result<(), IoError> { - debug!("shutdown network service self: {:?}", self.external_url()); + debug!(target: "network", "shutdown network service self: {:?}", self.external_url()); if let Some(close_tx) = self.close_tx.take() { let _ = close_tx .send(()) - .map_err(|err| debug!("send shutdown signal error, ignoring error: {:?}", err)); + .map_err(|err| debug!(target: "network", "send shutdown signal error, ignoring error: {:?}", err)); }; if let Some(join_handle) = self.join_handle.take() { join_handle.join().map_err(|_| { diff --git a/network/src/outgoing_service.rs b/network/src/outgoing_service.rs index e8de624b72..06f7946449 100644 --- a/network/src/outgoing_service.rs +++ b/network/src/outgoing_service.rs @@ -106,7 +106,7 @@ impl ProtocolService for OutgoingService { Box::new(lazy(|| future::ok(()))) as Box + Send> } }).then(|err| { - warn!("Outgoing service stopped, reason: {:?}", err); + warn!(target: "network", "Outgoing service stopped, reason: {:?}", err); err }); Box::new(outgoing_future) as Box + Send> diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index 2529809f73..597d38812c 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -4,12 +4,12 @@ use fnv::FnvHashMap; use futures::sync::mpsc::UnboundedSender; use libp2p::core::{Endpoint, Multiaddr, UniqueConnec}; use libp2p::ping; -use parking_lot::{Mutex, RwLock}; use peer_store::PeerStore; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +use util::{Mutex, RwLock}; struct PeerConnections { id_allocator: AtomicUsize, diff --git a/network/src/ping_service.rs b/network/src/ping_service.rs index 8ef5f0b623..c9ba78696b 100644 --- a/network/src/ping_service.rs +++ b/network/src/ping_service.rs @@ -145,6 +145,7 @@ impl ProtocolService for PingService { let received_during = ping_start_time.elapsed(); peer_store.report(&peer_id, Behaviour::Ping); trace!( + target: "network", "received pong from {:?} in {:?}", peer_id, received_during @@ -159,6 +160,7 @@ impl ProtocolService for PingService { .write() .drop_peer(&peer_id); trace!( + target: "network", "error when send ping to {:?}, error: {:?}", peer_id, err @@ -180,7 +182,7 @@ impl ProtocolService for PingService { ) as Box + Send> } }).then(|err| { - warn!("Ping service stopped, reason: {:?}", err); + warn!(target: "network", "Ping service stopped, reason: {:?}", err); err }); Box::new(periodic_ping_future) as Box + Send> diff --git a/network/src/timer_service.rs b/network/src/timer_service.rs index ea111ce73f..da642d406d 100644 --- a/network/src/timer_service.rs +++ b/network/src/timer_service.rs @@ -7,16 +7,17 @@ use futures::stream::FuturesUnordered; use futures::Stream; use libp2p::core::Multiaddr; use libp2p::core::{MuxedTransport, SwarmController}; -use parking_lot::Mutex; use protocol::Protocol; 
use protocol_service::ProtocolService; use std::boxed::Box; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::sync::Arc; use std::time::Duration; +use tokio; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Interval; use transport::TransportOutput; +use util::Mutex; pub(crate) type Timer = (Arc, ProtocolId, TimerToken, Duration); @@ -73,33 +74,39 @@ impl ProtocolService for TimerService { move || -> Box + Send> { let timer_registry = timer_registry.lock().take().unwrap(); let mut timer_futures = FuturesUnordered::new(); - trace!("start timer service, timer count: {}", timer_registry.len()); + trace!(target: "network", "start timer service, timer count: {}", timer_registry.len()); // register timers for (handler, protocol_id, timer_symbol, duration) in timer_registry { trace!( + target: "network", "register timer: timer_symbol {} protocol {:?} duration: {:?}", timer_symbol, protocol_id, duration ); let timer_interval = Interval::new_interval(duration); - let future = Box::new( + let network_clone = Arc::clone(&network); + let handler_clone = Arc::clone(&handler); + let timer_future = Box::new( timer_interval - .for_each({ - let network = Arc::clone(&network); - move |_| { - handler.timer_triggered( + .for_each(move |_| { + let network_clone = Arc::clone(&network_clone); + let handler_clone = Arc::clone(&handler_clone); + let handle_timer = future::lazy(move || { + handler_clone.timer_triggered( Box::new(DefaultCKBProtocolContext::new( - Arc::clone(&network), + Arc::clone(&network_clone), protocol_id, )), timer_symbol, ); - future::ok(()) - } + Ok(()) + }); + tokio::spawn(handle_timer); + Ok(()) }).map_err(|err| IoError::new(IoErrorKind::Other, err)), ); - timer_futures.push(future); + timer_futures.push(timer_future); } if timer_futures.is_empty() { Box::new(future::empty()) as Box + Send> diff --git a/pool/src/txs_pool/pool.rs b/pool/src/txs_pool/pool.rs index b2074542ca..0b3b32ea9b 100644 --- a/pool/src/txs_pool/pool.rs +++ b/pool/src/txs_pool/pool.rs @@ -554,14 +554,17 @@ where } } - self.proposed.reconcile(bn, ids).unwrap() + self.proposed.reconcile(bn, ids).unwrap_or_else(|error| { + error!(target: "txs_pool", "Failed to proposed reconcile {:?}", error); + vec![] + }) }; // We can sort it by some rules for tx in new_txs { let tx_hash = tx.hash(); if let Err(error) = self.add_to_pool(tx) { - info!(target: "txs_pool", "Failed to add proposed tx {:} to pool, reason: {:?}", tx_hash, error); + error!(target: "txs_pool", "Failed to add proposed tx {:} to pool, reason: {:?}", tx_hash, error); } } } diff --git a/rpc/src/integration_test.rs b/rpc/src/integration_test.rs index db7f5386d0..4ca162f612 100644 --- a/rpc/src/integration_test.rs +++ b/rpc/src/integration_test.rs @@ -130,10 +130,11 @@ impl IntegrationTestRpc for RpcImpl { .shared .block(&block_hash) .ok_or_else(Error::internal_error)?; + let tip_header = self.shared.tip_header().read(); for transaction in block.commit_transactions() { let transaction_meta = self .shared - .get_transaction_meta(&transaction.hash()) + .get_transaction_meta(&tip_header.output_root(), &transaction.hash()) .ok_or_else(Error::internal_error)?; for (i, output) in transaction.outputs().iter().enumerate() { if output.lock == type_hash && (!transaction_meta.is_spent(i)) { diff --git a/rpc/src/server.rs b/rpc/src/server.rs index b6f690dd64..c081acb026 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -104,10 +104,11 @@ impl Rpc for RpcImpl { .shared .block(&block_hash) .ok_or_else(Error::internal_error)?; + let tip_header = 
self.shared.tip_header().read(); for transaction in block.commit_transactions() { let transaction_meta = self .shared - .get_transaction_meta(&transaction.hash()) + .get_transaction_meta(&tip_header.output_root(), &transaction.hash()) .ok_or_else(Error::internal_error)?; for (i, output) in transaction.outputs().iter().enumerate() { if output.lock == type_hash && (!transaction_meta.is_spent(i)) { diff --git a/rpc/src/service.rs b/rpc/src/service.rs index 507c74e5ea..b12e1effbf 100644 --- a/rpc/src/service.rs +++ b/rpc/src/service.rs @@ -242,7 +242,7 @@ impl RpcService { let mut included = FnvHashSet::default(); let mut uncles = Vec::with_capacity(max_uncles_len); let mut bad_uncles = Vec::new(); - let current_number = self.shared.tip_header().read().number() + 1; + let current_number = tip_header.number() + 1; for (hash, block) in &self.candidate_uncles { if uncles.len() == max_uncles_len { break; diff --git a/shared/src/shared.rs b/shared/src/shared.rs index 334aef4921..a134ae7222 100644 --- a/shared/src/shared.rs +++ b/shared/src/shared.rs @@ -55,6 +55,10 @@ impl TipHeader { pub fn into_inner(self) -> Header { self.inner } + + pub fn output_root(&self) -> H256 { + self.output_root + } } pub struct Shared { @@ -124,7 +128,8 @@ impl Shared { impl CellProvider for Shared { fn cell(&self, out_point: &OutPoint) -> CellStatus { let index = out_point.index as usize; - if let Some(meta) = self.get_transaction_meta(&out_point.hash) { + let tip_header = self.tip_header().read(); + if let Some(meta) = self.get_transaction_meta(&tip_header.output_root, &out_point.hash) { if index < meta.len() { if !meta.is_spent(index) { let mut transaction = self @@ -192,7 +197,7 @@ pub trait ChainProvider: Sync + Send { fn contain_transaction(&self, hash: &H256) -> bool; - fn get_transaction_meta(&self, hash: &H256) -> Option; + fn get_transaction_meta(&self, output_root: &H256, hash: &H256) -> Option; fn get_transaction_meta_at(&self, hash: &H256, parent: &H256) -> Option; @@ -221,11 +226,7 @@ impl ChainProvider for Shared { } fn block_header(&self, hash: &H256) -> Option
{ - if &self.tip_header.read().hash() == hash { - Some(self.tip_header.read().inner().clone()) - } else { - self.store.get_header(hash) - } + self.store.get_header(hash) } fn block_proposal_txs_ids(&self, hash: &H256) -> Option> { @@ -264,10 +265,8 @@ impl ChainProvider for Shared { self.store.get_transaction_address(hash).is_some() } - fn get_transaction_meta(&self, hash: &H256) -> Option { - let tip_header = self.tip_header.read(); - self.store - .get_transaction_meta(tip_header.output_root, *hash) + fn get_transaction_meta(&self, output_root: &H256, hash: &H256) -> Option { + self.store.get_transaction_meta(*output_root, *hash) } fn get_transaction_meta_at(&self, hash: &H256, parent: &H256) -> Option { diff --git a/sync/src/lib.rs b/sync/src/lib.rs index e6be0ad222..9e555d7be3 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -73,3 +73,5 @@ pub const EVICTION_HEADERS_RESPONSE_TIME: u64 = 120 * 1000; // 2 minutes //The maximum number of entries in a locator pub const MAX_LOCATOR_SIZE: usize = 101; + +pub const BLOCK_DOWNLOAD_TIMEOUT: u64 = 30 * 1000; // 30s diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index bebb0a60b9..c8aded5a88 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -2,12 +2,16 @@ use super::header_view::HeaderView; use bigint::H256; use ckb_shared::index::ChainIndex; use ckb_shared::shared::{ChainProvider, TipHeader}; +use ckb_time::now_ms; use core::header::Header; use network::PeerIndex; use std::cmp; use synchronizer::{BlockStatus, Synchronizer}; use util::RwLockUpgradableReadGuard; -use {BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER, PER_FETCH_BLOCK_LIMIT}; +use { + BLOCK_DOWNLOAD_TIMEOUT, BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER, + PER_FETCH_BLOCK_LIMIT, +}; pub struct BlockFetcher { synchronizer: Synchronizer, @@ -29,13 +33,17 @@ where } pub fn initial_and_check_inflight(&self) -> bool { let mut blocks_inflight = self.synchronizer.peers.blocks_inflight.write(); - let inflight_count = blocks_inflight + let inflight = blocks_inflight .entry(self.peer) - .or_insert_with(Default::default) - .len(); + .or_insert_with(Default::default); + + if inflight.timestamp < now_ms().saturating_sub(BLOCK_DOWNLOAD_TIMEOUT) { + debug!(target: "sync", "[block downloader] inflight block download timeout"); + inflight.clear(); + } // current peer block blocks_inflight reach limit - if MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight_count) == 0 { + if MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.len()) == 0 { debug!(target: "sync", "[block downloader] inflight count reach limit"); true } else { diff --git a/sync/src/synchronizer/block_process.rs b/sync/src/synchronizer/block_process.rs index b55f39a8c9..b3cffac23e 100644 --- a/sync/src/synchronizer/block_process.rs +++ b/sync/src/synchronizer/block_process.rs @@ -1,13 +1,13 @@ -use ckb_protocol::Block; +use ckb_protocol::Block as PBlock; use ckb_shared::index::ChainIndex; +use core::block::Block; use network::{CKBProtocolContext, PeerIndex}; use synchronizer::Synchronizer; pub struct BlockProcess<'a, CI: ChainIndex + 'a> { - message: &'a Block<'a>, + message: &'a PBlock<'a>, synchronizer: &'a Synchronizer, peer: PeerIndex, - // nc: &'a CKBProtocolContext, } impl<'a, CI> BlockProcess<'a, CI> @@ -15,7 +15,7 @@ where CI: ChainIndex + 'a, { pub fn new( - message: &'a Block, + message: &'a PBlock, synchronizer: &'a Synchronizer, peer: PeerIndex, _nc: &'a CKBProtocolContext, @@ -28,7 +28,8 @@ where } pub fn 
execute(self) { - let block = (*self.message).into(); + let block: Block = (*self.message).into(); + debug!(target: "sync", "BlockProcess received block {} {:?}", block.header().number(), block.header().hash()); self.synchronizer.peers.block_received(self.peer, &block); self.synchronizer.process_new_block(self.peer, block); diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index a384080721..22560e5036 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -559,6 +559,7 @@ impl Synchronizer { } } for peer in eviction { + warn!(target: "sync", "timeout eviction peer={}", peer); nc.report_peer(peer, Severity::Timeout); } } @@ -614,9 +615,9 @@ where { fn initialize(&self, nc: Box) { // NOTE: 100ms is what bitcoin use. - let _ = nc.register_timer(SEND_GET_HEADERS_TOKEN, Duration::from_millis(100)); - let _ = nc.register_timer(BLOCK_FETCH_TOKEN, Duration::from_millis(100)); - let _ = nc.register_timer(TIMEOUT_EVICTION_TOKEN, Duration::from_millis(100)); + let _ = nc.register_timer(SEND_GET_HEADERS_TOKEN, Duration::from_millis(1000)); + let _ = nc.register_timer(BLOCK_FETCH_TOKEN, Duration::from_millis(1000)); + let _ = nc.register_timer(TIMEOUT_EVICTION_TOKEN, Duration::from_millis(1000)); } fn received(&self, nc: Box, peer: PeerIndex, data: &[u8]) { diff --git a/sync/src/synchronizer/peers.rs b/sync/src/synchronizer/peers.rs index 81d89581d5..920c7a0cc0 100644 --- a/sync/src/synchronizer/peers.rs +++ b/sync/src/synchronizer/peers.rs @@ -104,6 +104,10 @@ impl BlocksInflight { pub fn update_timestamp(&mut self) { self.timestamp = now_ms(); } + + pub fn clear(&mut self) { + self.blocks.clear(); + } } impl Peers { @@ -177,6 +181,7 @@ impl Peers { self.best_known_headers.write().remove(&peer); // self.misbehavior.write().remove(peer); self.blocks_inflight.write().remove(&peer); + self.last_common_headers.write().remove(&peer); } pub fn block_received(&self, peer: PeerIndex, block: &Block) { @@ -184,6 +189,7 @@ impl Peers { debug!(target: "sync", "block_received from peer {} {} {:?}", peer, block.header().number(), block.header().hash()); blocks_inflight.entry(peer).and_modify(|inflight| { inflight.remove(&block.header().hash()); + inflight.update_timestamp(); }); } diff --git a/verification/src/tests/dummy.rs b/verification/src/tests/dummy.rs index ef57a912ea..05dc83c5fd 100644 --- a/verification/src/tests/dummy.rs +++ b/verification/src/tests/dummy.rs @@ -85,7 +85,7 @@ impl ChainProvider for DummyChainProvider { panic!("Not implemented!"); } - fn get_transaction_meta(&self, _hash: &H256) -> Option { + fn get_transaction_meta(&self, _output_root: &H256, _hash: &H256) -> Option { panic!("Not implemented!"); }
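Two of the changes in this patch are easier to follow with a standalone illustration. The rework in `chain/src/chain.rs` takes the tip header as an upgradable read guard, performs verification and the first batch write without blocking readers, and only upgrades to a write lock once the block is known to be the new best block. Below is a minimal sketch of that locking pattern, assuming the `parking_lot` crate (which `ckb-util` appears to re-export here); the `TipState` type and the difficulty comparison are illustrative stand-ins, not the actual `TipHeader` API.

```rust
// Minimal sketch of the upgradable-read pattern used in `insert_block`.
// Requires `parking_lot` in Cargo.toml; `TipState` is a hypothetical stand-in.
use parking_lot::{RwLock, RwLockUpgradableReadGuard};

struct TipState {
    number: u64,
    total_difficulty: u64,
}

fn maybe_update_tip(tip: &RwLock<TipState>, candidate: TipState) -> bool {
    // An upgradable read lets other readers proceed, but no other writer or
    // upgradable reader can sneak in, so the decision below cannot race.
    let guard = tip.upgradable_read();
    if candidate.total_difficulty > guard.total_difficulty {
        // Upgrade to a write lock only once we actually need to store the new tip.
        let mut write_guard = RwLockUpgradableReadGuard::upgrade(guard);
        *write_guard = candidate;
        true
    } else {
        false
    }
}

fn main() {
    let tip = RwLock::new(TipState { number: 0, total_difficulty: 10 });
    let updated = maybe_update_tip(&tip, TipState { number: 1, total_difficulty: 20 });
    println!("updated: {}, tip number: {}", updated, tip.read().number);
}
```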
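Likewise, the new `BLOCK_DOWNLOAD_TIMEOUT` handling in `sync/src/synchronizer/block_fetcher.rs` drops a peer's in-flight block set once its last recorded progress is older than the timeout, so a stalled peer no longer pins its `MAX_BLOCKS_IN_TRANSIT_PER_PEER` budget indefinitely. A self-contained sketch of that check follows; the `Inflight` struct and `clear_if_stale` name are hypothetical, not the real `BlocksInflight` type.

```rust
use std::collections::HashSet;

// 30s, matching the constant added in sync/src/lib.rs.
const BLOCK_DOWNLOAD_TIMEOUT: u64 = 30 * 1000;

// Illustrative stand-in for the per-peer in-flight bookkeeping.
struct Inflight {
    timestamp: u64,            // last time this peer made download progress (ms)
    blocks: HashSet<[u8; 32]>, // hashes of blocks currently requested from it
}

impl Inflight {
    // Clear the in-flight set when the peer has been silent past the timeout,
    // mirroring the check added to `initial_and_check_inflight`.
    fn clear_if_stale(&mut self, now_ms: u64) {
        // saturating_sub keeps the comparison safe when now_ms < timeout.
        if self.timestamp < now_ms.saturating_sub(BLOCK_DOWNLOAD_TIMEOUT) {
            self.blocks.clear();
        }
    }
}

fn main() {
    let mut inflight = Inflight { timestamp: 0, blocks: HashSet::new() };
    inflight.blocks.insert([0u8; 32]);
    inflight.clear_if_stale(60_000); // one minute later: stale, so it is cleared
    assert!(inflight.blocks.is_empty());
}
```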