Skip to content

Commit

Permalink
Add trusted peers (#1640)
Browse files Browse the repository at this point in the history
## Issue Addressed

Closes #1581 

## Proposed Changes

Adds a new cli option for trusted peers who always have the maximum possible score.
  • Loading branch information
pawanjay176 committed Sep 22, 2020
1 parent 5d17eb8 commit 14ff385
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 39 deletions.
6 changes: 5 additions & 1 deletion beacon_node/eth2_libp2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::types::GossipKind;
use crate::Enr;
use crate::{Enr, PeerIdSerialized};
use discv5::{Discv5Config, Discv5ConfigBuilder};
use libp2p::gossipsub::{
GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId, ValidationMode,
Expand Down Expand Up @@ -58,6 +58,9 @@ pub struct Config {
/// List of libp2p nodes to initially connect to.
pub libp2p_nodes: Vec<Multiaddr>,

/// List of trusted libp2p nodes which are not scored.
pub trusted_peers: Vec<PeerIdSerialized>,

/// Client version
pub client_version: String,

Expand Down Expand Up @@ -139,6 +142,7 @@ impl Default for Config {
boot_nodes_enr: vec![],
boot_nodes_multiaddr: vec![],
libp2p_nodes: vec![],
trusted_peers: vec![],
client_version: lighthouse_version::version_with_platform(),
disable_discovery: false,
topics,
Expand Down
44 changes: 44 additions & 0 deletions beacon_node/eth2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,50 @@ pub mod rpc;
mod service;
pub mod types;

use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::str::FromStr;

/// Wrapper over a libp2p `PeerId` which implements `Serialize` and `Deserialize`
#[derive(Clone, Debug)]
pub struct PeerIdSerialized(libp2p::PeerId);

impl From<PeerIdSerialized> for PeerId {
fn from(peer_id: PeerIdSerialized) -> Self {
peer_id.0
}
}

impl FromStr for PeerIdSerialized {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
PeerId::from_str(s).map_err(|e| format!("Invalid peer id: {}", e))?,
))
}
}

impl Serialize for PeerIdSerialized {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&self.0.to_string())
}
}

impl<'de> Deserialize<'de> for PeerIdSerialized {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
Ok(Self(PeerId::from_str(&s).map_err(|e| {
de::Error::custom(format!("Failed to deserialise peer id: {:?}", e))
})?))
}
}

pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage, SubnetDiscovery};
pub use behaviour::{BehaviourEvent, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
Expand Down
36 changes: 18 additions & 18 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason) {
// get the peer info
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score.to_string());
debug!(self.log, "Sending goodbye to peer"; "peer_id" => peer_id.to_string(), "reason" => reason.to_string(), "score" => info.score().to_string());
// Goodbye's are fatal
info.score.apply_peer_action(PeerAction::Fatal);
info.apply_peer_action_to_score(PeerAction::Fatal);
if info.connection_status.is_connected_or_dialing() {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id.clone(), reason));
Expand All @@ -155,12 +155,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut unban_peer = None;

if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_state = info.score.state();
info.score.apply_peer_action(action);
if previous_state != info.score.state() {
match info.score.state() {
let previous_state = info.score_state();
info.apply_peer_action_to_score(action);
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
ban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
Expand All @@ -170,7 +170,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
unban_peer = Some(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
Expand All @@ -182,13 +182,13 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// TODO: Update the peer manager to inform that the peer is disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
unban_peer = Some(peer_id.clone());
}
}
} else {
debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer score adjusted"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
}
}

Expand Down Expand Up @@ -689,9 +689,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let mut to_unban_peers = Vec::new();

