From 5323e68dd79c806ef136c57d00a983b26ea7015c Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Sat, 30 Sep 2017 21:44:39 -0700
Subject: [PATCH 1/4] Rename ThreadUnpark -> ThreadNotify

This renames an internal type away from deprecated terminology.
---
 src/task_impl/std/mod.rs | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs
index f3667820e7..d95e3a9053 100644
--- a/src/task_impl/std/mod.rs
+++ b/src/task_impl/std/mod.rs
@@ -238,7 +238,7 @@ impl<F: Future> Spawn<F> {
     /// to complete. When a future cannot make progress it will use
     /// `thread::park` to block the current thread.
     pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
-        let unpark = Arc::new(ThreadUnpark::new(thread::current()));
+        let unpark = Arc::new(ThreadNotify::new(thread::current()));

         loop {
             match self.poll_future_notify(&unpark, 0)? {
@@ -296,7 +296,7 @@ impl<S: Stream> Spawn<S> {
     /// Like `wait_future`, except only waits for the next element to arrive on
     /// the underlying stream.
     pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
-        let unpark = Arc::new(ThreadUnpark::new(thread::current()));
+        let unpark = Arc::new(ThreadNotify::new(thread::current()));
         loop {
             match self.poll_stream_notify(&unpark, 0) {
                 Ok(Async::NotReady) => unpark.park(),
@@ -340,7 +340,7 @@ impl<S: Sink> Spawn<S> {
     /// be blocked until it's able to send the value.
     pub fn wait_send(&mut self, mut value: S::SinkItem)
                      -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadUnpark::new(thread::current()));
+        let notify = Arc::new(ThreadNotify::new(thread::current()));
         loop {
             value = match self.start_send_notify(value, &notify, 0)? {
                 AsyncSink::NotReady(v) => v,
@@ -359,7 +359,7 @@ impl<S: Sink> Spawn<S> {
     /// The thread will be blocked until `poll_complete` returns that it's
     /// ready.
     pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadUnpark::new(thread::current()));
+        let notify = Arc::new(ThreadNotify::new(thread::current()));
         loop {
             if self.poll_flush_notify(&notify, 0)?.is_ready() {
                 return Ok(())
@@ -374,7 +374,7 @@ impl<S: Sink> Spawn<S> {
     /// is not ready to be close yet, then the current thread will be blocked
     /// until it's closed.
     pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadUnpark::new(thread::current()));
+        let notify = Arc::new(ThreadNotify::new(thread::current()));
         loop {
             if self.close_notify(&notify, 0)?.is_ready() {
                 return Ok(())
@@ -474,16 +474,16 @@ impl Unpark for RunInner {
     }
 }

-// ===== ThreadUnpark =====
+// ===== ThreadNotify =====

-struct ThreadUnpark {
+struct ThreadNotify {
     thread: thread::Thread,
     ready: AtomicBool,
 }

-impl ThreadUnpark {
-    fn new(thread: thread::Thread) -> ThreadUnpark {
-        ThreadUnpark {
+impl ThreadNotify {
+    fn new(thread: thread::Thread) -> ThreadNotify {
+        ThreadNotify {
             thread: thread,
             ready: AtomicBool::new(false),
         }
@@ -496,7 +496,7 @@ impl ThreadUnpark {
     }
 }

-impl Notify for ThreadUnpark {
+impl Notify for ThreadNotify {
     fn notify(&self, _unpark_id: usize) {
         self.ready.store(true, Ordering::SeqCst);
         self.thread.unpark()

From 28df950631050879f39d57e08b960002c0eade21 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Sun, 1 Oct 2017 09:59:53 -0700
Subject: [PATCH 2/4] Reuse ThreadNotify when blocking thread for future

This change avoids allocating a new Arc for each blocking call and, because
the handle is passed by reference, skips the Arc ref count increment when it
is not needed.
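Read together, the pieces in this patch amount to the pattern sketched below.
This is a standalone illustration, not the futures internals themselves: the
`ThreadNotify` layout and the `with_current` shape are taken from the diff that
follows, while the `main` driver is hypothetical.

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    struct ThreadNotify {
        ready: AtomicBool,
        thread: thread::Thread,
    }

    thread_local! {
        // One handle per thread, created lazily on first use and then reused
        // by every subsequent blocking call made on that thread.
        static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
            ready: AtomicBool::new(false),
            thread: thread::current(),
        });
    }

    impl ThreadNotify {
        // Hand out a borrow of the thread-local Arc; callers only clone it
        // (and pay the ref-count bump) if they need an owned handle.
        fn with_current<F, R>(f: F) -> R
        where
            F: FnOnce(&Arc<ThreadNotify>) -> R,
        {
            CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
        }
    }

    fn main() {
        ThreadNotify::with_current(|notify| {
            // The same allocation is handed back on every call from this thread.
            assert!(!notify.ready.load(Ordering::SeqCst));
            // The stored thread handle is what a notifier would later unpark.
            notify.thread.unpark();
        });
    }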
---
 src/task_impl/std/mod.rs | 95 +++++++++++++++++++++++-----------------
 1 file changed, 55 insertions(+), 40 deletions(-)

diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs
index d95e3a9053..33dcb457d5 100644
--- a/src/task_impl/std/mod.rs
+++ b/src/task_impl/std/mod.rs
@@ -238,14 +238,15 @@ impl<F: Future> Spawn<F> {
     /// to complete. When a future cannot make progress it will use
     /// `thread::park` to block the current thread.
     pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
-        let unpark = Arc::new(ThreadNotify::new(thread::current()));
+        ThreadNotify::with_current(|notify| {

-        loop {
-            match self.poll_future_notify(&unpark, 0)? {
-                Async::NotReady => unpark.park(),
-                Async::Ready(e) => return Ok(e),
+            loop {
+                match self.poll_future_notify(notify, 0)? {
+                    Async::NotReady => notify.park(),
+                    Async::Ready(e) => return Ok(e),
+                }
             }
-        }
+        })
     }

     /// A specialized function to request running a future to completion on the
@@ -296,15 +297,17 @@ impl<S: Stream> Spawn<S> {
     /// Like `wait_future`, except only waits for the next element to arrive on
     /// the underlying stream.
     pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
-        let unpark = Arc::new(ThreadNotify::new(thread::current()));
-        loop {
-            match self.poll_stream_notify(&unpark, 0) {
-                Ok(Async::NotReady) => unpark.park(),
-                Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
-                Ok(Async::Ready(None)) => return None,
-                Err(e) => return Some(Err(e)),
+        ThreadNotify::with_current(|notify| {
+
+            loop {
+                match self.poll_stream_notify(notify, 0) {
+                    Ok(Async::NotReady) => notify.park(),
+                    Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
+                    Ok(Async::Ready(None)) => return None,
+                    Err(e) => return Some(Err(e)),
+                }
             }
-        }
+        })
     }
 }

@@ -340,14 +343,16 @@ impl<S: Sink> Spawn<S> {
     /// be blocked until it's able to send the value.
     pub fn wait_send(&mut self, mut value: S::SinkItem)
                      -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadNotify::new(thread::current()));
-        loop {
-            value = match self.start_send_notify(value, &notify, 0)? {
-                AsyncSink::NotReady(v) => v,
-                AsyncSink::Ready => return Ok(()),
-            };
-            notify.park();
-        }
+        ThreadNotify::with_current(|notify| {
+
+            loop {
+                value = match self.start_send_notify(value, notify, 0)? {
+                    AsyncSink::NotReady(v) => v,
+                    AsyncSink::Ready => return Ok(()),
+                };
+                notify.park();
+            }
+        })
     }

     /// Blocks the current thread until it's able to flush this sink.
@@ -359,13 +364,15 @@ impl<S: Sink> Spawn<S> {
     /// The thread will be blocked until `poll_complete` returns that it's
     /// ready.
    pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadNotify::new(thread::current()));
-        loop {
-            if self.poll_flush_notify(&notify, 0)?.is_ready() {
-                return Ok(())
+        ThreadNotify::with_current(|notify| {
+
+            loop {
+                if self.poll_flush_notify(notify, 0)?.is_ready() {
+                    return Ok(())
+                }
+                notify.park();
             }
-            notify.park();
-        }
+        })
     }

     /// Blocks the current thread until it's able to close this sink.
@@ -374,13 +381,15 @@ impl<S: Sink> Spawn<S> {
     /// is not ready to be close yet, then the current thread will be blocked
     /// until it's closed.
     pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
-        let notify = Arc::new(ThreadNotify::new(thread::current()));
-        loop {
-            if self.close_notify(&notify, 0)?.is_ready() {
-                return Ok(())
+        ThreadNotify::with_current(|notify| {
+
+            loop {
+                if self.close_notify(notify, 0)?.is_ready() {
+                    return Ok(())
+                }
+                notify.park();
             }
-            notify.park();
-        }
+        })
     }
 }

@@ -477,16 +486,22 @@ impl Unpark for RunInner {
 // ===== ThreadNotify =====

 struct ThreadNotify {
-    thread: thread::Thread,
     ready: AtomicBool,
+    thread: thread::Thread,
+}
+
+thread_local! {
+    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
+        ready: AtomicBool::new(false),
+        thread: thread::current(),
+    });
 }

 impl ThreadNotify {
-    fn new(thread: thread::Thread) -> ThreadNotify {
-        ThreadNotify {
-            thread: thread,
-            ready: AtomicBool::new(false),
-        }
+    fn with_current<F, R>(f: F) -> R
+        where F: FnOnce(&Arc<ThreadNotify>) -> R,
+    {
+        CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
     }

     fn park(&self) {

From d832074f895e29609f8b4d37fae6504edb9aedc7 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Sun, 1 Oct 2017 13:54:59 -0700
Subject: [PATCH 3/4] Use an atomic to avoid unnecessary mutex locking

Unfortunately, using an atomic requires a final atomic CAS within the
"wakeup" mutex. This means we cannot use the thread park / unpark helpers
from std.

This change improves a single-threaded "yield" benchmark by almost 40%.
---
 src/task_impl/std/mod.rs | 74 ++++++++++++++++++++++++++++++++++------
 1 file changed, 63 insertions(+), 11 deletions(-)

diff --git a/src/task_impl/std/mod.rs b/src/task_impl/std/mod.rs
index 33dcb457d5..9f7b6981f5 100644
--- a/src/task_impl/std/mod.rs
+++ b/src/task_impl/std/mod.rs
@@ -5,9 +5,8 @@ use std::fmt;
 use std::marker::PhantomData;
 use std::mem;
 use std::ptr;
-use std::sync::{Arc, Once, ONCE_INIT};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::thread;
+use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
+use std::sync::atomic::{AtomicUsize, Ordering};

 use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
 use super::core;
@@ -486,14 +485,20 @@ impl Unpark for RunInner {
 // ===== ThreadNotify =====

 struct ThreadNotify {
-    ready: AtomicBool,
-    thread: thread::Thread,
+    state: AtomicUsize,
+    mutex: Mutex<()>,
+    condvar: Condvar,
 }

+const IDLE: usize = 0;
+const NOTIFY: usize = 1;
+const SLEEP: usize = 2;
+
 thread_local! {
     static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
-        ready: AtomicBool::new(false),
-        thread: thread::current(),
+        state: AtomicUsize::new(IDLE),
+        mutex: Mutex::new(()),
+        condvar: Condvar::new(),
     });
 }

@@ -505,16 +510,63 @@ impl ThreadNotify {
     }

     fn park(&self) {
-        if !self.ready.swap(false, Ordering::SeqCst) {
-            thread::park();
+        // If currently notified, then we skip sleeping. This is checked outside
+        // of the lock to avoid acquiring a mutex if not necessary.
+        match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+            NOTIFY => return,
+            IDLE => {},
+            _ => unreachable!(),
+        }
+
+        // The state is currently idle, so obtain the lock and then try to
+        // transition to a sleeping state.
+        let mut m = self.mutex.lock().unwrap();
+
+        // Transition to sleeping
+        match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
+            NOTIFY => {
+                // Notified before we could sleep, consume the notification and
+                // exit
+                self.state.store(IDLE, Ordering::SeqCst);
+                return;
+            }
+            IDLE => {},
+            _ => unreachable!(),
+        }
+
+        // Loop until we've been notified
+        loop {
+            m = self.condvar.wait(m).unwrap();
+
+            // Transition back to idle, loop otherwise
+            if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
+                return;
+            }
         }
     }
 }

 impl Notify for ThreadNotify {
     fn notify(&self, _unpark_id: usize) {
-        self.ready.store(true, Ordering::SeqCst);
-        self.thread.unpark()
+        // First, try transitioning from IDLE -> NOTIFY, this does not require a
+        // lock.
+        match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
+            IDLE | NOTIFY => return,
+            SLEEP => {}
+            _ => unreachable!(),
+        }
+
+        // The other half is sleeping, this requires a lock
+        let _m = self.mutex.lock().unwrap();
+
+        // Transition from SLEEP -> NOTIFY
+        match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
+            SLEEP => {}
+            _ => return,
+        }
+
+        // Wakeup the sleeper
+        self.condvar.notify_one();
     }
 }

From 60d9ce10b2235888bec5b288d6b9cd2e76cf6680 Mon Sep 17 00:00:00 2001
From: Carl Lerche
Date: Sun, 1 Oct 2017 14:55:35 -0700
Subject: [PATCH 4/4] Add ThreadNotify benchmark

---
 benches/thread_notify.rs | 114 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)
 create mode 100644 benches/thread_notify.rs

diff --git a/benches/thread_notify.rs b/benches/thread_notify.rs
new file mode 100644
index 0000000000..92932353d8
--- /dev/null
+++ b/benches/thread_notify.rs
@@ -0,0 +1,114 @@
+#![feature(test)]
+
+extern crate futures;
+extern crate test;
+
+use futures::{Future, Poll, Async};
+use futures::task::{self, Task};
+
+use test::Bencher;
+
+#[bench]
+fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
+    const NUM: usize = 10_000;
+
+    struct Yield {
+        rem: usize,
+    }
+
+    impl Future for Yield {
+        type Item = ();
+        type Error = ();
+
+        fn poll(&mut self) -> Poll<(), ()> {
+            if self.rem == 0 {
+                Ok(Async::Ready(()))
+            } else {
+                self.rem -= 1;
+                task::current().notify();
+                Ok(Async::NotReady)
+            }
+        }
+    }
+
+    b.iter(|| {
+        let y = Yield { rem: NUM };
+        y.wait().unwrap();
+    });
+}
+
+#[bench]
+fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
+    const NUM: usize = 10_000;
+
+    struct Yield {
+        rem: usize,
+    }
+
+    impl Future for Yield {
+        type Item = ();
+        type Error = ();
+
+        fn poll(&mut self) -> Poll<(), ()> {
+            if self.rem == 0 {
+                Ok(Async::Ready(()))
+            } else {
+                self.rem -= 1;
+                task::current().notify();
+                Ok(Async::NotReady)
+            }
+        }
+    }
+
+    b.iter(|| {
+        for _ in 0..NUM {
+            let y = Yield { rem: 1 };
+            y.wait().unwrap();
+        }
+    });
+}
+
+#[bench]
+fn thread_yield_multi_thread(b: &mut Bencher) {
+    use std::sync::mpsc;
+    use std::thread;
+
+    const NUM: usize = 1_000;
+
+    let (tx, rx) = mpsc::sync_channel::<Task>(10_000);
+
+    struct Yield {
+        rem: usize,
+        tx: mpsc::SyncSender<Task>,
+    }
+
+    impl Future for Yield {
+        type Item = ();
+        type Error = ();
+
+        fn poll(&mut self) -> Poll<(), ()> {
+            if self.rem == 0 {
+                Ok(Async::Ready(()))
+            } else {
+                self.rem -= 1;
+                self.tx.send(task::current()).unwrap();
+                Ok(Async::NotReady)
+            }
+        }
+    }
+
+    thread::spawn(move || {
+        while let Ok(task) = rx.recv() {
+            task.notify();
+        }
+    });
+
+    b.iter(move || {
+        let y = Yield {
+            rem: NUM,
+            tx: tx.clone(),
+        };
+
+        y.wait().unwrap();
+    });
+}
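For reference, the park/notify protocol introduced in patch 3, extracted into a
standalone sketch. The state constants and the transitions mirror the diff
above; `ThreadNotify::new` and the two-thread `main` driver are illustrative
additions, and `compare_and_swap` is kept to match the 2017-era code (current
Rust would use `compare_exchange`).

    use std::sync::{Arc, Condvar, Mutex};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::Duration;

    const IDLE: usize = 0;
    const NOTIFY: usize = 1;
    const SLEEP: usize = 2;

    struct ThreadNotify {
        state: AtomicUsize,
        mutex: Mutex<()>,
        condvar: Condvar,
    }

    impl ThreadNotify {
        fn new() -> ThreadNotify {
            ThreadNotify {
                state: AtomicUsize::new(IDLE),
                mutex: Mutex::new(()),
                condvar: Condvar::new(),
            }
        }

        fn park(&self) {
            // Fast path: consume a pending notification without taking the lock.
            if self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) == NOTIFY {
                return;
            }
            let mut guard = self.mutex.lock().unwrap();
            // Re-check under the lock before committing to sleep; a notify()
            // may have raced in between the CAS above and acquiring the mutex.
            if self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) == NOTIFY {
                self.state.store(IDLE, Ordering::SeqCst);
                return;
            }
            // Wait until a notification moves the state to NOTIFY.
            loop {
                guard = self.condvar.wait(guard).unwrap();
                if self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) == NOTIFY {
                    return;
                }
            }
        }

        fn notify(&self) {
            // Fast path: if the target thread is not sleeping, a lone CAS suffices.
            match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
                IDLE | NOTIFY => return,
                SLEEP => {}
                _ => unreachable!(),
            }
            // Slow path: the SLEEP -> NOTIFY transition must happen inside the
            // mutex so the sleeper cannot miss it between its CAS and the wait.
            let _guard = self.mutex.lock().unwrap();
            if self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) == SLEEP {
                self.condvar.notify_one();
            }
        }
    }

    fn main() {
        let n = Arc::new(ThreadNotify::new());
        let n2 = n.clone();
        let t = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            n2.notify();
        });
        // Blocks until the spawned thread calls notify().
        n.park();
        t.join().unwrap();
        println!("woken");
    }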