Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait_for and wait_for_value to WaitCell and WaitQueue #479

Merged
merged 10 commits into from
Jul 11, 2024
153 changes: 153 additions & 0 deletions maitake-sync/src/wait_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,159 @@
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitCell`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
jamesmunns marked this conversation as resolved.
Show resolved Hide resolved
///
jamesmunns marked this conversation as resolved.
Show resolved Hide resolved
/// In particular, this function correctly *registers* interest in the [`WaitCell`]
hawkw marked this conversation as resolved.
Show resolved Hide resolved
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitCell`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the cell, allowing the polling function to check status fields for
/// partial progress or completion.
///
/// Consider using [`Self::wait_for_value()`] if your function does return a value.
///
/// Consider using [`WaitQueue::wait_for()`] if you need multiple waiters.

Check failure on line 353 in maitake-sync/src/wait_cell.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `WaitQueue::wait_for` --> maitake-sync/src/wait_cell.rs:353:26 | 353 | /// Consider using [`WaitQueue::wait_for()`] if you need multiple waiters. | ^^^^^^^^^^^^^^^^^^^^^ no item named `WaitQueue` in scope | = note: `-D rustdoc::broken-intra-doc-links` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(rustdoc::broken_intra_doc_links)]`
///
/// # Returns
///
/// * [`Ok`]`(())` if the closure returns `true`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitCell;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitCell::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
/// println!("received wakeup!");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
loop {
let wait = self.subscribe().await;
if f() {
return Ok(());
}
wait.await?;
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitCell`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitCell`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitCell`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the cell, allowing the polling function to check status fields for
/// partial progress or completion, and also return the status flags at the same time.
///
/// Consider using [`Self::wait_for()`] if your function does not return a value.
///
/// Consider using [`WaitQueue::wait_for_value()`] if you need multiple waiters.

Check failure on line 427 in maitake-sync/src/wait_cell.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `WaitQueue::wait_for_value` --> maitake-sync/src/wait_cell.rs:427:26 | 427 | /// Consider using [`WaitQueue::wait_for_value()`] if you need multiple waiters. | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ no item named `WaitQueue` in scope
///
/// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitCell;
/// use std::sync::atomic::{AtomicU8, Ordering};
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// let queue = Arc::new(WaitCell::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 5 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 5);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
loop {
let wait = self.subscribe().await;
if let Some(t) = f() {
return Ok(t);
}
wait.await?;
}
}

// TODO(eliza): is this an API we want to have?
/*
/// Returns `true` if this `WaitCell` is [closed](Self::close).
Expand Down
189 changes: 189 additions & 0 deletions maitake-sync/src/wait_queue.rs
jamesmunns marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,195 @@
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitQueue`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitQueue`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitQueue`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the queue, allowing the polling function to check status fields for
/// partial progress or completion.
///
/// Consider using [`Self::wait_for_value()`] if your function does return a value.
///
/// Consider using [`WaitCell::wait_for()`] if you do not need multiple waiters.

Check failure on line 710 in maitake-sync/src/wait_queue.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `WaitCell::wait_for` --> maitake-sync/src/wait_queue.rs:710:26 | 710 | /// Consider using [`WaitCell::wait_for()`] if you do not need multiple waiters. | ^^^^^^^^^^^^^^^^^^^^ no item named `WaitCell` in scope
///
/// # Returns
///
/// * [`Ok`]`(())` if the closure returns `true`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.

Check failure on line 715 in maitake-sync/src/wait_queue.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `Closed` --> maitake-sync/src/wait_queue.rs:715:23 | 715 | /// * [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. | ^^^^^^ no item named `Closed` in scope | = help: to escape `[` and `]` characters, add '\' before them like `\[` or `\]`
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitQueue;
/// use std::sync::atomic::{AtomicU8, Ordering};
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// let queue = Arc::new(WaitQueue::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter1 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await;
/// println!("received wakeup!");
/// }
/// });
///
/// let waiter2 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await;
/// println!("received wakeup!");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter1.await.unwrap();
/// waiter2.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> WaitResult<()> {
loop {
let wait = self.wait();
let mut wait = core::pin::pin!(wait);
let _ = wait.as_mut().subscribe()?;
if f() {
return Ok(());
}
wait.await?;
}
}

/// Asynchronously poll the given function `f` until a condition occurs,
/// using the [`WaitQueue`] to only re-poll when notified.
///
/// This can be used to implement a "wait loop", turning a "try" function
/// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
/// "recv" or "send").
///
/// In particular, this function correctly *registers* interest in the [`WaitQueue`]
/// prior to polling the function, ensuring that there is not a chance of a race
/// where the condition occurs AFTER checking but BEFORE registering interest
/// in the [`WaitQueue`], which could lead to deadlock.
///
/// This is intended to have similar behavior to `Condvar` in the standard library,
/// but asynchronous, and not requiring operating system intervention (or existence).
///
/// In particular, this can be used in cases where interrupts or events are used
/// to signify readiness or completion of some task, such as the completion of a
/// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
/// can wake the queue, allowing the polling function to check status fields for
/// partial progress or completion, and also return the status flags at the same time.
///
/// Consider using [`Self::wait_for()`] if your function does not return a value.
///
/// Consider using [`WaitCell::wait_for_value()`] if you do not need multiple waiters.

Check failure on line 797 in maitake-sync/src/wait_queue.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `WaitCell::wait_for_value` --> maitake-sync/src/wait_queue.rs:797:26 | 797 | /// Consider using [`WaitCell::wait_for_value()`] if you do not need multiple waiters. | ^^^^^^^^^^^^^^^^^^^^^^^^^^ no item named `WaitCell` in scope
///
/// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`.
/// * [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.

Check failure on line 800 in maitake-sync/src/wait_queue.rs

View workflow job for this annotation

GitHub Actions / docs

error: unresolved link to `Closed` --> maitake-sync/src/wait_queue.rs:800:23 | 800 | /// * [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. | ^^^^^^ no item named `Closed` in scope | = help: to escape `[` and `]` characters, add '\' before them like `\[` or `\]`
///
/// # Examples
///
/// ```
/// # use tokio::task;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn test() {
/// use std::sync::Arc;
/// use maitake_sync::WaitQueue;
/// use std::sync::atomic::{AtomicU8, Ordering};
///
/// let queue = Arc::new(WaitQueue::new());
/// let num = Arc::new(AtomicU8::new(0));
///
/// let waiter1 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 5 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 5);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// let waiter2 = task::spawn({
/// // clone items to move into the spawned task
/// let queue = queue.clone();
/// let num = num.clone();
/// async move {
/// let rxd = queue.wait_for_value(|| {
/// let val = num.load(Ordering::Relaxed);
/// if val > 10 {
/// return Some(val);
/// }
/// None
/// }).await.unwrap();
/// assert!(rxd > 10);
/// println!("received wakeup with value: {rxd}");
/// }
/// });
///
/// println!("poking task...");
///
/// for i in 0..20 {
/// num.store(i, Ordering::Relaxed);
/// queue.wake();
/// }
///
/// waiter1.await.unwrap();
/// waiter2.await.unwrap();
/// # }
/// # test();
/// ```
pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> WaitResult<T> {
loop {
let wait = self.wait();
let mut wait = core::pin::pin!(wait);
match wait.as_mut().subscribe() {
Poll::Ready(wr) => wr?,
Poll::Pending => {}
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved
if let Some(t) = f() {
return Ok(t);
}
wait.await?;
}
}

/// Returns a [`Waiter`] entry in this queue.
///
/// This is factored out into a separate function because it's used by both
Expand Down
Loading