diff --git a/dc/s2n-quic-dc/src/event/generated.rs b/dc/s2n-quic-dc/src/event/generated.rs index 7639587ee..f166a6c73 100644 --- a/dc/s2n-quic-dc/src/event/generated.rs +++ b/dc/s2n-quic-dc/src/event/generated.rs @@ -376,16 +376,73 @@ mod traits { } } } +pub mod metrics { + use super::*; + use core::sync::atomic::{AtomicU32, Ordering}; + use s2n_quic_core::event::metrics::Recorder; + #[derive(Clone, Debug)] + pub struct Subscriber + where + S::ConnectionContext: Recorder, + { + subscriber: S, + } + impl Subscriber + where + S::ConnectionContext: Recorder, + { + pub fn new(subscriber: S) -> Self { + Self { subscriber } + } + } + pub struct Context { + recorder: R, + frame_sent: AtomicU32, + } + impl super::Subscriber for Subscriber + where + S::ConnectionContext: Recorder, + { + type ConnectionContext = Context; + fn create_connection_context( + &self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + Context { + recorder: self.subscriber.create_connection_context(meta, info), + frame_sent: AtomicU32::new(0), + } + } + #[inline] + fn on_frame_sent( + &self, + context: &Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + context.frame_sent.fetch_add(1, Ordering::Relaxed); + self.subscriber + .on_frame_sent(&mut context.recorder, meta, event); + } + } + impl Drop for Context { + fn drop(&mut self) { + self.recorder + .increment_counter("frame_sent", self.frame_sent.load(Ordering::Relaxed) as _); + } + } +} #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::Mutex; #[derive(Clone, Debug)] pub struct Subscriber { location: Option, - output: Arc>>, - pub frame_sent: Arc, + output: Mutex>, + pub frame_sent: AtomicU32, } impl Drop for Subscriber { fn drop(&mut self) { @@ -410,7 +467,7 @@ pub mod testing { Self { location: None, output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), + frame_sent: AtomicU32::new(0), } } } @@ -428,7 +485,7 @@ pub mod testing { meta: &api::ConnectionMeta, event: &api::FrameSent, ) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); + self.frame_sent.fetch_add(1, Ordering::Relaxed); if self.location.is_some() { self.output .lock() @@ -440,8 +497,8 @@ pub mod testing { #[derive(Clone, Debug)] pub struct Publisher { location: Option, - output: Arc>>, - pub frame_sent: Arc, + output: Mutex>, + pub frame_sent: AtomicU32, } impl Publisher { #[doc = r" Creates a publisher with snapshot assertions enabled"] @@ -456,7 +513,7 @@ pub mod testing { Self { location: None, output: Default::default(), - frame_sent: Arc::new(AtomicU32::new(0)), + frame_sent: AtomicU32::new(0), } } } @@ -467,7 +524,7 @@ pub mod testing { } impl super::ConnectionPublisher for Publisher { fn on_frame_sent(&self, event: builder::FrameSent) { - self.frame_sent.fetch_add(1, Ordering::SeqCst); + self.frame_sent.fetch_add(1, Ordering::Relaxed); let event = event.into_event(); if self.location.is_some() { self.output.lock().unwrap().push(format!("{event:?}")); diff --git a/quic/s2n-quic-core/src/event.rs b/quic/s2n-quic-core/src/event.rs index 18feaf211..0d193c4e5 100644 --- a/quic/s2n-quic-core/src/event.rs +++ b/quic/s2n-quic-core/src/event.rs @@ -5,6 +5,7 @@ use crate::{connection, endpoint}; use core::{ops::RangeInclusive, time::Duration}; mod generated; +pub mod metrics; pub use generated::*; /// All event types which can be emitted from this library. diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 4463609f3..6c3ebd825 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -6834,6 +6834,703 @@ mod traits { } } } +pub mod metrics { + use super::*; + use crate::event::metrics::Recorder; + #[derive(Clone, Debug)] + pub struct Subscriber + where + S::ConnectionContext: Recorder, + { + subscriber: S, + } + impl Subscriber + where + S::ConnectionContext: Recorder, + { + pub fn new(subscriber: S) -> Self { + Self { subscriber } + } + } + pub struct Context { + recorder: R, + application_protocol_information: u32, + server_name_information: u32, + packet_skipped: u32, + packet_sent: u32, + packet_received: u32, + active_path_updated: u32, + path_created: u32, + frame_sent: u32, + frame_received: u32, + packet_lost: u32, + recovery_metrics: u32, + congestion: u32, + ack_processed: u32, + rx_ack_range_dropped: u32, + ack_range_received: u32, + ack_range_sent: u32, + packet_dropped: u32, + key_update: u32, + key_space_discarded: u32, + connection_started: u32, + connection_closed: u32, + duplicate_packet: u32, + transport_parameters_received: u32, + datagram_sent: u32, + datagram_received: u32, + datagram_dropped: u32, + connection_id_updated: u32, + ecn_state_changed: u32, + connection_migration_denied: u32, + handshake_status_updated: u32, + tls_exporter_ready: u32, + path_challenge_updated: u32, + tls_client_hello: u32, + tls_server_hello: u32, + rx_stream_progress: u32, + tx_stream_progress: u32, + keep_alive_timer_expired: u32, + mtu_updated: u32, + slow_start_exited: u32, + delivery_rate_sampled: u32, + pacing_rate_updated: u32, + bbr_state_changed: u32, + dc_state_changed: u32, + } + impl super::Subscriber for Subscriber + where + S::ConnectionContext: Recorder, + { + type ConnectionContext = Context; + fn create_connection_context( + &mut self, + meta: &api::ConnectionMeta, + info: &api::ConnectionInfo, + ) -> Self::ConnectionContext { + Context { + recorder: self.subscriber.create_connection_context(meta, info), + application_protocol_information: 0, + server_name_information: 0, + packet_skipped: 0, + packet_sent: 0, + packet_received: 0, + active_path_updated: 0, + path_created: 0, + frame_sent: 0, + frame_received: 0, + packet_lost: 0, + recovery_metrics: 0, + congestion: 0, + ack_processed: 0, + rx_ack_range_dropped: 0, + ack_range_received: 0, + ack_range_sent: 0, + packet_dropped: 0, + key_update: 0, + key_space_discarded: 0, + connection_started: 0, + connection_closed: 0, + duplicate_packet: 0, + transport_parameters_received: 0, + datagram_sent: 0, + datagram_received: 0, + datagram_dropped: 0, + connection_id_updated: 0, + ecn_state_changed: 0, + connection_migration_denied: 0, + handshake_status_updated: 0, + tls_exporter_ready: 0, + path_challenge_updated: 0, + tls_client_hello: 0, + tls_server_hello: 0, + rx_stream_progress: 0, + tx_stream_progress: 0, + keep_alive_timer_expired: 0, + mtu_updated: 0, + slow_start_exited: 0, + delivery_rate_sampled: 0, + pacing_rate_updated: 0, + bbr_state_changed: 0, + dc_state_changed: 0, + } + } + #[inline] + fn on_application_protocol_information( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ApplicationProtocolInformation, + ) { + context.application_protocol_information += 1; + self.subscriber + .on_application_protocol_information(&mut context.recorder, meta, event); + } + #[inline] + fn on_server_name_information( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ServerNameInformation, + ) { + context.server_name_information += 1; + self.subscriber + .on_server_name_information(&mut context.recorder, meta, event); + } + #[inline] + fn on_packet_skipped( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacketSkipped, + ) { + context.packet_skipped += 1; + self.subscriber + .on_packet_skipped(&mut context.recorder, meta, event); + } + #[inline] + fn on_packet_sent( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacketSent, + ) { + context.packet_sent += 1; + self.subscriber + .on_packet_sent(&mut context.recorder, meta, event); + } + #[inline] + fn on_packet_received( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacketReceived, + ) { + context.packet_received += 1; + self.subscriber + .on_packet_received(&mut context.recorder, meta, event); + } + #[inline] + fn on_active_path_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ActivePathUpdated, + ) { + context.active_path_updated += 1; + self.subscriber + .on_active_path_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_path_created( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PathCreated, + ) { + context.path_created += 1; + self.subscriber + .on_path_created(&mut context.recorder, meta, event); + } + #[inline] + fn on_frame_sent( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameSent, + ) { + context.frame_sent += 1; + self.subscriber + .on_frame_sent(&mut context.recorder, meta, event); + } + #[inline] + fn on_frame_received( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::FrameReceived, + ) { + context.frame_received += 1; + self.subscriber + .on_frame_received(&mut context.recorder, meta, event); + } + #[inline] + fn on_packet_lost( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacketLost, + ) { + context.packet_lost += 1; + self.subscriber + .on_packet_lost(&mut context.recorder, meta, event); + } + #[inline] + fn on_recovery_metrics( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::RecoveryMetrics, + ) { + context.recovery_metrics += 1; + self.subscriber + .on_recovery_metrics(&mut context.recorder, meta, event); + } + #[inline] + fn on_congestion( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::Congestion, + ) { + context.congestion += 1; + self.subscriber + .on_congestion(&mut context.recorder, meta, event); + } + #[inline] + #[allow(deprecated)] + fn on_ack_processed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::AckProcessed, + ) { + context.ack_processed += 1; + self.subscriber + .on_ack_processed(&mut context.recorder, meta, event); + } + #[inline] + fn on_rx_ack_range_dropped( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::RxAckRangeDropped, + ) { + context.rx_ack_range_dropped += 1; + self.subscriber + .on_rx_ack_range_dropped(&mut context.recorder, meta, event); + } + #[inline] + fn on_ack_range_received( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::AckRangeReceived, + ) { + context.ack_range_received += 1; + self.subscriber + .on_ack_range_received(&mut context.recorder, meta, event); + } + #[inline] + fn on_ack_range_sent( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::AckRangeSent, + ) { + context.ack_range_sent += 1; + self.subscriber + .on_ack_range_sent(&mut context.recorder, meta, event); + } + #[inline] + fn on_packet_dropped( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacketDropped, + ) { + context.packet_dropped += 1; + self.subscriber + .on_packet_dropped(&mut context.recorder, meta, event); + } + #[inline] + fn on_key_update( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::KeyUpdate, + ) { + context.key_update += 1; + self.subscriber + .on_key_update(&mut context.recorder, meta, event); + } + #[inline] + fn on_key_space_discarded( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::KeySpaceDiscarded, + ) { + context.key_space_discarded += 1; + self.subscriber + .on_key_space_discarded(&mut context.recorder, meta, event); + } + #[inline] + fn on_connection_started( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ConnectionStarted, + ) { + context.connection_started += 1; + self.subscriber + .on_connection_started(&mut context.recorder, meta, event); + } + #[inline] + fn on_connection_closed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ConnectionClosed, + ) { + context.connection_closed += 1; + self.subscriber + .on_connection_closed(&mut context.recorder, meta, event); + } + #[inline] + fn on_duplicate_packet( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DuplicatePacket, + ) { + context.duplicate_packet += 1; + self.subscriber + .on_duplicate_packet(&mut context.recorder, meta, event); + } + #[inline] + fn on_transport_parameters_received( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::TransportParametersReceived, + ) { + context.transport_parameters_received += 1; + self.subscriber + .on_transport_parameters_received(&mut context.recorder, meta, event); + } + #[inline] + fn on_datagram_sent( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DatagramSent, + ) { + context.datagram_sent += 1; + self.subscriber + .on_datagram_sent(&mut context.recorder, meta, event); + } + #[inline] + fn on_datagram_received( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DatagramReceived, + ) { + context.datagram_received += 1; + self.subscriber + .on_datagram_received(&mut context.recorder, meta, event); + } + #[inline] + fn on_datagram_dropped( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DatagramDropped, + ) { + context.datagram_dropped += 1; + self.subscriber + .on_datagram_dropped(&mut context.recorder, meta, event); + } + #[inline] + fn on_connection_id_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ConnectionIdUpdated, + ) { + context.connection_id_updated += 1; + self.subscriber + .on_connection_id_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_ecn_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::EcnStateChanged, + ) { + context.ecn_state_changed += 1; + self.subscriber + .on_ecn_state_changed(&mut context.recorder, meta, event); + } + #[inline] + fn on_connection_migration_denied( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::ConnectionMigrationDenied, + ) { + context.connection_migration_denied += 1; + self.subscriber + .on_connection_migration_denied(&mut context.recorder, meta, event); + } + #[inline] + fn on_handshake_status_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::HandshakeStatusUpdated, + ) { + context.handshake_status_updated += 1; + self.subscriber + .on_handshake_status_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_tls_exporter_ready( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::TlsExporterReady, + ) { + context.tls_exporter_ready += 1; + self.subscriber + .on_tls_exporter_ready(&mut context.recorder, meta, event); + } + #[inline] + fn on_path_challenge_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PathChallengeUpdated, + ) { + context.path_challenge_updated += 1; + self.subscriber + .on_path_challenge_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_tls_client_hello( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::TlsClientHello, + ) { + context.tls_client_hello += 1; + self.subscriber + .on_tls_client_hello(&mut context.recorder, meta, event); + } + #[inline] + fn on_tls_server_hello( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::TlsServerHello, + ) { + context.tls_server_hello += 1; + self.subscriber + .on_tls_server_hello(&mut context.recorder, meta, event); + } + #[inline] + fn on_rx_stream_progress( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::RxStreamProgress, + ) { + context.rx_stream_progress += 1; + self.subscriber + .on_rx_stream_progress(&mut context.recorder, meta, event); + } + #[inline] + fn on_tx_stream_progress( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::TxStreamProgress, + ) { + context.tx_stream_progress += 1; + self.subscriber + .on_tx_stream_progress(&mut context.recorder, meta, event); + } + #[inline] + fn on_keep_alive_timer_expired( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::KeepAliveTimerExpired, + ) { + context.keep_alive_timer_expired += 1; + self.subscriber + .on_keep_alive_timer_expired(&mut context.recorder, meta, event); + } + #[inline] + fn on_mtu_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::MtuUpdated, + ) { + context.mtu_updated += 1; + self.subscriber + .on_mtu_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_slow_start_exited( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::SlowStartExited, + ) { + context.slow_start_exited += 1; + self.subscriber + .on_slow_start_exited(&mut context.recorder, meta, event); + } + #[inline] + fn on_delivery_rate_sampled( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DeliveryRateSampled, + ) { + context.delivery_rate_sampled += 1; + self.subscriber + .on_delivery_rate_sampled(&mut context.recorder, meta, event); + } + #[inline] + fn on_pacing_rate_updated( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::PacingRateUpdated, + ) { + context.pacing_rate_updated += 1; + self.subscriber + .on_pacing_rate_updated(&mut context.recorder, meta, event); + } + #[inline] + fn on_bbr_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::BbrStateChanged, + ) { + context.bbr_state_changed += 1; + self.subscriber + .on_bbr_state_changed(&mut context.recorder, meta, event); + } + #[inline] + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DcStateChanged, + ) { + context.dc_state_changed += 1; + self.subscriber + .on_dc_state_changed(&mut context.recorder, meta, event); + } + } + impl Drop for Context { + fn drop(&mut self) { + self.recorder.increment_counter( + "application_protocol_information", + self.application_protocol_information as _, + ); + self.recorder + .increment_counter("server_name_information", self.server_name_information as _); + self.recorder + .increment_counter("packet_skipped", self.packet_skipped as _); + self.recorder + .increment_counter("packet_sent", self.packet_sent as _); + self.recorder + .increment_counter("packet_received", self.packet_received as _); + self.recorder + .increment_counter("active_path_updated", self.active_path_updated as _); + self.recorder + .increment_counter("path_created", self.path_created as _); + self.recorder + .increment_counter("frame_sent", self.frame_sent as _); + self.recorder + .increment_counter("frame_received", self.frame_received as _); + self.recorder + .increment_counter("packet_lost", self.packet_lost as _); + self.recorder + .increment_counter("recovery_metrics", self.recovery_metrics as _); + self.recorder + .increment_counter("congestion", self.congestion as _); + self.recorder + .increment_counter("ack_processed", self.ack_processed as _); + self.recorder + .increment_counter("rx_ack_range_dropped", self.rx_ack_range_dropped as _); + self.recorder + .increment_counter("ack_range_received", self.ack_range_received as _); + self.recorder + .increment_counter("ack_range_sent", self.ack_range_sent as _); + self.recorder + .increment_counter("packet_dropped", self.packet_dropped as _); + self.recorder + .increment_counter("key_update", self.key_update as _); + self.recorder + .increment_counter("key_space_discarded", self.key_space_discarded as _); + self.recorder + .increment_counter("connection_started", self.connection_started as _); + self.recorder + .increment_counter("connection_closed", self.connection_closed as _); + self.recorder + .increment_counter("duplicate_packet", self.duplicate_packet as _); + self.recorder.increment_counter( + "transport_parameters_received", + self.transport_parameters_received as _, + ); + self.recorder + .increment_counter("datagram_sent", self.datagram_sent as _); + self.recorder + .increment_counter("datagram_received", self.datagram_received as _); + self.recorder + .increment_counter("datagram_dropped", self.datagram_dropped as _); + self.recorder + .increment_counter("connection_id_updated", self.connection_id_updated as _); + self.recorder + .increment_counter("ecn_state_changed", self.ecn_state_changed as _); + self.recorder.increment_counter( + "connection_migration_denied", + self.connection_migration_denied as _, + ); + self.recorder.increment_counter( + "handshake_status_updated", + self.handshake_status_updated as _, + ); + self.recorder + .increment_counter("tls_exporter_ready", self.tls_exporter_ready as _); + self.recorder + .increment_counter("path_challenge_updated", self.path_challenge_updated as _); + self.recorder + .increment_counter("tls_client_hello", self.tls_client_hello as _); + self.recorder + .increment_counter("tls_server_hello", self.tls_server_hello as _); + self.recorder + .increment_counter("rx_stream_progress", self.rx_stream_progress as _); + self.recorder + .increment_counter("tx_stream_progress", self.tx_stream_progress as _); + self.recorder.increment_counter( + "keep_alive_timer_expired", + self.keep_alive_timer_expired as _, + ); + self.recorder + .increment_counter("mtu_updated", self.mtu_updated as _); + self.recorder + .increment_counter("slow_start_exited", self.slow_start_exited as _); + self.recorder + .increment_counter("delivery_rate_sampled", self.delivery_rate_sampled as _); + self.recorder + .increment_counter("pacing_rate_updated", self.pacing_rate_updated as _); + self.recorder + .increment_counter("bbr_state_changed", self.bbr_state_changed as _); + self.recorder + .increment_counter("dc_state_changed", self.dc_state_changed as _); + } + } +} #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; diff --git a/quic/s2n-quic-core/src/event/metrics.rs b/quic/s2n-quic-core/src/event/metrics.rs new file mode 100644 index 000000000..92547d0c4 --- /dev/null +++ b/quic/s2n-quic-core/src/event/metrics.rs @@ -0,0 +1,13 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +pub use super::generated::*; + +/// A Recorder should arrange to emit the properties and counters on Drop into some output. +pub trait Recorder: 'static + Send + Sync { + /// Registers a counter with the recorder instance + fn increment_counter(&self, name: &str, amount: usize); + + /// Associates a key/value pair with the recorder instance + fn set_value(&self, key: &str, value: V); +} diff --git a/quic/s2n-quic-events/src/main.rs b/quic/s2n-quic-events/src/main.rs index 97387e216..94d00b995 100644 --- a/quic/s2n-quic-events/src/main.rs +++ b/quic/s2n-quic-events/src/main.rs @@ -25,25 +25,32 @@ impl OutputMode { } fn counter_type(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc), + OutputMode::Ref => quote!(AtomicU32), OutputMode::Mut => quote!(u32), } } fn counter_init(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc::new(AtomicU32::new(0))), + OutputMode::Ref => quote!(AtomicU32::new(0)), OutputMode::Mut => quote!(0), } } fn counter_increment(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(.fetch_add(1, Ordering::SeqCst)), + OutputMode::Ref => quote!(.fetch_add(1, Ordering::Relaxed)), OutputMode::Mut => quote!(+= 1), } } + fn counter_load(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!(.load(Ordering::Relaxed)), + OutputMode::Mut => quote!(), + } + } + fn lock(&self) -> TokenStream { match self { OutputMode::Ref => quote!(.lock().unwrap()), @@ -55,7 +62,15 @@ impl OutputMode { match self { OutputMode::Ref => quote!( use core::sync::atomic::{AtomicU32, Ordering}; - use std::sync::{Arc, Mutex}; + ), + OutputMode::Mut => quote!(), + } + } + + fn mutex(&self) -> TokenStream { + match self { + OutputMode::Ref => quote!( + use std::sync::Mutex; ), OutputMode::Mut => quote!(), } @@ -63,7 +78,7 @@ impl OutputMode { fn testing_output_type(&self) -> TokenStream { match self { - OutputMode::Ref => quote!(Arc>>), + OutputMode::Ref => quote!(Mutex>), OutputMode::Mut => quote!(Vec), } } @@ -298,8 +313,13 @@ struct Output { pub subscriber_testing: TokenStream, pub endpoint_publisher_testing: TokenStream, pub connection_publisher_testing: TokenStream, + pub metrics_fields: TokenStream, + pub metrics_fields_init: TokenStream, + pub metrics_record: TokenStream, + pub subscriber_metrics: TokenStream, pub extra: TokenStream, pub mode: OutputMode, + pub s2n_quic_core_path: TokenStream, } impl ToTokens for Output { @@ -319,11 +339,17 @@ impl ToTokens for Output { subscriber_testing, endpoint_publisher_testing, connection_publisher_testing, + metrics_fields, + metrics_fields_init, + metrics_record, + subscriber_metrics, extra, mode, + s2n_quic_core_path, } = self; let imports = self.mode.imports(); + let mutex = self.mode.mutex(); let testing_output_type = self.mode.testing_output_type(); let lock = self.mode.lock(); let target_crate = self.mode.target_crate(); @@ -670,10 +696,56 @@ impl ToTokens for Output { } } + pub mod metrics { + use super::*; + #imports + use #s2n_quic_core_path::event::metrics::Recorder; + + #[derive(Clone, Debug)] + pub struct Subscriber + where S::ConnectionContext: Recorder { + subscriber: S, + } + + impl Subscriber + where S::ConnectionContext: Recorder { + pub fn new(subscriber: S) -> Self { + Self { subscriber } + } + } + + pub struct Context { + recorder: R, + #metrics_fields + } + + impl super::Subscriber for Subscriber + where S::ConnectionContext: Recorder { + type ConnectionContext = Context; + + fn create_connection_context(&#mode self, meta: &api::ConnectionMeta, info: &api::ConnectionInfo) -> Self::ConnectionContext { + Context { + recorder: self.subscriber.create_connection_context(meta, info), + #metrics_fields_init + } + } + + #subscriber_metrics + } + + impl Drop for Context { + fn drop(&mut self) { + #metrics_record + } + } + } + #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; #imports + #mutex + #[derive(Clone, Debug)] pub struct Subscriber { location: Option, @@ -844,6 +916,7 @@ struct EventInfo<'a> { input_path: &'a str, output_path: &'a str, output_mode: OutputMode, + s2n_quic_core_path: TokenStream, } fn main() -> Result<()> { @@ -858,6 +931,7 @@ fn main() -> Result<()> { "/../../dc/s2n-quic-dc/src/event/generated.rs" ), output_mode: OutputMode::Ref, + s2n_quic_core_path: quote!(s2n_quic_core), }, EventInfo { input_path: concat!(env!("CARGO_MANIFEST_DIR"), "/events/**/*.rs"), @@ -866,6 +940,7 @@ fn main() -> Result<()> { "/../s2n-quic-core/src/event/generated.rs" ), output_mode: OutputMode::Mut, + s2n_quic_core_path: quote!(crate), }, ]; @@ -880,6 +955,7 @@ fn main() -> Result<()> { let mut output = Output { mode: event_info.output_mode, + s2n_quic_core_path: event_info.s2n_quic_core_path, ..Default::default() }; diff --git a/quic/s2n-quic-events/src/parser.rs b/quic/s2n-quic-events/src/parser.rs index 948bee45b..469c50267 100644 --- a/quic/s2n-quic-events/src/parser.rs +++ b/quic/s2n-quic-events/src/parser.rs @@ -150,6 +150,7 @@ impl Struct { let counter_type = output.mode.counter_type(); let counter_init = output.mode.counter_init(); + let counter_load = output.mode.counter_load(); // add a counter for testing structs output.testing_fields.extend(quote!( @@ -282,6 +283,27 @@ impl Struct { } )); + // Metrics is only connection-level events + output.metrics_fields.extend(quote!( + #counter: #counter_type, + )); + output.metrics_fields_init.extend(quote!( + #counter: #counter_init, + )); + + output.metrics_record.extend(quote!( + self.recorder.increment_counter(#snake, self.#counter #counter_load as _); + )); + + output.subscriber_metrics.extend(quote!( + #[inline] + #allow_deprecated + fn #function(&#receiver self, context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) { + context.#counter #counter_increment; + self.subscriber.#function(&mut context.recorder, meta, event); + } + )); + output.subscriber_testing.extend(quote!( #allow_deprecated fn #function(&#receiver self, _context: &#receiver Self::ConnectionContext, meta: &api::ConnectionMeta, event: &api::#ident) {