Skip to content

Commit

Permalink
statement-distribution: prep for re-enabling (#4431)
Browse files Browse the repository at this point in the history
In preparation for launching re-enabling
(#2418), we need to
adjust the disabling strategy of statement-distribution to use the relay
parent's state instead of the latest state (union of active leaves).
This also avoids a race between fetching the latest state and
accepting statements from disabled validators, at the cost of being more
lenient, i.e. potentially accepting more statements from disabled validators.

- [x] PRDoc
  • Loading branch information
ordian authored Jun 5, 2024
1 parent d129968 commit 0d661ea
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 510 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ use polkadot_primitives::{CandidateHash, CompactStatement, Hash, ValidatorIndex}
use crate::LOG_TARGET;
use std::collections::{HashMap, HashSet};

// A hashable, equality-comparable key made of two validator indices and a candidate hash.
// NOTE(review): the roles of the fields below are inferred from their names only — confirm
// against the code that constructs this type.
#[derive(Hash, PartialEq, Eq)]
struct ValidStatementManifest {
// presumably the validator the manifest is exchanged with — TODO confirm
remote: ValidatorIndex,
// presumably the validator that issued the underlying statement — TODO confirm
originator: ValidatorIndex,
// hash of the candidate this entry refers to
candidate_hash: CandidateHash,
}

// A piece of knowledge about a candidate
#[derive(Hash, Clone, PartialEq, Eq)]
enum Knowledge {
Expand Down
147 changes: 54 additions & 93 deletions polkadot/node/network/statement-distribution/src/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use futures::{
use std::{
collections::{
hash_map::{Entry, HashMap},
BTreeSet, HashSet,
HashSet,
},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -156,6 +156,7 @@ struct PerRelayParentState {
seconding_limit: usize,
session: SessionIndex,
groups_per_para: HashMap<ParaId, Vec<GroupIndex>>,
disabled_validators: HashSet<ValidatorIndex>,
}

impl PerRelayParentState {
Expand All @@ -166,6 +167,17 @@ impl PerRelayParentState {
/// Mutable access to the `active` part of the local validator state.
///
/// Yields `None` when there is no local validator state for this relay parent,
/// or when that state has no `active` component.
fn active_validator_state_mut(&mut self) -> Option<&mut ActiveValidatorState> {
    let local = self.local_validator.as_mut()?;
    local.active.as_mut()
}

/// Tells whether `validator_index` is a member of the disabled-validator set
/// stored for this relay parent.
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
    let disabled = &self.disabled_validators;
    disabled.contains(validator_index)
}

/// A convenience function to generate a disabled bitmask for the given backing group.
/// The output bits are set to `true` for validators that are disabled.
pub fn disabled_bitmask(&self, group: &[ValidatorIndex]) -> BitVec<u8, Lsb0> {
BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)))
}
}

// per-relay-parent local validator state.
Expand Down Expand Up @@ -206,8 +218,6 @@ struct PerSessionState {
// getting the topology from the gossip-support subsystem
grid_view: Option<grid::SessionTopologyView>,
local_validator: Option<LocalValidatorIndex>,
// We store the latest state here based on union of leaves.
disabled_validators: BTreeSet<ValidatorIndex>,
}

impl PerSessionState {
Expand All @@ -224,16 +234,7 @@ impl PerSessionState {
)
.map(|(_, index)| LocalValidatorIndex::Active(index));

let disabled_validators = BTreeSet::new();

PerSessionState {
session_info,
groups,
authority_lookup,
grid_view: None,
local_validator,
disabled_validators,
}
PerSessionState { session_info, groups, authority_lookup, grid_view: None, local_validator }
}

fn supply_topology(
Expand Down Expand Up @@ -269,33 +270,6 @@ impl PerSessionState {
/// `true` only when the grid view is already present and there is no local
/// validator index for this session; `false` in every other combination.
fn is_not_validator(&self) -> bool {
    matches!((&self.grid_view, &self.local_validator), (Some(_), None))
}

/// A convenience function to generate a disabled bitmask for the given backing group.
/// The output bits are set to `true` for validators that are disabled.
/// Returns `None` if the group index is out of bounds.
pub fn disabled_bitmask(&self, group: GroupIndex) -> Option<BitVec<u8, Lsb0>> {
let group = self.groups.get(group)?;
let mask = BitVec::from_iter(group.iter().map(|v| self.is_disabled(v)));
Some(mask)
}

/// Tells whether `validator_index` is a member of the disabled-validator set
/// kept in this per-session state.
pub fn is_disabled(&self, validator_index: &ValidatorIndex) -> bool {
    let disabled = &self.disabled_validators;
    disabled.contains(validator_index)
}

/// Adds every validator yielded by `disabled` to the set of disabled validators.
///
/// Entries already present stay as they are; nothing is removed.
pub fn extend_disabled_validators(
    &mut self,
    disabled: impl IntoIterator<Item = ValidatorIndex>,
) {
    for validator in disabled {
        self.disabled_validators.insert(validator);
    }
}

/// Empties the set of disabled validators.
pub fn clear_disabled_validators(&mut self) {
    let disabled = &mut self.disabled_validators;
    disabled.clear();
}
}

pub(crate) struct State {
Expand Down Expand Up @@ -582,19 +556,16 @@ pub(crate) async fn handle_active_leaves_update<Context>(
let new_relay_parents =
state.implicit_view.all_allowed_relay_parents().cloned().collect::<Vec<_>>();

// We clear the list of disabled validators to reset it properly based on union of leaves.
let mut cleared_disabled_validators: BTreeSet<SessionIndex> = BTreeSet::new();

for new_relay_parent in new_relay_parents.iter().cloned() {
// Even if we processed this relay parent before, we need to fetch the list of disabled
// validators based on union of active leaves.
let disabled_validators =
let disabled_validators: HashSet<_> =
polkadot_node_subsystem_util::vstaging::get_disabled_validators_with_fallback(
ctx.sender(),
new_relay_parent,
)
.await
.map_err(JfyiError::FetchDisabledValidators)?;
.map_err(JfyiError::FetchDisabledValidators)?
.into_iter()
.collect();

let session_index = polkadot_node_subsystem_util::request_session_index_for_child(
new_relay_parent,
Expand Down Expand Up @@ -644,10 +615,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
.get_mut(&session_index)
.expect("either existed or just inserted; qed");

if cleared_disabled_validators.insert(session_index) {
per_session.clear_disabled_validators();
}

if !disabled_validators.is_empty() {
gum::debug!(
target: LOG_TARGET,
Expand All @@ -656,8 +623,6 @@ pub(crate) async fn handle_active_leaves_update<Context>(
?disabled_validators,
"Disabled validators detected"
);

per_session.extend_disabled_validators(disabled_validators);
}

if state.per_relay_parent.contains_key(&new_relay_parent) {
Expand Down Expand Up @@ -723,6 +688,7 @@ pub(crate) async fn handle_active_leaves_update<Context>(
seconding_limit,
session: session_index,
groups_per_para,
disabled_validators,
},
);
}
Expand Down Expand Up @@ -1581,6 +1547,17 @@ async fn handle_incoming_statement<Context>(
};
let session_info = &per_session.session_info;

if per_relay_parent.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
return
}

let local_validator = match per_relay_parent.local_validator.as_mut() {
None => {
// we shouldn't be receiving statements unless we're a validator
Expand Down Expand Up @@ -1614,17 +1591,6 @@ async fn handle_incoming_statement<Context>(
},
};

if per_session.is_disabled(&statement.unchecked_validator_index()) {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?statement.unchecked_validator_index(),
"Ignoring a statement from disabled validator."
);
modify_reputation(reputation, ctx.sender(), peer, COST_DISABLED_VALIDATOR).await;
return
}

let (active, cluster_sender_index) = {
// This block of code only returns `Some` when both the originator and
// the sending peer are in the cluster.
Expand Down Expand Up @@ -2379,21 +2345,18 @@ async fn handle_incoming_manifest_common<'a, Context>(
Some(s) => s,
};

let local_validator = match relay_parent_state.local_validator.as_mut() {
None => {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
},
Some(x) => x,
};
if relay_parent_state.local_validator.is_none() {
if per_session.is_not_validator() {
modify_reputation(
reputation,
ctx.sender(),
peer,
COST_UNEXPECTED_MANIFEST_MISSING_KNOWLEDGE,
)
.await;
}
return None
}

let Some(expected_groups) = relay_parent_state.groups_per_para.get(&para_id) else {
modify_reputation(reputation, ctx.sender(), peer, COST_MALFORMED_MANIFEST).await;
Expand Down Expand Up @@ -2436,10 +2399,13 @@ async fn handle_incoming_manifest_common<'a, Context>(
let claimed_parent_hash = manifest_summary.claimed_parent_hash;

// Ignore votes from disabled validators when counting towards the threshold.
let disabled_mask = per_session.disabled_bitmask(group_index).unwrap_or_default();
let group = per_session.groups.get(group_index).unwrap_or(&[]);
let disabled_mask = relay_parent_state.disabled_bitmask(group);
manifest_summary.statement_knowledge.mask_seconded(&disabled_mask);
manifest_summary.statement_knowledge.mask_valid(&disabled_mask);

let local_validator = relay_parent_state.local_validator.as_mut().expect("checked above; qed");

let acknowledge = match local_validator.grid_tracker.import_manifest(
grid_topology,
&per_session.groups,
Expand Down Expand Up @@ -3018,9 +2984,7 @@ pub(crate) async fn dispatch_requests<Context>(ctx: &mut Context, state: &mut St
}

// Add disabled validators to the unwanted mask.
let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group existence checked above; qed");
let disabled_mask = relay_parent_state.disabled_bitmask(group);
unwanted_mask.seconded_in_group |= &disabled_mask;
unwanted_mask.validated_in_group |= &disabled_mask;

Expand Down Expand Up @@ -3111,9 +3075,7 @@ pub(crate) async fn handle_response<Context>(
Some(g) => g,
};

let disabled_mask = per_session
.disabled_bitmask(group_index)
.expect("group_index checked above; qed");
let disabled_mask = relay_parent_state.disabled_bitmask(group);

let res = response.validate_response(
&mut state.request_manager,
Expand Down Expand Up @@ -3258,7 +3220,7 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {
Some(s) => s,
};

let local_validator = match relay_parent_state.local_validator.as_mut() {
let local_validator = match relay_parent_state.local_validator.as_ref() {
None => return,
Some(s) => s,
};
Expand Down Expand Up @@ -3332,16 +3294,15 @@ pub(crate) fn answer_request(state: &mut State, message: ResponderMessage) {

// Transform mask with 'OR' semantics into one with 'AND' semantics for the API used
// below.
let mut and_mask = StatementFilter {
let and_mask = StatementFilter {
seconded_in_group: !mask.seconded_in_group.clone(),
validated_in_group: !mask.validated_in_group.clone(),
};

// Ignore disabled validators from the latest state when sending the response.
let disabled_mask =
per_session.disabled_bitmask(group_index).expect("group existence checked; qed");
and_mask.mask_seconded(&disabled_mask);
and_mask.mask_valid(&disabled_mask);
let local_validator = match relay_parent_state.local_validator.as_mut() {
None => return,
Some(s) => s,
};

let mut sent_filter = StatementFilter::blank(group_size);
let statements: Vec<_> = relay_parent_state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ struct TestLeaf {
parent_hash: Hash,
session: SessionIndex,
availability_cores: Vec<CoreState>,
disabled_validators: Vec<ValidatorIndex>,
pub disabled_validators: Vec<ValidatorIndex>,
para_data: Vec<(ParaId, PerParaData)>,
minimum_backing_votes: u32,
}
Expand Down
Loading

0 comments on commit 0d661ea

Please sign in to comment.