Skip to content

Commit

Permalink
sc-consensus-beefy: improve gossip logic (paritytech#1852)
Browse files Browse the repository at this point in the history
- Remove cached messages used for deduplication in `GossipValidator`
since they're already deduplicated in upper layer `NetworkGossip`.
- Add cache for "justified rounds" to quickly discard any further (even
if potentially different) justifications at the gossip level, once a
valid one (for a respective round) is submitted to the worker.
- Add short-circuit in worker `finalize()` method to not attempt to
finalize same block multiple times (for example when we get
justifications for same block from multiple components like
block-import, gossip or on-demand).
- Change a test which had A LOT of latency in syncing blocks for some
weird reason and would only run after ~150seconds. It now runs
instantly.

Fixes paritytech#1728
  • Loading branch information
acatangiu authored Oct 13, 2023
1 parent 8b127b1 commit 0e01998
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 114 deletions.
156 changes: 55 additions & 101 deletions substrate/client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{collections::BTreeSet, sync::Arc, time::Duration};

use sc_network::{PeerId, ReputationChange};
use sc_network_gossip::{MessageIntent, ValidationResult, Validator, ValidatorContext};
use sp_core::hashing::twox_64;
use sp_runtime::traits::{Block, Hash, Header, NumberFor};

use codec::{Decode, DecodeAll, Encode};
Expand Down Expand Up @@ -115,9 +114,6 @@ where
<<B::Header as Header>::Hashing as Hash>::hash(b"beefy-justifications")
}

/// A type that represents hash of the message.
pub type MessageHash = [u8; 8];

#[derive(Clone, Debug)]
pub(crate) struct GossipFilterCfg<'a, B: Block> {
pub start: NumberFor<B>,
Expand All @@ -133,18 +129,21 @@ struct FilterInner<B: Block> {
}

struct Filter<B: Block> {
// specifies live rounds
inner: Option<FilterInner<B>>,
live_votes: BTreeMap<NumberFor<B>, fnv::FnvHashSet<MessageHash>>,
// cache of seen valid justifications in active rounds
rounds_with_valid_proofs: BTreeSet<NumberFor<B>>,
}

