Skip to content

Commit

Permalink
rt: rm internal Park,Unpark traits (#4991)
Browse files Browse the repository at this point in the history
Years ago, Tokio was organized as a cluster of separate crates. This
architecture required a trait to represent "park the thread and do
work.". When all crates were combined into a single crate, the `Park`
and `Unpark` traits remained as internal details.

This patch removes these traits as they are no longer needed. This is in
service of a future refactor that will decouple the various resource
drivers and store them in a single struct instead of nesting them.
However, in the mean time, removing the Park/Unpark traits adds a bit of
messy code to make the conditional compilation work. This code will
(hopefully) be short-lived.
  • Loading branch information
carllerche authored Sep 8, 2022
1 parent 2ad3474 commit 99aa8d1
Show file tree
Hide file tree
Showing 17 changed files with 343 additions and 463 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ jobs:
run: cargo install cargo-hack
- name: check --each-feature
run: cargo hack check --all --each-feature -Z avoid-dev-deps
- name: check net,time
run: cargo check -p tokio --no-default-features --features net,time -Z avoid-dev-deps
# Try with unstable feature flags
- name: check --each-feature --unstable
run: cargo hack check --all --each-feature -Z avoid-dev-deps
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/loom/mocked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ pub(crate) mod sys {
2
}
}

pub(crate) mod thread {
pub use loom::lazy_static::AccessError;
pub use loom::thread::*;
}
4 changes: 2 additions & 2 deletions tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub(crate) mod thread {

#[allow(unused_imports)]
pub(crate) use std::thread::{
current, panicking, park, park_timeout, sleep, spawn, Builder, JoinHandle, LocalKey,
Result, Thread, ThreadId,
current, panicking, park, park_timeout, sleep, spawn, AccessError, Builder, JoinHandle,
LocalKey, Result, Thread, ThreadId,
};
}
74 changes: 0 additions & 74 deletions tokio/src/park/either.rs

This file was deleted.

80 changes: 0 additions & 80 deletions tokio/src/park/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,84 +34,4 @@
//! * `park_timeout` does the same as `park` but allows specifying a maximum
//! time to block the thread for.

cfg_rt! {
pub(crate) mod either;
}

#[cfg(any(feature = "rt", feature = "sync"))]
pub(crate) mod thread;

use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

/// Blocks the current thread.
pub(crate) trait Park {
/// Unpark handle type for the `Park` implementation.
type Unpark: Unpark;

/// Error returned by `park`.
type Error: Debug;

/// Gets a new `Unpark` handle associated with this `Park` instance.
fn unpark(&self) -> Self::Unpark;

/// Blocks the current thread unless or until the token is available.
///
/// A call to `park` does not guarantee that the thread will remain blocked
/// forever, and callers should be prepared for this possibility. This
/// function may wakeup spuriously for any reason.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation.
fn park(&mut self) -> Result<(), Self::Error>;

/// Parks the current thread for at most `duration`.
///
/// This function is the same as `park` but allows specifying a maximum time
/// to block the thread for.
///
/// Same as `park`, there is no guarantee that the thread will remain
/// blocked for any amount of time. Spurious wakeups are permitted for any
/// reason.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation.
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;

/// Releases all resources held by the parker for proper leak-free shutdown.
fn shutdown(&mut self);
}

/// Unblock a thread blocked by the associated `Park` instance.
pub(crate) trait Unpark: Sync + Send + 'static {
/// Unblocks a thread that is blocked by the associated `Park` handle.
///
/// Calling `unpark` atomically makes available the unpark token, if it is
/// not already available.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Unpark` implementation.
fn unpark(&self);
}

impl Unpark for Box<dyn Unpark> {
fn unpark(&self) {
(**self).unpark()
}
}

impl Unpark for Arc<dyn Unpark> {
fn unpark(&self) {
(**self).unpark()
}
}
72 changes: 27 additions & 45 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::park::{Park, Unpark};

