From b957d2bb1da1007f7ba5e4d391673c6e8413ac6c Mon Sep 17 00:00:00 2001 From: quake wang Date: Tue, 13 Nov 2018 16:15:04 +0900 Subject: [PATCH] feat: relay msg to peers and network tweak --- Cargo.lock | 1 + miner/src/miner.rs | 8 +++--- network/src/ckb_protocol_handler.rs | 7 +++++ network/src/network_service.rs | 19 +------------- network/src/peers_registry.rs | 5 ---- rpc/Cargo.toml | 1 + rpc/src/integration_test.rs | 25 +++++++++++------- rpc/src/lib.rs | 3 +++ rpc/src/server.rs | 31 +++++++++++++++-------- sync/src/relayer/compact_block_process.rs | 21 +++++++++------ sync/src/relayer/transaction_process.rs | 29 ++++++++++++--------- sync/src/synchronizer/mod.rs | 4 +++ sync/src/tests/mod.rs | 5 ++++ 13 files changed, 92 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84efdd5fe5..c1d10b8f36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -580,6 +580,7 @@ dependencies = [ "ckb-time 0.1.0", "ckb-verification 0.1.0", "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "flatbuffers 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 8.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 7d633654db..5efd2c7bae 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -157,12 +157,12 @@ impl MinerService { fn announce_new_block(&self, block: &Arc) { self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| { - for peer in self.network.connected_peers_indexes() { + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); + fbb.finish(message, None); + for peer in nc.connected_peers() { debug!(target: "miner", "announce new block to peer#{}, {} => {}", peer, block.header().number(), block.header().hash()); - let fbb = &mut FlatBufferBuilder::new(); - let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); - fbb.finish(message, None); let _ = nc.send(peer, fbb.finished_data().to_vec()); } }); diff --git a/network/src/ckb_protocol_handler.rs b/network/src/ckb_protocol_handler.rs index 53d4ff9da0..a80248b9ee 100644 --- a/network/src/ckb_protocol_handler.rs +++ b/network/src/ckb_protocol_handler.rs @@ -36,6 +36,7 @@ pub trait CKBProtocolContext: Send { .and_then(|session| Some((*peer_index, session))) }).collect() } + fn connected_peers(&self) -> Vec; } pub(crate) struct DefaultCKBProtocolContext { @@ -147,6 +148,12 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { fn protocol_id(&self) -> ProtocolId { self.protocol_id } + + fn connected_peers(&self) -> Vec { + let peers_registry = self.network.peers_registry().read(); + let iter = peers_registry.connected_peers_indexes(); + iter.collect::>() + } } pub trait CKBProtocolHandler: Sync + Send { diff --git a/network/src/network_service.rs b/network/src/network_service.rs index e115860b8f..ad4a3dbbd8 100644 --- a/network/src/network_service.rs +++ b/network/src/network_service.rs @@ -1,5 +1,5 @@ +use super::NetworkConfig; use super::{Error, ErrorKind, ProtocolId}; -use super::{NetworkConfig, PeerIndex}; use ckb_protocol::CKBProtocol; use ckb_protocol_handler::CKBProtocolHandler; use ckb_protocol_handler::{CKBProtocolContext, DefaultCKBProtocolContext}; @@ -45,23 +45,6 @@ impl NetworkService { &self.network.peer_store() } - #[cfg_attr(feature = "cargo-clippy", allow(let_and_return))] - pub fn connected_peers(&self) -> Vec { - let peers_registry = self.peers_registry().read(); - let peers = peers_registry - .connected_peers() - .map(|peer_id| peer_id.to_owned()) - .collect::>(); - peers - } - - #[cfg_attr(feature = "cargo-clippy", allow(let_and_return))] - pub fn connected_peers_indexes(&self) -> Vec { - let peers_registry = self.peers_registry().read(); - let peers = peers_registry.connected_peers_indexes().collect::>(); - peers - } - pub fn add_peer(&self, peer_id: PeerId, peer: PeerConnection) { let mut peers_registry = self.peers_registry().write(); peers_registry.add_peer(peer_id, peer); diff --git a/network/src/peers_registry.rs b/network/src/peers_registry.rs index fe60d9cdeb..2529809f73 100644 --- a/network/src/peers_registry.rs +++ b/network/src/peers_registry.rs @@ -271,11 +271,6 @@ impl PeersRegistry { } } - #[inline] - pub fn connected_peers<'a>(&'a self) -> impl Iterator + 'a { - Box::new(self.peer_connections.iter().map(|(k, _v)| k)) - } - #[inline] pub fn connected_peers_indexes<'a>(&'a self) -> impl Iterator + 'a { Box::new( diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 53ad35ef50..de428c66cb 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -26,6 +26,7 @@ serde_json = "1.0" log = "0.4" crossbeam-channel = "0.2" fnv = "1.0.3" +flatbuffers = "0.5.0" [dev-dependencies] ckb-db = { path = "../db" } diff --git a/rpc/src/integration_test.rs b/rpc/src/integration_test.rs index 066d0e2b08..db7f5386d0 100644 --- a/rpc/src/integration_test.rs +++ b/rpc/src/integration_test.rs @@ -6,15 +6,18 @@ use bigint::H256; use ckb_pow::Clicker; use core::header::{BlockNumber, Header}; use core::transaction::{OutPoint, Transaction}; +use flatbuffers::FlatBufferBuilder; use jsonrpc_core::{Error, IoHandler, Result}; use jsonrpc_http_server::ServerBuilder; use jsonrpc_server_utils::cors::AccessControlAllowOrigin; use jsonrpc_server_utils::hosts::DomainsValidation; use network::NetworkService; use pool::txs_pool::TransactionPoolController; +use protocol::RelayMessage; use shared::index::ChainIndex; use shared::shared::{ChainProvider, Shared}; use std::sync::Arc; +use sync::RELAY_PROTOCOL_ID; //TODO: build_rpc_trait! do not surppot trait bounds build_rpc_trait! { @@ -74,17 +77,21 @@ impl IntegrationTestRpc for RpcImpl { } fn send_transaction(&self, tx: Transaction) -> Result { - let indexed_tx: Transaction = tx.into(); - let result = indexed_tx.hash(); - let pool_result = self.tx_pool.add_transaction(indexed_tx.clone()); + let tx_hash = tx.hash(); + let pool_result = self.tx_pool.add_transaction(tx.clone()); debug!(target: "rpc", "send_transaction add to pool result: {:?}", pool_result); - // TODO PENDING new api NetworkContext#connected_peers - // for peer_id in self.nc.connected_peers() { - // let data = builde_transaction(indexed_tx); - // self.nc.send(peer_id, 0, data.to_vec()); - // } - Ok(result) + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_transaction(fbb, &tx); + fbb.finish(message, None); + + self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| { + for peer in nc.connected_peers() { + debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer); + let _ = nc.send(peer, fbb.finished_data().to_vec()); + } + }); + Ok(tx_hash) } fn get_block(&self, hash: H256) -> Result> { diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 1bde3dfd95..2c1aefecd6 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,4 +1,5 @@ extern crate bigint; +extern crate flatbuffers; extern crate jsonrpc_core; #[macro_use] extern crate jsonrpc_macros; @@ -12,7 +13,9 @@ extern crate ckb_db as db; extern crate ckb_network as network; extern crate ckb_notify as notify; extern crate ckb_pool as pool; +extern crate ckb_protocol as protocol; extern crate ckb_shared as shared; +extern crate ckb_sync as sync; extern crate ckb_time; #[cfg(test)] extern crate ckb_verification as verification; diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 21ab145797..b6f690dd64 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -3,15 +3,18 @@ use super::{BlockWithHash, CellOutputWithOutPoint, Config, TransactionWithHash}; use bigint::H256; use core::header::{BlockNumber, Header}; use core::transaction::{OutPoint, Transaction}; +use flatbuffers::FlatBufferBuilder; use jsonrpc_core::{Error, IoHandler, Result}; use jsonrpc_http_server::ServerBuilder; use jsonrpc_server_utils::cors::AccessControlAllowOrigin; use jsonrpc_server_utils::hosts::DomainsValidation; use network::NetworkService; use pool::txs_pool::TransactionPoolController; +use protocol::RelayMessage; use shared::index::ChainIndex; use shared::shared::{ChainProvider, Shared}; use std::sync::Arc; +use sync::RELAY_PROTOCOL_ID; build_rpc_trait! { pub trait Rpc { @@ -46,7 +49,7 @@ build_rpc_trait! { } struct RpcImpl { - _network: Arc, + network: Arc, shared: Shared, tx_pool: TransactionPoolController, controller: RpcController, @@ -54,15 +57,21 @@ struct RpcImpl { impl Rpc for RpcImpl { fn send_transaction(&self, tx: Transaction) -> Result { - let result = tx.hash(); - let pool_result = self.tx_pool.add_transaction(tx); + let tx_hash = tx.hash(); + let pool_result = self.tx_pool.add_transaction(tx.clone()); debug!(target: "rpc", "send_transaction add to pool result: {:?}", pool_result); - // TODO PENDING new api NetworkContext#connected_peers - // for peer_id in self.nc.connected_peers() { - // let data = builde_transaction(indexed_tx); - // self.nc.send(peer_id, 0, data.to_vec()); - // } - Ok(result) + + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_transaction(fbb, &tx); + fbb.finish(message, None); + + self.network.with_protocol_context(RELAY_PROTOCOL_ID, |nc| { + for peer in nc.connected_peers() { + debug!(target: "rpc", "relay transaction {} to peer#{}", tx_hash, peer); + let _ = nc.send(peer, fbb.finished_data().to_vec()); + } + }); + Ok(tx_hash) } fn get_block(&self, hash: H256) -> Result> { @@ -130,7 +139,7 @@ pub struct RpcServer { impl RpcServer { pub fn start( &self, - _network: Arc, + network: Arc, shared: Shared, tx_pool: TransactionPoolController, controller: RpcController, @@ -140,7 +149,7 @@ impl RpcServer { let mut io = IoHandler::new(); io.extend_with( RpcImpl { - _network, + network, shared, tx_pool, controller, diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index b422fe42b8..23ee88d611 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -4,6 +4,7 @@ use ckb_shared::index::ChainIndex; use flatbuffers::FlatBufferBuilder; use network::{CKBProtocolContext, PeerIndex}; use relayer::Relayer; +use std::collections::HashSet; pub struct CompactBlockProcess<'a, CI: ChainIndex + 'a> { message: &'a FbsCompactBlock<'a>, @@ -45,14 +46,18 @@ where match self.relayer.reconstruct_block(&compact_block, Vec::new()) { (Some(block), _) => { - let _ = self.relayer.accept_block(self.peer, block); - // TODO PENDING new api NetworkContext#connected_peers - // for peer_id in self.nc.connected_peers() { - // let fbb = &mut FlatBufferBuilder::new(); - // let message = RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); - // fbb.finish(message, None); - // self.nc.send(peer_id, 0, fbb.finished_data().to_vec()); - // } + if self.relayer.accept_block(self.peer, block.clone()).is_ok() { + let fbb = &mut FlatBufferBuilder::new(); + let message = + RelayMessage::build_compact_block(fbb, &block, &HashSet::new()); + fbb.finish(message, None); + + for peer_id in self.nc.connected_peers() { + if peer_id != self.peer { + let _ = self.nc.send(peer_id, fbb.finished_data().to_vec()); + } + } + } } (_, Some(missing_indexes)) => { let hash = compact_block.header.hash(); diff --git a/sync/src/relayer/transaction_process.rs b/sync/src/relayer/transaction_process.rs index 6c29e12bd6..d77e897b45 100644 --- a/sync/src/relayer/transaction_process.rs +++ b/sync/src/relayer/transaction_process.rs @@ -1,12 +1,12 @@ -use ckb_protocol::Transaction; +use ckb_protocol::{RelayMessage, Transaction as FbsTransaction}; use ckb_shared::index::ChainIndex; +use core::transaction::Transaction; +use flatbuffers::FlatBufferBuilder; use network::{CKBProtocolContext, PeerIndex}; use relayer::Relayer; -// TODO PENDING remove this attribute later -#[allow(dead_code)] pub struct TransactionProcess<'a, CI: ChainIndex + 'a> { - message: &'a Transaction<'a>, + message: &'a FbsTransaction<'a>, relayer: &'a Relayer, peer: PeerIndex, nc: &'a CKBProtocolContext, @@ -17,7 +17,7 @@ where CI: ChainIndex + 'static, { pub fn new( - message: &'a Transaction, + message: &'a FbsTransaction, relayer: &'a Relayer, peer: PeerIndex, nc: &'a CKBProtocolContext, @@ -31,12 +31,17 @@ where } pub fn execute(self) { - let tx = (*self.message).into(); - let _ = self.relayer.tx_pool.add_transaction(tx); - // TODO PENDING new api NetworkContext#connected_peers - // for peer_id in self.nc.connected_peers() { - // let data = builde_transaction(indexed_tx); - // self.nc.send(peer_id, 0, data.to_vec()); - // } + let tx: Transaction = (*self.message).into(); + let _ = self.relayer.tx_pool.add_transaction(tx.clone()); + + let fbb = &mut FlatBufferBuilder::new(); + let message = RelayMessage::build_transaction(fbb, &tx); + fbb.finish(message, None); + + for peer_id in self.nc.connected_peers() { + if peer_id != self.peer { + let _ = self.nc.send(peer_id, fbb.finished_data().to_vec()); + } + } } } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 001f83a195..a384080721 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -1042,6 +1042,10 @@ mod tests { fn protocol_id(&self) -> ProtocolId { unimplemented!(); } + + fn connected_peers(&self) -> Vec { + unimplemented!(); + } } fn mock_network_context(peer_num: usize) -> DummyNetworkContext { diff --git a/sync/src/tests/mod.rs b/sync/src/tests/mod.rs index 9a5fb5b365..9403e71de3 100644 --- a/sync/src/tests/mod.rs +++ b/sync/src/tests/mod.rs @@ -187,6 +187,11 @@ impl NetworkContext for TestNetworkContext { None } + fn connected_peers(&self) -> Vec { + self.msg_senders.keys().map(|k| k.1).collect::>() + } +} + fn subprotocol_name(&self) -> ProtocolId { [1, 1, 1] }