Skip to content

Commit

Permalink
Unsplit the 'Incomplete' commit
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Mar 31, 2023
1 parent 3fefa6c commit 1ec136a
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 179 deletions.
144 changes: 77 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
//! }
//! ```

#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]

extern crate alloc;
Expand All @@ -69,7 +69,7 @@ extern crate alloc;
#[cfg_attr(not(feature = "std"), path = "no_std.rs")]
mod sys;

use alloc::sync::Arc;
use alloc::boxed::Box;

use core::borrow::Borrow;
use core::fmt;
Expand All @@ -79,15 +79,23 @@ use core::mem::ManuallyDrop;
use core::pin::Pin;
use core::ptr;
use core::task::{Context, Poll, Waker};
use core::usize;

use sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

#[cfg(feature = "std")]
use std::panic::{RefUnwindSafe, UnwindSafe};
#[cfg(feature = "std")]
use std::time::{Duration, Instant};

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::{Arc, WithMut};

/// 1.39-compatible replacement for `matches!`
macro_rules! matches {
($expr:expr, $($pattern:pat)|+ $(if $guard: expr)?) => {
match $expr {
$($pattern)|+ $(if $guard)? => true,
_ => false,
}
};
}

/// Inner state of [`Event`].
struct Inner {
/// The number of notified entries, or `usize::MAX` if all of them have been notified.
Expand All @@ -96,11 +104,14 @@ struct Inner {
notified: AtomicUsize,

/// Inner queue of event listeners.
///
/// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a
/// more traditional `Vec` of listeners, with an atomic queue used as a backup for high
/// contention.
list: sys::List,
}

impl Inner {
/// Create a new `Inner`.
fn new() -> Self {
Self {
notified: AtomicUsize::new(core::usize::MAX),
Expand Down Expand Up @@ -137,10 +148,26 @@ pub struct Event {
inner: AtomicPtr<Inner>,
}

unsafe impl Send for Event {}
unsafe impl Sync for Event {}

#[cfg(feature = "std")]
impl UnwindSafe for Event {}
impl std::panic::UnwindSafe for Event {}
#[cfg(feature = "std")]
impl RefUnwindSafe for Event {}
impl std::panic::RefUnwindSafe for Event {}

impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Pad { .. }")
}
}

impl Default for Event {
#[inline]
fn default() -> Self {
Self::new()
}
}

impl Event {
/// Creates a new [`Event`].
Expand All @@ -153,15 +180,17 @@ impl Event {
/// let event = Event::new();
/// ```
#[inline]
pub const fn new() -> Event {
Event {
pub const fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

/// Returns a guard listening for a notification.
///
/// This method emits a `SeqCst` fence after registering a listener.
/// This method emits a `SeqCst` fence after registering a listener. For now, this method
/// is an alias for calling [`EventListener::new()`], pinning it to the heap, and then
/// inserting it into a list.
///
/// # Examples
///
Expand Down Expand Up @@ -260,7 +289,7 @@ impl Event {
// Notify if there is at least one unnotified listener and the number of notified
// listeners is less than `n`.
if inner.notified.load(Ordering::Acquire) < n {
inner.notify(n, false);
inner.notify(n, true);
}
}
}
Expand Down Expand Up @@ -302,7 +331,7 @@ impl Event {

if let Some(inner) = self.try_inner() {
// Notify if there is at least one unnotified listener.
if inner.notified.load(Ordering::Acquire) < usize::MAX {
if inner.notified.load(Ordering::Acquire) < core::usize::MAX {
inner.notify(n, true);
}
}
Expand Down Expand Up @@ -347,34 +376,35 @@ impl Event {
pub fn notify_additional_relaxed(&self, n: usize) {
if let Some(inner) = self.try_inner() {
// Notify if there is at least one unnotified listener.
if inner.notified.load(Ordering::Acquire) < usize::MAX {
if inner.notified.load(Ordering::Acquire) < core::usize::MAX {
inner.notify(n, true);
}
}
}

/// Returns a reference to the inner state if it was initialized.
/// Return a reference to the inner state if it has been initialized.
#[inline]
fn try_inner(&self) -> Option<&Inner> {
let inner = self.inner.load(Ordering::Acquire);
unsafe { inner.as_ref() }
}

/// Returns a raw pointer to the inner state, initializing it if necessary.
/// Returns a raw, initialized pointer to the inner state.
///
/// This returns a raw pointer instead of reference because `from_raw`
/// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>
/// requires raw/mut provenance: <https://github.com/rust-lang/rust/pull/67339>.
fn inner(&self) -> *const Inner {
let mut inner = self.inner.load(Ordering::Acquire);

// Initialize the state if this is its first use.
// If this is the first use, initialize the state.
if inner.is_null() {
// Allocate on the heap.
// Allocate the state on the heap.
let new = Arc::new(Inner::new());
// Convert the heap-allocated state into a raw pointer.

// Convert the state to a raw pointer.
let new = Arc::into_raw(new) as *mut Inner;

// Attempt to replace the null-pointer with the new state pointer.
// Replace the null pointer with the new state pointer.
inner = self
.inner
.compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire)
Expand All @@ -400,26 +430,14 @@ impl Event {
impl Drop for Event {
#[inline]
fn drop(&mut self) {
let inner: *mut Inner = *self.inner.get_mut();

// If the state pointer has been initialized, deallocate it.
if !inner.is_null() {
unsafe {
drop(Arc::from_raw(inner));
self.inner.with_mut(|&mut inner| {
// If the state pointer has been initialized, drop it.
if !inner.is_null() {
unsafe {
drop(Arc::from_raw(inner));
}
}
}
}
}

impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Event { .. }")
}
}

impl Default for Event {
fn default() -> Event {
Event::new()
})
}
}

Expand Down Expand Up @@ -770,40 +788,35 @@ impl<B: Borrow<Inner> + Unpin> Drop for Listener<B> {

/// The state of a listener.
#[derive(Debug, PartialEq)]
pub(crate) enum State {
/// It has just been created.
enum State {
/// The listener was just created.
Created,

/// It has received a notification.
/// The listener has received a notification.
///
/// The `bool` is `true` if this was an "additional" notification.
Notified(bool),

/// A task is polling it.
/// A task is waiting for a notification.
Task(Task),

/// Empty hole used to replace a notified listener.
NotifiedTaken,
}

impl State {
/// Returns `true` if this is the `Notified` state.
#[inline]
pub(crate) fn is_notified(&self) -> bool {
match self {
State::Notified(_) | Self::NotifiedTaken => true,
_ => false,
}
fn is_notified(&self) -> bool {
matches!(self, Self::Notified(_) | Self::NotifiedTaken)
}
}

/// An asynchronous waker or thread unparker that can be used to notify a task or thread.
#[derive(Debug)]
/// A task that can be woken up.
#[derive(Debug, Clone)]
enum Task {
/// A waker that can be used to notify a task.
/// A waker that wakes up a future.
Waker(Waker),

/// An unparker that can be used to notify a thread.
/// An unparker that wakes up a thread.
#[cfg(feature = "std")]
Unparker(parking::Unparker),
}
Expand All @@ -817,12 +830,11 @@ impl Task {
}
}

/// Notifies the task or thread.
fn wake(self) {
match self {
Task::Waker(waker) => waker.wake(),
Self::Waker(waker) => waker.wake(),
#[cfg(feature = "std")]
Task::Unparker(unparker) => {
Self::Unparker(unparker) => {
unparker.unpark();
}
}
Expand All @@ -836,7 +848,7 @@ impl PartialEq for Task {
}

/// A reference to a task.
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
enum TaskRef<'a> {
/// A waker that wakes up a future.
Waker(&'a Waker),
Expand Down Expand Up @@ -890,23 +902,21 @@ fn full_fence() {
// The ideal solution here would be to use inline assembly, but we're instead creating a
// temporary atomic variable and compare-and-exchanging its value. No sane compiler to
// x86 platforms is going to optimize this away.
atomic::compiler_fence(Ordering::SeqCst);
sync::atomic::compiler_fence(Ordering::SeqCst);
let a = AtomicUsize::new(0);
let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst);
atomic::compiler_fence(Ordering::SeqCst);
sync::atomic::compiler_fence(Ordering::SeqCst);
} else {
atomic::fence(Ordering::SeqCst);
sync::atomic::fence(Ordering::SeqCst);
}
}

/// Synchronization primitive implementation.
mod sync {
pub(super) use alloc::sync::Arc;
pub(super) use core::cell;
pub(super) use core::sync::atomic;

#[cfg(not(feature = "std"))]
pub(super) use alloc::sync::Arc;

#[cfg(feature = "std")]
pub(super) use std::sync::{Mutex, MutexGuard};

Expand Down
19 changes: 13 additions & 6 deletions src/no_std/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! An operation that can be delayed.

//! The node that makes up queues.

use super::ListenerSlab;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::sync::Arc;
use crate::sys::ListenerSlab;
use crate::{State, Task};

use alloc::boxed::Box;
Expand Down Expand Up @@ -32,7 +34,7 @@ pub(crate) enum Node {
/// This node is removing a listener.
RemoveListener {
/// The ID of the listener to remove.
key: NonZeroUsize,
listener: NonZeroUsize,

/// Whether to propagate notifications to the next listener.
propagate: bool,
Expand All @@ -42,6 +44,7 @@ pub(crate) enum Node {
Waiting(Task),
}

#[derive(Debug)]
pub(crate) struct TaskWaiting {
/// The task that is being waited on.
task: AtomicCell<Task>,
Expand Down Expand Up @@ -69,23 +72,27 @@ impl Node {
}

/// Apply the node to the list.
pub(crate) fn apply(self, list: &mut ListenerSlab) -> Option<Task> {
pub(super) fn apply(self, list: &mut ListenerSlab) -> Option<Task> {
match self {
Node::AddListener { task_waiting } => {
// Add a new entry to the list.
let key = list.insert(State::Created);

// Send the new key to the listener and wake it if necessary.
task_waiting.entry_id.store(key.get(), Ordering::Release);

return task_waiting.task.take().map(|t| *t);
}
Node::Notify { count, additional } => {
// Notify the listener.
// Notify the next `count` listeners.
list.notify(count, additional);
}
Node::RemoveListener { key, propagate } => {
Node::RemoveListener {
listener,
propagate,
} => {
// Remove the listener from the list.
list.remove(key, propagate);
list.remove(listener, propagate);
}
Node::Waiting(task) => {
return Some(task);
Expand Down
Loading

0 comments on commit 1ec136a

Please sign in to comment.