diff --git a/benches/thread_notify.rs b/benches/thread_notify.rs new file mode 100644 index 0000000000..92932353d8 --- /dev/null +++ b/benches/thread_notify.rs @@ -0,0 +1,114 @@ +#![feature(test)] + +extern crate futures; +extern crate test; + +use futures::{Future, Poll, Async}; +use futures::task::{self, Task}; + +use test::Bencher; + +#[bench] +fn thread_yield_single_thread_one_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + task::current().notify(); + Ok(Async::NotReady) + } + } + } + + b.iter(|| { + let y = Yield { rem: NUM }; + y.wait().unwrap(); + }); +} + +#[bench] +fn thread_yield_single_thread_many_wait(b: &mut Bencher) { + const NUM: usize = 10_000; + + struct Yield { + rem: usize, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + task::current().notify(); + Ok(Async::NotReady) + } + } + } + + b.iter(|| { + for _ in 0..NUM { + let y = Yield { rem: 1 }; + y.wait().unwrap(); + } + }); +} + +#[bench] +fn thread_yield_multi_thread(b: &mut Bencher) { + use std::sync::mpsc; + use std::thread; + + const NUM: usize = 1_000; + + let (tx, rx) = mpsc::sync_channel::(10_000); + + struct Yield { + rem: usize, + tx: mpsc::SyncSender, + } + + impl Future for Yield { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + if self.rem == 0 { + Ok(Async::Ready(())) + } else { + self.rem -= 1; + self.tx.send(task::current()).unwrap(); + Ok(Async::NotReady) + } + } + } + + thread::spawn(move || { + while let Ok(task) = rx.recv() { + task.notify(); + } + }); + + b.iter(move || { + let y = Yield { + rem: NUM, + tx: tx.clone(), + }; + + y.wait().unwrap(); + }); +} diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs index af9fee1008..108c4e575e 100644 --- a/src/task_impl/std/mod.rs +++ b/src/task_impl/std/mod.rs @@ -5,9 +5,8 @@ use std::fmt; use std::marker::PhantomData; use std::mem; use std::ptr; -use std::sync::{Arc, Once, ONCE_INIT}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::thread; +use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT}; +use std::sync::atomic::{AtomicUsize, Ordering}; use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink}; use super::core; @@ -238,14 +237,15 @@ impl Spawn { /// to complete. When a future cannot make progress it will use /// `thread::park` to block the current thread. pub fn wait_future(&mut self) -> Result { - let unpark = Arc::new(ThreadNotify::new(thread::current())); + ThreadNotify::with_current(|notify| { - loop { - match self.poll_future_notify(&unpark, 0)? { - Async::NotReady => unpark.park(), - Async::Ready(e) => return Ok(e), + loop { + match self.poll_future_notify(notify, 0)? { + Async::NotReady => notify.park(), + Async::Ready(e) => return Ok(e), + } } - } + }) } /// A specialized function to request running a future to completion on the @@ -296,15 +296,17 @@ impl Spawn { /// Like `wait_future`, except only waits for the next element to arrive on /// the underlying stream. pub fn wait_stream(&mut self) -> Option> { - let unpark = Arc::new(ThreadNotify::new(thread::current())); - loop { - match self.poll_stream_notify(&unpark, 0) { - Ok(Async::NotReady) => unpark.park(), - Ok(Async::Ready(Some(e))) => return Some(Ok(e)), - Ok(Async::Ready(None)) => return None, - Err(e) => return Some(Err(e)), + ThreadNotify::with_current(|notify| { + + loop { + match self.poll_stream_notify(notify, 0) { + Ok(Async::NotReady) => notify.park(), + Ok(Async::Ready(Some(e))) => return Some(Ok(e)), + Ok(Async::Ready(None)) => return None, + Err(e) => return Some(Err(e)), + } } - } + }) } } @@ -340,14 +342,16 @@ impl Spawn { /// be blocked until it's able to send the value. pub fn wait_send(&mut self, mut value: S::SinkItem) -> Result<(), S::SinkError> { - let notify = Arc::new(ThreadNotify::new(thread::current())); - loop { - value = match self.start_send_notify(value, ¬ify, 0)? { - AsyncSink::NotReady(v) => v, - AsyncSink::Ready => return Ok(()), - }; - notify.park(); - } + ThreadNotify::with_current(|notify| { + + loop { + value = match self.start_send_notify(value, notify, 0)? { + AsyncSink::NotReady(v) => v, + AsyncSink::Ready => return Ok(()), + }; + notify.park(); + } + }) } /// Blocks the current thread until it's able to flush this sink. @@ -359,13 +363,15 @@ impl Spawn { /// The thread will be blocked until `poll_complete` returns that it's /// ready. pub fn wait_flush(&mut self) -> Result<(), S::SinkError> { - let notify = Arc::new(ThreadNotify::new(thread::current())); - loop { - if self.poll_flush_notify(¬ify, 0)?.is_ready() { - return Ok(()) + ThreadNotify::with_current(|notify| { + + loop { + if self.poll_flush_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); } - notify.park(); - } + }) } /// Blocks the current thread until it's able to close this sink. @@ -374,13 +380,15 @@ impl Spawn { /// is not ready to be close yet, then the current thread will be blocked /// until it's closed. pub fn wait_close(&mut self) -> Result<(), S::SinkError> { - let notify = Arc::new(ThreadNotify::new(thread::current())); - loop { - if self.close_notify(¬ify, 0)?.is_ready() { - return Ok(()) + ThreadNotify::with_current(|notify| { + + loop { + if self.close_notify(notify, 0)?.is_ready() { + return Ok(()) + } + notify.park(); } - notify.park(); - } + }) } } @@ -477,29 +485,88 @@ impl Unpark for RunInner { // ===== ThreadNotify ===== struct ThreadNotify { - thread: thread::Thread, - ready: AtomicBool, + state: AtomicUsize, + mutex: Mutex<()>, + condvar: Condvar, +} + +const IDLE: usize = 0; +const NOTIFY: usize = 1; +const SLEEP: usize = 2; + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { + state: AtomicUsize::new(IDLE), + mutex: Mutex::new(()), + condvar: Condvar::new(), + }); } impl ThreadNotify { - fn new(thread: thread::Thread) -> ThreadNotify { - ThreadNotify { - thread: thread, - ready: AtomicBool::new(false), - } + fn with_current(f: F) -> R + where F: FnOnce(&Arc) -> R, + { + CURRENT_THREAD_NOTIFY.with(|notify| f(notify)) } fn park(&self) { - if !self.ready.swap(false, Ordering::SeqCst) { - thread::park(); + // If currently notified, then we skip sleeping. This is checked outside + // of the lock to avoid acquiring a mutex if not necessary. + match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + NOTIFY => return, + IDLE => {}, + _ => unreachable!(), + } + + // The state is currently idle, so obtain the lock and then try to + // transition to a sleeping state. + let mut m = self.mutex.lock().unwrap(); + + // Transition to sleeping + match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { + NOTIFY => { + // Notified before we could sleep, consume the notification and + // exit + self.state.store(IDLE, Ordering::SeqCst); + return; + } + IDLE => {}, + _ => unreachable!(), + } + + // Loop until we've been notified + loop { + m = self.condvar.wait(m).unwrap(); + + // Transition back to idle, loop otherwise + if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { + return; + } } } } impl Notify for ThreadNotify { fn notify(&self, _unpark_id: usize) { - self.ready.store(true, Ordering::SeqCst); - self.thread.unpark() + // First, try transitioning from IDLE -> NOTIFY, this does not require a + // lock. + match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { + IDLE | NOTIFY => return, + SLEEP => {} + _ => unreachable!(), + } + + // The other half is sleeping, this requires a lock + let _m = self.mutex.lock().unwrap(); + + // Transition from SLEEP -> NOTIFY + match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) { + SLEEP => {} + _ => return, + } + + // Wakeup the sleeper + self.condvar.notify_one(); } }