Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(swarm): expose ConnectionId and add conn duration metric #3927

Merged
merged 10 commits into from
May 17, 2023
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ fn main() -> Result<(), Box<dyn Error>> {
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
_ => {}
Expand Down
5 changes: 4 additions & 1 deletion examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@ impl EventLoop {
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"),
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} => eprintln!("Dialing {peer_id}"),
e => panic!("{e:?}"),
}
}
Expand Down
5 changes: 5 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
- Raise MSRV to 1.65.
See [PR 3715].

- Replace `libp2p_swarm_connections_closed` `Counter` with `libp2p_swarm_connections_duration` `Histogram` which additionally tracks the duration of a connection.
Note that you can use the `_count` metric of the `Histogram` as a replacement for the `Counter`.
See [PR 3927].

[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3927]: https://github.com/libp2p/rust-libp2p/pull/3927
[PR 3325]: https://github.com/libp2p/rust-libp2p/pull/3325

## 0.12.0
Expand Down
3 changes: 2 additions & 1 deletion misc/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ relay = ["libp2p-relay"]
dcutr = ["libp2p-dcutr"]

[dependencies]
instant = "0.1.11"
libp2p-core = { workspace = true }
libp2p-dcutr = { workspace = true, optional = true }
libp2p-identify = { workspace = true, optional = true }
Expand All @@ -27,7 +28,7 @@ libp2p-ping = { workspace = true, optional = true }
libp2p-relay = { workspace = true, optional = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
prometheus-client = { version = "0.21.0" }
prometheus-client = { version = "0.21.1"}
once_cell = "1.16.0"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
Expand Down
107 changes: 77 additions & 30 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use crate::protocol_stack;
use instant::Instant;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use prometheus_client::registry::{Registry, Unit};

pub(crate) struct Metrics {
connections_incoming: Family<AddressLabels, Counter>,
connections_incoming_error: Family<IncomingConnectionErrorLabels, Counter>,

connections_established: Family<ConnectionEstablishedLabels, Counter>,
connections_establishment_duration: Family<ConnectionEstablishmentDurationLabels, Histogram>,
connections_closed: Family<ConnectionClosedLabels, Counter>,
connections_established: Family<ConnectionLabels, Counter>,
connections_establishment_duration: Family<ConnectionLabels, Histogram>,
connections_duration: Family<ConnectionClosedLabels, Histogram>,

new_listen_addr: Family<AddressLabels, Counter>,
expired_listen_addr: Family<AddressLabels, Counter>,
Expand All @@ -41,6 +46,8 @@ pub(crate) struct Metrics {

dial_attempt: Counter,
outgoing_connection_error: Family<OutgoingConnectionErrorLabels, Counter>,

connections: Arc<Mutex<HashMap<ConnectionId, Instant>>>,
}

impl Metrics {
Expand Down Expand Up @@ -110,34 +117,42 @@ impl Metrics {
connections_established.clone(),
);

let connections_closed = Family::default();
let connections_establishment_duration = {
let constructor: fn() -> Histogram =
|| Histogram::new(exponential_buckets(0.01, 1.5, 20));
Family::new_with_constructor(constructor)
};
sub_registry.register(
"connections_closed",
"Number of connections closed",
connections_closed.clone(),
"connections_establishment_duration",
"Time it took (locally) to establish connections",
connections_establishment_duration.clone(),
);

let connections_establishment_duration = Family::new_with_constructor(
create_connection_establishment_duration_histogram as fn() -> Histogram,
);
sub_registry.register(
let connections_duration = {
let constructor: fn() -> Histogram =
|| Histogram::new(exponential_buckets(0.01, 3.0, 20));
Family::new_with_constructor(constructor)
};
sub_registry.register_with_unit(
"connections_establishment_duration",
"Time it took (locally) to establish connections",
Unit::Seconds,
connections_establishment_duration.clone(),
);

Self {
connections_incoming,
connections_incoming_error,
connections_established,
connections_closed,
new_listen_addr,
expired_listen_addr,
listener_closed,
listener_error,
dial_attempt,
outgoing_connection_error,
connections_establishment_duration,
connections_duration,
connections: Default::default(),
}
}
}
Expand All @@ -149,24 +164,44 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ConnectionEstablished {
endpoint,
established_in: time_taken,
connection_id,
..
} => {
let labels = ConnectionEstablishedLabels {
let labels = ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
};
self.connections_established.get_or_create(&labels).inc();
self.connections_establishment_duration
.get_or_create(&labels)
.observe(time_taken.as_secs_f64());
self.connections
.lock()
.expect("lock not to be poisoned")
.insert(*connection_id, Instant::now());
}
libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => {
self.connections_closed
.get_or_create(&ConnectionClosedLabels {
libp2p_swarm::SwarmEvent::ConnectionClosed {
endpoint,
connection_id,
cause,
..
} => {
let labels = ConnectionClosedLabels {
connection: ConnectionLabels {
role: endpoint.into(),
protocols: protocol_stack::as_string(endpoint.get_remote_address()),
})
.inc();
},
cause: cause.as_ref().map(Into::into),
};
self.connections_duration.get_or_create(&labels).observe(
self.connections
.lock()
.expect("lock not to be poisoned")
.remove(connection_id)
.expect("closed connection to previously be established")
.elapsed()
.as_secs_f64(),
);
}
libp2p_swarm::SwarmEvent::IncomingConnection { send_back_addr, .. } => {
self.connections_incoming
Expand All @@ -187,7 +222,7 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
})
.inc();
}
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id } => {
libp2p_swarm::SwarmEvent::OutgoingConnectionError { error, peer_id, .. } => {
let peer = match peer_id {
Some(_) => PeerStatus::Known,
None => PeerStatus::Unknown,
Expand Down Expand Up @@ -261,25 +296,41 @@ impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleE
libp2p_swarm::SwarmEvent::ListenerError { .. } => {
self.listener_error.inc();
}
libp2p_swarm::SwarmEvent::Dialing(_) => {
libp2p_swarm::SwarmEvent::Dialing { .. } => {
self.dial_attempt.inc();
}
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionEstablishedLabels {
struct ConnectionLabels {
role: Role,
protocols: String,
}

type ConnectionEstablishmentDurationLabels = ConnectionEstablishedLabels;

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct ConnectionClosedLabels {
role: Role,
protocols: String,
cause: Option<ConnectionError>,
#[prometheus(flatten)]
connection: ConnectionLabels,
}

#[derive(EncodeLabelValue, Hash, Clone, Eq, PartialEq, Debug)]
enum ConnectionError {
Io,
KeepAliveTimeout,
Handler,
}

impl<E> From<&libp2p_swarm::ConnectionError<E>> for ConnectionError {
fn from(value: &libp2p_swarm::ConnectionError<E>) -> Self {
match value {
libp2p_swarm::ConnectionError::IO(_) => ConnectionError::Io,
libp2p_swarm::ConnectionError::KeepAliveTimeout => ConnectionError::KeepAliveTimeout,
libp2p_swarm::ConnectionError::Handler(_) => ConnectionError::Handler,
}
}
}

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -359,7 +410,3 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError {
}
}
}

fn create_connection_establishment_duration_histogram() -> Histogram {
Histogram::new(exponential_buckets(0.01, 1.5, 20))
}
18 changes: 14 additions & 4 deletions protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,18 @@ async fn test_dial_back() {
num_established,
concurrent_dial_errors,
established_in: _,
connection_id: _,
} => {
assert_eq!(peer_id, client_id);
assert_eq!(num_established, NonZeroU32::new(2).unwrap());
assert!(concurrent_dial_errors.unwrap().is_empty());
assert_eq!(address, expect_addr);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down Expand Up @@ -143,12 +147,15 @@ async fn test_dial_error() {

loop {
match server.next_swarm_event().await {
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
assert_eq!(peer_id.unwrap(), client_id);
assert!(matches!(error, DialError::Transport(_)));
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down Expand Up @@ -307,7 +314,10 @@ async fn test_dial_multiple_addr() {
assert_eq!(address, dial_addresses[1]);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::Dialing {
peer_id: Some(peer),
..
} => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } => {}
other => panic!("Unexpected swarm event: {other:?}."),
}
Expand Down
5 changes: 4 additions & 1 deletion protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ async fn wait_for_reservation(
break;
}
}
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{e:?}"),
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/perf/src/bin/perf-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn main() -> Result<()> {
let server_peer_id = loop {
match swarm.next().await.unwrap() {
SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id,
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
bail!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
e => panic!("{e:?}"),
Expand All @@ -113,7 +113,7 @@ async fn main() -> Result<()> {
} => {
info!("Established connection to {:?} via {:?}", peer_id, endpoint);
}
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
info!("Outgoing connection error to {:?}: {:?}", peer_id, error);
}
SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?,
Expand Down
2 changes: 1 addition & 1 deletion protocols/perf/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn perf() {
.wait(|e| match e {
SwarmEvent::IncomingConnection { .. } => panic!(),
SwarmEvent::ConnectionEstablished { .. } => None,
SwarmEvent::Dialing(_) => None,
SwarmEvent::Dialing { .. } => None,
SwarmEvent::Behaviour(client::Event { result, .. }) => Some(result),
e => panic!("{e:?}"),
})
Expand Down
10 changes: 8 additions & 2 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ async fn connection_established_to(
) {
loop {
match swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => {
break
Expand Down Expand Up @@ -419,7 +422,10 @@ async fn wait_for_reservation(
async fn wait_for_dial(client: &mut Swarm<Client>, remote: PeerId) -> bool {
loop {
match client.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == remote => {}
SwarmEvent::Dialing {
peer_id: Some(peer_id),
..
} if peer_id == remote => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == remote => return true,
SwarmEvent::OutgoingConnectionError { peer_id, .. } if peer_id == Some(remote) => {
return false
Expand Down
Loading