Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Update dispute participation on active leaves update #6303

Merged
merged 37 commits into from
Dec 30, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c039614
Passed candidate events from scraper to participation
BradleyOlson64 Nov 15, 2022
7666c67
First draft PR 5875
BradleyOlson64 Nov 16, 2022
3829bd7
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Nov 16, 2022
1cdc72d
Added support for timestamp in changes
BradleyOlson64 Nov 16, 2022
e9d1273
Some necessary refactoring
BradleyOlson64 Nov 18, 2022
4eebacd
Removed SessionIndex from unconfirmed_disputes key
BradleyOlson64 Nov 19, 2022
45477d2
Removed duplicate logic in import statements
BradleyOlson64 Nov 19, 2022
5b11951
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Nov 19, 2022
04b214c
Replaced queue_participation call with re-prio
BradleyOlson64 Nov 19, 2022
58f6371
Simplifying refactor. Backed were already handled
BradleyOlson64 Nov 21, 2022
efa2870
Removed unneeded spam slots logic
BradleyOlson64 Nov 21, 2022
c0ee1fe
Implementers guide edits
BradleyOlson64 Nov 22, 2022
8b27aef
Undid the spam slots refactor
BradleyOlson64 Nov 22, 2022
319ea05
Added comments and implementers guide edit
BradleyOlson64 Nov 22, 2022
26c9ebe
Added test for participation upon backing
BradleyOlson64 Nov 25, 2022
78cc97b
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Nov 25, 2022
16edc2b
Merge branch 'master' into brad-issue-5875
Dec 1, 2022
e1c356c
Round of fixes + ran fmt
BradleyOlson64 Dec 1, 2022
a7615e5
Round of changes + fmt
BradleyOlson64 Dec 1, 2022
9aea6c6
Merge branch 'brad-issue-5875' of https://github.com/paritytech/polka…
BradleyOlson64 Dec 1, 2022
87cb5a2
Error handling draft
BradleyOlson64 Dec 3, 2022
595fe63
Changed errors to bubble up from reprioritization
BradleyOlson64 Dec 5, 2022
38c6986
Starting to construct new test
BradleyOlson64 Dec 5, 2022
afd63f6
Clarifying participation function rename
BradleyOlson64 Dec 6, 2022
efd7088
Reprio test draft
BradleyOlson64 Dec 9, 2022
8382edb
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Dec 9, 2022
e9eb4e8
Very rough bump to priority queue test draft
BradleyOlson64 Dec 13, 2022
d49bd0b
Improving logging
BradleyOlson64 Dec 15, 2022
7546335
Most concise reproduction of error on third import
BradleyOlson64 Dec 15, 2022
16e35c8
Add `handle_approval_vote_request`
tdimitrov Dec 16, 2022
2954a96
Removing reprioritization on included event test
BradleyOlson64 Dec 16, 2022
b3b3d8a
Removing unneeded test config
BradleyOlson64 Dec 16, 2022
72ae47a
cargo fmt
BradleyOlson64 Dec 16, 2022
e38a933
Test works
tdimitrov Dec 21, 2022
b53035c
Fixing final nits
BradleyOlson64 Dec 22, 2022
dede25c
Merge branch 'brad-issue-5875-temp' of https://github.com/paritytech/…
BradleyOlson64 Dec 22, 2022
d7f0250
Tweaks to test Tsveto figured out
BradleyOlson64 Dec 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use polkadot_node_subsystem_util::rolling_session_window::{
use polkadot_primitives::v2::{
BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement,
DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex,
ValidDisputeStatementKind, ValidatorId, ValidatorIndex, CandidateEvent,
};

use crate::{
Expand Down Expand Up @@ -269,8 +269,9 @@ impl Initialized {
update: ActiveLeavesUpdate,
now: u64,
) -> Result<()> {
let on_chain_votes =
let (on_chain_votes, candidate_events) =
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
self.participation.prioritize_newly_included(ctx, &candidate_events).await;
self.participation.process_active_leaves_update(ctx, &update).await?;

if let Some(new_leaf) = update.activated {
Expand Down Expand Up @@ -319,6 +320,9 @@ impl Initialized {
},
);
}

// Decrement spam slots for freshly backed or included candidates
self.reduce_spam_on_backed_included(candidate_events);
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
}

Ok(())
Expand Down Expand Up @@ -828,8 +832,15 @@ impl Initialized {
let new_state = import_result.new_state();

let is_included = self.scraper.is_candidate_included(&candidate_hash);

let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_confirmed = new_state.is_confirmed();
let potential_spam = !is_included && !is_backed
&& !new_state.is_confirmed() && !new_state.has_own_vote();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

gum::trace!(
target: LOG_TARGET,
Expand All @@ -844,7 +855,7 @@ impl Initialized {

if !potential_spam {
// Former spammers have not been spammers after all:
self.spam_slots.clear(&(session, candidate_hash));
self.spam_slots.clear(&candidate_hash);

// Potential spam:
} else if !import_result.new_invalid_voters().is_empty() {
Expand All @@ -871,14 +882,6 @@ impl Initialized {
}
}

let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let is_confirmed = new_state.is_confirmed();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

// Participate in dispute if we did not cast a vote before and actually have keys to cast a
// local vote. Disputes should fall in one of the categories below, otherwise we will refrain
// from participation:
Expand Down Expand Up @@ -1186,6 +1189,33 @@ impl Initialized {

Ok(())
}

/// Decrements spam slots for validators who voted on potential spam
/// candidates that are newly backed or included, and therefore no longer
/// potential spam.
fn reduce_spam_on_backed_included(
&mut self,
candidate_events: Vec<CandidateEvent>,
)
{
for event in candidate_events {
// Filter out events we don't care about and repackage information
let maybe_event_contents = match event {
CandidateEvent::CandidateBacked(receipt, _, _, _) => {
Some(receipt)
}
CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
Some(receipt)
}
_ => None
};

if let Some(receipt) = maybe_event_contents {
// Clear spam slots
self.spam_slots.clear(&receipt.hash());
}
}
}
}

/// Messages to be handled in this subsystem.
Expand Down
2 changes: 1 addition & 1 deletion node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl DisputeCoordinatorSubsystem {
let is_included = scraper.is_candidate_included(&votes.candidate_receipt.hash());

if !status.is_confirmed_concluded() && !is_included {
unconfirmed_disputes.insert((session, *candidate_hash), voted_indices);
unconfirmed_disputes.insert(*candidate_hash, (session, voted_indices));
}

// Participate for all non-concluded disputes which do not have a
Expand Down
25 changes: 24 additions & 1 deletion node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{
overseer, ActiveLeavesUpdate, RecoveryError,
};
use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash;
use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex};
use polkadot_primitives::v2::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex, CandidateEvent};

use crate::LOG_TARGET;

Expand Down Expand Up @@ -212,6 +212,29 @@ impl Participation {
Ok(())
}

/// Reprioritizes participation requests for disputes that are freshly included
pub async fn prioritize_newly_included<Context>(&mut self, ctx: &mut Context, events: &Vec<CandidateEvent>) {
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
for event in events {
// Filter the incoming events list for candidate inclusions
let maybe_event_contents = match event {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
Some(receipt)
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
}
_ => None
};

if let Some(receipt) = maybe_event_contents {
let r = self.queue.prioritize_if_present(ctx.sender(), receipt).await;
if let Err(queue_error) = r {
match queue_error {
QueueError::PriorityFull => return, // Avoid working through the rest of the vec
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
_ => (),
}
}
}
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached.
async fn dequeue_until_capacity<Context>(
&mut self,
Expand Down
22 changes: 21 additions & 1 deletion node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct ParticipationRequest {
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
pub enum ParticipationPriority {
BestEffort,
Priority,
Expand Down Expand Up @@ -103,6 +103,8 @@ pub enum QueueError {
BestEffortFull,
#[error("Request could not be queued, because priority queue was already full.")]
PriorityFull,
#[error("A comparator could not be generated for the given request.")]
CouldNotGenerateComparator,
}

impl ParticipationRequest {
Expand Down Expand Up @@ -159,6 +161,24 @@ impl Queues {
self.pop_best_effort().map(|d| d.1)
}

/// Reprioritizes any participation requests pertaining to the
/// passed candidates from best effort to priority.
pub async fn prioritize_if_present(
&mut self,
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
receipt: &CandidateReceipt,
) -> std::result::Result<(), QueueError>{
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}

let comparator = CandidateComparator::new(sender, receipt).await.map_err(|_e| QueueError::CouldNotGenerateComparator)?;
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(request) = self.best_effort.remove(&comparator){
self.priority.insert(comparator, request);
}
Ok(())
}

fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
Expand Down
19 changes: 11 additions & 8 deletions node/core/dispute-coordinator/src/scraping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl ChainScraper {
};
let update =
ActiveLeavesUpdate { activated: Some(initial_head), deactivated: Default::default() };
let votes = s.process_active_leaves_update(sender, &update).await?;
let (votes, _) = s.process_active_leaves_update(sender, &update).await?;
Ok((s, votes))
}

Expand All @@ -137,13 +137,13 @@ impl ChainScraper {
&mut self,
sender: &mut Sender,
update: &ActiveLeavesUpdate,
) -> Result<Vec<ScrapedOnChainVotes>>
) -> Result<(Vec<ScrapedOnChainVotes>, Vec<CandidateEvent>)>
where
Sender: overseer::DisputeCoordinatorSenderTrait,
{
let activated = match update.activated.as_ref() {
Some(activated) => activated,
None => return Ok(Vec::new()),
None => return Ok((Vec::new(), Vec::new())),
};

// Fetch ancestry up to last finalized block.
Expand All @@ -157,11 +157,13 @@ impl ChainScraper {

let block_hashes = std::iter::once(activated.hash).chain(ancestors);

let mut candidate_events: Vec<CandidateEvent> = Vec::new();
let mut on_chain_votes = Vec::new();
for (block_number, block_hash) in block_numbers.zip(block_hashes) {
gum::trace!(?block_number, ?block_hash, "In ancestor processing.");

self.process_candidate_events(sender, block_number, block_hash).await?;
let events_for_block = self.process_candidate_events(sender, block_number, block_hash).await?;
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
candidate_events.extend(events_for_block);

if let Some(votes) = get_on_chain_votes(sender, block_hash).await? {
on_chain_votes.push(votes);
Expand All @@ -170,7 +172,7 @@ impl ChainScraper {

self.last_observed_blocks.put(activated.hash, ());

Ok(on_chain_votes)
Ok((on_chain_votes, candidate_events))
}

/// Prune finalized candidates.
Expand Down Expand Up @@ -201,12 +203,13 @@ impl ChainScraper {
sender: &mut Sender,
block_number: BlockNumber,
block_hash: Hash,
) -> Result<()>
) -> Result<Vec<CandidateEvent>>
where
Sender: overseer::DisputeCoordinatorSenderTrait,
{
let events = get_candidate_events(sender, block_hash).await?;
// Get included and backed events:
for ev in get_candidate_events(sender, block_hash).await? {
for ev in &events {
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved
match ev {
CandidateEvent::CandidateIncluded(receipt, _, _, _) => {
let candidate_hash = receipt.hash();
Expand All @@ -233,7 +236,7 @@ impl ChainScraper {
},
}
}
Ok(())
Ok(events)
}

/// Returns ancestors of `head` in the descending order, stopping
Expand Down
19 changes: 9 additions & 10 deletions node/core/dispute-coordinator/src/spam_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ pub struct SpamSlots {
}

/// Unconfirmed disputes to be passed at initialization.
pub type UnconfirmedDisputes = HashMap<(SessionIndex, CandidateHash), BTreeSet<ValidatorIndex>>;
pub type UnconfirmedDisputes = HashMap<CandidateHash, (SessionIndex, BTreeSet<ValidatorIndex>)>;
BradleyOlson64 marked this conversation as resolved.
Show resolved Hide resolved

impl SpamSlots {
/// Recover `SpamSlots` from state on startup.
///
/// Initialize based on already existing active disputes.
pub fn recover_from_state(unconfirmed_disputes: UnconfirmedDisputes) -> Self {
let mut slots: HashMap<(SessionIndex, ValidatorIndex), SpamCount> = HashMap::new();
for ((session, _), validators) in unconfirmed_disputes.iter() {
for (_, (session, validators)) in unconfirmed_disputes.iter() {
for validator in validators {
let spam_vote_count = slots.entry((*session, *validator)).or_default();
*spam_vote_count += 1;
Expand Down Expand Up @@ -97,9 +97,9 @@ impl SpamSlots {
if *spam_vote_count >= MAX_SPAM_VOTES {
return false
}
let validators = self.unconfirmed.entry((session, candidate)).or_default();
let validators = self.unconfirmed.entry(candidate).or_default();

if validators.insert(validator) {
if validators.1.insert(validator) {
// We only increment spam slots once per candidate, as each validator has to provide an
// opposing vote for sending out its own vote. Therefore, receiving multiple votes for
// a single candidate is expected and should not get punished here.
Expand All @@ -114,22 +114,21 @@ impl SpamSlots {
/// This effectively reduces the spam slot count for all validators participating in a dispute
/// for that candidate. You should call this function once a dispute became obsolete or got
/// confirmed and thus votes for it should no longer be treated as potential spam.
pub fn clear(&mut self, key: &(SessionIndex, CandidateHash)) {
if let Some(validators) = self.unconfirmed.remove(key) {
let (session, _) = key;
pub fn clear(&mut self, key: &CandidateHash) {
if let Some((session, validators)) = self.unconfirmed.remove(key) {
for validator in validators {
if let Some(spam_vote_count) = self.slots.remove(&(*session, validator)) {
if let Some(spam_vote_count) = self.slots.remove(&(session,validator)) {
let new = spam_vote_count - 1;
if new > 0 {
self.slots.insert((*session, validator), new);
self.slots.insert((session, validator), new);
}
}
}
}
}
/// Prune all spam slots for sessions older than the given index.
pub fn prune_old(&mut self, oldest_index: SessionIndex) {
self.unconfirmed.retain(|(session, _), _| *session >= oldest_index);
self.unconfirmed.retain(| _, (session, _)| *session >= oldest_index);
self.slots.retain(|(session, _), _| *session >= oldest_index);
}
}