Skip to content

Commit

Permalink
feat(runtime): Add metrics for watch events (#243)
Browse files Browse the repository at this point in the history
When the runtime watches a Kubernetes resource, it's useful to be able to
observe and monitor this activity.

This commit introduces a `Builder::with_metrics` method and a
`kubert::RuntimeMetrics` type to instrument the runtime with general
runtime metrics. Currently, these metrics only include watch metrics that
count the number of events and errors per resource type.

The watch_pods example has been updated to use these new metrics as well.

Signed-off-by: Alex Leong <[email protected]>
Co-authored-by: Oliver Gould <[email protected]>
  • Loading branch information
adleong and olix0r authored Apr 26, 2024
1 parent f200d64 commit ea68984
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/features.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ jobs:
- prometheus-client
- requeue
- runtime
- runtime,prometheus-client
- server
- "server rustls-tls"
- "server openssl-tls"
Expand Down
4 changes: 3 additions & 1 deletion examples/watch_pods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ async fn main() -> Result<()> {
let mut prom = prometheus_client::registry::Registry::default();

// Register application metrics before configuring the admin server.
let metrics = Metrics::register(prom.sub_registry_with_prefix("kubert_watch_pods"));
let metrics = Metrics::register(prom.sub_registry_with_prefix("example_watch_pods"));
let runtime_metrics = kubert::RuntimeMetrics::register(prom.sub_registry_with_prefix("kubert"));

// Configure a runtime with:
// - a Kubernetes client
Expand All @@ -80,6 +81,7 @@ async fn main() -> Result<()> {
let rt = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin.into_builder().with_prometheus(prom))
.with_metrics(runtime_metrics)
.with_client(client);

let deadline = time::Instant::now() + timeout;
Expand Down
3 changes: 3 additions & 0 deletions kubert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,8 @@ pub use self::log::{LogFilter, LogFormat, LogInitError};
#[cfg(feature = "runtime")]
pub use self::runtime::Runtime;

#[cfg(all(feature = "runtime", feature = "prometheus-client"))]
pub use self::runtime::RuntimeMetrics;

#[cfg(feature = "server")]
pub use self::server::ServerArgs;
47 changes: 47 additions & 0 deletions kubert/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use tower::Service;
pub use kube_client::Api;
pub use reflector::Store;

#[cfg(feature = "prometheus-client")]
mod metrics;

/// Configures a controller [`Runtime`]
#[derive(Debug, Default)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
Expand All @@ -34,6 +37,9 @@ pub struct Builder<S = NoServer> {
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,

#[cfg(feature = "prometheus-client")]
metrics: Option<RuntimeMetrics>,
}

/// Provides infrastructure for running:
Expand All @@ -59,12 +65,23 @@ pub struct Runtime<S = NoServer> {
server: S,
#[cfg(not(feature = "server"))]
server: std::marker::PhantomData<S>,

#[cfg(feature = "prometheus-client")]
metrics: Option<RuntimeMetrics>,
}

/// Indicates that no HTTPS server is configured
///
/// This is the default type parameter for [`Builder`] and [`Runtime`],
/// used until a server is configured on the builder.
#[derive(Debug, Default)]
pub struct NoServer(());

/// Holds metrics for the runtime.
///
/// Create via [`RuntimeMetrics::register`] and pass the result to
/// `Builder::with_metrics` so the runtime's resource watches are instrumented.
#[cfg(feature = "prometheus-client")]
#[must_use = "RuntimeMetrics must be passed to `Builder::with_metrics`"]
#[derive(Debug)]
pub struct RuntimeMetrics {
// Counters for watch events and errors, labeled per resource type.
watch: metrics::ResourceWatchMetrics,
}

/// Indicates that the [`Builder`] could not configure a [`Runtime`]
#[derive(Debug, thiserror::Error)]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
Expand Down Expand Up @@ -126,6 +143,13 @@ impl<S> Builder<S> {
self
}

/// Configures the runtime to record watch metrics
///
/// The provided [`RuntimeMetrics`] should already have been registered with a
/// Prometheus registry (via [`RuntimeMetrics::register`]); this method only
/// installs it on the runtime so that resource watches are instrumented.
#[cfg(feature = "prometheus-client")]
pub fn with_metrics(mut self, metrics: RuntimeMetrics) -> Self {
self.metrics = Some(metrics);
self
}

#[inline]
async fn build_inner<F>(
self,
Expand All @@ -147,6 +171,8 @@ impl<S> Builder<S> {
initialized: Initialized::default(),
// Server must be built by `Builder::build`
server: self.server,
#[cfg(feature = "prometheus-client")]
metrics: self.metrics,
})
}
}
Expand All @@ -162,6 +188,7 @@ impl Builder<NoServer> {
client: self.client,
error_delay: self.error_delay,
log: self.log,
metrics: self.metrics,
}
}

