Skip to content

Commit

Permalink
Add a check that a single Waker is active per Poll instance (#1329)
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomasdezeeuw authored Oct 24, 2020
1 parent 95393bc commit f4874f2
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 7 deletions.
9 changes: 9 additions & 0 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,15 @@ impl Registry {
.try_clone()
.map(|selector| Registry { selector })
}

/// Internal check to ensure only a single `Waker` is active per [`Poll`]
/// instance.
#[cfg(debug_assertions)]
pub(crate) fn register_waker(&self) {
if self.selector.register_waker() {
panic!("Only a single `Waker` can be active per `Poll` instance");
}
}
}

impl fmt::Debug for Registry {
Expand Down
5 changes: 5 additions & 0 deletions src/sys/shell/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ impl Selector {
pub fn select(&self, _: &mut Events, _: Option<Duration>) -> io::Result<()> {
os_required!();
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
os_required!();
}
}

#[cfg(unix)]
Expand Down
13 changes: 12 additions & 1 deletion src/sys/unix/selector/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP};
use log::error;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, i32, io, ptr};

Expand All @@ -17,6 +17,8 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
ep: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -33,6 +35,8 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -42,6 +46,8 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
ep,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -93,6 +99,11 @@ impl Selector {
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}
}

cfg_net! {
Expand Down
13 changes: 12 additions & 1 deletion src/sys/unix/selector/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::mem::MaybeUninit;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};

Expand Down Expand Up @@ -69,6 +69,8 @@ pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
kq: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -79,6 +81,8 @@ impl Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

Expand All @@ -88,6 +92,8 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
kq,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand Down Expand Up @@ -208,6 +214,11 @@ impl Selector {
kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn setup_waker(&self, token: Token) -> io::Result<()> {
Expand Down
18 changes: 14 additions & 4 deletions src/sys/windows/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cfg_net! {

use miow::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
Expand All @@ -20,7 +21,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::io;
use winapi::shared::ntdef::NT_SUCCESS;
use winapi::shared::ntdef::{HANDLE, PVOID};
use winapi::shared::ntstatus::STATUS_CANCELLED;
Expand Down Expand Up @@ -327,8 +327,9 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
pub struct Selector {
#[cfg(debug_assertions)]
id: usize,

pub(super) inner: Arc<SelectorInner>,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
Expand All @@ -340,6 +341,8 @@ impl Selector {
#[cfg(debug_assertions)]
id,
inner: Arc::new(inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
}
})
}
Expand All @@ -349,6 +352,8 @@ impl Selector {
#[cfg(debug_assertions)]
id: self.id,
inner: Arc::clone(&self.inner),
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

Expand All @@ -360,6 +365,11 @@ impl Selector {
self.inner.select(events, timeout)
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}

pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
self.inner.cp.clone()
}
Expand Down Expand Up @@ -499,7 +509,7 @@ impl SelectorInner {
} else if iocp_event.token() % 2 == 1 {
// Handle is a named pipe. This could be extended to be any non-AFD event.
let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;

let len = events.len();
callback(iocp_event.entry(), Some(events));
n += events.len() - len;
Expand Down Expand Up @@ -701,7 +711,7 @@ impl Drop for SelectorInner {
let callback = unsafe {
(*(iocp_event.overlapped() as *mut super::Overlapped)).callback
};

callback(iocp_event.entry(), None);
} else {
// drain sock state to release memory of Arc reference
Expand Down
4 changes: 3 additions & 1 deletion src/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::io;
/// `Waker` events are only guaranteed to be delivered while the `Waker` value
/// is alive.
///
/// Only a single `Waker` should active per [`Poll`], if multiple threads need
/// Only a single `Waker` can be active per [`Poll`], if multiple threads need
/// access to the `Waker` it can be shared via for example an `Arc`. What
/// happens if multiple `Waker`s are registered with the same `Poll` is
/// undefined.
Expand Down Expand Up @@ -81,6 +81,8 @@ pub struct Waker {
impl Waker {
/// Create a new `Waker`.
pub fn new(registry: &Registry, token: Token) -> io::Result<Waker> {
#[cfg(debug_assertions)]
registry.register_waker();
sys::Waker::new(poll::selector(&registry), token).map(|inner| Waker { inner })
}

Expand Down
21 changes: 21 additions & 0 deletions tests/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,27 @@ fn waker_multiple_wakeups_different_thread() {
handle2.join().unwrap();
}

#[test]
#[cfg_attr(
not(debug_assertions),
ignore = "only works with debug_assertions enabled"
)]
#[should_panic = "Only a single `Waker` can be active per `Poll` instance"]
fn using_multiple_wakers_panics() {
init();

let poll = Poll::new().expect("unable to create new Poll instance");
let token1 = Token(10);
let token2 = Token(11);

let waker1 = Waker::new(poll.registry(), token1).expect("unable to first waker");
// This should panic.
let waker2 = Waker::new(poll.registry(), token2).unwrap();

drop(waker1);
drop(waker2);
}

fn expect_waker_event(poll: &mut Poll, events: &mut Events, token: Token) {
poll.poll(events, Some(Duration::from_millis(100))).unwrap();
assert!(!events.is_empty());
Expand Down

0 comments on commit f4874f2

Please sign in to comment.