Skip to content

Commit

Permalink
examples: Update lease to export metrics (#214)
Browse files Browse the repository at this point in the history
The lease example now exports the lease state as a metric:

    # HELP kubert_lease_claimed Indicates whether this instance is owns the lease.
    # TYPE kubert_lease_claimed gauge
    kubert_lease_claimed 1
    # HELP kubert_lease_claim_changes Counts changes of this process's claim of the lease.
    # TYPE kubert_lease_claim_changes counter
    kubert_lease_claim_changes_total 1

    # HELP kubert_lease_claimed Indicates whether this instance is owns the lease.
    # TYPE kubert_lease_claimed gauge
    kubert_lease_claimed 0
    # HELP kubert_lease_claim_changes Counts changes of this process's claim of the lease.
    # TYPE kubert_lease_claim_changes counter
    kubert_lease_claim_changes_total 2
  • Loading branch information
olix0r authored Dec 10, 2023
1 parent 30e1038 commit f741924
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 110 deletions.
273 changes: 164 additions & 109 deletions examples/lease.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![deny(warnings, rust_2018_idioms)]
#![forbid(unsafe_code)]

use anyhow::{bail, Result};
use anyhow::Result;
use k8s_openapi::{api::coordination::v1 as coordv1, apimachinery::pkg::apis::meta::v1 as metav1};
use kube::ResourceExt;
use tokio::time;
Expand Down Expand Up @@ -84,138 +84,193 @@ async fn main() -> Result<()> {
command,
} = Args::parse();

// Configure a runtime with:
// - a Kubernetes client
// - an admin server with /live and /ready endpoints
// - a tracing (logging) subscriber
let rt = kubert::Runtime::builder()
.with_log(log_level, log_format)
.with_admin(admin)
.with_client(client)
.build()
.await?;

let api = kube::Api::namespaced(rt.client(), &namespace);
let shutdown = rt.shutdown_handle();
let task = match command {
Command::Create { name } => tokio::spawn(async move {
let lease = api
.create(
&Default::default(),
&coordv1::Lease {
metadata: metav1::ObjectMeta {
name: Some(name),
namespace: Some(namespace),
.with_client(client);

match command {
Command::Create { name } => {
let rt = rt.build().await?;
let api = kube::Api::namespaced(rt.client(), &namespace);
run(rt, async move {
let lease = api
.create(
&Default::default(),
&coordv1::Lease {
metadata: metav1::ObjectMeta {
name: Some(name),
namespace: Some(namespace),
..Default::default()
},
..Default::default()
},
..Default::default()
},
)
.await?;
println!("Created lease: {}", lease.name_unchecked());
Ok::<_, kubert::lease::Error>(0)
}),

Command::Get { name } => tokio::spawn(async move {
let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
match lease.claimed().await {
Some(claim) => print_claim(&claim, &identity),
None => println!("? Unclaimed"),
}
Ok::<_, kubert::lease::Error>(0)
}),
)
.await?;
println!("Created lease: {}", lease.name_unchecked());
Ok::<_, kubert::lease::Error>(0)
})
.await
}

Command::Get { name } => {
let rt = rt.build().await?;
let api = kube::Api::namespaced(rt.client(), &namespace);
run(rt, async move {
let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
match lease.claimed().await {
Some(claim) => print_claim(&claim, &identity),
None => println!("? Unclaimed"),
}
Ok::<_, kubert::lease::Error>(0)
})
.await
}

Command::Claim {
duration: Timeout(lease_duration),
renew_grace_period: Timeout(renew_grace_period),
name,
} => tokio::spawn(async move {
let params = kubert::lease::ClaimParams {
lease_duration,
renew_grace_period,
};

let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
let claim = lease.ensure_claimed(&identity, &params).await?;
print_claim(&claim, &identity);

Ok::<_, kubert::lease::Error>(!claim.is_current_for(&identity) as i32)
}),

Command::Vacate { name } => tokio::spawn(async move {
let released = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager)
.vacate(&identity)
.await?;
let code = if released {
println!("+ Claim vacated");
0
} else {
println!("- Claim not vacated");
1
};
Ok::<_, kubert::lease::Error>(code)
}),
} => {
let rt = rt.build().await?;
let api = kube::Api::namespaced(rt.client(), &namespace);
run(rt, async move {
let params = kubert::lease::ClaimParams {
lease_duration,
renew_grace_period,
};

let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
let claim = lease.ensure_claimed(&identity, &params).await?;
print_claim(&claim, &identity);

Ok::<_, kubert::lease::Error>(!claim.is_current_for(&identity) as i32)
})
.await
}

Command::Vacate { name } => {
let rt = rt.build().await?;
let api = kube::Api::namespaced(rt.client(), &namespace);
run(rt, async move {
let released = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager)
.vacate(&identity)
.await?;
let code = if released {
println!("+ Claim vacated");
0
} else {
println!("- Claim not vacated");
1
};
Ok::<_, kubert::lease::Error>(code)
})
.await
}

Command::Run {
duration: Timeout(lease_duration),
renew_grace_period: Timeout(renew_grace_period),
name,
} => tokio::spawn(async move {
let params = kubert::lease::ClaimParams {
lease_duration,
renew_grace_period,
};

let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
let (mut claims, task) = lease.spawn(&identity, params).await?;
loop {
print_claim(&claims.borrow_and_update(), &identity);

let shutdown = shutdown.clone();
tokio::select! {
biased;
_ = shutdown.signaled() => {
return Ok(0);
}
res = claims.changed() => {
if res.is_err() {
task.await.expect("task")?;
} => {
let mut prom = prometheus_client::registry::Registry::default();

let claim_state = prometheus_client::metrics::gauge::Gauge::<i64>::default();
prom.sub_registry_with_prefix("kubert_lease").register(
"claimed",
"Indicates whether this instance is owns the lease",
claim_state.clone(),
);

let state_changes = prometheus_client::metrics::counter::Counter::<u64>::default();
prom.sub_registry_with_prefix("kubert_lease").register(
"claim_changes",
"Counts changes of this process's claim of the lease",
state_changes.clone(),
);

let rt = rt
.with_admin(admin.into_builder().with_prometheus(prom))
.build()
.await?;
let api = kube::Api::namespaced(rt.client(), &namespace);
let shutdown = rt.shutdown_handle();
run(rt, async move {
let params = kubert::lease::ClaimParams {
lease_duration,
renew_grace_period,
};

let lease = kubert::LeaseManager::init(api, name)
.await?
.with_field_manager(field_manager);
let mut claimed = false;
let (mut claims, task) = lease.spawn(&identity, params).await?;
loop {
claimed = {
let claim = claims.borrow_and_update();
print_claim(&claim, &identity);
match (claimed, claim.is_current_for(&identity)) {
(true, true) => true,
(false, false) => false,
(true, false) => {
claim_state.set(0);
state_changes.inc();
false
}
(false, true) => {
claim_state.set(1);
state_changes.inc();
true
}
}
};

let shutdown = shutdown.clone();
tokio::select! {
biased;
_ = shutdown.signaled() => {
return Ok(0);
}
res = claims.changed() => {
if res.is_err() {
task.await.expect("task")?;
return Ok(0);
}
}
}
}
}
}),
};
})
.await
}
}
}

