Skip to content

Commit

Permalink
collator protocol changes for elastic scaling (validator side) (parit…
Browse files Browse the repository at this point in the history
…ytech#3302)

Fixes paritytech#3128.

This introduces a new variant for the collation response from the
collator that includes the parent head data. For now, collators won't
send this new variant. We'll need to change the collator side of the
collator protocol to detect all the cores assigned to a para and send
the parent head data in the case when it's more than 1 core.

- [x] validate approach
- [x] check head data hash
  • Loading branch information
ordian authored and dharjeezy committed Mar 24, 2024
1 parent b817603 commit 08003bb
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 133 deletions.
8 changes: 5 additions & 3 deletions polkadot/node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ async fn construct_and_distribute_receipt(
} = collation;

let persisted_validation_data_hash = validation_data.hash();
let parent_head_data = validation_data.parent_head.clone();
let parent_head_data_hash = validation_data.parent_head.hash();

// Apply compression to the block data.
Expand Down Expand Up @@ -551,12 +552,13 @@ async fn construct_and_distribute_receipt(
metrics.on_collation_generated();

sender
.send_message(CollatorProtocolMessage::DistributeCollation(
ccr,
.send_message(CollatorProtocolMessage::DistributeCollation {
candidate_receipt: ccr,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
))
})
.await;
}

Expand Down
33 changes: 17 additions & 16 deletions polkadot/node/collation-generation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ fn sends_distribute_collation_message() {

assert_eq!(to_collator_protocol.len(), 1);
match AllMessages::from(to_collator_protocol.pop().unwrap()) {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
// descriptor has a valid signature, then just copy in the generated signature
Expand Down Expand Up @@ -529,11 +529,11 @@ fn fallback_when_no_validation_code_hash_api() {

assert_eq!(to_collator_protocol.len(), 1);
match &to_collator_protocol[0] {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..,
)) => {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
..
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(expect_validation_code_hash, descriptor.validation_code_hash);
},
_ => panic!("received wrong message type"),
Expand Down Expand Up @@ -619,15 +619,16 @@ fn submit_collation_leads_to_distribution() {

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
ccr,
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation {
candidate_receipt,
parent_head_data_hash,
..
)) => {
}) => {
let CandidateReceipt { descriptor, .. } = candidate_receipt;
assert_eq!(parent_head_data_hash, parent_head.hash());
assert_eq!(ccr.descriptor().persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(ccr.descriptor().para_head, dummy_head_data().hash());
assert_eq!(ccr.descriptor().validation_code_hash, validation_code_hash);
assert_eq!(descriptor.persisted_validation_data_hash, expected_pvd.hash());
assert_eq!(descriptor.para_head, dummy_head_data().hash());
assert_eq!(descriptor.validation_code_hash, validation_code_hash);
}
);

Expand Down
17 changes: 12 additions & 5 deletions polkadot/node/core/prospective-parachains/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ use futures::{channel::oneshot, prelude::*};
use polkadot_node_subsystem::{
messages::{
Ancestors, ChainApiMessage, FragmentTreeMembership, HypotheticalCandidate,
HypotheticalFrontierRequest, IntroduceCandidateRequest, ProspectiveParachainsMessage,
ProspectiveValidationDataRequest, RuntimeApiMessage, RuntimeApiRequest,
HypotheticalFrontierRequest, IntroduceCandidateRequest, ParentHeadData,
ProspectiveParachainsMessage, ProspectiveValidationDataRequest, RuntimeApiMessage,
RuntimeApiRequest,
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
};
Expand Down Expand Up @@ -764,8 +765,14 @@ fn answer_prospective_validation_data_request(
Some(s) => s,
};

let mut head_data =
storage.head_data_by_hash(&request.parent_head_data_hash).map(|x| x.clone());
let (mut head_data, parent_head_data_hash) = match request.parent_head_data {
ParentHeadData::OnlyHash(parent_head_data_hash) => (
storage.head_data_by_hash(&parent_head_data_hash).map(|x| x.clone()),
parent_head_data_hash,
),
ParentHeadData::WithData { head_data, hash } => (Some(head_data), hash),
};

let mut relay_parent_info = None;
let mut max_pov_size = None;

Expand All @@ -783,7 +790,7 @@ fn answer_prospective_validation_data_request(
}
if head_data.is_none() {
let required_parent = &fragment_tree.scope().base_constraints().required_parent;
if required_parent.hash() == request.parent_head_data_hash {
if required_parent.hash() == parent_head_data_hash {
head_data = Some(required_parent.clone());
}
}
Expand Down
4 changes: 2 additions & 2 deletions polkadot/node/core/prospective-parachains/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use assert_matches::assert_matches;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
AllMessages, HypotheticalFrontierRequest, ProspectiveParachainsMessage,
AllMessages, HypotheticalFrontierRequest, ParentHeadData, ProspectiveParachainsMessage,
ProspectiveValidationDataRequest,
},
};
Expand Down Expand Up @@ -468,7 +468,7 @@ async fn get_pvd(
let request = ProspectiveValidationDataRequest {
para_id,
candidate_relay_parent,
parent_head_data_hash: parent_head_data.hash(),
parent_head_data: ParentHeadData::OnlyHash(parent_head_data.hash()),
};
let (tx, rx) = oneshot::channel();
virtual_overseer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use polkadot_node_network_protocol::{
PeerId,
};
use polkadot_node_primitives::PoV;
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, Id as ParaId};
use polkadot_primitives::{CandidateHash, CandidateReceipt, Hash, HeadData, Id as ParaId};

/// The status of a collation as seen from the collator.
pub enum CollationStatus {
Expand Down Expand Up @@ -63,6 +63,8 @@ pub struct Collation {
pub parent_head_data_hash: Hash,
/// Proof to verify the state transition of the parachain.
pub pov: PoV,
/// Parent head-data needed for elastic scaling.
pub parent_head_data: HeadData,
/// Collation status.
pub status: CollationStatus,
}
Expand Down
65 changes: 47 additions & 18 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState,
GroupIndex, Hash, Id as ParaId, SessionIndex,
GroupIndex, Hash, HeadData, Id as ParaId, SessionIndex,
};

use super::LOG_TARGET;
Expand Down Expand Up @@ -347,6 +347,7 @@ async fn distribute_collation<Context>(
receipt: CandidateReceipt,
parent_head_data_hash: Hash,
pov: PoV,
parent_head_data: HeadData,
result_sender: Option<oneshot::Sender<CollationSecondedSignal>>,
) -> Result<()> {
let candidate_relay_parent = receipt.descriptor.relay_parent;
Expand Down Expand Up @@ -465,7 +466,13 @@ async fn distribute_collation<Context>(

per_relay_parent.collations.insert(
candidate_hash,
Collation { receipt, parent_head_data_hash, pov, status: CollationStatus::Created },
Collation {
receipt,
parent_head_data_hash,
pov,
parent_head_data,
status: CollationStatus::Created,
},
);

// If prospective parachains are disabled, a leaf should be known to peer.
Expand Down Expand Up @@ -763,20 +770,26 @@ async fn process_msg<Context>(
CollateOn(id) => {
state.collating_on = Some(id);
},
DistributeCollation(receipt, parent_head_data_hash, pov, result_sender) => {
DistributeCollation {
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
} => {
let _span1 = state
.span_per_relay_parent
.get(&receipt.descriptor.relay_parent)
.get(&candidate_receipt.descriptor.relay_parent)
.map(|s| s.child("distributing-collation"));
let _span2 = jaeger::Span::new(&pov, "distributing-collation");

match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
Some(id) if candidate_receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
// the one we expect, we ignore the message.
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
Expand All @@ -788,17 +801,18 @@ async fn process_msg<Context>(
runtime,
state,
id,
receipt,
candidate_receipt,
parent_head_data_hash,
pov,
parent_head_data,
result_sender,
)
.await?;
},
None => {
gum::warn!(
target: LOG_TARGET,
para_id = %receipt.descriptor.para_id,
para_id = %candidate_receipt.descriptor.para_id,
"DistributeCollation message while not collating on any",
);
},
Expand Down Expand Up @@ -835,20 +849,30 @@ async fn send_collation(
request: VersionedCollationRequest,
receipt: CandidateReceipt,
pov: PoV,
_parent_head_data: HeadData,
) {
let (tx, rx) = oneshot::channel();

let relay_parent = request.relay_parent();
let peer_id = request.peer_id();
let candidate_hash = receipt.hash();

// The response payload is the same for both versions of protocol
// The response payload is the same for v1 and v2 versions of protocol
// and doesn't have v2 alias for simplicity.
let response = OutgoingResponse {
result: Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
// For now, we don't send parent head data to the collation requester.
let result =
// if assigned_multiple_cores {
// Ok(request_v1::CollationFetchingResponse::CollationWithParentHeadData {
// receipt,
// pov,
// parent_head_data,
// })
// } else {
Ok(request_v1::CollationFetchingResponse::Collation(receipt, pov))
// }
;
let response =
OutgoingResponse { result, reputation_changes: Vec::new(), sent_feedback: Some(tx) };

if let Err(_) = request.send_outgoing_response(response) {
gum::warn!(target: LOG_TARGET, "Sending collation response failed");
Expand Down Expand Up @@ -1027,9 +1051,13 @@ async fn handle_incoming_request<Context>(
return Ok(())
},
};
let (receipt, pov) = if let Some(collation) = collation {
let (receipt, pov, parent_head_data) = if let Some(collation) = collation {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
(
collation.receipt.clone(),
collation.pov.clone(),
collation.parent_head_data.clone(),
)
} else {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1068,7 +1096,7 @@ async fn handle_incoming_request<Context>(
waiting.collation_fetch_active = true;
// Obtain a timer for sending collation
let _ = state.metrics.time_collation_distribution("send");
send_collation(state, req, receipt, pov).await;
send_collation(state, req, receipt, pov, parent_head_data).await;
}
},
Some(our_para_id) => {
Expand Down Expand Up @@ -1453,8 +1481,9 @@ async fn run_inner<Context>(
if let Some(collation) = next_collation {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
let parent_head_data = collation.parent_head_data.clone();

send_collation(&mut state, next, receipt, pov).await;
send_collation(&mut state, next, receipt, pov, parent_head_data).await;
}
},
(candidate_hash, peer_id) = state.advertisement_timeouts.select_next_some() => {
Expand Down
Loading

0 comments on commit 08003bb

Please sign in to comment.