Skip to content

Commit

Permalink
chore(maitake): use std pin!, poll_fn, ready
Browse files Browse the repository at this point in the history
Currently, we use the versions of `pin_mut!`, `future::poll_fn`, and
`future::ready` from the `futures` crate. Now, `core::pin::pin!`,
`core::future::poll_fn`, and `core::future::ready` are stable, so we can
are stable, so we can just use the stdlib versions of these APIs
instead.

This allows us to reduce our dev dependencies on the `futures` crates
down to just depending on `futures_util` in `maitake-sync` for
`select_biased!`. Reducing the number of dev-deps will hopefully make
compiling the tests a little faster. And, the stdlib version of `pin!`
is *way* nicer than `futures::pin_mut!` as it can be on the right-hand
side of an assignment, rather than requiring an assignment followed by a
`pin!`.
  • Loading branch information
hawkw committed Sep 11, 2024
1 parent e51eb8a commit a7493cc
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 40 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion maitake-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ tracing = { version = "0.1", default_features = false }

[dev-dependencies]
futures-util = "0.3"
futures = "0.3"
tokio-test = "0.4"
tracing = { version = "0.1", default_features = false, features = ["std"] }
tracing-subscriber = { version = "0.3.11", features = ["fmt", "env-filter"] }
Expand Down
5 changes: 2 additions & 3 deletions maitake-sync/src/wait_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,22 @@ impl<'map, 'wait, K: PartialEq, V, Lock: ScopedRawMutex> Wait<'map, K, V, Lock>
///
/// ```ignore
/// use std::sync::Arc;
/// use std::pin::pin;
/// use maitake::scheduler;
/// use maitake_sync::wait_map::{WaitMap, WakeOutcome};
/// use futures_util::pin_mut;
///
/// let scheduler = Scheduler::new();
/// let q = Arc::new(WaitMap::new());
///
/// let q2 = q.clone();
/// scheduler.spawn(async move {
/// let wait = q2.wait(0);
/// let mut wait = pin!(q2.wait(0));
///
/// // At this point, we have created the future, but it has not yet
/// // been added to the queue. We could immediately await 'wait',
/// // but then we would be unable to progress further. We must
/// // first pin the `wait` future, to ensure that it does not move
/// // until it has been completed.
/// pin_mut!(wait);
/// wait.as_mut().subscribe().await.unwrap();
///
/// // We now know the waiter has been enqueued, at this point we could
Expand Down
27 changes: 12 additions & 15 deletions maitake-sync/src/wait_map/tests/alloc_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use super::super::*;
use crate::loom::sync::Arc;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use futures::{future::poll_fn, pin_mut, select_biased, FutureExt};
use core::{
future,
pin::pin,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
use futures_util::{select_biased, FutureExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, task};

#[test]
Expand Down Expand Up @@ -31,9 +35,7 @@ fn enqueue() {
let mut waiter2 = task::spawn({
let q = q.clone();
async move {
let wait = q.wait(1);

pin_mut!(wait);
let mut wait = pin!(q.wait(1));
wait.as_mut().subscribe().await.unwrap();
ENQUEUED.fetch_add(1, Ordering::Relaxed);

Expand Down Expand Up @@ -78,9 +80,7 @@ fn duplicate() {
let mut waiter1 = task::spawn({
let q = q.clone();
async move {
let wait = q.wait(0);

pin_mut!(wait);
let mut wait = pin!(q.wait(0));
wait.as_mut().subscribe().await.unwrap();
ENQUEUED.fetch_add(1, Ordering::Relaxed);

Expand All @@ -95,9 +95,7 @@ fn duplicate() {
let q = q.clone();
async move {
// Duplicate key!
let wait = q.wait(0);

pin_mut!(wait);
let mut wait = pin!(q.wait(0));
wait.as_mut().subscribe().await
}
});
Expand Down Expand Up @@ -345,19 +343,18 @@ fn drop_wake_bailed() {
.map(|i| {
let q = q.clone();
task::spawn(async move {
let mut bail_fut = poll_fn(|_| match BAIL.load(Ordering::Relaxed) {
let mut bail_fut = future::poll_fn(|_| match BAIL.load(Ordering::Relaxed) {
false => Poll::Pending,
true => Poll::Ready(()),
})
.fuse();

let wait_fut = q
let mut wait_fut = pin!(q
.wait(CountDropKey {
idx: i,
cnt: &KEY_DROPS,
})
.fuse();
pin_mut!(wait_fut);
.fuse());

// NOTE: `select_baised is used specifically to ensure the bail
// future is processed first.
Expand Down
2 changes: 1 addition & 1 deletion maitake-sync/src/wait_map/tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn wake_close() {

#[test]
fn wake_and_drop() {
use futures::FutureExt;
use futures_util::FutureExt;
loom::model(|| {
// use `Arc`s as the value type to ensure their destructors are run.
let q = Arc::new(WaitMap::<usize, Arc<()>>::new());
Expand Down
6 changes: 3 additions & 3 deletions maitake-sync/src/wait_queue/tests/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ fn wake_mixed() {

#[test]
fn drop_wait_future() {
use futures_util::future::poll_fn;
use std::future::Future;
use std::future::poll_fn;
use std::pin::pin;
use std::task::Poll;

loom::model(|| {
Expand All @@ -200,7 +200,7 @@ fn drop_wait_future() {
let thread1 = thread::spawn({
let q = q.clone();
move || {
let mut wait = Box::pin(q.wait());
let mut wait = pin!(q.wait());

future::block_on(poll_fn(|cx| {
if wait.as_mut().poll(cx).is_ready() {
Expand Down
2 changes: 0 additions & 2 deletions maitake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ version = "0.1.35"
optional = true

[dev-dependencies]
futures-util = "0.3"
futures = "0.3"
tokio-test = "0.4"

[target.'cfg(not(loom))'.dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions maitake/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub(crate) mod yield_future;
pub use self::yield_future::{yield_now, Yield};
pub use core::future::*;
7 changes: 3 additions & 4 deletions maitake/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
loom::sync::atomic::{AtomicUsize, Ordering::Relaxed},
sync::WaitCell,
};
use core::task::Poll;
use core::{pin::pin, task::Poll};
use std::sync::Arc;

#[cfg(all(feature = "alloc", not(loom)))]
Expand Down Expand Up @@ -35,13 +35,12 @@ impl Chan {
pub(crate) async fn wait(self: Arc<Chan>) {
let this = Arc::downgrade(&self);
drop(self);
futures_util::future::poll_fn(move |cx| {
future::poll_fn(move |cx| {
let Some(this) = this.upgrade() else {
return Poll::Ready(());
};

let res = this.task.wait();
futures_util::pin_mut!(res);
let res = pin!(this.task.wait());

if this.num_notify == this.num.load(Relaxed) {
return Poll::Ready(());
Expand Down
5 changes: 3 additions & 2 deletions maitake/src/task/tests/alloc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ fn task_is_valid_for_casts() {
fn empty_task_size() {
use core::{
any::type_name,
future,
mem::{size_of, size_of_val},
};

type Future = futures::future::Ready<()>;
type Future = future::Ready<()>;
type EmptyTask = Task<NopSchedule, Future, BoxStorage>;

println!(
Expand All @@ -65,7 +66,7 @@ fn empty_task_size() {
size_of::<EmptyTask>() - size_of::<Future>()
);

let task = Task::<Scheduler, Future, BoxStorage>::new(futures::future::ready(()));
let task = Task::<Scheduler, Future, BoxStorage>::new(future::ready(()));
println!("\nTask {{ // {}B", size_of_val(&task));
println!(
" schedulable: Schedulable {{ // {}B",
Expand Down
10 changes: 4 additions & 6 deletions maitake/src/time/timer/tests/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::*;
use crate::loom::{sync::Arc, thread};
use core::{
future::Future,
future::{self, Future},
pin::pin,
task::{Context, Poll},
};
use futures_util::{future, pin_mut};

#[cfg(loom)]
use loom::future::block_on;
Expand Down Expand Up @@ -116,8 +116,7 @@ fn two_sleeps_sequential() {
fn cancel_polled_sleeps() {
fn poll_and_cancel(timer: Arc<Timer>) {
block_on(async move {
let sleep = timer.sleep_ticks(15);
pin_mut!(sleep);
let mut sleep = pin!(timer.sleep_ticks(15));
future::poll_fn(move |cx| {
// poll once to register the sleep with the timer wheel, and
// then return `Ready` so it gets canceled.
Expand Down Expand Up @@ -164,8 +163,7 @@ fn reregister_waker() {
let clock = clock.test_clock();
move || {
let _clock = clock.enter();
let sleep = timer.sleep(Duration::from_secs(1));
pin_mut!(sleep);
let mut sleep = pin!(timer.sleep(Duration::from_secs(1)));
// poll the sleep future initially with a no-op waker.
let _ = sleep.as_mut().poll(&mut Context::from_waker(
futures_util::task::noop_waker_ref(),
Expand Down

0 comments on commit a7493cc

Please sign in to comment.