Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve libp2p connected peer metrics #5314

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 111 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ validator_client = { path = "validator_client" }
validator_dir = { path = "common/validator_dir" }
warp_utils = { path = "common/warp_utils" }

[patch.crates-io]
yamux = { git = "https://github.com/sigp/rust-yamux.git" }

[profile.maxperf]
inherits = "release"
lto = "fat"
Expand Down
10 changes: 2 additions & 8 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};
use system_health::observe_system_health_bn;
use system_health::{observe_nat, observe_system_health_bn};
use task_spawner::{Priority, TaskSpawner};
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
Expand Down Expand Up @@ -3965,13 +3965,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_json_task(Priority::P1, move || {
Ok(api_types::GenericResponse::from(
lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0,
))
Ok(api_types::GenericResponse::from(observe_nat()))
})
});

Expand Down
5 changes: 4 additions & 1 deletion beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,10 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
discv5::Event::SocketUpdated(socket_addr) => {
info!(self.log, "Address updated"; "ip" => %socket_addr.ip(), "udp_port" => %socket_addr.port());
metrics::inc_counter(&metrics::ADDRESS_UPDATE_COUNT);
metrics::check_nat();
// We have SOCKET_UPDATED messages. This occurs when discovery has a majority of
// users reporting an external port and our ENR gets updated.
// Which means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["discv5"], 1);
// Discv5 will have updated our local ENR. We save the updated version
// to disk.

Expand Down
54 changes: 12 additions & 42 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
pub use lighthouse_metrics::*;

lazy_static! {
pub static ref NAT_OPEN: Result<IntCounter> = try_create_int_counter(
pub static ref NAT_OPEN: Result<IntGaugeVec> = try_create_int_gauge_vec(
"nat_open",
"An estimate indicating if the local node is exposed to the internet."
"An estimate indicating if the local node is reachable from external nodes",
&["protocol"]
);
pub static ref ADDRESS_UPDATE_COUNT: Result<IntCounter> = try_create_int_counter(
"libp2p_address_update_total",
Expand All @@ -14,6 +15,9 @@ lazy_static! {
"Count of libp2p peers currently connected"
);

pub static ref PEERS_CONNECTED_MULTI: Result<IntGaugeVec> =
try_create_int_gauge_vec("libp2p_peers_multi", "Count of libp2p peers currently connected", &["direction", "transport"]);

pub static ref TCP_PEERS_CONNECTED: Result<IntGauge> = try_create_int_gauge(
"libp2p_tcp_peers",
"Count of libp2p peers currently connected via TCP"
Expand All @@ -32,13 +36,10 @@ lazy_static! {
"libp2p_peer_disconnect_event_total",
"Count of libp2p peer disconnect events"
);
pub static ref DISCOVERY_SENT_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_sent_bytes",
"The number of bytes sent in discovery"
);
pub static ref DISCOVERY_RECV_BYTES: Result<IntGauge> = try_create_int_gauge(
"discovery_recv_bytes",
"The number of bytes received in discovery"
pub static ref DISCOVERY_BYTES: Result<IntGaugeVec> = try_create_int_gauge_vec(
"discovery_bytes",
"The number of bytes sent and received in discovery",
&["direction"]
);
pub static ref DISCOVERY_QUEUE: Result<IntGauge> = try_create_int_gauge(
"discovery_queue_size",
Expand Down Expand Up @@ -135,17 +136,6 @@ lazy_static! {
&["type"]
);

/*
* Inbound/Outbound peers
*/
/// The number of peers that dialed us.
pub static ref NETWORK_INBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_inbound_peers","The number of peers that are currently connected that have dialed us.");

/// The number of peers that we dialed us.
pub static ref NETWORK_OUTBOUND_PEERS: Result<IntGauge> =
try_create_int_gauge("network_outbound_peers","The number of peers that are currently connected that we dialed.");

/*
* Peer Reporting
*/
Expand All @@ -156,31 +146,11 @@ lazy_static! {
);
}

/// Checks if we consider the NAT open.
///
/// Conditions for an open NAT:
/// 1. We have 1 or more SOCKET_UPDATED messages. This occurs when discovery has a majority of
/// users reporting an external port and our ENR gets updated.
/// 2. We have 0 SOCKET_UPDATED messages (can be true if the port was correct on boot), then we
/// rely on whether we have any inbound messages. If we have no socket update messages, but
/// manage to get at least one inbound peer, we are exposed correctly.
pub fn check_nat() {
// NAT is already deemed open.
if NAT_OPEN.as_ref().map(|v| v.get()).unwrap_or(0) != 0 {
return;
}
if ADDRESS_UPDATE_COUNT.as_ref().map(|v| v.get()).unwrap_or(0) != 0
|| NETWORK_INBOUND_PEERS.as_ref().map(|v| v.get()).unwrap_or(0) != 0_i64
{
inc_counter(&NAT_OPEN);
}
}

pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
set_float_gauge(&DISCOVERY_REQS, metrics.unsolicited_requests_per_second);
set_gauge(&DISCOVERY_SESSIONS, metrics.active_sessions as i64);
set_gauge(&DISCOVERY_SENT_BYTES, metrics.bytes_sent as i64);
set_gauge(&DISCOVERY_RECV_BYTES, metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["inbound"], metrics.bytes_recv as i64);
set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64);
}
43 changes: 1 addition & 42 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ use delay_map::HashSetDelay;
use discv5::Enr;
use libp2p::identify::Info as IdentifyInfo;
use lru_cache::LRUTimeCache;
use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn};
use smallvec::SmallVec;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use strum::IntoEnumIterator;
use types::{EthSpec, SyncSubnetId};

pub use libp2p::core::Multiaddr;
Expand Down Expand Up @@ -719,46 +718,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
}
}

