Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collator overseer builder unification #3335

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ polkadot-node-collation-generation = { path = "../../../polkadot/node/collation-
polkadot-node-core-runtime-api = { path = "../../../polkadot/node/core/runtime-api" }
polkadot-node-core-chain-api = { path = "../../../polkadot/node/core/chain-api" }
polkadot-node-core-prospective-parachains = { path = "../../../polkadot/node/core/prospective-parachains" }
polkadot-service = { path = "../../../polkadot/node/service" }

# substrate deps
sc-authority-discovery = { path = "../../../substrate/client/authority-discovery" }
Expand Down
144 changes: 7 additions & 137 deletions cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,167 +15,37 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::{select, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
use polkadot_network_bridge::{
Metrics as NetworkBridgeMetrics, NetworkBridgeRx as NetworkBridgeRxSubsystem,
NetworkBridgeTx as NetworkBridgeTxSubsystem,
};
use polkadot_node_collation_generation::CollationGenerationSubsystem;
use polkadot_node_core_chain_api::ChainApiSubsystem;
use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1::{self, AvailableDataFetchingRequest},
v2, IncomingRequestReceiver, ReqProtocolNames,
},
};
use polkadot_node_subsystem_util::metrics::{prometheus::Registry, Metrics};
use polkadot_overseer::{
BlockInfo, DummySubsystem, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
UnpinHandle,
BlockInfo, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, UnpinHandle,
};
use polkadot_primitives::CollatorPair;
use polkadot_service::overseer::{collator_overseer_builder, OverseerGenArgs};

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::{NetworkStateInfo, NotificationService};
use sc_service::TaskManager;
use sc_utils::mpsc::tracing_unbounded;

use cumulus_primitives_core::relay_chain::{Block, Hash as PHash};
use cumulus_relay_chain_interface::RelayChainError;

use crate::BlockChainRpcClient;

/// Arguments passed for overseer construction.
pub(crate) struct CollatorOverseerGenArgs<'a> {
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
pub runtime_client: Arc<BlockChainRpcClient>,
/// Underlying network service implementation.
pub network_service: Arc<sc_network::NetworkService<Block, PHash>>,
/// Syncing oracle.
pub sync_oracle: Box<dyn sp_consensus::SyncOracle + Send>,
/// Underlying authority discovery service.
pub authority_discovery_service: AuthorityDiscoveryService,
/// Receiver for collation request protocol v1.
pub collation_req_receiver_v1: IncomingRequestReceiver<v1::CollationFetchingRequest>,
/// Receiver for collation request protocol v2.
pub collation_req_receiver_v2: IncomingRequestReceiver<v2::CollationFetchingRequest>,
/// Receiver for availability request protocol
pub available_data_req_receiver: IncomingRequestReceiver<AvailableDataFetchingRequest>,
/// Prometheus registry, commonly used for production systems, less so for test.
pub registry: Option<&'a Registry>,
/// Task spawner to be used throughout the overseer and the APIs it provides.
pub spawner: sc_service::SpawnTaskHandle,
/// Determines the behavior of the collator.
pub collator_pair: CollatorPair,
/// Request response protocols
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
/// Notification services for validation/collation protocols.
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}

fn build_overseer(
connector: OverseerConnector,
CollatorOverseerGenArgs {
runtime_client,
network_service,
sync_oracle,
authority_discovery_service,
collation_req_receiver_v1,
collation_req_receiver_v2,
available_data_req_receiver,
registry,
spawner,
collator_pair,
req_protocol_names,
peer_set_protocol_names,
notification_services,
}: CollatorOverseerGenArgs<'_>,
args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
) -> Result<
(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
RelayChainError,
> {
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));

