Stageless: close the finish channel so executor doesn't deadlock (#7448)
# Objective

- Fix the panic_when_hierachy_cycle test hanging.
- The problem is that the scope awaits only one task at a time in get_results. In stageless, this task is the multithreaded executor. When a system panics, that task can no longer make progress and hangs. This wasn't a problem before because the executor was spawned after all the system tasks had been spawned. In stageless, the executor is spawned before the system tasks.
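
The hang can be reproduced in miniature with standard-library threads. The sketch below is not Bevy code: `wait_for_system` is a hypothetical helper, `std::sync::mpsc` stands in for the async finish channel, and `recv_timeout` replaces the executor's unbounded wait so that the demonstration terminates. It assumes the waiting side holds a sender of its own, so a panicking worker does not disconnect the channel by itself.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `system` on a worker thread and waits for its
/// completion message the way an executor would wait on a finish channel.
fn wait_for_system<F>(system: F) -> Result<usize, RecvTimeoutError>
where
    F: FnOnce() + Send + 'static,
{
    // The waiting side keeps `sender` alive (an assumption of this sketch),
    // so the channel stays open even if the worker's clone is dropped.
    let (sender, receiver) = mpsc::channel::<usize>();
    let worker_sender = sender.clone();

    let worker = thread::spawn(move || {
        system();
        // Only reached when the system did not panic.
        worker_sender.send(0).unwrap();
    });
    // Ignore the join result: a panicking system yields Err here.
    let _ = worker.join();

    // A plain `recv()` would block forever after a panic. The timeout makes
    // the stall observable instead of hanging the demonstration.
    let result = receiver.recv_timeout(Duration::from_millis(100));
    drop(sender);
    result
}
```

Calling `wait_for_system(|| {})` returns `Ok(0)`, while a closure that panics produces `Err(RecvTimeoutError::Timeout)`: the message never arrives and nothing tells the waiting side to stop.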

## Solution

- We can catch unwind on each system and close the finish channel if one panics. This then causes the receiver end of the finish channel to panic too.
- This might have a small perf impact, but when running many_foxes the difference seems to be within the noise, so less than 40us.
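
The approach can be sketched with standard-library primitives. This is a minimal illustration under stated assumptions, not the executor's actual code: `System` and `run_systems` are hypothetical names, threads replace async tasks, and `std::sync::mpsc` replaces the async finish channel. Because `mpsc` has no explicit `close()`, the sketch lets a panicked worker drop its sender without sending, and the receiver sees the channel as disconnected once every sender is gone.

```rust
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a system: any closure that may panic.
type System = Box<dyn FnOnce() + Send + 'static>;

/// Runs every system on its own thread and waits for all of them.
/// Returns the completion order, or an error if any system panicked.
fn run_systems(systems: Vec<System>) -> Result<Vec<usize>, String> {
    let total = systems.len();
    let (sender, receiver) = mpsc::channel::<usize>();

    for (index, system) in systems.into_iter().enumerate() {
        let sender = sender.clone();
        thread::spawn(move || {
            // Catch the panic so the worker can decide what to report.
            let res = panic::catch_unwind(AssertUnwindSafe(|| system()));
            if res.is_ok() {
                // The receiver may already be gone if the waiting side
                // bailed out, so a failed send is ignored.
                let _ = sender.send(index);
            }
            // On panic nothing is sent; the sender is simply dropped.
        });
    }
    // Drop the original sender so that only the workers keep the channel open.
    drop(sender);

    let mut finished = Vec::with_capacity(total);
    while finished.len() < total {
        match receiver.recv() {
            Ok(index) => finished.push(index),
            // All senders are gone but a completion is missing:
            // some system must have panicked.
            Err(_) => {
                return Err("a system panicked so the executor cannot continue".to_string())
            }
        }
    }
    Ok(finished)
}
```

With two well-behaved systems the function returns both indices in completion order. If one of them panics, the loop ends with an error instead of blocking. Unlike an explicit `close()`, this stand-in reports the failure only after the remaining workers have finished or dropped their senders.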

## Other possible solutions

- It might be possible to fairly poll all the tasks in get_results in the scope. If we could do that, the scope could panic whenever one of the tasks panics. It would require a data structure that lets us both poll the futures through a shared ref and push to it. I tried FuturesUnordered, but it requires an exclusive ref to poll it.
- The catch unwind could instead be moved to where we create the tasks for the scope. We would then need something like a oneshot async channel to inform get_results if a task panics.
hymm committed Feb 3, 2023
1 parent e1d741a commit ff7d5ff
Showing 1 changed file with 46 additions and 21 deletions.
67 changes: 46 additions & 21 deletions crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs
@@ -1,3 +1,5 @@
+use std::panic::AssertUnwindSafe;
+
 use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
 use bevy_utils::default;
 use bevy_utils::syncunsafecell::SyncUnsafeCell;
@@ -175,11 +177,10 @@ impl SystemExecutor for MultiThreadedExecutor {

 if self.num_running_systems > 0 {
     // wait for systems to complete
-    let index = self
-        .receiver
-        .recv()
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    let index =
+        self.receiver.recv().await.expect(
+            "A system has panicked so the executor cannot continue.",
+        );

     self.finish_system_and_signal_dependents(index);

@@ -429,14 +430,22 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    // SAFETY: access is compatible
-    unsafe { system.run_unsafe((), world) };
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        // SAFETY: access is compatible
+        unsafe { system.run_unsafe((), world) };
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]
@@ -479,13 +488,21 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    apply_system_buffers(&unapplied_systems, systems, world);
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        apply_system_buffers(&unapplied_systems, systems, world);
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]
@@ -495,13 +512,21 @@ impl MultiThreadedExecutor {
 let task = async move {
     #[cfg(feature = "trace")]
     let system_guard = system_span.enter();
-    system.run((), world);
+    let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
+        system.run((), world);
+    }));
     #[cfg(feature = "trace")]
     drop(system_guard);
-    sender
-        .send(system_index)
-        .await
-        .unwrap_or_else(|error| unreachable!("{}", error));
+    if res.is_err() {
+        // close the channel to propagate the error to the
+        // multithreaded executor
+        sender.close();
+    } else {
+        sender
+            .send(system_index)
+            .await
+            .unwrap_or_else(|error| unreachable!("{}", error));
+    }
 };

 #[cfg(feature = "trace")]