
Batch vote import in dispute-distribution (#5894)
* Start work on batching in dispute-distribution.

* Guide work.

* More guide changes. Still very much WIP.

* Finish guide changes.

* Clarification

* Adjust argument about slashing.

* WIP: Add constants to receiver.

* Maintain order of disputes.

* dispute-distribution sender rate limit.

* Cleanup

* WIP: dispute-distribution receiver.

- [ ] Rate limiting
- [ ] Batching

* WIP: Batching.

* fmt

* Update `PeerQueues` to maintain more invariants.

* WIP: Batching.

* Small cleanup

* Batching logic.

* Some integration work.

* Finish.

Missing: Tests

* Typo.

* Docs.

* Report missing metric.

* Doc pass.

* Tests for waiting_queue.

* Speed up some crypto by 10x.

* Fix redundant import.

* Add some tracing.

* Better sender rate limit

* Some tests.

* Tests

* Add logging to rate limiter

* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md

Co-authored-by: Tsvetomir Dimitrov <[email protected]>

* Update roadmap/implementers-guide/src/node/disputes/dispute-distribution.md

Co-authored-by: Tsvetomir Dimitrov <[email protected]>

* Update node/network/dispute-distribution/src/receiver/mod.rs

Co-authored-by: Tsvetomir Dimitrov <[email protected]>

* Review feedback.

* Also log peer in log messages.

* Fix indentation.

* waker -> timer

* Guide improvement.

* Remove obsolete comment.

* waker -> timer

* Fix spell complaints.

* Fix Cargo.lock

Co-authored-by: Tsvetomir Dimitrov <[email protected]>
eskimor and tdimitrov committed Oct 4, 2022
1 parent efb82ef commit ce430c2
Showing 17 changed files with 1,783 additions and 469 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


7 changes: 4 additions & 3 deletions Cargo.toml
@@ -125,9 +125,9 @@ maintenance = { status = "actively-developed" }
#
# This list is ordered alphabetically.
[profile.dev.package]
blake2 = { opt-level = 3 }
blake2-rfc = { opt-level = 3 }
blake2b_simd = { opt-level = 3 }
chacha20poly1305 = { opt-level = 3 }
cranelift-codegen = { opt-level = 3 }
cranelift-wasm = { opt-level = 3 }
@@ -138,8 +138,8 @@ curve25519-dalek = { opt-level = 3 }
ed25519-dalek = { opt-level = 3 }
flate2 = { opt-level = 3 }
futures-channel = { opt-level = 3 }
hash-db = { opt-level = 3 }
hashbrown = { opt-level = 3 }
hmac = { opt-level = 3 }
httparse = { opt-level = 3 }
integer-sqrt = { opt-level = 3 }
@@ -151,8 +151,8 @@ libz-sys = { opt-level = 3 }
mio = { opt-level = 3 }
nalgebra = { opt-level = 3 }
num-bigint = { opt-level = 3 }
parking_lot = { opt-level = 3 }
parking_lot_core = { opt-level = 3 }
percent-encoding = { opt-level = 3 }
primitive-types = { opt-level = 3 }
reed-solomon-novelpoly = { opt-level = 3 }
@@ -162,6 +162,7 @@ sha2 = { opt-level = 3 }
sha3 = { opt-level = 3 }
smallvec = { opt-level = 3 }
snow = { opt-level = 3 }
substrate-bip39 = { opt-level = 3 }
twox-hash = { opt-level = 3 }
uint = { opt-level = 3 }
wasmi = { opt-level = 3 }
2 changes: 2 additions & 0 deletions node/network/dispute-distribution/Cargo.toml
@@ -6,6 +6,7 @@ edition = "2021"

[dependencies]
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
derive_more = "0.99.17"
parity-scale-codec = { version = "3.1.5", features = ["std"] }
@@ -21,6 +22,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.31"
fatality = "0.0.6"
lru = "0.8.0"
indexmap = "1.9.1"

[dev-dependencies]
async-trait = "0.1.57"
38 changes: 31 additions & 7 deletions node/network/dispute-distribution/src/lib.rs
@@ -24,7 +24,7 @@
//! The sender is responsible for getting our vote out, see [`sender`]. The receiver handles
//! incoming [`DisputeRequest`]s and offers spam protection, see [`receiver`].

use std::{num::NonZeroUsize, time::Duration};

use futures::{channel::mpsc, FutureExt, StreamExt, TryFutureExt};

@@ -66,16 +66,19 @@ use self::sender::{DisputeSender, TaskFinish};
/// via a dedicated channel and forwarding them to the dispute coordinator via
/// `DisputeCoordinatorMessage::ImportStatements`. Being the interface to the network and untrusted
/// nodes, the reality is not that simple of course. Before importing statements the receiver will
/// batch up imports as much as possible for efficiency, while still maintaining timely dispute
/// resolution and handling of spamming validators:
///
/// - Drop all messages from non-validator nodes; for this it requires the [`AuthorityDiscovery`]
///   service.
/// - Drop messages from a node if we are already importing a message from that node (flood).
/// - Drop messages from nodes that previously sent us messages whose statement import failed.
/// - Drop messages from a node if it sends at too high a rate.
/// - Filter out duplicate messages (over some period of time).
/// - Drop any obviously invalid votes (invalid signatures for example).
/// - Ban peers whose votes were deemed invalid.
///
/// In general, dispute-distribution aims to limit the work the dispute-coordinator will have to
/// do, while at the same time making it aware of new disputes as fast as possible.
///
/// For successfully imported votes, we will confirm the receipt of the message back to the sender.
/// This way a received confirmation guarantees that the vote has been stored to disk by the
/// receiver.
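
As an illustration of the checks listed above (not part of the diff), here is a minimal sketch of the receiver's drop logic; all types and names are hypothetical simplifications, not the subsystem's actual API:

```rust
use std::collections::HashSet;

/// Stand-in for the real `PeerId`.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Peer(u64);

/// Why an incoming dispute request gets dropped before import.
#[derive(Debug)]
enum DropReason {
    NotAValidator,
    ImportAlreadyInFlight,
    PreviousImportFailed,
    RateLimitHit,
    Duplicate,
    InvalidSignature,
}

/// Simplified receiver state backing the checks above.
struct Receiver {
    validators: HashSet<Peer>, // known via `AuthorityDiscovery`
    importing: HashSet<Peer>,  // at most one import in flight per peer
    banned: HashSet<Peer>,     // peers whose earlier import failed
}

impl Receiver {
    fn pre_import_checks(
        &self,
        peer: Peer,
        over_rate_limit: bool,
        seen_recently: bool,
        signature_valid: bool,
    ) -> Result<(), DropReason> {
        if !self.validators.contains(&peer) {
            return Err(DropReason::NotAValidator)
        }
        if self.importing.contains(&peer) {
            return Err(DropReason::ImportAlreadyInFlight)
        }
        if self.banned.contains(&peer) {
            return Err(DropReason::PreviousImportFailed)
        }
        if over_rate_limit {
            return Err(DropReason::RateLimitHit)
        }
        if seen_recently {
            return Err(DropReason::Duplicate)
        }
        if !signature_valid {
            return Err(DropReason::InvalidSignature)
        }
        Ok(())
    }
}
```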
@@ -95,6 +98,20 @@ pub use metrics::Metrics;

const LOG_TARGET: &'static str = "parachain::dispute-distribution";

/// Rate limit on the `receiver` side.
///
/// If messages from one peer arrive, on average, more often than once every `RECEIVE_RATE_LIMIT`,
/// we start dropping messages from that peer to enforce that limit.
pub const RECEIVE_RATE_LIMIT: Duration = Duration::from_millis(100);

/// Rate limit on the `sender` side.
///
/// In order to not hit the `RECEIVE_RATE_LIMIT` on the receiving side, we limit our sending rate
/// as well.
///
/// We add an extra 50ms, just to have some safety margin over the `RECEIVE_RATE_LIMIT`.
pub const SEND_RATE_LIMIT: Duration = RECEIVE_RATE_LIMIT.saturating_add(Duration::from_millis(50));
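
A minimal sketch of how such a rate limit can be enforced with `futures-timer` (the dependency added above); this is an illustration under assumed names, not necessarily the subsystem's actual implementation:

```rust
use std::time::Duration;

use futures_timer::Delay;

/// Stands in for `SEND_RATE_LIMIT` above.
const RATE_LIMIT: Duration = Duration::from_millis(150);

/// Enforces a minimum interval between consecutive sends.
struct RateLimit {
    delay: Delay,
}

impl RateLimit {
    fn new() -> Self {
        Self { delay: Delay::new(RATE_LIMIT) }
    }

    /// Wait out the remainder of the current interval, then start the next.
    ///
    /// If more than `RATE_LIMIT` has already passed since the last call,
    /// this returns immediately, so slow senders are never delayed.
    async fn wait(&mut self) {
        (&mut self.delay).await;
        self.delay.reset(RATE_LIMIT);
    }
}
```

Calling `wait().await` before each dispatch keeps the outgoing rate at or below one message per interval, which is what allows the receiving side to enforce `RECEIVE_RATE_LIMIT` without dropping honest traffic.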

/// The dispute distribution subsystem.
pub struct DisputeDistributionSubsystem<AD> {
/// Easy and efficient runtime access for this subsystem.
@@ -175,6 +192,12 @@ where
ctx.spawn("disputes-receiver", receiver.run().boxed())
.map_err(FatalError::SpawnTask)?;

// Process messages for sending side.
//
// Note: We want the sender to be rate limited, and we currently take advantage of the
// fact that the root task of this subsystem is only concerned with sending: functions of
// `DisputeSender` may apply back pressure if the rate limit is hit, which will slow down
// this loop. If this fact ever changes, we will likely need another task.
loop {
let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await;
match message {
@@ -250,9 +273,10 @@ impl MuxedMessage {
// ends.
let from_overseer = ctx.recv().fuse();
futures::pin_mut!(from_overseer, from_sender);
// We select biased to make sure we finish up loose ends before starting new work.
futures::select_biased!(
msg = from_sender.next() => MuxedMessage::Sender(msg),
msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)),
)
}
}
7 changes: 5 additions & 2 deletions node/network/dispute-distribution/src/metrics.rs
@@ -72,9 +72,12 @@ impl Metrics {
}

/// Statements have been imported.
pub fn on_imported(&self, label: &'static str, num_requests: usize) {
if let Some(metrics) = &self.0 {
metrics
.imported_requests
.with_label_values(&[label])
.inc_by(num_requests as u64)
}
}
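
A hypothetical call site for the new signature; the label values are assumptions, not necessarily the constants this module defines:

```rust
/// Report a flushed batch with a single metrics update instead of one
/// update per bundled request.
fn report_batch(metrics: &Metrics, batch_size: usize, success: bool) {
    let label = if success { "succeeded" } else { "failed" };
    metrics.on_imported(label, batch_size);
}
```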

209 changes: 209 additions & 0 deletions node/network/dispute-distribution/src/receiver/batches/batch.rs
@@ -0,0 +1,209 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{collections::HashMap, time::Instant};

use gum::CandidateHash;
use polkadot_node_network_protocol::{
request_response::{incoming::OutgoingResponseSender, v1::DisputeRequest},
PeerId,
};
use polkadot_node_primitives::SignedDisputeStatement;
use polkadot_primitives::v2::{CandidateReceipt, ValidatorIndex};

use crate::receiver::{BATCH_COLLECTING_INTERVAL, MIN_KEEP_BATCH_ALIVE_VOTES};

use super::MAX_BATCH_LIFETIME;

/// A batch of votes to be imported into the `dispute-coordinator`.
///
/// Vote imports are way more efficient when performed in batches, hence we batch together incoming
/// votes until the rate of incoming votes falls below a threshold, and only then import them into
/// the dispute coordinator.
///
/// A `Batch` keeps track of the votes to be imported and the current incoming rate, on rate update
/// it will "flush" in case the incoming rate dropped too low, preparing the import.
pub struct Batch {
/// The actual candidate this batch is concerned with.
candidate_receipt: CandidateReceipt,

/// Cache of `CandidateHash` (candidate_receipt.hash()).
candidate_hash: CandidateHash,

/// All valid votes received in this batch so far.
///
/// We differentiate between valid and invalid votes, so we can detect (and drop) duplicates,
/// while still allowing validators to equivocate.
///
/// Detecting and rejecting duplicates is crucial in order to effectively enforce
/// `MIN_KEEP_BATCH_ALIVE_VOTES` per `BATCH_COLLECTING_INTERVAL`. If we counted duplicates
/// here, the mechanism would be broken.
valid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

/// All invalid votes received in this batch so far.
invalid_votes: HashMap<ValidatorIndex, SignedDisputeStatement>,

/// How many votes have been batched since the last tick/creation.
votes_batched_since_last_tick: u32,

/// Expiry time for the batch.
///
/// By this time at the latest, this batch will get flushed.
best_before: Instant,

/// Requesters waiting for a response.
requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

/// Result of checking a batch every `BATCH_COLLECTING_INTERVAL`.
pub(super) enum TickResult {
/// Batch is still alive, please call `tick` again at the given `Instant`.
Alive(Batch, Instant),
/// Batch is done, ready for import!
Done(PreparedImport),
}

/// Ready for import.
pub struct PreparedImport {
pub candidate_receipt: CandidateReceipt,
pub statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
/// Information about original requesters.
pub requesters: Vec<(PeerId, OutgoingResponseSender<DisputeRequest>)>,
}

impl From<Batch> for PreparedImport {
fn from(batch: Batch) -> Self {
let Batch {
candidate_receipt,
valid_votes,
invalid_votes,
requesters: pending_responses,
..
} = batch;

let statements = valid_votes
.into_iter()
.chain(invalid_votes.into_iter())
.map(|(index, statement)| (statement, index))
.collect();

Self { candidate_receipt, statements, requesters: pending_responses }
}
}

impl Batch {
/// Create a new empty batch based on the given `CandidateReceipt`.
///
/// To create a `Batch`, use `Batches::find_batch`.
///
/// Arguments:
///
/// * `candidate_receipt` - The candidate this batch is meant to track votes for.
/// * `now` - current timestamp for calculating the first tick.
///
/// Returns: A batch and the first `Instant` at which `tick` should be called.
pub(super) fn new(candidate_receipt: CandidateReceipt, now: Instant) -> (Self, Instant) {
let s = Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
valid_votes: HashMap::new(),
invalid_votes: HashMap::new(),
votes_batched_since_last_tick: 0,
best_before: now + MAX_BATCH_LIFETIME,
requesters: Vec::new(),
};
let next_tick = s.calculate_next_tick(now);
(s, next_tick)
}

/// Receipt of the candidate this batch is batching votes for.
pub fn candidate_receipt(&self) -> &CandidateReceipt {
&self.candidate_receipt
}

/// Add votes from a validator into the batch.
///
/// The statements are supposed to be the valid and invalid statements received in a
/// `DisputeRequest`.
///
/// The given `pending_response` is the corresponding response sender for responding to `peer`.
/// If at least one of the votes is new as far as this batch is concerned, we record the
/// `pending_response` for later use. In case both votes are already known, we return the
/// response sender as an `Err` value.
pub fn add_votes(
&mut self,
valid_vote: (SignedDisputeStatement, ValidatorIndex),
invalid_vote: (SignedDisputeStatement, ValidatorIndex),
peer: PeerId,
pending_response: OutgoingResponseSender<DisputeRequest>,
) -> Result<(), OutgoingResponseSender<DisputeRequest>> {
debug_assert!(valid_vote.0.candidate_hash() == invalid_vote.0.candidate_hash());
debug_assert!(valid_vote.0.candidate_hash() == &self.candidate_hash);

let mut duplicate = true;

if self.valid_votes.insert(valid_vote.1, valid_vote.0).is_none() {
self.votes_batched_since_last_tick += 1;
duplicate = false;
}
if self.invalid_votes.insert(invalid_vote.1, invalid_vote.0).is_none() {
self.votes_batched_since_last_tick += 1;
duplicate = false;
}

if duplicate {
Err(pending_response)
} else {
self.requesters.push((peer, pending_response));
Ok(())
}
}

/// Check batch for liveness.
///
/// This function is supposed to be called at the instants given at construction and
/// subsequently returned as part of `TickResult`.
pub(super) fn tick(mut self, now: Instant) -> TickResult {
if self.votes_batched_since_last_tick >= MIN_KEEP_BATCH_ALIVE_VOTES &&
now < self.best_before
{
// Still good:
let next_tick = self.calculate_next_tick(now);
// Reset counter:
self.votes_batched_since_last_tick = 0;
TickResult::Alive(self, next_tick)
} else {
TickResult::Done(PreparedImport::from(self))
}
}

/// Calculate when the next tick should happen.
///
/// This will usually return `now + BATCH_COLLECTING_INTERVAL`, except if the lifetime of this batch
/// would exceed `MAX_BATCH_LIFETIME`.
///
/// # Arguments
///
/// * `now` - The current time.
fn calculate_next_tick(&self, now: Instant) -> Instant {
let next_tick = now + BATCH_COLLECTING_INTERVAL;
if next_tick < self.best_before {
next_tick
} else {
self.best_before
}
}
}
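
To tie the pieces together, a sketch (not part of the diff) of how the receiver side could drive a `Batch` through its lifecycle, written as if inside the receiver module so the `pub(super)` items above are in scope; vote feeding and the actual dispute-coordinator import are elided:

```rust
use std::time::Instant;

async fn run_batch(candidate_receipt: CandidateReceipt) {
    let (mut batch, mut next_tick) = Batch::new(candidate_receipt, Instant::now());
    loop {
        // Between ticks, incoming requests are fed in via `batch.add_votes(..)`;
        // duplicates get their response sender back as `Err` and can be
        // answered right away.

        // Sleep until the next scheduled tick.
        let now = Instant::now();
        if next_tick > now {
            futures_timer::Delay::new(next_tick - now).await;
        }

        match batch.tick(Instant::now()) {
            TickResult::Alive(alive, tick_at) => {
                // Enough fresh votes came in: keep batching.
                batch = alive;
                next_tick = tick_at;
            },
            TickResult::Done(import) => {
                // Rate dropped or `MAX_BATCH_LIFETIME` was hit: hand
                // `import.statements` to the dispute-coordinator and confirm
                // receipt to all `import.requesters`.
                drop(import);
                break
            },
        }
    }
}
```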
