Skip to content

Commit

Permalink
feat(s2n-quic-events): Adds metrics subscriber (#2335)
Browse files Browse the repository at this point in the history
  • Loading branch information
maddeleine authored Sep 28, 2024
1 parent a2148a9 commit 246ba90
Show file tree
Hide file tree
Showing 6 changed files with 880 additions and 14 deletions.
75 changes: 66 additions & 9 deletions dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,73 @@ mod traits {
}
}
}
pub mod metrics {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use s2n_quic_core::event::metrics::Recorder;
#[derive(Clone, Debug)]
pub struct Subscriber<S: super::Subscriber>
where
S::ConnectionContext: Recorder,
{
subscriber: S,
}
impl<S: super::Subscriber> Subscriber<S>
where
S::ConnectionContext: Recorder,
{
pub fn new(subscriber: S) -> Self {
Self { subscriber }
}
}
pub struct Context<R: Recorder> {
recorder: R,
frame_sent: AtomicU32,
}
impl<S: super::Subscriber> super::Subscriber for Subscriber<S>
where
S::ConnectionContext: Recorder,
{
type ConnectionContext = Context<S::ConnectionContext>;
fn create_connection_context(
&self,
meta: &api::ConnectionMeta,
info: &api::ConnectionInfo,
) -> Self::ConnectionContext {
Context {
recorder: self.subscriber.create_connection_context(meta, info),
frame_sent: AtomicU32::new(0),
}
}
#[inline]
fn on_frame_sent(
&self,
context: &Self::ConnectionContext,
meta: &api::ConnectionMeta,
event: &api::FrameSent,
) {
context.frame_sent.fetch_add(1, Ordering::Relaxed);
self.subscriber
.on_frame_sent(&mut context.recorder, meta, event);
}
}
impl<R: Recorder> Drop for Context<R> {
fn drop(&mut self) {
self.recorder
.increment_counter("frame_sent", self.frame_sent.load(Ordering::Relaxed) as _);
}
}
}
#[cfg(any(test, feature = "testing"))]
pub mod testing {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
#[derive(Clone, Debug)]
pub struct Subscriber {
location: Option<Location>,
output: Arc<Mutex<Vec<String>>>,
pub frame_sent: Arc<AtomicU32>,
output: Mutex<Vec<String>>,
pub frame_sent: AtomicU32,
}
impl Drop for Subscriber {
fn drop(&mut self) {
Expand All @@ -410,7 +467,7 @@ pub mod testing {
Self {
location: None,
output: Default::default(),
frame_sent: Arc::new(AtomicU32::new(0)),
frame_sent: AtomicU32::new(0),
}
}
}
Expand All @@ -428,7 +485,7 @@ pub mod testing {
meta: &api::ConnectionMeta,
event: &api::FrameSent,
) {
self.frame_sent.fetch_add(1, Ordering::SeqCst);
self.frame_sent.fetch_add(1, Ordering::Relaxed);
if self.location.is_some() {
self.output
.lock()
Expand All @@ -440,8 +497,8 @@ pub mod testing {
#[derive(Clone, Debug)]
pub struct Publisher {
location: Option<Location>,
output: Arc<Mutex<Vec<String>>>,
pub frame_sent: Arc<AtomicU32>,
output: Mutex<Vec<String>>,
pub frame_sent: AtomicU32,
}
impl Publisher {
#[doc = r" Creates a publisher with snapshot assertions enabled"]
Expand All @@ -456,7 +513,7 @@ pub mod testing {
Self {
location: None,
output: Default::default(),
frame_sent: Arc::new(AtomicU32::new(0)),
frame_sent: AtomicU32::new(0),
}
}
}
Expand All @@ -467,7 +524,7 @@ pub mod testing {
}
impl super::ConnectionPublisher for Publisher {
fn on_frame_sent(&self, event: builder::FrameSent) {
self.frame_sent.fetch_add(1, Ordering::SeqCst);
self.frame_sent.fetch_add(1, Ordering::Relaxed);
let event = event.into_event();
if self.location.is_some() {
self.output.lock().unwrap().push(format!("{event:?}"));
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{connection, endpoint};
use core::{ops::RangeInclusive, time::Duration};

mod generated;
pub mod metrics;
pub use generated::*;

/// All event types which can be emitted from this library.
Expand Down
Loading

0 comments on commit 246ba90

Please sign in to comment.