Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

admin: Include tokio runtime metrics in the default metrics export #215

Merged
merged 11 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
rustdocflags = ["--cfg", "tokio_unstable"]
28 changes: 16 additions & 12 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
{
"name": "kubert",
"image": "ghcr.io/linkerd/dev:v42",
"extensions": [
"kokakiwi.vscode-just",
"NathanRidley.autotrim",
"rust-lang.rust-analyzer",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"samverschueren.final-newline",
"tamasfe.even-better-toml"
],
"runArgs": [
"--init",
// Use the host network so we can access k3d, etc.
Expand All @@ -19,9 +11,6 @@
],
"overrideCommand": false,
"remoteUser": "code",
"containerEnv": {
"CXX": "clang++-14",
},
"mounts": [
{
"source": "/var/run/docker.sock",
Expand All @@ -33,5 +22,20 @@
"target": "/home/code/.docker",
"type": "bind"
}
]
],
"containerEnv": {
"CXX": "clang++-14",
},
"customizations": {
"vscode": {
"extensions": [
"kokakiwi.vscode-just",
"NathanRidley.autotrim",
"rust-lang.rust-analyzer",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"samverschueren.final-newline",
"tamasfe.even-better-toml"
]
}
}
}
86 changes: 86 additions & 0 deletions .github/workflows/release-prometheus-tokio.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
name: Release kubert-prometheus-tokio

on:
pull_request:
paths:
- .github/workflows/release-prometheus-tokio.yml
push:
tags:
- 'kubert-prometheus-tokio/*'

env:
CARGO_INCREMENTAL: 0
CARGO_NET_RETRY: 10
RUSTUP_MAX_RETRIES: 10

permissions:
contents: read

jobs:
cleanup:
runs-on: ubuntu-latest
permissions:
actions: write
steps:
- uses: styfle/cancel-workflow-action@01ce38bf961b4e243a6342cbade0dbc8ba3f0432
with:
all_but_latest: true
access_token: ${{ github.token }}

