Skip to content

Commit

Permalink
feat: Add explicit web support
Browse files Browse the repository at this point in the history
This commit adds WASM compilation support to this crate. The main thing
is that the wait() family of APIs are removed in WASM mode, as blocking
is not allowed in WASM.

In addition, tests are added to CI to support WASM.

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Aug 26, 2023
1 parent 85ca6d3 commit c278371
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 148 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ jobs:
run: cargo hack build --all --no-dev-deps
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps --features portable-atomic
- name: Install wasm-pack
uses: taiki-e/install-action@wasm-pack
- run: wasm-pack test --node
- run: wasm-pack test --node --no-default-features
- run: wasm-pack test --node --no-default-features --features portable-atomic

msrv:
runs-on: ubuntu-latest
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]

[dependencies]
concurrent-queue = { version = "2.2.0", default-features = false }
parking = { version = "2.0.0", optional = true }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.2", default-features = false, optional = true, features = ["alloc"] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }

[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
Expand All @@ -40,6 +42,9 @@ version = "0.4.0"
default-features = false
features = ["cargo_bench_support"]

[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"

[[bench]]
name = "bench"
harness = false
Expand Down
276 changes: 145 additions & 131 deletions examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,182 +2,196 @@
//!
//! This mutex exposes both blocking and async methods for acquiring a lock.

#![allow(dead_code)]
#[cfg(not(target_family = "wasm"))]
mod example {
#![allow(dead_code)]

use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

use event_listener::Event;
use event_listener::Event;

/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,
/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,

/// Blocked lock operations.
lock_ops: Event,
/// Blocked lock operations.
lock_ops: Event,

/// The inner protected data.
data: UnsafeCell<T>,
}
/// The inner protected data.
data: UnsafeCell<T>,
}

unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}

impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
}
}
}

/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
}

/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;

// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait();

// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait();
}
}
}
}
}

/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait_deadline(deadline)?;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(mut l) => {
// Wait until a notification is received.
l.as_mut().wait_deadline(deadline)?;
}
}
}
}
}

/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
Some(l) => {
// Wait until a notification is received.
l.await;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
}
}
}
}

/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);
/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);

unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}

impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
}

impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
}

impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
}

fn main() {
const N: usize = 10;
pub(super) fn entry() {
const N: usize = 10;

// A shared counter.
let counter = Arc::new(Mutex::new(0));
// A shared counter.
let counter = Arc::new(Mutex::new(0));

// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();
// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();

// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();
// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();

thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;
thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;

// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}
// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}

// Wait until the last thread increments the counter.
rx.recv().unwrap();

// Wait until the last thread increments the counter.
rx.recv().unwrap();
// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);

// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);
println!("Done!");
}
}

println!("Done!");
#[cfg(target_family = "wasm")]
mod example {
pub(super) fn entry() {
println!("This example is not supported on wasm yet.");
}
}

fn main() {
example::entry();
}
Loading

0 comments on commit c278371

Please sign in to comment.