Skip to content

Commit

Permalink
Companion PR for Substrate#4394 (paritytech#723)
Browse files Browse the repository at this point in the history
* service/src/lib.rs: Register network event stream for authority disc

Previously one would create a sender and receiver channel pair, pass the
sender to the build_network_future through the service builder and
funnel network events returned from polling the network service into the
sender to be consumed by the authority discovery module owning the
receiver.

With recent changes it is now possible to register an event_stream
with the network service directly, thus one does not need to make the
detour through the build_network_future.

This commit is an adjusted clone of one targeting the Substrate
repository.

* service/src/lib.rs: Fix futures::stream imports

* [TMP] *: Replace polkadot-upstream with feature branch

* Switch branch

* Small change

* Companion PR to substrate#4542

* Revert "Merge remote-tracking branch 'tomaka/companion-4542' into ashley-browser-utils"

This reverts commit 17f00afe483ee65cb3cf4a0faca27034e6d6523a, reversing
changes made to 928cbb9c55542baff56b53accd9a5a45f12f01f1.

* ashley-browser-utils -> ashley-browser-utils-polkadot

* Switch branches back

Co-authored-by: Max Inden <[email protected]>
Co-authored-by: Pierre Krieger <[email protected]>
  • Loading branch information
3 people authored and gavofyork committed Jan 9, 2020
1 parent 7207d97 commit 6c8eb8a
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 176 deletions.
34 changes: 10 additions & 24 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,27 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
log = "0.4.8"
tokio = "0.1.22"
futures = { version = "0.3.1", features = ["compat"] }
futures01 = { package = "futures", version = "0.1.29" }
structopt = "=0.3.7"
cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "polkadot-service", path = "../service", default-features = false }

libp2p = { version = "0.13.1", default-features = false, optional = true }
wasm-bindgen = { version = "0.2.55", optional = true }
wasm-bindgen-futures = { version = "0.4.5", optional = true }
console_log = { version = "0.1.2", optional = true }
console_error_panic_hook = { version = "0.1.1", optional = true }
js-sys = { version = "0.3.22", optional = true }
kvdb-web = { version = "0.1.1", optional = true }
substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false }
substrate-network = { package = "sc-network", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true }
tokio = { version = "0.1.22", optional = true }

# Imported just for the `wasm-bindgen` feature
rand = { version = "0.7", features = ["wasm-bindgen"], optional = true }
rand6 = { package = "rand", version = "0.6.5", features = ["wasm-bindgen"], optional = true }
wasm-bindgen = { version = "0.2.57", optional = true }
wasm-bindgen-futures = { version = "0.4.7", optional = true }
browser-utils = { package = "browser-utils", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true }
substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false }

[features]
default = [ "wasmtime", "rocksdb" ]
wasmtime = [ "cli/wasmtime" ]
default = [ "wasmtime", "rocksdb", "cli" ]
wasmtime = [ "sc-cli/wasmtime" ]
rocksdb = [ "service/rocksdb" ]
cli = [ "tokio" ]
browser = [
"libp2p",
"wasm-bindgen",
"console_error_panic_hook",
"wasm-bindgen-futures",
"console_log",
"js-sys",
"kvdb-web",
"browser-utils",
"substrate-service",
"substrate-network",
"rand",
"rand6",
]
145 changes: 12 additions & 133 deletions cli/src/browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,28 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::ChainSpec;
use futures01::{prelude::*, sync::oneshot, sync::mpsc};
use libp2p::wasm_ext;
use log::{debug, info};
use std::sync::Arc;
use service::{AbstractService, Roles as ServiceRoles};
use substrate_service::{RpcSession, Configuration, config::DatabaseConfig};
use log::info;
use substrate_service::Configuration;
use wasm_bindgen::prelude::*;
use futures::{compat::*, TryFutureExt as _, TryStreamExt as _, FutureExt as _};
use service::CustomConfiguration;

