Skip to content

Commit

Permalink
Document plan code and split up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonasher committed Aug 2, 2024
1 parent d165d0e commit 8010a4d
Showing 1 changed file with 136 additions and 61 deletions.
197 changes: 136 additions & 61 deletions src/plan.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,37 @@
//! A priority queue that stores arbitrary data sorted by time
//!
//! Defines a `Queue<T>` that is intended to store a queue of items of type T,
//! sorted by `f64` time, called 'plans'. This queue has methods for adding
//! plans, cancelling plans, and retrieving the earliest plan in the queue.
//! Adding a plan is *O*(log(*n*)) while cancellation and retrieval are *O*(1).
//!
//! This queue is used by `Context` to store future events where some callback
//! closure `FnOnce(&mut Context)` will be executed at a given point in time.

use std::{
cmp::Ordering,
collections::{BinaryHeap, HashMap},
};

pub struct Id {
id: usize,
}

pub struct Plan<T> {
pub time: f64,
pub data: T,
}

#[derive(PartialEq, Debug)]
pub struct Record {
pub time: f64,
id: usize,
}

impl Eq for Record {}

impl PartialOrd for Record {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Record {
fn cmp(&self, other: &Self) -> Ordering {
let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
if time_ordering == Ordering::Equal {
// Break time ties in order of plan id
self.id.cmp(&other.id).reverse()
} else {
time_ordering
}
}
}

#[derive(Debug)]
/// A priority queue that stores arbitrary data sorted by time
///
/// Items of type T are stored in order by `f64` time and called `Plan<T>`.
/// When plans are created they are sequentially assigned an `Id` that is a
/// wrapped `u64`. If two plans are scheduled for the same time the plan that is
/// scheduled first (i.e. that has the lowest id) is placed earlier.
///
/// The pair of time and plan id are stored in a binary heap of `Entry` objects.
/// The data payload of the event is stored in a hash map by plan id.
/// Plan cancellation occurs by removing the corresponding entry from the data
/// hash map.
pub struct Queue<T> {
queue: BinaryHeap<Record>,
data_map: HashMap<usize, T>,
plan_counter: usize,
}

impl<T> Default for Queue<T> {
fn default() -> Self {
Self::new()
}
queue: BinaryHeap<Entry>,
data_map: HashMap<u64, T>,
plan_counter: u64,
}

impl<T> Queue<T> {
/// Creates a new empty `Queue<T>`
#[must_use]
pub fn new() -> Queue<T> {
Queue {
Expand All @@ -61,30 +41,43 @@ impl<T> Queue<T> {
}
}

/// Add a plan to the queue at the specified time
///
/// Returns an `Id` for the newly-added plan that can be used to cancel it
/// if needed.
pub fn add_plan(&mut self, time: f64, data: T) -> Id {
// Add plan to queue, store data, and increment counter
let id = self.plan_counter;
self.queue.push(Record { time, id });
self.queue.push(Entry { time, id });
self.data_map.insert(id, data);
self.plan_counter += 1;
Id { id }
}

/// Cancel a plan that has been added to the queue
///
/// # Panics
///
/// This function panics if you cancel a plan which has already
/// been cancelled or executed.
pub fn cancel_plan(&mut self, id: &Id) {
// Delete the plan from the map, but leave in the queue
// It will be skipped when the plan is popped from the queue
self.data_map.remove(&id.id).expect("Plan does not exist");
}

/// Retrieve the earliest plan in the queue
///
/// Returns the next plan if it exists or else `None` if the queue is empty
pub fn get_next_plan(&mut self) -> Option<Plan<T>> {
loop {
// Pop from queue until we find a plan with data or queue is empty
match self.queue.pop() {
Some(plan_record) => {
if let Some(data) = self.data_map.remove(&plan_record.id) {
Some(entry) => {
// Skip plans that have been cancelled and thus have no data
if let Some(data) = self.data_map.remove(&entry.id) {
return Some(Plan {
time: plan_record.time,
time: entry.time,
data,
});
}
Expand All @@ -97,40 +90,122 @@ impl<T> Queue<T> {
}
}

impl<T> Default for Queue<T> {
fn default() -> Self {
Self::new()
}
}

/// A time and id pair used to order plans in the `Queue<T>`
///
/// `Entry` objects are sorted in increasing order of time and then plan id
#[derive(PartialEq, Debug)]
struct Entry {
time: f64,
id: u64,
}

impl Eq for Entry {}

impl PartialOrd for Entry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for Entry {
fn cmp(&self, other: &Self) -> Ordering {
let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
match time_ordering {
// Break time ties in order of plan id
Ordering::Equal => self.id.cmp(&other.id).reverse(),
_ => time_ordering,
}
}
}

/// A unique identifier for a plan added to a `Queue<T>`
pub struct Id {
id: u64,
}

/// A plan that holds data of type `T` intended to be used at the specified time
pub struct Plan<T> {
pub time: f64,
pub data: T,
}

#[cfg(test)]
mod tests {
use super::Queue;

#[test]
fn test_add_cancel() {
// Add some plans and cancel and make sure ordering occurs as expected
let mut plan_queue = Queue::<usize>::new();
fn empty_queue() {
let mut plan_queue = Queue::<()>::new();
assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_plans() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
plan_queue.add_plan(3.0, 3);
plan_queue.add_plan(3.0, 4);
let plan_to_cancel = plan_queue.add_plan(1.5, 0);
plan_queue.add_plan(2.0, 2);
plan_queue.cancel_plan(&plan_to_cancel);

assert_eq!(plan_queue.get_next_plan().unwrap().time, 1.0);
assert_eq!(plan_queue.get_next_plan().unwrap().time, 2.0);
let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 1);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 2.0);
assert_eq!(next_plan.data, 2);

// Check tie handling
let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 3.0);
assert_eq!(next_plan.data, 3);

assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_plans_at_same_time() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
plan_queue.add_plan(1.0, 2);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 1);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 2);

assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_and_cancel_plans() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
let plan_to_cancel = plan_queue.add_plan(2.0, 2);
plan_queue.add_plan(3.0, 3);
plan_queue.cancel_plan(&plan_to_cancel);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 1);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 3.0);
assert_eq!(next_plan.data, 4);
assert_eq!(next_plan.data, 3);

assert!(plan_queue.get_next_plan().is_none());
}

#[test]
#[should_panic]
fn test_invalid_cancel() {
// Cancel a plan that has already occured and make sure it panics
fn cancel_invalid_plan() {
let mut plan_queue = Queue::<()>::new();
let plan_to_cancel = plan_queue.add_plan(1.0, ());
plan_queue.get_next_plan();
Expand Down

0 comments on commit 8010a4d

Please sign in to comment.