Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timer: Reduce size of Delay struct #554

Merged
merged 3 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-timer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Timer facilities for Tokio
[dependencies]
futures = "0.1.19"
tokio-executor = { version = "0.1.1", path = "../tokio-executor" }
crossbeam-utils = "0.5.0"

# Backs `DelayQueue`
slab = "0.4.1"
Expand Down
56 changes: 16 additions & 40 deletions tokio-timer/src/delay.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use Error;
use timer::Registration;
use timer::{Registration, HandlePriv};

use futures::{Future, Poll};

Expand All @@ -16,19 +16,10 @@ use std::time::Instant;
/// [`new`]: #method.new
#[derive(Debug)]
pub struct Delay {
/// The instant at which the future completes.
deadline: Instant,

/// The link between the `Delay` instance at the timer that drives it.
///
/// When `Delay` is created with `new`, this is initialized to `None` and is
/// lazily set in `poll`. When `poll` is called, the default for the current
/// execution context is used (obtained via `Handle::current`).
///
/// When `delay` is created with `new_with_registration`, the value is set.
///
/// Once `registration` is set to `Some`, it is never changed.
registration: Option<Registration>,
/// This also stores the `deadline` value.
registration: Registration,
}

impl Delay {
Expand All @@ -38,34 +29,28 @@ impl Delay {
/// as to how the sub-millisecond portion of `deadline` will be handled.
/// `Delay` should not be used for high-resolution timer use cases.
pub fn new(deadline: Instant) -> Delay {
Delay {
deadline,
registration: None,
}
let registration = Registration::new(deadline);

Delay { registration }
}

pub(crate) fn new_with_registration(
deadline: Instant,
registration: Registration) -> Delay
{
Delay {
deadline,
registration: Some(registration),
}
pub(crate) fn new_with_handle(deadline: Instant, handle: HandlePriv) -> Delay {
let mut registration = Registration::new(deadline);
registration.register_with(handle);

Delay { registration }
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.deadline
self.registration.deadline()
}

/// Returns true if the `Delay` has elapsed
///
/// A `Delay` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
self.registration.as_ref()
.map(|r| r.is_elapsed())
.unwrap_or(false)
self.registration.is_elapsed()
}

/// Reset the `Delay` instance to a new deadline.
Expand All @@ -76,21 +61,13 @@ impl Delay {
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
self.deadline = deadline;

if let Some(registration) = self.registration.as_ref() {
registration.reset(deadline);
}
self.registration.reset(deadline);
}

/// Register the delay with the timer instance for the current execution
/// context.
fn register(&mut self) {
if self.registration.is_some() {
return;
}

self.registration = Some(Registration::new(self.deadline));
self.registration.register();
}
}

Expand All @@ -102,7 +79,6 @@ impl Future for Delay {
// Ensure the `Delay` instance is associated with a timer.
self.register();

self.registration.as_ref().unwrap()
.poll_elapsed()
self.registration.poll_elapsed()
}
}
1 change: 1 addition & 0 deletions tokio-timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

extern crate tokio_executor;

extern crate crossbeam_utils;
#[macro_use]
extern crate futures;
extern crate slab;
Expand Down
175 changes: 122 additions & 53 deletions tokio-timer/src/timer/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use Error;
use atomic::AtomicU64;
use timer::{HandlePriv, Inner};

use crossbeam_utils::CachePadded;
use futures::Poll;
use futures::task::AtomicTask;

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::{Arc, Weak};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::Ordering::{SeqCst, Relaxed};
use std::time::Instant;
use std::u64;