use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
Expand All @@ -12,8 +11,6 @@ pub(crate) struct ParkThread {
inner: Arc<Inner>,
}

pub(crate) type ParkError = ();

/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
pub(crate) struct UnparkThread {
Expand Down Expand Up @@ -47,32 +44,25 @@ impl ParkThread {
}),
}
}
}

impl Park for ParkThread {
type Unpark = UnparkThread;
type Error = ParkError;

fn unpark(&self) -> Self::Unpark {
pub(crate) fn unpark(&self) -> UnparkThread {
let inner = self.inner.clone();
UnparkThread { inner }
}

fn park(&mut self) -> Result<(), Self::Error> {
pub(crate) fn park(&mut self) {
self.inner.park();
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
pub(crate) fn park_timeout(&mut self, duration: Duration) {
// Wasm doesn't have threads, so just sleep.
#[cfg(not(tokio_wasm))]
self.inner.park_timeout(duration);
#[cfg(tokio_wasm)]
std::thread::sleep(duration);
Ok(())
}

fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
}
}
Expand Down Expand Up @@ -212,12 +202,13 @@ impl Default for ParkThread {

// ===== impl UnparkThread =====

impl Unpark for UnparkThread {
fn unpark(&self) {
impl UnparkThread {
pub(crate) fn unpark(&self) {
self.inner.unpark();
}
}

use crate::loom::thread::AccessError;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
Expand All @@ -241,24 +232,38 @@ impl CachedParkThread {
}
}

pub(crate) fn get_unpark(&self) -> Result<UnparkThread, ParkError> {
pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
self.unpark().map(|unpark| unpark.into_waker())
}

fn unpark(&self) -> Result<UnparkThread, AccessError> {
self.with_current(|park_thread| park_thread.unpark())
}

pub(crate) fn park(&mut self) {
self.with_current(|park_thread| park_thread.inner.park())
.unwrap();
}

pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))
.unwrap();
}

/// Gets a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> Result<R, ParkError>
fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ())
CURRENT_PARKER.try_with(|inner| f(inner))
}

pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, ParkError> {
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;

// `get_unpark()` should not return a Result
let waker = self.get_unpark()?.into_waker();
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);

pin!(f);
Expand All @@ -268,34 +273,11 @@ impl CachedParkThread {
return Ok(v);
}

self.park()?;
self.park();
}
}
}

impl Park for CachedParkThread {
type Unpark = UnparkThread;
type Error = ParkError;

fn unpark(&self) -> Self::Unpark {
self.get_unpark().unwrap()
}

fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park())?;
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?;
Ok(())
}

fn shutdown(&mut self) {
let _ = self.with_current(|park_thread| park_thread.inner.shutdown());
}
}

impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
Expand Down
26 changes: 8 additions & 18 deletions tokio/src/process/unix/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

//! Process driver.

use crate::park::Park;
use crate::process::unix::GlobalOrphanQueue;
use crate::runtime::io::Handle;
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};

use std::io;
use std::time::Duration;

/// Responsible for cleaning up orphaned child processes on Unix platforms.
Expand All @@ -28,31 +27,22 @@ impl Driver {
signal_handle,
}
}
}

// ===== impl Park for Driver =====

impl Park for Driver {
type Unpark = <SignalDriver as Park>::Unpark;
type Error = io::Error;

fn unpark(&self) -> Self::Unpark {
self.park.unpark()
pub(crate) fn handle(&self) -> Handle {
self.park.io_handle()
}

fn park(&mut self) -> Result<(), Self::Error> {
self.park.park()?;
pub(crate) fn park(&mut self) {
self.park.park();
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.park.park_timeout(duration)?;
pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.park.park_timeout(duration);
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Ok(())
}

fn shutdown(&mut self) {
pub(crate) fn shutdown(&mut self) {
self.park.shutdown()
}
}
Loading

0 comments on commit 99aa8d1

Please sign in to comment.