Skip to content

Commit

Permalink
Rework the event system of sc-network (#1370)
Browse files Browse the repository at this point in the history
This commit introduces a new concept called `NotificationService` which
allows Polkadot protocols to communicate with the underlying
notification protocol implementation directly, without routing events
through `NetworkWorker`. This implies that each protocol has its own
service which it uses to communicate with remote peers and that each
`NotificationService` is unique with respect to the underlying
notification protocol, meaning `NotificationService` for the transaction
protocol can only be used to send and receive transaction-related
notifications.

The `NotificationService` concept introduces two additional benefits:
  * allow protocols to start using custom handshakes
  * allow protocols to accept/reject inbound peers

Previously the validation of inbound connections was solely the
responsibility of `ProtocolController`. This caused issues with light
peers and `SyncingEngine` as `ProtocolController` would accept more
peers than `SyncingEngine` could accept which caused peers to have
differing views of their own states. `SyncingEngine` would reject excess
peers but these rejections were not properly communicated to those peers
causing them to assume that they were accepted.

With `NotificationService`, the local handshake is not sent to remote
peer if peer is rejected which allows it to detect that it was rejected.

This commit also deprecates the use of `NetworkEventStream` for all
notification-related events and going forward only DHT events are
provided through `NetworkEventStream`. If protocols wish to follow each
other's events, they must introduce additional abtractions, as is done
for GRANDPA and transactions protocols by following the syncing protocol
through `SyncEventStream`.

Fixes #512
Fixes #514
Fixes #515
Fixes #554
Fixes #556

---
These changes are transferred from
paritytech/substrate#14197 but there are no
functional changes compared to that PR

---------

Co-authored-by: Dmitry Markin <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
  • Loading branch information
3 people committed Nov 28, 2023
1 parent ec3a61e commit e71c484
Show file tree
Hide file tree
Showing 102 changed files with 5,628 additions and 2,537 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/relay-chain-minimal-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ array-bytes = "6.1"
tracing = "0.1.37"
async-trait = "0.1.73"
futures = "0.3.28"
parking_lot = "0.12.1"

15 changes: 12 additions & 3 deletions cumulus/client/relay-chain-minimal-node/src/collator_overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use futures::{select, StreamExt};
use std::sync::Arc;
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};

use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_collator_protocol::{CollatorProtocolSubsystem, ProtocolSide};
Expand All @@ -28,7 +29,7 @@ use polkadot_node_core_chain_api::ChainApiSubsystem;
use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem;
use polkadot_node_core_runtime_api::RuntimeApiSubsystem;
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1::{self, AvailableDataFetchingRequest},
v2, IncomingRequestReceiver, ReqProtocolNames,
Expand All @@ -42,7 +43,7 @@ use polkadot_overseer::{
use polkadot_primitives::CollatorPair;

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::NetworkStateInfo;
use sc_network::{NetworkStateInfo, NotificationService};
use sc_service::TaskManager;
use sc_utils::mpsc::tracing_unbounded;

Expand Down Expand Up @@ -77,6 +78,8 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
/// Notification services for validation/collation protocols.
pub notification_services: HashMap<PeerSet, Box<dyn NotificationService>>,
}

fn build_overseer(
Expand All @@ -94,13 +97,16 @@ fn build_overseer(
collator_pair,
req_protocol_names,
peer_set_protocol_names,
notification_services,
}: CollatorOverseerGenArgs<'_>,
) -> Result<
(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
RelayChainError,
> {
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let notification_sinks = Arc::new(Mutex::new(HashMap::new()));

let builder = Overseer::builder()
.availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::for_collator(
Expand Down Expand Up @@ -131,13 +137,16 @@ fn build_overseer(
sync_oracle,
network_bridge_metrics.clone(),
peer_set_protocol_names.clone(),
notification_services,
notification_sinks.clone(),
))
.network_bridge_tx(NetworkBridgeTxSubsystem::new(
network_service,
authority_discovery_service,
network_bridge_metrics,
req_protocol_names,
peer_set_protocol_names,
notification_sinks,
))
.provisioner(DummySubsystem)
.runtime_api(RuntimeApiSubsystem::new(
Expand Down
14 changes: 9 additions & 5 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterf
use network::build_collator_network;
use polkadot_network_bridge::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
peer_set::{PeerSet, PeerSetProtocolNames},
request_response::{
v1, v2, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
},
Expand Down Expand Up @@ -175,10 +175,13 @@ async fn new_minimal_relay_chain(
let peer_set_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };

for config in peer_sets_info(is_authority, &peer_set_protocol_names) {
net_config.add_notification_protocol(config);
}
let notification_services = peer_sets_info(is_authority, &peer_set_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
(peerset, service)
})
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_receiver_v1, collation_req_receiver_v2, available_data_req_receiver) =
Expand Down Expand Up @@ -218,6 +221,7 @@ async fn new_minimal_relay_chain(
collator_pair,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
notification_services,
};

let overseer_handle =
Expand Down
25 changes: 10 additions & 15 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use sc_network::{
NetworkService,
};

use sc_network::config::FullNetworkConfiguration;
use sc_network::{config::FullNetworkConfiguration, NotificationService};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
use sc_utils::mpsc::tracing_unbounded;

use std::{iter, sync::Arc};

Expand All @@ -45,7 +44,7 @@ pub(crate) fn build_collator_network(
Error,
> {
let protocol_id = config.protocol_id();
let block_announce_config = get_block_announce_proto_config::<Block>(
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Block>(
protocol_id.clone(),
&None,
Roles::from(&config.role),
Expand All @@ -69,8 +68,6 @@ pub(crate) fn build_collator_network(
let peer_store_handle = peer_store.handle();
spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

// RX is not used for anything because syncing is not started for the minimal node
let (tx, _rx) = tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let network_params = sc_network::config::Params::<Block> {
role: config.role.clone(),
executor: {
Expand All @@ -86,7 +83,6 @@ pub(crate) fn build_collator_network(
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
};

let network_worker = sc_network::NetworkWorker::new(network_params)?;
Expand Down Expand Up @@ -150,7 +146,7 @@ fn get_block_announce_proto_config<B: BlockT>(
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> NonDefaultSetConfig {
) -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
if let Some(ref fork_id) = fork_id {
Expand All @@ -160,24 +156,23 @@ fn get_block_announce_proto_config<B: BlockT>(
}
};

NonDefaultSetConfig {
notifications_protocol: block_announces_protocol.into(),
fallback_names: iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into())
.collect(),
max_notification_size: 1024 * 1024,
handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
NonDefaultSetConfig::new(
block_announces_protocol.into(),
iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1024 * 1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
roles,
best_number,
best_hash,
genesis_hash,
))),
// NOTE: `set_config` will be ignored by `protocol.rs` as the block announcement
// protocol is still hardcoded into the peerset.
set_config: SetConfig {
SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
}
)
}
1 change: 1 addition & 0 deletions polkadot/node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(crate) enum WireMessage<M> {
ViewUpdate(View),
}

#[derive(Debug)]
pub(crate) struct PeerData {
/// The Latest view sent by the peer.
view: View,
Expand Down
Loading

0 comments on commit e71c484

Please sign in to comment.