From a9390b28e09500c62d2f60bad254922de4c481ce Mon Sep 17 00:00:00 2001 From: mox692 Date: Sat, 30 Mar 2024 17:14:06 +0900 Subject: [PATCH 1/6] runtime: make improvements to `global_queue_interval` --- tokio/src/runtime/builder.rs | 5 +++++ tokio/src/runtime/scheduler/multi_thread/stats.rs | 2 +- tokio/src/runtime/scheduler/multi_thread/worker.rs | 2 -- tokio/src/runtime/scheduler/multi_thread_alt/stats.rs | 2 +- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 2 -- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 82d3596915e..5c417957c3f 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -758,6 +758,10 @@ impl Builder { /// /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing /// + /// # Panics + /// + /// This function will panic if 0 is passed as an argument. + /// /// # Examples /// /// ``` @@ -769,6 +773,7 @@ impl Builder { /// # } /// ``` pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { + assert!(val > 0, "global_queue_interval must be greater than 0"); self.global_queue_interval = Some(val); self } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 30c108c9dd6..03cfc790054 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -63,7 +63,7 @@ impl Stats { let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; cmp::max( - // We don't want to return less than 2 as that would result in the + // If we are using self-tuning, we don't want to return less than 2 as that would result in the // global queue always getting checked first. 2, cmp::min( diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 9998870ab4d..f07fb8568cd 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -985,8 +985,6 @@ impl Core { .stats .tuned_global_queue_interval(&worker.handle.shared.config); - debug_assert!(next > 1); - // Smooth out jitter if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index 7118e4915a0..c2045602797 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -82,7 +82,7 @@ impl Stats { let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; cmp::max( - // We don't want to return less than 2 as that would result in the + // If we are using self-tuning, we don't want to return less than 2 as that would result in the // global queue always getting checked first. 2, cmp::min( diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 54c6b0ed7ba..9dd4b91d7ee 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1288,8 +1288,6 @@ impl Worker { fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) { let next = core.stats.tuned_global_queue_interval(&cx.shared().config); - debug_assert!(next > 1); - // Smooth out jitter if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; From c9e63f4b0a5e483d5545bd0079c8cbe576bc5a15 Mon Sep 17 00:00:00 2001 From: mox692 Date: Mon, 1 Apr 2024 19:55:13 +0900 Subject: [PATCH 2/6] add track_caller to global_queue_interval --- tokio/src/runtime/builder.rs | 1 + tokio/tests/rt_panic.rs | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 5c417957c3f..499ba97f14a 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -772,6 +772,7 @@ impl Builder { /// .build(); /// # } /// ``` + #[track_caller] pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { assert!(val > 0, "global_queue_interval must be greater than 0"); self.global_queue_interval = Some(val); diff --git a/tokio/tests/rt_panic.rs b/tokio/tests/rt_panic.rs index ecaf977c881..5c0bd37a79e 100644 --- a/tokio/tests/rt_panic.rs +++ b/tokio/tests/rt_panic.rs @@ -70,6 +70,18 @@ fn builder_max_blocking_threads_panic_caller() -> Result<(), Box> { Ok(()) } +#[test] +fn builder_global_queue_interval_panic_caller() -> Result<(), Box> { + let panic_location_file = test_panic(|| { + let _ = Builder::new_multi_thread().global_queue_interval(0).build(); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + fn current_thread() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() From fc5d5fa44104dd13bec725f06c7a7a447197d1fd Mon Sep 17 00:00:00 2001 From: mox692 Date: Tue, 2 Apr 2024 21:28:04 +0900 Subject: [PATCH 3/6] Add test when global_queue_interval is set to 1 --- tokio/tests/rt_threaded.rs | 27 ++++++++++++++++++++++++++- tokio/tests/rt_threaded_alt.rs | 25 +++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 8a61c6ad38f..41fd8c8da42 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -10,8 +10,8 @@ use tokio_test::{assert_err, assert_ok}; use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; @@ -486,6 +486,31 @@ fn max_blocking_threads_set_to_zero() { .unwrap(); } +/// Regression test for #6445. +/// +/// After #6445, setting `global_queue_interval` to 1 is now technically valid. +/// This test confirms that there is no regression in `multi_thread_runtime` +/// when global_queue_interval is set to 1. +#[test] +fn global_queue_interval_set_to_one() { + let rt = tokio::runtime::Builder::new_multi_thread_alt() + .global_queue_interval(1) + .build() + .unwrap(); + + // Perform a simple work. + let cnt = Arc::new(AtomicUsize::new(0)); + rt.block_on(async { + for _ in 0..10 { + let cnt = cnt.clone(); + tokio::spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) }) + .await + .unwrap(); + } + }); + assert_eq!(cnt.load(Relaxed), 10); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn hang_on_shutdown() { let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 3e3ac076290..27d25baea8f 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -487,6 +487,31 @@ fn max_blocking_threads_set_to_zero() { .unwrap(); } +/// Regression test for #6445. +/// +/// After #6445, setting `global_queue_interval` to 1 is now technically valid. +/// This test confirms that there is no regression in `multi_thread_runtime` +/// when global_queue_interval is set to 1. +#[test] +fn global_queue_interval_set_to_one() { + let rt = tokio::runtime::Builder::new_multi_thread() + .global_queue_interval(1) + .build() + .unwrap(); + + // Perform a simple work. + let cnt = Arc::new(AtomicUsize::new(0)); + rt.block_on(async { + for _ in 0..10 { + let cnt = cnt.clone(); + tokio::spawn(async move { cnt.fetch_add(1, Relaxed) }) + .await + .unwrap(); + } + }); + assert_eq!(cnt.load(Relaxed), 10); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn hang_on_shutdown() { let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); From fa57fbbddf5705aa7aff89c2408714f6ee23860d Mon Sep 17 00:00:00 2001 From: mox692 Date: Tue, 2 Apr 2024 21:32:39 +0900 Subject: [PATCH 4/6] Fix lint --- tokio/tests/rt_threaded.rs | 2 +- tokio/tests/rt_threaded_alt.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 41fd8c8da42..04ec52c9f6c 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -493,7 +493,7 @@ fn max_blocking_threads_set_to_zero() { /// when global_queue_interval is set to 1. #[test] fn global_queue_interval_set_to_one() { - let rt = tokio::runtime::Builder::new_multi_thread_alt() + let rt = tokio::runtime::Builder::new_multi_thread() .global_queue_interval(1) .build() .unwrap(); diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 27d25baea8f..918ad72b2f9 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -494,7 +494,7 @@ fn max_blocking_threads_set_to_zero() { /// when global_queue_interval is set to 1. #[test] fn global_queue_interval_set_to_one() { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = tokio::runtime::Builder::new_multi_thread_alt() .global_queue_interval(1) .build() .unwrap(); From 87794a41dcf08d00b59530dc19a45ca4070ae73c Mon Sep 17 00:00:00 2001 From: mox692 Date: Tue, 2 Apr 2024 21:41:15 +0900 Subject: [PATCH 5/6] Remove another assertion --- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 9dd4b91d7ee..c315e382291 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -664,7 +664,6 @@ impl Worker { /// Ensure core's state is set correctly for the worker to start using. fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) { self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config); - debug_assert!(self.global_queue_interval > 1); // Reset `lifo_enabled` here in case the core was previously stolen from // a task that had the LIFO slot disabled. From bbe13d0845da9b3fb57042f75c905a0410062637 Mon Sep 17 00:00:00 2001 From: mox692 Date: Tue, 2 Apr 2024 23:01:45 +0900 Subject: [PATCH 6/6] run tasks parallel --- tokio/tests/rt_threaded.rs | 9 ++++++--- tokio/tests/rt_threaded_alt.rs | 8 +++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 04ec52c9f6c..6e769fc831f 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -501,11 +501,14 @@ fn global_queue_interval_set_to_one() { // Perform a simple work. let cnt = Arc::new(AtomicUsize::new(0)); rt.block_on(async { + let mut set = tokio::task::JoinSet::new(); for _ in 0..10 { let cnt = cnt.clone(); - tokio::spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) }) - .await - .unwrap(); + set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) }); + } + + while let Some(res) = set.join_next().await { + res.unwrap(); } }); assert_eq!(cnt.load(Relaxed), 10); diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 918ad72b2f9..8b7143b2f97 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -502,11 +502,13 @@ fn global_queue_interval_set_to_one() { // Perform a simple work. let cnt = Arc::new(AtomicUsize::new(0)); rt.block_on(async { + let mut set = tokio::task::JoinSet::new(); for _ in 0..10 { let cnt = cnt.clone(); - tokio::spawn(async move { cnt.fetch_add(1, Relaxed) }) - .await - .unwrap(); + set.spawn(async move { cnt.fetch_add(1, Relaxed) }); + } + while let Some(res) = set.join_next().await { + res.unwrap(); } }); assert_eq!(cnt.load(Relaxed), 10);