/// Starts the client.
///
/// You must pass a libp2p transport that supports .
#[wasm_bindgen]
pub async fn start_client(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, JsValue> {
pub async fn start_client(wasm_ext: browser_utils::Transport) -> Result<browser_utils::Client, JsValue> {
start_inner(wasm_ext)
.await
.map_err(|err| JsValue::from_str(&err.to_string()))
}

async fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<dyn std::error::Error>> {
console_error_panic_hook::set_once();
console_log::init_with_level(log::Level::Info);
async fn start_inner(wasm_ext: browser_utils::Transport) -> Result<browser_utils::Client, Box<dyn std::error::Error>> {
browser_utils::set_console_error_panic_hook();
browser_utils::init_console_log(log::Level::Info)?;

// Build the configuration to pass to the service.
let config = {
let wasm_ext = wasm_ext::ExtTransport::new(wasm_ext);
let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?;
let mut config = Configuration::<service::CustomConfiguration, _, _>::default_with_spec_and_base_path(chain_spec, None);
config.network.transport = substrate_network::config::TransportConfig::Normal {
wasm_external_transport: Some(wasm_ext.clone()),
allow_private_ipv4: true,
enable_mdns: false,
};
config.telemetry_external_transport = Some(wasm_ext);
config.roles = ServiceRoles::LIGHT;
config.name = "Browser node".to_string();
config.database = {
let db = kvdb_web::Database::open("polkadot".into(), 10)
.await
.unwrap();
DatabaseConfig::Custom(Arc::new(db))
};
config.keystore_path = Some(std::path::PathBuf::from("/"));
config
};
let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?;
let config: Configuration<CustomConfiguration, _, _> = browser_utils::browser_configuration(wasm_ext, chain_spec)
.await?;

info!("Polkadot browser node");
info!(" version {}", config.full_version());
Expand All @@ -76,105 +53,7 @@ async fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<d
info!("Roles: {:?}", config.roles);

// Create the service. This is the most heavy initialization step.
let mut service = service::new_light(config).map_err(|e| format!("{:?}", e))?;
let service = service::kusama_new_light(config).map_err(|e| format!("{:?}", e))?;

// We now dispatch a background task responsible for processing the service.
//
// The main action performed by the code below consists in polling the service with
// `service.poll()`.
// The rest consists in handling RPC requests.
let (rpc_send_tx, mut rpc_send_rx) = mpsc::unbounded::<RpcMessage>();
wasm_bindgen_futures::spawn_local(futures01::future::poll_fn(move || {
loop {
match rpc_send_rx.poll() {
Ok(Async::Ready(Some(message))) => {
let fut = service.rpc_query(&message.session, &message.rpc_json);
let _ = message.send_back.send(Box::new(fut));
},
Ok(Async::NotReady) => break,
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
}
}

loop {
match service.poll().map_err(|_| ())? {
Async::Ready(()) => return Ok(Async::Ready(())),
Async::NotReady => break
}
}

Ok::<_, ()>(Async::NotReady)
}).compat().map(drop));

Ok(Client {
rpc_send_tx,
})
}

/// A running client.
#[wasm_bindgen]
pub struct Client {
rpc_send_tx: mpsc::UnboundedSender<RpcMessage>,
}

struct RpcMessage {
rpc_json: String,
session: RpcSession,
send_back: oneshot::Sender<Box<dyn Future<Item = Option<String>, Error = ()> + Unpin>>,
}

#[wasm_bindgen]
impl Client {
/// Allows starting an RPC request. Returns a `Promise` containing the result of that request.
#[wasm_bindgen(js_name = "rpcSend")]
pub fn rpc_send(&mut self, rpc: &str) -> js_sys::Promise {
let rpc_session = RpcSession::new(mpsc::channel(1).0);
let (tx, rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session,
send_back: tx,
});
let fut = rx
.compat()
.map_err(|_| ())
.and_then(|fut| fut.compat())
.map_ok(|s| JsValue::from_str(&s.unwrap_or(String::new())))
.map_err(|_| JsValue::NULL);
wasm_bindgen_futures::future_to_promise(fut)
}

/// Subscribes to an RPC pubsub endpoint.
#[wasm_bindgen(js_name = "rpcSubscribe")]
pub fn rpc_subscribe(&mut self, rpc: &str, callback: js_sys::Function) {
let (tx, rx) = mpsc::channel(4);
let rpc_session = RpcSession::new(tx);
let (fut_tx, fut_rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session.clone(),
send_back: fut_tx,
});
let fut_rx = fut_rx
.compat()
.map_err(|_| ())
.and_then(|fut| fut.compat())
.map(drop);
wasm_bindgen_futures::spawn_local(fut_rx);
wasm_bindgen_futures::spawn_local(rx
.compat()
.try_for_each(move |s| {
match callback.call1(&callback, &JsValue::from_str(&s)) {
Ok(_) => futures::future::ready(Ok(())),
Err(_) => futures::future::ready(Err(())),
}
})
.then(move |_| {
// We need to keep `rpc_session` alive.
debug!("RPC subscription has ended");
drop(rpc_session);
futures::future::ready(())
})
);
}
Ok(browser_utils::start_client(service))
}
40 changes: 22 additions & 18 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use chain_spec::ChainSpec;
use futures::{
Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt,
};
#[cfg(feature = "cli")]
use tokio::runtime::Runtime;
use log::info;
use structopt::StructOpt;
Expand All @@ -36,8 +37,8 @@ pub use service::{
WrappedExecutor
};

pub use cli::{VersionInfo, IntoExit, NoCustom, SharedParams};
pub use cli::{display_role, error};
pub use sc_cli::{VersionInfo, IntoExit, NoCustom, SharedParams};
pub use sc_cli::{display_role, error};

/// Load the `ChainSpec` for the given `id`.
pub fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> {
Expand All @@ -53,8 +54,8 @@ enum PolkadotSubCommands {
ValidationWorker(ValidationWorkerCommand),
}

impl cli::GetSharedParams for PolkadotSubCommands {
fn shared_params(&self) -> Option<&cli::SharedParams> { None }
impl sc_cli::GetSharedParams for PolkadotSubCommands {
fn shared_params(&self) -> Option<&sc_cli::SharedParams> { None }
}

#[derive(Debug, StructOpt, Clone)]
Expand All @@ -70,16 +71,17 @@ struct PolkadotSubParams {
}

/// Parses polkadot specific CLI arguments and run the service.
pub fn run<E: IntoExit>(exit: E, version: cli::VersionInfo) -> error::Result<()> {
let cmd = cli::parse_and_prepare::<PolkadotSubCommands, PolkadotSubParams, _>(
#[cfg(feature = "cli")]
pub fn run<E: IntoExit>(exit: E, version: sc_cli::VersionInfo) -> error::Result<()> {
let cmd = sc_cli::parse_and_prepare::<PolkadotSubCommands, PolkadotSubParams, _>(
&version,
"parity-polkadot",
std::env::args(),
);

// Preload spec to select native runtime
let spec = match cmd.shared_params() {
Some(params) => Some(cli::load_spec(params, &load_spec)?),
Some(params) => Some(sc_cli::load_spec(params, &load_spec)?),
None => None,
};
if spec.as_ref().map_or(false, |c| c.is_kusama()) {
Expand All @@ -100,10 +102,11 @@ pub fn run<E: IntoExit>(exit: E, version: cli::VersionInfo) -> error::Result<()>
}

/// Execute the given `cmd` with the given runtime.
#[cfg(feature = "cli")]
fn execute_cmd_with_runtime<R, D, E, X>(
exit: X,
version: &cli::VersionInfo,
cmd: cli::ParseAndPrepare<PolkadotSubCommands, PolkadotSubParams>,
version: &sc_cli::VersionInfo,
cmd: sc_cli::ParseAndPrepare<PolkadotSubCommands, PolkadotSubParams>,
spec: Option<service::ChainSpec>,
) -> error::Result<()>
where
Expand All @@ -120,7 +123,7 @@ where
// Use preloaded spec
let load_spec = |_: &str| Ok(spec);
match cmd {
cli::ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
sc_cli::ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit,
|exit, _cli_args, custom_args, mut config| {
info!("{}", version.name);
info!(" version {}", config.full_version());
Expand Down Expand Up @@ -154,17 +157,17 @@ where
),
}.map_err(|e| format!("{:?}", e))
}),
cli::ParseAndPrepare::BuildSpec(cmd) => cmd.run::<NoCustom, _, _, _>(load_spec),
cli::ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
sc_cli::ParseAndPrepare::BuildSpec(cmd) => cmd.run::<NoCustom, _, _, _>(load_spec),
sc_cli::ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
sc_cli::ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::CheckBlock(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
sc_cli::ParseAndPrepare::CheckBlock(cmd) => cmd.run_with_builder::<_, _, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec, exit),
cli::ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec),
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<_, _, _, _, _, _>(|config|
sc_cli::ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec),
sc_cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<_, _, _, _, _, _>(|config|
Ok(service::new_chain_ops::<R, D, E>(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
sc_cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
if cfg!(feature = "browser") {
Err(error::Error::Input("Cannot run validation worker in browser".into()))
} else {
Expand All @@ -177,6 +180,7 @@ where
}

/// Run the given `service` using the `runtime` until it exits or `e` fires.
#[cfg(feature = "cli")]
pub fn run_until_exit(
mut runtime: Runtime,
service: impl AbstractService,
Expand All @@ -185,7 +189,7 @@ pub fn run_until_exit(
let (exit_send, exit) = oneshot::channel();

let executor = runtime.executor();
let informant = cli::informant::build(&service);
let informant = sc_cli::informant::build(&service);
let future = select(exit, informant)
.map(|_| Ok(()))
.compat();
Expand Down
2 changes: 1 addition & 1 deletion service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ consensus_common = { package = "sp-consensus", git = "https://github.com/parityt
grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa_primitives = { package = "sp-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", default-features = false }
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
Expand Down

0 comments on commit 6c8eb8a

Please sign in to comment.