From 6d0ba19af51015dcd80558ae768215448e285fdf Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 26 Oct 2020 08:54:25 -0700 Subject: [PATCH] sync: make oneshot::Sender::poll_closed public again (#3032) --- tokio/src/sync/oneshot.rs | 136 ++++++++++++++++++++++++-------------- 1 file changed, 88 insertions(+), 48 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index e0a9e793f83..ece9abaeb64 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -196,54 +196,6 @@ impl Sender { Ok(()) } - fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - let inner = self.inner.as_ref().unwrap(); - - let mut state = State::load(&inner.state, Acquire); - - if state.is_closed() { - coop.made_progress(); - return Poll::Ready(()); - } - - if state.is_tx_task_set() { - let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) }; - - if !will_notify { - state = State::unset_tx_task(&inner.state); - - if state.is_closed() { - // Set the flag again so that the waker is released in drop - State::set_tx_task(&inner.state); - coop.made_progress(); - return Ready(()); - } else { - unsafe { inner.drop_tx_task() }; - } - } - } - - if !state.is_tx_task_set() { - // Attempt to set the task - unsafe { - inner.set_tx_task(cx); - } - - // Update the state - state = State::set_tx_task(&inner.state); - - if state.is_closed() { - coop.made_progress(); - return Ready(()); - } - } - - Pending - } - /// Waits for the associated [`Receiver`] handle to close. /// /// A [`Receiver`] is closed by either calling [`close`] explicitly or the @@ -350,6 +302,94 @@ impl Sender { let state = State::load(&inner.state, Acquire); state.is_closed() } + + /// Check whether the oneshot channel has been closed, and if not, schedules the + /// `Waker` in the provided `Context` to receive a notification when the channel is + /// closed. + /// + /// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the + /// [`Receiver`] value is dropped. + /// + /// Note that on multiple calls to poll, only the `Waker` from the `Context` passed + /// to the most recent call will be scheduled to receive a wakeup. + /// + /// [`Receiver`]: struct@crate::sync::oneshot::Receiver + /// [`close`]: fn@crate::sync::oneshot::Receiver::close + /// + /// # Return value + /// + /// This function returns: + /// + /// * `Poll::Pending` if the channel is still open. + /// * `Poll::Ready(())` if the channel is closed. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::oneshot; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = oneshot::channel::<()>(); + /// + /// tokio::spawn(async move { + /// rx.close(); + /// }); + /// + /// poll_fn(|cx| tx.poll_closed(cx)).await; + /// + /// println!("the receiver dropped"); + /// } + /// ``` + pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + let inner = self.inner.as_ref().unwrap(); + + let mut state = State::load(&inner.state, Acquire); + + if state.is_closed() { + coop.made_progress(); + return Poll::Ready(()); + } + + if state.is_tx_task_set() { + let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) }; + + if !will_notify { + state = State::unset_tx_task(&inner.state); + + if state.is_closed() { + // Set the flag again so that the waker is released in drop + State::set_tx_task(&inner.state); + coop.made_progress(); + return Ready(()); + } else { + unsafe { inner.drop_tx_task() }; + } + } + } + + if !state.is_tx_task_set() { + // Attempt to set the task + unsafe { + inner.set_tx_task(cx); + } + + // Update the state + state = State::set_tx_task(&inner.state); + + if state.is_closed() { + coop.made_progress(); + return Ready(()); + } + } + + Pending + } } impl Drop for Sender {