Expand All @@ -26,12 +27,14 @@ use std::u64;
/// processed during that timer tick.
#[derive(Debug)]
pub(crate) struct Entry {
/// Only accessed from `Registration`.
time: CachePadded<UnsafeCell<Time>>,

/// Timer internals. Using a weak pointer allows the timer to shutdown
/// without all `Delay` instances having completed.
inner: Weak<Inner>,

/// Task to notify once the deadline is reached.
task: AtomicTask,
///
/// When `None`, the entry has not yet been linked with a timer instance.
inner: Option<Weak<Inner>>,

/// Tracks the entry state. This value contains the following information:
///
Expand All @@ -44,22 +47,19 @@ pub(crate) struct Entry {
/// instant, this value is changed.
state: AtomicU64,

/// When true, the entry is counted by `Inner` towards the max outstanding
/// timeouts. The drop fn uses this to know if it should decrement the
/// counter.
///
/// One might think that it would be easier to just not create the `Entry`.
/// The problem is that `Delay` expects creating a `Registration` to always
/// return a `Registration` instance. This simplifying factor allows it to
/// improve the struct layout. To do this, we must always allocate the node.
counted: bool,
/// Task to notify once the deadline is reached.
task: AtomicTask,

/// True when the entry is queued in the "process" stack. This value
/// is set before pushing the value and unset after popping the value.
///
/// TODO: This could possibly be rolled up into `state`.
pub(super) queued: AtomicBool,

/// Next entry in the "process" linked list.
///
/// Access to this field is coordinated by the `queued` flag.
///
/// Represents a strong Arc ref.
pub(super) next_atomic: UnsafeCell<*mut Entry>,

Expand Down Expand Up @@ -91,6 +91,12 @@ pub(crate) struct Entry {
pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
#[derive(Debug)]
pub(crate) struct Time {
pub(crate) deadline: Instant,
}

/// Flag indicating a timer entry has elapsed
const ELAPSED: u64 = 1 << 63;

Expand All @@ -100,14 +106,14 @@ const ERROR: u64 = u64::MAX;
// ===== impl Entry =====

impl Entry {
pub fn new(when: u64, handle: HandlePriv) -> Entry {
assert!(when > 0 && when < u64::MAX);

pub fn new(deadline: Instant) -> Entry {
Entry {
inner: handle.into_inner(),
time: CachePadded::new(UnsafeCell::new(Time {
deadline,
})),
inner: None,
task: AtomicTask::new(),
state: AtomicU64::new(when),
counted: true,
state: AtomicU64::new(0),
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
Expand All @@ -116,34 +122,89 @@ impl Entry {
}
}

pub fn new_elapsed(handle: HandlePriv) -> Entry {
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(ELAPSED),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
/// Only called by `Registration`
pub fn time_ref(&self) -> &Time {
unsafe { &*self.time.get() }
}

/// Create a new `Entry` that is in the error state. Calling `poll_elapsed` on
/// this `Entry` will always result in `Err` being returned.
pub fn new_error() -> Entry {
Entry {
inner: Weak::new(),
task: AtomicTask::new(),
state: AtomicU64::new(ERROR),
counted: false,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
/// Only called by `Registration`
pub fn time_mut(&self) -> &mut Time {
unsafe { &mut *self.time.get() }
}

/// Returns `true` if the `Entry` is currently associated with a timer
/// instance.
pub fn is_registered(&self) -> bool {
self.inner.is_some()
}

/// Only called by `Registration`
pub fn register(me: &mut Arc<Self>) {
let handle = match HandlePriv::try_current() {
Ok(handle) => handle,
Err(_) => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}
};

Entry::register_with(me, handle)
}

/// Only called by `Registration`
pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
assert!(!me.is_registered(), "only register an entry once");

let deadline = me.time_ref().deadline;

let inner = match handle.inner() {
Some(inner) => inner,
None => {
// Could not associate the entry with a timer, transition the
// state to error
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}
};

// Increment the number of active timeouts
if inner.increment().is_err() {
Arc::get_mut(me).unwrap()
.transition_to_error();

return;
}

// Associate the entry with the timer
Arc::get_mut(me).unwrap()
.inner = Some(handle.into_inner());

let when = inner.normalize_deadline(deadline);

// Relaxed OK: At this point, there are no other threads that have
// access to this entry.
if when <= inner.elapsed() {
me.state.store(ELAPSED, Relaxed);
return;
} else {
me.state.store(when, Relaxed);
}

if inner.queue(me).is_err() {
// The timer has shutdown, transition the entry to the error state.
me.error();
}
}

fn transition_to_error(&mut self) {
self.inner = Some(Weak::new());
self.state = AtomicU64::new(ERROR);
}

/// The current entry state as known by the timer. This is not the value of
Expand Down Expand Up @@ -224,7 +285,8 @@ impl Entry {
return;
}

let inner = match entry.inner.upgrade() {
// If registered with a timer instance, try to upgrade the Arc.
let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};
Expand Down Expand Up @@ -260,12 +322,18 @@ impl Entry {
Ok(NotReady)
}

pub fn reset(entry: &Arc<Entry>, deadline: Instant) {
let inner = match entry.inner.upgrade() {
/// Only called by `Registration`
pub fn reset(entry: &mut Arc<Entry>) {
if !entry.is_registered() {
return;
}

let inner = match entry.upgrade_inner() {
Some(inner) => inner,
None => return,
};

let deadline = entry.time_ref().deadline;
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();

Expand Down Expand Up @@ -305,6 +373,11 @@ impl Entry {
let _ = inner.queue(entry);
}
}

fn upgrade_inner(&self) -> Option<Arc<Inner>> {
self.inner.as_ref()
.and_then(|inner| inner.upgrade())
}
}

fn is_elapsed(state: u64) -> bool {
Expand All @@ -313,11 +386,7 @@ fn is_elapsed(state: u64) -> bool {

impl Drop for Entry {
fn drop(&mut self) {
if !self.counted {
return;
}

let inner = match self.inner.upgrade() {
let inner = match self.upgrade_inner() {
Some(inner) => inner,
None => return,
};
Expand Down
Loading