// This function updates metrics for all connected peers.
fn update_connected_peer_metrics(&self) {
// Do nothing if we don't have metrics enabled.
if !self.metrics_enabled {
return;
}

let mut connected_peer_count = 0;
let mut inbound_connected_peers = 0;
let mut outbound_connected_peers = 0;
let mut clients_per_peer = HashMap::new();

for (_peer, peer_info) in self.network_globals.peers.read().connected_peers() {
connected_peer_count += 1;
if let PeerConnectionStatus::Connected { n_in, .. } = peer_info.connection_status() {
if *n_in > 0 {
inbound_connected_peers += 1;
} else {
outbound_connected_peers += 1;
}
}
*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;
}

metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peer_count);
metrics::set_gauge(&metrics::NETWORK_INBOUND_PEERS, inbound_connected_peers);
metrics::set_gauge(&metrics::NETWORK_OUTBOUND_PEERS, outbound_connected_peers);

for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[client_kind.as_ref()],
*value as i64,
);
}
}

/* Internal functions */

/// Sets a peer as connected as long as their reputation allows it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.on_dial_failure(peer_id);
}
FromSwarm::ExternalAddrConfirmed(_) => {
// TODO: we likely want to check this against our assumed external tcp
// address
// We have an external address confirmed, means we are able to do NAT traversal.
metrics::set_gauge_vec(&metrics::NAT_OPEN, &["libp2p"], 1);
}
_ => {
// NOTE: FromSwarm is a non exhaustive enum so updates should be based on release
Expand Down Expand Up @@ -243,33 +243,34 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// Check NAT if metrics are enabled
if self.network_globals.local_enr.read().udp4().is_some() {
metrics::check_nat();
}

// increment prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr)
}
};

self.update_connected_peer_metrics();
metrics::inc_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
}

Expand Down Expand Up @@ -339,22 +340,29 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge(&metrics::QUIC_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge(&metrics::TCP_PEERS_CONNECTED);
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
};
self.update_connected_peer_metrics();
// Legacy standard metrics.
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);
}
}
Expand Down
25 changes: 20 additions & 5 deletions common/system_health/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,25 @@ pub fn observe_system_health_vc(
}
}

/// Observes if NAT traversal is possible.
pub fn observe_nat() -> bool {
let discv5_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["discv5"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();

let libp2p_nat = lighthouse_network::metrics::get_int_gauge(
&lighthouse_network::metrics::NAT_OPEN,
&["libp2p"],
)
.map(|g| g.get() == 1)
.unwrap_or_default();

discv5_nat && libp2p_nat
}

/// Observes the Beacon Node system health.
pub fn observe_system_health_bn<TSpec: EthSpec>(
sysinfo: Arc<RwLock<System>>,
Expand All @@ -223,11 +242,7 @@ pub fn observe_system_health_bn<TSpec: EthSpec>(
.unwrap_or_else(|| (String::from("None"), 0, 0));

// Determine if the NAT is open or not.
let nat_open = lighthouse_network::metrics::NAT_OPEN
.as_ref()
.map(|v| v.get())
.unwrap_or(0)
!= 0;
let nat_open = observe_nat();

SystemHealthBN {
system_health,
Expand Down
Loading