diff --git a/sidecar/src/service/runtime_info.rs b/sidecar/src/service/runtime_info.rs index 9f2f8215d..dbfa8bd34 100644 --- a/sidecar/src/service/runtime_info.rs +++ b/sidecar/src/service/runtime_info.rs @@ -14,6 +14,7 @@ use manual_future::{ManualFuture, ManualFutureCompleter}; use std::collections::HashMap; use std::sync::{Arc, Mutex, MutexGuard}; use tracing::{debug, info}; +use ddcommon::tag::Tag; type AppMap = HashMap<(String, String), Shared>>>; @@ -27,18 +28,26 @@ pub(crate) struct SharedAppManualFut { /// `RuntimeInfo` is a struct that contains information about a runtime. /// It contains a map of apps and a map of app or actions. -/// Each app is represented by a shared future that may contain an `Option`. -/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions are necessary -/// because service and env names are not known until later in the initialization process. #[derive(Clone, Default)] pub(crate) struct RuntimeInfo { pub(crate) apps: Arc>, - app_or_actions: Arc>>, + applications: Arc>>, #[cfg(feature = "tracing")] - remote_config_guards: Arc>>, pub(crate) instance_id: InstanceId, } +/// `ActiveApplication` is a struct that contains information about a known in-flight application. +/// Telemetry lifecycles (see `app_or_actions`) and the remote config guard (see `remote_config_guard`) are bound to +/// it. +/// Each app is represented by a shared future that may contain an `Option`. +/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions is necessary +/// because service and env names are not known until later in the initialization process. +#[derive(Default)] +pub(crate) struct ActiveApplication { + pub app_or_actions: AppOrQueue, + pub remote_config_guard: Option, +} + impl RuntimeInfo { /// Retrieves the `AppInstance` for a given service name and environment name. /// @@ -115,26 +124,14 @@ impl RuntimeInfo { self.apps.lock().unwrap() } - /// Locks the app or actions map and returns a mutable reference to it. 
- /// - /// # Returns - /// - /// * `MutexGuard>` - A mutable reference to the app or actions - /// map. - pub(crate) fn lock_app_or_actions(&self) -> MutexGuard> { - self.app_or_actions.lock().unwrap() - } - - /// Locks the remote config guards map and returns a mutable reference to it. + /// Locks the applications map and returns a mutable reference to it. /// /// # Returns /// - /// * `MutexGuard>` - A mutable reference to the remote - /// config guards map. - pub(crate) fn lock_remote_config_guards( - &self, - ) -> MutexGuard> { - self.remote_config_guards.lock().unwrap() + /// * `MutexGuard>` - A mutable reference to the + /// applications map. + pub(crate) fn lock_applications(&self) -> MutexGuard> { + self.applications.lock().unwrap() } } diff --git a/sidecar/src/service/sidecar_server.rs b/sidecar/src/service/sidecar_server.rs index 077f95bbd..15893cd7b 100644 --- a/sidecar/src/service/sidecar_server.rs +++ b/sidecar/src/service/sidecar_server.rs @@ -45,6 +45,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats; use datadog_ipc::platform::FileBackedHandle; use datadog_ipc::tarpc::server::{Channel, InFlightRequest}; use datadog_remote_config::fetch::ConfigInvariants; +use crate::service::runtime_info::ActiveApplication; type NoResponse = Ready<()>; @@ -62,6 +63,7 @@ struct SidecarStats { active_apps: u32, enqueued_apps: u32, enqueued_telemetry_data: EnqueuedTelemetryStats, + remote_config_clients: u32, telemetry_metrics_contexts: u32, telemetry_worker: TelemetryWorkerStats, telemetry_worker_errors: u32, @@ -167,7 +169,7 @@ impl SidecarServer { async fn process_interceptor_response( &self, - result: Result<(HashSet, HashSet), tokio::task::JoinError>, + result: Result<(HashSet, HashSet), JoinError>, ) { match result { Ok((sessions, instances)) => { @@ -396,7 +398,7 @@ impl SidecarServer { .map(|s| { s.lock_runtimes() .values() - .map(|r| r.lock_app_or_actions().len() as u32) + .map(|r| r.lock_applications().len() as u32) .sum::() }) .sum(), @@ 
-406,9 +408,9 @@ impl SidecarServer { s.lock_runtimes() .values() .map(|r| { - r.lock_app_or_actions() + r.lock_applications() .values() - .filter(|a| matches!(a, AppOrQueue::Queue(_))) + .filter(|a| matches!(a.app_or_actions, AppOrQueue::Queue(_))) .count() as u32 }) .sum::() @@ -420,9 +422,9 @@ impl SidecarServer { s.lock_runtimes() .values() .map(|r| { - r.lock_app_or_actions() + r.lock_applications() .values() - .filter_map(|a| match a { + .filter_map(|a| match &a.app_or_actions { AppOrQueue::Queue(q) => Some(q.stats()), _ => None, }) @@ -431,6 +433,20 @@ impl SidecarServer { .sum() }) .sum(), + remote_config_clients: sessions + .values() + .map(|s| { + s.lock_runtimes() + .values() + .map(|r| { + r.lock_applications() + .values() + .filter_map(|a| a.remote_config_guard.as_ref()) + .count() as u32 + }) + .sum::() + }) + .sum(), telemetry_metrics_contexts: sessions .values() .map(|s| { @@ -476,58 +492,63 @@ impl SidecarInterface for SidecarServer { actions: Vec, ) -> Self::EnqueueActionsFut { let rt_info = self.get_runtime(&instance_id); - let mut queue = rt_info.lock_app_or_actions(); - match queue.entry(queue_id) { - Entry::Occupied(mut entry) => match entry.get_mut() { - AppOrQueue::Queue(ref mut data) => { - data.process(actions); - } - AppOrQueue::App(service_future) => { - let service_future = service_future.clone(); - // drop on stop - if actions.iter().any(|action| { - matches!( + let mut applications = rt_info.lock_applications(); + match applications.entry(queue_id) { + Entry::Occupied(mut entry) => { + let value = entry.get_mut(); + match value.app_or_actions { + AppOrQueue::Inactive => { + value.app_or_actions = AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)); + } + AppOrQueue::Queue(ref mut data) => { + data.process(actions); + } + AppOrQueue::App(ref service_future) => { + let service_future = service_future.clone(); + // drop on stop + if actions.iter().any(|action| { + matches!( action, 
SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop )) ) - }) { - entry.remove(); - rt_info.lock_remote_config_guards().remove(&queue_id); - } - let apps = rt_info.apps.clone(); - tokio::spawn(async move { - let service = service_future.await; - let app_future = if let Some(fut) = apps - .lock() - .expect("Unable to acquire lock on apps") - .get(&service) - { - fut.clone() - } else { - return; - }; - if let Some(mut app) = app_future.await { - let actions = - EnqueuedTelemetryData::process_immediately(actions, &mut app).await; - app.telemetry.send_msgs(actions).await.ok(); + }) { + entry.remove(); } - }); + let apps = rt_info.apps.clone(); + tokio::spawn(async move { + let service = service_future.await; + let app_future = if let Some(fut) = apps + .lock() + .expect("Unable to acquire lock on apps") + .get(&service) + { + fut.clone() + } else { + return; + }; + if let Some(mut app) = app_future.await { + let actions = EnqueuedTelemetryData::process_immediately(actions, &mut app).await; + app.telemetry.send_msgs(actions).await.ok(); + } + }); + } } }, Entry::Vacant(entry) => { - if actions.len() == 1 - && matches!( + if actions.len() != 1 + || !matches!( actions[0], SidecarAction::Telemetry(TelemetryActions::Lifecycle( LifecycleAction::Stop )) ) { - rt_info.lock_remote_config_guards().remove(&queue_id); - } else { - entry.insert(AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions))); + entry.insert(ActiveApplication { + app_or_actions: AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)), + ..Default::default() + }); } } } @@ -549,12 +570,12 @@ impl SidecarInterface for SidecarServer { let (future, completer) = ManualFuture::new(); let app_or_queue = { let rt_info = self.get_runtime(&instance_id); - let mut app_or_actions = rt_info.lock_app_or_actions(); - match app_or_actions.get(&queue_id) { - Some(AppOrQueue::Queue(_)) => { - app_or_actions.insert(queue_id, AppOrQueue::App(future.shared())) + let mut applications = 
rt_info.lock_applications(); + match applications.get_mut(&queue_id) { + Some(ActiveApplication { app_or_actions: ref mut app @ AppOrQueue::Queue(_), .. }) => { + Some(std::mem::replace(app, AppOrQueue::App(future.shared()))) } - None => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())), + None|Some(ActiveApplication { app_or_actions: AppOrQueue::Inactive, .. }) => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())), _ => None, } }; @@ -590,10 +611,7 @@ impl SidecarInterface for SidecarServer { matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop)) }) { self.get_runtime(&instance_id) - .lock_app_or_actions() - .remove(&queue_id); - self.get_runtime(&instance_id) - .lock_remote_config_guards() + .lock_applications() .remove(&queue_id); } @@ -785,12 +803,8 @@ impl SidecarInterface for SidecarServer { let notify_target = RemoteConfigNotifyTarget { pid: session.pid.load(Ordering::Relaxed), }; - session - .get_runtime(&instance_id.runtime_id) - .lock_remote_config_guards() - .insert( - queue_id, - self.remote_configs.add_runtime( + let runtime_info = session.get_runtime(&instance_id.runtime_id); + runtime_info.lock_applications().entry(queue_id).or_default().remote_config_guard = Some(self.remote_configs.add_runtime( session .get_remote_config_invariants() .as_ref() @@ -801,8 +815,7 @@ impl SidecarInterface for SidecarServer { env_name, service_name, app_version, - ), - ); + )); no_response() } diff --git a/sidecar/src/service/telemetry/mod.rs b/sidecar/src/service/telemetry/mod.rs index a609f5d6c..99d61a218 100644 --- a/sidecar/src/service/telemetry/mod.rs +++ b/sidecar/src/service/telemetry/mod.rs @@ -11,7 +11,10 @@ pub mod enqueued_telemetry_data; pub mod enqueued_telemetry_stats; #[allow(clippy::large_enum_variant)] +#[derive(Default)] pub(crate) enum AppOrQueue { + #[default] + Inactive, App(Shared>), Queue(EnqueuedTelemetryData), }