Skip to content

Commit

Permalink
Support instant seal (paritytech#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshOrndorff committed Feb 3, 2021
1 parent 6c52dd4 commit e018d42
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ name = 'moonbase-alphanet'
path = 'src/main.rs'

[dependencies]
async-io = "1.3"
derive_more = '0.15.0'
exit-future = '0.1.4'
futures = { version = "0.3.1", features = ["compat"] }
Expand Down
34 changes: 34 additions & 0 deletions node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use sp_core::H160;
use std::path::PathBuf;
use std::str::FromStr;
use structopt::StructOpt;

/// Sub-commands supported by the collator.
Expand Down Expand Up @@ -96,6 +97,12 @@ pub struct RunCmd {
#[structopt(long)]
pub parachain_id: Option<u32>,

/// When blocks should be sealed in the dev service.
///
/// Options are "instant", "manual", or timer interval in milliseconds
#[structopt(long, default_value = "instant")]
pub sealing: Sealing,

/// Public identity for participating in staking and receiving rewards
#[structopt(long, parse(try_from_str = parse_h160))]
pub author_id: Option<H160>,
Expand Down Expand Up @@ -165,3 +172,30 @@ impl RelayChainCli {
}
}
}

/// Block authoring scheme to be used by the dev service.
#[derive(Debug)]
pub enum Sealing {
/// Author a block immediately upon receiving a transaction into the transaction pool
Instant,
/// Author a block upon receiving an RPC command
Manual,
/// Author blocks at a regular interval specified in milliseconds
Interval(u64),
}

impl FromStr for Sealing {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"instant" => Self::Instant,
"manual" => Self::Manual,
s => {
let millis =
u64::from_str_radix(s, 10).map_err(|_| "couldn't decode sealing param")?;
Self::Interval(millis)
}
})
}
}
2 changes: 1 addition & 1 deletion node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ pub fn run() -> Result<()> {
.expect("Gerald is a valid account"),
));

return crate::dev_service::new_full(config, author_id);
return crate::dev_service::new_full(config, cli.run.sealing, author_id);
}

let key = sp_core::Pair::generate().0;
Expand Down
101 changes: 67 additions & 34 deletions node/src/dev_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@
//! Service and ServiceFactory implementation. Specialized wrapper over Substrate service.
//! This one is used specifically for the --dev service.

use crate::cli::Sealing;
use async_io::Timer;
use fc_consensus::FrontierBlockImport;
use fc_rpc_core::types::PendingTransactions;
use futures::Stream;
use futures::StreamExt;
use moonbeam_runtime::{self, opaque::Block, RuntimeApi};
use sc_client_api::BlockchainEvents;
use sc_consensus_manual_seal::{self as manual_seal};
use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};
use sc_executor::native_executor_instance;
pub use sc_executor::NativeExecutor;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sp_core::H160;
use sp_core::H256;
use std::time::Duration;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
Expand All @@ -44,7 +50,6 @@ type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

pub fn new_partial(
config: &Configuration,
// manual_seal: bool, // For now only manual seal. Maybe bring this back to support instant later.
author: Option<H160>,
) -> Result<
sc_service::PartialComponents<
Expand Down Expand Up @@ -101,7 +106,7 @@ pub fn new_partial(
/// Builds a new service for a full client.
pub fn new_full(
config: Configuration,
// manual_seal: bool,
sealing: Sealing,
author_id: Option<H160>,
) -> Result<TaskManager, ServiceError> {
let sc_service::PartialComponents {
Expand All @@ -127,9 +132,6 @@ pub fn new_full(
block_announce_validator_builder: None,
})?;

// Channel for the rpc handler to communicate with the authorship task.
let (command_sink, commands_stream) = futures::channel::mpsc::channel(1000);

if config.offchain_worker.enabled {
sc_service::build_offchain_workers(
&config,
Expand All @@ -146,6 +148,64 @@ pub fn new_full(
let is_authority = role.is_authority();
let subscription_task_executor =
sc_rpc::SubscriptionTaskExecutor::new(task_manager.spawn_handle());
let mut command_sink = None;

if role.is_authority() {
let env = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
prometheus_registry.as_ref(),
);

let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
match sealing {
Sealing::Instant => {
Box::new(
// This bit cribbed from the implementation of instant seal.
transaction_pool
.pool()
.validated_pool()
.import_notification_stream()
.map(|_| EngineCommand::SealNewBlock {
create_empty: false,
finalize: false,
parent_hash: None,
sender: None,
}),
)
}
Sealing::Manual => {
let (sink, stream) = futures::channel::mpsc::channel(1000);
// Keep a reference to the other end of the channel. It goes to the RPC.
command_sink = Some(sink);
Box::new(stream)
}
Sealing::Interval(millis) => Box::new(StreamExt::map(
Timer::interval(Duration::from_millis(millis)),
|_| EngineCommand::SealNewBlock {
create_empty: true,
finalize: false,
parent_hash: None,
sender: None,
},
)),
};

task_manager.spawn_essential_handle().spawn_blocking(
"authorship_task",
run_manual_seal(ManualSealParams {
block_import,
env,
client: client.clone(),
pool: transaction_pool.pool().clone(),
commands_stream,
select_chain,
consensus_data_provider: None,
inherent_data_providers,
}),
);
}

let rpc_extensions_builder = {
let client = client.clone();
Expand All @@ -161,7 +221,7 @@ pub fn new_full(
is_authority,
network: network.clone(),
pending_transactions: pending.clone(),
command_sink: Some(command_sink.clone()),
command_sink: command_sink.clone(),
};
crate::rpc::create_full(deps, subscription_task_executor.clone())
})
Expand All @@ -186,7 +246,6 @@ pub fn new_full(
// Spawn Frontier pending transactions maintenance task (as essential, otherwise we leak).
if pending_transactions.is_some() {
use fp_consensus::{ConsensusLog, FRONTIER_ENGINE_ID};
use futures::StreamExt;
use sp_runtime::generic::OpaqueDigestItemId;

const TRANSACTION_RETAIN_THRESHOLD: u64 = 5;
Expand Down Expand Up @@ -230,32 +289,6 @@ pub fn new_full(
);
}

if role.is_authority() {
let env = sc_basic_authorship::ProposerFactory::new(
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
prometheus_registry.as_ref(),
);

// Background authorship future
let authorship_future = manual_seal::run_manual_seal(manual_seal::ManualSealParams {
block_import,
env,
client,
pool: transaction_pool.pool().clone(),
commands_stream,
select_chain,
consensus_data_provider: None,
inherent_data_providers,
});

// we spawn the future on a background thread managed by service.
task_manager
.spawn_essential_handle()
.spawn_blocking("manual-seal", authorship_future);
}

log::info!("Development Service Ready");

network_starter.start_network();
Expand Down
1 change: 1 addition & 0 deletions tests/tests/util/testWithMoonbeam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export async function startMoonbeamNode(
`--no-telemetry`,
`--no-prometheus`,
`--dev`,
`--sealing=manual`,
`-l${MOONBEAM_LOG}`,
`--port=${PORT}`,
`--rpc-port=${RPC_PORT}`,
Expand Down

0 comments on commit e018d42

Please sign in to comment.