Skip to content

Commit

Permalink
task: introduce RcCell helper (#4977)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Sep 6, 2022
1 parent 1fd7f82 commit 116fa7c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 29 deletions.
47 changes: 18 additions & 29 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task};
use crate::sync::AtomicWaker;
use crate::util::VecDequeCell;
use crate::util::{RcCell, VecDequeCell};

use std::cell::Cell;
use std::collections::VecDeque;
Expand Down Expand Up @@ -261,7 +261,11 @@ pin_project! {
}
}

thread_local!(static CURRENT: Cell<Option<Rc<Context>>> = Cell::new(None));
#[cfg(any(loom, tokio_no_const_thread_local))]
thread_local!(static CURRENT: RcCell<Context> = RcCell::new());

#[cfg(not(any(loom, tokio_no_const_thread_local)))]
thread_local!(static CURRENT: RcCell<Context> = const { RcCell::new() });

cfg_rt! {
/// Spawns a `!Send` future on the local task set.
Expand Down Expand Up @@ -311,12 +315,10 @@ cfg_rt! {
F::Output: 'static
{
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
match maybe_cx.get() {
None => panic!("`spawn_local` called from outside of a `task::LocalSet`"),
Some(cx) => cx.spawn(future, name)
}

})
}
}
Expand All @@ -336,7 +338,7 @@ pub struct LocalEnterGuard(Option<Rc<Context>>);
impl Drop for LocalEnterGuard {
fn drop(&mut self) {
CURRENT.with(|ctx| {
ctx.replace(self.0.take());
ctx.set(self.0.take());
})
}
}
Expand Down Expand Up @@ -615,12 +617,12 @@ impl LocalSet {
fn with<T>(&self, f: impl FnOnce() -> T) -> T {
CURRENT.with(|ctx| {
struct Reset<'a> {
ctx_ref: &'a Cell<Option<Rc<Context>>>,
ctx_ref: &'a RcCell<Context>,
val: Option<Rc<Context>>,
}
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.ctx_ref.replace(self.val.take());
self.ctx_ref.set(self.val.take());
}
}
let old = ctx.replace(Some(self.context.clone()));
Expand Down Expand Up @@ -822,19 +824,11 @@ impl<T: Future> Future for RunUntil<'_, T> {
}
}

fn clone_rc<T>(rc: &Cell<Option<Rc<T>>>) -> Option<Rc<T>> {
let value = rc.take();
let cloned = value.clone();
rc.set(value);
cloned
}

impl Shared {
/// Schedule the provided task on the scheduler.
fn schedule(&self, task: task::Notified<Arc<Self>>) {
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
match maybe_cx.get() {
Some(cx) if cx.shared.ptr_eq(self) => {
cx.queue.push_back(task);
}
Expand All @@ -861,14 +855,11 @@ impl Shared {

impl task::Schedule for Arc<Shared> {
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
}
CURRENT.with(|maybe_cx| match maybe_cx.get() {
None => panic!("scheduler context missing"),
Some(cx) => {
assert!(cx.shared.ptr_eq(self));
cx.owned.remove(task)
}
})
}
Expand All @@ -889,15 +880,13 @@ impl task::Schedule for Arc<Shared> {
// This hook is only called from within the runtime, so
// `CURRENT` should match with `&self`, i.e. there is no
// opportunity for a nested scheduler to be called.
CURRENT.with(|maybe_cx| {
let ctx = clone_rc(maybe_cx);
match ctx {
CURRENT.with(|maybe_cx| match maybe_cx.get() {
Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
cx.unhandled_panic.set(true);
cx.owned.close_and_shutdown_all();
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
}})
})
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ cfg_rt! {

mod vec_deque_cell;
pub(crate) use vec_deque_cell::VecDequeCell;

mod rc_cell;
pub(crate) use rc_cell::RcCell;
}

cfg_rt_multi_thread! {
Expand Down
57 changes: 57 additions & 0 deletions tokio/src/util/rc_cell.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::loom::cell::UnsafeCell;

use std::rc::Rc;

/// This is exactly like `Cell<Option<Rc<T>>>`, except that it provides a `get`
/// method even though `Rc` is not `Copy`.
pub(crate) struct RcCell<T> {
inner: UnsafeCell<Option<Rc<T>>>,
}

impl<T> RcCell<T> {
#[cfg(not(loom))]
pub(crate) const fn new() -> Self {
Self {
inner: UnsafeCell::new(None),
}
}

// The UnsafeCell in loom does not have a const `new` fn.
#[cfg(loom)]
pub(crate) fn new() -> Self {
Self {
inner: UnsafeCell::new(None),
}
}

/// Safety: This method may not be called recursively.
#[inline]
unsafe fn with_inner<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Option<Rc<T>>) -> R,
{
// safety: This type is not Sync, so concurrent calls of this method
// cannot happen. Furthermore, the caller guarantees that the method is
// not called recursively. Finally, this is the only place that can
// create mutable references to the inner Rc. This ensures that any
// mutable references created here are exclusive.
self.inner.with_mut(|ptr| f(&mut *ptr))
}

pub(crate) fn get(&self) -> Option<Rc<T>> {
// safety: The `Rc::clone` method will not call any unknown user-code,
// so it will not result in a recursive call to `with_inner`.
unsafe { self.with_inner(|rc| rc.clone()) }
}

pub(crate) fn replace(&self, val: Option<Rc<T>>) -> Option<Rc<T>> {
// safety: No destructors or other unknown user-code will run inside the
// `with_inner` call, so no recursive call to `with_inner` can happen.
unsafe { self.with_inner(|rc| std::mem::replace(rc, val)) }
}

pub(crate) fn set(&self, val: Option<Rc<T>>) {
let old = self.replace(val);
drop(old);
}
}

0 comments on commit 116fa7c

Please sign in to comment.