let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
available_data_req_receiver,
Metrics::register(registry)?,
))
.availability_store(DummySubsystem)
.bitfield_distribution(DummySubsystem)
.bitfield_signing(DummySubsystem)
.candidate_backing(DummySubsystem)
.candidate_validation(DummySubsystem)
.pvf_checker(DummySubsystem)
.chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = ProtocolSide::Collator {
peer_id: network_service.local_peer_id(),
collator_pair,
request_receiver_v1: collation_req_receiver_v1,
request_receiver_v2: collation_req_receiver_v2,
metrics: Metrics::register(registry)?,
};
CollatorProtocolSubsystem::new(side)
})
.network_bridge_rx(NetworkBridgeRxSubsystem::new(
network_service.clone(),
authority_discovery_service.clone(),
sync_oracle,
network_bridge_metrics.clone(),
peer_set_protocol_names.clone(),
notification_services,
notification_sinks.clone(),
))
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service,
authority_discovery_service,
network_bridge_metrics,
req_protocol_names,
peer_set_protocol_names,
notification_sinks,
))
.provisioner(DummySubsystem)
.runtime_api(RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
))
.statement_distribution(DummySubsystem)
.prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?))
.approval_distribution(DummySubsystem)
.approval_voting(DummySubsystem)
.gossip_support(DummySubsystem)
.dispute_coordinator(DummySubsystem)
.dispute_distribution(DummySubsystem)
.chain_selection(DummySubsystem)
.activation_external_listeners(Default::default())
.span_per_active_leaf(Default::default())
.active_leaves(Default::default())
.supports_parachains(runtime_client)
.metrics(Metrics::register(registry)?)
.spawner(spawner);
let builder =
collator_overseer_builder(args).map_err(|e| RelayChainError::Application(e.into()))?;

builder
.build_with_connector(connector)
.map_err(|e| RelayChainError::Application(e.into()))
}