for (peer_id, info) in pdb.peers_mut() {
let previous_state = info.score.state();
let previous_state = info.score_state();
// Update scores
info.score.update();
info.score_update();

/* TODO: Implement logic about connection lifetimes
match info.connection_status {
Expand Down Expand Up @@ -746,10 +746,10 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
*/

// handle score transitions
if previous_state != info.score.state() {
match info.score.state() {
if previous_state != info.score_state() {
match info.score_state() {
ScoreState::Banned => {
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string());
debug!(self.log, "Peer has been banned"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string());
to_ban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
self.events.push(PeerManagerEvent::DisconnectPeer(
Expand All @@ -759,7 +759,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}
ScoreState::Disconnected => {
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to disconnect state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// disconnect the peer if it's currently connected or dialing
to_unban_peers.push(peer_id.clone());
if info.connection_status.is_connected_or_dialing() {
Expand All @@ -771,7 +771,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// TODO: Update peer manager to report that it's disconnecting.
}
ScoreState::Healthy => {
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score.to_string(), "past_state" => previous_state.to_string());
debug!(self.log, "Peer transitioned to healthy state"; "peer_id" => peer_id.to_string(), "score" => info.score().to_string(), "past_state" => previous_state.to_string());
// unban the peer if it was previously banned.
to_unban_peers.push(peer_id.clone());
}
Expand Down Expand Up @@ -821,7 +821,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
.take(connected_peer_count - self.target_peers)
//we only need to disconnect peers with healthy scores, since the others got already
//disconnected in update_peer_scores
.filter(|(_, info)| info.score.state() == ScoreState::Healthy)
.filter(|(_, info)| info.score_state() == ScoreState::Healthy)
{
self.events.push(PeerManagerEvent::DisconnectPeer(
(*peer_id).clone(),
Expand Down
48 changes: 46 additions & 2 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::client::Client;
use super::score::Score;
use super::score::{PeerAction, Score, ScoreState};
use super::PeerSyncStatus;
use crate::rpc::MetaData;
use crate::Multiaddr;
Expand All @@ -19,7 +19,7 @@ pub struct PeerInfo<T: EthSpec> {
/// The connection status of the peer
_status: PeerStatus,
/// The peers reputation
pub score: Score,
score: Score,
/// Client managing this peer
pub client: Client,
/// Connection status of this peer
Expand All @@ -36,6 +36,8 @@ pub struct PeerInfo<T: EthSpec> {
/// necessary.
#[serde(skip)]
pub min_ttl: Option<Instant>,
/// Is the peer a trusted peer.
pub is_trusted: bool,
}

impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
Expand All @@ -49,11 +51,21 @@ impl<TSpec: EthSpec> Default for PeerInfo<TSpec> {
sync_status: PeerSyncStatus::Unknown,
meta_data: None,
min_ttl: None,
is_trusted: false,
}
}
}

impl<T: EthSpec> PeerInfo<T> {
/// Return a PeerInfo struct for a trusted peer.
pub fn trusted_peer_info() -> Self {
PeerInfo {
score: Score::max_score(),
is_trusted: true,
..Default::default()
}
}

/// Returns if the peer is subscribed to a given `SubnetId`
pub fn on_subnet(&self, subnet_id: SubnetId) -> bool {
if let Some(meta_data) = &self.meta_data {
Expand All @@ -69,6 +81,38 @@ impl<T: EthSpec> PeerInfo<T> {
pub fn has_future_duty(&self) -> bool {
self.min_ttl.map_or(false, |i| i >= Instant::now())
}

/// Returns score of the peer.
pub fn score(&self) -> Score {
self.score
}

/// Returns the state of the peer based on the score.
pub(crate) fn score_state(&self) -> ScoreState {
self.score.state()
}

/// Applies decay rates to a non-trusted peer's score.
pub fn score_update(&mut self) {
if !self.is_trusted {
self.score.update()
}
}

/// Apply peer action to a non-trusted peer's score.
pub fn apply_peer_action_to_score(&mut self, peer_action: PeerAction) {
if !self.is_trusted {
self.score.apply_peer_action(peer_action)
}
}

#[cfg(test)]
/// Add an f64 to a non-trusted peer's score abiding by the limits.
pub fn add_to_score(&mut self, score: f64) {
if !self.is_trusted {
self.score.add(score)
}
}
}

#[derive(Clone, Debug, Serialize)]
Expand Down
Loading

0 comments on commit 14ff385

Please sign in to comment.