Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: make improvements to global_queue_interval #6445

Merged
merged 7 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ impl Builder {
///
/// [the module documentation]: crate::runtime#multi-threaded-runtime-behavior-at-the-time-of-writing
///
/// # Panics
///
/// This function will panic if 0 is passed as an argument.
///
/// # Examples
///
/// ```
Expand All @@ -768,7 +772,9 @@ impl Builder {
/// .build();
/// # }
/// ```
#[track_caller]
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
assert!(val > 0, "global_queue_interval must be greater than 0");
mox692 marked this conversation as resolved.
Show resolved Hide resolved
self.global_queue_interval = Some(val);
self
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Stats {
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;

cmp::max(
// We don't want to return less than 2 as that would result in the
// If we are using self-tuning, we don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,8 +985,6 @@ impl Core {
.stats
.tuned_global_queue_interval(&worker.handle.shared.config);

debug_assert!(next > 1);
mox692 marked this conversation as resolved.
Show resolved Hide resolved

// Smooth out jitter
if abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Stats {
let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;

cmp::max(
// We don't want to return less than 2 as that would result in the
// If we are using self-tuning, we don't want to return less than 2 as that would result in the
// global queue always getting checked first.
2,
cmp::min(
Expand Down
3 changes: 0 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ impl Worker {
/// Ensure core's state is set correctly for the worker to start using.
fn reset_acquired_core(&mut self, cx: &Context, synced: &mut Synced, core: &mut Core) {
self.global_queue_interval = core.stats.tuned_global_queue_interval(&cx.shared().config);
debug_assert!(self.global_queue_interval > 1);

// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
Expand Down Expand Up @@ -1288,8 +1287,6 @@ impl Worker {
fn tune_global_queue_interval(&mut self, cx: &Context, core: &mut Core) {
let next = core.stats.tuned_global_queue_interval(&cx.shared().config);

debug_assert!(next > 1);

// Smooth out jitter
if abs_diff(self.global_queue_interval, next) > 2 {
self.global_queue_interval = next;
Expand Down
12 changes: 12 additions & 0 deletions tokio/tests/rt_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ fn builder_max_blocking_threads_panic_caller() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[test]
fn builder_global_queue_interval_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let _ = Builder::new_multi_thread().global_queue_interval(0).build();
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

fn current_thread() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand Down
30 changes: 29 additions & 1 deletion tokio/tests/rt_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tokio_test::{assert_err, assert_ok};
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, Waker};

Expand Down Expand Up @@ -486,6 +486,34 @@ fn max_blocking_threads_set_to_zero() {
.unwrap();
}

/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is now technically valid.
/// This test confirms that there is no regression in `multi_thread_runtime`
/// when global_queue_interval is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
let rt = tokio::runtime::Builder::new_multi_thread()
.global_queue_interval(1)
.build()
.unwrap();

// Perform a simple work.
let cnt = Arc::new(AtomicUsize::new(0));
rt.block_on(async {
let mut set = tokio::task::JoinSet::new();
for _ in 0..10 {
let cnt = cnt.clone();
set.spawn(async move { cnt.fetch_add(1, Ordering::Relaxed) });
}

while let Some(res) = set.join_next().await {
res.unwrap();
}
});
assert_eq!(cnt.load(Relaxed), 10);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
Expand Down
27 changes: 27 additions & 0 deletions tokio/tests/rt_threaded_alt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,33 @@ fn max_blocking_threads_set_to_zero() {
.unwrap();
}

/// Regression test for #6445.
///
/// After #6445, setting `global_queue_interval` to 1 is now technically valid.
/// This test confirms that there is no regression in `multi_thread_runtime`
/// when global_queue_interval is set to 1.
#[test]
fn global_queue_interval_set_to_one() {
let rt = tokio::runtime::Builder::new_multi_thread_alt()
.global_queue_interval(1)
.build()
.unwrap();

// Perform a simple work.
let cnt = Arc::new(AtomicUsize::new(0));
rt.block_on(async {
let mut set = tokio::task::JoinSet::new();
for _ in 0..10 {
let cnt = cnt.clone();
set.spawn(async move { cnt.fetch_add(1, Relaxed) });
}
while let Some(res) = set.join_next().await {
res.unwrap();
}
});
assert_eq!(cnt.load(Relaxed), 10);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hang_on_shutdown() {
let (sync_tx, sync_rx) = std::sync::mpsc::channel::<()>();
Expand Down
Loading