From af8afe56aa04933cd603a5f13062a0c8d44e90a9 Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Mon, 21 Feb 2022 08:44:55 -0800
Subject: [PATCH] Add a requeue channel (#8)

As a controller processes events, it may not be able to act on these
events successfully: there may be a transient failure writing to the
API server, or the system may not yet be in a converged state, so the
update must be reprocessed at a later time.

The `requeue` module provides a bounded, multi-producer/single-consumer
delay-queue channel. Applications can use this channel to reschedule
updates for a later time. While this is especially useful for
controllers, this channel is not inherently tied to Kubernetes in any
way.
---
 .github/workflows/{lints.yml => lint.yml} |   4 +-
 .github/workflows/test.yml                |  36 +++
 deny.toml                                 |   2 +-
 kubert/Cargo.toml                         |  21 ++
 kubert/src/lib.rs                         |   4 +
 kubert/src/requeue.rs                     | 336 ++++++++++++++++++++++
 6 files changed, 400 insertions(+), 3 deletions(-)
 rename .github/workflows/{lints.yml => lint.yml} (97%)
 create mode 100644 .github/workflows/test.yml
 create mode 100644 kubert/src/requeue.rs

diff --git a/.github/workflows/lints.yml b/.github/workflows/lint.yml
similarity index 97%
rename from .github/workflows/lints.yml
rename to .github/workflows/lint.yml
index 2b2929f..08deb6a 100644
--- a/.github/workflows/lints.yml
+++ b/.github/workflows/lint.yml
@@ -1,10 +1,10 @@
-name: lints
+name: lint

 on:
   pull_request:
     paths:
       - '**/*.rs'
-      - .github/workflows/lints.yml
+      - .github/workflows/lint.yml

 permissions:
   contents: read
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..c54201b
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,36 @@
+name: test
+
+on:
+  pull_request:
+    paths:
+      - Cargo.lock
+      - '**/*.rs'
+      - .github/workflows/test.yml
+
+permissions:
+  contents: read
+
+env:
+  CARGO_ACTION_FMT_VERSION: v0.1.3
+  CARGO_INCREMENTAL: 0
+  CARGO_NET_RETRY: 10
+  RUST_BACKTRACE: short
+  RUSTUP_MAX_RETRIES: 10
+
+jobs:
+  test:
+    timeout-minutes: 10
+    runs-on: ubuntu-latest
+    container:
+      image: docker://rust:1.58.1
+    steps:
+      - name: install cargo-action-fmt
+        run: |
+          curl --proto =https --tlsv1.3 -vsSfLo /usr/local/bin/cargo-action-fmt "https://github.com/olix0r/cargo-action-fmt/releases/download/release%2F${CARGO_ACTION_FMT_VERSION}/cargo-action-fmt-x86_64-unknown-linux-gnu"
+          chmod 755 /usr/local/bin/cargo-action-fmt
+      - uses: actions/checkout@ec3a7ce113134d7a93b817d10a8272cb61118579
+      - run: cargo fetch
+      - name: cargo test --no-run
+        run: cargo test --frozen --workspace --all-features --no-run --message-format=json | cargo-action-fmt
+      - name: cargo test
+        run: cargo test --frozen --workspace --all-features
diff --git a/deny.toml b/deny.toml
index 307be76..e81fe89 100644
--- a/deny.toml
+++ b/deny.toml
@@ -13,7 +13,7 @@ ignore = [

 [licenses]
 unlicensed = "deny"
-allow = ["Apache-2.0", "ISC", "MIT"]
+allow = ["Apache-2.0", "BSD-3-Clause", "ISC", "MIT"]
 deny = []
 copyleft = "deny"
 allow-osi-fsf-free = "neither"
diff --git a/kubert/Cargo.toml b/kubert/Cargo.toml
index c520339..6478ddc 100644
--- a/kubert/Cargo.toml
+++ b/kubert/Cargo.toml
@@ -13,6 +13,7 @@ keywords = ["kubernetes", "client", "runtime", "server"]
 client = ["kube-client", "thiserror"]
 # TODO controller = ["client", "kube/runtime"]
 log = ["thiserror", "tracing-subscriber"]
+requeue = ["futures-core", "tokio/macros", "tokio/sync", "tokio-util/time"]
 # TODO runtime = ["clap", "drain", "log", "tokio/signal"]
 server = [
     "drain",
@@ -44,10 +45,12 @@ features = [
 [dependencies]
 drain = { version = "0.1.0", optional = true, default-features = false }
+futures-core = { version = "0.3", optional = true, default-features = false }
 hyper = { version = "0.14.17", optional = true, default-features = false }
 rustls-pemfile = { version = "0.3.0", optional = true }
 thiserror = { version = "1.0.30", optional = true }
 tokio = { version = "1.17.0", optional = false, default-features = false }
+tokio-util = { version = "0.7", optional = true, default-features = false }
 tokio-rustls = { version = "0.23.2", optional = true, default-features = false }
 tower-service = { version = "0.3.1", optional = true }
 tracing = "0.1.31"
@@ -64,6 +67,16 @@ optional = true
 default-features = false
 features = ["client", "config"]

+[dependencies.kube-core]
+version = "0.69.1"
+optional = true
+default-features = false
+
+[dependencies.kube-runtime]
+version = "0.69.1"
+optional = true
+default-features = false
+
 [dependencies.tracing-subscriber]
 version = "0.3.9"
 optional = true
@@ -77,4 +90,12 @@ features = [
 ]

 [dev-dependencies]
+kube = { version = "0.69.1", default-features = false, features = ["runtime"] }
 k8s-openapi = { version = "0.14.0", default-features = false, features = ["v1_23"] }
+tracing-subscriber = { version = "0.3", features = ["ansi"] }
+tokio-test = "0.4"
+
+[dev-dependencies.tokio]
+version = "1.17"
+default-features = false
+features = ["macros", "test-util"]
diff --git a/kubert/src/lib.rs b/kubert/src/lib.rs
index bdd79fd..8bed2c5 100644
--- a/kubert/src/lib.rs
+++ b/kubert/src/lib.rs
@@ -22,6 +22,10 @@ pub mod log;
 #[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
 pub mod shutdown;

+#[cfg(feature = "requeue")]
+#[cfg_attr(docsrs, doc(cfg(any(feature = "requeue"))))]
+pub mod requeue;
+
 #[cfg(feature = "server")]
 #[cfg_attr(docsrs, doc(cfg(any(feature = "shutdown"))))]
 pub mod server;
diff --git a/kubert/src/requeue.rs b/kubert/src/requeue.rs
new file mode 100644
index 0000000..c7c962b
--- /dev/null
+++ b/kubert/src/requeue.rs
@@ -0,0 +1,336 @@
+//! A bounded, delayed, multi-producer, single-consumer queue for deferring work in response to
+//! scheduler updates.
+
+use std::{
+    collections::{hash_map, HashMap},
+    hash::Hash,
+    pin::Pin,
+    task::{Context, Poll},
+};
+use tokio::{
+    sync::mpsc::{self, error::SendError},
+    time::{Duration, Instant},
+};
+use tokio_util::time::{delay_queue, DelayQueue};
+
+/// Sends delayed values to the associated `Receiver`.
+///
+/// Instances are created by the [`channel`] function.
+pub struct Sender<T> {
+    tx: mpsc::Sender<Op<T>>,
+}
+
+/// Receives values from associated `Sender`s.
+///
+/// Instances are created by the [`channel`] function.
+pub struct Receiver<T>
+where
+    T: PartialEq + Eq + Hash,
+{
+    rx: mpsc::Receiver<Op<T>>,
+    rx_closed: bool,
+    q: DelayQueue<T>,
+    pending: HashMap<T, delay_queue::Key>,
+}
+
+/// Creates a bounded, delayed mpsc channel for requeuing controller updates.
+pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>)
+where
+    T: PartialEq + Eq + Hash,
+{
+    let (tx, rx) = mpsc::channel(capacity);
+    let rx = Receiver {
+        rx,
+        rx_closed: false,
+        q: DelayQueue::new(),
+        pending: HashMap::new(),
+    };
+    (Sender { tx }, rx)
+}
+
+enum Op<T> {
+    Requeue(T, Instant),
+    Cancel(T),
+    Clear,
+}
+
+// === impl Receiver ===
+
+impl<T> Receiver<T>
+where
+    T: Clone + PartialEq + Eq + Hash,
+{
+    /// Attempts to process requeues, obtaining the next value from the delay queue and
+    /// registering the current task for wakeup if the value is not yet available, and returning
+    /// `None` if the channel is exhausted.
+    pub fn poll_requeued(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        tracing::trace!(rx.closed = self.rx_closed, pending = self.pending.len());
+
+        // We process messages from the sender before looking at the delay queue so that
+        // updates have a chance to reset/cancel pending updates.
+        if !self.rx_closed {
+            loop {
+                match self.rx.poll_recv(cx) {
+                    Poll::Pending => break,
+
+                    Poll::Ready(None) => {
+                        self.rx_closed = true;
+                        break;
+                    }
+
+                    Poll::Ready(Some(Op::Clear)) => {
+                        self.pending.clear();
+                        self.q.clear();
+                    }
+
+                    Poll::Ready(Some(Op::Cancel(obj))) => {
+                        if let Some(key) = self.pending.remove(&obj) {
+                            tracing::trace!(?key, "canceling");
+                            self.q.remove(&key);
+                        }
+                    }
+
+                    Poll::Ready(Some(Op::Requeue(k, at))) => match self.pending.entry(k) {
+                        hash_map::Entry::Occupied(ent) => {
+                            let key = ent.get();
+                            tracing::trace!(?key, "resetting");
+                            self.q.reset_at(key, at);
+                        }
+                        hash_map::Entry::Vacant(slot) => {
+                            let key = self.q.insert_at(slot.key().clone(), at);
+                            tracing::trace!(?key, "inserting");
+                            slot.insert(key);
+                        }
+                    },
+                }
+            }
+        }
+
+        if !self.pending.is_empty() {
+            if let Poll::Ready(Some(exp)) = self.q.poll_expired(cx) {
+                tracing::trace!(key = ?exp.key(), "dequeued");
+                let obj = exp.into_inner();
+                self.pending.remove(&obj);
+                return Poll::Ready(Some(obj));
+            }
+        }
+
+        if self.rx_closed && self.pending.is_empty() {
+            Poll::Ready(None)
+        } else {
+            Poll::Pending
+        }
+    }
+}
+
+// We never put `T` in a `Pin`...
+impl<T: PartialEq + Eq + Hash> Unpin for Receiver<T> {}
+
+impl<T: Clone + PartialEq + Eq + Hash> futures_core::Stream for Receiver<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        Receiver::poll_requeued(self.get_mut(), cx)
+    }
+}
+
+// === impl Sender ===
+
+impl<T> Sender<T> {
+    /// Waits for the receiver to be dropped.
+    pub async fn closed(&self) {
+        self.tx.closed().await
+    }
+
+    /// Cancels all pending work.
+    pub async fn clear(&self) -> Result<(), SendError<()>> {
+        self.tx
+            .send(Op::Clear)
+            .await
+            .map_err(|SendError(_)| SendError(()))
+    }
+
+    /// Schedules the given object to be requeued at the given time.
+    pub async fn requeue_at(&self, obj: T, time: Instant) -> Result<(), SendError<T>> {
+        self.tx
+            .send(Op::Requeue(obj, time))
+            .await
+            .map_err(|SendError(op)| match op {
+                Op::Requeue(obj, _) => SendError(obj),
+                _ => unreachable!(),
+            })
+    }
+
+    /// Schedules the given object to be requeued after `defer` has elapsed.
+    pub async fn requeue(&self, obj: T, defer: Duration) -> Result<(), SendError<T>> {
+        self.requeue_at(obj, Instant::now() + defer).await
+    }
+
+    /// Cancels pending updates for the given object.
+    pub async fn cancel(&self, obj: T) -> Result<(), SendError<T>> {
+        self.tx
+            .send(Op::Cancel(obj))
+            .await
+            .map_err(|SendError(op)| match op {
+                Op::Cancel(obj) => SendError(obj),
+                _ => unreachable!(),
+            })
+    }
+}
+
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Self {
+        Self {
+            tx: self.tx.clone(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    pub use super::*;
+    use k8s_openapi::api::core::v1::Pod;
+    use kube::runtime::reflector::ObjectRef;
+    use tokio::time;
+    use tokio_test::{assert_pending, assert_ready, task};
+
+    // === utils ===
+
+    // Creates a channel, wrapping the receiver in a mock task so that we can
+    // drive the stream type with [`task::Spawn`].
+    fn spawn_channel(
+        capacity: usize,
+    ) -> (
+        Sender<ObjectRef<Pod>>,
+        task::Spawn<Receiver<ObjectRef<Pod>>>,
+    ) {
+        let (tx, rx) = channel::<ObjectRef<Pod>>(capacity);
+        (tx, task::spawn(rx))
+    }
+
+    fn init_tracing() -> tracing::subscriber::DefaultGuard {
+        tracing::subscriber::set_default(
+            tracing_subscriber::fmt()
+                .with_test_writer()
+                .with_max_level(tracing::Level::TRACE)
+                .finish(),
+        )
+    }
+
+    async fn sleep(d: Duration) {
+        let t0 = time::Instant::now();
+        time::sleep(d).await;
+        tracing::trace!(duration = ?d, ?t0, now = ?time::Instant::now(), "slept")
+    }
+
+    // === tests ===
+
+    #[tokio::test(flavor = "current_thread")]
+    async fn delays() {
+        let _tracing = init_tracing();
+        time::pause();
+        let (tx, mut rx) = spawn_channel(1);
+
+        let pod_a = ObjectRef::new("pod-a").within("default");
+        tx.requeue(pod_a.clone(), Duration::from_secs(10))
+            .await
+            .expect("must send");
+        assert_pending!(rx.poll_next());
+
+        sleep(Duration::from_millis(10001)).await;
+        assert_eq!(
+            assert_ready!(rx.poll_next()).expect("stream must not end"),
+            pod_a
+        );
+        assert_pending!(rx.poll_next());
+    }
+
+    #[tokio::test(flavor = "current_thread")]
+    async fn drains_after_sender_dropped() {
+        let _tracing = init_tracing();
+        time::pause();
+        let (tx, mut rx) = spawn_channel(1);
+
+        let pod_a = ObjectRef::new("pod-a").within("default");
+        tx.requeue(pod_a.clone(), Duration::from_secs(10))
+            .await
+            .expect("must send");
+        drop(tx);
+        assert_pending!(rx.poll_next());
+
+        sleep(Duration::from_secs(11)).await;
+        assert_eq!(
+            assert_ready!(rx.poll_next()).expect("stream must not end"),
+            pod_a
+        );
+        assert!(assert_ready!(rx.poll_next()).is_none());
+    }
+
+    #[tokio::test(flavor = "current_thread")]
+    async fn resets() {
+        let _tracing = init_tracing();
+        time::pause();
+        let (tx, mut rx) = spawn_channel(1);
+
+        // Requeue a pod
+        let pod_a = ObjectRef::new("pod-a").within("default");
+        tx.requeue(pod_a.clone(), Duration::from_secs(10))
+            .await
+            .expect("must send");
+        assert_pending!(rx.poll_next());
+
+        // Re-requeue the same pod after 9s.
+        sleep(Duration::from_secs(9)).await;
+        tx.requeue(pod_a.clone(), Duration::from_secs(10))
+            .await
+            .expect("must send");
+
+        // Wait until the first requeue would time out and check that it has not been sent.
+        sleep(Duration::from_millis(1001)).await;
+        assert_pending!(rx.poll_next());
+
+        // Wait until the second requeue would time out and check that it has been sent.
+        sleep(Duration::from_secs(9)).await;
+        assert_eq!(
+            assert_ready!(rx.poll_next()).expect("stream must not end"),
+            pod_a
+        );
+        assert_pending!(rx.poll_next());
+    }
+
+    #[tokio::test(flavor = "current_thread")]
+    async fn cancels() {
+        let _tracing = init_tracing();
+        time::pause();
+        let (tx, mut rx) = spawn_channel(1);
+
+        // Requeue a pod
+        let pod_a = ObjectRef::new("pod-a").within("default");
+        tx.requeue(pod_a.clone(), Duration::from_secs(10))
+            .await
+            .expect("must send");
+        assert_pending!(rx.poll_next());
+
+        // Even though the delay has elapsed, the cancel is processed before the
+        // expired entry, so nothing is delivered.
+        sleep(Duration::from_millis(10001)).await;
+        tx.cancel(pod_a).await.expect("must send cancel");
+        assert_pending!(rx.poll_next());
+    }
+
+    #[tokio::test(flavor = "current_thread")]
+    async fn clears() {
+        let _tracing = init_tracing();
+        time::pause();
+        let (tx, mut rx) = spawn_channel(1);
+
+        // Requeue a pod
+        let pod_a = ObjectRef::new("pod-a").within("default");
+        tx.requeue(pod_a, Duration::from_secs(10))
+            .await
+            .expect("must send");
+        assert_pending!(rx.poll_next());
+
+        // Even though the delay has elapsed, the clear is processed before the
+        // expired entry, so nothing is delivered.
+        sleep(Duration::from_millis(10001)).await;
+        tx.clear().await.expect("must send clear");
+        assert_pending!(rx.poll_next());
+    }
+}
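
For reviewers, a minimal usage sketch (not part of the patch). It assumes
the `requeue` feature is enabled; it uses `futures::StreamExt` only to
drive the `Stream` impl (the crate itself depends only on `futures-core`),
and the `Key` type is hypothetical.

    use futures::StreamExt; // assumed helper crate, illustration only
    use kubert::requeue;
    use tokio::time::Duration;

    // A hypothetical key type; any `Clone + Eq + Hash` value works.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct Key(&'static str);

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = requeue::channel::<Key>(16);

        // Defer reprocessing of a key, e.g. after a failed write to the
        // API server.
        tx.requeue(Key("pod-a"), Duration::from_secs(5))
            .await
            .expect("receiver must not be dropped");

        // Dropping the sender lets the receiver drain pending items and end.
        drop(tx);

        // Each key is yielded once its delay elapses; requeuing the same key
        // before then resets its timer instead of duplicating it.
        while let Some(key) = rx.next().await {
            println!("ready to reprocess: {:?}", key);
        }
    }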