meta:
timeout-minutes: 5
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- id: meta
shell: bash
run: |
ref="${{ github.ref }}"
if [[ "$ref" == refs/tags/kubert-prometheus-tokio/* ]]; then
version="${ref##refs/tags/kubert-prometheus-tokio/}"
crate=$(just-cargo crate-version kubert-prometheus-tokio)
if [[ "$crate" != "$version" ]]; then
echo "::error ::Crate version $crate does not match tag $version" >&2
exit 1
fi
( echo version="$version"
echo mode=release
) >> "$GITHUB_OUTPUT"
else
sha="${{ github.sha }}"
( echo version="$(just-cargo crate-version kubert-prometheus-tokio)-git-${sha:0:7}"
echo mode=test
) >> "$GITHUB_OUTPUT"
fi
outputs:
mode: ${{ steps.meta.outputs.mode }}
version: ${{ steps.meta.outputs.version }}

release:
needs: [meta]
permissions:
contents: write
timeout-minutes: 5
runs-on: ubuntu-latest
steps:
- if: needs.meta.outputs.mode == 'release'
uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844
with:
name: kubert-prometheus-tokio ${{ needs.meta.outputs.version }}
generate_release_notes: true

crate:
# Only publish the crate after the rest of the release succeeds.
needs: [meta, release]
timeout-minutes: 10
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
env:
RUSTFLAGS: '--cfg tokio_unstable'
RUSTDOCFLAGS: '--cfg tokio_unstable'
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- run: cargo publish --package=kubert-prometheus-tokio --dry-run
- if: needs.meta.outputs.mode == 'release'
run: cargo publish --package=kubert-prometheus-tokio --token=${{ secrets.CRATESIO_TOKEN }}
3 changes: 3 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ jobs:
timeout-minutes: 10
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
env:
RUSTFLAGS: '--cfg tokio_unstable'
RUSTDOCFLAGS: '--cfg tokio_unstable'
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- run: cargo publish --package=kubert --dry-run
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
[workspace]
resolver = "2"
default-members = ["kubert", "kubert-prometheus-process"]
default-members = [
"kubert",
"kubert-prometheus-process",
"kubert-prometheus-tokio",
]
members = [
"kubert",
"kubert-prometheus-process",
"kubert-prometheus-tokio",
"examples",
]
1 change: 1 addition & 0 deletions examples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ COPY . .
RUN --mount=type=cache,target=/usr/local/cargo/registry \
CARGO_NET_RETRY=10 just-cargo fetch
ARG FEATURES="rustls-tls"
ENV RUSTFLAGS="--cfg tokio_unstable"
RUN --mount=type=cache,target=/usr/local/cargo/registry \
CARGO_INCREMENTAL=0 just-cargo build \
--frozen --package=kubert-examples --examples \
Expand Down
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ _features := if features == "all" {
# Required to build openssl
export CXX := 'clang++-14'

# Enable tokio-metrics
export RUSTFLAGS := env_var_or_default('RUSTFLAGS', '--cfg tokio_unstable')
export RUSTDOCFLAGS := env_var_or_default('RUSTDOCFLAGS', '--cfg tokio_unstable')

#
# Recipes
#
Expand Down
19 changes: 19 additions & 0 deletions kubert-prometheus-tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "kubert-prometheus-tokio"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
description = "A prometheus-client tokio runtime metrics collector"
readme = "../README.md"
repository = "https://github.com/olix0r/kubert"
rust-version = "1.65"
keywords = ["prometheus-client", "tokio", "metrics", "monitoring"]

[features]
rt = ["tokio/rt", "tokio/time", "tokio-metrics/rt"]

[dependencies]
prometheus-client = "0.22"
tokio = "1"
tokio-metrics = "0.3"
tracing = "0.1"
186 changes: 186 additions & 0 deletions kubert-prometheus-tokio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//! A `tokio-metrics` exporter for `prometheus-client`.

#![deny(rust_2018_idioms, missing_docs, warnings)]
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(all(feature = "rt", not(tokio_unstable)))]
compile_error!("RUSTFLAGS='--cfg tokio_unstable' must be set to use `tokio-metrics/rt`");

#[cfg(all(feature = "rt", tokio_unstable))]
pub use self::rt::Runtime;

#[cfg(all(feature = "rt", tokio_unstable))]
mod rt {
use prometheus_client::{
metrics::{counter::Counter, gauge::Gauge},
registry::{Registry, Unit},
};
use tokio::time;
use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};

/// Tokio runtime metrics.
///
/// Holds the handle of the runtime being observed together with the metric
/// handles that are exported for it. Create one with [`Runtime::register`] and
/// keep the values fresh by awaiting [`Runtime::updated`].
///
/// NOTE that this type relies on unstable tokio functionality. It is only
/// compiled when the `rt` crate feature is enabled *and* the `tokio_unstable`
/// cfg flag is set; enabling `rt` without `tokio_unstable` is a compile error.
///
/// `RUSTFLAGS="--cfg tokio_unstable"` must be set at build-time to use this
/// feature.
#[derive(Debug)]
pub struct Runtime {
// Handle to the runtime whose metrics are sampled.
runtime: tokio::runtime::Handle,
// Metric handles registered by `register` and refreshed on every probe.
metrics: Metrics,
}

// The set of metric handles exported for a runtime.
//
// `Runtime::register` hands a clone of every field to the registry while
// `Metrics::probe` updates the originals held here, so this relies on clones
// of a metric sharing their underlying value with the original.
#[derive(Debug, Default)]
struct Metrics {
// Gauge: set to the sampled worker count on every probe.
workers: Gauge,
// Counters: incremented by the per-interval value reported by each probe.
park: Counter,
noop: Counter,
steal: Counter,
steal_operations: Counter,
remote_schedule: Counter,
local_schedule: Counter,
overflow: Counter,
polls: Counter,
// Busy time is accumulated as fractional seconds.
busy: Counter<f64>,
// Gauges: overwritten with the sampled queue depths on every probe.
injection_queue_depth: Gauge,
local_queue_depth: Gauge,
// Counter: advanced by the difference between the sampled value and the
// value currently stored in this counter.
budget_forced_yield: Counter,
// Counter: incremented by the per-interval value reported by each probe.
io_driver_ready: Counter,
// TODO poll_count_histogram requires configuration
}

impl Runtime {
/// Registers Tokio runtime metrics with the given registry. Note that
/// metrics are NOT prefixed.
pub fn register(reg: &mut Registry, runtime: tokio::runtime::Handle) -> Self {
let metrics = Metrics::default();

reg.register(
"workers",
"The number of worker threads used by the runtime",
metrics.workers.clone(),
);

reg.register(
"park",
"Total number of times worker threads parked",
metrics.park.clone(),
);
reg.register(
"noop",
"Number of times workers unparked but found no new work",
metrics.noop.clone(),
);
reg.register(
"steal",
"Number of tasks stolen by workers from others",
metrics.steal.clone(),
);
reg.register(
"steal_operations",
"Number of times workers stole tasks from other",
metrics.steal_operations.clone(),
);

reg.register(
"remote_schedule",
"Total number of remote schedule operations",
metrics.remote_schedule.clone(),
);
reg.register(
"local_schedule",
"Total number of local schedule operations",
metrics.local_schedule.clone(),
);

reg.register(
"overflow",
"Total number of overflow operations",
metrics.overflow.clone(),
);
reg.register(
"polls",
"The number of tasks that have been polled across all worker threads",
metrics.polls.clone(),
);
reg.register_with_unit(
"busy",
"Total duration of time when worker threads were busy processing tasks",
Unit::Seconds,
metrics.busy.clone(),
);

reg.register(
"injection_queue_depth",
"The number of tasks currently scheduled in the runtime's injection queue",
metrics.injection_queue_depth.clone(),
);
reg.register(
"local_queue_depth",
"The total number of tasks currently scheduled in workers' local queues",
metrics.local_queue_depth.clone(),
);

reg.register(
"budget_forced_yield",
"Number of times a worker thread was forced to yield due to budget exhaustion",
metrics.budget_forced_yield.clone(),
);
reg.register(
"io_driver_ready",
"Number of times the IO driver was woken up",
metrics.io_driver_ready.clone(),
);

Self { runtime, metrics }
}

/// Drives metrics updates for a runtime according to a fixed interval.
pub async fn updated(&self, interval: &mut time::Interval) -> ! {
let mut probes = RuntimeMonitor::new(&self.runtime).intervals();
loop {
interval.tick().await;
self.metrics.probe(&mut probes);
}
}
}

impl Metrics {
    /// Takes the next sample from `probes` and folds it into the exported
    /// metrics.
    ///
    /// # Panics
    ///
    /// Panics if `probes` yields no further sample.
    #[tracing::instrument(skip_all, ret, level = tracing::Level::TRACE)]
    fn probe(&self, probes: &mut RuntimeIntervals) {
        let probe = probes.next().expect("runtime metrics stream must not end");

        // Tokio-metrics tracks all of these values as rates so we have
        // to turn them back into absolute counters:
        self.park.inc_by(probe.total_park_count);
        self.noop.inc_by(probe.total_noop_count);
        self.steal.inc_by(probe.total_steal_count);
        self.steal_operations.inc_by(probe.total_steal_operations);
        self.remote_schedule.inc_by(probe.num_remote_schedules);
        self.local_schedule.inc_by(probe.total_local_schedule_count);
        self.overflow.inc_by(probe.total_overflow_count);
        self.polls.inc_by(probe.total_polls_count);
        self.busy.inc_by(probe.total_busy_duration.as_secs_f64());
        self.io_driver_ready.inc_by(probe.io_driver_ready_count);

        // Instantaneous gauges. Each gauge must read its own field: the
        // injection queue depth is reported separately from the sum of the
        // workers' local queue depths.
        self.workers.set(probe.workers_count as i64);
        self.injection_queue_depth
            .set(probe.injection_queue_depth as i64);
        self.local_queue_depth
            .set(probe.total_local_queue_depth as i64);

        // Absolute counters need to be incremented by the delta:
        if let Some(delta) = probe
            .budget_forced_yield_count
            .checked_sub(self.budget_forced_yield.get())
        {
            self.budget_forced_yield.inc_by(delta);
        } else {
            // The sampled value is below what has already been recorded, so
            // there is no non-negative delta to apply; leave the counter as is.
            tracing::trace!("budget_forced_yield_count overflow");
        }
    }
}
}
Loading
Loading