Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve thread notify #597

Merged
merged 4 commits into from
Oct 25, 2017
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 119 additions & 52 deletions src/task_impl/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::{Arc, Once, ONCE_INIT};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, Ordering};

use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
use super::core;
Expand Down Expand Up @@ -238,14 +237,15 @@ impl<F: Future> Spawn<F> {
/// to complete. When a future cannot make progress it will use
/// `thread::park` to block the current thread.
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
ThreadNotify::with_current(|notify| {

loop {
match self.poll_future_notify(&unpark, 0)? {
Async::NotReady => unpark.park(),
Async::Ready(e) => return Ok(e),
loop {
match self.poll_future_notify(notify, 0)? {
Async::NotReady => notify.park(),
Async::Ready(e) => return Ok(e),
}
}
}
})
}

/// A specialized function to request running a future to completion on the
Expand Down Expand Up @@ -296,15 +296,17 @@ impl<S: Stream> Spawn<S> {
/// Like `wait_future`, except only waits for the next element to arrive on
/// the underlying stream.
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
loop {
match self.poll_stream_notify(&unpark, 0) {
Ok(Async::NotReady) => unpark.park(),
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
Ok(Async::Ready(None)) => return None,
Err(e) => return Some(Err(e)),
ThreadNotify::with_current(|notify| {

loop {
match self.poll_stream_notify(notify, 0) {
Ok(Async::NotReady) => notify.park(),
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
Ok(Async::Ready(None)) => return None,
Err(e) => return Some(Err(e)),
}
}
}
})
}
}

Expand Down Expand Up @@ -340,14 +342,16 @@ impl<S: Sink> Spawn<S> {
/// be blocked until it's able to send the value.
pub fn wait_send(&mut self, mut value: S::SinkItem)
-> Result<(), S::SinkError> {
let notify = Arc::new(ThreadUnpark::new(thread::current()));
loop {
value = match self.start_send_notify(value, &notify, 0)? {
AsyncSink::NotReady(v) => v,
AsyncSink::Ready => return Ok(()),
};
notify.park();
}
ThreadNotify::with_current(|notify| {

loop {
value = match self.start_send_notify(value, notify, 0)? {
AsyncSink::NotReady(v) => v,
AsyncSink::Ready => return Ok(()),
};
notify.park();
}
})
}

/// Blocks the current thread until it's able to flush this sink.
Expand All @@ -359,13 +363,15 @@ impl<S: Sink> Spawn<S> {
/// The thread will be blocked until `poll_complete` returns that it's
/// ready.
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
let notify = Arc::new(ThreadUnpark::new(thread::current()));
loop {
if self.poll_flush_notify(&notify, 0)?.is_ready() {
return Ok(())
ThreadNotify::with_current(|notify| {

loop {
if self.poll_flush_notify(notify, 0)?.is_ready() {
return Ok(())
}
notify.park();
}
notify.park();
}
})
}

/// Blocks the current thread until it's able to close this sink.
Expand All @@ -374,13 +380,15 @@ impl<S: Sink> Spawn<S> {
/// is not ready to be close yet, then the current thread will be blocked
/// until it's closed.
pub fn wait_close(&mut self) -> Result<(), S::SinkError> {
let notify = Arc::new(ThreadUnpark::new(thread::current()));
loop {
if self.close_notify(&notify, 0)?.is_ready() {
return Ok(())
ThreadNotify::with_current(|notify| {

loop {
if self.close_notify(notify, 0)?.is_ready() {
return Ok(())
}
notify.park();
}
notify.park();
}
})
}
}

Expand Down Expand Up @@ -474,32 +482,91 @@ impl Unpark for RunInner {
}
}

// ===== ThreadUnpark =====
// ===== ThreadNotify =====

struct ThreadUnpark {
thread: thread::Thread,
ready: AtomicBool,
struct ThreadNotify {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}

impl ThreadUnpark {
fn new(thread: thread::Thread) -> ThreadUnpark {
ThreadUnpark {
thread: thread,
ready: AtomicBool::new(false),
}
const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
}

impl ThreadNotify {
fn with_current<F, R>(f: F) -> R
where F: FnOnce(&Arc<ThreadNotify>) -> R,
{
CURRENT_THREAD_NOTIFY.with(|notify| f(notify))
}

fn park(&self) {
if !self.ready.swap(false, Ordering::SeqCst) {
thread::park();
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}

// Loop until we've been notified
loop {
m = self.condvar.wait(m).unwrap();

// Transition back to idle, loop otherwise
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this ever not be NOTIFY? Can this be just an atomic store and return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

condvars can wakeup spuriously

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I forgot that actual OS condvars can do that - Go's sync.Cond does not. Apologies!

if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
return;
}
}
}
}

impl Notify for ThreadUnpark {
impl Notify for ThreadNotify {
fn notify(&self, _unpark_id: usize) {
self.ready.store(true, Ordering::SeqCst);
self.thread.unpark()
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
Copy link

@twmb twmb Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worthwhile to have a separate notifier_mutex? This would allow something like...

let _nm = match self.notifier_mutex.try_lock() {
    Ok(g) => g,
    Err(e) => match e {
        TryLockResult::Poisoned(e) => panic!(e),
        TryLockResult::WouldBlock => { return ; }
    }
}
let _m = self.mutex.lock().unwrap();
...

which would avoid simultaneous notifications sitting on a mutex to notify a thread that will only need the first (and would also help avoid [not eliminate] the scenario where the first notification woke the sleeping thread, which then consumes all events, and then gets falsely notified by the other notifications that hadn't hit yet).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh nvm - this would still require keeping one notifier on deck, which TryLock does not provide.


// Transition from SLEEP -> NOTIFY
match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
Copy link

@twmb twmb Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this cas would just need to be a store with notifier_mutex.

[edit: nvm - it still needs to be a compare_and_swap, b/c a simultaneous notification that is slow to the try_lock could still fall in here after the parked thread awakens and swaps to idle]

SLEEP => {}
_ => return,
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}

Expand Down