From bde0742d81f42d461cb887724327799d02924388 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Wed, 24 Apr 2024 00:20:36 +0800 Subject: [PATCH 01/22] This commit is part of reducing timeout performance overhead. See https://github.com/tokio-rs/tokio/issues/6504 Below are relevant benchmark results of this PR on m1 mac: single_thread_timeout time: [21.869 ns 21.987 ns 22.135 ns] change: [-3.4429% -2.0709% -0.8759%] (p = 0.00 < 0.05) Change within noise threshold. Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) high mild 4 (4.00%) high severe multi_thread_timeout-8 time: [4.4835 ns 4.6138 ns 4.7614 ns] change: [-4.3554% +0.1643% +4.5114%] (p = 0.95 > 0.05) No change in performance detected. Found 9 outliers among 100 measurements (9.00%) 8 (8.00%) high mild 1 (1.00%) high severe Below are relevant benchmark results of current version on m1 mac: single_thread_timeout time: [40.227 ns 40.416 ns 40.691 ns] change: [+81.321% +82.817% +84.121%] (p = 0.00 < 0.05) Performance has regressed. Found 14 outliers among 100 measurements (14.00%) 3 (3.00%) high mild 11 (11.00%) high severe multi_thread_timeout-8 time: [183.16 ns 186.02 ns 188.21 ns] change: [+3765.0% +3880.4% +3987.4%] (p = 0.00 < 0.05) Performance has regressed. Found 10 outliers among 100 measurements (10.00%) 4 (4.00%) low severe 6 (6.00%) low mild --- benches/Cargo.toml | 5 ++++ benches/time_timeout.rs | 62 +++++++++++++++++++++++++++++++++++++++ tokio/src/time/sleep.rs | 10 +++++++ tokio/src/time/timeout.rs | 53 +++++++++++++++++++++++---------- 4 files changed, 114 insertions(+), 16 deletions(-) create mode 100644 benches/time_timeout.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index c581055cf65..0465cb00e1f 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -90,3 +90,8 @@ harness = false name = "time_now" path = "time_now.rs" harness = false + +[[bench]] +name = "time_timeout" +path = "time_timeout.rs" +harness = false \ No newline at end of file diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs new file mode 100644 index 00000000000..66f3e6f6b9e --- /dev/null +++ b/benches/time_timeout.rs @@ -0,0 +1,62 @@ +//! Benchmark spawning a task onto the basic and threaded Tokio executors. +//! This essentially measure the time to enqueue a task in the local and remote +//! case. + +use std::time::{Duration, Instant}; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use tokio::time::timeout; + +// a vevry quick async task, but might timeout +async fn quick_job() -> usize { + 1 +} + +fn single_thread_scheduler_timeout(c: &mut Criterion) { + do_test(c, 1, "single_thread_timeout"); +} + +fn multi_thread_scheduler_timeout(c: &mut Criterion) { + do_test(c, 8, "multi_thread_timeout-8"); +} + +fn do_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap(); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let h = timeout(Duration::from_secs(1), quick_job()); + assert_eq!(black_box(h.await.unwrap()), 1); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +criterion_group!( + timeout_benchmark, + single_thread_scheduler_timeout, + multi_thread_scheduler_timeout, +); + +criterion_main!(timeout_benchmark); diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 36f6e83c6b1..e7c1b1e5db5 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -256,6 +256,16 @@ impl Sleep { use crate::runtime::scheduler; let handle = scheduler::Handle::current(); + Self::new_timeout_with_handle(deadline, location, handle) + } + + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[track_caller] + pub(crate) fn new_timeout_with_handle( + deadline: Instant, + location: Option<&'static Location<'static>>, + handle: crate::runtime::scheduler::Handle, + ) -> Sleep { let entry = TimerEntry::new(&handle, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 52ab9891c69..3a1b6f90058 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -6,7 +6,7 @@ use crate::{ runtime::coop, - time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, + time::{error::Elapsed, Duration, Instant, Sleep}, util::trace, }; @@ -87,14 +87,7 @@ pub fn timeout(duration: Duration, future: F) -> Timeout where F: Future, { - let location = trace::caller_location(); - - let deadline = Instant::now().checked_add(duration); - let delay = match deadline { - Some(deadline) => Sleep::new_timeout(deadline, location), - None => Sleep::far_future(location), - }; - Timeout::new_with_delay(future, delay) + Timeout::new_with_delay(future, Instant::now().checked_add(duration)) } /// Requires a `Future` to complete before the specified instant in time. @@ -146,11 +139,14 @@ pub fn timeout_at(deadline: Instant, future: F) -> Timeout where F: Future, { - let delay = sleep_until(deadline); + use crate::runtime::scheduler; + let handle = scheduler::Handle::current(); Timeout { value: future, - delay, + deadline: Some(deadline), + delay: None, + handle, } } @@ -162,13 +158,23 @@ pin_project! { #[pin] value: T, #[pin] - delay: Sleep, + delay: Option, + deadline : Option, + handle: crate::runtime::scheduler::Handle, } } impl Timeout { - pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout { - Timeout { value, delay } + pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { + use crate::runtime::scheduler; + let handle = scheduler::Handle::current(); + + Timeout { + value, + deadline, + delay: None, + handle, + } } /// Gets a reference to the underlying value in this timeout. @@ -194,7 +200,7 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let me = self.project(); + let mut me = self.project(); let had_budget_before = coop::has_budget_remaining(); @@ -205,10 +211,25 @@ where let has_budget_now = coop::has_budget_remaining(); + // If the above inner future is ready, the below code will not be executed. + // This lazy initiation is for performance purposes, + // it can avoid unnecessary of `Sleep` creation and drop. + if me.delay.is_none() { + let location = trace::caller_location(); + let delay = match me.deadline { + Some(deadline) => { + Sleep::new_timeout_with_handle(*deadline, location, me.handle.clone()) + } + None => Sleep::far_future(location), + }; + me.delay.as_mut().set(Some(delay)); + } + let delay = me.delay; let poll_delay = || -> Poll { - match delay.poll(cx) { + // Safety: we have just assigned it a value of `Some`. + match delay.as_pin_mut().unwrap().poll(cx) { Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), Poll::Pending => Poll::Pending, } From 0eb1d3edaa363246c25fe1a8a0b7b56fdf31a54d Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Wed, 24 Apr 2024 01:27:15 +0800 Subject: [PATCH 02/22] feat: panic if the time driver is not enabled --- tokio/src/time/timeout.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 3a1b6f90058..e79234eb66d 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -141,7 +141,9 @@ where { use crate::runtime::scheduler; let handle = scheduler::Handle::current(); - + // Panic if the time driver is not enabled + let _ = handle.driver().time(); + Timeout { value: future, deadline: Some(deadline), @@ -168,6 +170,8 @@ impl Timeout { pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { use crate::runtime::scheduler; let handle = scheduler::Handle::current(); + // Panic if the time driver is not enabled + let _ = handle.driver().time(); Timeout { value, From a4e1231b9975c1d32a59527b6f3eb325308bc60d Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Wed, 24 Apr 2024 01:31:56 +0800 Subject: [PATCH 03/22] rustfmt --- tokio/src/time/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index e79234eb66d..ff00f808e3f 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -143,7 +143,7 @@ where let handle = scheduler::Handle::current(); // Panic if the time driver is not enabled let _ = handle.driver().time(); - + Timeout { value: future, deadline: Some(deadline), From cb54a1ea7e5d0a8a4b486a17f6ab8ab94a03cc85 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Wed, 24 Apr 2024 08:09:55 +0800 Subject: [PATCH 04/22] add use crate::runtime::scheduler --- tokio/src/time/timeout.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index ff00f808e3f..05ae93c48ac 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -6,6 +6,7 @@ use crate::{ runtime::coop, + runtime::scheduler, time::{error::Elapsed, Duration, Instant, Sleep}, util::trace, }; @@ -139,7 +140,6 @@ pub fn timeout_at(deadline: Instant, future: F) -> Timeout where F: Future, { - use crate::runtime::scheduler; let handle = scheduler::Handle::current(); // Panic if the time driver is not enabled let _ = handle.driver().time(); @@ -162,13 +162,12 @@ pin_project! { #[pin] delay: Option, deadline : Option, - handle: crate::runtime::scheduler::Handle, + handle: scheduler::Handle, } } impl Timeout { pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { - use crate::runtime::scheduler; let handle = scheduler::Handle::current(); // Panic if the time driver is not enabled let _ = handle.driver().time(); From f8874b2807404c1b730d687755f235d5db391fcd Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Wed, 24 Apr 2024 08:40:24 +0800 Subject: [PATCH 05/22] add #[track_caller] for new_with_delay --- tokio/src/time/timeout.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 05ae93c48ac..a0b1b6040d6 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -167,6 +167,7 @@ pin_project! { } impl Timeout { + #[track_caller] pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { let handle = scheduler::Handle::current(); // Panic if the time driver is not enabled From 9dd1e33edb10f6bb6152aefa5947c0a8668a510d Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 11:01:12 +0800 Subject: [PATCH 06/22] update comments --- tokio/src/time/timeout.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index a0b1b6040d6..f6d985a6e56 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -141,7 +141,7 @@ where F: Future, { let handle = scheduler::Handle::current(); - // Panic if the time driver is not enabled + // Panic if the time driver is not enabled. let _ = handle.driver().time(); Timeout { @@ -170,7 +170,7 @@ impl Timeout { #[track_caller] pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { let handle = scheduler::Handle::current(); - // Panic if the time driver is not enabled + // Panic if the time driver is not enabled. let _ = handle.driver().time(); Timeout { @@ -217,7 +217,7 @@ where // If the above inner future is ready, the below code will not be executed. // This lazy initiation is for performance purposes, - // it can avoid unnecessary of `Sleep` creation and drop. + // it can avoid the unnecessary creation and drop of `Sleep`. if me.delay.is_none() { let location = trace::caller_location(); let delay = match me.deadline { From 2d3efb6abe6784129bc84e6dd0c0274a3d6a8a17 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 15:07:27 +0800 Subject: [PATCH 07/22] make TimerEntry lazy init --- tokio/src/runtime/time/entry.rs | 4 -- tokio/src/time/sleep.rs | 83 +++++++++++++++++++++++---------- tokio/src/time/timeout.rs | 43 ++++++----------- 3 files changed, 72 insertions(+), 58 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 0998b53011d..8f04cd39d57 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -496,10 +496,6 @@ impl TimerEntry { unsafe { &*self.inner.get() } } - pub(crate) fn deadline(&self) -> Instant { - self.deadline - } - pub(crate) fn is_elapsed(&self) -> bool { !self.inner().state.might_be_registered() && self.registered } diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index e7c1b1e5db5..159bf1e5258 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,3 +1,4 @@ +use crate::runtime::scheduler; use crate::runtime::time::TimerEntry; use crate::time::{error::Error, Duration, Instant}; use crate::util::trace; @@ -226,10 +227,11 @@ pin_project! { #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { inner: Inner, - + deadline : Instant, + handle: scheduler::Handle, // The link between the `Sleep` instance and the timer that drives it. #[pin] - entry: TimerEntry, + entry: Option, } } @@ -266,8 +268,6 @@ impl Sleep { location: Option<&'static Location<'static>>, handle: crate::runtime::scheduler::Handle, ) -> Sleep { - let entry = TimerEntry::new(&handle, deadline); - #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { let clock = handle.driver().clock(); @@ -312,23 +312,27 @@ impl Sleep { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let inner = Inner {}; - Sleep { inner, entry } - } - - pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { - Self::new_timeout(Instant::far_future(), location) + Sleep { + inner, + deadline, + handle, + entry: None, + } } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.entry.deadline() + self.deadline } /// Returns `true` if `Sleep` has elapsed. /// /// A `Sleep` instance is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() + if self.entry.is_none() { + return false; + } + self.entry.as_ref().unwrap().is_elapsed() } /// Resets the `Sleep` instance to a new deadline. @@ -372,14 +376,22 @@ impl Sleep { /// without having it registered. This is required in e.g. the /// [`crate::time::Interval`] where we want to reset the internal [Sleep] /// without having it wake up the last task that polled it. - pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { - let mut me = self.project(); - me.entry.as_mut().reset(deadline, false); + pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, deadline: Instant) { + self.as_mut().lazy_init_timer_entry(deadline); + self.project() + .entry + .as_pin_mut() + .unwrap() + .reset(deadline, false); } - fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { - let mut me = self.project(); - me.entry.as_mut().reset(deadline, true); + fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) { + self.as_mut().lazy_init_timer_entry(deadline); + self.project() + .entry + .as_pin_mut() + .unwrap() + .reset(deadline, true); #[cfg(all(tokio_unstable, feature = "tracing"))] { @@ -408,10 +420,15 @@ impl Sleep { } } - fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let me = self.project(); - + fn poll_elapsed( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { ready!(crate::trace::trace_leaf(cx)); + let deadline = self.deadline; + self.as_mut().lazy_init_timer_entry(deadline); + + let me = self.project(); // Keep track of task budget #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -422,11 +439,16 @@ impl Sleep { #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] let coop = ready!(crate::runtime::coop::poll_proceed(cx)); - - let result = me.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }); + // Safety: we have just assigned it a value of `Some`. + let result = me + .entry + .as_pin_mut() + .unwrap() + .poll_elapsed(cx) + .map(move |r| { + coop.made_progress(); + r + }); #[cfg(all(tokio_unstable, feature = "tracing"))] return trace_poll_op!("poll_elapsed", result); @@ -434,6 +456,17 @@ impl Sleep { #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] return result; } + + // This lazy initiation is for performance purposes, + // it can avoid the unnecessary creation and drop of `TimerEntry`. + fn lazy_init_timer_entry(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + *me.deadline = deadline; + if me.entry.is_none() { + let entry = TimerEntry::new(me.handle, deadline); + me.entry.as_mut().set(Some(entry)); + } + } } impl Future for Sleep { diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index f6d985a6e56..1152270e019 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -144,11 +144,11 @@ where // Panic if the time driver is not enabled. let _ = handle.driver().time(); + let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle); + Timeout { value: future, - deadline: Some(deadline), - delay: None, - handle, + delay, } } @@ -160,9 +160,7 @@ pin_project! { #[pin] value: T, #[pin] - delay: Option, - deadline : Option, - handle: scheduler::Handle, + delay: Sleep, } } @@ -173,12 +171,14 @@ impl Timeout { // Panic if the time driver is not enabled. let _ = handle.driver().time(); - Timeout { - value, - deadline, - delay: None, - handle, - } + let deadline = match deadline { + Some(deadline) => deadline, + None => Instant::far_future(), + }; + + let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle); + + Timeout { value, delay } } /// Gets a reference to the underlying value in this timeout. @@ -204,7 +204,7 @@ where type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); + let me = self.project(); let had_budget_before = coop::has_budget_remaining(); @@ -215,25 +215,10 @@ where let has_budget_now = coop::has_budget_remaining(); - // If the above inner future is ready, the below code will not be executed. - // This lazy initiation is for performance purposes, - // it can avoid the unnecessary creation and drop of `Sleep`. - if me.delay.is_none() { - let location = trace::caller_location(); - let delay = match me.deadline { - Some(deadline) => { - Sleep::new_timeout_with_handle(*deadline, location, me.handle.clone()) - } - None => Sleep::far_future(location), - }; - me.delay.as_mut().set(Some(delay)); - } - let delay = me.delay; let poll_delay = || -> Poll { - // Safety: we have just assigned it a value of `Some`. - match delay.as_pin_mut().unwrap().poll(cx) { + match delay.poll(cx) { Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), Poll::Pending => Poll::Pending, } From 1ae3de2839061348abb9021d93c15c0b2f819a6f Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 15:36:29 +0800 Subject: [PATCH 08/22] feat: add single_thread_sleep and multi_thread_sleep-8 --- benches/time_timeout.rs | 54 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs index 66f3e6f6b9e..93c41da3a28 100644 --- a/benches/time_timeout.rs +++ b/benches/time_timeout.rs @@ -5,7 +5,7 @@ use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; // a vevry quick async task, but might timeout async fn quick_job() -> usize { @@ -13,14 +13,14 @@ async fn quick_job() -> usize { } fn single_thread_scheduler_timeout(c: &mut Criterion) { - do_test(c, 1, "single_thread_timeout"); + do_timeout_test(c, 1, "single_thread_timeout"); } fn multi_thread_scheduler_timeout(c: &mut Criterion) { - do_test(c, 8, "multi_thread_timeout-8"); + do_timeout_test(c, 8, "multi_thread_timeout-8"); } -fn do_test(c: &mut Criterion, workers: usize, name: &str) { +fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(workers) @@ -31,14 +31,14 @@ fn do_test(c: &mut Criterion, workers: usize, name: &str) { b.iter_custom(|iters| { let start = Instant::now(); runtime.block_on(async { - black_box(spawn_job(iters as usize, workers).await); + black_box(spawn_timeout_job(iters as usize, workers).await); }); start.elapsed() }) }); } -async fn spawn_job(iters: usize, procs: usize) { +async fn spawn_timeout_job(iters: usize, procs: usize) { let mut handles = Vec::with_capacity(procs); for _ in 0..procs { handles.push(tokio::spawn(async move { @@ -53,10 +53,52 @@ async fn spawn_job(iters: usize, procs: usize) { } } +fn single_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 1, "single_thread_sleep"); +} + +fn multi_thread_scheduler_sleep(c: &mut Criterion) { + do_sleep_test(c, 8, "multi_thread_sleep-8"); +} + +fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap(); + + c.bench_function(name, |b| { + b.iter_custom(|iters| { + let start = Instant::now(); + runtime.block_on(async { + black_box(spawn_sleep_job(iters as usize, workers).await); + }); + start.elapsed() + }) + }); +} + +async fn spawn_sleep_job(iters: usize, procs: usize) { + let mut handles = Vec::with_capacity(procs); + for _ in 0..procs { + handles.push(tokio::spawn(async move { + for _ in 0..iters / procs { + let _h = black_box(sleep(Duration::from_secs(1))); + } + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + criterion_group!( timeout_benchmark, single_thread_scheduler_timeout, multi_thread_scheduler_timeout, + single_thread_scheduler_sleep, + multi_thread_scheduler_sleep ); criterion_main!(timeout_benchmark); From ef3630b75bf3689c79f52beb14bee578aced88b1 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 17:57:46 +0800 Subject: [PATCH 09/22] feat: use new_current_thread --- benches/Cargo.toml | 2 +- benches/time_timeout.rs | 34 ++++++++++++++++++++++------------ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 0465cb00e1f..c1d13bac279 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -94,4 +94,4 @@ harness = false [[bench]] name = "time_timeout" path = "time_timeout.rs" -harness = false \ No newline at end of file +harness = false diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs index 93c41da3a28..a5771c7815a 100644 --- a/benches/time_timeout.rs +++ b/benches/time_timeout.rs @@ -5,13 +5,32 @@ use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use tokio::time::{sleep, timeout}; +use tokio::{ + runtime::Runtime, + time::{sleep, timeout}, +}; // a vevry quick async task, but might timeout async fn quick_job() -> usize { 1 } +fn build_run_time(workers: usize) -> Runtime { + if workers == 1 { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap() + } else { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(workers) + .build() + .unwrap() + } +} + fn single_thread_scheduler_timeout(c: &mut Criterion) { do_timeout_test(c, 1, "single_thread_timeout"); } @@ -21,12 +40,7 @@ fn multi_thread_scheduler_timeout(c: &mut Criterion) { } fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(workers) - .build() - .unwrap(); - + let runtime = build_run_time(workers); c.bench_function(name, |b| { b.iter_custom(|iters| { let start = Instant::now(); @@ -62,11 +76,7 @@ fn multi_thread_scheduler_sleep(c: &mut Criterion) { } fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .worker_threads(workers) - .build() - .unwrap(); + let runtime = build_run_time(workers); c.bench_function(name, |b| { b.iter_custom(|iters| { From f6fa09bbf8ee23217a24edd73a630c1e376e6437 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 18:11:40 +0800 Subject: [PATCH 10/22] fix: new me variable --- tokio/src/time/sleep.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 159bf1e5258..5a149088484 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -387,11 +387,9 @@ impl Sleep { fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) { self.as_mut().lazy_init_timer_entry(deadline); - self.project() - .entry - .as_pin_mut() - .unwrap() - .reset(deadline, true); + + let me = self.project(); + me.entry.as_pin_mut().unwrap().reset(deadline, true); #[cfg(all(tokio_unstable, feature = "tracing"))] { From b1fa9904b155f4467f89f6e24bafc9ad95df4e8c Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 18:28:08 +0800 Subject: [PATCH 11/22] fix: create me variable in the block --- tokio/src/time/sleep.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 5a149088484..0d1b07f8c55 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -387,12 +387,11 @@ impl Sleep { fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) { self.as_mut().lazy_init_timer_entry(deadline); - - let me = self.project(); - me.entry.as_pin_mut().unwrap().reset(deadline, true); - + self.as_mut().project().entry.as_pin_mut().unwrap().reset(deadline, true); + #[cfg(all(tokio_unstable, feature = "tracing"))] { + let me = self.as_mut(); let _resource_enter = me.inner.ctx.resource_span.enter(); me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); @@ -402,8 +401,8 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let clock = me.entry.clock(); - let time_source = me.entry.driver().time_source(); + let clock = me.entry.unwrap().clock(); + let time_source = me.entry.unwrap().driver().time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) From 8c0c2e8518fa95812c66bf827f4a8f54d8d8e482 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 18:32:23 +0800 Subject: [PATCH 12/22] fix: ci --- tokio/src/time/sleep.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 0d1b07f8c55..e76d9410707 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -387,11 +387,16 @@ impl Sleep { fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) { self.as_mut().lazy_init_timer_entry(deadline); - self.as_mut().project().entry.as_pin_mut().unwrap().reset(deadline, true); - + self.as_mut() + .project() + .entry + .as_pin_mut() + .unwrap() + .reset(deadline, true); + #[cfg(all(tokio_unstable, feature = "tracing"))] { - let me = self.as_mut(); + let me = self.as_mut().project(); let _resource_enter = me.inner.ctx.resource_span.enter(); me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); From a9a11e963a0268352f30a79596b92963a909ef22 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 18:39:08 +0800 Subject: [PATCH 13/22] fix: ownership --- tokio/src/time/sleep.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index e76d9410707..285d246b276 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -406,8 +406,8 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let clock = me.entry.unwrap().clock(); - let time_source = me.entry.unwrap().driver().time_source(); + let clock = self.as_mut().project().entry.as_pin_mut().unwrap().clock(); + let time_source = self.as_mut().project().entry.as_pin_mut().unwrap().driver().time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) From ef657d23fd0f73bc73d3cc872feaceb0f8bf36b7 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 18:40:23 +0800 Subject: [PATCH 14/22] fix: fmt --- tokio/src/time/sleep.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 285d246b276..996658a2ccc 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -407,7 +407,14 @@ impl Sleep { let duration = { let clock = self.as_mut().project().entry.as_pin_mut().unwrap().clock(); - let time_source = self.as_mut().project().entry.as_pin_mut().unwrap().driver().time_source(); + let time_source = self + .as_mut() + .project() + .entry + .as_pin_mut() + .unwrap() + .driver() + .time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) From a446e4430b3798fe3750d48d1d76211600a32e76 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Wed, 24 Apr 2024 19:13:59 +0800 Subject: [PATCH 15/22] fix: lifetime issue --- tokio/src/time/sleep.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 996658a2ccc..8490b55eab7 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -396,7 +396,7 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] { - let me = self.as_mut().project(); + let me = self.project(); let _resource_enter = me.inner.ctx.resource_span.enter(); me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); @@ -406,15 +406,9 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let clock = self.as_mut().project().entry.as_pin_mut().unwrap().clock(); - let time_source = self - .as_mut() - .project() - .entry - .as_pin_mut() - .unwrap() - .driver() - .time_source(); + let entry_ref = me.entry.as_ref().get_ref().as_ref().unwrap(); + let clock = entry_ref.clock(); + let time_source = entry_ref.driver().time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) From 3a89c454e020a6452c99adc11da149935d2616f8 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Thu, 25 Apr 2024 14:17:57 +0800 Subject: [PATCH 16/22] rebase --- tokio/src/runtime/time/entry.rs | 50 +++++++++----- tokio/src/runtime/time/tests/mod.rs | 12 ++-- tokio/src/time/sleep.rs | 100 ++++++++-------------------- tokio/src/time/timeout.rs | 32 +++------ 4 files changed, 78 insertions(+), 116 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 8f04cd39d57..f90e5821db9 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -301,7 +301,7 @@ pub(crate) struct TimerEntry { /// /// This is manipulated only under the inner mutex. TODO: Can we use loom /// cells for this? - inner: StdUnsafeCell, + inner: StdUnsafeCell>, /// Deadline for the timer. This is used to register on the first /// poll, as we can't register prior to being pinned. deadline: Instant, @@ -477,27 +477,36 @@ unsafe impl linked_list::Link for TimerShared { impl TimerEntry { #[track_caller] - pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self { + pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { // Panic if the time driver is not enabled let _ = handle.driver().time(); - let driver = handle.clone(); - Self { - driver, - inner: StdUnsafeCell::new(TimerShared::new()), + driver: handle, + inner: StdUnsafeCell::new(None), deadline, registered: false, _m: std::marker::PhantomPinned, } } - fn inner(&self) -> &TimerShared { - unsafe { &*self.inner.get() } + fn inner(&self) -> Option<&TimerShared> { + unsafe { &*self.inner.get() }.as_ref() + } + + fn inner_mut(&mut self) -> &mut Option { + unsafe { &mut *self.inner.get() } + } + + pub(crate) fn deadline(&self) -> Instant { + self.deadline } pub(crate) fn is_elapsed(&self) -> bool { - !self.inner().state.might_be_registered() && self.registered + match self.inner() { + Some(inner) => !inner.state.might_be_registered() && self.registered, + None => false, + } } /// Cancels and deregisters the timer. This operation is irreversible. @@ -524,23 +533,31 @@ impl TimerEntry { // driver did so far and happens-before everything the driver does in // the future. While we have the lock held, we also go ahead and // deregister the entry if necessary. - unsafe { self.driver().clear_entry(NonNull::from(self.inner())) }; + + if let Some(inner) = self.inner() { + unsafe { self.driver().clear_entry(NonNull::from(inner)) }; + } } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { - unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time; - unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister; + let this = unsafe { self.as_mut().get_unchecked_mut() }; + this.deadline = new_time; + this.registered = reregister; + this.inner_mut().get_or_insert(TimerShared::new()); let tick = self.driver().time_source().deadline_to_tick(new_time); - if self.inner().extend_expiration(tick).is_ok() { + if self.inner().unwrap().extend_expiration(tick).is_ok() { return; } if reregister { unsafe { - self.driver() - .reregister(&self.driver.driver().io, tick, self.inner().into()); + self.driver().reregister( + &self.driver.driver().io, + tick, + self.inner().unwrap().into(), + ); } } } @@ -562,7 +579,8 @@ impl TimerEntry { let this = unsafe { self.get_unchecked_mut() }; - this.inner().state.poll(cx.waker()) + this.inner_mut().get_or_insert(TimerShared::new()); + this.inner().unwrap().state.poll(cx.waker()) } pub(crate) fn driver(&self) -> &super::Handle { diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index e7ab222ef63..520dc00a462 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -49,7 +49,7 @@ fn single_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -83,7 +83,7 @@ fn drop_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -117,7 +117,7 @@ fn change_waker() { let handle_ = handle.clone(); let jh = thread::spawn(move || { let entry = TimerEntry::new( - &handle_.inner, + handle_.inner.clone(), handle_.inner.driver().clock().now() + Duration::from_secs(1), ); pin!(entry); @@ -157,7 +157,7 @@ fn reset_future() { let start = handle.inner.driver().clock().now(); let jh = thread::spawn(move || { - let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1)); + let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1)); pin!(entry); let _ = entry @@ -219,7 +219,7 @@ fn poll_process_levels() { for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(i), )); @@ -253,7 +253,7 @@ fn poll_process_levels_targeted() { let handle = rt.handle(); let e1 = TimerEntry::new( - &handle.inner, + handle.inner.clone(), handle.inner.driver().clock().now() + Duration::from_millis(193), ); pin!(e1); diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 8490b55eab7..34d1eb71c5b 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -1,4 +1,3 @@ -use crate::runtime::scheduler; use crate::runtime::time::TimerEntry; use crate::time::{error::Error, Duration, Instant}; use crate::util::trace; @@ -227,11 +226,10 @@ pin_project! { #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Sleep { inner: Inner, - deadline : Instant, - handle: scheduler::Handle, + // The link between the `Sleep` instance and the timer that drives it. #[pin] - entry: Option, + entry: TimerEntry, } } @@ -256,20 +254,11 @@ impl Sleep { location: Option<&'static Location<'static>>, ) -> Sleep { use crate::runtime::scheduler; - let handle = scheduler::Handle::current(); - Self::new_timeout_with_handle(deadline, location, handle) - } - - #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] - #[track_caller] - pub(crate) fn new_timeout_with_handle( - deadline: Instant, - location: Option<&'static Location<'static>>, - handle: crate::runtime::scheduler::Handle, - ) -> Sleep { + let entry = TimerEntry::new(handle, deadline); #[cfg(all(tokio_unstable, feature = "tracing"))] let inner = { + let handle = scheduler::Handle::current(); let clock = handle.driver().clock(); let handle = &handle.driver().time(); let time_source = handle.time_source(); @@ -312,27 +301,23 @@ impl Sleep { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let inner = Inner {}; - Sleep { - inner, - deadline, - handle, - entry: None, - } + Sleep { inner, entry } + } + + pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep { + Self::new_timeout(Instant::far_future(), location) } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.deadline + self.entry.deadline() } /// Returns `true` if `Sleep` has elapsed. /// /// A `Sleep` instance is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - if self.entry.is_none() { - return false; - } - self.entry.as_ref().unwrap().is_elapsed() + self.entry.is_elapsed() } /// Resets the `Sleep` instance to a new deadline. @@ -376,23 +361,14 @@ impl Sleep { /// without having it registered. This is required in e.g. the /// [`crate::time::Interval`] where we want to reset the internal [Sleep] /// without having it wake up the last task that polled it. - pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, deadline: Instant) { - self.as_mut().lazy_init_timer_entry(deadline); - self.project() - .entry - .as_pin_mut() - .unwrap() - .reset(deadline, false); + pub(crate) fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset(deadline, false); } - fn reset_inner(mut self: Pin<&mut Self>, deadline: Instant) { - self.as_mut().lazy_init_timer_entry(deadline); - self.as_mut() - .project() - .entry - .as_pin_mut() - .unwrap() - .reset(deadline, true); + fn reset_inner(self: Pin<&mut Self>, deadline: Instant) { + let mut me = self.project(); + me.entry.as_mut().reset(deadline, true); #[cfg(all(tokio_unstable, feature = "tracing"))] { @@ -406,9 +382,8 @@ impl Sleep { tracing::trace_span!("runtime.resource.async_op.poll"); let duration = { - let entry_ref = me.entry.as_ref().get_ref().as_ref().unwrap(); - let clock = entry_ref.clock(); - let time_source = entry_ref.driver().time_source(); + let clock = me.entry.clock(); + let time_source = me.entry.driver().time_source(); let now = time_source.now(clock); let deadline_tick = time_source.deadline_to_tick(deadline); deadline_tick.saturating_sub(now) @@ -423,16 +398,11 @@ impl Sleep { } } - fn poll_elapsed( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll> { - ready!(crate::trace::trace_leaf(cx)); - let deadline = self.deadline; - self.as_mut().lazy_init_timer_entry(deadline); - + fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { let me = self.project(); + ready!(crate::trace::trace_leaf(cx)); + // Keep track of task budget #[cfg(all(tokio_unstable, feature = "tracing"))] let coop = ready!(trace_poll_op!( @@ -442,16 +412,11 @@ impl Sleep { #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] let coop = ready!(crate::runtime::coop::poll_proceed(cx)); - // Safety: we have just assigned it a value of `Some`. - let result = me - .entry - .as_pin_mut() - .unwrap() - .poll_elapsed(cx) - .map(move |r| { - coop.made_progress(); - r - }); + + let result = me.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }); #[cfg(all(tokio_unstable, feature = "tracing"))] return trace_poll_op!("poll_elapsed", result); @@ -459,17 +424,6 @@ impl Sleep { #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] return result; } - - // This lazy initiation is for performance purposes, - // it can avoid the unnecessary creation and drop of `TimerEntry`. - fn lazy_init_timer_entry(self: Pin<&mut Self>, deadline: Instant) { - let mut me = self.project(); - *me.deadline = deadline; - if me.entry.is_none() { - let entry = TimerEntry::new(me.handle, deadline); - me.entry.as_mut().set(Some(entry)); - } - } } impl Future for Sleep { diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index 1152270e019..52ab9891c69 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -6,8 +6,7 @@ use crate::{ runtime::coop, - runtime::scheduler, - time::{error::Elapsed, Duration, Instant, Sleep}, + time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, util::trace, }; @@ -88,7 +87,14 @@ pub fn timeout(duration: Duration, future: F) -> Timeout where F: Future, { - Timeout::new_with_delay(future, Instant::now().checked_add(duration)) + let location = trace::caller_location(); + + let deadline = Instant::now().checked_add(duration); + let delay = match deadline { + Some(deadline) => Sleep::new_timeout(deadline, location), + None => Sleep::far_future(location), + }; + Timeout::new_with_delay(future, delay) } /// Requires a `Future` to complete before the specified instant in time. @@ -140,11 +146,7 @@ pub fn timeout_at(deadline: Instant, future: F) -> Timeout where F: Future, { - let handle = scheduler::Handle::current(); - // Panic if the time driver is not enabled. - let _ = handle.driver().time(); - - let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle); + let delay = sleep_until(deadline); Timeout { value: future, @@ -165,19 +167,7 @@ pin_project! { } impl Timeout { - #[track_caller] - pub(crate) fn new_with_delay(value: T, deadline: Option) -> Timeout { - let handle = scheduler::Handle::current(); - // Panic if the time driver is not enabled. - let _ = handle.driver().time(); - - let deadline = match deadline { - Some(deadline) => deadline, - None => Instant::far_future(), - }; - - let delay = Sleep::new_timeout_with_handle(deadline, trace::caller_location(), handle); - + pub(crate) fn new_with_delay(value: T, delay: Sleep) -> Timeout { Timeout { value, delay } } From c47b73920458480039a09fad73dacbe54d4a88d4 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Thu, 25 Apr 2024 14:23:59 +0800 Subject: [PATCH 17/22] rm unnecessary code --- tokio/src/time/sleep.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 34d1eb71c5b..9223396fe54 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -372,7 +372,6 @@ impl Sleep { #[cfg(all(tokio_unstable, feature = "tracing"))] { - let me = self.project(); let _resource_enter = me.inner.ctx.resource_span.enter(); me.inner.ctx.async_op_span = tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset"); From 8e40ec43acd7800e3308bd66272cef271e9aa0aa Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Thu, 25 Apr 2024 16:30:26 +0800 Subject: [PATCH 18/22] add is_inner_init --- tokio/src/runtime/time/entry.rs | 36 ++++++++++++++------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index f90e5821db9..752e6b1e574 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -490,12 +490,13 @@ impl TimerEntry { } } - fn inner(&self) -> Option<&TimerShared> { - unsafe { &*self.inner.get() }.as_ref() + fn is_inner_init(&self) -> bool { + unsafe { &*self.inner.get() }.is_some() } - fn inner_mut(&mut self) -> &mut Option { - unsafe { &mut *self.inner.get() } + // This lazy initialization is for performance purposes. + fn inner(&self) -> &TimerShared { + unsafe { &mut *self.inner.get() }.get_or_insert(TimerShared::new()) } pub(crate) fn deadline(&self) -> Instant { @@ -503,14 +504,15 @@ impl TimerEntry { } pub(crate) fn is_elapsed(&self) -> bool { - match self.inner() { - Some(inner) => !inner.state.might_be_registered() && self.registered, - None => false, - } + self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered } /// Cancels and deregisters the timer. This operation is irreversible. pub(crate) fn cancel(self: Pin<&mut Self>) { + // Avoid calling the `clear_entry` method, because it has not been initialized yet. + if !self.is_inner_init() { + return; + } // We need to perform an acq/rel fence with the driver thread, and the // simplest way to do so is to grab the driver lock. // @@ -533,31 +535,24 @@ impl TimerEntry { // driver did so far and happens-before everything the driver does in // the future. While we have the lock held, we also go ahead and // deregister the entry if necessary. - - if let Some(inner) = self.inner() { - unsafe { self.driver().clear_entry(NonNull::from(inner)) }; - } + unsafe { self.driver().clear_entry(NonNull::from(self.inner())) }; } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { let this = unsafe { self.as_mut().get_unchecked_mut() }; this.deadline = new_time; this.registered = reregister; - this.inner_mut().get_or_insert(TimerShared::new()); let tick = self.driver().time_source().deadline_to_tick(new_time); - if self.inner().unwrap().extend_expiration(tick).is_ok() { + if self.inner().extend_expiration(tick).is_ok() { return; } if reregister { unsafe { - self.driver().reregister( - &self.driver.driver().io, - tick, - self.inner().unwrap().into(), - ); + self.driver() + .reregister(&self.driver.driver().io, tick, self.inner().into()); } } } @@ -579,8 +574,7 @@ impl TimerEntry { let this = unsafe { self.get_unchecked_mut() }; - this.inner_mut().get_or_insert(TimerShared::new()); - this.inner().unwrap().state.poll(cx.waker()) + this.inner().state.poll(cx.waker()) } pub(crate) fn driver(&self) -> &super::Handle { From 07953b5bc28412aee23e801377978da81bdfd01d Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Thu, 2 May 2024 16:01:06 +0800 Subject: [PATCH 19/22] adopt code review suggestions from mox692 --- benches/time_timeout.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs index a5771c7815a..eb206a898a8 100644 --- a/benches/time_timeout.rs +++ b/benches/time_timeout.rs @@ -1,7 +1,3 @@ -//! Benchmark spawning a task onto the basic and threaded Tokio executors. -//! This essentially measure the time to enqueue a task in the local and remote -//! case. - use std::time::{Duration, Instant}; use criterion::{black_box, criterion_group, criterion_main, Criterion}; @@ -19,7 +15,6 @@ fn build_run_time(workers: usize) -> Runtime { if workers == 1 { tokio::runtime::Builder::new_current_thread() .enable_all() - .worker_threads(workers) .build() .unwrap() } else { From 373044d9c9d5983d635d415ef08ce44101227345 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Thu, 2 May 2024 16:53:05 +0800 Subject: [PATCH 20/22] get mutable ref only if the inner is none --- tokio/src/runtime/time/entry.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 752e6b1e574..a0892094baa 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -496,7 +496,13 @@ impl TimerEntry { // This lazy initialization is for performance purposes. fn inner(&self) -> &TimerShared { - unsafe { &mut *self.inner.get() }.get_or_insert(TimerShared::new()) + let inner = unsafe { &*self.inner.get() }; + if inner.is_none() { + unsafe { + *(&mut *self.inner.get()) = Some(TimerShared::new()); + } + } + return inner.as_ref().unwrap(); } pub(crate) fn deadline(&self) -> Instant { From 0129c925457029ff687bc9a21fd87219d695d261 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Thu, 2 May 2024 17:00:04 +0800 Subject: [PATCH 21/22] fix ci --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index a0892094baa..02a815921db 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -499,7 +499,7 @@ impl TimerEntry { let inner = unsafe { &*self.inner.get() }; if inner.is_none() { unsafe { - *(&mut *self.inner.get()) = Some(TimerShared::new()); + *self.inner.get() = Some(TimerShared::new()); } } return inner.as_ref().unwrap(); From b49272325e49145c56a00204546abbc452f5b68f Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Thu, 2 May 2024 17:37:47 +0800 Subject: [PATCH 22/22] fix typo --- benches/time_timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/time_timeout.rs b/benches/time_timeout.rs index eb206a898a8..c961477562c 100644 --- a/benches/time_timeout.rs +++ b/benches/time_timeout.rs @@ -6,7 +6,7 @@ use tokio::{ time::{sleep, timeout}, }; -// a vevry quick async task, but might timeout +// a very quick async task, but might timeout async fn quick_job() -> usize { 1 }