diff --git a/maitake-sync/src/wait_cell.rs b/maitake-sync/src/wait_cell.rs index 45ef4d17..802867c8 100644 --- a/maitake-sync/src/wait_cell.rs +++ b/maitake-sync/src/wait_cell.rs @@ -327,6 +327,160 @@ impl WaitCell { } } + /// Asynchronously poll the given function `f` until a condition occurs, + /// using the [`WaitCell`] to only re-poll when notified. + /// + /// This can be used to implement a "wait loop", turning a "try" function + /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. + /// "recv" or "send"). + /// + /// In particular, this function correctly *registers* interest in the [`WaitCell`] + /// prior to polling the function, ensuring that there is not a chance of a race + /// where the condition occurs AFTER checking but BEFORE registering interest + /// in the [`WaitCell`], which could lead to deadlock. + /// + /// This is intended to have similar behavior to `Condvar` in the standard library, + /// but asynchronous, and not requiring operating system intervention (or existence). + /// + /// In particular, this can be used in cases where interrupts or events are used + /// to signify readiness or completion of some task, such as the completion of a + /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt + /// can wake the cell, allowing the polling function to check status fields for + /// partial progress or completion. + /// + /// Consider using [`Self::wait_for_value()`] if your function does return a value. + /// + /// Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for) + /// if you need multiple waiters. + /// + /// # Returns + /// + /// * [`Ok`]`(())` if the closure returns `true`. + /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed. + /// + /// # Examples + /// + /// ``` + /// # use tokio::task; + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn test() { + /// use std::sync::Arc; + /// use maitake_sync::WaitCell; + /// use std::sync::atomic::{AtomicU8, Ordering}; + /// + /// let queue = Arc::new(WaitCell::new()); + /// let num = Arc::new(AtomicU8::new(0)); + /// + /// let waiter = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await; + /// println!("received wakeup!"); + /// } + /// }); + /// + /// println!("poking task..."); + /// + /// for i in 0..20 { + /// num.store(i, Ordering::Relaxed); + /// queue.wake(); + /// } + /// + /// waiter.await.unwrap(); + /// # } + /// # test(); + /// ``` + pub async fn wait_for bool>(&self, mut f: F) -> Result<(), Closed> { + loop { + let wait = self.subscribe().await; + if f() { + return Ok(()); + } + wait.await?; + } + } + + /// Asynchronously poll the given function `f` until a condition occurs, + /// using the [`WaitCell`] to only re-poll when notified. + /// + /// This can be used to implement a "wait loop", turning a "try" function + /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. + /// "recv" or "send"). + /// + /// In particular, this function correctly *registers* interest in the [`WaitCell`] + /// prior to polling the function, ensuring that there is not a chance of a race + /// where the condition occurs AFTER checking but BEFORE registering interest + /// in the [`WaitCell`], which could lead to deadlock. + /// + /// This is intended to have similar behavior to `Condvar` in the standard library, + /// but asynchronous, and not requiring operating system intervention (or existence). + /// + /// In particular, this can be used in cases where interrupts or events are used + /// to signify readiness or completion of some task, such as the completion of a + /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt + /// can wake the cell, allowing the polling function to check status fields for + /// partial progress or completion, and also return the status flags at the same time. + /// + /// Consider using [`Self::wait_for()`] if your function does not return a value. + /// + /// Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters. + /// + /// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`. + /// * [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed. + /// + /// # Examples + /// + /// ``` + /// # use tokio::task; + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn test() { + /// use std::sync::Arc; + /// use maitake_sync::WaitCell; + /// use std::sync::atomic::{AtomicU8, Ordering}; + /// + /// let queue = Arc::new(WaitCell::new()); + /// let num = Arc::new(AtomicU8::new(0)); + /// + /// let waiter = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// let rxd = queue.wait_for_value(|| { + /// let val = num.load(Ordering::Relaxed); + /// if val > 5 { + /// return Some(val); + /// } + /// None + /// }).await.unwrap(); + /// assert!(rxd > 5); + /// println!("received wakeup with value: {rxd}"); + /// } + /// }); + /// + /// println!("poking task..."); + /// + /// for i in 0..20 { + /// num.store(i, Ordering::Relaxed); + /// queue.wake(); + /// } + /// + /// waiter.await.unwrap(); + /// # } + /// # test(); + /// ``` + pub async fn wait_for_value Option>(&self, mut f: F) -> Result { + loop { + let wait = self.subscribe().await; + if let Some(t) = f() { + return Ok(t); + } + wait.await?; + } + } + // TODO(eliza): is this an API we want to have? /* /// Returns `true` if this `WaitCell` is [closed](Self::close). diff --git a/maitake-sync/src/wait_queue.rs b/maitake-sync/src/wait_queue.rs index 9ed5100e..04843476 100644 --- a/maitake-sync/src/wait_queue.rs +++ b/maitake-sync/src/wait_queue.rs @@ -684,6 +684,197 @@ impl WaitQueue { } } + /// Asynchronously poll the given function `f` until a condition occurs, + /// using the [`WaitQueue`] to only re-poll when notified. + /// + /// This can be used to implement a "wait loop", turning a "try" function + /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. + /// "recv" or "send"). + /// + /// In particular, this function correctly *registers* interest in the [`WaitQueue`] + /// prior to polling the function, ensuring that there is not a chance of a race + /// where the condition occurs AFTER checking but BEFORE registering interest + /// in the [`WaitQueue`], which could lead to deadlock. + /// + /// This is intended to have similar behavior to `Condvar` in the standard library, + /// but asynchronous, and not requiring operating system intervention (or existence). + /// + /// In particular, this can be used in cases where interrupts or events are used + /// to signify readiness or completion of some task, such as the completion of a + /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt + /// can wake the queue, allowing the polling function to check status fields for + /// partial progress or completion. + /// + /// Consider using [`Self::wait_for_value()`] if your function does return a value. + /// + /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for) + /// if you do not need multiple waiters. + /// + /// # Returns + /// + /// * [`Ok`]`(())` if the closure returns `true`. + /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed. + /// + /// # Examples + /// + /// ``` + /// # use tokio::task; + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn test() { + /// use std::sync::Arc; + /// use maitake_sync::WaitQueue; + /// use std::sync::atomic::{AtomicU8, Ordering}; + /// + /// let queue = Arc::new(WaitQueue::new()); + /// let num = Arc::new(AtomicU8::new(0)); + /// + /// let waiter1 = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// queue.wait_for(|| num.load(Ordering::Relaxed) > 5).await; + /// println!("received wakeup!"); + /// } + /// }); + /// + /// let waiter2 = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// queue.wait_for(|| num.load(Ordering::Relaxed) > 10).await; + /// println!("received wakeup!"); + /// } + /// }); + /// + /// println!("poking task..."); + /// + /// for i in 0..20 { + /// num.store(i, Ordering::Relaxed); + /// queue.wake(); + /// } + /// + /// waiter1.await.unwrap(); + /// waiter2.await.unwrap(); + /// # } + /// # test(); + /// ``` + pub async fn wait_for bool>(&self, mut f: F) -> WaitResult<()> { + loop { + let wait = self.wait(); + let mut wait = core::pin::pin!(wait); + let _ = wait.as_mut().subscribe()?; + if f() { + return Ok(()); + } + wait.await?; + } + } + + /// Asynchronously poll the given function `f` until a condition occurs, + /// using the [`WaitQueue`] to only re-poll when notified. + /// + /// This can be used to implement a "wait loop", turning a "try" function + /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. + /// "recv" or "send"). + /// + /// In particular, this function correctly *registers* interest in the [`WaitQueue`] + /// prior to polling the function, ensuring that there is not a chance of a race + /// where the condition occurs AFTER checking but BEFORE registering interest + /// in the [`WaitQueue`], which could lead to deadlock. + /// + /// This is intended to have similar behavior to `Condvar` in the standard library, + /// but asynchronous, and not requiring operating system intervention (or existence). + /// + /// In particular, this can be used in cases where interrupts or events are used + /// to signify readiness or completion of some task, such as the completion of a + /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt + /// can wake the queue, allowing the polling function to check status fields for + /// partial progress or completion, and also return the status flags at the same time. + /// + /// Consider using [`Self::wait_for()`] if your function does not return a value. + /// + /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value) + /// if you do not need multiple waiters. + /// + /// * [`Ok`]`(T)` if the closure returns [`Some`]`(T)`. + /// * [`Err`]`(`[`Closed`](crate::Closed)`)` if the [`WaitQueue`] is closed. + /// + /// # Examples + /// + /// ``` + /// # use tokio::task; + /// # #[tokio::main(flavor = "current_thread")] + /// # async fn test() { + /// use std::sync::Arc; + /// use maitake_sync::WaitQueue; + /// use std::sync::atomic::{AtomicU8, Ordering}; + /// + /// let queue = Arc::new(WaitQueue::new()); + /// let num = Arc::new(AtomicU8::new(0)); + /// + /// let waiter1 = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// let rxd = queue.wait_for_value(|| { + /// let val = num.load(Ordering::Relaxed); + /// if val > 5 { + /// return Some(val); + /// } + /// None + /// }).await.unwrap(); + /// assert!(rxd > 5); + /// println!("received wakeup with value: {rxd}"); + /// } + /// }); + /// + /// let waiter2 = task::spawn({ + /// // clone items to move into the spawned task + /// let queue = queue.clone(); + /// let num = num.clone(); + /// async move { + /// let rxd = queue.wait_for_value(|| { + /// let val = num.load(Ordering::Relaxed); + /// if val > 10 { + /// return Some(val); + /// } + /// None + /// }).await.unwrap(); + /// assert!(rxd > 10); + /// println!("received wakeup with value: {rxd}"); + /// } + /// }); + /// + /// println!("poking task..."); + /// + /// for i in 0..20 { + /// num.store(i, Ordering::Relaxed); + /// queue.wake(); + /// } + /// + /// waiter1.await.unwrap(); + /// waiter2.await.unwrap(); + /// # } + /// # test(); + /// ``` + pub async fn wait_for_value Option>(&self, mut f: F) -> WaitResult { + loop { + let wait = self.wait(); + let mut wait = core::pin::pin!(wait); + match wait.as_mut().subscribe() { + Poll::Ready(wr) => wr?, + Poll::Pending => {} + } + if let Some(t) = f() { + return Ok(t); + } + wait.await?; + } + } + /// Returns a [`Waiter`] entry in this queue. /// /// This is factored out into a separate function because it's used by both diff --git a/maitake/src/time/timer/tests.rs b/maitake/src/time/timer/tests.rs index c121161a..e7a2baa2 100644 --- a/maitake/src/time/timer/tests.rs +++ b/maitake/src/time/timer/tests.rs @@ -5,6 +5,9 @@ use core::cell::RefCell; use core::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use crate::time::{Clock, timer::Ticks}; +use std::time::Duration; + crate::loom::thread_local! { static CLOCK: RefCell>> = RefCell::new(None); }