diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 246ec212f53..42cde81dc9b 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,7 +119,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; -use crate::util::linked_list::{self, LinkedList}; +use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; use std::fmt; @@ -366,6 +366,17 @@ struct Waiter { _p: PhantomPinned, } +impl Waiter { + fn new() -> Self { + Self { + queued: false, + waker: None, + pointers: linked_list::Pointers::new(), + _p: PhantomPinned, + } + } +} + generate_addr_of_methods! { impl<> Waiter { unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { @@ -817,12 +828,75 @@ fn new_receiver(shared: Arc>) -> Receiver { Receiver { shared, next } } +/// List used in `Shared::notify_rx`. It wraps a guarded linked list +/// and gates the access to it on the `Shared.tail` mutex. It also empties +/// the list on drop. +struct WaitersList<'a, T> { + list: GuardedLinkedList::Target>, + is_empty: bool, + shared: &'a Shared, +} + +impl<'a, T> Drop for WaitersList<'a, T> { + fn drop(&mut self) { + // If the list is not empty, we unlink all waiters from it. + // We do not wake the waiters to avoid double panics. + if !self.is_empty { + let _lock_guard = self.shared.tail.lock(); + while self.list.pop_back().is_some() {} + } + } +} + +impl<'a, T> WaitersList<'a, T> { + fn new( + unguarded_list: LinkedList::Target>, + guard: Pin<&'a Waiter>, + shared: &'a Shared, + ) -> Self { + let guard_ptr = NonNull::from(guard.get_ref()); + let list = unguarded_list.into_guarded(guard_ptr); + WaitersList { + list, + is_empty: false, + shared, + } + } + + /// Removes the last element from the guarded list. Modifying this list + /// requires an exclusive access to the main list in `Notify`. + fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option> { + let result = self.list.pop_back(); + if result.is_none() { + // Save information about emptiness to avoid waiting for lock + // in the destructor. + self.is_empty = true; + } + result + } +} + impl Shared { fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) { + // It is critical for `GuardedLinkedList` safety that the guard node is + // pinned in memory and is not dropped until the guarded list is dropped. + let guard = Waiter::new(); + pin!(guard); + + // We move all waiters to a secondary list. It uses a `GuardedLinkedList` + // underneath to allow every waiter to safely remove itself from it. + // + // * This list will be still guarded by the `waiters` lock. + // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it. + // * This wrapper will empty the list on drop. It is critical for safety + // that we will not leave any list entry with a pointer to the local + // guard node after this function returns / panics. + let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self); + let mut wakers = WakeList::new(); 'outer: loop { while wakers.can_push() { - match tail.waiters.pop_back() { + match list.pop_back_locked(&mut tail) { Some(mut waiter) => { // Safety: `tail` lock is still held. let waiter = unsafe { waiter.as_mut() };