Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix asset_debug_server hang. There should be at most one ThreadExecut… #7825

Closed
10 changes: 8 additions & 2 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,17 @@ fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World
}

/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
#[derive(Resource, Default, Clone)]
#[derive(Resource, Clone)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);

impl Default for MainThreadExecutor {
fn default() -> Self {
Self::new()
}
}

impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
MainThreadExecutor(TaskPool::get_thread_executor())
}
}
5 changes: 5 additions & 0 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ impl TaskPoolBuilder {
pub struct TaskPool {}

impl TaskPool {
/// Just create a new `ThreadExecutor` for wasm
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Arc::new(ThreadExecutor::new())
}

/// Create a `TaskPool` with the default configuration.
pub fn new() -> Self {
TaskPoolBuilder::new().build()
Expand Down
31 changes: 23 additions & 8 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ pub struct TaskPool {
impl TaskPool {
thread_local! {
static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = async_executor::LocalExecutor::new();
static THREAD_EXECUTOR: ThreadExecutor<'static> = ThreadExecutor::new();
static THREAD_EXECUTOR: Arc<ThreadExecutor<'static>> = Arc::new(ThreadExecutor::new());
}

/// Each thread should only create one `ThreadExecutor`, otherwise, there are good chances they will deadlock
pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
Self::THREAD_EXECUTOR.with(|executor| executor.clone())
}

/// Create a `TaskPool` with the default configuration.
Expand Down Expand Up @@ -376,24 +381,34 @@ impl TaskPool {
let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty();

// we get this from a thread local so we should always be on the scope executors thread.
// note: it is possible `scope_executor` and `external_executor` is the same executor,
// in that case, we should only tick one of them, otherwise, it may cause deadlock.
let scope_ticker = scope_executor.ticker().unwrap();
if let Some(external_ticker) = external_executor.ticker() {
if tick_task_pool_executor {
let external_ticker = if !external_executor.is_same(scope_executor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice change. This is definitely easier to follow.

external_executor.ticker()
} else {
None
};

match (external_ticker, tick_task_pool_executor) {
(Some(external_ticker), true) => {
Self::execute_global_external_scope(
executor,
external_ticker,
scope_ticker,
get_results,
)
.await
} else {
}
(Some(external_ticker), false) => {
Self::execute_external_scope(external_ticker, scope_ticker, get_results)
.await
}
} else if tick_task_pool_executor {
Self::execute_global_scope(executor, scope_ticker, get_results).await
} else {
Self::execute_scope(scope_ticker, get_results).await
// either external_executor is none or it is same as scope_executor
(None, true) => {
Self::execute_global_scope(executor, scope_ticker, get_results).await
}
(None, false) => Self::execute_scope(scope_ticker, get_results).await,
}
})
}
Expand Down
13 changes: 9 additions & 4 deletions crates/bevy_tasks/src/thread_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,38 @@ impl<'task> ThreadExecutor<'task> {
pub fn ticker<'ticker>(&'ticker self) -> Option<ThreadExecutorTicker<'task, 'ticker>> {
if thread::current().id() == self.thread_id {
return Some(ThreadExecutorTicker {
executor: &self.executor,
executor: self,
_marker: PhantomData::default(),
});
}
None
}

/// Returns true if `self` and `other`'s executor is same
pub fn is_same(&self, other: &Self) -> bool {
std::ptr::eq(self, other)
}
}

/// Used to tick the [`ThreadExecutor`]. The executor does not
/// make progress unless it is manually ticked on the thread it was
/// created on.
#[derive(Debug)]
pub struct ThreadExecutorTicker<'task, 'ticker> {
executor: &'ticker Executor<'task>,
executor: &'ticker ThreadExecutor<'task>,
// make type not send or sync
_marker: PhantomData<*const ()>,
}
impl<'task, 'ticker> ThreadExecutorTicker<'task, 'ticker> {
/// Tick the thread executor.
pub async fn tick(&self) {
self.executor.tick().await;
self.executor.executor.tick().await;
}

/// Synchronously try to tick a task on the executor.
/// Returns false if if does not find a task to tick.
pub fn try_tick(&self) -> bool {
self.executor.try_tick()
self.executor.executor.try_tick()
}
}

Expand Down