Skip to content

Commit

Permalink
Fix core sharing and make use of scheduling_lookahead (#4724)
Browse files Browse the repository at this point in the history
Implements most of
#1797

Core sharing (two or more parachains scheduled on the same
core with the same `PartsOf57600` value) was not working correctly. The
expected behaviour is to have Backed and Included events in each block
for the paras sharing the core and the paras should take turns. E.g. for
two paras sharing a core we expect: Backed(a); Included(a)+Backed(b);
Included(b)+Backed(a); etc. Instead of this each block contains just one
event and there are a lot of gaps (blocks w/o events) during the
session.

Core sharing should also work when collators are building collations
ahead of time

TODOs:

- [x] Add a zombienet test verifying that the behaviour mentioned above
works.
- [x] prdoc

---------

Co-authored-by: alindima <[email protected]>
  • Loading branch information
tdimitrov and alindima committed Jun 19, 2024
1 parent 6daa939 commit 739c37b
Show file tree
Hide file tree
Showing 28 changed files with 675 additions and 341 deletions.
17 changes: 17 additions & 0 deletions .gitlab/pipeline/zombienet/polkadot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ zombienet-polkadot-elastic-scaling-0001-basic-3cores-6s-blocks:
- .zombienet-polkadot-common
variables:
FORCED_INFRA_INSTANCE: "spot-iops"
before_script:
- !reference [.zombienet-polkadot-common, before_script]
- cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/elastic_scaling
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/elastic_scaling"
Expand All @@ -170,6 +173,9 @@ zombienet-polkadot-elastic-scaling-0001-basic-3cores-6s-blocks:
zombienet-polkadot-elastic-scaling-0002-elastic-scaling-doesnt-break-parachains:
extends:
- .zombienet-polkadot-common
before_script:
- !reference [.zombienet-polkadot-common, before_script]
- cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/elastic_scaling
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/elastic_scaling"
Expand Down Expand Up @@ -199,6 +205,17 @@ zombienet-polkadot-functional-0014-chunk-fetching-network-compatibility:
--local-dir="${LOCAL_DIR}/functional"
--test="0014-chunk-fetching-network-compatibility.zndsl"

zombienet-polkadot-functional-0015-coretime-shared-core:
extends:
- .zombienet-polkadot-common
before_script:
- !reference [.zombienet-polkadot-common, before_script]
- cp --remove-destination ${LOCAL_DIR}/assign-core.js ${LOCAL_DIR}/functional
script:
- /home/nonroot/zombie-net/scripts/ci/run-test-local-env-manager.sh
--local-dir="${LOCAL_DIR}/functional"
--test="0015-coretime-shared-core.zndsl"

zombienet-polkadot-smoke-0001-parachains-smoke-test:
extends:
- .zombienet-polkadot-common
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 43 additions & 45 deletions polkadot/node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ use polkadot_node_subsystem_util::{
runtime::{
self, prospective_parachains_mode, request_min_backing_votes, ProspectiveParachainsMode,
},
vstaging::{fetch_claim_queue, ClaimQueueSnapshot},
Validator,
};
use polkadot_primitives::{
Expand Down Expand Up @@ -212,8 +213,6 @@ struct PerRelayParentState {
parent: Hash,
/// Session index.
session_index: SessionIndex,
/// The `ParaId` assigned to the local validator at this relay parent.
assigned_para: Option<ParaId>,
/// The `CoreIndex` assigned to the local validator at this relay parent.
assigned_core: Option<CoreIndex>,
/// The candidates that are backed by enough validators in their group, by hash.
Expand All @@ -233,8 +232,11 @@ struct PerRelayParentState {
/// If true, we're appending extra bits in the BackedCandidate validator indices bitfield,
/// which represent the assigned core index. True if ElasticScalingMVP is enabled.
inject_core_index: bool,
/// The core states for all cores.
cores: Vec<CoreState>,
/// The number of cores.
n_cores: u32,
/// Claim queue state. If the runtime API is not available, it'll be populated with info from
/// availability cores.
claim_queue: ClaimQueueSnapshot,
/// The validator index -> group mapping at this relay parent.
validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
/// The associated group rotation information.
Expand Down Expand Up @@ -1004,20 +1006,19 @@ macro_rules! try_runtime_api {
fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
cores: &[CoreState],
n_cores: u32,
claim_queue: &ClaimQueueSnapshot,
statement: &SignedFullStatementWithPVD,
) -> Option<CoreIndex> {
let compact_statement = statement.as_unchecked();
let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());

let n_cores = cores.len();

gum::trace!(
target:LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len(),
n_cores,
?candidate_hash,
"Extracting core index from statement"
);
Expand All @@ -1029,7 +1030,7 @@ fn core_index_from_statement(
?group_rotation_info,
?statement,
?validator_to_group,
n_cores = ?cores.len() ,
n_cores,
?candidate_hash,
"Invalid validator index: {:?}",
statement_validator_index
Expand All @@ -1038,37 +1039,25 @@ fn core_index_from_statement(
};

// First check if the statement para id matches the core assignment.
let core_index = group_rotation_info.core_for_group(*group_index, n_cores);
let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);

if core_index.0 as usize > n_cores {
if core_index.0 > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}

if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id;
let assigned_para_id = match &cores[core_index.0 as usize] {
CoreState::Free => {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Invalid CoreIndex, core is not assigned to any para_id");
return None
},
CoreState::Occupied(occupied) =>
if let Some(next) = &occupied.next_up_on_available {
next.para_id
} else {
return None
},
CoreState::Scheduled(scheduled) => scheduled.para_id,
};
let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);

if assigned_para_id != candidate_para_id {
if !assigned_paras.any(|id| id == &candidate_para_id) {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?core_index,
?assigned_para_id,
assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
?candidate_para_id,
"Invalid CoreIndex, core is assigned to a different para_id"
"Invalid CoreIndex, core is not assigned to this para_id"
);
return None
}
Expand Down Expand Up @@ -1129,6 +1118,8 @@ async fn construct_per_relay_parent_state<Context>(
Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
})?;

let maybe_claim_queue = try_runtime_api!(fetch_claim_queue(ctx.sender(), parent).await);

let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
&validators,
Expand All @@ -1153,31 +1144,35 @@ async fn construct_per_relay_parent_state<Context>(

let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
let mut assigned_core = None;
let mut assigned_para = None;

let has_claim_queue = maybe_claim_queue.is_some();
let mut claim_queue = maybe_claim_queue.unwrap_or_default().0;

for (idx, core) in cores.iter().enumerate() {
let core_para_id = match core {
CoreState::Scheduled(scheduled) => scheduled.para_id,
CoreState::Occupied(occupied) =>
if mode.is_enabled() {
let core_index = CoreIndex(idx as _);

if !has_claim_queue {
match core {
CoreState::Scheduled(scheduled) =>
claim_queue.insert(core_index, [scheduled.para_id].into_iter().collect()),
CoreState::Occupied(occupied) if mode.is_enabled() => {
// Async backing makes it legal to build on top of
// occupied core.
if let Some(next) = &occupied.next_up_on_available {
next.para_id
claim_queue.insert(core_index, [next.para_id].into_iter().collect())
} else {
continue
}
} else {
continue
},
CoreState::Free => continue,
};
_ => continue,
};
} else if !claim_queue.contains_key(&core_index) {
continue
}

let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
assigned_para = Some(core_para_id);
assigned_core = Some(core_index);
}
groups.insert(core_index, g.clone());
Expand Down Expand Up @@ -1212,7 +1207,6 @@ async fn construct_per_relay_parent_state<Context>(
parent,
session_index,
assigned_core,
assigned_para,
backed: HashSet::new(),
table: Table::new(table_config),
table_context,
Expand All @@ -1221,7 +1215,8 @@ async fn construct_per_relay_parent_state<Context>(
fallbacks: HashMap::new(),
minimum_backing_votes,
inject_core_index,
cores,
n_cores: cores.len() as u32,
claim_queue: ClaimQueueSnapshot::from(claim_queue),
validator_to_group: validator_to_group.clone(),
group_rotation_info,
}))
Expand Down Expand Up @@ -1674,7 +1669,8 @@ async fn import_statement<Context>(
let core = core_index_from_statement(
&rp_state.validator_to_group,
&rp_state.group_rotation_info,
&rp_state.cores,
rp_state.n_cores,
&rp_state.claim_queue,
statement,
)
.ok_or(Error::CoreIndexUnavailable)?;
Expand Down Expand Up @@ -2098,12 +2094,14 @@ async fn handle_second_message<Context>(
return Ok(())
}

let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));

// Sanity check that candidate is from our assignment.
if Some(candidate.descriptor().para_id) != rp_state.assigned_para {
if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id)) {
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_para = ?rp_state.assigned_para,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id,
"Subsystem asked to second for para outside of our assignment",
);
Expand All @@ -2113,7 +2111,7 @@ async fn handle_second_message<Context>(
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_para = ?rp_state.assigned_para,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id,
"Current assignments vs collation",
);
Expand Down
45 changes: 41 additions & 4 deletions polkadot/node/core/backing/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use sp_application_crypto::AppCrypto;
use sp_keyring::Sr25519Keyring;
use sp_keystore::Keystore;
use sp_tracing as _;
use std::{collections::HashMap, time::Duration};
use std::{
collections::{BTreeMap, HashMap, VecDeque},
time::Duration,
};

mod prospective_parachains;

Expand Down Expand Up @@ -75,6 +78,7 @@ pub(crate) struct TestState {
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
validator_to_group: IndexedVec<ValidatorIndex, Option<GroupIndex>>,
availability_cores: Vec<CoreState>,
claim_queue: BTreeMap<CoreIndex, VecDeque<ParaId>>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
relay_parent: Hash,
Expand Down Expand Up @@ -130,6 +134,10 @@ impl Default for TestState {
CoreState::Scheduled(ScheduledCore { para_id: chain_b, collator: None }),
];

let mut claim_queue = BTreeMap::new();
claim_queue.insert(CoreIndex(0), [chain_a].into_iter().collect());
claim_queue.insert(CoreIndex(1), [chain_b].into_iter().collect());

let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
head_data.insert(chain_b, HeadData(vec![5, 6, 7]));
Expand All @@ -153,6 +161,7 @@ impl Default for TestState {
validator_groups: (validator_groups, group_rotation_info),
validator_to_group,
availability_cores,
claim_queue,
head_data,
validation_data,
signing_context,
Expand Down Expand Up @@ -338,6 +347,26 @@ async fn test_startup(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
tx.send(Ok(test_state.disabled_validators.clone())).unwrap();
}
);

assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::Version(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(RuntimeApiRequest::CLAIM_QUEUE_RUNTIME_REQUIREMENT)).unwrap();
}
);

assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ClaimQueue(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(
test_state.claim_queue.clone()
)).unwrap();
}
);
}

async fn assert_validation_requests(
Expand Down Expand Up @@ -730,11 +759,16 @@ fn get_backed_candidate_preserves_order() {
// Assign the second core to the same para as the first one.
test_state.availability_cores[1] =
CoreState::Scheduled(ScheduledCore { para_id: test_state.chain_ids[0], collator: None });
*test_state.claim_queue.get_mut(&CoreIndex(1)).unwrap() =
[test_state.chain_ids[0]].into_iter().collect();
// Add another availability core for paraid 2.
test_state.availability_cores.push(CoreState::Scheduled(ScheduledCore {
para_id: test_state.chain_ids[1],
collator: None,
}));
test_state
.claim_queue
.insert(CoreIndex(2), [test_state.chain_ids[1]].into_iter().collect());

test_harness(test_state.keystore.clone(), |mut virtual_overseer| async move {
test_startup(&mut virtual_overseer, &test_state).await;
Expand Down Expand Up @@ -1103,7 +1137,8 @@ fn extract_core_index_from_statement_works() {
let core_index_1 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_1,
)
.unwrap();
Expand All @@ -1113,7 +1148,8 @@ fn extract_core_index_from_statement_works() {
let core_index_2 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_2,
);

Expand All @@ -1123,7 +1159,8 @@ fn extract_core_index_from_statement_works() {
let core_index_3 = core_index_from_statement(
&test_state.validator_to_group,
&test_state.validator_groups.1,
&test_state.availability_cores,
test_state.availability_cores.len() as _,
&test_state.claim_queue.clone().into(),
&signed_statement_3,
)
.unwrap();
Expand Down
Loading

0 comments on commit 739c37b

Please sign in to comment.