Skip to content

Commit

Permalink
Avoid multiple QueueId HashMaps for RuntimeInfo
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Jun 27, 2024
1 parent 95c7998 commit b82b2a8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 82 deletions.
41 changes: 19 additions & 22 deletions sidecar/src/service/runtime_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use manual_future::{ManualFuture, ManualFutureCompleter};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};
use tracing::{debug, info};
use ddcommon::tag::Tag;

type AppMap = HashMap<(String, String), Shared<ManualFuture<Option<AppInstance>>>>;

Expand All @@ -27,18 +28,26 @@ pub(crate) struct SharedAppManualFut {

/// `RuntimeInfo` is a struct that contains information about a runtime.
/// It contains a map of apps and a map of app or actions.
/// Each app is represented by a shared future that may contain an `Option<AppInstance>`.
/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions are necessary
/// because service and env names are not known until later in the initialization process.
#[derive(Clone, Default)]
pub(crate) struct RuntimeInfo {
pub(crate) apps: Arc<Mutex<AppMap>>,
app_or_actions: Arc<Mutex<HashMap<QueueId, AppOrQueue>>>,
applications: Arc<Mutex<HashMap<QueueId, ActiveApplication>>>,
#[cfg(feature = "tracing")]
remote_config_guards: Arc<Mutex<HashMap<QueueId, RemoteConfigsGuard>>>,
pub(crate) instance_id: InstanceId,
}

/// `ActiveApplications` is a struct the contains information about a known in flight application.
/// Telemetry lifecycles (see `app_or_actions`) and remote_config `remote_config_guard` are bound to
/// it.
/// Each app is represented by a shared future that may contain an `Option<AppInstance>`.
/// Each action is represented by an `AppOrQueue` enum. Combining apps and actions are necessary
/// because service and env names are not known until later in the initialization process.
#[derive(Default)]
pub(crate) struct ActiveApplication {
pub app_or_actions: AppOrQueue,
pub remote_config_guard: Option<RemoteConfigsGuard>,
}

impl RuntimeInfo {
/// Retrieves the `AppInstance` for a given service name and environment name.
///
Expand Down Expand Up @@ -115,26 +124,14 @@ impl RuntimeInfo {
self.apps.lock().unwrap()
}

/// Locks the app or actions map and returns a mutable reference to it.
///
/// # Returns
///
/// * `MutexGuard<HashMap<QueueId, AppOrQueue>>` - A mutable reference to the app or actions
/// map.
pub(crate) fn lock_app_or_actions(&self) -> MutexGuard<HashMap<QueueId, AppOrQueue>> {
self.app_or_actions.lock().unwrap()
}

/// Locks the remote config guards map and returns a mutable reference to it.
/// Locks the applications map and returns a mutable reference to it.
///
/// # Returns
///
/// * `MutexGuard<HashMap<QueueId, RemoteConfigsGuard>>` - A mutable reference to the remote
/// config guards map.
pub(crate) fn lock_remote_config_guards(
&self,
) -> MutexGuard<HashMap<QueueId, RemoteConfigsGuard>> {
self.remote_config_guards.lock().unwrap()
/// * `MutexGuard<HashMap<QueueId, ActiveApplications>>` - A mutable reference to the
/// applications map.
pub(crate) fn lock_applications(&self) -> MutexGuard<HashMap<QueueId, ActiveApplication>> {
self.applications.lock().unwrap()
}
}

Expand Down
133 changes: 73 additions & 60 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::service::tracing::trace_flusher::TraceFlusherStats;
use datadog_ipc::platform::FileBackedHandle;
use datadog_ipc::tarpc::server::{Channel, InFlightRequest};
use datadog_remote_config::fetch::ConfigInvariants;
use crate::service::runtime_info::ActiveApplication;

type NoResponse = Ready<()>;

Expand All @@ -62,6 +63,7 @@ struct SidecarStats {
active_apps: u32,
enqueued_apps: u32,
enqueued_telemetry_data: EnqueuedTelemetryStats,
remote_config_clients: u32,
telemetry_metrics_contexts: u32,
telemetry_worker: TelemetryWorkerStats,
telemetry_worker_errors: u32,
Expand Down Expand Up @@ -167,7 +169,7 @@ impl SidecarServer {

async fn process_interceptor_response(
&self,
result: Result<(HashSet<String>, HashSet<InstanceId>), tokio::task::JoinError>,
result: Result<(HashSet<String>, HashSet<InstanceId>), JoinError>,
) {
match result {
Ok((sessions, instances)) => {
Expand Down Expand Up @@ -396,7 +398,7 @@ impl SidecarServer {
.map(|s| {
s.lock_runtimes()
.values()
.map(|r| r.lock_app_or_actions().len() as u32)
.map(|r| r.lock_applications().len() as u32)
.sum::<u32>()
})
.sum(),
Expand All @@ -406,9 +408,9 @@ impl SidecarServer {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_app_or_actions()
r.lock_applications()
.values()
.filter(|a| matches!(a, AppOrQueue::Queue(_)))
.filter(|a| matches!(a.app_or_actions, AppOrQueue::Queue(_)))
.count() as u32
})
.sum::<u32>()
Expand All @@ -420,9 +422,9 @@ impl SidecarServer {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_app_or_actions()
r.lock_applications()
.values()
.filter_map(|a| match a {
.filter_map(|a| match &a.app_or_actions {
AppOrQueue::Queue(q) => Some(q.stats()),
_ => None,
})
Expand All @@ -431,6 +433,20 @@ impl SidecarServer {
.sum()
})
.sum(),
remote_config_clients: sessions
.values()
.map(|s| {
s.lock_runtimes()
.values()
.map(|r| {
r.lock_applications()
.values()
.filter_map(|a| a.remote_config_guard.as_ref())
.count() as u32
})
.sum::<u32>()
})
.sum(),
telemetry_metrics_contexts: sessions
.values()
.map(|s| {
Expand Down Expand Up @@ -476,58 +492,63 @@ impl SidecarInterface for SidecarServer {
actions: Vec<SidecarAction>,
) -> Self::EnqueueActionsFut {
let rt_info = self.get_runtime(&instance_id);
let mut queue = rt_info.lock_app_or_actions();
match queue.entry(queue_id) {
Entry::Occupied(mut entry) => match entry.get_mut() {
AppOrQueue::Queue(ref mut data) => {
data.process(actions);
}
AppOrQueue::App(service_future) => {
let service_future = service_future.clone();
// drop on stop
if actions.iter().any(|action| {
matches!(
let mut applications = rt_info.lock_applications();
match applications.entry(queue_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
match value.app_or_actions {
AppOrQueue::Inactive => {
value.app_or_actions = AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions));
}
AppOrQueue::Queue(ref mut data) => {
data.process(actions);
}
AppOrQueue::App(ref service_future) => {
let service_future = service_future.clone();
// drop on stop
if actions.iter().any(|action| {
matches!(
action,
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop
))
)
}) {
entry.remove();
rt_info.lock_remote_config_guards().remove(&queue_id);
}
let apps = rt_info.apps.clone();
tokio::spawn(async move {
let service = service_future.await;
let app_future = if let Some(fut) = apps
.lock()
.expect("Unable to acquire lock on apps")
.get(&service)
{
fut.clone()
} else {
return;
};
if let Some(mut app) = app_future.await {
let actions =
EnqueuedTelemetryData::process_immediately(actions, &mut app).await;
app.telemetry.send_msgs(actions).await.ok();
}) {
entry.remove();
}
});
let apps = rt_info.apps.clone();
tokio::spawn(async move {
let service = service_future.await;
let app_future = if let Some(fut) = apps
.lock()
.expect("Unable to acquire lock on apps")
.get(&service)
{
fut.clone()
} else {
return;
};
if let Some(mut app) = app_future.await {
let actions = EnqueuedTelemetryData::process_immediately(actions, &mut app).await;
app.telemetry.send_msgs(actions).await.ok();
}
});
}
}
},
Entry::Vacant(entry) => {
if actions.len() == 1
&& matches!(
if actions.len() != 1
|| !matches!(
actions[0],
SidecarAction::Telemetry(TelemetryActions::Lifecycle(
LifecycleAction::Stop
))
)
{
rt_info.lock_remote_config_guards().remove(&queue_id);
} else {
entry.insert(AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)));
entry.insert(ActiveApplication {
app_or_actions: AppOrQueue::Queue(EnqueuedTelemetryData::processed(actions)),
..Default::default()
});
}
}
}
Expand All @@ -549,12 +570,12 @@ impl SidecarInterface for SidecarServer {
let (future, completer) = ManualFuture::new();
let app_or_queue = {
let rt_info = self.get_runtime(&instance_id);
let mut app_or_actions = rt_info.lock_app_or_actions();
match app_or_actions.get(&queue_id) {
Some(AppOrQueue::Queue(_)) => {
app_or_actions.insert(queue_id, AppOrQueue::App(future.shared()))
let mut applications = rt_info.lock_applications();
match applications.get_mut(&queue_id) {
Some(ActiveApplication { app_or_actions: ref mut app @ AppOrQueue::Queue(_), .. }) => {
Some(std::mem::replace(app, AppOrQueue::App(future.shared())))
}
None => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())),
None|Some(ActiveApplication { app_or_actions: AppOrQueue::Inactive, .. }) => Some(AppOrQueue::Queue(EnqueuedTelemetryData::default())),
_ => None,
}
};
Expand Down Expand Up @@ -590,10 +611,7 @@ impl SidecarInterface for SidecarServer {
matches!(action, TelemetryActions::Lifecycle(LifecycleAction::Stop))
}) {
self.get_runtime(&instance_id)
.lock_app_or_actions()
.remove(&queue_id);
self.get_runtime(&instance_id)
.lock_remote_config_guards()
.lock_applications()
.remove(&queue_id);
}

Expand Down Expand Up @@ -785,12 +803,8 @@ impl SidecarInterface for SidecarServer {
let notify_target = RemoteConfigNotifyTarget {
pid: session.pid.load(Ordering::Relaxed),
};
session
.get_runtime(&instance_id.runtime_id)
.lock_remote_config_guards()
.insert(
queue_id,
self.remote_configs.add_runtime(
let runtime_info = session.get_runtime(&instance_id.runtime_id);
runtime_info.lock_applications().entry(queue_id).or_default().remote_config_guard = Some(self.remote_configs.add_runtime(
session
.get_remote_config_invariants()
.as_ref()
Expand All @@ -801,8 +815,7 @@ impl SidecarInterface for SidecarServer {
env_name,
service_name,
app_version,
),
);
));

no_response()
}
Expand Down
3 changes: 3 additions & 0 deletions sidecar/src/service/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ pub mod enqueued_telemetry_data;
pub mod enqueued_telemetry_stats;

#[allow(clippy::large_enum_variant)]
#[derive(Default)]
pub(crate) enum AppOrQueue {
#[default]
Inactive,
App(Shared<ManualFuture<(String, String)>>),
Queue(EnqueuedTelemetryData),
}

0 comments on commit b82b2a8

Please sign in to comment.