From 91bea249627790aa2a4c7244b0951c1ef5353284 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 24 Apr 2018 12:07:17 +0300 Subject: [PATCH 1/2] preparing light client structure --- Cargo.lock | 12 + demo/cli/src/lib.rs | 5 +- polkadot/cli/src/cli.yml | 4 + polkadot/cli/src/informant.rs | 9 +- polkadot/cli/src/lib.rs | 16 +- polkadot/service/Cargo.toml | 1 + polkadot/service/src/chain_config.rs | 142 ++++++++++ polkadot/service/src/client_adapter.rs | 191 +++++++++++++ polkadot/service/src/lib.rs | 262 ++++-------------- substrate/client/src/client.rs | 78 +++++- substrate/client/src/error.rs | 6 + substrate/client/src/lib.rs | 6 +- substrate/light/Cargo.toml | 10 + substrate/light/src/client.rs | 77 +++++ substrate/light/src/lib.rs | 29 ++ substrate/network/Cargo.toml | 1 + substrate/network/src/config.rs | 16 +- substrate/network/src/{ => full}/blocks.rs | 4 +- substrate/network/src/{ => full}/chain.rs | 21 +- substrate/network/src/{ => full}/consensus.rs | 6 +- substrate/network/src/full/handler.rs | 47 ++++ substrate/network/src/{ => full}/message.rs | 2 +- substrate/network/src/full/mod.rs | 51 ++++ substrate/network/src/{ => full}/protocol.rs | 57 +--- substrate/network/src/{ => full}/sync.rs | 39 +-- substrate/network/src/handler.rs | 121 ++++++++ substrate/network/src/lib.rs | 25 +- substrate/network/src/service.rs | 224 ++++++--------- substrate/network/src/sync_provider.rs | 100 +++++++ substrate/network/src/test/mod.rs | 2 +- substrate/network/src/test/sync.rs | 2 +- substrate/rpc/src/chain/mod.rs | 33 +-- substrate/rpc/src/chain/tests.rs | 11 +- substrate/rpc/src/state/mod.rs | 41 ++- substrate/rpc/src/state/tests.rs | 6 +- 35 files changed, 1147 insertions(+), 510 deletions(-) create mode 100644 polkadot/service/src/chain_config.rs create mode 100644 polkadot/service/src/client_adapter.rs create mode 100644 substrate/light/Cargo.toml create mode 100644 substrate/light/src/client.rs create mode 100644 substrate/light/src/lib.rs rename substrate/network/src/{ => full}/blocks.rs (99%) rename substrate/network/src/{ => full}/chain.rs (82%) rename substrate/network/src/{ => full}/consensus.rs (99%) create mode 100644 substrate/network/src/full/handler.rs rename substrate/network/src/{ => full}/message.rs (99%) create mode 100644 substrate/network/src/full/mod.rs rename substrate/network/src/{ => full}/protocol.rs (94%) rename substrate/network/src/{ => full}/sync.rs (95%) create mode 100644 substrate/network/src/handler.rs create mode 100644 substrate/network/src/sync_provider.rs diff --git a/Cargo.lock b/Cargo.lock index ea3a6879b48ef..898246d90d83d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1386,6 +1386,7 @@ dependencies = [ "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-executor 0.1.0", + "substrate-light 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-runtime-io 0.1.0", @@ -1852,6 +1853,16 @@ dependencies = [ "lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-light" +version = "0.1.0" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-client 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "substrate-misbehavior-check" version = "0.1.0" @@ -1886,6 +1897,7 @@ dependencies = [ "substrate-codec 0.1.0", "substrate-executor 0.1.0", "substrate-keyring 0.1.0", + "substrate-light 0.1.0", "substrate-primitives 0.1.0", "substrate-runtime-support 0.1.0", "substrate-serializer 0.1.0", diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 0330192b5816a..9cd9cc657c2cb 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -136,8 +136,9 @@ pub fn run(args: I) -> error::Result<()> where let _rpc_servers = { let handler = || { - let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote()); - rpc::rpc_handler(client.clone(), chain, DummyPool) + let chain = rpc::apis::chain::Chain::new(client.clone(), client.clone(), client.clone(), core.remote()); + let state = rpc::apis::state::State::new(client.clone(), client.clone(), client.clone()); + rpc::rpc_handler(state, chain, DummyPool) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 07d3c0ec66d13..84004e783bc58 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -32,6 +32,10 @@ args: long: validator help: Enable validator mode takes_value: false + - light: + long: light + help: Run in light client mode + takes_value: false - port: long: port value_name: PORT diff --git a/polkadot/cli/src/informant.rs b/polkadot/cli/src/informant.rs index 70c49f9c448f4..d6c42cf204d7b 100644 --- a/polkadot/cli/src/informant.rs +++ b/polkadot/cli/src/informant.rs @@ -23,7 +23,6 @@ use tokio_core::reactor; use network::{SyncState, SyncProvider}; use runtime_support::Hashable; use primitives::block::HeaderHash; -use client::BlockchainEvents; const TIMER_INTERVAL_MS: u64 = 5000; @@ -33,12 +32,12 @@ pub fn start(service: &Service, handle: reactor::Handle) { .expect("Error creating informant timer"); let network = service.network(); - let client = service.client(); + let chain_head = service.chain_head(); let display_notifications = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { let sync_status = network.status(); - if let Ok(best_block) = client.best_block_header() { + if let Ok(best_block) = chain_head.best_block_header() { let hash: HeaderHash = best_block.blake2_256().into(); let status = match (sync_status.sync.state, sync_status.sync.best_seen_block) { (SyncState::Idle, _) => "Idle".into(), @@ -52,8 +51,8 @@ pub fn start(service: &Service, handle: reactor::Handle) { Ok(()) }); - let client = service.client(); - let display_block_import = client.import_notification_stream().for_each(|n| { + let chain_events = service.chain_events(); + let display_block_import = chain_events.import_notification_stream().for_each(|n| { info!(target: "polkadot", "Imported #{} ({})", n.header.number, n.hash); Ok(()) }); diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index b3f62fb65b57f..09be04e11a778 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -120,6 +120,10 @@ pub fn run(args: I) -> error::Result<()> where info!("Starting validator."); role = service::Role::VALIDATOR; } + else if matches.is_present("light") { + info!("Starting light."); + role = service::Role::LIGHT; + } match matches.value_of("chain") { Some("poc-1") => config.chain_spec = ChainSpec::PoC1Testnet, @@ -160,8 +164,16 @@ pub fn run(args: I) -> error::Result<()> where let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?; let handler = || { - let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); - rpc::rpc_handler(service.client(), chain, service.transaction_pool()) + let chain = rpc::apis::chain::Chain::new( + service.chain_head(), + service.chain_data(), + service.chain_events(), + core.remote()); + let state = rpc::apis::state::State::new( + service.chain_head(), + service.state_data(), + service.contract_caller()); + rpc::rpc_handler(state, chain, service.transaction_pool()) }; ( start_server(http_address, |address| rpc::start_http(address, handler())), diff --git a/polkadot/service/Cargo.toml b/polkadot/service/Cargo.toml index d88c069470af2..c830f5435bb3b 100644 --- a/polkadot/service/Cargo.toml +++ b/polkadot/service/Cargo.toml @@ -26,3 +26,4 @@ substrate-network = { path = "../../substrate/network" } substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec" } substrate-executor = { path = "../../substrate/executor" } +substrate-light = { path = "../../substrate/light" } diff --git a/polkadot/service/src/chain_config.rs b/polkadot/service/src/chain_config.rs new file mode 100644 index 0000000000000..3192c0e116a62 --- /dev/null +++ b/polkadot/service/src/chain_config.rs @@ -0,0 +1,142 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! Chain configuration. + +use ed25519; +use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, + SessionConfig, StakingConfig}; + +/// Chain configuration. +pub struct ChainConfig { + /// Genesis block confguration. + pub genesis_config: GenesisConfig, + /// List of bootnodes. + pub boot_nodes: Vec, +} +/// Prepare chain configuration for POC-1 testnet. +pub fn poc_1_testnet_config() -> ChainConfig { + let initial_authorities = vec![ + hex!["82c39b31a2b79a90f8e66e7a77fdb85a4ed5517f2ae39f6a80565e8ecae85cf5"].into(), + hex!["4de37a07567ebcbf8c64568428a835269a566723687058e017b6d69db00a77e7"].into(), + hex!["063d7787ebca768b7445dfebe7d62cbb1625ff4dba288ea34488da266dd6dca5"].into(), + ]; + let endowed_accounts = vec![ + hex!["24d132eb1a4cbf8e46de22652019f1e07fadd5037a6a057c75dbbfd4641ba85d"].into(), + ]; + let genesis_config = GenesisConfig { + consensus: Some(ConsensusConfig { + code: include_bytes!("../../runtime/wasm/genesis.wasm").to_vec(), // TODO change + authorities: initial_authorities.clone(), + }), + system: None, + session: Some(SessionConfig { + validators: initial_authorities.clone(), + session_length: 720, // that's 1 hour per session. + }), + staking: Some(StakingConfig { + current_era: 0, + intentions: vec![], + transaction_fee: 100, + balances: endowed_accounts.iter().map(|&k|(k, 1u64 << 60)).collect(), + validator_count: 12, + sessions_per_era: 24, // 24 hours per era. + bonding_duration: 90, // 90 days per bond. + }), + democracy: Some(DemocracyConfig { + launch_period: 120 * 24 * 14, // 2 weeks per public referendum + voting_period: 120 * 24 * 28, // 4 weeks to discuss & vote on an active referendum + minimum_deposit: 1000, // 1000 as the minimum deposit for a referendum + }), + council: Some(CouncilConfig { + active_council: vec![], + candidacy_bond: 1000, // 1000 to become a council candidate + voter_bond: 100, // 100 down to vote for a candidate + present_slash_per_voter: 1, // slash by 1 per voter for an invalid presentation. + carry_count: 24, // carry over the 24 runners-up to the next council election + presentation_duration: 120 * 24, // one day for presenting winners. + approval_voting_period: 7 * 120 * 24, // one week period between possible council elections. + term_duration: 180 * 120 * 24, // 180 day term duration for the council. + desired_seats: 0, // start with no council: we'll raise this once the stake has been dispersed a bit. + inactive_grace_period: 1, // one addition vote should go by before an inactive voter can be reaped. + + cooloff_period: 90 * 120 * 24, // 90 day cooling off period if council member vetoes a proposal. + voting_period: 7 * 120 * 24, // 7 day voting period for council members. + }), + parachains: Some(Default::default()), + }; + let boot_nodes = Vec::new(); + ChainConfig { genesis_config, boot_nodes } +} + +/// Prepare chain configuration for local testnet. +pub fn local_testnet_config() -> ChainConfig { + let initial_authorities = vec![ + ed25519::Pair::from_seed(b"Alice ").public().into(), + ed25519::Pair::from_seed(b"Bob ").public().into(), + ]; + let endowed_accounts = vec![ + ed25519::Pair::from_seed(b"Alice ").public().into(), + ed25519::Pair::from_seed(b"Bob ").public().into(), + ed25519::Pair::from_seed(b"Charlie ").public().into(), + ed25519::Pair::from_seed(b"Dave ").public().into(), + ed25519::Pair::from_seed(b"Eve ").public().into(), + ed25519::Pair::from_seed(b"Ferdie ").public().into(), + ]; + let genesis_config = GenesisConfig { + consensus: Some(ConsensusConfig { + code: include_bytes!("../../runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm").to_vec(), + authorities: initial_authorities.clone(), + }), + system: None, + session: Some(SessionConfig { + validators: initial_authorities.clone(), + session_length: 10, + }), + staking: Some(StakingConfig { + current_era: 0, + intentions: initial_authorities.clone(), + transaction_fee: 1, + balances: endowed_accounts.iter().map(|&k|(k, 1u64 << 60)).collect(), + validator_count: 2, + sessions_per_era: 5, + bonding_duration: 2, + }), + democracy: Some(DemocracyConfig { + launch_period: 9, + voting_period: 18, + minimum_deposit: 10, + }), + council: Some(CouncilConfig { + active_council: vec![], + candidacy_bond: 10, + voter_bond: 2, + present_slash_per_voter: 1, + carry_count: 4, + presentation_duration: 10, + approval_voting_period: 20, + term_duration: 40, + desired_seats: 0, + inactive_grace_period: 1, + + cooloff_period: 75, + voting_period: 20, + }), + parachains: Some(Default::default()), + }; + let boot_nodes = Vec::new(); + ChainConfig { genesis_config, boot_nodes } +} diff --git a/polkadot/service/src/client_adapter.rs b/polkadot/service/src/client_adapter.rs new file mode 100644 index 0000000000000..83d0efa9181ea --- /dev/null +++ b/polkadot/service/src/client_adapter.rs @@ -0,0 +1,191 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Polkadot clients -> service adapter. + +use std::sync::Arc; +use parking_lot::Mutex; +use client::{self, BlockchainEvents, ChainHead, ChainData, StateData, ContractCaller}; +use client::in_mem::Backend as InMemory; +use codec::{self, Slicable}; +use error::Error; +use light_client; +use network::{Service as NetworkService, ConsensusService as NetworkConsensusService, + Params as NetworkParams, TransactionPool as NetworkTransactionPool}; +use polkadot_api::PolkadotApi; +use polkadot_executor::Executor as LocalDispatch; +use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; +use primitives::hashing; +use substrate_executor::NativeExecutor; +use transaction_pool::{self, TransactionPool}; + +type FullClient = client::Client>; +type LightClient = light_client::LightClient; + +/// Clients adapter. +#[derive(Clone)] +pub enum Client { + /// Working with full client. + Full(Arc), + /// Working with light client. + Light(Arc), +} + +/// Transaction pool adapter to use with full client. +struct FullTransactionPoolAdapter { + pool: Arc>, + client: Arc, +} + +/// Transaction pool adapter to use with light client. +struct LighTransactionsPoolAdapter; + +impl Client { + /// Get shared blockchain events instance. + pub fn chain_events(&self) -> Arc { + match *self { + Client::Full(ref c) => c.clone(), + Client::Light(ref c) => c.clone(), + } + } + + /// Get shared chain head instance. + pub fn chain_head(&self) -> Arc { + match *self { + Client::Full(ref c) => c.clone(), + Client::Light(ref c) => c.clone(), + } + } + + /// Get shared chain data instance. + pub fn chain_data(&self) -> Arc { + match *self { + Client::Full(ref c) => c.clone(), + Client::Light(ref c) => c.clone(), + } + } + + /// Get shared state data instance. + pub fn state_data(&self) -> Arc { + match *self { + Client::Full(ref c) => c.clone(), + Client::Light(ref c) => c.clone(), + } + } + + /// Get shared contract caller instance. + pub fn contract_caller(&self) -> Arc { + match *self { + Client::Full(ref c) => c.clone(), + Client::Light(ref c) => c.clone(), + } + } + + /// Create transaction pool adapter to use with network. + pub fn create_transaction_pool_adapter(&self, pool: Arc>) -> Arc { + match *self { + Client::Full(ref client) => Arc::new(FullTransactionPoolAdapter { + pool: pool.clone(), + client: client.clone(), + }), + Client::Light(_) => Arc::new(LighTransactionsPoolAdapter), + } + } + + /// Create network service to use with current client. + pub fn create_network_service(&self, params: NetworkParams) -> Result<(Arc, Option>), Error> { + match *self { + Client::Full(ref client) => NetworkService::new_full(client.clone(), params) + .map(|(n, nc)| (n, Some(nc))) + .map_err(Into::into), + Client::Light(ref client) => NetworkService::new_light(client.clone(), params) + .map(|n| (n, None)) + .map_err(Into::into), + } + } + + /// Prune imported transactions from pool. + pub fn prune_imported(&self, pool: &Mutex, hash: HeaderHash) { + match *self { + Client::Full(ref client) => prune_imported(&*client, pool, hash), + Client::Light(_) => (), + } + } +} + +impl NetworkTransactionPool for FullTransactionPoolAdapter { + fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)> { + let best_block = match self.client.info() { + Ok(info) => info.chain.best_hash, + Err(e) => { + debug!("Error getting best block: {:?}", e); + return Vec::new(); + } + }; + let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); + let ready = transaction_pool::Ready::create(id, &*self.client); + self.pool.lock().pending(ready).map(|t| { + let hash = ::primitives::Hash::from(&t.hash()[..]); + let tx = codec::Slicable::encode(t.as_transaction()); + (hash, tx) + }).collect() + } + + fn import(&self, transaction: &[u8]) -> Option { + if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { + match self.pool.lock().import(tx) { + Ok(t) => Some(t.hash()[..].into()), + Err(e) => match *e.kind() { + transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), + _ => { + debug!("Error adding transaction to the pool: {:?}", e); + None + }, + } + } + } else { + debug!("Error decoding transaction"); + None + } + } +} + +impl NetworkTransactionPool for LighTransactionsPoolAdapter { + fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)> { + Vec::new() // TODO [light]: implement me + } + + fn import(&self, _transaction: &[u8]) -> Option { + None // TODO [light]: implement me + } +} + +fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { + for extrinsic in extrinsics { + let hash: _ = hashing::blake2_256(&extrinsic.encode()).into(); + pool.remove(&hash, true); + } +} + +/// Produce a task which prunes any finalized transactions from the pool. +fn prune_imported(client: &FullClient, pool: &Mutex, hash: HeaderHash) { + let id = BlockId::Hash(hash); + match client.body(&id) { + Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), + Ok(None) => warn!("Missing imported block {:?}", hash), + Err(e) => warn!("Failed to fetch block: {:?}", e), + } +} diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index d21415f7d0a7b..94cc6ec2e5257 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -29,6 +29,7 @@ extern crate polkadot_consensus as consensus; extern crate polkadot_transaction_pool as transaction_pool; extern crate polkadot_keystore as keystore; extern crate substrate_client as client; +extern crate substrate_light as light_client; extern crate substrate_runtime_io as runtime_io; extern crate substrate_primitives as primitives; extern crate substrate_network as network; @@ -46,6 +47,8 @@ extern crate log; extern crate hex_literal; mod error; +mod chain_config; +mod client_adapter; mod config; use std::sync::Arc; @@ -54,194 +57,28 @@ use futures::prelude::*; use parking_lot::Mutex; use tokio_core::reactor::Core; use codec::Slicable; -use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; -use primitives::hashing; use transaction_pool::TransactionPool; -use substrate_executor::NativeExecutor; -use polkadot_executor::Executor as LocalDispatch; use keystore::Store as Keystore; -use polkadot_api::PolkadotApi; -use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, - SessionConfig, StakingConfig, BuildExternalities}; -use client::{genesis, BlockchainEvents}; -use client::in_mem::Backend as InMemory; +use polkadot_runtime::BuildExternalities; +use client_adapter::Client; +use client::{genesis, BlockchainEvents, ChainHead, ChainData, StateData, ContractCaller}; use network::ManageNetwork; use exit_future::Signal; pub use self::error::{ErrorKind, Error}; +pub use chain_config::ChainConfig; pub use config::{Configuration, Role, ChainSpec}; -type Client = client::Client>; - /// Polkadot service. pub struct Service { thread: Option>, - client: Arc, + client: Client, network: Arc, transaction_pool: Arc>, signal: Option, _consensus: Option, } -struct TransactionPoolAdapter { - pool: Arc>, - client: Arc, -} - -impl network::TransactionPool for TransactionPoolAdapter { - fn transactions(&self) -> Vec<(ExtrinsicHash, Vec)> { - let best_block = match self.client.info() { - Ok(info) => info.chain.best_hash, - Err(e) => { - debug!("Error getting best block: {:?}", e); - return Vec::new(); - } - }; - let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); - let ready = transaction_pool::Ready::create(id, &*self.client); - self.pool.lock().pending(ready).map(|t| { - let hash = ::primitives::Hash::from(&t.hash()[..]); - let tx = codec::Slicable::encode(t.as_transaction()); - (hash, tx) - }).collect() - } - - fn import(&self, transaction: &[u8]) -> Option { - if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { - match self.pool.lock().import(tx) { - Ok(t) => Some(t.hash()[..].into()), - Err(e) => match *e.kind() { - transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), - _ => { - debug!("Error adding transaction to the pool: {:?}", e); - None - }, - } - } - } else { - debug!("Error decoding transaction"); - None - } - } -} - -pub struct ChainConfig { - genesis_config: GenesisConfig, - boot_nodes: Vec, -} - -fn poc_1_testnet_config() -> ChainConfig { - let initial_authorities = vec![ - hex!["82c39b31a2b79a90f8e66e7a77fdb85a4ed5517f2ae39f6a80565e8ecae85cf5"].into(), - hex!["4de37a07567ebcbf8c64568428a835269a566723687058e017b6d69db00a77e7"].into(), - hex!["063d7787ebca768b7445dfebe7d62cbb1625ff4dba288ea34488da266dd6dca5"].into(), - ]; - let endowed_accounts = vec![ - hex!["24d132eb1a4cbf8e46de22652019f1e07fadd5037a6a057c75dbbfd4641ba85d"].into(), - ]; - let genesis_config = GenesisConfig { - consensus: Some(ConsensusConfig { - code: include_bytes!("../../runtime/wasm/genesis.wasm").to_vec(), // TODO change - authorities: initial_authorities.clone(), - }), - system: None, - session: Some(SessionConfig { - validators: initial_authorities.clone(), - session_length: 720, // that's 1 hour per session. - }), - staking: Some(StakingConfig { - current_era: 0, - intentions: vec![], - transaction_fee: 100, - balances: endowed_accounts.iter().map(|&k|(k, 1u64 << 60)).collect(), - validator_count: 12, - sessions_per_era: 24, // 24 hours per era. - bonding_duration: 90, // 90 days per bond. - }), - democracy: Some(DemocracyConfig { - launch_period: 120 * 24 * 14, // 2 weeks per public referendum - voting_period: 120 * 24 * 28, // 4 weeks to discuss & vote on an active referendum - minimum_deposit: 1000, // 1000 as the minimum deposit for a referendum - }), - council: Some(CouncilConfig { - active_council: vec![], - candidacy_bond: 1000, // 1000 to become a council candidate - voter_bond: 100, // 100 down to vote for a candidate - present_slash_per_voter: 1, // slash by 1 per voter for an invalid presentation. - carry_count: 24, // carry over the 24 runners-up to the next council election - presentation_duration: 120 * 24, // one day for presenting winners. - approval_voting_period: 7 * 120 * 24, // one week period between possible council elections. - term_duration: 180 * 120 * 24, // 180 day term duration for the council. - desired_seats: 0, // start with no council: we'll raise this once the stake has been dispersed a bit. - inactive_grace_period: 1, // one addition vote should go by before an inactive voter can be reaped. - - cooloff_period: 90 * 120 * 24, // 90 day cooling off period if council member vetoes a proposal. - voting_period: 7 * 120 * 24, // 7 day voting period for council members. - }), - parachains: Some(Default::default()), - }; - let boot_nodes = Vec::new(); - ChainConfig { genesis_config, boot_nodes } -} - -fn local_testnet_config() -> ChainConfig { - let initial_authorities = vec![ - ed25519::Pair::from_seed(b"Alice ").public().into(), - ed25519::Pair::from_seed(b"Bob ").public().into(), - ]; - let endowed_accounts = vec![ - ed25519::Pair::from_seed(b"Alice ").public().into(), - ed25519::Pair::from_seed(b"Bob ").public().into(), - ed25519::Pair::from_seed(b"Charlie ").public().into(), - ed25519::Pair::from_seed(b"Dave ").public().into(), - ed25519::Pair::from_seed(b"Eve ").public().into(), - ed25519::Pair::from_seed(b"Ferdie ").public().into(), - ]; - let genesis_config = GenesisConfig { - consensus: Some(ConsensusConfig { - code: include_bytes!("../../runtime/wasm/target/wasm32-unknown-unknown/release/polkadot_runtime.compact.wasm").to_vec(), - authorities: initial_authorities.clone(), - }), - system: None, - session: Some(SessionConfig { - validators: initial_authorities.clone(), - session_length: 10, - }), - staking: Some(StakingConfig { - current_era: 0, - intentions: initial_authorities.clone(), - transaction_fee: 1, - balances: endowed_accounts.iter().map(|&k|(k, 1u64 << 60)).collect(), - validator_count: 2, - sessions_per_era: 5, - bonding_duration: 2, - }), - democracy: Some(DemocracyConfig { - launch_period: 9, - voting_period: 18, - minimum_deposit: 10, - }), - council: Some(CouncilConfig { - active_council: vec![], - candidacy_bond: 10, - voter_bond: 2, - present_slash_per_voter: 1, - carry_count: 4, - presentation_duration: 10, - approval_voting_period: 20, - term_duration: 40, - desired_seats: 0, - inactive_grace_period: 1, - - cooloff_period: 75, - voting_period: 20, - }), - parachains: Some(Default::default()), - }; - let boot_nodes = Vec::new(); - ChainConfig { genesis_config, boot_nodes } -} - impl Service { /// Creates and register protocol with the network service pub fn new(mut config: Configuration) -> Result { @@ -264,8 +101,8 @@ impl Service { } let ChainConfig { genesis_config, boot_nodes } = match config.chain_spec { - ChainSpec::Development => local_testnet_config(), - ChainSpec::PoC1Testnet => poc_1_testnet_config(), + ChainSpec::Development => chain_config::local_testnet_config(), + ChainSpec::PoC1Testnet => chain_config::poc_1_testnet_config(), }; config.network.boot_nodes.extend(boot_nodes); @@ -275,24 +112,25 @@ impl Service { (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) }; - let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); - let best_header = client.best_block_header()?; + let client = if config.roles & Role::LIGHT == Role::LIGHT { + Client::Light(Arc::new(light_client::new_in_mem()?)) + } else { + Client::Full(Arc::new(client::new_in_mem(executor, prepare_genesis)?)) + }; + let best_header = client.chain_head().best_block_header()?; info!("Starting Polkadot. Best block is #{}", best_header.number); let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); - let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { - pool: transaction_pool.clone(), - client: client.clone(), - }); + let transaction_pool_adapter = client.create_transaction_pool_adapter(transaction_pool.clone()); + let network_params = network::Params { config: network::ProtocolConfig { roles: config.roles, }, network_config: config.network, - chain: client.clone(), transaction_pool: transaction_pool_adapter, }; + let (network, network_consensus) = client.create_network_service(network_params)?; - let network = network::Service::new(network_params)?; let barrier = ::std::sync::Arc::new(Barrier::new(2)); let thread = { @@ -306,9 +144,9 @@ impl Service { thread_barrier.wait(); let mut core = Core::new().expect("tokio::Core could not be created"); - let events = client.import_notification_stream().for_each(move |notification| { + let events = client.chain_events().import_notification_stream().for_each(move |notification| { network.on_block_imported(notification.hash, ¬ification.header); - prune_imported(&*client, &*txpool, notification.hash); + client.prune_imported(&*txpool, notification.hash); Ok(()) }); @@ -326,13 +164,14 @@ impl Service { barrier.wait(); // Spin consensus service if configured - let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { - // Load the first available key. Code above makes sure it exisis. - let key = keystore.load(&keystore.contents()?[0], "")?; - info!("Using authority key {:?}", key.public()); - Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool.clone(), key)) - } else { - None + let consensus_service = match (client.clone(), network_consensus) { + (Client::Full(ref client), Some(ref network_consensus)) if config.roles & Role::VALIDATOR == Role::VALIDATOR => { + // Load the first available key. Code above makes sure it exisis. + let key = keystore.load(&keystore.contents()?[0], "")?; + info!("Using authority key {:?}", key.public()); + Some(consensus::Service::new(client.clone(), network_consensus.clone(), transaction_pool.clone(), key)) + }, + _ => None, }; Ok(Service { @@ -345,9 +184,29 @@ impl Service { }) } - /// Get shared client instance. - pub fn client(&self) -> Arc { - self.client.clone() + /// Get shared blockchain events instance. + pub fn chain_events(&self) -> Arc { + self.client.chain_events() + } + + /// Get shared chain head instance. + pub fn chain_head(&self) -> Arc { + self.client.chain_head() + } + + /// Get shared chain data instance. + pub fn chain_data(&self) -> Arc { + self.client.chain_data() + } + + /// Get shared state data instance. + pub fn state_data(&self) -> Arc { + self.client.state_data() + } + + /// Get shared contract caller instance. + pub fn contract_caller(&self) -> Arc { + self.client.contract_caller() } /// Get shared network instance. @@ -361,23 +220,6 @@ impl Service { } } -fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) { - for extrinsic in extrinsics { - let hash: _ = hashing::blake2_256(&extrinsic.encode()).into(); - pool.remove(&hash, true); - } -} - -/// Produce a task which prunes any finalized transactions from the pool. -pub fn prune_imported(client: &Client, pool: &Mutex, hash: HeaderHash) { - let id = BlockId::Hash(hash); - match client.body(&id) { - Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), - Ok(None) => warn!("Missing imported block {:?}", hash), - Err(e) => warn!("Failed to fetch block: {:?}", e), - } -} - impl Drop for Service { fn drop(&mut self) { self.network.stop_network(); diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index ac016166caf29..1671ea19bd8b3 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -40,15 +40,40 @@ pub struct Client { } /// A source of blockchain evenets. -pub trait BlockchainEvents { +pub trait BlockchainEvents: Send + Sync { /// Get block import event stream. fn import_notification_stream(&self) -> BlockchainEventStream; } /// Chain head information. -pub trait ChainHead { +pub trait ChainHead: Send + Sync { /// Get best block header. fn best_block_header(&self) -> Result; + /// Get best block hash. + fn best_block_hash(&self) -> Result; +} + +/// Chain data information. +pub trait ChainData: Send + Sync { + /// Get block header by id. + fn header(&self, id: &BlockId) -> Result, error::Error>; +} + +/// State data information. +pub trait StateData: Send + Sync { + /// Get the code at a given block. + fn code_at(&self, id: &BlockId) -> Result, error::Error>; + + /// Return single storage entry of contract under given address in state in a block of given hash. + fn storage(&self, id: &BlockId, key: &StorageKey) -> Result; +} + +/// Contract caller. +pub trait ContractCaller: Send + Sync { + /// Execute a call to a contract on top of state in a block of given hash. + /// + /// No changes are made. + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> Result; } /// Client info @@ -419,8 +444,8 @@ impl bft::Authorities for Client impl BlockchainEvents for Client where - B: backend::Backend, - E: state_machine::CodeExecutor, + B: backend::Backend + Send + Sync, + E: state_machine::CodeExecutor + Send + Sync, error::Error: From<::Error> { /// Get block import event stream. @@ -433,13 +458,54 @@ impl BlockchainEvents for Client impl ChainHead for Client where - B: backend::Backend, - E: state_machine::CodeExecutor, + B: backend::Backend + Send + Sync, + E: state_machine::CodeExecutor + Send + Sync, error::Error: From<::Error> { fn best_block_header(&self) -> error::Result { Client::best_block_header(self) } + + fn best_block_hash(&self) -> error::Result { + Client::info(self).map(|i| i.chain.best_hash) + } +} + +impl ChainData for Client + where + B: backend::Backend + Send + Sync, + E: state_machine::CodeExecutor + Send + Sync, + error::Error: From<::Error> +{ + fn header(&self, id: &BlockId) -> error::Result> { + Client::header(self, id) + } +} + +impl StateData for Client + where + B: backend::Backend + Send + Sync, + E: state_machine::CodeExecutor + Send + Sync, + error::Error: From<::Error> +{ + fn code_at(&self, id: &BlockId) -> error::Result> { + Client::code_at(self, id) + } + + fn storage(&self, id: &BlockId, key: &StorageKey) -> error::Result { + Client::storage(self, id, key) + } +} + +impl ContractCaller for Client + where + B: backend::Backend + Send + Sync, + E: state_machine::CodeExecutor + Send + Sync, + error::Error: From<::Error> +{ + fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> Result { + Client::call(self, id, method, call_data) + } } #[cfg(test)] diff --git a/substrate/client/src/error.rs b/substrate/client/src/error.rs index 179fa7d910a91..3bc2b96003831 100644 --- a/substrate/client/src/error.rs +++ b/substrate/client/src/error.rs @@ -22,6 +22,12 @@ use primitives::hexdisplay::HexDisplay; error_chain! { errors { + /// Not implemented error. + NotImplemented { + description("Method not implemented error"), + display("Not implemented error"), + } + /// Backend error. Backend { description("Unrecoverable backend error"), diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 8690b19f2d3f2..3aa1ede4f0bcb 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -17,6 +17,7 @@ //! Substrate Client and associated logic. #![warn(missing_docs)] +#![recursion_limit="128"] extern crate substrate_bft as bft; extern crate substrate_runtime_support as runtime_support; @@ -44,6 +45,7 @@ pub mod genesis; pub mod block_builder; mod client; -pub use client::{Client, ClientInfo, CallResult, ImportResult, ChainHead, - BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents}; +pub use client::{Client, ClientInfo, CallResult, ImportResult, ChainHead, ChainData, + BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents, + StateData, ContractCaller, BlockImportNotification}; pub use blockchain::Info as ChainInfo; diff --git a/substrate/light/Cargo.toml b/substrate/light/Cargo.toml new file mode 100644 index 0000000000000..9472a59e0bfd9 --- /dev/null +++ b/substrate/light/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "substrate-light" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +futures = "0.1.17" +parking_lot = "0.4" +substrate-client = { path = "../client" } +substrate-primitives = { path = "../primitives" } diff --git a/substrate/light/src/client.rs b/substrate/light/src/client.rs new file mode 100644 index 0000000000000..00bd8af42c2cd --- /dev/null +++ b/substrate/light/src/client.rs @@ -0,0 +1,77 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Substrate light Client + +use futures::sync::mpsc; +use parking_lot::Mutex; +use full_client::{BlockchainEvents, BlockchainEventStream, ChainHead, ChainData, + StateData, ContractCaller, CallResult, BlockImportNotification}; +use full_client::error; +use primitives::block; +use primitives::block::Id as BlockId; +use primitives::storage::{StorageKey, StorageData}; + +/// Polkadot light client +#[derive(Default)] +pub struct LightClient { + import_notification_sinks: Mutex>>, +} + +impl BlockchainEvents for LightClient { + fn import_notification_stream(&self) -> BlockchainEventStream { + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream + } +} + +impl ChainHead for LightClient { + fn best_block_header(&self) -> error::Result { + Err(error::ErrorKind::NotImplemented.into()) + } + + fn best_block_hash(&self) -> error::Result { + Err(error::ErrorKind::NotImplemented.into()) + } +} + +impl ChainData for LightClient { + fn header(&self, _id: &BlockId) -> error::Result> { + Err(error::ErrorKind::NotImplemented.into()) + } +} + +impl StateData for LightClient { + fn code_at(&self, _id: &BlockId) -> error::Result> { + Err(error::ErrorKind::NotImplemented.into()) + } + + fn storage(&self, _id: &BlockId, _key: &StorageKey) -> error::Result { + Err(error::ErrorKind::NotImplemented.into()) + } +} + +impl ContractCaller for LightClient { + fn call(&self, _id: &BlockId, _method: &str, _call_data: &[u8]) -> Result { + Err(error::ErrorKind::NotImplemented.into()) + } +} + +/// Create an instance of in-memory client. +pub fn new_in_mem() -> error::Result { + Ok(Default::default()) +} diff --git a/substrate/light/src/lib.rs b/substrate/light/src/lib.rs new file mode 100644 index 0000000000000..1dcd059a3348d --- /dev/null +++ b/substrate/light/src/lib.rs @@ -0,0 +1,29 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Substrate Light Client and associated logic. + +#![warn(missing_docs)] + +extern crate substrate_client as full_client; +extern crate substrate_primitives as primitives; + +extern crate futures; +extern crate parking_lot; + +mod client; + +pub use client::{LightClient, new_in_mem}; diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index 5b1fe410d6c56..130633c506bc8 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -23,6 +23,7 @@ ethcore-io = { git = "https://github.com/paritytech/parity.git" } ed25519 = { path = "../../substrate/ed25519" } substrate-primitives = { path = "../../substrate/primitives" } substrate-client = { path = "../../substrate/client" } +substrate-light = { path = "../../substrate/light" } substrate-state-machine = { path = "../../substrate/state-machine" } substrate-serializer = { path = "../../substrate/serializer" } substrate-runtime-support = { path = "../../substrate/runtime-support" } diff --git a/substrate/network/src/config.rs b/substrate/network/src/config.rs index 7e21a5ded3b02..bac8deb31f9a0 100644 --- a/substrate/network/src/config.rs +++ b/substrate/network/src/config.rs @@ -14,7 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -pub use service::Role; +bitflags! { + /// Node roles bitmask. + pub struct Role: u32 { + /// No network. + const NONE = 0b00000000; + /// Full node, doe not participate in consensus. + const FULL = 0b00000001; + /// Light client node. + const LIGHT = 0b00000010; + /// Act as a validator. + const VALIDATOR = 0b00000100; + /// Act as a collator. + const COLLATOR = 0b00001000; + } +} /// Protocol configuration #[derive(Clone)] diff --git a/substrate/network/src/blocks.rs b/substrate/network/src/full/blocks.rs similarity index 99% rename from substrate/network/src/blocks.rs rename to substrate/network/src/full/blocks.rs index f111d431049fb..e75214c1429bc 100644 --- a/substrate/network/src/blocks.rs +++ b/substrate/network/src/full/blocks.rs @@ -21,7 +21,7 @@ use std::collections::{HashMap, BTreeMap}; use std::collections::hash_map::Entry; use network::PeerId; use primitives::block::Number as BlockNumber; -use message; +use full::message; const MAX_PARALLEL_DOWNLOADS: u32 = 1; @@ -190,7 +190,7 @@ impl BlockCollection { #[cfg(test)] mod test { use super::{BlockCollection, BlockData}; - use message; + use full::message; use primitives::block::HeaderHash; fn is_empty(bc: &BlockCollection) -> bool { diff --git a/substrate/network/src/chain.rs b/substrate/network/src/full/chain.rs similarity index 82% rename from substrate/network/src/chain.rs rename to substrate/network/src/full/chain.rs index 10d7a0c7d5a79..24ff3c6840741 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/full/chain.rs @@ -17,10 +17,10 @@ //! Blockchain access trait use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin}; -use client::error::Error; use state_machine; use primitives::block::{self, Id as BlockId}; use primitives::bft::Justification; +use error::Error; pub trait Client: Send + Sync { /// Import a new block. Parent is supposed to be existing in the blockchain. @@ -48,36 +48,35 @@ pub trait Client: Send + Sync { impl Client for PolkadotClient where B: client::backend::Backend + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static, - Error: From<<::State as state_machine::backend::Backend>::Error>, { - + client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option) -> Result { // TODO: defer justification check. - let justified_header = self.check_justification(header, justification.into())?; + let justified_header = self.check_justification(header, justification.into()); let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; - (self as &PolkadotClient).import_block(origin, justified_header, body) + (self as &PolkadotClient).import_block(origin, justified_header?, body).map_err(Into::into) } fn info(&self) -> Result { - (self as &PolkadotClient).info() + (self as &PolkadotClient).info().map_err(Into::into) } fn block_status(&self, id: &BlockId) -> Result { - (self as &PolkadotClient).block_status(id) + (self as &PolkadotClient).block_status(id).map_err(Into::into) } fn block_hash(&self, block_number: block::Number) -> Result, Error> { - (self as &PolkadotClient).block_hash(block_number) + (self as &PolkadotClient).block_hash(block_number).map_err(Into::into) } fn header(&self, id: &BlockId) -> Result, Error> { - (self as &PolkadotClient).header(id) + (self as &PolkadotClient).header(id).map_err(Into::into) } fn body(&self, id: &BlockId) -> Result, Error> { - (self as &PolkadotClient).body(id) + (self as &PolkadotClient).body(id).map_err(Into::into) } fn justification(&self, id: &BlockId) -> Result, Error> { - (self as &PolkadotClient).justification(id) + (self as &PolkadotClient).justification(id).map_err(Into::into) } } diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/full/consensus.rs similarity index 99% rename from substrate/network/src/consensus.rs rename to substrate/network/src/full/consensus.rs index 7d13049ed4189..b0e19b36de621 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/full/consensus.rs @@ -20,12 +20,12 @@ use std::collections::{HashMap, HashSet}; use futures::sync::{oneshot, mpsc}; use std::time::{Instant, Duration}; use std::collections::hash_map::Entry; -use io::SyncIo; -use protocol::Protocol; use network::PeerId; use primitives::Hash; -use message::{self, Message}; use runtime_support::Hashable; +use full::message::{self, Message}; +use full::protocol::Protocol; +use io::SyncIo; // TODO: Add additional spam/DoS attack protection. const MESSAGE_LIFETIME_SECONDS: u64 = 600; diff --git a/substrate/network/src/full/handler.rs b/substrate/network/src/full/handler.rs new file mode 100644 index 0000000000000..6edec191c1696 --- /dev/null +++ b/substrate/network/src/full/handler.rs @@ -0,0 +1,47 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +use core_io::{TimerToken}; +use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId}; +use full::protocol::Protocol; +use io::NetSyncIo; + +/// devp2p protocol handler +pub struct ProtocolHandler { + pub protocol: Protocol, +} + +impl NetworkProtocolHandler for ProtocolHandler { + fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { + io.register_timer(0, 1000).expect("Error registering sync timer"); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { + self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); + } + + fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + self.protocol.tick(&mut NetSyncIo::new(io)); + } +} diff --git a/substrate/network/src/message.rs b/substrate/network/src/full/message.rs similarity index 99% rename from substrate/network/src/message.rs rename to substrate/network/src/full/message.rs index 5ab6c2b6a381b..1613e4ba20069 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/full/message.rs @@ -19,8 +19,8 @@ use primitives::{AuthorityId, Hash}; use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body, Block}; use primitives::bft::Justification; -use service::Role as RoleFlags; use ed25519; +use config::Role as RoleFlags; pub type RequestId = u64; type Bytes = Vec; diff --git a/substrate/network/src/full/mod.rs b/substrate/network/src/full/mod.rs new file mode 100644 index 0000000000000..ce07da65cceed --- /dev/null +++ b/substrate/network/src/full/mod.rs @@ -0,0 +1,51 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +use futures::sync::{oneshot, mpsc}; +use primitives::Hash; + +pub mod blocks; +pub mod chain; +pub mod consensus; +pub mod handler; +pub mod message; +pub mod protocol; +pub mod sync; + +/// Type that represents statement stream. +pub type StatementStream = mpsc::UnboundedReceiver; +/// Type that represents bft messages stream. +pub type BftMessageStream = mpsc::UnboundedReceiver; + +/// ConsensusService +pub trait ConsensusService: Send + Sync { + /// Get statement stream. + fn statements(&self) -> StatementStream; + /// Send out a statement. + fn send_statement(&self, statement: message::Statement); + /// Maintain connectivity to given addresses. + fn connect_to_authorities(&self, addresses: &[String]); + /// Fetch candidate. + fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver>; + /// Note local candidate. Accepts candidate receipt hash and candidate data. + /// Pass `None` to clear the candidate. + fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>); + + /// Get BFT message stream. + fn bft_messages(&self) -> BftMessageStream; + /// Send out a BFT message. + fn send_bft_message(&self, message: message::LocalizedBftMessage); +} diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/full/protocol.rs similarity index 94% rename from substrate/network/src/protocol.rs rename to substrate/network/src/full/protocol.rs index abbb41d2f6f70..f20336853465d 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/full/protocol.rs @@ -24,17 +24,20 @@ use serde_json; use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::{Hash, blake2_256}; use runtime_support::Hashable; -use network::{PeerId, NodeId}; - -use message::{self, Message}; -use sync::{ChainSync, Status as SyncStatus, SyncState}; -use consensus::Consensus; -use service::{Role, TransactionPool, StatementStream, BftMessageStream}; +use network::PeerId; + +use full::{StatementStream, BftMessageStream}; +use full::chain::Client; +use full::consensus::Consensus; +use full::message::{self, Message}; +use full::sync::ChainSync; +use config::Role; +use sync_provider::{SyncState, ProtocolStatus, ProtocolPeerInfo, TransactionStats}; +use service::TransactionPool; use config::ProtocolConfig; -use chain::Client; -use io::SyncIo; use error; -use super::header_hash; +use io::SyncIo; +use header_hash; const REQUEST_TIMEOUT_SEC: u64 = 15; const PROTOCOL_VERSION: u32 = 0; @@ -56,17 +59,6 @@ pub struct Protocol { transaction_pool: Arc, } -/// Syncing status and statistics -#[derive(Clone)] -pub struct ProtocolStatus { - /// Sync status. - pub sync: SyncStatus, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers. - pub num_active_peers: usize, -} - /// Peer information struct Peer { /// Protocol version @@ -89,27 +81,6 @@ struct Peer { next_request_id: message::RequestId, } -#[derive(Debug)] -pub struct PeerInfo { - /// Roles - pub roles: Role, - /// Protocol version - pub protocol_version: u32, - /// Peer best block hash - pub best_hash: HeaderHash, - /// Peer best block number - pub best_number: BlockNumber, -} - -/// Transaction stats -#[derive(Debug)] -pub struct TransactionStats { - /// Block number where this TX was first seen. - pub first_seen: u64, - /// Peers it was propagated to. - pub propagated_to: BTreeMap, -} - impl Protocol { /// Create a new instance. pub fn new(config: ProtocolConfig, chain: Arc, transaction_pool: Arc) -> error::Result { @@ -368,9 +339,9 @@ impl Protocol { } } - pub fn peer_info(&self, peer: PeerId) -> Option { + pub fn peer_info(&self, peer: PeerId) -> Option { self.peers.read().get(&peer).map(|p| { - PeerInfo { + ProtocolPeerInfo { roles: p.roles, protocol_version: p.protocol_version, best_hash: p.best_hash, diff --git a/substrate/network/src/sync.rs b/substrate/network/src/full/sync.rs similarity index 95% rename from substrate/network/src/sync.rs rename to substrate/network/src/full/sync.rs index 37d579771d070..ed914ec327ebe 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/full/sync.rs @@ -16,13 +16,14 @@ use std::collections::HashMap; use io::SyncIo; -use protocol::Protocol; use network::PeerId; use client::{ImportResult, BlockStatus, ClientInfo}; use primitives::block::{HeaderHash, Number as BlockNumber, Header, Id as BlockId}; -use blocks::{self, BlockCollection}; -use message::{self, Message}; -use super::header_hash; +use full::protocol::Protocol; +use full::blocks::{self, BlockCollection}; +use full::message::{self, Message}; +use sync_provider::{SyncState, SyncStatus}; +use header_hash; // Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; @@ -37,10 +38,10 @@ struct PeerSync { #[derive(Copy, Clone, Eq, PartialEq, Debug)] enum PeerSyncState { - AncestorSearch(BlockNumber), - Available, - DownloadingNew(BlockNumber), - DownloadingStale(HeaderHash), + AncestorSearch(BlockNumber), + Available, + DownloadingNew(BlockNumber), + DownloadingStale(HeaderHash), } /// Relay chain sync strategy. @@ -53,24 +54,6 @@ pub struct ChainSync { required_block_attributes: Vec, } -/// Reported sync state. -#[derive(Clone, Eq, PartialEq, Debug)] -pub enum SyncState { - /// Initial sync is complete, keep-up sync is active. - Idle, - /// Actively catching up with the chain. - Downloading -} - -/// Syncing status and statistics -#[derive(Clone)] -pub struct Status { - /// Current global sync state. - pub state: SyncState, - /// Target sync block number. - pub best_seen_block: Option, -} - impl ChainSync { /// Create a new instance. pub fn new(info: &ClientInfo) -> ChainSync { @@ -89,13 +72,13 @@ impl ChainSync { } /// Returns sync status - pub fn status(&self) -> Status { + pub fn status(&self) -> SyncStatus { let best_seen = self.best_seen_block(); let state = match &best_seen { &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading, _ => SyncState::Idle, }; - Status { + SyncStatus { state: state, best_seen_block: best_seen, } diff --git a/substrate/network/src/handler.rs b/substrate/network/src/handler.rs new file mode 100644 index 0000000000000..e16afb21339e5 --- /dev/null +++ b/substrate/network/src/handler.rs @@ -0,0 +1,121 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +use std::sync::Arc; +use std::collections::BTreeMap; +use network::{PeerId, ProtocolId}; +use network_devp2p::NetworkService; +use primitives::block::{HeaderHash, Header, ExtrinsicHash}; +use io::SyncIo; +use full::handler::ProtocolHandler as FullProtocolHandler; +use {ProtocolStatus, SyncStatus, SyncState, TransactionStats, + ProtocolPeerInfo}; + +/// Polkadot devp2p full protocol id +pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; +/// Polkadot devp2p light protocol id +pub const LIGHT_DOT_PROTOCOL_ID: ProtocolId = *b"ldt"; + +/// Protocol handler aapter. +pub enum ProtocolHandler { + /// Full protocol handler. + Full(Arc), + /// Light protocol handler. + Light(()), +} + +impl ProtocolHandler { + /// Get protocol ID. + pub fn protocol_id(&self) -> ProtocolId { + match *self { + ProtocolHandler::Full(_) => DOT_PROTOCOL_ID, + ProtocolHandler::Light(_) => LIGHT_DOT_PROTOCOL_ID, + } + } + + /// Get full protocol handler. + pub fn full(&self) -> Option<&Arc> { + match *self { + ProtocolHandler::Full(ref handler) => Some(handler), + ProtocolHandler::Light(_) => None, + } + } + + /// Register protocol. + pub fn register(&self, network: &NetworkService) { + match *self { + ProtocolHandler::Full(ref handler) => network + .register_protocol(handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8]) + .unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e)), + ProtocolHandler::Light(_) => (), + } + } + + /// Abort protocol. + pub fn abort(&self) { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.abort(), + ProtocolHandler::Light(_) => (), + } + } + + /// When block is imported by client. + pub fn on_block_imported(&self, io: &mut SyncIo, hash: HeaderHash, header: &Header) { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.on_block_imported(io, hash, header), + ProtocolHandler::Light(_) => (), + } + } + + /// When new transactions are imported by client. + pub fn on_new_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec)]) { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.propagate_transactions(io, transactions), + ProtocolHandler::Light(_) => (), + } + } + + /// Get sync status + pub fn status(&self) -> ProtocolStatus { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.status(), + ProtocolHandler::Light(_) => ProtocolStatus { + sync: SyncStatus { + state: SyncState::Idle, + best_seen_block: None, + }, + num_peers: 0, + num_active_peers: 0, + }, + } + } + + /// Get protocol peer info + pub fn protocol_peer_info(&self, peer: PeerId) -> Option { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.peer_info(peer), + ProtocolHandler::Light(_) => None, + } + } + + /// Get transactions statis + pub fn transactions_stats(&self) -> BTreeMap { + match *self { + ProtocolHandler::Full(ref handler) => handler.protocol.transactions_stats(), + ProtocolHandler::Light(_) => BTreeMap::new(), + } + } +} diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 6ccfb965930f2..6e5d21f35d1fd 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -28,6 +28,7 @@ extern crate substrate_primitives as primitives; extern crate substrate_state_machine as state_machine; extern crate substrate_serializer as ser; extern crate substrate_client as client; +extern crate substrate_light as light_client; extern crate substrate_runtime_support as runtime_support; extern crate substrate_bft; extern crate serde; @@ -46,28 +47,26 @@ extern crate ed25519; #[cfg(test)] extern crate substrate_codec as codec; #[cfg(test)] extern crate substrate_bft as bft; -mod service; -mod sync; -mod protocol; -mod io; -mod message; mod config; -mod chain; -mod blocks; -mod consensus; +mod full; +mod handler; +mod io; +mod service; +mod sync_provider; pub mod error; #[cfg(test)] mod test; -pub use service::{Service, FetchFuture, StatementStream, ConsensusService, BftMessageStream, - TransactionPool, Params, ManageNetwork, SyncProvider}; -pub use protocol::{ProtocolStatus}; -pub use sync::{Status as SyncStatus, SyncState}; +pub use service::{Service, FetchFuture, TransactionPool, Params, ManageNetwork}; +pub use full::{ConsensusService, StatementStream, BftMessageStream}; pub use network::{NonReservedPeerMode, NetworkConfiguration}; pub use network_devp2p::{ConnectionFilter, ConnectionDirection}; -pub use message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; +pub use full::message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, + SignedConsensusMessage, SignedConsensusProposal}; pub use error::Error; pub use config::{Role, ProtocolConfig}; +pub use sync_provider::{SyncProvider, ProtocolPeerInfo, PeerInfo, TransactionStats, ProtocolStatus, + SyncStatus, SyncState}; // TODO: move it elsewhere fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 6d8f5bea115c5..52a4f8d095094 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -17,57 +17,25 @@ use std::sync::Arc; use std::collections::{BTreeMap}; use std::io; -use futures::sync::{oneshot, mpsc}; -use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId, ProtocolId, -NetworkConfiguration , NonReservedPeerMode, ErrorKind}; +use futures::sync::oneshot; +use network::{NetworkConfiguration, NonReservedPeerMode, ErrorKind}; use network_devp2p::{NetworkService}; use primitives::block::{ExtrinsicHash, Header, HeaderHash}; use primitives::Hash; -use core_io::{TimerToken}; +use light_client::LightClient; +use full::{ConsensusService, StatementStream, BftMessageStream}; +use full::handler::ProtocolHandler as FullProtocolHandler; +use full::chain::Client; +use full::message::{Statement, LocalizedBftMessage}; +use full::protocol::Protocol; +use handler::ProtocolHandler; +use sync_provider::{SyncProvider, ProtocolStatus, TransactionStats, PeerInfo}; use io::NetSyncIo; -use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats}; use config::{ProtocolConfig}; use error::Error; -use chain::Client; -use message::{Statement, LocalizedBftMessage}; - -/// Polkadot devp2p protocol id -pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; -/// Type that represents statement stream. -pub type StatementStream = mpsc::UnboundedReceiver; -/// Type that represents bft messages stream. -pub type BftMessageStream = mpsc::UnboundedReceiver; - -bitflags! { - /// Node roles bitmask. - pub struct Role: u32 { - /// No network. - const NONE = 0b00000000; - /// Full node, doe not participate in consensus. - const FULL = 0b00000001; - /// Light client node. - const LIGHT = 0b00000010; - /// Act as a validator. - const VALIDATOR = 0b00000100; - /// Act as a collator. - const COLLATOR = 0b00001000; - } -} - -/// Sync status -pub trait SyncProvider: Send + Sync { - /// Get sync status - fn status(&self) -> ProtocolStatus; - /// Get peers information - fn peers(&self) -> Vec; - /// Get this node id if available. - fn node_id(&self) -> Option; - /// Returns propagation count for pending transactions. - fn transactions_stats(&self) -> BTreeMap; -} /// Transaction pool interface pub trait TransactionPool: Send + Sync { @@ -77,110 +45,84 @@ pub trait TransactionPool: Send + Sync { fn import(&self, transaction: &[u8]) -> Option; } -/// ConsensusService -pub trait ConsensusService: Send + Sync { - /// Get statement stream. - fn statements(&self) -> StatementStream; - /// Send out a statement. - fn send_statement(&self, statement: Statement); - /// Maintain connectivity to given addresses. - fn connect_to_authorities(&self, addresses: &[String]); - /// Fetch candidate. - fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver>; - /// Note local candidate. Accepts candidate receipt hash and candidate data. - /// Pass `None` to clear the candidate. - fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>); - - /// Get BFT message stream. - fn bft_messages(&self) -> BftMessageStream; - /// Send out a BFT message. - fn send_bft_message(&self, message: LocalizedBftMessage); -} - -/// devp2p Protocol handler -struct ProtocolHandler { - protocol: Protocol, -} - -/// Peer connection information -#[derive(Debug)] -pub struct PeerInfo { - /// Public node id - pub id: Option, - /// Node client ID - pub client_version: String, - /// Capabilities - pub capabilities: Vec, - /// Remote endpoint address - pub remote_address: String, - /// Local endpoint address - pub local_address: String, - /// Dot protocol info. - pub dot_info: Option, -} - /// Service initialization parameters. pub struct Params { /// Configuration. pub config: ProtocolConfig, /// Network layer configuration. pub network_config: NetworkConfiguration, - /// Polkadot relay chain access point. - pub chain: Arc, /// Transaction pool. pub transaction_pool: Arc, } /// Polkadot network service. Handles network IO and manages connectivity. pub struct Service { + data: Arc, +} + +struct ServiceData { /// Network service network: NetworkService, /// Devp2p protocol handler - handler: Arc, + handler: ProtocolHandler, } impl Service { - /// Creates and register protocol with the network service - pub fn new(params: Params) -> Result, Error> { + /// Creates and registers full protocol with the network service. + pub fn new_full(chain: Arc, params: Params) -> Result<(Arc, Arc), Error> { + let service = NetworkService::new(params.network_config.clone(), None)?; + let data = Arc::new(ServiceData { + network: service, + handler: ProtocolHandler::Full(Arc::new(FullProtocolHandler { + protocol: Protocol::new(params.config, chain, params.transaction_pool)?, + })), + }); + let sync = Arc::new(Service { data: data.clone(), }); + + Ok((sync, data)) + } + + /// Creates and register light protocol with the network service + pub fn new_light(_light_client: Arc, params: Params) -> Result, Error> { let service = NetworkService::new(params.network_config.clone(), None)?; - let sync = Arc::new(Service { + let data = Arc::new(ServiceData { network: service, - handler: Arc::new(ProtocolHandler { - protocol: Protocol::new(params.config, params.chain.clone(), params.transaction_pool)?, - }), + handler: ProtocolHandler::Light(()), }); + let sync = Arc::new(Service { data, }); Ok(sync) } /// Called when a new block is imported by the client. pub fn on_block_imported(&self, hash: HeaderHash, header: &Header) { - self.network.with_context(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header) + self.data.network.with_context(self.data.handler.protocol_id(), |context| { + self.data.handler.on_block_imported(&mut NetSyncIo::new(context), hash, header) }); } /// Called when new transactons are imported by the client. pub fn on_new_transactions(&self, transactions: &[(ExtrinsicHash, Vec)]) { - self.network.with_context(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions); + self.data.network.with_context(self.data.handler.protocol_id(), |context| { + self.data.handler.on_new_transactions(&mut NetSyncIo::new(context), transactions); }); } fn start(&self) { - match self.network.start().map_err(Into::into) { + match self.data.network.start().map_err(Into::into) { Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse => - warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")), + warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", + self.data.network.config().listen_address.expect("Listen address is not set.")), Err(err) => warn!("Error starting network: {}", err), _ => {}, }; - self.network.register_protocol(self.handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8]) - .unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e)); + + self.data.handler.register(&self.data.network); } fn stop(&self) { - self.handler.protocol.abort(); - self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); + self.data.handler.abort(); + self.data.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } } @@ -193,13 +135,13 @@ impl Drop for Service { impl SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { - self.handler.protocol.status() + self.data.handler.status() } /// Get sync peers fn peers(&self) -> Vec { - self.network.with_context_eval(DOT_PROTOCOL_ID, |ctx| { - let peer_ids = self.network.connected_peers(); + self.data.network.with_context_eval(self.data.handler.protocol_id(), |ctx| { + let peer_ids = self.data.network.connected_peers(); peer_ids.into_iter().filter_map(|peer_id| { let session_info = match ctx.session_info(peer_id) { @@ -213,25 +155,29 @@ impl SyncProvider for Service { capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), remote_address: session_info.remote_address, local_address: session_info.local_address, - dot_info: self.handler.protocol.peer_info(peer_id), + dot_info: self.data.handler.protocol_peer_info(peer_id), }) }).collect() }).unwrap_or_else(Vec::new) } fn node_id(&self) -> Option { - self.network.external_url() + self.data.network.external_url() } fn transactions_stats(&self) -> BTreeMap { - self.handler.protocol.transactions_stats() + self.data.handler.transactions_stats() } } /// ConsensusService -impl ConsensusService for Service { +const CONSENSUS_SERVICE_PROOF: &str = "ConsensusService is returned only if full_handler.is_some(); qed"; + +impl ConsensusService for ServiceData { fn statements(&self) -> StatementStream { - self.handler.protocol.statements() + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.statements() } fn connect_to_authorities(&self, _addresses: &[String]) { @@ -239,54 +185,42 @@ impl ConsensusService for Service { } fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver> { - self.network.with_context_eval(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.fetch_candidate(&mut NetSyncIo::new(context), hash) + self.network.with_context_eval(self.handler.protocol_id(), |context| { + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.fetch_candidate(&mut NetSyncIo::new(context), hash) }).expect("DOT Service is registered") } fn send_statement(&self, statement: Statement) { - self.network.with_context(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.send_statement(&mut NetSyncIo::new(context), statement); + self.network.with_context(self.handler.protocol_id(), |context| { + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.send_statement(&mut NetSyncIo::new(context), statement); }); } fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>) { - self.handler.protocol.set_local_candidate(candidate) + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.set_local_candidate(candidate) } fn bft_messages(&self) -> BftMessageStream { - self.handler.protocol.bft_messages() + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.bft_messages() } fn send_bft_message(&self, message: LocalizedBftMessage) { - self.network.with_context(DOT_PROTOCOL_ID, |context| { - self.handler.protocol.send_bft_message(&mut NetSyncIo::new(context), message); + self.network.with_context(self.handler.protocol_id(), |context| { + self.handler.full() + .expect(CONSENSUS_SERVICE_PROOF) + .protocol.send_bft_message(&mut NetSyncIo::new(context), message); }); } } -impl NetworkProtocolHandler for ProtocolHandler { - fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { - io.register_timer(0, 1000).expect("Error registering sync timer"); - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { - self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); - } - - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.protocol.tick(&mut NetSyncIo::new(io)); - } -} - /// Trait for managing network pub trait ManageNetwork : Send + Sync { /// Set to allow unreserved peers to connect @@ -306,19 +240,19 @@ pub trait ManageNetwork : Send + Sync { impl ManageNetwork for Service { fn accept_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); + self.data.network.set_non_reserved_mode(NonReservedPeerMode::Accept); } fn deny_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Deny); + self.data.network.set_non_reserved_mode(NonReservedPeerMode::Deny); } fn remove_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + self.data.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + self.data.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) } fn start_network(&self) { diff --git a/substrate/network/src/sync_provider.rs b/substrate/network/src/sync_provider.rs new file mode 100644 index 0000000000000..4ef45b2998e33 --- /dev/null +++ b/substrate/network/src/sync_provider.rs @@ -0,0 +1,100 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +use std::collections::BTreeMap; +use network::NodeId; +use primitives::block::{HeaderHash, ExtrinsicHash, Number as BlockNumber}; +use config::Role; + +/// Reported sync state. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum SyncState { + /// Initial sync is complete, keep-up sync is active. + Idle, + /// Actively catching up with the chain. + Downloading +} + +/// Syncing status and statistics +#[derive(Clone)] +pub struct SyncStatus { + /// Current global sync state. + pub state: SyncState, + /// Target sync block number. + pub best_seen_block: Option, +} + +/// Syncing status and statistics +#[derive(Clone)] +pub struct ProtocolStatus { + /// Sync status. + pub sync: SyncStatus, + /// Total number of connected peers + pub num_peers: usize, + /// Total number of active peers. + pub num_active_peers: usize, +} + +/// Peer protocol information. +#[derive(Debug)] +pub struct ProtocolPeerInfo { + /// Roles + pub roles: Role, + /// Protocol version + pub protocol_version: u32, + /// Peer best block hash + pub best_hash: HeaderHash, + /// Peer best block number + pub best_number: BlockNumber, +} + +/// Peer connection information +#[derive(Debug)] +pub struct PeerInfo { + /// Public node id + pub id: Option, + /// Node client ID + pub client_version: String, + /// Capabilities + pub capabilities: Vec, + /// Remote endpoint address + pub remote_address: String, + /// Local endpoint address + pub local_address: String, + /// Dot protocol info. + pub dot_info: Option, +} + +/// Transaction stats +#[derive(Debug)] +pub struct TransactionStats { + /// Block number where this TX was first seen. + pub first_seen: u64, + /// Peers it was propagated to. + pub propagated_to: BTreeMap, +} + +/// Sync status +pub trait SyncProvider: Send + Sync { + /// Get sync status + fn status(&self) -> ProtocolStatus; + /// Get peers information + fn peers(&self) -> Vec; + /// Get this node id if available. + fn node_id(&self) -> Option; + /// Returns propagation count for pending transactions. + fn transactions_stats(&self) -> BTreeMap; +} diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index afa6d82e32f0b..b5c6b791547af 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -25,7 +25,7 @@ use primitives::block::{Id as BlockId, ExtrinsicHash}; use primitives; use executor; use io::SyncIo; -use protocol::Protocol; +use full::protocol::Protocol; use config::ProtocolConfig; use service::TransactionPool; use network::{PeerId, SessionInfo, Error as NetworkError}; diff --git a/substrate/network/src/test/sync.rs b/substrate/network/src/test/sync.rs index e5cc1ebfad082..cd4d3de76ef61 100644 --- a/substrate/network/src/test/sync.rs +++ b/substrate/network/src/test/sync.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use client::backend::Backend; -use sync::SyncState; +use sync_provider::SyncState; use super::*; #[test] diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index c5c179837c3ed..a0cc0e5bac9f4 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -19,8 +19,7 @@ use std::sync::Arc; use primitives::block; -use client::{self, Client, BlockchainEvents}; -use state_machine; +use client::{BlockchainEvents, ChainHead, ChainData}; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; @@ -62,41 +61,43 @@ build_rpc_trait! { } /// Chain API with subscriptions support. -pub struct Chain { - /// Substrate client. - client: Arc>, +pub struct Chain { + /// Chain head shared instance. + chain_head: Arc, + /// Chain data shared instance. + chain_data: Arc, + /// Blockchain events shared instance. + chain_events: Arc, /// Current subscriptions. subscriptions: Subscriptions, } -impl Chain { +impl Chain { /// Create new Chain API RPC handler. - pub fn new(client: Arc>, remote: Remote) -> Self { + pub fn new(chain_head: Arc, chain_data: Arc, chain_events: Arc, remote: Remote) -> Self { Chain { - client, + chain_head, + chain_data, + chain_events, subscriptions: Subscriptions::new(remote), } } } -impl ChainApi for Chain where - B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, - client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, -{ +impl ChainApi for Chain { type Metadata = ::metadata::Metadata; fn header(&self, hash: block::HeaderHash) -> Result> { - self.client.header(&block::Id::Hash(hash)).chain_err(|| "Blockchain error") + self.chain_data.header(&block::Id::Hash(hash)).chain_err(|| "Blockchain error") } fn head(&self) -> Result { - Ok(self.client.info().chain_err(|| "Blockchain error")?.chain.best_hash) + self.chain_head.best_block_hash().chain_err(|| "Blockchain error") } fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber) { self.subscriptions.add(subscriber, |sink| { - let stream = self.client.import_notification_stream() + let stream = self.chain_events.import_notification_stream() .filter(|notification| notification.is_new_best) .map(|notification| Ok(notification.header)) .map_err(|e| warn!("Block notification stream error: {:?}", e)); diff --git a/substrate/rpc/src/chain/tests.rs b/substrate/rpc/src/chain/tests.rs index b44f107fac1cd..e91e32880dd33 100644 --- a/substrate/rpc/src/chain/tests.rs +++ b/substrate/rpc/src/chain/tests.rs @@ -32,13 +32,16 @@ fn should_return_header() { let core = ::tokio_core::reactor::Core::new().unwrap(); let remote = core.remote(); - let client = Chain { - client: Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()), + let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); + let chain = Chain { + chain_head: client.clone(), + chain_data: client.clone(), + chain_events: client.clone(), subscriptions: Subscriptions::new(remote), }; assert_matches!( - ChainApi::header(&client, test_genesis_block.blake2_256().into()), + ChainApi::header(&chain, test_genesis_block.blake2_256().into()), Ok(Some(ref x)) if x == &block::Header { parent_hash: 0.into(), number: 0, @@ -49,7 +52,7 @@ fn should_return_header() { ); assert_matches!( - ChainApi::header(&client, 5.into()), + ChainApi::header(&chain, 5.into()), Ok(None) ); } diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index d8f2c8f00c157..00b772cad8973 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -22,10 +22,9 @@ mod error; mod tests; use std::sync::Arc; -use client::{self, Client}; +use client::{ChainHead, StateData, ContractCaller}; use primitives::block; use primitives::storage::{StorageKey, StorageData}; -use state_machine; use self::error::Result; @@ -50,27 +49,45 @@ build_rpc_trait! { } } -impl StateApi for Arc> where - B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, - client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, -{ +/// State API. +pub struct State { + /// Chain head shared instance. + chain_head: Arc, + /// State data shared instance. + state_data: Arc, + /// Contract caller shared instance. + contract_caller: Arc, +} + +impl State { + /// Create new State API RPC handler. + pub fn new(chain_head: Arc, state_data: Arc, contract_caller: Arc) -> Self { + State { + chain_head, + state_data, + contract_caller, + } + } +} + +impl StateApi for State { fn storage_at(&self, key: StorageKey, block: block::HeaderHash) -> Result { - Ok(self.as_ref().storage(&block::Id::Hash(block), &key)?) + Ok(self.state_data.storage(&block::Id::Hash(block), &key)?) } fn call_at(&self, method: String, data: Vec, block: block::HeaderHash) -> Result> { - Ok(self.as_ref().call(&block::Id::Hash(block), &method, &data)?.return_data) + Ok(self.contract_caller.call(&block::Id::Hash(block), &method, &data)?.return_data) } fn storage(&self, key: StorageKey) -> Result { - let at = block::Id::Hash(self.as_ref().info()?.chain.best_hash); + let at = block::Id::Hash(self.chain_head.best_block_hash()?); use primitives::hexdisplay::HexDisplay; info!("Querying storage at {:?} for key {}", at, HexDisplay::from(&key.0)); - Ok(self.as_ref().storage(&at, &key)?) + Ok(self.state_data.storage(&at, &key)?) } fn call(&self, method: String, data: Vec) -> Result> { - Ok(self.as_ref().call(&block::Id::Hash(self.as_ref().info()?.chain.best_hash), &method, &data)?.return_data) + let at = block::Id::Hash(self.chain_head.best_block_hash()?); + Ok(self.contract_caller.call(&at, &method, &data)?.return_data) } } diff --git a/substrate/rpc/src/state/tests.rs b/substrate/rpc/src/state/tests.rs index 3cbf75693aecd..996c587487f49 100644 --- a/substrate/rpc/src/state/tests.rs +++ b/substrate/rpc/src/state/tests.rs @@ -31,10 +31,11 @@ fn should_return_storage() { }; let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); + let state = State::new(client.clone(), client.clone(), client.clone()); let genesis_hash = test_genesis_block.blake2_256().into(); assert_matches!( - StateApi::storage_at(&client, StorageKey(vec![10]), genesis_hash), + StateApi::storage_at(&state, StorageKey(vec![10]), genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::NoValueForKey(ref k)), _)) if *k == vec![10] ) } @@ -52,10 +53,11 @@ fn should_call_contract() { }; let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); + let state = State::new(client.clone(), client.clone(), client.clone()); let genesis_hash = test_genesis_block.blake2_256().into(); assert_matches!( - StateApi::call_at(&client, "balanceOf".into(), vec![1,2,3], genesis_hash), + StateApi::call_at(&state, "balanceOf".into(), vec![1,2,3], genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::Execution(_)), _)) ) } From c57c38a10f2e52c74e8afc19dfba095ade01f70b Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 26 Apr 2018 13:04:15 +0300 Subject: [PATCH 2/2] extracted Protocol trait --- substrate/network/src/full/handler.rs | 47 -------- substrate/network/src/full/mod.rs | 1 - substrate/network/src/full/protocol.rs | 154 ++++++++++++------------ substrate/network/src/handler.rs | 86 +++++++++---- substrate/network/src/lib.rs | 1 + substrate/network/src/light/mod.rs | 17 +++ substrate/network/src/light/protocol.rs | 36 ++++++ substrate/network/src/service.rs | 33 ++--- 8 files changed, 211 insertions(+), 164 deletions(-) delete mode 100644 substrate/network/src/full/handler.rs create mode 100644 substrate/network/src/light/mod.rs create mode 100644 substrate/network/src/light/protocol.rs diff --git a/substrate/network/src/full/handler.rs b/substrate/network/src/full/handler.rs deleted file mode 100644 index 6edec191c1696..0000000000000 --- a/substrate/network/src/full/handler.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use core_io::{TimerToken}; -use network::{NetworkProtocolHandler, NetworkContext, HostInfo, PeerId}; -use full::protocol::Protocol; -use io::NetSyncIo; - -/// devp2p protocol handler -pub struct ProtocolHandler { - pub protocol: Protocol, -} - -impl NetworkProtocolHandler for ProtocolHandler { - fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { - io.register_timer(0, 1000).expect("Error registering sync timer"); - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { - self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); - } - - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.protocol.tick(&mut NetSyncIo::new(io)); - } -} diff --git a/substrate/network/src/full/mod.rs b/substrate/network/src/full/mod.rs index ce07da65cceed..8268f95060c03 100644 --- a/substrate/network/src/full/mod.rs +++ b/substrate/network/src/full/mod.rs @@ -20,7 +20,6 @@ use primitives::Hash; pub mod blocks; pub mod chain; pub mod consensus; -pub mod handler; pub mod message; pub mod protocol; pub mod sync; diff --git a/substrate/network/src/full/protocol.rs b/substrate/network/src/full/protocol.rs index f20336853465d..e839a22922f05 100644 --- a/substrate/network/src/full/protocol.rs +++ b/substrate/network/src/full/protocol.rs @@ -26,6 +26,7 @@ use primitives::{Hash, blake2_256}; use runtime_support::Hashable; use network::PeerId; +use handler::Protocol as ProtocolApi; use full::{StatementStream, BftMessageStream}; use full::chain::Client; use full::consensus::Consensus; @@ -109,55 +110,6 @@ impl Protocol { } } - pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) { - let message: Message = match serde_json::from_slice(data) { - Ok(m) => m, - Err(e) => { - debug!("Invalid packet from {}: {}", peer_id, e); - io.disable_peer(peer_id); - return; - } - }; - - match message { - Message::Status(s) => self.on_status_message(io, peer_id, s), - Message::BlockRequest(r) => self.on_block_request(io, peer_id, r), - Message::BlockResponse(r) => { - let request = { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { - peer.request_timestamp = None; - match mem::replace(&mut peer.block_request, None) { - Some(r) => r, - None => { - debug!("Unexpected response packet from {}", peer_id); - io.disable_peer(peer_id); - return; - } - } - } else { - debug!("Unexpected packet from {}", peer_id); - io.disable_peer(peer_id); - return; - } - }; - if request.id != r.id { - trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id); - return; - } - self.on_block_response(io, peer_id, request, r); - }, - Message::BlockAnnounce(announce) => { - self.on_block_announce(io, peer_id, announce); - }, - Message::Statement(s) => self.on_statement(io, peer_id, s, blake2_256(data).into()), - Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r), - Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), - Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()), - Message::Transactions(m) => self.on_transactions(io, peer_id, m), - } - } - pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) { match &mut message { &mut Message::BlockRequest(ref mut r) => { @@ -183,28 +135,6 @@ impl Protocol { blake2_256(&data).into() } - /// Called when a new peer is connected - pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) { - trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id)); - self.handshaking_peers.write().insert(peer_id, time::Instant::now()); - self.send_status(io, peer_id); - } - - /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer)); - let removed = { - let mut peers = self.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - handshaking_peers.remove(&peer); - peers.remove(&peer).is_some() - }; - if removed { - self.consensus.lock().peer_disconnected(io, self, peer); - self.sync.write().peer_disconnected(io, self, peer); - } - } - fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); let mut blocks = Vec::new(); @@ -312,12 +242,6 @@ impl Protocol { self.consensus.lock().set_local_candidate(candidate) } - /// Perform time based maintenance. - pub fn tick(&self, io: &mut SyncIo) { - self.maintain_peers(io); - self.consensus.lock().collect_garbage(); - } - fn maintain_peers(&self, io: &mut SyncIo) { let tick = time::Instant::now(); let mut aborting = Vec::new(); @@ -492,3 +416,79 @@ impl Protocol { &*self.chain } } + +impl ProtocolApi for Protocol { + fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) { + let message: Message = match serde_json::from_slice(data) { + Ok(m) => m, + Err(e) => { + debug!("Invalid packet from {}: {}", peer_id, e); + io.disable_peer(peer_id); + return; + } + }; + + match message { + Message::Status(s) => self.on_status_message(io, peer_id, s), + Message::BlockRequest(r) => self.on_block_request(io, peer_id, r), + Message::BlockResponse(r) => { + let request = { + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&peer_id) { + peer.request_timestamp = None; + match mem::replace(&mut peer.block_request, None) { + Some(r) => r, + None => { + debug!("Unexpected response packet from {}", peer_id); + io.disable_peer(peer_id); + return; + } + } + } else { + debug!("Unexpected packet from {}", peer_id); + io.disable_peer(peer_id); + return; + } + }; + if request.id != r.id { + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id); + return; + } + self.on_block_response(io, peer_id, request, r); + }, + Message::BlockAnnounce(announce) => { + self.on_block_announce(io, peer_id, announce); + }, + Message::Statement(s) => self.on_statement(io, peer_id, s, blake2_256(data).into()), + Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r), + Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), + Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()), + Message::Transactions(m) => self.on_transactions(io, peer_id, m), + } + } + + fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) { + trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id)); + self.handshaking_peers.write().insert(peer_id, time::Instant::now()); + self.send_status(io, peer_id); + } + + fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer)); + let removed = { + let mut peers = self.peers.write(); + let mut handshaking_peers = self.handshaking_peers.write(); + handshaking_peers.remove(&peer); + peers.remove(&peer).is_some() + }; + if removed { + self.consensus.lock().peer_disconnected(io, self, peer); + self.sync.write().peer_disconnected(io, self, peer); + } + } + + fn tick(&self, io: &mut SyncIo) { + self.maintain_peers(io); + self.consensus.lock().collect_garbage(); + } +} diff --git a/substrate/network/src/handler.rs b/substrate/network/src/handler.rs index e16afb21339e5..22f63f4d8c7de 100644 --- a/substrate/network/src/handler.rs +++ b/substrate/network/src/handler.rs @@ -14,13 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -use std::sync::Arc; use std::collections::BTreeMap; -use network::{PeerId, ProtocolId}; -use network_devp2p::NetworkService; +use core_io::{TimerToken}; +use network::{PeerId, ProtocolId, NetworkProtocolHandler, NetworkContext, HostInfo}; use primitives::block::{HeaderHash, Header, ExtrinsicHash}; -use io::SyncIo; -use full::handler::ProtocolHandler as FullProtocolHandler; +use io::{SyncIo, NetSyncIo}; +use full::protocol::Protocol as FullProtocol; +use light::protocol::Protocol as LightProtocol; use {ProtocolStatus, SyncStatus, SyncState, TransactionStats, ProtocolPeerInfo}; @@ -32,12 +32,35 @@ pub const LIGHT_DOT_PROTOCOL_ID: ProtocolId = *b"ldt"; /// Protocol handler aapter. pub enum ProtocolHandler { /// Full protocol handler. - Full(Arc), + Full(FullProtocol), /// Light protocol handler. - Light(()), + Light(LightProtocol), +} + +/// Protocol trait. +pub trait Protocol: Send + Sync { + /// When peer is connected. + fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId); + + /// When peer is disconnected. + fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId); + + /// When new packet from peer is received. + fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]); + + /// Perform time based maintenance. + fn tick(&self, io: &mut SyncIo); } impl ProtocolHandler { + /// As protocol reference. + pub fn as_protocol(&self) -> &Protocol { + match *self { + ProtocolHandler::Full(ref protocol) => protocol, + ProtocolHandler::Light(ref protocol) => protocol, + } + } + /// Get protocol ID. pub fn protocol_id(&self) -> ProtocolId { match *self { @@ -47,27 +70,17 @@ impl ProtocolHandler { } /// Get full protocol handler. - pub fn full(&self) -> Option<&Arc> { + pub fn full(&self) -> Option<&FullProtocol> { match *self { - ProtocolHandler::Full(ref handler) => Some(handler), + ProtocolHandler::Full(ref protocol) => Some(protocol), ProtocolHandler::Light(_) => None, } } - /// Register protocol. - pub fn register(&self, network: &NetworkService) { - match *self { - ProtocolHandler::Full(ref handler) => network - .register_protocol(handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8]) - .unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e)), - ProtocolHandler::Light(_) => (), - } - } - /// Abort protocol. pub fn abort(&self) { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.abort(), + ProtocolHandler::Full(ref protocol) => protocol.abort(), ProtocolHandler::Light(_) => (), } } @@ -75,7 +88,7 @@ impl ProtocolHandler { /// When block is imported by client. pub fn on_block_imported(&self, io: &mut SyncIo, hash: HeaderHash, header: &Header) { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.on_block_imported(io, hash, header), + ProtocolHandler::Full(ref protocol) => protocol.on_block_imported(io, hash, header), ProtocolHandler::Light(_) => (), } } @@ -83,7 +96,7 @@ impl ProtocolHandler { /// When new transactions are imported by client. pub fn on_new_transactions(&self, io: &mut SyncIo, transactions: &[(ExtrinsicHash, Vec)]) { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.propagate_transactions(io, transactions), + ProtocolHandler::Full(ref protocol) => protocol.propagate_transactions(io, transactions), ProtocolHandler::Light(_) => (), } } @@ -91,7 +104,7 @@ impl ProtocolHandler { /// Get sync status pub fn status(&self) -> ProtocolStatus { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.status(), + ProtocolHandler::Full(ref protocol) => protocol.status(), ProtocolHandler::Light(_) => ProtocolStatus { sync: SyncStatus { state: SyncState::Idle, @@ -106,7 +119,7 @@ impl ProtocolHandler { /// Get protocol peer info pub fn protocol_peer_info(&self, peer: PeerId) -> Option { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.peer_info(peer), + ProtocolHandler::Full(ref protocol) => protocol.peer_info(peer), ProtocolHandler::Light(_) => None, } } @@ -114,8 +127,31 @@ impl ProtocolHandler { /// Get transactions statis pub fn transactions_stats(&self) -> BTreeMap { match *self { - ProtocolHandler::Full(ref handler) => handler.protocol.transactions_stats(), + ProtocolHandler::Full(ref protocol) => protocol.transactions_stats(), ProtocolHandler::Light(_) => BTreeMap::new(), } } } + +impl NetworkProtocolHandler for ProtocolHandler { + fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { + io.register_timer(0, 1000).expect("Error registering sync timer"); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { + self.as_protocol().handle_packet(&mut NetSyncIo::new(io), *peer, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.as_protocol().on_peer_connected(&mut NetSyncIo::new(io), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.as_protocol().on_peer_disconnected(&mut NetSyncIo::new(io), *peer); + } + + fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + self.as_protocol().tick(&mut NetSyncIo::new(io)); + } +} + diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 6e5d21f35d1fd..789866337cb4e 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -51,6 +51,7 @@ mod config; mod full; mod handler; mod io; +mod light; mod service; mod sync_provider; pub mod error; diff --git a/substrate/network/src/light/mod.rs b/substrate/network/src/light/mod.rs new file mode 100644 index 0000000000000..92bafa044e0a6 --- /dev/null +++ b/substrate/network/src/light/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +pub mod protocol; diff --git a/substrate/network/src/light/protocol.rs b/substrate/network/src/light/protocol.rs new file mode 100644 index 0000000000000..d9c753cf99113 --- /dev/null +++ b/substrate/network/src/light/protocol.rs @@ -0,0 +1,36 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +use io::SyncIo; +use network::PeerId; +use handler::Protocol as ProtocolApi; + +/// Light protocol implementation. +pub struct Protocol; + +impl ProtocolApi for Protocol { + fn handle_packet(&self, _io: &mut SyncIo, _peer_id: PeerId, _data: &[u8]) { + } + + fn on_peer_connected(&self, _io: &mut SyncIo, _peer_id: PeerId) { + } + + fn on_peer_disconnected(&self, _io: &mut SyncIo, _peer: PeerId) { + } + + fn tick(&self, _io: &mut SyncIo) { + } +} diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 52a4f8d095094..b9cfe11da820d 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -24,10 +24,10 @@ use primitives::block::{ExtrinsicHash, Header, HeaderHash}; use primitives::Hash; use light_client::LightClient; use full::{ConsensusService, StatementStream, BftMessageStream}; -use full::handler::ProtocolHandler as FullProtocolHandler; use full::chain::Client; use full::message::{Statement, LocalizedBftMessage}; -use full::protocol::Protocol; +use full::protocol::Protocol as FullProtocol; +use light::protocol::Protocol as LightProtocol; use handler::ProtocolHandler; use sync_provider::{SyncProvider, ProtocolStatus, TransactionStats, PeerInfo}; use io::NetSyncIo; @@ -64,7 +64,7 @@ struct ServiceData { /// Network service network: NetworkService, /// Devp2p protocol handler - handler: ProtocolHandler, + handler: Arc, } impl Service { @@ -73,9 +73,9 @@ impl Service { let service = NetworkService::new(params.network_config.clone(), None)?; let data = Arc::new(ServiceData { network: service, - handler: ProtocolHandler::Full(Arc::new(FullProtocolHandler { - protocol: Protocol::new(params.config, chain, params.transaction_pool)?, - })), + handler: Arc::new(ProtocolHandler::Full(FullProtocol::new( + params.config, chain, params.transaction_pool)?, + )), }); let sync = Arc::new(Service { data: data.clone(), }); @@ -87,7 +87,7 @@ impl Service { let service = NetworkService::new(params.network_config.clone(), None)?; let data = Arc::new(ServiceData { network: service, - handler: ProtocolHandler::Light(()), + handler: Arc::new(ProtocolHandler::Light(LightProtocol)), }); let sync = Arc::new(Service { data, }); @@ -117,7 +117,12 @@ impl Service { _ => {}, }; - self.data.handler.register(&self.data.network); + self.data.network.register_protocol( + self.data.handler.clone(), + self.data.handler.protocol_id(), + 1, + &[0u8]) + .unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e)); } fn stop(&self) { @@ -177,7 +182,7 @@ impl ConsensusService for ServiceData { fn statements(&self) -> StatementStream { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.statements() + .statements() } fn connect_to_authorities(&self, _addresses: &[String]) { @@ -188,7 +193,7 @@ impl ConsensusService for ServiceData { self.network.with_context_eval(self.handler.protocol_id(), |context| { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.fetch_candidate(&mut NetSyncIo::new(context), hash) + .fetch_candidate(&mut NetSyncIo::new(context), hash) }).expect("DOT Service is registered") } @@ -196,27 +201,27 @@ impl ConsensusService for ServiceData { self.network.with_context(self.handler.protocol_id(), |context| { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.send_statement(&mut NetSyncIo::new(context), statement); + .send_statement(&mut NetSyncIo::new(context), statement); }); } fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>) { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.set_local_candidate(candidate) + .set_local_candidate(candidate) } fn bft_messages(&self) -> BftMessageStream { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.bft_messages() + .bft_messages() } fn send_bft_message(&self, message: LocalizedBftMessage) { self.network.with_context(self.handler.protocol_id(), |context| { self.handler.full() .expect(CONSENSUS_SERVICE_PROOF) - .protocol.send_bft_message(&mut NetSyncIo::new(context), message); + .send_bft_message(&mut NetSyncIo::new(context), message); }); } }