Expand All @@ -177,6 +204,7 @@ impl Builder<NoServer> {
client: self.client,
error_delay: self.error_delay,
log: self.log,
metrics: self.metrics,
}
}
}
Expand Down Expand Up @@ -295,6 +323,11 @@ impl<S> Runtime<S> {
T::DynamicType: Default,
{
let watch = watcher::watcher(api, watcher_config);
#[cfg(feature = "prometheus-client")]
let watch = metrics::ResourceWatchMetrics::instrument_watch(
self.metrics.as_ref().map(|m| m.watch.clone()),
watch,
);
let successful = errors::LogAndSleep::fixed_delay(self.error_delay, watch);
let initialized = self.initialized.add_handle().release_on_ready(successful);
shutdown::CancelOnShutdown::new(self.shutdown_rx.clone(), initialized)
Expand Down Expand Up @@ -410,6 +443,7 @@ impl<S> Runtime<S> {
initialized: self.initialized,
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
metrics: self.metrics,
})
}

Expand All @@ -424,6 +458,7 @@ impl<S> Runtime<S> {
initialized: self.initialized,
shutdown_rx: self.shutdown_rx,
shutdown: self.shutdown,
metrics: self.metrics,
}
}
}
Expand Down Expand Up @@ -553,3 +588,15 @@ impl LogSettings {
self.format.try_init(self.filter)
}
}

// === impl RuntimeMetrics ===

#[cfg(feature = "prometheus-client")]
impl RuntimeMetrics {
    /// Creates a new set of metrics and registers them.
    ///
    /// The watch counters are registered under the `watch` prefix of the
    /// provided registry.
    pub fn register(registry: &mut prometheus_client::registry::Registry) -> Self {
        Self {
            watch: metrics::ResourceWatchMetrics::register(
                registry.sub_registry_with_prefix("watch"),
            ),
        }
    }
}
131 changes: 131 additions & 0 deletions kubert/src/runtime/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use futures_core::Stream;
use futures_util::StreamExt;
use kube_core::Resource;
use kube_runtime::watcher;
use prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
metrics::{counter::Counter, family::Family},
registry::Registry,
};
use std::fmt::Debug;

/// Metrics for tracking resource watch events.
///
/// Both families are labeled with the watched resource's kind/group/version;
/// events are additionally labeled by operation and errors by error variant.
#[derive(Clone, Debug)]
pub(super) struct ResourceWatchMetrics {
// Count of watch events, labeled by `EventOp` (apply/restart/delete).
watch_events: Family<EventLabels, Counter>,
// Count of watch failures, labeled by the `watcher::Error` variant name.
watch_errors: Family<ErrorLabels, Counter>,
}

// Label set for the `events` counter family: the event operation plus the
// identity (kind/group/version) of the watched resource type.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct EventLabels {
op: EventOp,
kind: String,
group: String,
version: String,
}

