Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: lazy init TimerShared in TimerEntry #6512

Merged
merged 25 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bde0742
This commit is part of reducing timeout performance overhead.
wathenjiang Apr 23, 2024
0eb1d3e
feat: panic if the time driver is not enabled
wathenjiang Apr 23, 2024
a4e1231
rustfmt
wathenjiang Apr 23, 2024
cb54a1e
add use crate::runtime::scheduler
wathenjiang Apr 24, 2024
f8874b2
add #[track_caller] for new_with_delay
wathenjiang Apr 24, 2024
9dd1e33
update comments
wathenjiang Apr 24, 2024
2d3efb6
make TimerEntry lazy init
wathenjiang Apr 24, 2024
1ae3de2
feat: add single_thread_sleep and multi_thread_sleep-8
wathenjiang Apr 24, 2024
ef3630b
feat: use new_current_thread
wathenjiang Apr 24, 2024
d4a95a3
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 24, 2024
f6fa09b
fix: new me variable
wathenjiang Apr 24, 2024
b1fa990
fix: create me variable in the block
wathenjiang Apr 24, 2024
8c0c2e8
fix: ci
wathenjiang Apr 24, 2024
a9a11e9
fix: ownership
wathenjiang Apr 24, 2024
ef657d2
fix: fmt
wathenjiang Apr 24, 2024
a446e44
fix: lifetime issue
wathenjiang Apr 24, 2024
3a89c45
rebase
wathenjiang Apr 25, 2024
c47b739
rm unnecessary code
wathenjiang Apr 25, 2024
8e40ec4
add is_inner_init
wathenjiang Apr 25, 2024
d328298
Merge branch 'master' into timeout-lazy-init-sleep
wathenjiang Apr 26, 2024
07953b5
adopt code review suggestions from mox692
wathenjiang May 2, 2024
373044d
get mutable ref only if the inner is none
wathenjiang May 2, 2024
0129c92
fix ci
wathenjiang May 2, 2024
b492723
fix typo
wathenjiang May 2, 2024
8af2a97
Merge branch 'master' into timeout-lazy-init-sleep
Darksonn May 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
109 changes: 109 additions & 0 deletions benches/time_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::{
runtime::Runtime,
time::{sleep, timeout},
};

// a very quick async task, but might timeout
async fn quick_job() -> usize {
1
}

fn build_run_time(workers: usize) -> Runtime {
if workers == 1 {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap()
}
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 1, "single_thread_timeout");
}

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 8, "multi_thread_timeout-8");
}

fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);
c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_timeout_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_timeout_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

fn single_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 1, "single_thread_sleep");
}

fn multi_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 8, "multi_thread_sleep-8");
}

fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_sleep_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_sleep_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let _h = black_box(sleep(Duration::from_secs(1)));
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
single_thread_scheduler_sleep,
multi_thread_scheduler_sleep
);

criterion_main!(timeout_benchmark);
34 changes: 24 additions & 10 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ pub(crate) struct TimerEntry {
///
/// This is manipulated only under the inner mutex. TODO: Can we use loom
/// cells for this?
inner: StdUnsafeCell<TimerShared>,
inner: StdUnsafeCell<Option<TimerShared>>,
/// Deadline for the timer. This is used to register on the first
/// poll, as we can't register prior to being pinned.
deadline: Instant,
Expand Down Expand Up @@ -469,35 +469,48 @@ unsafe impl linked_list::Link for TimerShared {

impl TimerEntry {
#[track_caller]
pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self {
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
// Panic if the time driver is not enabled
let _ = handle.driver().time();

let driver = handle.clone();

Self {
driver,
inner: StdUnsafeCell::new(TimerShared::new()),
driver: handle,
inner: StdUnsafeCell::new(None),
deadline,
registered: false,
_m: std::marker::PhantomPinned,
}
}

fn is_inner_init(&self) -> bool {
unsafe { &*self.inner.get() }.is_some()
}

// This lazy initialization is for performance purposes.
fn inner(&self) -> &TimerShared {
unsafe { &*self.inner.get() }
let inner = unsafe { &*self.inner.get() };
if inner.is_none() {
unsafe {
*self.inner.get() = Some(TimerShared::new());
}
}
return inner.as_ref().unwrap();
}

pub(crate) fn deadline(&self) -> Instant {
self.deadline
}

pub(crate) fn is_elapsed(&self) -> bool {
!self.inner().state.might_be_registered() && self.registered
self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered
}

/// Cancels and deregisters the timer. This operation is irreversible.
pub(crate) fn cancel(self: Pin<&mut Self>) {
// Avoid calling the `clear_entry` method, because it has not been initialized yet.
if !self.is_inner_init() {
return;
}
// We need to perform an acq/rel fence with the driver thread, and the
// simplest way to do so is to grab the driver lock.
//
Expand All @@ -524,8 +537,9 @@ impl TimerEntry {
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister;
let this = unsafe { self.as_mut().get_unchecked_mut() };
this.deadline = new_time;
this.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);

Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn single_timer() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -83,7 +83,7 @@ fn drop_timer() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -117,7 +117,7 @@ fn change_waker() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -157,7 +157,7 @@ fn reset_future() {
let start = handle.inner.driver().clock().now();

let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1));
let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1));
pin!(entry);

let _ = entry
Expand Down Expand Up @@ -219,7 +219,7 @@ fn poll_process_levels() {

for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle.inner,
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(i),
));

Expand Down Expand Up @@ -253,7 +253,7 @@ fn poll_process_levels_targeted() {
let handle = rt.handle();

let e1 = TimerEntry::new(
&handle.inner,
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(193),
);
pin!(e1);
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,11 @@ impl Sleep {
location: Option<&'static Location<'static>>,
) -> Sleep {
use crate::runtime::scheduler;

let handle = scheduler::Handle::current();
let entry = TimerEntry::new(&handle, deadline);

let entry = TimerEntry::new(handle, deadline);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = scheduler::Handle::current();
let clock = handle.driver().clock();
let handle = &handle.driver().time();
let time_source = handle.time_source();
Expand Down
Loading