diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 82d3596915e..499ba97f14a 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -758,6 +758,10 @@ impl Builder { /// /// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing /// + /// # Panics + /// + /// This function will panic if 0 is passed as an argument. + /// /// # Examples /// /// ``` @@ -768,7 +772,9 @@ impl Builder { /// .build(); /// # } /// ``` + #[track_caller] pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { + assert!(val > 0, "global_queue_interval must be greater than 0"); self.global_queue_interval = Some(val); self } diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 30c108c9dd6..03cfc790054 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -63,7 +63,7 @@ impl Stats { let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; cmp::max( - // We don't want to return less than 2 as that would result in the + // If we are using self-tuning, we don't want to return less than 2 as that would result in the // global queue always getting checked first. 2, cmp::min( diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 9998870ab4d..f07fb8568cd 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -985,8 +985,6 @@ impl Core { .stats .tuned_global_queue_interval(&worker.handle.shared.config); - debug_assert!(next > 1); - // Smooth out jitter if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index 7118e4915a0..c2045602797 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -82,7 +82,7 @@ impl Stats { let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32; cmp::max( - // We don't want to return less than 2 as that would result in the + // If we are using self-tuning, we don't want to return less than 2 as that would result in the // global queue always getting checked first. 2, cmp::min( diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 54c6b0ed7ba..c315e382291 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -664,7 +664,6 @@ impl Worker { /// Ensure core's state is set correctly for the worker to start using. fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) { self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config); - debug_assert!(self.global_queue_interval > 1); // Reset `lifo_enabled` here in case the core was previously stolen from // a task that had the LIFO slot disabled. @@ -1288,8 +1287,6 @@ impl Worker { fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) { let next = core.stats.tuned_global_queue_interval(&cx.shared().config); - debug_assert!(next > 1); - // Smooth out jitter if abs_diff(self.global_queue_interval, next) > 2 { self.global_queue_interval = next; diff --git a/tokio/tests/rt_panic.rs b/tokio/tests/rt_panic.rs index ecaf977c881..5c0bd37a79e 100644 --- a/tokio/tests/rt_panic.rs +++ b/tokio/tests/rt_panic.rs @@ -70,6 +70,18 @@ fn builder_max_blocking_threads_panic_caller() -> Result<(), Box> { Ok(()) } +#[test] +fn builder_global_queue_interval_panic_caller() -> Result<(), Box> { + let panic_location_file = test_panic(|| { + let _ = Builder::new_multi_thread().global_queue_interval(0).build(); + }); + + // The panic location should be in this file + assert_eq!(&panic_location_file.unwrap(), file!()); + + Ok(()) +} + fn current_thread() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 8a61c6ad38f..6e769fc831f 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -10,8 +10,8 @@ use tokio_test::{assert_err, assert_ok}; use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::task::{Context, Poll, Waker}; @@ -486,6 +486,34 @@ fn max_blocking_threads_set_to_zero() { .unwrap(); } +/// Regression test for #6445. +/// +/// After #6445, setting `global_queue_interval` to 1 is now technically valid. +/// This test confirms that there is no regression in `multi_thread_runtime` +/// when global_queue_interval is set to 1. +#[test] +fn global_queue_interval_set_to_one() { + let rt = tokio::runtime::Builder::new_multi_thread() + .global_queue_interval(1) + .build() + .unwrap(); + + // Perform a simple work. + let cnt = Arc::new(AtomicUsize::new(0)); + rt.block_on(async { + let mut set = tokio::task::JoinSet::new(); + for _ in 0..10 { + let cnt = cnt.clone(); + set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) }); + } + + while let Some(res) = set.join_next().await { + res.unwrap(); + } + }); + assert_eq!(cnt.load(Relaxed), 10); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn hang_on_shutdown() { let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>(); diff --git a/tokio/tests/rt_threaded_alt.rs b/tokio/tests/rt_threaded_alt.rs index 3e3ac076290..8b7143b2f97 100644 --- a/tokio/tests/rt_threaded_alt.rs +++ b/tokio/tests/rt_threaded_alt.rs @@ -487,6 +487,33 @@ fn max_blocking_threads_set_to_zero() { .unwrap(); } +/// Regression test for #6445. +/// +/// After #6445, setting `global_queue_interval` to 1 is now technically valid. +/// This test confirms that there is no regression in `multi_thread_runtime` +/// when global_queue_interval is set to 1. +#[test] +fn global_queue_interval_set_to_one() { + let rt = tokio::runtime::Builder::new_multi_thread_alt() + .global_queue_interval(1) + .build() + .unwrap(); + + // Perform a simple work. + let cnt = Arc::new(AtomicUsize::new(0)); + rt.block_on(async { + let mut set = tokio::task::JoinSet::new(); + for _ in 0..10 { + let cnt = cnt.clone(); + set.spawn(async move { cnt.fetch_add(1, Relaxed) }); + } + while let Some(res) = set.join_next().await { + res.unwrap(); + } + }); + assert_eq!(cnt.load(Relaxed), 10); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn hang_on_shutdown() { let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();