// Label set for the `errors` counter family: the watched resource's identity
// plus the `watcher::Error` variant name (a static string).
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ErrorLabels {
kind: String,
group: String,
version: String,
error: &'static str,
}

// Watch event operations that are counted; mirrors the `watcher::Event`
// variants (Applied/Restarted/Deleted) observed by `instrument_watch`.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
enum EventOp {
Apply,
Restart,
Delete,
}

impl ResourceWatchMetrics {
    /// Creates a new set of metrics and registers them with the given registry.
    ///
    /// Registers two counter families: `events`, counting watch events by
    /// operation (`op` label) and resource type, and `errors`, counting watch
    /// failures by error variant and resource type.
    pub(super) fn register(registry: &mut Registry) -> Self {
        let watch_events = Family::default();
        registry.register(
            "events",
            // The counter covers apply, restart, AND delete operations
            // (distinguished by the `op` label), so the help text must not
            // claim it counts only apply events.
            "Count of events for a resource watch",
            watch_events.clone(),
        );

        let watch_errors = Family::default();
        registry.register(
            "errors",
            "Count of errors for a resource watch",
            watch_errors.clone(),
        );

        Self {
            watch_events,
            watch_errors,
        }
    }
}

impl ResourceWatchMetrics {
    /// Wraps a watch stream so that every yielded event or error increments
    /// the appropriate counter.
    ///
    /// If `metrics` is `None`, items pass through without being counted. The
    /// stream's items are never modified, only observed.
    pub(crate) fn instrument_watch<T, S: Stream<Item = watcher::Result<watcher::Event<T>>> + Send>(
        metrics: Option<Self>,
        watch: S,
    ) -> impl Stream<Item = watcher::Result<watcher::Event<T>>> + Send
    where
        T: Resource + Send,
        T::DynamicType: Default,
    {
        // Resolve the watched type's identity once, up front, rather than on
        // every event.
        let dt = Default::default();
        let apply_labels = EventLabels {
            op: EventOp::Apply,
            kind: T::kind(&dt).into_owned(),
            group: T::group(&dt).into_owned(),
            version: T::version(&dt).into_owned(),
        };
        let restart_labels = EventLabels {
            op: EventOp::Restart,
            ..apply_labels.clone()
        };
        let delete_labels = EventLabels {
            op: EventOp::Delete,
            ..apply_labels.clone()
        };

        // `inspect` observes each item by reference and yields it unchanged.
        watch.inspect(move |event| {
            let metrics = match metrics.as_ref() {
                Some(metrics) => metrics,
                // Metrics are disabled; pass the event through uncounted.
                None => return,
            };
            match event {
                Ok(watcher::Event::Applied(_)) => {
                    metrics.watch_events.get_or_create(&apply_labels).inc();
                }
                Ok(watcher::Event::Restarted(_)) => {
                    metrics.watch_events.get_or_create(&restart_labels).inc();
                }
                Ok(watcher::Event::Deleted(_)) => {
                    metrics.watch_events.get_or_create(&delete_labels).inc();
                }
                Err(error) => {
                    // The error label is the variant name only; inner error
                    // details are not exported to keep cardinality bounded.
                    let error = match error {
                        watcher::Error::InitialListFailed(_) => "InitialListFailed",
                        watcher::Error::WatchStartFailed(_) => "WatchStartFailed",
                        watcher::Error::WatchError(_) => "WatchError",
                        watcher::Error::WatchFailed(_) => "WatchFailed",
                        watcher::Error::NoResourceVersion => "NoResourceVersion",
                        watcher::Error::TooManyObjects => "TooManyObjects",
                    };
                    let labels = ErrorLabels {
                        kind: apply_labels.kind.clone(),
                        group: apply_labels.group.clone(),
                        version: apply_labels.version.clone(),
                        error,
                    };
                    metrics.watch_errors.get_or_create(&labels).inc();
                }
            }
        })
    }
}

0 comments on commit ea68984

Please sign in to comment.