pub(crate) fn spawn_overseer(
overseer_args: CollatorOverseerGenArgs,
overseer_args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
task_manager: &TaskManager,
relay_chain_rpc_client: Arc<BlockChainRpcClient>,
) -> Result<polkadot_overseer::Handle, RelayChainError> {
Expand Down
30 changes: 16 additions & 14 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use collator_overseer::{CollatorOverseerGenArgs, NewMinimalNode};
use collator_overseer::NewMinimalNode;

use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterface, Url};
Expand All @@ -29,6 +29,7 @@ use polkadot_node_network_protocol::{

use polkadot_node_subsystem_util::metrics::prometheus::Registry;
use polkadot_primitives::CollatorPair;
use polkadot_service::{overseer::OverseerGenArgs, IsParachainNode};

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::{config::FullNetworkConfiguration, Event, NetworkEventStream, NetworkService};
Expand Down Expand Up @@ -172,10 +173,10 @@ async fn new_minimal_relay_chain(
}

let genesis_hash = relay_chain_rpc_client.block_get_hash(Some(0)).await?.unwrap_or_default();
let peer_set_protocol_names =
let peerset_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
let notification_services = peer_sets_info(is_authority, &peerset_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
Expand All @@ -184,14 +185,14 @@ async fn new_minimal_relay_chain(
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver) =
let (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver) =
build_request_response_protocol_receivers(&request_protocol_names, &mut net_config);

let best_header = relay_chain_rpc_client
.chain_get_header(None)
.await?
.ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
let (network, network_starter, sync_oracle) = build_collator_network(
let (network, network_starter, sync_service) = build_collator_network(
&config,
net_config,
task_manager.spawn_handle(),
Expand All @@ -208,19 +209,20 @@ async fn new_minimal_relay_chain(
prometheus_registry.cloned(),
);

let overseer_args = CollatorOverseerGenArgs {
let overseer_args = OverseerGenArgs {
runtime_client: relay_chain_rpc_client.clone(),
network_service: network,
sync_oracle,
sync_service,
authority_discovery_service,
collation_req_receiver_v1,
collation_req_receiver_v2,
collation_req_v1_receiver,
collation_req_v2_receiver,
available_data_req_receiver,
registry: prometheus_registry,
spawner: task_manager.spawn_handle(),
collator_pair,
is_parachain_node: IsParachainNode::Collator(collator_pair),
overseer_message_channel_capacity_override: None,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
peerset_protocol_names,
notification_services,
};

Expand All @@ -240,16 +242,16 @@ fn build_request_response_protocol_receivers(
IncomingRequestReceiver<v2::CollationFetchingRequest>,
IncomingRequestReceiver<v1::AvailableDataFetchingRequest>,
) {
let (collation_req_receiver_v1, cfg) =
let (collation_req_v1_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let (collation_req_receiver_v2, cfg) =
let (collation_req_v2_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let (available_data_req_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.add_request_response_protocol(cfg);
let cfg = Protocol::ChunkFetchingV1.get_outbound_only_config(request_protocol_names);
config.add_request_response_protocol(cfg);
(collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver)
(collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver)
}
8 changes: 6 additions & 2 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ pub(crate) fn build_collator_network(
genesis_hash: Hash,
best_header: Header,
) -> Result<
(Arc<NetworkService<Block, Hash>>, NetworkStarter, Box<dyn sp_consensus::SyncOracle + Send>),
(
Arc<NetworkService<Block, Hash>>,
NetworkStarter,
Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
),
Error,
> {
let protocol_id = config.protocol_id();
Expand Down Expand Up @@ -112,7 +116,7 @@ pub(crate) fn build_collator_network(

let network_starter = NetworkStarter::new(network_start_tx);

Ok((network_service, network_starter, Box::new(SyncOracle {})))
Ok((network_service, network_starter, Arc::new(SyncOracle {})))
}

fn adjust_network_config_light_in_peers(config: &mut NetworkConfiguration) {
Expand Down
15 changes: 5 additions & 10 deletions polkadot/node/malus/src/variants/back_garbage_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

use polkadot_cli::{
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, ExtendedOverseerGenArgs,
HeaderBackend, Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle,
ParachainHost, ProvideRuntimeApi,
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_types::DefaultSubsystemClient;
use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
use sp_core::traits::SpawnNamed;

use crate::{
Expand Down Expand Up @@ -63,13 +62,9 @@ impl OverseerGen for BackGarbageCandidates {
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
ext_args: Option<ExtendedOverseerGenArgs>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
let spawner = args.spawner.clone();
Expand Down
4 changes: 0 additions & 4 deletions polkadot/node/malus/src/variants/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ use crate::{

use polkadot_node_core_candidate_validation::find_validation_data;
use polkadot_node_primitives::{InvalidCandidate, ValidationResult};
use polkadot_node_subsystem::{
messages::{CandidateValidationMessage, ValidationFailed},
overseer,
};

use polkadot_primitives::{
CandidateCommitments, CandidateDescriptor, CandidateReceipt, PersistedValidationData,
Expand Down
17 changes: 6 additions & 11 deletions polkadot/node/malus/src/variants/dispute_finalized_candidates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@
use futures::channel::oneshot;
use polkadot_cli::{
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, ExtendedOverseerGenArgs,
HeaderBackend, Overseer, OverseerConnector, OverseerGen, OverseerGenArgs, OverseerHandle,
ParachainHost, ProvideRuntimeApi,
AuxStore, Error, ExtendedOverseerGenArgs, Overseer, OverseerConnector, OverseerGen,
OverseerGenArgs, OverseerHandle,
},
validator_overseer_builder, Cli,
};
use polkadot_node_subsystem::{messages::ApprovalVotingMessage, SpawnGlue};
use polkadot_node_subsystem_types::{DefaultSubsystemClient, OverseerSignal};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_types::{ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient};
use polkadot_node_subsystem_util::request_candidate_events;
use polkadot_primitives::CandidateEvent;
use sp_core::traits::SpawnNamed;
Expand Down Expand Up @@ -237,13 +236,9 @@ impl OverseerGen for DisputeFinalizedCandidates {
connector: OverseerConnector,
args: OverseerGenArgs<'_, Spawner, RuntimeClient>,
ext_args: Option<ExtendedOverseerGenArgs>,
) -> Result<
(Overseer<SpawnGlue<Spawner>, Arc<DefaultSubsystemClient<RuntimeClient>>>, OverseerHandle),
Error,
>
) -> Result<(Overseer<SpawnGlue<Spawner>, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
RuntimeClient: RuntimeApiSubsystemClient + ChainApiBackend + AuxStore + 'static,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
gum::info!(
Expand Down
Loading
Loading