rt(threaded): cap LIFO slot polls #5712

Merged
merged 11 commits into from
May 23, 2023
11 changes: 0 additions & 11 deletions tokio/src/runtime/coop.rs
@@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
pub(crate) fn set(budget: Budget) {
let _ = context::budget(|cell| cell.set(budget));
}
-
-/// Consume one unit of progress from the current task's budget.
-pub(crate) fn consume_one() {
-let _ = context::budget(|cell| {
-let mut budget = cell.get();
-if let Some(ref mut counter) = budget.0 {
-*counter = counter.saturating_sub(1);
-}
-cell.set(budget);
-});
-}
}

cfg_rt! {
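For readers unfamiliar with the coop budget, the following is a minimal standalone sketch of what the removed `consume_one` did. It assumes a simplified `Budget(Option<u8>)` stored in a plain thread-local; the `BUDGET` static and the `get` helper are hypothetical stand-ins for Tokio's internal `context::budget`, not the crate's actual API.

```rust
use std::cell::Cell;

/// Simplified stand-in for the coop budget: `None` means unconstrained.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Budget(Option<u8>);

thread_local! {
    // Hypothetical thread-local; Tokio keeps the budget in its runtime context.
    static BUDGET: Cell<Budget> = Cell::new(Budget(None));
}

fn set(budget: Budget) {
    BUDGET.with(|cell| cell.set(budget));
}

fn get() -> Budget {
    BUDGET.with(|cell| cell.get())
}

/// Consume one unit of budget, saturating at zero.
/// An unconstrained budget (`None`) is left untouched.
fn consume_one() {
    BUDGET.with(|cell| {
        let mut budget = cell.get();
        if let Some(ref mut counter) = budget.0 {
            *counter = counter.saturating_sub(1);
        }
        cell.set(budget);
    });
}

/// True while the budget is unconstrained or has units left.
fn has_budget_remaining() -> bool {
    get().0.map_or(true, |n| n > 0)
}
```

Calling `set(Budget(Some(2)))` followed by three `consume_one()` calls leaves the budget at `Some(0)`, at which point `has_budget_remaining()` returns `false`. This is how the old loop eventually terminated even when the polled tasks consumed no budget themselves.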
27 changes: 20 additions & 7 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -90,8 +90,8 @@ struct Core {
/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task to be run
-/// next (LIFO). This is an optimization for message passing patterns and
-/// helps to reduce latency.
+/// next (LIFO). This is an optimization for improving locality which
+/// benefits message passing patterns and helps to reduce latency.
lifo_slot: Option<Notified>,

/// The worker-local run queue.
@@ -191,6 +191,8 @@ type Notified = task::Notified<Arc<Handle>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

+const MAX_LIFO_POLLS_PER_TICK: usize = 3;
+
pub(super) fn create(
size: usize,
park: Parker,
@@ -470,6 +472,7 @@ impl Context {
// Run the task
coop::budget(|| {
task.run();
+let mut lifo_polls = 0;

// As long as there is budget remaining and a task exists in the
// `lifo_slot`, then keep running.
@@ -494,11 +497,21 @@ impl Context {
None => return Ok(core),
};

-// Polling a task doesn't necessarily consume any budget, if it
-// doesn't use any Tokio leaf futures. To prevent such tasks
-// from using the lifo slot in an infinite loop, we consume an
-// extra unit of budget between each iteration of the loop.
-coop::consume_one();
+// In ping-pong style workloads where task A notifies task B,
+// which notifies task A again, continuously prioritizing the
+// LIFO slot can cause starvation as these two tasks will
+// repeatedly schedule the other. To mitigate this, we limit the
+// number of times the LIFO slot is prioritized.
+if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
+core.run_queue.push_back_or_overflow(
+task,
+self.worker.inject(),
+&mut core.metrics,
+);
+return Ok(core);
+}
+
+lifo_polls += 1;

if coop::has_budget_remaining() {
// Run the LIFO task, then loop
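To make the effect of the cap concrete, here is a small standalone simulation of the loop above. It is only a sketch under stated assumptions: `Task`, `Worker`, `poll`, `tick`, and `ping_pong_trace` are hypothetical names, tasks are plain ids, and the coop budget check, the inject queue, and work stealing are all left out. Task 0 wakes task 1, task 1 wakes task 0, and task 2 is an unrelated task waiting in the run queue.

```rust
use std::collections::VecDeque;

const MAX_LIFO_POLLS_PER_TICK: usize = 3;

/// Toy task: when polled, it optionally wakes another task by id.
struct Task {
    wakes: Option<usize>,
}

struct Worker {
    lifo_slot: Option<usize>,
    run_queue: VecDeque<usize>,
    tasks: Vec<Task>,
    /// Order in which task ids were polled.
    trace: Vec<usize>,
}

impl Worker {
    fn poll(&mut self, id: usize) {
        self.trace.push(id);
        if let Some(next) = self.tasks[id].wakes {
            // Assumption of this sketch: a task woken from the worker goes
            // into the LIFO slot, and any task already there is moved to
            // the back of the run queue.
            if let Some(prev) = self.lifo_slot.replace(next) {
                self.run_queue.push_back(prev);
            }
        }
    }

    /// One tick: run one task from the run queue, then keep following the
    /// LIFO slot until it is empty or the cap is reached.
    fn tick(&mut self) -> bool {
        let first = match self.run_queue.pop_front() {
            Some(id) => id,
            None => return false,
        };
        self.poll(first);
        let mut lifo_polls = 0;

        while let Some(task) = self.lifo_slot.take() {
            if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                // Cap reached: demote the task to the back of the run queue
                // so that other queued tasks get a turn.
                self.run_queue.push_back(task);
                return true;
            }
            lifo_polls += 1;
            self.poll(task);
        }
        true
    }
}

/// Runs `ticks` ticks of a ping-pong pair (0 and 1) plus a bystander (2).
fn ping_pong_trace(ticks: usize) -> Vec<usize> {
    let tasks = vec![
        Task { wakes: Some(1) }, // A wakes B
        Task { wakes: Some(0) }, // B wakes A
        Task { wakes: None },    // C: unrelated task waiting in the queue
    ];
    let mut worker = Worker {
        lifo_slot: None,
        run_queue: VecDeque::from([0, 2]),
        tasks,
        trace: Vec::new(),
    };
    for _ in 0..ticks {
        if !worker.tick() {
            break;
        }
    }
    worker.trace
}
```

In this model, `ping_pong_trace(2)` yields `[0, 1, 0, 1, 2]`: the first tick polls the initial task plus three LIFO polls, the fourth LIFO candidate is pushed to the back of the run queue, and the second tick reaches task 2. Without the cap (and without any budget check) the first tick would never end and task 2 would never be polled.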