Skip to content

Commit

Permalink
Merge pull request #597 from carllerche/improve-thread-notify
Browse files Browse the repository at this point in the history
Improve thread notify
  • Loading branch information
alexcrichton authored Oct 25, 2017
2 parents e4be304 + 60d9ce1 commit 6d861ee
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 48 deletions.
114 changes: 114 additions & 0 deletions benches/thread_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#![feature(test)]

extern crate futures;
extern crate test;

use futures::{Future, Poll, Async};
use futures::task::{self, Task};

use test::Bencher;

#[bench]
fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
const NUM: usize = 10_000;

struct Yield {
rem: usize,
}

impl Future for Yield {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
if self.rem == 0 {
Ok(Async::Ready(()))
} else {
self.rem -= 1;
task::current().notify();
Ok(Async::NotReady)
}
}
}

b.iter(|| {
let y = Yield { rem: NUM };
y.wait().unwrap();
});
}

#[bench]
fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
const NUM: usize = 10_000;

struct Yield {
rem: usize,
}

impl Future for Yield {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
if self.rem == 0 {
Ok(Async::Ready(()))
} else {
self.rem -= 1;
task::current().notify();
Ok(Async::NotReady)
}
}
}

b.iter(|| {
for _ in 0..NUM {
let y = Yield { rem: 1 };
y.wait().unwrap();
}
});
}

#[bench]
fn thread_yield_multi_thread(b: &mut Bencher) {
use std::sync::mpsc;
use std::thread;

const NUM: usize = 1_000;

let (tx, rx) = mpsc::sync_channel::<Task>(10_000);

struct Yield {
rem: usize,
tx: mpsc::SyncSender<Task>,
}

impl Future for Yield {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
if self.rem == 0 {
Ok(Async::Ready(()))
} else {
self.rem -= 1;
self.tx.send(task::current()).unwrap();
Ok(Async::NotReady)
}
}
}

thread::spawn(move || {
while let Ok(task) = rx.recv() {
task.notify();
}
});

b.iter(move || {
let y = Yield {
rem: NUM,
tx: tx.clone(),
};

y.wait().unwrap();
});
}
163 changes: 115 additions & 48 deletions src/task_impl/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::{Arc, Once, ONCE_INIT};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, Ordering};

use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
use super::core;
Expand Down Expand Up @@ -238,14 +237,15 @@ impl<F: Future> Spawn<F> {
/// to complete. When a future cannot make progress it will use
/// `thread::park` to block the current thread.
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
let unpark = Arc::new(ThreadNotify::new(thread::current()));
ThreadNotify::with_current(|notify| {

loop {
match self.poll_future_notify(&unpark, 0)? {
Async::NotReady => unpark.park(),
Async::Ready(e) => return Ok(e),
loop {
match self.poll_future_notify(notify, 0)? {
Async::NotReady => notify.park(),
Async::Ready(e) => return Ok(e),
}
}
}
})
}

/// A specialized function to request running a future to completion on the
Expand Down Expand Up @@ -296,15 +296,17 @@ impl<S: Stream> Spawn<S> {
/// Like `wait_future`, except only waits for the next element to arrive on
/// the underlying stream.
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
let unpark = Arc::new(ThreadNotify::new(thread::current()));
loop {
match self.poll_stream_notify(&unpark, 0) {
Ok(Async::NotReady) => unpark.park(),
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
Ok(Async::Ready(None)) => return None,
Err(e) => return Some(Err(e)),
ThreadNotify::with_current(|notify| {

loop {
match self.poll_stream_notify(notify, 0) {
Ok(Async::NotReady) => notify.park(),
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
Ok(Async::Ready(None)) => return None,
Err(e) => return Some(Err(e)),
}
}
}
})
}
}

Expand Down Expand Up @@ -340,14 +342,16 @@ impl<S: Sink> Spawn<S> {
/// be blocked until it's able to send the value.
pub fn wait_send(&mut self, mut value: S::SinkItem)
-> Result<(), S::SinkError> {
let notify = Arc::new(ThreadNotify::new(thread::current()));
loop {
value = match self.start_send_notify(value, &notify, 0)? {
AsyncSink::NotReady(v) => v,
AsyncSink::Ready => return Ok(()),
};
notify.park();
}
ThreadNotify::with_current(|notify| {

loop {
value = match self.start_send_notify(value, notify, 0)? {
AsyncSink::NotReady(v) => v,
AsyncSink::Ready => return Ok(()),
};
notify.park();
}
})
}

/// Blocks the current thread until it's able to flush this sink.
Expand All @@ -359,13 +363,15 @@ impl<S: Sink> Spawn<S> {
/// The thread will be blocked until `poll_complete` returns that it's
/// ready.
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
let notify = Arc::new(ThreadNotify::new(thread::current()));
loop {
if self.poll_flush_notify(&notify, 0)?.is_ready() {
return Ok(())
ThreadNotify::with_current(|notify| {

loop {
if self.poll_flush_notify(notify, 0)?.is_ready() {
return Ok(())
}
notify.park();
}
notify.park();
}
})
}

/// Blocks the current thread until it's able to close this sink.
Expand All @@ -374,13 +380,15 @@ impl<S: Sink> Spawn<S> {
/// is not ready to be close yet, then the current thread will be blocked
/// until it's closed.
pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
let notify = Arc::new(ThreadNotify::new(thread::current()));
loop {
if self.close_notify(&notify, 0)?.is_ready() {
return Ok(())
ThreadNotify::with_current(|notify| {

loop {
if self.close_notify(notify, 0)?.is_ready() {
return Ok(())
}
notify.park();
}
notify.park();
}
})
}
}

Expand Down Expand Up @@ -477,29 +485,88 @@ impl Unpark for RunInner {
// ===== ThreadNotify =====

struct ThreadNotify {
thread: thread::Thread,
ready: AtomicBool,
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}

const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
}

impl ThreadNotify {
fn new(thread: thread::Thread) -> ThreadNotify {
ThreadNotify {
thread: thread,
ready: AtomicBool::new(false),
}
fn with_current<F, R>(f: F) -> R
where F: FnOnce(&Arc<ThreadNotify>) -> R,
{
CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
}

fn park(&self) {
if !self.ready.swap(false, Ordering::SeqCst) {
thread::park();
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}

// Loop until we've been notified
loop {
m = self.condvar.wait(m).unwrap();

// Transition back to idle, loop otherwise
if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
return;
}
}
}
}

impl Notify for ThreadNotify {
fn notify(&self, _unpark_id: usize) {
self.ready.store(true, Ordering::SeqCst);
self.thread.unpark()
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();

// Transition from SLEEP -> NOTIFY
match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
SLEEP => {}
_ => return,
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}

Expand Down

0 comments on commit 6d861ee

Please sign in to comment.