async fn run<F>(rt: kubert::Runtime, fut: F) -> Result<()>
where
F: std::future::Future<Output = Result<i32, kubert::lease::Error>> + Send + 'static,
{
tokio::select! {
// Block the main thread on the shutdown signal. This won't complete until the watch stream
// stops (after pending Pod updates are logged). If a second signal is received before the watch
// stream completes, the future fails.
res = rt.run() => {
if let Err(error) = res {
bail!(error);
}
}
// Block the main thread on the shutdown signal. This won't complete
// until the watch stream stops (after pending Pod updates are logged).
// If a second signal is received before the watch stream completes, the
// future fails.
res = rt.run() => res.map_err(Into::into),

// If the watch stream completes, exit gracefully
res = task => match res {
Ok(Ok(0)) => {}
Ok(Ok(code)) => std::process::exit(code),
Err(error) => bail!(error),
Ok(Err(error)) => bail!(error),
res = fut => {
let code = res?;
if code != 0 {
std::process::exit(code);
}
Ok(())
},
}

Ok(())
}

fn print_claim(claim: &kubert::lease::Claim, identity: &str) {
Expand Down
2 changes: 1 addition & 1 deletion examples/watch_pods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn main() -> Result<()> {
let mut prom = prometheus_client::registry::Registry::default();

// Register application metrics before configuring the admin server.
let metrics = Metrics::register(prom.sub_registry_with_prefix("watch_pods"));
let metrics = Metrics::register(prom.sub_registry_with_prefix("kubert_watch_pods"));

// Configure a runtime with:
// - a Kubernetes client
Expand Down

0 comments on commit f741924

Please sign in to comment.