Skip to content

Commit

Permalink
lease: retry errors claiming lease in LeaseManager::spawn (#148)
Browse files Browse the repository at this point in the history
Currently, if the `ensure_claimed` call in the `LeaseManager::spawn`
background task fails, the entire task will bail and terminate
immediately. This is not desirable, as the intended use of
`LeaseManager::spawn` is to spawn a single background task that will
manage leases for the entire lifetime of a controller process, and it
should not terminate in the face of a transient error claiming the
lease.

This branch changes `LeaseManager::spawn` to retry transient errors with
an exponential backoff strategy. Exponential backoffs are implemented
using the `backoff` crate, but --- because this dependency is not
exposed in the public API --- this is strictly an implementation detail,
and the dependency can be swapped out for another exponential backoff
implementation later, if desired. Errors marked as transient and retried
include network errors communicating with the API server, errors parsing
the API server response, and HTTP error statuses returned by the API
server. Errors that result from a bad configuration or missing
permissions are treated as fatal, as the request is unlikely to ever
succeed after retrying.
  • Loading branch information
hawkw committed Apr 14, 2023
1 parent 7fdd7eb commit 5bf8147
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
2 changes: 2 additions & 0 deletions kubert/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ index = [
]
initialized = ["futures-core", "futures-util", "pin-project-lite", "tokio/sync"]
lease = [
"backoff",
"chrono",
"hyper",
"k8s-openapi",
Expand Down Expand Up @@ -103,6 +104,7 @@ features = ["k8s-openapi/v1_26"]

[dependencies]
ahash = { version = "0.8", optional = true }
backoff = { version = "0.4", features = ["tokio"], optional = true }
deflate = { version = "1", optional = true, default-features = false, features = [
"gzip",
] }
Expand Down
33 changes: 32 additions & 1 deletion kubert/src/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! [`LeaseManager`] interacts with a [`coordv1::Lease`] resource to ensure that
//! only a single claimant owns the lease at a time.

use futures_util::TryFutureExt;
use k8s_openapi::{api::coordination::v1 as coordv1, apimachinery::pkg::apis::meta::v1 as metav1};
use std::{borrow::Cow, sync::Arc};
use tokio::time::Duration;
Expand Down Expand Up @@ -125,6 +126,8 @@ impl Claim {

impl LeaseManager {
const DEFAULT_FIELD_MANAGER: &'static str = "kubert";
const DEFAULT_MIN_BACKOFF: Duration = Duration::from_millis(5);
const DEFAULT_BACKOFF_JITTER: f64 = 0.5; // up to 50% of the backoff duration

/// Initialize a lease's state from the Kubernetes API.
///
Expand Down Expand Up @@ -280,6 +283,10 @@ impl LeaseManager {
let claimant = claimant.to_string();
let mut claim = self.ensure_claimed(&claimant, &params).await?;
let (tx, rx) = tokio::sync::watch::channel(claim.clone());
let mut new_backoff = backoff::ExponentialBackoffBuilder::default();
new_backoff
.with_initial_interval(Self::DEFAULT_MIN_BACKOFF)
.with_randomization_factor(Self::DEFAULT_BACKOFF_JITTER);

let task = tokio::spawn(async move {
loop {
Expand All @@ -300,7 +307,31 @@ impl LeaseManager {
}

// Update the claim and broadcast it to all receivers.
claim = self.ensure_claimed(&claimant, &params).await?;
let backoff = new_backoff.with_max_interval(grace).build();
claim = backoff::future::retry(backoff, || {
self.ensure_claimed(&claimant, &params).map_err(|err| match err {
err @ Error::Api(kube_client::Error::Auth(_))
| err @ Error::Api(kube_client::Error::Discovery(_))
| err @ Error::Api(kube_client::Error::BuildRequest(_)) => {
backoff::Error::Permanent(err)
},
err @ Error::Api(kube_client::Error::InferConfig(_)) => {
debug_assert!(false, "InferConfig errors should only be returned when constructing a new client");
backoff::Error::Permanent(err)
},
// Retry any other API request errors.
err => {
tracing::debug!(error = %err, "Error claiming lease, retrying...");
backoff::Error::Transient {
err,
// Allow the backoff implementation to select how
// long to wait before retrying.
retry_after: None,
}
}
})
})
.await?;
if tx.send(claim.clone()).is_err() {
// All receivers have been dropped.
break;
Expand Down

0 comments on commit 5bf8147

Please sign in to comment.