From 903a87c815c9930a00e1066c391778c831a43a2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Mon, 14 Feb 2022 09:15:13 +0100 Subject: [PATCH] Update `Scheduler` for infallible `DelayQueue` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Teo Klestrup Röijezon --- kube-runtime/src/scheduler.rs | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index f63193c54..369717a40 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -9,14 +9,11 @@ use std::{ task::{Context, Poll}, }; use thiserror::Error; -use tokio::time::{self, Instant}; +use tokio::time::Instant; use tokio_util::time::delay_queue::{self, DelayQueue}; #[derive(Debug, Error)] -pub enum Error { - #[error("timer failure: {0}")] - TimerError(#[source] time::error::Error), -} +pub enum Error {} pub type Result = std::result::Result; /// A request to re-emit `message` at a given `Instant` (`run_at`). @@ -95,24 +92,23 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { &mut self, cx: &mut Context<'_>, can_take_message: impl Fn(&T) -> bool, - ) -> Poll> { + ) -> Poll { if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() { - return Poll::Ready(Ok(self.pending.take(&msg).unwrap())); + return Poll::Ready(self.pending.take(&msg).unwrap()); } loop { match self.queue.poll_expired(cx) { - Poll::Ready(Some(Ok(msg))) => { + Poll::Ready(Some(msg)) => { let msg = msg.into_inner(); self.scheduled.remove(&msg).expect( - "Expired message was popped from the Scheduler queue, but was not in the metadata map", - ); + "Expired message was popped from the Scheduler queue, but was not in the metadata map", + ); if can_take_message(&msg) { - break Poll::Ready(Ok(msg)); + break Poll::Ready(msg); } self.pending.insert(msg); } - Poll::Ready(Some(Err(err))) => break Poll::Ready(Err(err)), Poll::Ready(None) | Poll::Pending => break Poll::Pending, } } @@ -147,7 +143,7 @@ where } match scheduler.poll_pop_queue_message(cx, &can_take_message) { - Poll::Ready(expired) => Poll::Ready(Some(expired.map_err(Error::TimerError))), + Poll::Ready(expired) => Poll::Ready(Some(Ok(expired))), Poll::Pending => Poll::Pending, } }