Skip to content

Commit

Permalink
fix: chain index
Browse files Browse the repository at this point in the history
* fix chain index
* clean network log
* use yamux
* fix: spawn timer
  • Loading branch information
zhangsoledad authored and doitian committed Nov 19, 2018
1 parent 765ea25 commit 8a28fd8
Show file tree
Hide file tree
Showing 30 changed files with 183 additions and 112 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 51 additions & 35 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use bigint::H256;
use bigint::{H256, U256};
use chain_spec::consensus::Consensus;
use channel::{self, Receiver, Sender};
use ckb_notify::{ForkBlocks, NotifyController, NotifyService};
Expand All @@ -16,6 +16,7 @@ use std::cmp;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use time::now_ms;
use util::RwLockUpgradableReadGuard;
use verification::{BlockVerifier, Verifier};

pub struct ChainService<CI> {
Expand Down Expand Up @@ -129,8 +130,14 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {

fn insert_block(&self, block: &Block) -> Result<BlockInsertionResult, SharedError> {
let mut new_best_block = false;
let mut output_root = H256::zero();
let mut total_difficulty = U256::zero();

let mut old_cumulative_blks = Vec::new();
let mut new_cumulative_blks = Vec::new();

let tip_header = self.shared.tip_header().upgradable_read();
let tip_number = tip_header.number();
self.shared.store().save_with_batch(|batch| {
let root = self.check_transactions(batch, block)?;
let parent_ext = self
Expand All @@ -150,41 +157,50 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
self.shared.store().insert_output_root(batch, block.header().hash(), root);
self.shared.store().insert_block_ext(batch, &block.header().hash(), &ext);

{
debug!(target: "chain", "acquire lock");
let mut tip_header = self.shared.tip_header().write();
let current_total_difficulty = tip_header.total_difficulty();
debug!(
"difficulty diff = {}; current = {}, cannon = {}",
cannon_total_difficulty.low_u64() as i64
- current_total_difficulty.low_u64() as i64,
current_total_difficulty,
cannon_total_difficulty,
);
let current_total_difficulty = tip_header.total_difficulty();
debug!(
"difficulty diff = {}; current = {}, cannon = {}",
cannon_total_difficulty.low_u64() as i64
- current_total_difficulty.low_u64() as i64,
current_total_difficulty,
cannon_total_difficulty,
);

if cannon_total_difficulty > current_total_difficulty
|| (current_total_difficulty == cannon_total_difficulty
&& block.header().hash() < tip_header.hash())
{
debug!(target: "chain", "new best block found: {} => {}", block.header().number(), block.header().hash());
new_best_block = true;
let new_tip_header = TipHeader::new(
block.header().clone(),
cannon_total_difficulty,
root,
);

self.update_index(batch, tip_header.number(), block, &mut old_cumulative_blks, &mut new_cumulative_blks);
// TODO: Move out
*tip_header = new_tip_header;
self.shared.store().insert_tip_header(batch, &block.header());
self.shared.store().rebuild_tree(root);
}
debug!(target: "chain", "lock release");
if cannon_total_difficulty > current_total_difficulty
|| (current_total_difficulty == cannon_total_difficulty
&& block.header().hash() < tip_header.hash())
{
debug!(target: "chain", "new best block found: {} => {}", block.header().number(), block.header().hash());
new_best_block = true;
output_root = root;
total_difficulty = cannon_total_difficulty;
}
Ok(())
})?;

if new_best_block {
debug!(target: "chain", "update index");
let mut guard = RwLockUpgradableReadGuard::upgrade(tip_header);
let new_tip_header =
TipHeader::new(block.header().clone(), total_difficulty, output_root);
self.shared.store().save_with_batch(|batch| {
self.update_index(
batch,
tip_number,
block,
&mut old_cumulative_blks,
&mut new_cumulative_blks,
);
self.shared
.store()
.insert_tip_header(batch, &block.header());
self.shared.store().rebuild_tree(output_root);
Ok(())
})?;
*guard = new_tip_header;
debug!(target: "chain", "update index release");
}

Ok(BlockInsertionResult {
new_best_block,
fork_blks: ForkBlocks::new(old_cumulative_blks, new_cumulative_blks),
Expand Down Expand Up @@ -220,11 +236,11 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
old_cumulative_blks: &mut Vec<Block>,
new_cumulative_blks: &mut Vec<Block>,
) {
let mut number = block.header().number() - 1;
let mut number = block.header().number();

// The old fork may longer than new fork
if number < tip_number {
for n in number..tip_number + 1 {
if tip_number >= number {
for n in number..=tip_number {
let hash = self.shared.block_hash(n).unwrap();
let old_block = self.shared.block(&hash).unwrap();
self.shared.store().delete_block_hash(batch, n);
Expand Down Expand Up @@ -253,7 +269,7 @@ impl<CI: ChainIndex + 'static> ChainService<CI> {
}

let mut hash = block.header().parent_hash();

number -= 1;
loop {
if let Some(old_hash) = self.shared.block_hash(number) {
if old_hash == hash {
Expand Down
1 change: 1 addition & 0 deletions chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern crate ckb_verification as verification;
extern crate log;
#[macro_use]
extern crate crossbeam_channel as channel;
extern crate ckb_util as util;

#[cfg(test)]
extern crate rand;
Expand Down
1 change: 0 additions & 1 deletion miner/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl MinerService {
let mut new_transactions_counter = 0;
let mut nonce: u64 = thread_rng().gen();
loop {
debug!(target: "miner", "mining {}", nonce);
loop {
select! {
recv(self.new_tx_receiver, msg) => match msg {
Expand Down
2 changes: 1 addition & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ rand = "0.5"
fnv = "1.0"
serde = "1.0"
serde_derive = "1.0"
parking_lot = "0.6"
ckb-util = { path = "../util" }
unsigned-varint = {git = "https://github.com/paritytech/unsigned-varint", features = ["codec"]}
log = "0.4.5"
bytes = "0.4.9"
Expand Down
4 changes: 2 additions & 2 deletions network/src/ckb_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ where
Ok(result) => result,
Err(err) => {
return {
error!("failed to upgrade ckb_protocol");
error!(target: "network", "failed to upgrade ckb_protocol");
future::err(IoError::new(IoErrorKind::Other, err))
}
}
Expand All @@ -118,7 +118,7 @@ where
outgoing_msg_channel,
incoming_stream,
};
trace!("success to upgrade ckb_protocol");
trace!(target: "network", "success to upgrade ckb_protocol");

future::ok((out, remote_addr))
}
Expand Down
4 changes: 2 additions & 2 deletions network/src/ckb_protocol_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::errors::{Error, ErrorKind};
use super::{Network, SessionInfo, Timer};
use super::{PeerIndex, ProtocolId, TimerToken};
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use util::Mutex;

#[derive(Clone, Debug)]
pub enum Severity<'a> {
Expand Down Expand Up @@ -83,7 +83,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
// report peer behaviour
fn report_peer(&self, peer_index: PeerIndex, reason: Severity) {
// TODO combinate this interface with peer score
info!("report peer {} reason: {:?}", peer_index, reason);
info!(target: "network", "report peer {} reason: {:?}", peer_index, reason);
self.disconnect(peer_index);
}
// ban peer
Expand Down
3 changes: 3 additions & 0 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl CKBService {
};
if protocol_connec.state() == UniqueConnecState::Full {
error!(
target: "network",
"we already connected peer {:?} with {:?}, stop handling",
peer_id, protocol_id
);
Expand Down Expand Up @@ -99,6 +100,7 @@ impl CKBService {
let protocol_id = protocol_id;
move |val| {
info!(
target: "network",
"Disconnect! peer {:?} protocol_id {:?} reason {:?}",
peer_id, protocol_id, val
);
Expand All @@ -122,6 +124,7 @@ impl CKBService {
};

info!(
target: "network",
"Connected to peer {:?} with protocol_id {:?} version {}",
peer_id, protocol_id, protocol_version
);
Expand Down
16 changes: 10 additions & 6 deletions network/src/discovery_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use libp2p::core::{upgrade, MuxedTransport, PeerId};
use libp2p::core::{Endpoint, Multiaddr, UniqueConnec};
use libp2p::core::{PublicKey, SwarmController};
use libp2p::{kad, Transport};
use parking_lot::Mutex;
use peer_store::Status;
use protocol::Protocol;
use protocol_service::ProtocolService;
Expand All @@ -25,6 +24,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::timer::Interval;
use tokio::timer::Timeout;
use transport::TransportOutput;
use util::Mutex;

pub struct DiscoveryService {
timeout: Duration,
Expand Down Expand Up @@ -111,7 +111,7 @@ impl<T: Send> ProtocolService<T> for DiscoveryService {
let kad_upgrade = self.kad_upgrade.clone();
// dial kad peer
move |peer_id| {
debug!("Initialize kad search peers from peer {:?}", peer_id);
debug!(target: "network", "Initialize kad search peers from peer {:?}", peer_id);
Self::dial_kad_peer(
Arc::clone(&kad_manage),
kad_upgrade.clone(),
Expand Down Expand Up @@ -223,7 +223,7 @@ impl DiscoveryService {
_endpoint: Endpoint,
kademlia_stream: Box<Stream<Item = kad::KadIncomingRequest, Error = IoError> + Send>,
) -> Result<Box<Future<Item = (), Error = IoError> + Send>, IoError> {
debug!("client_addr is {:?}", client_addr);
debug!(target: "network", "client_addr is {:?}", client_addr);
//let peer_id = match convert_addr_into_peer_id(client_addr) {
// Some(peer_id) => peer_id,
// None => {
Expand All @@ -247,7 +247,7 @@ impl DiscoveryService {
Timeout::new(next_future, timeout)
.map_err({
move |err| {
info!("kad timeout error {:?}", err.description());
info!(target: "network", "kad timeout error {:?}", err.description());
IoError::new(
IoErrorKind::Other,
format!("discovery request timeout {:?}", err.description()),
Expand Down Expand Up @@ -278,6 +278,7 @@ impl DiscoveryService {
let peer_id = peer_id.clone();
move |val| {
trace!(
target: "network",
"Kad connection closed when handling peer {:?} reason: {:?}",
peer_id,
val
Expand Down Expand Up @@ -314,7 +315,8 @@ impl DiscoveryService {
let network = Arc::clone(&network);
move |peer_id| {
if peer_id == *kad_system.local_peer_id() {
info!(
debug!(
target: "network",
"response self address to kad {:?}",
network.listened_addresses.read().clone()
);
Expand All @@ -336,7 +338,8 @@ impl DiscoveryService {
Status::Connected => kad::KadConnectionType::Connected,
_ => kad::KadConnectionType::NotConnected,
};
info!(
debug!(
target: "network",
"response other address to kad {:?} {:?}",
peer_id,
multiaddrs.clone()
Expand Down Expand Up @@ -404,6 +407,7 @@ impl DiscoveryService {
Some(Some(addr)) => addr.to_owned(),
_ => {
debug!(
target: "network",
"dial kad error, can't find dial address for peer_id {:?}",
peer_id
);
Expand Down
14 changes: 9 additions & 5 deletions network/src/identify_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use super::Network;
use super::PeerId;
use futures::future::{self, lazy, Future};
use futures::future::{self, Future};
use futures::Stream;
use libp2p::core::Multiaddr;
use libp2p::core::SwarmController;
Expand Down Expand Up @@ -51,6 +51,7 @@ impl IdentifyService {
})
}
None => error!(
target: "network",
"can't find peer_id {:?} during process identify info",
peer_id
),
Expand All @@ -61,13 +62,14 @@ impl IdentifyService {
for original_address in network.original_listened_addresses.read().iter() {
let transport = libp2p::tcp::TcpConfig::new();
trace!(
target: "network",
"try get address use original_address {:?} and observed_address {:?}",
original_address,
observed_addr
);
// get an external addrs for our node
if let Some(mut ext_addr) = transport.nat_traversal(original_address, &observed_addr) {
debug!("get new external address {:?}", ext_addr);
debug!(target: "network", "get new external address {:?}", ext_addr);
let mut listened_addresses = network.listened_addresses.write();
if !listened_addresses.iter().any(|a| a == &ext_addr) {
listened_addresses.push(ext_addr.clone());
Expand Down Expand Up @@ -182,7 +184,7 @@ where
Instant::now() + Duration::from_secs(5),
self.identify_interval,
).map_err(|err| {
debug!("identify periodic error {:?}", err);
debug!(target: "network", "identify periodic error {:?}", err);
IoError::new(
IoErrorKind::Other,
format!("identify periodic error {:?}", err),
Expand All @@ -200,6 +202,7 @@ where
}
}
trace!(
target: "network",
"request identify to peer {:?} {:?}",
peer_id,
peer.remote_addresses
Expand All @@ -210,15 +213,16 @@ where
let _ = swarm_controller.dial(addr.clone(), transport.clone());
} else {
error!(
target: "network",
"error when prepare identify : can't find addresses for peer {:?}",
peer_id
);
}
}
Box::new(lazy(|| future::ok(()))) as Box<Future<Item = _, Error = _> + Send>
Ok(())
}
}).then(|err| {
warn!("Identify service stopped, reason: {:?}", err);
warn!(target: "network", "Identify service stopped, reason: {:?}", err);
err
});
Box::new(periodic_identify_future) as Box<Future<Item = _, Error = _> + Send>
Expand Down
2 changes: 1 addition & 1 deletion network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ extern crate unsigned_varint;
#[macro_use]
extern crate log;
extern crate fnv;
extern crate parking_lot;
#[macro_use]
extern crate serde_derive;
extern crate ckb_util as util;

mod ckb_protocol;
mod ckb_protocol_handler;
Expand Down
Loading

0 comments on commit 8a28fd8

Please sign in to comment.