Skip to content

Commit

Permalink
Update Scheduler for infallible DelayQueue
Browse files Browse the repository at this point in the history
Signed-off-by: Teo Klestrup Röijezon <[email protected]>
  • Loading branch information
nightkr committed Feb 14, 2022
1 parent df94062 commit 903a87c
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ use std::{
task::{Context, Poll},
};
use thiserror::Error;
use tokio::time::{self, Instant};
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue};

#[derive(Debug, Error)]
pub enum Error {
#[error("timer failure: {0}")]
TimerError(#[source] time::error::Error),
}
pub enum Error {}
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// A request to re-emit `message` at a given `Instant` (`run_at`).
Expand Down Expand Up @@ -95,24 +92,23 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
&mut self,
cx: &mut Context<'_>,
can_take_message: impl Fn(&T) -> bool,
) -> Poll<Result<T, time::error::Error>> {
) -> Poll<T> {
if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() {
return Poll::Ready(Ok(self.pending.take(&msg).unwrap()));
return Poll::Ready(self.pending.take(&msg).unwrap());
}

loop {
match self.queue.poll_expired(cx) {
Poll::Ready(Some(Ok(msg))) => {
Poll::Ready(Some(msg)) => {
let msg = msg.into_inner();
self.scheduled.remove(&msg).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
if can_take_message(&msg) {
break Poll::Ready(Ok(msg));
break Poll::Ready(msg);
}
self.pending.insert(msg);
}
Poll::Ready(Some(Err(err))) => break Poll::Ready(Err(err)),
Poll::Ready(None) | Poll::Pending => break Poll::Pending,
}
}
Expand Down Expand Up @@ -147,7 +143,7 @@ where
}

match scheduler.poll_pop_queue_message(cx, &can_take_message) {
Poll::Ready(expired) => Poll::Ready(Some(expired.map_err(Error::TimerError))),
Poll::Ready(expired) => Poll::Ready(Some(Ok(expired))),
Poll::Pending => Poll::Pending,
}
}
Expand Down

0 comments on commit 903a87c

Please sign in to comment.