From 5e9d676a4547a0dceb6d006ed8237184fb0efed7 Mon Sep 17 00:00:00 2001 From: arkpar Date: Thu, 22 Mar 2018 02:17:30 +0100 Subject: [PATCH 1/5] CLI options and keystore integration --- polkadot/cli/src/cli.yml | 43 +++++++++++++++++++--- polkadot/cli/src/lib.rs | 61 +++++++++++++++++++++++++------ polkadot/consensus/src/service.rs | 10 +++-- polkadot/keystore/src/lib.rs | 22 ++++++++++- polkadot/service/src/config.rs | 4 +- polkadot/service/src/error.rs | 7 +--- polkadot/service/src/lib.rs | 30 +++++++++++---- substrate/ed25519/src/lib.rs | 20 +++++++++- 8 files changed, 161 insertions(+), 36 deletions(-) diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 96679a6835425..6d30326b6b11d 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -4,15 +4,48 @@ about: Polkadot Node Rust Implementation args: - log: short: l + long: log value_name: LOG_PATTERN help: Sets a custom logging filter takes_value: true + - base-path: + long: base-path + short: d + value_name: PATH + help: Specify custom base path + takes_value: true - keystore-path: - value_name: KEYSTORE_PATH - help: specify custom keystore path + long: keystore-path + value_name: PATH + help: Specify custom keystore path + takes_value: true + - key: + long: key + value_name: STRING + help: Specify additional key seed takes_value: true -subcommands: - collator: - about: Run collator node + long: collator + help: Enable collator mode + takes_value: false - validator: - about: Run validator node + long: validator + help: Enable validator mode + takes_value: false + - port: + long: port + value_name: PORT + help: Specify p2p protocol TCP port + takes_value: true + - rpc-port: + long: rpc-port + value_name: PORT + help: Specify RPC server TCP port + takes_value: true + - bootnodes: + long: bootnodes + value_name: URL + help: Specify a list of bootnodes + takes_value: true + multiple: true +subcommands: diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 5c0492af01f0d..72a66b8ef558a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -42,6 +42,7 @@ extern crate log; pub mod error; use std::path::{Path, PathBuf}; +use std::net::SocketAddr; /// Parse command line arguments and start the node. /// @@ -56,7 +57,15 @@ pub fn run(args: I) -> error::Result<()> where T: Into + Clone, { let yaml = load_yaml!("./cli.yml"); - let matches = clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args)?; + let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) { + Ok(m) => m, + Err(ref e) if e.kind == clap::ErrorKind::VersionDisplayed => return Ok(()), + Err(ref e) if e.kind == clap::ErrorKind::HelpDisplayed || e.kind == clap::ErrorKind::VersionDisplayed => { + let _ = clap::App::from_yaml(yaml).print_long_help(); + return Ok(()); + } + Err(e) => return Err(e.into()), + }; // TODO [ToDr] Split parameters parsing from actual execution. let log_pattern = matches.value_of("log").unwrap_or(""); @@ -64,38 +73,68 @@ pub fn run(args: I) -> error::Result<()> where let mut config = service::Configuration::default(); + let base_path = matches.value_of("base-path") + .map(|x| Path::new(x).to_owned()) + .unwrap_or_else(default_base_path); + config.keystore_path = matches.value_of("keystore") .map(|x| Path::new(x).to_owned()) - .unwrap_or_else(default_keystore_path) + .unwrap_or_else(|| keystore_path(&base_path)) .to_string_lossy() .into(); let mut role = service::Role::FULL; - if let Some(_) = matches.subcommand_matches("collator") { + if matches.is_present("collator") { info!("Starting collator."); role = service::Role::COLLATOR; } - else if let Some(_) = matches.subcommand_matches("validator") { + else if matches.is_present("validator") { info!("Starting validator."); role = service::Role::VALIDATOR; } config.roles = role; + config.network.boot_nodes = matches + .values_of("bootnodes") + .map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect()); + config.network.config_path = Some(network_path(&base_path).to_string_lossy().into()); + config.network.net_config_path = config.network.config_path.clone(); + + let port = match matches.value_of("port") { + Some(port) => port.parse().expect("Invalid p2p port value specified."), + None => 30333, + }; + config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); + + config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); let service = service::Service::new(config)?; - let address = "127.0.0.1:9933".parse().unwrap(); + let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap(); + if let Some(port) = matches.value_of("rpc-port") { + let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified."); + address.set_port(rpc_port); + } let handler = rpc::rpc_handler(service.client()); let server = rpc::start_http(&address, handler)?; server.wait(); - println!("No command given.\n"); - let _ = clap::App::from_yaml(yaml).print_long_help(); - Ok(()) } -fn default_keystore_path() -> PathBuf { +fn keystore_path(base_path: &Path) -> PathBuf { + let mut path = base_path.to_owned(); + path.push("keystore"); + path +} + +fn network_path(base_path: &Path) -> PathBuf { + let mut path = base_path.to_owned(); + path.push("network"); + path +} + +fn default_base_path() -> PathBuf { use app_dirs::{AppInfo, AppDataType}; let app_info = AppInfo { @@ -103,13 +142,11 @@ fn default_keystore_path() -> PathBuf { author: "Parity Technologies", }; - app_dirs::get_app_dir( + app_dirs::get_app_root( AppDataType::UserData, &app_info, - "keystore", ).expect("app directories exist on all supported platforms; qed") } - fn init_logger(pattern: &str) { let mut builder = env_logger::LogBuilder::new(); // Disable info logging by default for some modules: diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index 288f7a7d839a5..d481dd7d1b510 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -26,7 +26,6 @@ use parking_lot::Mutex; use substrate_network as net; use tokio_core::reactor; use client::BlockchainEvents; -use substrate_keyring::Keyring; use primitives::{Hash, AuthorityId}; use primitives::block::{Id as BlockId, HeaderHash, Header}; use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt}; @@ -136,14 +135,19 @@ struct Network(Arc); impl Service { /// Create and start a new instance. - pub fn new(client: Arc, network: Arc, transaction_pool: Arc>, best_header: &Header) -> Service + pub fn new( + client: Arc, + network: Arc, + transaction_pool: Arc>, + key: ed25519::Pair, + best_header: &Header) -> Service where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static { let best_header = best_header.clone(); let thread = thread::spawn(move || { let mut core = reactor::Core::new().expect("tokio::Core could not be created"); - let key = Arc::new(Keyring::One.into()); + let key = Arc::new(key); let factory = ProposerFactory { client: client.clone(), transaction_pool: transaction_pool.clone(), diff --git a/polkadot/keystore/src/lib.rs b/polkadot/keystore/src/lib.rs index ece74d0021a6e..8bfa703988891 100644 --- a/polkadot/keystore/src/lib.rs +++ b/polkadot/keystore/src/lib.rs @@ -33,6 +33,7 @@ extern crate error_chain; #[cfg(test)] extern crate tempdir; +use std::collections::HashMap; use std::path::PathBuf; use std::fs::{self, File}; use std::io::{self, Write}; @@ -120,16 +121,19 @@ impl EncryptedKey { } } +type Seed = [u8; 32]; + /// Key store. pub struct Store { path: PathBuf, + additional: HashMap, } impl Store { /// Create a new store at the given path. pub fn open(path: PathBuf) -> Result { fs::create_dir_all(&path)?; - Ok(Store { path }) + Ok(Store { path, additional: HashMap::new() }) } /// Generate a new key, placing it into the store. @@ -145,8 +149,22 @@ impl Store { Ok(pair) } + /// Create a new key from seed. Do not place it into the store. + pub fn generate_from_seed(&mut self, seed: &str) -> Result { + let mut s: [u8; 32] = [' ' as u8; 32]; + let len = ::std::cmp::min(32, seed.len()); + &mut s[..len].copy_from_slice(&seed.as_bytes()[..len]); + let pair = Pair::from_seed(&s); + self.additional.insert(pair.public(), s); + Ok(pair) + } + /// Load a key file with given public key. pub fn load(&self, public: &Public, password: &str) -> Result { + if let Some(ref seed) = self.additional.get(public) { + let pair = Pair::from_seed(seed); + return Ok(pair); + } let path = self.key_file_path(public); let file = File::open(path)?; @@ -158,7 +176,7 @@ impl Store { /// Get public keys of all stored keys. pub fn contents(&self) -> Result> { - let mut public_keys = Vec::new(); + let mut public_keys: Vec = self.additional.keys().cloned().collect(); for entry in fs::read_dir(&self.path)? { let entry = entry?; let path = entry.path(); diff --git a/polkadot/service/src/config.rs b/polkadot/service/src/config.rs index 6b87d0e49bacf..bea6b5545703c 100644 --- a/polkadot/service/src/config.rs +++ b/polkadot/service/src/config.rs @@ -30,7 +30,8 @@ pub struct Configuration { pub network: NetworkConfiguration, /// Path to key files. pub keystore_path: String, - // TODO: add more network, client, tx pool configuration options + /// Additional key seeds. + pub keys: Vec, } impl Default for Configuration { @@ -40,6 +41,7 @@ impl Default for Configuration { transaction_pool: Default::default(), network: Default::default(), keystore_path: Default::default(), + keys: Default::default(), } } } diff --git a/polkadot/service/src/error.rs b/polkadot/service/src/error.rs index 58bd8d633bcf0..fbb6981407df8 100644 --- a/polkadot/service/src/error.rs +++ b/polkadot/service/src/error.rs @@ -18,18 +18,15 @@ use client; use network; +use keystore; error_chain! { links { Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"]; + Keystore(keystore::Error, keystore::ErrorKind) #[doc="Keystore error"]; } errors { - /// Key store errors - Keystore(e: ::keystore::Error) { - description("Keystore error"), - display("Keystore error: {:?}", e), - } } } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 6bf972878d208..d3daac84ca849 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -56,7 +56,6 @@ use transaction_pool::TransactionPool; use substrate_keyring::Keyring; use substrate_executor::NativeExecutor; use polkadot_executor::Executor as LocalDispatch; -use polkadot_primitives::AccountId; use keystore::Store as Keystore; use polkadot_api::PolkadotApi; use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig}; @@ -126,12 +125,27 @@ impl Service { // Create client let executor = polkadot_executor::Executor::new(); let mut storage = Default::default(); - let key: AccountId = Keyring::One.into(); + + let mut keystore = Keystore::open(config.keystore_path.into())?; + for seed in &config.keys { + keystore.generate_from_seed(seed)?; + } + + if keystore.contents()?.is_empty() { + let key = keystore.generate("")?; + info!("Generated a new keypair: {:?}", key.public()); + } let genesis_config = GenesisConfig { - validators: vec![key.clone()], - authorities: vec![key.clone()], - balances: vec![(Keyring::One.into(), 1u64 << 63), (Keyring::Two.into(), 1u64 << 63)].into_iter().collect(), + validators: vec![Keyring::Alice.into(), Keyring::Bob.into(), Keyring::Charlie.into()], + authorities: vec![Keyring::Alice.into(), Keyring::Bob.into(), Keyring::Charlie.into()], + balances: vec![ + (Keyring::One.into(), 1u64 << 63), + (Keyring::Two.into(), 1u64 << 63), + (Keyring::Alice.into(), 1u64 << 63), + (Keyring::Bob.into(), 1u64 << 63), + (Keyring::Charlie.into(), 1u64 << 63), + ].into_iter().collect(), block_time: 5, // 5 second block time. session_length: 720, // that's 1 hour per session. sessions_per_era: 24, // 24 hours per era. @@ -145,7 +159,6 @@ impl Service { (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) }; - let _keystore = Keystore::open(config.keystore_path.into()).map_err(::error::ErrorKind::Keystore)?; let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed"); info!("Starting Polkadot. Best block is #{}", best_header.number); @@ -166,7 +179,10 @@ impl Service { // Spin consensus service if configured let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { - Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool, &best_header)) + // Load the first available key. Code above makes sure it exisis. + let key = keystore.load(&keystore.contents()?[0], "")?; + info!("Using authority key {:?}", key.public()); + Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool, key, &best_header)) } else { None }; diff --git a/substrate/ed25519/src/lib.rs b/substrate/ed25519/src/lib.rs index 87085eddbfe87..390caa46c46a0 100644 --- a/substrate/ed25519/src/lib.rs +++ b/substrate/ed25519/src/lib.rs @@ -55,12 +55,18 @@ pub fn verify>(sig: &[u8], message: &[u8], public: P) -> bool { } /// A public key. -#[derive(PartialEq, Eq, Clone, Debug)] +#[derive(PartialEq, Eq, Clone)] pub struct Public(pub [u8; 32]); /// A key pair. pub struct Pair(signature::Ed25519KeyPair); +impl ::std::hash::Hash for Public { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + impl Public { /// A new instance from the given 32-byte `data`. pub fn from_raw(data: [u8; 32]) -> Self { @@ -122,6 +128,18 @@ impl AsRef for Pair { } } +impl ::std::fmt::Display for Public { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", ::primitives::hexdisplay::HexDisplay::from(&self.0)) + } +} + +impl ::std::fmt::Debug for Public { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", ::primitives::hexdisplay::HexDisplay::from(&self.0)) + } +} + impl Pair { /// Generate new secure (random) key pair, yielding it and the corresponding pkcs#8 bytes. pub fn generate_with_pkcs8() -> (Self, [u8; PKCS_LEN]) { From ab02d2cbe10fc880110128a0502216cffe35bf04 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 2 Apr 2018 15:05:30 +0200 Subject: [PATCH 2/5] Replace multiqueue with future::mpsc --- Cargo.lock | 64 +++--------------------------- polkadot/cli/Cargo.toml | 2 +- polkadot/cli/src/lib.rs | 2 +- substrate/client/Cargo.toml | 2 +- substrate/client/src/client.rs | 26 +++++------- substrate/client/src/lib.rs | 2 +- substrate/network/Cargo.toml | 1 - substrate/network/src/consensus.rs | 56 +++++++++++++------------- substrate/network/src/lib.rs | 1 - substrate/network/src/protocol.rs | 7 ++-- substrate/network/src/service.rs | 13 +++--- 11 files changed, 54 insertions(+), 122 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0280232818aa1..ba3b922964920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,8 +17,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "app_dirs" -version = "1.1.1" +name = "app_dirs2" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -182,11 +182,6 @@ name = "constant_time_eq" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "crossbeam" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "crossbeam" version = "0.3.2" @@ -954,18 +949,6 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "multiqueue" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "net2" version = "0.2.31" @@ -1010,11 +993,6 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "owning_ref" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "owning_ref" version = "0.3.3" @@ -1043,16 +1021,6 @@ dependencies = [ "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "parking_lot" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", - "thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "parking_lot" version = "0.4.8" @@ -1144,7 +1112,7 @@ dependencies = [ name = "polkadot-cli" version = "0.1.0" dependencies = [ - "app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "app_dirs2 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1615,11 +1583,6 @@ name = "smallvec" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "smallvec" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "smallvec" version = "0.4.4" @@ -1680,9 +1643,9 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-codec 0.1.0", @@ -1757,7 +1720,6 @@ dependencies = [ "ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1953,16 +1915,6 @@ dependencies = [ "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "thread-id" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "thread_local" version = "0.3.5" @@ -2256,7 +2208,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" -"checksum app_dirs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b7d1c0d48a81bbb13043847f957971f4d87c81542d80ece5e84ba3cba4058fd4" +"checksum app_dirs2 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0aa02944d8a100b79057d1619032b1ad39de5eed6567cdeccbd53908b326e082" "checksum arrayvec 0.3.25 (registry+https://github.com/rust-lang/crates.io-index)" = "06f59fe10306bb78facd90d28c2038ad23ffaaefa85bac43c8a434cde383334f" "checksum arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a1e964f9e24d588183fcb43503abda40d288c8657dfc27311516ce2f05675aef" "checksum assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e772942dccdf11b368c31e044e4fca9189f80a773d2f0808379de65894cbf57" @@ -2276,7 +2228,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b8f59bcebcfe4269b09f71dab0da15b355c75916a8f975d3876ce81561893ee" "checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" -"checksum crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" "checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" @@ -2348,18 +2299,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" "checksum mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "7da01a5e23070d92d99b1ecd1cd0af36447c6fd44b0fe283c2db199fa136724f" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" -"checksum multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4059673f3516669cbf7ebb448cb37171559ed22e6d8bc79cf0cf9394cf9e73fd" "checksum net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09" "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" "checksum num-traits 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "9936036cc70fe4a8b2d338ab665900323290efb03983c86cbe235ae800ad8017" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum odds 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22" "checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c" -"checksum owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc" "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum parity-wasm 0.15.4 (registry+https://github.com/rust-lang/crates.io-index)" = "235801e9531998c4bb307f4ea6833c9f40a4cf132895219ac8c2cd25a9b310f7" "checksum parity-wordlist 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d0dec124478845b142f68b446cbee953d14d4b41f1bc0425024417720dce693" -"checksum parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fa12d706797d42551663426a45e2db2e0364bd1dbf6aeada87e89c5f981f43e9" "checksum parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" "checksum parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3e7f7c9857874e54afeb950eebeae662b1e51a2493666d2ea4c0a5d91dcf0412" "checksum parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "9f35048d735bb93dd115a0030498785971aab3234d311fbe273d020084d26bd8" @@ -2404,7 +2352,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" -"checksum smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8266519bc1d17d0b5b16f6c21295625d562841c708f6376f49028a43e9c11e" "checksum smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee4f357e8cd37bf8822e1b964e96fd39e2cb5a0424f8aaa284ccaccc2162411c" "checksum smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44db0ecb22921ef790d17ae13a3f6d15784183ff5f2a01aa32098c7498d2b4b9" "checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" @@ -2419,7 +2366,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f73eebdb68c14bcb24aef74ea96079830e7fa7b31a6106e42ea7ee887c1e134e" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693" -"checksum thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2af4d6289a69a35c4d3aea737add39685f2784122c28119a7713165a63d68c9d" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098" "checksum tiny-keccak 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e9241752647ca572f12c9b520a5d360d9099360c527770647e694001646a1d0" diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 232c105f6f7bd..a1ae67c3b62d5 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -12,7 +12,7 @@ log = "0.3" hex-literal = "0.1" triehash = "0.1" ed25519 = { path = "../../substrate/ed25519" } -app_dirs = "1.1" +app_dirs2 = "2.0" substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec" } substrate-runtime-io = { path = "../../substrate/runtime-io" } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 72a66b8ef558a..5b7a49baaebd0 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -18,7 +18,7 @@ #![warn(missing_docs)] -extern crate app_dirs; +extern crate app_dirs2 as app_dirs; extern crate env_logger; extern crate ed25519; extern crate triehash; diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index e398af6743ab1..5d0f1d45752a3 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -9,7 +9,7 @@ log = "0.3" parking_lot = "0.4" triehash = "0.1" hex-literal = "0.1" -multiqueue = "0.3" +futures = "0.1.17" ed25519 = { path = "../ed25519" } substrate-bft = { path = "../bft" } substrate-codec = { path = "../codec" } diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index ecdb3690621c1..4b803486c006d 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -16,7 +16,7 @@ //! Substrate Client -use multiqueue; +use futures::sync::mpsc; use parking_lot::Mutex; use primitives::{self, block, AuthorityId}; use primitives::block::Id as BlockId; @@ -30,17 +30,13 @@ use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; use {error, in_mem, block_builder, runtime_io, bft}; /// Type that implements `futures::Stream` of block import events. -pub type BlockchainEventStream = multiqueue::BroadcastFutReceiver; - -//TODO: The queue is preallocated in multiqueue. Make it unbounded -const NOTIFICATION_QUEUE_SIZE: u64 = 1 << 16; +pub type BlockchainEventStream = mpsc::UnboundedReceiver; /// Polkadot Client pub struct Client where B: backend::Backend { backend: B, executor: E, - import_notification_sink: Mutex>, - import_notification_stream: Mutex>, + import_notification_sinks: Mutex>>, } /// A source of blockchain evenets. @@ -165,7 +161,6 @@ impl Client where where F: FnOnce() -> (block::Header, Vec<(Vec, Vec)>) { - let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE); if backend.blockchain().header(BlockId::Number(0))?.is_none() { trace!("Empty database, writing genesis block"); let (genesis_header, genesis_store) = build_genesis(); @@ -177,8 +172,7 @@ impl Client where Ok(Client { backend, executor, - import_notification_sink: Mutex::new(sink), - import_notification_stream: Mutex::new(stream), + import_notification_sinks: Mutex::new(Vec::new()), }) } @@ -212,9 +206,7 @@ impl Client where /// Close notification streams. pub fn stop_notifications(&self) { - let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE); - *self.import_notification_sink.lock() = sink; - *self.import_notification_stream.lock() = stream; + self.import_notification_sinks.lock().clear(); } /// Get the current set of authorities from storage. @@ -325,9 +317,7 @@ impl Client where header: header, is_new_best: is_new_best, }; - if let Err(e) = self.import_notification_sink.lock().try_send(notification) { - warn!("Error queueing block import notification: {:?}", e); - } + self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err()); } Ok(ImportResult::Queued) @@ -424,7 +414,9 @@ impl BlockchainEvents for Client { /// Get block import event stream. fn import_notification_stream(&self) -> BlockchainEventStream { - self.import_notification_stream.lock().add_stream() + let (sink, stream) = mpsc::unbounded(); + self.import_notification_sinks.lock().push(sink); + stream } } diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index a5483b8c1398d..7feede0491b0e 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -31,7 +31,7 @@ extern crate ed25519; extern crate triehash; extern crate parking_lot; -extern crate multiqueue; +extern crate futures; #[cfg(test)] #[macro_use] extern crate hex_literal; #[macro_use] extern crate error_chain; #[macro_use] extern crate log; diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index fec62305871ec..162e2d9b8443d 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -17,7 +17,6 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" futures = "0.1.17" -multiqueue = "0.3" ethcore-network = { git = "https://github.com/paritytech/parity.git" } ethcore-io = { git = "https://github.com/paritytech/parity.git" } ed25519 = { path = "../../substrate/ed25519" } diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index eb2a1e51a40e1..efff9dbf42cab 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -17,8 +17,7 @@ //! Consensus related bits of the network service. use std::collections::HashMap; -use multiqueue; -use futures::sync::oneshot; +use futures::sync::{oneshot, mpsc}; use io::SyncIo; use protocol::Protocol; use network::PeerId; @@ -26,9 +25,6 @@ use primitives::Hash; use message::{self, Message}; use runtime_support::Hashable; -//TODO: The queue is preallocated in multiqueue. Make it unbounded -const QUEUE_SIZE: u64 = 1 << 16; - struct CandidateRequest { id: message::RequestId, completion: oneshot::Sender>, @@ -43,35 +39,25 @@ struct PeerConsensus { pub struct Consensus { peers: HashMap, our_candidate: Option<(Hash, Vec)>, - statement_sink: multiqueue::BroadcastFutSender, - statement_stream: multiqueue::BroadcastFutReceiver, - bft_message_sink: multiqueue::BroadcastFutSender, - bft_message_stream: multiqueue::BroadcastFutReceiver, + statement_sink: Option>, + bft_message_sink: Option>, } impl Consensus { /// Create a new instance. pub fn new() -> Consensus { - let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); - let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); Consensus { peers: HashMap::new(), our_candidate: None, - statement_sink: statement_sink, - statement_stream: statement_stream, - bft_message_sink: bft_sink, - bft_message_stream: bft_stream, + statement_sink: None, + bft_message_sink: None, } } /// Closes all notification streams. pub fn restart(&mut self) { - let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); - let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); - self.statement_sink = statement_sink; - self.statement_stream = statement_stream; - self.bft_message_sink = bft_sink; - self.bft_message_stream = bft_stream; + self.statement_sink = None; + self.bft_message_sink = None; } /// Handle new connected peer. @@ -93,31 +79,43 @@ impl Consensus { &message::UnsignedStatement::Available(ref hash) => peer.candidate_available = Some(*hash), &message::UnsignedStatement::Valid(_) | &message::UnsignedStatement::Invalid(_) => (), } - if let Err(e) = self.statement_sink.try_send(statement) { - trace!(target:"sync", "Error broadcasting statement notification: {:?}", e); + if let Some(sink) = self.statement_sink.take() { + if let Err(e) = sink.unbounded_send(statement) { + trace!(target:"sync", "Error broadcasting statement notification: {:?}", e); + } else { + self.statement_sink = Some(sink); + } } } else { trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id); } } - pub fn statements(&self) -> multiqueue::BroadcastFutReceiver{ - self.statement_stream.add_stream() + pub fn statements(&mut self) -> mpsc::UnboundedReceiver{ + let (sink, stream) = mpsc::unbounded(); + self.statement_sink = Some(sink); + stream } pub fn on_bft_message(&mut self, peer_id: PeerId, message: message::BftMessage) { if self.peers.contains_key(&peer_id) { // TODO: validate signature? - if let Err(e) = self.bft_message_sink.try_send(message) { - trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); + if let Some(sink) = self.bft_message_sink.take() { + if let Err(e) = sink.unbounded_send(message) { + trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); + } else { + self.bft_message_sink = Some(sink); + } } } else { trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id); } } - pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver{ - self.bft_message_stream.add_stream() + pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver{ + let (sink, stream) = mpsc::unbounded(); + self.bft_message_sink = Some(sink); + stream } pub fn fetch_candidate(&mut self, io: &mut SyncIo, protocol: &Protocol, hash: &Hash) -> oneshot::Receiver> { diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 1c7fe9fa0e4c4..5a2727938af52 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -32,7 +32,6 @@ extern crate substrate_bft; extern crate serde; extern crate serde_json; extern crate futures; -extern crate multiqueue; extern crate ed25519; #[macro_use] extern crate serde_derive; #[macro_use] extern crate log; diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 35f2ca0549788..7921cc0c5f320 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -19,7 +19,6 @@ use std::{mem, cmp}; use std::sync::Arc; use std::time; use parking_lot::{RwLock, Mutex}; -use multiqueue; use futures::sync::oneshot; use serde_json; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; @@ -29,7 +28,7 @@ use network::{PeerId, NodeId}; use message::{self, Message}; use sync::{ChainSync, Status as SyncStatus, SyncState}; use consensus::Consensus; -use service::{Role, TransactionPool}; +use service::{Role, TransactionPool, StatementStream, BftMessageStream}; use config::ProtocolConfig; use chain::Client; use io::SyncIo; @@ -310,12 +309,12 @@ impl Protocol { } /// See `ConsensusService` trait. - pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver { + pub fn bft_messages(&self) -> BftMessageStream { self.consensus.lock().bft_messages() } /// See `ConsensusService` trait. - pub fn statements(&self) -> multiqueue::BroadcastFutReceiver { + pub fn statements(&self) -> StatementStream { self.consensus.lock().statements() } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 1688d1e717057..ce5ff0059cb31 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -17,8 +17,7 @@ use std::sync::Arc; use std::collections::{BTreeMap}; use std::io; -use multiqueue; -use futures::sync::oneshot; +use futures::sync::{oneshot, mpsc}; use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, NetworkConfiguration , NonReservedPeerMode, ErrorKind}; use primitives::block::{TransactionHash, Header}; @@ -37,9 +36,9 @@ pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; /// Type that represents fetch completion future. pub type FetchFuture = oneshot::Receiver>; /// Type that represents statement stream. -pub type StatementStream = multiqueue::BroadcastFutReceiver; +pub type StatementStream = mpsc::UnboundedReceiver; /// Type that represents bft messages stream. -pub type BftMessageStream = multiqueue::BroadcastFutReceiver; +pub type BftMessageStream = mpsc::UnboundedReceiver; bitflags! { /// Node roles bitmask. @@ -80,7 +79,7 @@ pub trait TransactionPool: Send + Sync { /// ConsensusService pub trait ConsensusService: Send + Sync { /// Get statement stream. - fn statements(&self) -> multiqueue::BroadcastFutReceiver; + fn statements(&self) -> StatementStream; /// Send out a statement. fn send_statement(&self, statement: Statement); /// Maintain connectivity to given addresses. @@ -92,7 +91,7 @@ pub trait ConsensusService: Send + Sync { fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>); /// Get BFT message stream. - fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver; + fn bft_messages(&self) -> BftMessageStream; /// Send out a BFT message. fn send_bft_message(&self, message: BftMessage); } @@ -228,7 +227,7 @@ impl SyncProvider for Service { /// ConsensusService impl ConsensusService for Service { - fn statements(&self) -> multiqueue::BroadcastFutReceiver { + fn statements(&self) -> StatementStream { self.handler.protocol.statements() } From ae96357d4a3b7cb4e16d65fe3206572a6869867c Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 2 Apr 2018 15:44:31 +0200 Subject: [PATCH 3/5] BFT gossip --- polkadot/consensus/src/service.rs | 1 - substrate/network/src/consensus.rs | 69 ++++++++++++++++++++++++------ substrate/network/src/message.rs | 10 ++--- substrate/network/src/protocol.rs | 20 ++++++--- 4 files changed, 75 insertions(+), 25 deletions(-) diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index d481dd7d1b510..6ef1aa8389217 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -143,7 +143,6 @@ impl Service { best_header: &Header) -> Service where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static { - let best_header = best_header.clone(); let thread = thread::spawn(move || { let mut core = reactor::Core::new().expect("tokio::Core could not be created"); diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index efff9dbf42cab..630506739ca67 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -16,8 +16,10 @@ //! Consensus related bits of the network service. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use futures::sync::{oneshot, mpsc}; +use std::time::{Instant, Duration}; +use std::collections::hash_map::Entry; use io::SyncIo; use protocol::Protocol; use network::PeerId; @@ -25,6 +27,9 @@ use primitives::Hash; use message::{self, Message}; use runtime_support::Hashable; +// TODO: Add additional spam/DoS attack protection. +const MESSAGE_LIFETIME_SECONDS: u64 = 600; + struct CandidateRequest { id: message::RequestId, completion: oneshot::Sender>, @@ -33,6 +38,7 @@ struct CandidateRequest { struct PeerConsensus { candidate_fetch: Option, candidate_available: Option, + known_messages: HashSet, } /// Consensus network protocol handler. Manages statements and candidate requests. @@ -41,6 +47,7 @@ pub struct Consensus { our_candidate: Option<(Hash, Vec)>, statement_sink: Option>, bft_message_sink: Option>, + message_timestamps: HashMap, } impl Consensus { @@ -51,6 +58,7 @@ impl Consensus { our_candidate: None, statement_sink: None, bft_message_sink: None, + message_timestamps: Default::default(), } } @@ -67,11 +75,26 @@ impl Consensus { self.peers.insert(peer_id, PeerConsensus { candidate_fetch: None, candidate_available: None, + known_messages: Default::default(), }); } } - pub fn on_statement(&mut self, peer_id: PeerId, statement: message::Statement) { + fn propagate(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::Message, hash: Hash) { + for (id, ref mut peer) in self.peers.iter_mut() { + if peer.known_messages.insert(hash.clone()) { + protocol.send_message(io, *id, message.clone()); + } + } + } + + fn register_message(&mut self, hash: Hash) { + if let Entry::Vacant(entry) = self.message_timestamps.entry(hash) { + entry.insert(Instant::now()); + } + } + + pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { // TODO: validate signature? match &statement.statement { @@ -79,8 +102,9 @@ impl Consensus { &message::UnsignedStatement::Available(ref hash) => peer.candidate_available = Some(*hash), &message::UnsignedStatement::Valid(_) | &message::UnsignedStatement::Invalid(_) => (), } + peer.known_messages.insert(hash); if let Some(sink) = self.statement_sink.take() { - if let Err(e) = sink.unbounded_send(statement) { + if let Err(e) = sink.unbounded_send(statement.clone()) { trace!(target:"sync", "Error broadcasting statement notification: {:?}", e); } else { self.statement_sink = Some(sink); @@ -88,7 +112,11 @@ impl Consensus { } } else { trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id); + return; } + self.register_message(hash.clone()); + // Propagate to other peers. + self.propagate(io, protocol, Message::Statement(statement), hash); } pub fn statements(&mut self) -> mpsc::UnboundedReceiver{ @@ -97,11 +125,12 @@ impl Consensus { stream } - pub fn on_bft_message(&mut self, peer_id: PeerId, message: message::BftMessage) { - if self.peers.contains_key(&peer_id) { + pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::BftMessage, hash: Hash) { + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + peer.known_messages.insert(hash); // TODO: validate signature? if let Some(sink) = self.bft_message_sink.take() { - if let Err(e) = sink.unbounded_send(message) { + if let Err(e) = sink.unbounded_send(message.clone()) { trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); } else { self.bft_message_sink = Some(sink); @@ -109,7 +138,11 @@ impl Consensus { } } else { trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id); + return; } + self.register_message(hash.clone()); + // Propagate to other peers. + self.propagate(io, protocol, Message::BftMessage(message), hash); } pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver{ @@ -145,17 +178,19 @@ impl Consensus { pub fn send_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, statement: message::Statement) { // Broadcast statement to all validators. trace!(target:"sync", "Broadcasting statement {:?}", statement); - for peer in self.peers.keys() { - protocol.send_message(io, *peer, Message::Statement(statement.clone())); - } + let message = Message::Statement(statement); + let hash = Protocol::hash_message(&message); + self.register_message(hash.clone()); + self.propagate(io, protocol, message, hash); } pub fn send_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::BftMessage) { // Broadcast message to all validators. trace!(target:"sync", "Broadcasting BFT message {:?}", message); - for peer in self.peers.keys() { - protocol.send_message(io, *peer, Message::BftMessage(message.clone())); - } + let message = Message::BftMessage(message); + let hash = Protocol::hash_message(&message); + self.register_message(hash.clone()); + self.propagate(io, protocol, message, hash); } pub fn set_local_candidate(&mut self, candidate: Option<(Hash, Vec)>) { @@ -198,4 +233,14 @@ impl Consensus { pub fn peer_disconnected(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId) { self.peers.remove(&peer_id); } + + pub fn collect_garbage(&mut self) { + let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS); + let now = Instant::now(); + self.message_timestamps.retain(|_, timestamp| *timestamp + expiration < now); + let timestamps = &self.message_timestamps; + for (_, ref mut peer) in self.peers.iter_mut() { + peer.known_messages.retain(|h| timestamps.contains_key(h)); + } + } } diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index 52c78d95febb1..81d73ac1cb07e 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -26,7 +26,7 @@ pub type RequestId = u64; type Bytes = Vec; /// Configured node role. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub enum Role { /// Full relay chain client with no additional responsibilities. Full, @@ -213,7 +213,7 @@ pub enum SignedConsensusMessage { /// A vote. Vote(SignedConsensusVote), } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] /// A network message. pub enum Message { /// Status packet. @@ -236,7 +236,7 @@ pub enum Message { BftMessage(BftMessage), } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] pub struct Status { /// Protocol version. pub version: u32, @@ -291,7 +291,7 @@ pub struct CandidateResponse { pub data: Option>, } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] /// Response to `BlockRequest` pub struct BlockResponse { /// Id of a request this response was made for. @@ -300,7 +300,7 @@ pub struct BlockResponse { pub blocks: Vec, } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] /// Announce a new complete relay chain block on the network. pub struct BlockAnnounce { /// New block header. diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 7921cc0c5f320..229d2a2d244be 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -22,7 +22,7 @@ use parking_lot::{RwLock, Mutex}; use futures::sync::oneshot; use serde_json; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; -use primitives::Hash; +use primitives::{Hash, blake2_256}; use network::{PeerId, NodeId}; use message::{self, Message}; @@ -176,10 +176,10 @@ impl Protocol { Message::BlockAnnounce(announce) => { self.on_block_announce(io, peer_id, announce); }, - Message::Statement(s) => self.on_statement(io, peer_id, s), + Message::Statement(s) => self.on_statement(io, peer_id, s, blake2_256(data).into()), Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r), Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), - Message::BftMessage(m) => self.on_bft_message(io, peer_id, m), + Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()), Message::Transactions(m) => self.on_transactions(io, peer_id, m), } } @@ -204,6 +204,11 @@ impl Protocol { } } + pub fn hash_message(message: &Message) -> Hash { + let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); + blake2_256(&data).into() + } + /// Called when a new peer is connected pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) { trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id)); @@ -293,14 +298,14 @@ impl Protocol { self.consensus.lock().on_candidate_response(io, self, peer, response); } - fn on_statement(&self, _io: &mut SyncIo, peer: PeerId, statement: message::Statement) { + fn on_statement(&self, io: &mut SyncIo, peer: PeerId, statement: message::Statement, hash: Hash) { trace!(target: "sync", "Statement from {}: {:?}", peer, statement); - self.consensus.lock().on_statement(peer, statement); + self.consensus.lock().on_statement(io, self, peer, statement, hash); } - fn on_bft_message(&self, _io: &mut SyncIo, peer: PeerId, message: message::BftMessage) { + fn on_bft_message(&self, io: &mut SyncIo, peer: PeerId, message: message::BftMessage, hash: Hash) { trace!(target: "sync", "BFT message from {}: {:?}", peer, message); - self.consensus.lock().on_bft_message(peer, message); + self.consensus.lock().on_bft_message(io, self, peer, message, hash); } /// See `ConsensusService` trait. @@ -336,6 +341,7 @@ impl Protocol { /// Perform time based maintenance. pub fn tick(&self, io: &mut SyncIo) { self.maintain_peers(io); + self.consensus.lock().collect_garbage(); } fn maintain_peers(&self, io: &mut SyncIo) { From ae0d1911ae5233e459ce561965fe6c78a582409c Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 3 Apr 2018 13:16:43 +0200 Subject: [PATCH 4/5] Revert to app_dirs --- Cargo.lock | 8 ++++---- polkadot/cli/Cargo.toml | 2 +- polkadot/cli/src/lib.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba3b922964920..995c0525af84c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,8 +17,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "app_dirs2" -version = "2.0.3" +name = "app_dirs" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1112,7 +1112,7 @@ dependencies = [ name = "polkadot-cli" version = "0.1.0" dependencies = [ - "app_dirs2 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)", "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2208,7 +2208,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" -"checksum app_dirs2 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0aa02944d8a100b79057d1619032b1ad39de5eed6567cdeccbd53908b326e082" +"checksum app_dirs 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e73a24bad9bd6a94d6395382a6c69fe071708ae4409f763c5475e14ee896313d" "checksum arrayvec 0.3.25 (registry+https://github.com/rust-lang/crates.io-index)" = "06f59fe10306bb78facd90d28c2038ad23ffaaefa85bac43c8a434cde383334f" "checksum arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a1e964f9e24d588183fcb43503abda40d288c8657dfc27311516ce2f05675aef" "checksum assert_matches 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9e772942dccdf11b368c31e044e4fca9189f80a773d2f0808379de65894cbf57" diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index a1ae67c3b62d5..8fa34c3a02eea 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -12,7 +12,7 @@ log = "0.3" hex-literal = "0.1" triehash = "0.1" ed25519 = { path = "../../substrate/ed25519" } -app_dirs2 = "2.0" +app_dirs = "1.2" substrate-client = { path = "../../substrate/client" } substrate-codec = { path = "../../substrate/codec" } substrate-runtime-io = { path = "../../substrate/runtime-io" } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 5b7a49baaebd0..72a66b8ef558a 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -18,7 +18,7 @@ #![warn(missing_docs)] -extern crate app_dirs2 as app_dirs; +extern crate app_dirs; extern crate env_logger; extern crate ed25519; extern crate triehash; From 48754017aa88a7b476c1806eccef466fc49fb7d9 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 3 Apr 2018 13:18:58 +0200 Subject: [PATCH 5/5] generate_from_seed commented --- polkadot/keystore/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/polkadot/keystore/src/lib.rs b/polkadot/keystore/src/lib.rs index 8bfa703988891..d91d986e5650a 100644 --- a/polkadot/keystore/src/lib.rs +++ b/polkadot/keystore/src/lib.rs @@ -150,6 +150,8 @@ impl Store { } /// Create a new key from seed. Do not place it into the store. + /// Only the first 32 bytes of the sead are used. This is meant to be used for testing only. + // TODO: Remove this pub fn generate_from_seed(&mut self, seed: &str) -> Result { let mut s: [u8; 32] = [' ' as u8; 32]; let len = ::std::cmp::min(32, seed.len());