impl<B: Block> Filter<B> {
pub fn new() -> Self {
Self { inner: None, live_votes: BTreeMap::new() }
Self { inner: None, rounds_with_valid_proofs: BTreeSet::new() }
}

/// Update filter to new `start` and `set_id`.
fn update(&mut self, cfg: GossipFilterCfg<B>) {
self.live_votes.retain(|&round, _| round >= cfg.start && round <= cfg.end);
self.rounds_with_valid_proofs
.retain(|&round| round >= cfg.start && round <= cfg.end);
// only clone+overwrite big validator_set if set_id changed
match self.inner.as_mut() {
Some(f) if f.validator_set.id() == cfg.validator_set.id() => {
Expand Down Expand Up @@ -203,14 +202,14 @@ impl<B: Block> Filter<B> {
.unwrap_or(Consider::RejectOutOfScope)
}

/// Add new _known_ `hash` to the round's known votes.
fn add_known_vote(&mut self, round: NumberFor<B>, hash: MessageHash) {
self.live_votes.entry(round).or_default().insert(hash);
/// Add new _known_ `round` to the set of seen valid justifications.
fn mark_round_as_proven(&mut self, round: NumberFor<B>) {
self.rounds_with_valid_proofs.insert(round);
}

/// Check if `hash` is already part of round's known votes.
fn is_known_vote(&self, round: NumberFor<B>, hash: &MessageHash) -> bool {
self.live_votes.get(&round).map(|known| known.contains(hash)).unwrap_or(false)
/// Check if `round` is already part of seen valid justifications.
fn is_already_proven(&self, round: NumberFor<B>) -> bool {
self.rounds_with_valid_proofs.contains(&round)
}

fn validator_set(&self) -> Option<&ValidatorSet<AuthorityId>> {
Expand Down Expand Up @@ -273,16 +272,13 @@ where
&self,
vote: VoteMessage<NumberFor<B>, AuthorityId, Signature>,
sender: &PeerId,
data: &[u8],
) -> Action<B::Hash> {
let msg_hash = twox_64(data);
let round = vote.commitment.block_number;
let set_id = vote.commitment.validator_set_id;
self.known_peers.lock().note_vote_for(*sender, round);

// Verify general usefulness of the message.
// We are going to discard old votes right away (without verification)
// Also we keep track of already received votes to avoid verifying duplicates.
// We are going to discard old votes right away (without verification).
{
let filter = self.gossip_filter.read();

Expand All @@ -293,10 +289,6 @@ where
Consider::Accept => {},
}

if filter.is_known_vote(round, &msg_hash) {
return Action::Keep(self.votes_topic, benefit::KNOWN_VOTE_MESSAGE)
}

// ensure authority is part of the set.
if !filter
.validator_set()
Expand All @@ -309,7 +301,6 @@ where
}

if BeefyKeystore::verify(&vote.id, &vote.signature, &vote.commitment.encode()) {
self.gossip_filter.write().add_known_vote(round, msg_hash);
Action::Keep(self.votes_topic, benefit::VOTE_MESSAGE)
} else {
debug!(
Expand All @@ -328,34 +319,46 @@ where
let (round, set_id) = proof_block_num_and_set_id::<B>(&proof);
self.known_peers.lock().note_vote_for(*sender, round);

let guard = self.gossip_filter.read();
// Verify general usefulness of the justification.
match guard.consider_finality_proof(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
Consider::Accept => {},
let action = {
let guard = self.gossip_filter.read();

// Verify general usefulness of the justification.
match guard.consider_finality_proof(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
Consider::Accept => {},
}

if guard.is_already_proven(round) {
return Action::Discard(benefit::NOT_INTERESTED)
}

// Verify justification signatures.
guard
.validator_set()
.map(|validator_set| {
if let Err((_, signatures_checked)) =
verify_with_validator_set::<B>(round, validator_set, &proof)
{
debug!(
target: LOG_TARGET,
"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
);
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Action::Discard(cost)
} else {
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
}
})
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
};
if matches!(action, Action::Keep(_, _)) {
self.gossip_filter.write().mark_round_as_proven(round);
}
// Verify justification signatures.
guard
.validator_set()
.map(|validator_set| {
if let Err((_, signatures_checked)) =
verify_with_validator_set::<B>(round, validator_set, &proof)
{
debug!(
target: LOG_TARGET,
"🥩 Bad signatures on message: {:?}, from: {:?}", proof, sender
);
let mut cost = cost::INVALID_PROOF;
cost.value +=
cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked as i32);
Action::Discard(cost)
} else {
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
}
})
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
action
}
}

Expand All @@ -375,7 +378,7 @@ where
) -> ValidationResult<B::Hash> {
let raw = data;
let action = match GossipMessage::<B>::decode_all(&mut data) {
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender, raw),
Ok(GossipMessage::Vote(msg)) => self.validate_vote(msg, sender),
Ok(GossipMessage::FinalityProof(proof)) => self.validate_finality_proof(proof, sender),
Err(e) => {
debug!(target: LOG_TARGET, "Error decoding message: {}", e);
Expand Down Expand Up @@ -483,41 +486,6 @@ pub(crate) mod tests {
};
use sp_keystore::{testing::MemoryKeystore, Keystore};

#[test]
fn known_votes_insert_remove() {
let mut filter = Filter::<Block>::new();
let msg_hash = twox_64(b"data");
let keys = vec![Keyring::Alice.public()];
let validator_set = ValidatorSet::<AuthorityId>::new(keys.clone(), 1).unwrap();

filter.add_known_vote(1, msg_hash);
filter.add_known_vote(1, msg_hash);
filter.add_known_vote(2, msg_hash);
assert_eq!(filter.live_votes.len(), 2);

filter.add_known_vote(3, msg_hash);
assert!(filter.is_known_vote(3, &msg_hash));
assert!(!filter.is_known_vote(3, &twox_64(b"other")));
assert!(!filter.is_known_vote(4, &msg_hash));
assert_eq!(filter.live_votes.len(), 3);

assert!(filter.inner.is_none());
assert_eq!(filter.consider_vote(1, 1), Consider::RejectOutOfScope);

filter.update(GossipFilterCfg { start: 3, end: 10, validator_set: &validator_set });
assert_eq!(filter.live_votes.len(), 1);
assert!(filter.live_votes.contains_key(&3));
assert_eq!(filter.consider_vote(2, 1), Consider::RejectPast);
assert_eq!(filter.consider_vote(3, 1), Consider::Accept);
assert_eq!(filter.consider_vote(4, 1), Consider::Accept);
assert_eq!(filter.consider_vote(20, 1), Consider::RejectFuture);
assert_eq!(filter.consider_vote(4, 2), Consider::RejectFuture);

let validator_set = ValidatorSet::<AuthorityId>::new(keys, 2).unwrap();
filter.update(GossipFilterCfg { start: 5, end: 10, validator_set: &validator_set });
assert!(filter.live_votes.is_empty());
}

struct TestContext;
impl<B: sp_runtime::traits::Block> ValidatorContext<B> for TestContext {
fn broadcast_topic(&mut self, _topic: B::Hash, _force: bool) {
Expand Down Expand Up @@ -610,20 +578,6 @@ pub(crate) mod tests {
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
assert_eq!(
gv.gossip_filter
.read()
.live_votes
.get(&vote.commitment.block_number)
.map(|x| x.len()),
Some(1)
);

// second time we should hit the cache
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::ProcessAndKeep(_)));
expected_report.cost_benefit = benefit::KNOWN_VOTE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);

// reject vote, voter not in validator set
let mut bad_vote = vote.clone();
Expand Down Expand Up @@ -692,7 +646,7 @@ pub(crate) mod tests {
// reject proof, bad signatures (Bob instead of Alice)
let bad_validator_set =
ValidatorSet::<AuthorityId>::new(vec![Keyring::Bob.public()], 0).unwrap();
let proof = dummy_proof(20, &bad_validator_set);
let proof = dummy_proof(21, &bad_validator_set);
let encoded_proof = GossipMessage::<Block>::FinalityProof(proof).encode();
let res = gv.validate(&mut context, &sender, &encoded_proof);
assert!(matches!(res, ValidationResult::Discard));
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod cost {
mod benefit {
use sc_network::ReputationChange as Rep;
pub(super) const VOTE_MESSAGE: Rep = Rep::new(100, "BEEFY: Round vote message");
pub(super) const KNOWN_VOTE_MESSAGE: Rep = Rep::new(50, "BEEFY: Known vote");
pub(super) const NOT_INTERESTED: Rep = Rep::new(10, "BEEFY: Not interested in round");
pub(super) const VALIDATED_PROOF: Rep = Rep::new(100, "BEEFY: Justification");
}

Expand Down
12 changes: 3 additions & 9 deletions substrate/client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ async fn gossipped_finality_proofs() {
// Only Alice and Bob are running the voter -> finality threshold not reached
let peers = [BeefyKeyring::Alice, BeefyKeyring::Bob];
let validator_set = ValidatorSet::new(make_beefy_ids(&validators), 0).unwrap();
let session_len = 30;
let session_len = 10;
let min_block_delta = 1;

let mut net = BeefyTestNet::new(3);
Expand Down Expand Up @@ -1332,14 +1332,8 @@ async fn gossipped_finality_proofs() {

let net = Arc::new(Mutex::new(net));

// Pump net + Charlie gossip to see peers.
let timeout = Box::pin(tokio::time::sleep(Duration::from_millis(200)));
let gossip_engine_pump = &mut charlie_gossip_engine;
let pump_with_timeout = future::select(gossip_engine_pump, timeout);
run_until(pump_with_timeout, &net).await;

// push 10 blocks
let hashes = net.lock().generate_blocks_and_sync(10, session_len, &validator_set, true).await;
// push 42 blocks
let hashes = net.lock().generate_blocks_and_sync(42, session_len, &validator_set, true).await;

let peers = peers.into_iter().enumerate();

Expand Down
11 changes: 8 additions & 3 deletions substrate/client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,11 @@ where
VersionedFinalityProof::V1(ref sc) => sc.commitment.block_number,
};

if block_num <= self.persisted_state.voting_oracle.best_beefy_block {
// we've already finalized this round before, short-circuit.
return Ok(())
}

// Finalize inner round and update voting_oracle state.
self.persisted_state.voting_oracle.finalize(block_num)?;

Expand All @@ -629,7 +634,7 @@ where
self.backend
.append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode()))
}) {
error!(
debug!(
target: LOG_TARGET,
"🥩 Error {:?} on appending justification: {:?}", e, finality_proof
);
Expand All @@ -648,7 +653,7 @@ where
}

/// Handle previously buffered justifications, that now land in the voting interval.
fn try_pending_justififactions(&mut self) -> Result<(), Error> {
fn try_pending_justifications(&mut self) -> Result<(), Error> {
// Interval of blocks for which we can process justifications and votes right now.
let (start, end) = self.voting_oracle().accepted_interval()?;
// Process pending justifications.
Expand Down Expand Up @@ -782,7 +787,7 @@ where

fn process_new_state(&mut self) {
// Handle pending justifications and/or votes for now GRANDPA finalized blocks.
if let Err(err) = self.try_pending_justififactions() {
if let Err(err) = self.try_pending_justifications() {
debug!(target: LOG_TARGET, "🥩 {}", err);
}

Expand Down

0 comments on commit 0e01998

Please sign in to comment.