From 9b14b412a502dc0a20e3966d9d2e876de684b383 Mon Sep 17 00:00:00 2001 From: Cameron <51241057+maniwani@users.noreply.github.com> Date: Tue, 17 Jan 2023 01:39:17 +0000 Subject: [PATCH] Add `bevy_ecs::schedule_v3` module (#6587) # Objective Complete the first part of the migration detailed in bevyengine/rfcs#45. ## Solution Add all the new stuff. ### TODO - [x] Impl tuple methods. - [x] Impl chaining. - [x] Port ambiguity detection. - [x] Write docs. - [x] ~~Write more tests.~~(will do later) - [ ] Write changelog and examples here? - [x] ~~Replace `petgraph`.~~ (will do later) Co-authored-by: james7132 Co-authored-by: Michael Hsu Co-authored-by: Mike Hsu --- crates/bevy_ecs/macros/src/lib.rs | 30 +- crates/bevy_ecs/src/lib.rs | 1 + crates/bevy_ecs/src/schedule_v3/condition.rs | 97 ++ crates/bevy_ecs/src/schedule_v3/config.rs | 649 ++++++++++ .../bevy_ecs/src/schedule_v3/executor/mod.rs | 90 ++ .../schedule_v3/executor/multi_threaded.rs | 575 +++++++++ .../src/schedule_v3/executor/simple.rs | 111 ++ .../schedule_v3/executor/single_threaded.rs | 137 ++ .../bevy_ecs/src/schedule_v3/graph_utils.rs | 233 ++++ crates/bevy_ecs/src/schedule_v3/migration.rs | 38 + crates/bevy_ecs/src/schedule_v3/mod.rs | 471 +++++++ crates/bevy_ecs/src/schedule_v3/schedule.rs | 1099 +++++++++++++++++ crates/bevy_ecs/src/schedule_v3/set.rs | 149 +++ crates/bevy_ecs/src/schedule_v3/state.rs | 64 + .../src/system/exclusive_function_system.rs | 13 +- crates/bevy_ecs/src/system/function_system.rs | 13 +- crates/bevy_ecs/src/system/system.rs | 8 + crates/bevy_ecs/src/system/system_piping.rs | 18 +- crates/bevy_ecs/src/world/mod.rs | 25 +- crates/bevy_macro_utils/src/lib.rs | 64 + crates/bevy_time/src/fixed_timestep.rs | 4 + crates/bevy_utils/Cargo.toml | 2 + crates/bevy_utils/src/label.rs | 41 + crates/bevy_utils/src/lib.rs | 3 + crates/bevy_utils/src/syncunsafecell.rs | 122 ++ 25 files changed, 4050 insertions(+), 7 deletions(-) create mode 100644 crates/bevy_ecs/src/schedule_v3/condition.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/config.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/executor/mod.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/executor/simple.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/executor/single_threaded.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/graph_utils.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/migration.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/mod.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/schedule.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/set.rs create mode 100644 crates/bevy_ecs/src/schedule_v3/state.rs create mode 100644 crates/bevy_utils/src/syncunsafecell.rs diff --git a/crates/bevy_ecs/macros/src/lib.rs b/crates/bevy_ecs/macros/src/lib.rs index 3d8a10b3af68e..07d01a8c870ab 100644 --- a/crates/bevy_ecs/macros/src/lib.rs +++ b/crates/bevy_ecs/macros/src/lib.rs @@ -4,7 +4,9 @@ mod component; mod fetch; use crate::fetch::derive_world_query_impl; -use bevy_macro_utils::{derive_label, get_named_struct_fields, BevyManifest}; +use bevy_macro_utils::{ + derive_boxed_label, derive_label, derive_set, get_named_struct_fields, BevyManifest, +}; use proc_macro::TokenStream; use proc_macro2::Span; use quote::{format_ident, quote}; @@ -565,6 +567,32 @@ pub fn derive_run_criteria_label(input: TokenStream) -> TokenStream { derive_label(input, &trait_path, "run_criteria_label") } +/// Derive macro generating an impl of the trait `ScheduleLabel`. +#[proc_macro_derive(ScheduleLabel)] +pub fn derive_schedule_label(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let mut trait_path = bevy_ecs_path(); + trait_path + .segments + .push(format_ident!("schedule_v3").into()); + trait_path + .segments + .push(format_ident!("ScheduleLabel").into()); + derive_boxed_label(input, &trait_path) +} + +/// Derive macro generating an impl of the trait `SystemSet`. +#[proc_macro_derive(SystemSet)] +pub fn derive_system_set(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let mut trait_path = bevy_ecs_path(); + trait_path + .segments + .push(format_ident!("schedule_v3").into()); + trait_path.segments.push(format_ident!("SystemSet").into()); + derive_set(input, &trait_path) +} + pub(crate) fn bevy_ecs_path() -> syn::Path { BevyManifest::default().get_path("bevy_ecs") } diff --git a/crates/bevy_ecs/src/lib.rs b/crates/bevy_ecs/src/lib.rs index 47d93beab837d..b0a9f9f155a06 100644 --- a/crates/bevy_ecs/src/lib.rs +++ b/crates/bevy_ecs/src/lib.rs @@ -14,6 +14,7 @@ pub mod query; #[cfg(feature = "bevy_reflect")] pub mod reflect; pub mod schedule; +pub mod schedule_v3; pub mod storage; pub mod system; pub mod world; diff --git a/crates/bevy_ecs/src/schedule_v3/condition.rs b/crates/bevy_ecs/src/schedule_v3/condition.rs new file mode 100644 index 0000000000000..e617375f763a0 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/condition.rs @@ -0,0 +1,97 @@ +pub use common_conditions::*; + +use crate::system::BoxedSystem; + +pub type BoxedCondition = BoxedSystem<(), bool>; + +/// A system that determines if one or more scheduled systems should run. +/// +/// Implemented for functions and closures that convert into [`System`](crate::system::System) +/// with [read-only](crate::system::ReadOnlySystemParam) parameters. +pub trait Condition: sealed::Condition {} + +impl Condition for F where F: sealed::Condition {} + +mod sealed { + use crate::system::{IntoSystem, IsFunctionSystem, ReadOnlySystemParam, SystemParamFunction}; + + pub trait Condition: IntoSystem<(), bool, Params> {} + + impl Condition<(IsFunctionSystem, Params, Marker)> for F + where + F: SystemParamFunction<(), bool, Params, Marker> + Send + Sync + 'static, + Params: ReadOnlySystemParam + 'static, + Marker: 'static, + { + } +} + +mod common_conditions { + use crate::schedule_v3::{State, States}; + use crate::system::{Res, Resource}; + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the resource exists. + pub fn resource_exists() -> impl FnMut(Option>) -> bool + where + T: Resource, + { + move |res: Option>| res.is_some() + } + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the resource is equal to `value`. + /// + /// # Panics + /// + /// The condition will panic if the resource does not exist. + pub fn resource_equals(value: T) -> impl FnMut(Res) -> bool + where + T: Resource + PartialEq, + { + move |res: Res| *res == value + } + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the resource exists and is equal to `value`. + /// + /// The condition will return `false` if the resource does not exist. + pub fn resource_exists_and_equals(value: T) -> impl FnMut(Option>) -> bool + where + T: Resource + PartialEq, + { + move |res: Option>| match res { + Some(res) => *res == value, + None => false, + } + } + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the state machine exists. + pub fn state_exists() -> impl FnMut(Option>>) -> bool { + move |current_state: Option>>| current_state.is_some() + } + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the state machine is currently in `state`. + /// + /// # Panics + /// + /// The condition will panic if the resource does not exist. + pub fn state_equals(state: S) -> impl FnMut(Res>) -> bool { + move |current_state: Res>| current_state.0 == state + } + + /// Generates a [`Condition`](super::Condition)-satisfying closure that returns `true` + /// if the state machine exists and is currently in `state`. + /// + /// The condition will return `false` if the state does not exist. + pub fn state_exists_and_equals( + state: S, + ) -> impl FnMut(Option>>) -> bool { + move |current_state: Option>>| match current_state { + Some(current_state) => current_state.0 == state, + None => false, + } + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/config.rs b/crates/bevy_ecs/src/schedule_v3/config.rs new file mode 100644 index 0000000000000..d7a87c374c047 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/config.rs @@ -0,0 +1,649 @@ +use bevy_ecs_macros::all_tuples; +use bevy_utils::default; + +use crate::{ + schedule_v3::{ + condition::{BoxedCondition, Condition}, + graph_utils::{Ambiguity, Dependency, DependencyKind, GraphInfo}, + set::{BoxedSystemSet, IntoSystemSet, SystemSet}, + }, + system::{BoxedSystem, IntoSystem, System}, +}; + +/// A [`SystemSet`] with scheduling metadata. +pub struct SystemSetConfig { + pub(super) set: BoxedSystemSet, + pub(super) graph_info: GraphInfo, + pub(super) conditions: Vec, +} + +impl SystemSetConfig { + fn new(set: BoxedSystemSet) -> Self { + // system type sets are automatically populated + // to avoid unintentionally broad changes, they cannot be configured + assert!( + !set.is_system_type(), + "configuring system type sets is not allowed" + ); + + Self { + set, + graph_info: GraphInfo { + sets: Vec::new(), + dependencies: Vec::new(), + ambiguous_with: default(), + }, + conditions: Vec::new(), + } + } +} + +/// A [`System`] with scheduling metadata. +pub struct SystemConfig { + pub(super) system: BoxedSystem, + pub(super) graph_info: GraphInfo, + pub(super) conditions: Vec, +} + +impl SystemConfig { + fn new(system: BoxedSystem) -> Self { + // include system in its default sets + let sets = system.default_system_sets().into_iter().collect(); + Self { + system, + graph_info: GraphInfo { + sets, + dependencies: Vec::new(), + ambiguous_with: default(), + }, + conditions: Vec::new(), + } + } +} + +fn new_condition

(condition: impl Condition

) -> BoxedCondition { + let condition_system = IntoSystem::into_system(condition); + assert!( + condition_system.is_send(), + "Condition `{}` accesses thread-local resources. This is not currently supported.", + condition_system.name() + ); + + Box::new(condition_system) +} + +fn ambiguous_with(graph_info: &mut GraphInfo, set: BoxedSystemSet) { + match &mut graph_info.ambiguous_with { + detection @ Ambiguity::Check => { + *detection = Ambiguity::IgnoreWithSet(vec![set]); + } + Ambiguity::IgnoreWithSet(ambiguous_with) => { + ambiguous_with.push(set); + } + Ambiguity::IgnoreAll => (), + } +} + +/// Types that can be converted into a [`SystemSetConfig`]. +/// +/// This has been implemented for all types that implement [`SystemSet`] and boxed trait objects. +pub trait IntoSystemSetConfig: sealed::IntoSystemSetConfig { + /// Convert into a [`SystemSetConfig`]. + #[doc(hidden)] + fn into_config(self) -> SystemSetConfig; + /// Add to the provided `set`. + fn in_set(self, set: impl SystemSet) -> SystemSetConfig; + /// Run before all systems in `set`. + fn before(self, set: impl IntoSystemSet) -> SystemSetConfig; + /// Run after all systems in `set`. + fn after(self, set: impl IntoSystemSet) -> SystemSetConfig; + /// Run the systems in this set only if the [`Condition`] is `true`. + /// + /// The `Condition` will be evaluated at most once (per schedule run), + /// the first time a system in this set prepares to run. + fn run_if

(self, condition: impl Condition

) -> SystemSetConfig; + /// Suppress warnings and errors that would result from systems in this set having ambiguities + /// (conflicting access but indeterminate order) with systems in `set`. + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemSetConfig; + /// Suppress warnings and errors that would result from systems in this set having ambiguities + /// (conflicting access but indeterminate order) with any other system. + fn ambiguous_with_all(self) -> SystemSetConfig; +} + +impl IntoSystemSetConfig for S +where + S: SystemSet + sealed::IntoSystemSetConfig, +{ + fn into_config(self) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)) + } + + fn in_set(self, set: impl SystemSet) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).in_set(set) + } + + fn before(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).before(set) + } + + fn after(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).after(set) + } + + fn run_if

(self, condition: impl Condition

) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).run_if(condition) + } + + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).ambiguous_with(set) + } + + fn ambiguous_with_all(self) -> SystemSetConfig { + SystemSetConfig::new(Box::new(self)).ambiguous_with_all() + } +} + +impl IntoSystemSetConfig for BoxedSystemSet { + fn into_config(self) -> SystemSetConfig { + SystemSetConfig::new(self) + } + + fn in_set(self, set: impl SystemSet) -> SystemSetConfig { + SystemSetConfig::new(self).in_set(set) + } + + fn before(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(self).before(set) + } + + fn after(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(self).after(set) + } + + fn run_if

(self, condition: impl Condition

) -> SystemSetConfig { + SystemSetConfig::new(self).run_if(condition) + } + + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemSetConfig { + SystemSetConfig::new(self).ambiguous_with(set) + } + + fn ambiguous_with_all(self) -> SystemSetConfig { + SystemSetConfig::new(self).ambiguous_with_all() + } +} + +impl IntoSystemSetConfig for SystemSetConfig { + fn into_config(self) -> Self { + self + } + + fn in_set(mut self, set: impl SystemSet) -> Self { + assert!( + !set.is_system_type(), + "adding arbitrary systems to a system type set is not allowed" + ); + self.graph_info.sets.push(Box::new(set)); + self + } + + fn before(mut self, set: impl IntoSystemSet) -> Self { + self.graph_info.dependencies.push(Dependency::new( + DependencyKind::Before, + Box::new(set.into_system_set()), + )); + self + } + + fn after(mut self, set: impl IntoSystemSet) -> Self { + self.graph_info.dependencies.push(Dependency::new( + DependencyKind::After, + Box::new(set.into_system_set()), + )); + self + } + + fn run_if

(mut self, condition: impl Condition

) -> Self { + self.conditions.push(new_condition(condition)); + self + } + + fn ambiguous_with(mut self, set: impl IntoSystemSet) -> Self { + ambiguous_with(&mut self.graph_info, Box::new(set.into_system_set())); + self + } + + fn ambiguous_with_all(mut self) -> Self { + self.graph_info.ambiguous_with = Ambiguity::IgnoreAll; + self + } +} + +/// Types that can be converted into a [`SystemConfig`]. +/// +/// This has been implemented for boxed [`System`](crate::system::System) +/// trait objects and all functions that turn into such. +pub trait IntoSystemConfig: sealed::IntoSystemConfig { + /// Convert into a [`SystemConfig`]. + #[doc(hidden)] + fn into_config(self) -> SystemConfig; + /// Add to `set` membership. + fn in_set(self, set: impl SystemSet) -> SystemConfig; + /// Run before all systems in `set`. + fn before(self, set: impl IntoSystemSet) -> SystemConfig; + /// Run after all systems in `set`. + fn after(self, set: impl IntoSystemSet) -> SystemConfig; + /// Run only if the [`Condition`] is `true`. + /// + /// The `Condition` will be evaluated at most once (per schedule run), + /// when the system prepares to run. + fn run_if

(self, condition: impl Condition

) -> SystemConfig; + /// Suppress warnings and errors that would result from this system having ambiguities + /// (conflicting access but indeterminate order) with systems in `set`. + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemConfig; + /// Suppress warnings and errors that would result from this system having ambiguities + /// (conflicting access but indeterminate order) with any other system. + fn ambiguous_with_all(self) -> SystemConfig; +} + +impl IntoSystemConfig for F +where + F: IntoSystem<(), (), Params> + sealed::IntoSystemConfig, +{ + fn into_config(self) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))) + } + + fn in_set(self, set: impl SystemSet) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).in_set(set) + } + + fn before(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).before(set) + } + + fn after(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).after(set) + } + + fn run_if

(self, condition: impl Condition

) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).run_if(condition) + } + + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).ambiguous_with(set) + } + + fn ambiguous_with_all(self) -> SystemConfig { + SystemConfig::new(Box::new(IntoSystem::into_system(self))).ambiguous_with_all() + } +} + +impl IntoSystemConfig<()> for BoxedSystem<(), ()> { + fn into_config(self) -> SystemConfig { + SystemConfig::new(self) + } + + fn in_set(self, set: impl SystemSet) -> SystemConfig { + SystemConfig::new(self).in_set(set) + } + + fn before(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(self).before(set) + } + + fn after(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(self).after(set) + } + + fn run_if

(self, condition: impl Condition

) -> SystemConfig { + SystemConfig::new(self).run_if(condition) + } + + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemConfig { + SystemConfig::new(self).ambiguous_with(set) + } + + fn ambiguous_with_all(self) -> SystemConfig { + SystemConfig::new(self).ambiguous_with_all() + } +} + +impl IntoSystemConfig<()> for SystemConfig { + fn into_config(self) -> Self { + self + } + + fn in_set(mut self, set: impl SystemSet) -> Self { + assert!( + !set.is_system_type(), + "adding arbitrary systems to a system type set is not allowed" + ); + self.graph_info.sets.push(Box::new(set)); + self + } + + fn before(mut self, set: impl IntoSystemSet) -> Self { + self.graph_info.dependencies.push(Dependency::new( + DependencyKind::Before, + Box::new(set.into_system_set()), + )); + self + } + + fn after(mut self, set: impl IntoSystemSet) -> Self { + self.graph_info.dependencies.push(Dependency::new( + DependencyKind::After, + Box::new(set.into_system_set()), + )); + self + } + + fn run_if

(mut self, condition: impl Condition

) -> Self { + self.conditions.push(new_condition(condition)); + self + } + + fn ambiguous_with(mut self, set: impl IntoSystemSet) -> Self { + ambiguous_with(&mut self.graph_info, Box::new(set.into_system_set())); + self + } + + fn ambiguous_with_all(mut self) -> Self { + self.graph_info.ambiguous_with = Ambiguity::IgnoreAll; + self + } +} + +// only `System` system objects can be scheduled +mod sealed { + use crate::{ + schedule_v3::{BoxedSystemSet, SystemSet}, + system::{BoxedSystem, IntoSystem}, + }; + + use super::{SystemConfig, SystemSetConfig}; + + pub trait IntoSystemConfig {} + + impl> IntoSystemConfig for F {} + + impl IntoSystemConfig<()> for BoxedSystem<(), ()> {} + + impl IntoSystemConfig<()> for SystemConfig {} + + pub trait IntoSystemSetConfig {} + + impl IntoSystemSetConfig for S {} + + impl IntoSystemSetConfig for BoxedSystemSet {} + + impl IntoSystemSetConfig for SystemSetConfig {} +} + +/// A collection of [`SystemConfig`]. +pub struct SystemConfigs { + pub(super) systems: Vec, + /// If `true`, adds `before -> after` ordering constraints between the successive elements. + pub(super) chained: bool, +} + +/// Types that can convert into a [`SystemConfigs`]. +pub trait IntoSystemConfigs +where + Self: Sized, +{ + /// Convert into a [`SystemConfigs`]. + #[doc(hidden)] + fn into_configs(self) -> SystemConfigs; + + /// Add these systems to the provided `set`. + fn in_set(self, set: impl SystemSet) -> SystemConfigs { + self.into_configs().in_set(set) + } + + /// Run before all systems in `set`. + fn before(self, set: impl IntoSystemSet) -> SystemConfigs { + self.into_configs().before(set) + } + + /// Run after all systems in `set`. + fn after(self, set: impl IntoSystemSet) -> SystemConfigs { + self.into_configs().after(set) + } + + /// Suppress warnings and errors that would result from these systems having ambiguities + /// (conflicting access but indeterminate order) with systems in `set`. + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemConfigs { + self.into_configs().ambiguous_with(set) + } + + /// Suppress warnings and errors that would result from these systems having ambiguities + /// (conflicting access but indeterminate order) with any other system. + fn ambiguous_with_all(self) -> SystemConfigs { + self.into_configs().ambiguous_with_all() + } + + /// Treat this collection as a sequence of systems. + /// + /// Ordering constraints will be applied between the successive elements. + fn chain(self) -> SystemConfigs { + self.into_configs().chain() + } +} + +impl IntoSystemConfigs<()> for SystemConfigs { + fn into_configs(self) -> Self { + self + } + + fn in_set(mut self, set: impl SystemSet) -> Self { + assert!( + !set.is_system_type(), + "adding arbitrary systems to a system type set is not allowed" + ); + for config in &mut self.systems { + config.graph_info.sets.push(set.dyn_clone()); + } + + self + } + + fn before(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.systems { + config + .graph_info + .dependencies + .push(Dependency::new(DependencyKind::Before, set.dyn_clone())); + } + + self + } + + fn after(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.systems { + config + .graph_info + .dependencies + .push(Dependency::new(DependencyKind::After, set.dyn_clone())); + } + + self + } + + fn ambiguous_with(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.systems { + ambiguous_with(&mut config.graph_info, set.dyn_clone()); + } + + self + } + + fn ambiguous_with_all(mut self) -> Self { + for config in &mut self.systems { + config.graph_info.ambiguous_with = Ambiguity::IgnoreAll; + } + + self + } + + fn chain(mut self) -> Self { + self.chained = true; + self + } +} + +/// A collection of [`SystemSetConfig`]. +pub struct SystemSetConfigs { + pub(super) sets: Vec, + /// If `true`, adds `before -> after` ordering constraints between the successive elements. + pub(super) chained: bool, +} + +/// Types that can convert into a [`SystemSetConfigs`]. +pub trait IntoSystemSetConfigs +where + Self: Sized, +{ + /// Convert into a [`SystemSetConfigs`]. + #[doc(hidden)] + fn into_configs(self) -> SystemSetConfigs; + + /// Add these system sets to the provided `set`. + fn in_set(self, set: impl SystemSet) -> SystemSetConfigs { + self.into_configs().in_set(set) + } + + /// Run before all systems in `set`. + fn before(self, set: impl IntoSystemSet) -> SystemSetConfigs { + self.into_configs().before(set) + } + + /// Run after all systems in `set`. + fn after(self, set: impl IntoSystemSet) -> SystemSetConfigs { + self.into_configs().after(set) + } + + /// Suppress warnings and errors that would result from systems in these sets having ambiguities + /// (conflicting access but indeterminate order) with systems in `set`. + fn ambiguous_with(self, set: impl IntoSystemSet) -> SystemSetConfigs { + self.into_configs().ambiguous_with(set) + } + + /// Suppress warnings and errors that would result from systems in these sets having ambiguities + /// (conflicting access but indeterminate order) with any other system. + fn ambiguous_with_all(self) -> SystemSetConfigs { + self.into_configs().ambiguous_with_all() + } + + /// Treat this collection as a sequence of system sets. + /// + /// Ordering constraints will be applied between the successive elements. + fn chain(self) -> SystemSetConfigs { + self.into_configs().chain() + } +} + +impl IntoSystemSetConfigs for SystemSetConfigs { + fn into_configs(self) -> Self { + self + } + + fn in_set(mut self, set: impl SystemSet) -> Self { + assert!( + !set.is_system_type(), + "adding arbitrary systems to a system type set is not allowed" + ); + for config in &mut self.sets { + config.graph_info.sets.push(set.dyn_clone()); + } + + self + } + + fn before(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.sets { + config + .graph_info + .dependencies + .push(Dependency::new(DependencyKind::Before, set.dyn_clone())); + } + + self + } + + fn after(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.sets { + config + .graph_info + .dependencies + .push(Dependency::new(DependencyKind::After, set.dyn_clone())); + } + + self + } + + fn ambiguous_with(mut self, set: impl IntoSystemSet) -> Self { + let set = set.into_system_set(); + for config in &mut self.sets { + ambiguous_with(&mut config.graph_info, set.dyn_clone()); + } + + self + } + + fn ambiguous_with_all(mut self) -> Self { + for config in &mut self.sets { + config.graph_info.ambiguous_with = Ambiguity::IgnoreAll; + } + + self + } + + fn chain(mut self) -> Self { + self.chained = true; + self + } +} + +macro_rules! impl_system_collection { + ($(($param: ident, $sys: ident)),*) => { + impl<$($param, $sys),*> IntoSystemConfigs<($($param,)*)> for ($($sys,)*) + where + $($sys: IntoSystemConfig<$param>),* + { + #[allow(non_snake_case)] + fn into_configs(self) -> SystemConfigs { + let ($($sys,)*) = self; + SystemConfigs { + systems: vec![$($sys.into_config(),)*], + chained: false, + } + } + } + } +} + +macro_rules! impl_system_set_collection { + ($($set: ident),*) => { + impl<$($set: IntoSystemSetConfig),*> IntoSystemSetConfigs for ($($set,)*) + { + #[allow(non_snake_case)] + fn into_configs(self) -> SystemSetConfigs { + let ($($set,)*) = self; + SystemSetConfigs { + sets: vec![$($set.into_config(),)*], + chained: false, + } + } + } + } +} + +all_tuples!(impl_system_collection, 0, 15, P, S); +all_tuples!(impl_system_set_collection, 0, 15, S); diff --git a/crates/bevy_ecs/src/schedule_v3/executor/mod.rs b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs new file mode 100644 index 0000000000000..bfc1eef14d609 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/executor/mod.rs @@ -0,0 +1,90 @@ +mod multi_threaded; +mod simple; +mod single_threaded; + +pub use self::multi_threaded::MultiThreadedExecutor; +pub use self::simple::SimpleExecutor; +pub use self::single_threaded::SingleThreadedExecutor; + +use fixedbitset::FixedBitSet; + +use crate::{ + schedule_v3::{BoxedCondition, NodeId}, + system::BoxedSystem, + world::World, +}; + +/// Types that can run a [`SystemSchedule`] on a [`World`]. +pub(super) trait SystemExecutor: Send + Sync { + fn kind(&self) -> ExecutorKind; + fn init(&mut self, schedule: &SystemSchedule); + fn run(&mut self, schedule: &mut SystemSchedule, world: &mut World); +} + +/// Specifies how a [`Schedule`](super::Schedule) will be run. +/// +/// [`MultiThreaded`](ExecutorKind::MultiThreaded) is the default. +#[derive(PartialEq, Eq, Default)] +pub enum ExecutorKind { + /// Runs the schedule using a single thread. + /// + /// Useful if you're dealing with a single-threaded environment, saving your threads for + /// other things, or just trying minimize overhead. + SingleThreaded, + /// Like [`SingleThreaded`](ExecutorKind::SingleThreaded) but calls [`apply_buffers`](crate::system::System::apply_buffers) + /// immediately after running each system. + Simple, + /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel. + #[default] + MultiThreaded, +} + +/// Holds systems and conditions of a [`Schedule`](super::Schedule) sorted in topological order +/// (along with dependency information for multi-threaded execution). +/// +/// Since the arrays are sorted in the same order, elements are referenced by their index. +/// `FixedBitSet` is used as a smaller, more efficient substitute of `HashSet`. +#[derive(Default)] +pub(super) struct SystemSchedule { + pub(super) systems: Vec, + pub(super) system_conditions: Vec>, + pub(super) set_conditions: Vec>, + pub(super) system_ids: Vec, + pub(super) set_ids: Vec, + pub(super) system_dependencies: Vec, + pub(super) system_dependents: Vec>, + pub(super) sets_of_systems: Vec, + pub(super) systems_in_sets: Vec, +} + +impl SystemSchedule { + pub const fn new() -> Self { + Self { + systems: Vec::new(), + system_conditions: Vec::new(), + set_conditions: Vec::new(), + system_ids: Vec::new(), + set_ids: Vec::new(), + system_dependencies: Vec::new(), + system_dependents: Vec::new(), + sets_of_systems: Vec::new(), + systems_in_sets: Vec::new(), + } + } +} + +/// Instructs the executor to call [`apply_buffers`](crate::system::System::apply_buffers) +/// on the systems that have run but not applied their buffers. +/// +/// **Notes** +/// - This function (currently) does nothing if it's called manually or wrapped inside a [`PipeSystem`](crate::system::PipeSystem). +/// - Modifying a [`Schedule`](super::Schedule) may change the order buffers are applied. +#[allow(unused_variables)] +pub fn apply_system_buffers(world: &mut World) {} + +/// Returns `true` if the [`System`](crate::system::System) is an instance of [`apply_system_buffers`]. +pub(super) fn is_apply_system_buffers(system: &BoxedSystem) -> bool { + use std::any::Any; + // deref to use `System::type_id` instead of `Any::type_id` + system.as_ref().type_id() == apply_system_buffers.type_id() +} diff --git a/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs new file mode 100644 index 0000000000000..3debba3ac59b4 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/executor/multi_threaded.rs @@ -0,0 +1,575 @@ +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_utils::default; +use bevy_utils::syncunsafecell::SyncUnsafeCell; +#[cfg(feature = "trace")] +use bevy_utils::tracing::{info_span, Instrument}; + +use async_channel::{Receiver, Sender}; +use fixedbitset::FixedBitSet; + +use crate::{ + archetype::ArchetypeComponentId, + query::Access, + schedule_v3::{ + is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule, + }, + system::BoxedSystem, + world::World, +}; + +/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. +struct SyncUnsafeSchedule<'a> { + systems: &'a [SyncUnsafeCell], + conditions: Conditions<'a>, +} + +struct Conditions<'a> { + system_conditions: &'a mut [Vec], + set_conditions: &'a mut [Vec], + sets_of_systems: &'a [FixedBitSet], + systems_in_sets: &'a [FixedBitSet], +} + +impl SyncUnsafeSchedule<'_> { + fn new(schedule: &mut SystemSchedule) -> SyncUnsafeSchedule<'_> { + SyncUnsafeSchedule { + systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(), + conditions: Conditions { + system_conditions: &mut schedule.system_conditions, + set_conditions: &mut schedule.set_conditions, + sets_of_systems: &schedule.sets_of_systems, + systems_in_sets: &schedule.systems_in_sets, + }, + } + } +} + +/// Per-system data used by the [`MultiThreadedExecutor`]. +// Copied here because it can't be read from the system when it's running. +struct SystemTaskMetadata { + /// The `ArchetypeComponentId` access of the system. + archetype_component_access: Access, + /// Indices of the systems that directly depend on the system. + dependents: Vec, + /// Is `true` if the system does not access `!Send` data. + is_send: bool, + /// Is `true` if the system is exclusive. + is_exclusive: bool, +} + +/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel. +pub struct MultiThreadedExecutor { + /// Sends system completion events. + sender: Sender, + /// Receives system completion events. + receiver: Receiver, + /// Metadata for scheduling and running system tasks. + system_task_metadata: Vec, + /// Union of the accesses of all currently running systems. + active_access: Access, + /// Returns `true` if a system with non-`Send` access is running. + local_thread_running: bool, + /// Returns `true` if an exclusive system is running. + exclusive_running: bool, + /// The number of systems that are running. + num_running_systems: usize, + /// The number of systems that have completed. + num_completed_systems: usize, + /// The number of dependencies each system has that have not completed. + num_dependencies_remaining: Vec, + /// System sets whose conditions have been evaluated. + evaluated_sets: FixedBitSet, + /// Systems that have no remaining dependencies and are waiting to run. + ready_systems: FixedBitSet, + /// copy of `ready_systems` + ready_systems_copy: FixedBitSet, + /// Systems that are running. + running_systems: FixedBitSet, + /// Systems that got skipped. + skipped_systems: FixedBitSet, + /// Systems whose conditions have been evaluated and were run or skipped. + completed_systems: FixedBitSet, + /// Systems that have run but have not had their buffers applied. + unapplied_systems: FixedBitSet, +} + +impl Default for MultiThreadedExecutor { + fn default() -> Self { + Self::new() + } +} + +impl SystemExecutor for MultiThreadedExecutor { + fn kind(&self) -> ExecutorKind { + ExecutorKind::MultiThreaded + } + + fn init(&mut self, schedule: &SystemSchedule) { + // pre-allocate space + let sys_count = schedule.system_ids.len(); + let set_count = schedule.set_ids.len(); + + self.evaluated_sets = FixedBitSet::with_capacity(set_count); + self.ready_systems = FixedBitSet::with_capacity(sys_count); + self.ready_systems_copy = FixedBitSet::with_capacity(sys_count); + self.running_systems = FixedBitSet::with_capacity(sys_count); + self.completed_systems = FixedBitSet::with_capacity(sys_count); + self.skipped_systems = FixedBitSet::with_capacity(sys_count); + self.unapplied_systems = FixedBitSet::with_capacity(sys_count); + + self.system_task_metadata = Vec::with_capacity(sys_count); + for index in 0..sys_count { + self.system_task_metadata.push(SystemTaskMetadata { + archetype_component_access: default(), + dependents: schedule.system_dependents[index].clone(), + is_send: schedule.systems[index].is_send(), + is_exclusive: schedule.systems[index].is_exclusive(), + }); + } + + self.num_dependencies_remaining = Vec::with_capacity(sys_count); + } + + fn run(&mut self, schedule: &mut SystemSchedule, world: &mut World) { + // reset counts + let num_systems = schedule.systems.len(); + self.num_running_systems = 0; + self.num_completed_systems = 0; + self.num_dependencies_remaining.clear(); + self.num_dependencies_remaining + .extend_from_slice(&schedule.system_dependencies); + + for (system_index, dependencies) in self.num_dependencies_remaining.iter_mut().enumerate() { + if *dependencies == 0 { + self.ready_systems.insert(system_index); + } + } + + let world = SyncUnsafeCell::from_mut(world); + let SyncUnsafeSchedule { + systems, + mut conditions, + } = SyncUnsafeSchedule::new(schedule); + + ComputeTaskPool::init(TaskPool::default).scope(|scope| { + // the executor itself is a `Send` future so that it can run + // alongside systems that claim the local thread + let executor = async { + while self.num_completed_systems < num_systems { + // SAFETY: self.ready_systems does not contain running systems + unsafe { + self.spawn_system_tasks(scope, systems, &mut conditions, world); + } + + if self.num_running_systems > 0 { + // wait for systems to complete + let index = self + .receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + + self.finish_system_and_signal_dependents(index); + + while let Ok(index) = self.receiver.try_recv() { + self.finish_system_and_signal_dependents(index); + } + + self.rebuild_active_access(); + } + } + + // SAFETY: all systems have completed + let world = unsafe { &mut *world.get() }; + apply_system_buffers(&mut self.unapplied_systems, systems, world); + + debug_assert!(self.ready_systems.is_clear()); + debug_assert!(self.running_systems.is_clear()); + debug_assert!(self.unapplied_systems.is_clear()); + self.active_access.clear(); + self.evaluated_sets.clear(); + self.skipped_systems.clear(); + self.completed_systems.clear(); + }; + + #[cfg(feature = "trace")] + let executor_span = info_span!("schedule_task"); + #[cfg(feature = "trace")] + let executor = executor.instrument(executor_span); + scope.spawn(executor); + }); + } +} + +impl MultiThreadedExecutor { + pub fn new() -> Self { + let (sender, receiver) = async_channel::unbounded(); + Self { + sender, + receiver, + system_task_metadata: Vec::new(), + num_running_systems: 0, + num_completed_systems: 0, + num_dependencies_remaining: Vec::new(), + active_access: default(), + local_thread_running: false, + exclusive_running: false, + evaluated_sets: FixedBitSet::new(), + ready_systems: FixedBitSet::new(), + ready_systems_copy: FixedBitSet::new(), + running_systems: FixedBitSet::new(), + skipped_systems: FixedBitSet::new(), + completed_systems: FixedBitSet::new(), + unapplied_systems: FixedBitSet::new(), + } + } + + /// # Safety + /// Caller must ensure that `self.ready_systems` does not contain any systems that + /// have been mutably borrowed (such as the systems currently running). + unsafe fn spawn_system_tasks<'scope>( + &mut self, + scope: &Scope<'_, 'scope, ()>, + systems: &'scope [SyncUnsafeCell], + conditions: &mut Conditions, + cell: &'scope SyncUnsafeCell, + ) { + if self.exclusive_running { + return; + } + + // can't borrow since loop mutably borrows `self` + let mut ready_systems = std::mem::take(&mut self.ready_systems_copy); + ready_systems.clear(); + ready_systems.union_with(&self.ready_systems); + + for system_index in ready_systems.ones() { + assert!(!self.running_systems.contains(system_index)); + // SAFETY: Caller assured that these systems are not running. + // Therefore, no other reference to this system exists and there is no aliasing. + let system = unsafe { &mut *systems[system_index].get() }; + + // SAFETY: No exclusive system is running. + // Therefore, there is no existing mutable reference to the world. + let world = unsafe { &*cell.get() }; + if !self.can_run(system_index, system, conditions, world) { + // NOTE: exclusive systems with ambiguities are susceptible to + // being significantly displaced here (compared to single-threaded order) + // if systems after them in topological order can run + // if that becomes an issue, `break;` if exclusive system + continue; + } + + self.ready_systems.set(system_index, false); + + if !self.should_run(system_index, system, conditions, world) { + self.skip_system_and_signal_dependents(system_index); + continue; + } + + self.running_systems.insert(system_index); + self.num_running_systems += 1; + + if self.system_task_metadata[system_index].is_exclusive { + // SAFETY: `can_run` confirmed that no systems are running. + // Therefore, there is no existing reference to the world. + unsafe { + let world = &mut *cell.get(); + self.spawn_exclusive_system_task(scope, system_index, systems, world); + } + break; + } + + // SAFETY: No other reference to this system exists. + unsafe { + self.spawn_system_task(scope, system_index, systems, world); + } + } + + // give back + self.ready_systems_copy = ready_systems; + } + + fn can_run( + &mut self, + system_index: usize, + system: &mut BoxedSystem, + conditions: &mut Conditions, + world: &World, + ) -> bool { + #[cfg(feature = "trace")] + let _span = info_span!("check_access", name = &*system.name()).entered(); + + let system_meta = &self.system_task_metadata[system_index]; + if system_meta.is_exclusive && self.num_running_systems > 0 { + return false; + } + + if !system_meta.is_send && self.local_thread_running { + return false; + } + + // TODO: an earlier out if world's archetypes did not change + for set_idx in conditions.sets_of_systems[system_index].difference(&self.evaluated_sets) { + for condition in &mut conditions.set_conditions[set_idx] { + condition.update_archetype_component_access(world); + if !condition + .archetype_component_access() + .is_compatible(&self.active_access) + { + return false; + } + } + } + + for condition in &mut conditions.system_conditions[system_index] { + condition.update_archetype_component_access(world); + if !condition + .archetype_component_access() + .is_compatible(&self.active_access) + { + return false; + } + } + + if !self.skipped_systems.contains(system_index) { + system.update_archetype_component_access(world); + if !system + .archetype_component_access() + .is_compatible(&self.active_access) + { + return false; + } + + // PERF: use an optimized clear() + extend() operation + let meta_access = + &mut self.system_task_metadata[system_index].archetype_component_access; + meta_access.clear(); + meta_access.extend(system.archetype_component_access()); + } + + true + } + + fn should_run( + &mut self, + system_index: usize, + _system: &BoxedSystem, + conditions: &mut Conditions, + world: &World, + ) -> bool { + #[cfg(feature = "trace")] + let _span = info_span!("check_conditions", name = &*_system.name()).entered(); + + let mut should_run = !self.skipped_systems.contains(system_index); + for set_idx in conditions.sets_of_systems[system_index].ones() { + if self.evaluated_sets.contains(set_idx) { + continue; + } + + // evaluate system set's conditions + let set_conditions_met = + evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world); + + if !set_conditions_met { + self.skipped_systems + .union_with(&conditions.systems_in_sets[set_idx]); + } + + should_run &= set_conditions_met; + self.evaluated_sets.insert(set_idx); + } + + // evaluate system's conditions + let system_conditions_met = + evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world); + + if !system_conditions_met { + self.skipped_systems.insert(system_index); + } + + should_run &= system_conditions_met; + + should_run + } + + /// # Safety + /// Caller must not alias systems that are running. + unsafe fn spawn_system_task<'scope>( + &mut self, + scope: &Scope<'_, 'scope, ()>, + system_index: usize, + systems: &'scope [SyncUnsafeCell], + world: &'scope World, + ) { + // SAFETY: this system is not running, no other reference exists + let system = unsafe { &mut *systems[system_index].get() }; + + #[cfg(feature = "trace")] + let task_span = info_span!("system_task", name = &*system.name()); + #[cfg(feature = "trace")] + let system_span = info_span!("system", name = &*system.name()); + + let sender = self.sender.clone(); + let task = async move { + #[cfg(feature = "trace")] + let system_guard = system_span.enter(); + // SAFETY: access is compatible + unsafe { system.run_unsafe((), world) }; + #[cfg(feature = "trace")] + drop(system_guard); + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(task_span); + + let system_meta = &self.system_task_metadata[system_index]; + self.active_access + .extend(&system_meta.archetype_component_access); + + if system_meta.is_send { + scope.spawn(task); + } else { + self.local_thread_running = true; + scope.spawn_on_scope(task); + } + } + + /// # Safety + /// Caller must ensure no systems are currently borrowed. + unsafe fn spawn_exclusive_system_task<'scope>( + &mut self, + scope: &Scope<'_, 'scope, ()>, + system_index: usize, + systems: &'scope [SyncUnsafeCell], + world: &'scope mut World, + ) { + // SAFETY: this system is not running, no other reference exists + let system = unsafe { &mut *systems[system_index].get() }; + + #[cfg(feature = "trace")] + let task_span = info_span!("system_task", name = &*system.name()); + #[cfg(feature = "trace")] + let system_span = info_span!("system", name = &*system.name()); + + let sender = self.sender.clone(); + if is_apply_system_buffers(system) { + // TODO: avoid allocation + let mut unapplied_systems = self.unapplied_systems.clone(); + let task = async move { + #[cfg(feature = "trace")] + let system_guard = system_span.enter(); + apply_system_buffers(&mut unapplied_systems, systems, world); + #[cfg(feature = "trace")] + drop(system_guard); + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(task_span); + scope.spawn_on_scope(task); + } else { + let task = async move { + #[cfg(feature = "trace")] + let system_guard = system_span.enter(); + system.run((), world); + #[cfg(feature = "trace")] + drop(system_guard); + sender + .send(system_index) + .await + .unwrap_or_else(|error| unreachable!("{}", error)); + }; + + #[cfg(feature = "trace")] + let task = task.instrument(task_span); + scope.spawn_on_scope(task); + } + + self.exclusive_running = true; + self.local_thread_running = true; + } + + fn finish_system_and_signal_dependents(&mut self, system_index: usize) { + if self.system_task_metadata[system_index].is_exclusive { + self.exclusive_running = false; + } + + if !self.system_task_metadata[system_index].is_send { + self.local_thread_running = false; + } + + debug_assert!(self.num_running_systems >= 1); + self.num_running_systems -= 1; + self.num_completed_systems += 1; + self.running_systems.set(system_index, false); + self.completed_systems.insert(system_index); + self.unapplied_systems.insert(system_index); + self.signal_dependents(system_index); + } + + fn skip_system_and_signal_dependents(&mut self, system_index: usize) { + self.num_completed_systems += 1; + self.completed_systems.insert(system_index); + self.signal_dependents(system_index); + } + + fn signal_dependents(&mut self, system_index: usize) { + #[cfg(feature = "trace")] + let _span = info_span!("signal_dependents").entered(); + for &dep_idx in &self.system_task_metadata[system_index].dependents { + let remaining = &mut self.num_dependencies_remaining[dep_idx]; + debug_assert!(*remaining >= 1); + *remaining -= 1; + if *remaining == 0 && !self.completed_systems.contains(dep_idx) { + self.ready_systems.insert(dep_idx); + } + } + } + + fn rebuild_active_access(&mut self) { + self.active_access.clear(); + for index in self.running_systems.ones() { + let system_meta = &self.system_task_metadata[index]; + self.active_access + .extend(&system_meta.archetype_component_access); + } + } +} + +fn apply_system_buffers( + unapplied_systems: &mut FixedBitSet, + systems: &[SyncUnsafeCell], + world: &mut World, +) { + for system_index in unapplied_systems.ones() { + // SAFETY: none of these systems are running, no other references exist + let system = unsafe { &mut *systems[system_index].get() }; + #[cfg(feature = "trace")] + let _apply_buffers_span = info_span!("apply_buffers", name = &*system.name()).entered(); + system.apply_buffers(world); + } + + unapplied_systems.clear(); +} + +fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &World) -> bool { + // not short-circuiting is intentional + #[allow(clippy::unnecessary_fold)] + conditions + .iter_mut() + .map(|condition| { + #[cfg(feature = "trace")] + let _condition_span = info_span!("condition", name = &*condition.name()).entered(); + // SAFETY: caller ensures system access is compatible + unsafe { condition.run_unsafe((), world) } + }) + .fold(true, |acc, res| acc && res) +} diff --git a/crates/bevy_ecs/src/schedule_v3/executor/simple.rs b/crates/bevy_ecs/src/schedule_v3/executor/simple.rs new file mode 100644 index 0000000000000..1d45aa29129b3 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/executor/simple.rs @@ -0,0 +1,111 @@ +#[cfg(feature = "trace")] +use bevy_utils::tracing::info_span; +use fixedbitset::FixedBitSet; + +use crate::{ + schedule_v3::{BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule}, + world::World, +}; + +/// A variant of [`SingleThreadedExecutor`](crate::schedule_v3::SingleThreadedExecutor) that calls +/// [`apply_buffers`](crate::system::System::apply_buffers) immediately after running each system. +#[derive(Default)] +pub struct SimpleExecutor { + /// Systems sets whose conditions have been evaluated. + evaluated_sets: FixedBitSet, + /// Systems that have run or been skipped. + completed_systems: FixedBitSet, +} + +impl SystemExecutor for SimpleExecutor { + fn kind(&self) -> ExecutorKind { + ExecutorKind::Simple + } + + fn init(&mut self, schedule: &SystemSchedule) { + let sys_count = schedule.system_ids.len(); + let set_count = schedule.set_ids.len(); + self.evaluated_sets = FixedBitSet::with_capacity(set_count); + self.completed_systems = FixedBitSet::with_capacity(sys_count); + } + + fn run(&mut self, schedule: &mut SystemSchedule, world: &mut World) { + for system_index in 0..schedule.systems.len() { + #[cfg(feature = "trace")] + let name = schedule.systems[system_index].name(); + #[cfg(feature = "trace")] + let should_run_span = info_span!("check_conditions", name = &*name).entered(); + + let mut should_run = !self.completed_systems.contains(system_index); + for set_idx in schedule.sets_of_systems[system_index].ones() { + if self.evaluated_sets.contains(set_idx) { + continue; + } + + // evaluate system set's conditions + let set_conditions_met = + evaluate_and_fold_conditions(&mut schedule.set_conditions[set_idx], world); + + if !set_conditions_met { + self.completed_systems + .union_with(&schedule.systems_in_sets[set_idx]); + } + + should_run &= set_conditions_met; + self.evaluated_sets.insert(set_idx); + } + + // evaluate system's conditions + let system_conditions_met = + evaluate_and_fold_conditions(&mut schedule.system_conditions[system_index], world); + + should_run &= system_conditions_met; + + #[cfg(feature = "trace")] + should_run_span.exit(); + + // system has either been skipped or will run + self.completed_systems.insert(system_index); + + if !should_run { + continue; + } + + let system = &mut schedule.systems[system_index]; + #[cfg(feature = "trace")] + let system_span = info_span!("system", name = &*name).entered(); + system.run((), world); + #[cfg(feature = "trace")] + system_span.exit(); + + #[cfg(feature = "trace")] + let _apply_buffers_span = info_span!("apply_buffers", name = &*name).entered(); + system.apply_buffers(world); + } + + self.evaluated_sets.clear(); + self.completed_systems.clear(); + } +} + +impl SimpleExecutor { + pub const fn new() -> Self { + Self { + evaluated_sets: FixedBitSet::new(), + completed_systems: FixedBitSet::new(), + } + } +} + +fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &mut World) -> bool { + // not short-circuiting is intentional + #[allow(clippy::unnecessary_fold)] + conditions + .iter_mut() + .map(|condition| { + #[cfg(feature = "trace")] + let _condition_span = info_span!("condition", name = &*condition.name()).entered(); + condition.run((), world) + }) + .fold(true, |acc, res| acc && res) +} diff --git a/crates/bevy_ecs/src/schedule_v3/executor/single_threaded.rs b/crates/bevy_ecs/src/schedule_v3/executor/single_threaded.rs new file mode 100644 index 0000000000000..289b05b8c1e78 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/executor/single_threaded.rs @@ -0,0 +1,137 @@ +#[cfg(feature = "trace")] +use bevy_utils::tracing::info_span; +use fixedbitset::FixedBitSet; + +use crate::{ + schedule_v3::{ + is_apply_system_buffers, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule, + }, + world::World, +}; + +/// Runs the schedule using a single thread. +/// +/// Useful if you're dealing with a single-threaded environment, saving your threads for +/// other things, or just trying minimize overhead. +#[derive(Default)] +pub struct SingleThreadedExecutor { + /// System sets whose conditions have been evaluated. + evaluated_sets: FixedBitSet, + /// Systems that have run or been skipped. + completed_systems: FixedBitSet, + /// Systems that have run but have not had their buffers applied. + unapplied_systems: FixedBitSet, +} + +impl SystemExecutor for SingleThreadedExecutor { + fn kind(&self) -> ExecutorKind { + ExecutorKind::SingleThreaded + } + + fn init(&mut self, schedule: &SystemSchedule) { + // pre-allocate space + let sys_count = schedule.system_ids.len(); + let set_count = schedule.set_ids.len(); + self.evaluated_sets = FixedBitSet::with_capacity(set_count); + self.completed_systems = FixedBitSet::with_capacity(sys_count); + self.unapplied_systems = FixedBitSet::with_capacity(sys_count); + } + + fn run(&mut self, schedule: &mut SystemSchedule, world: &mut World) { + for system_index in 0..schedule.systems.len() { + #[cfg(feature = "trace")] + let name = schedule.systems[system_index].name(); + #[cfg(feature = "trace")] + let should_run_span = info_span!("check_conditions", name = &*name).entered(); + + let mut should_run = !self.completed_systems.contains(system_index); + for set_idx in schedule.sets_of_systems[system_index].ones() { + if self.evaluated_sets.contains(set_idx) { + continue; + } + + // evaluate system set's conditions + let set_conditions_met = + evaluate_and_fold_conditions(&mut schedule.set_conditions[set_idx], world); + + if !set_conditions_met { + self.completed_systems + .union_with(&schedule.systems_in_sets[set_idx]); + } + + should_run &= set_conditions_met; + self.evaluated_sets.insert(set_idx); + } + + // evaluate system's conditions + let system_conditions_met = + evaluate_and_fold_conditions(&mut schedule.system_conditions[system_index], world); + + should_run &= system_conditions_met; + + #[cfg(feature = "trace")] + should_run_span.exit(); + + // system has either been skipped or will run + self.completed_systems.insert(system_index); + + if !should_run { + continue; + } + + let system = &mut schedule.systems[system_index]; + if is_apply_system_buffers(system) { + #[cfg(feature = "trace")] + let system_span = info_span!("system", name = &*name).entered(); + self.apply_system_buffers(schedule, world); + #[cfg(feature = "trace")] + system_span.exit(); + } else { + #[cfg(feature = "trace")] + let system_span = info_span!("system", name = &*name).entered(); + system.run((), world); + #[cfg(feature = "trace")] + system_span.exit(); + self.unapplied_systems.insert(system_index); + } + } + + self.apply_system_buffers(schedule, world); + self.evaluated_sets.clear(); + self.completed_systems.clear(); + } +} + +impl SingleThreadedExecutor { + pub const fn new() -> Self { + Self { + evaluated_sets: FixedBitSet::new(), + completed_systems: FixedBitSet::new(), + unapplied_systems: FixedBitSet::new(), + } + } + + fn apply_system_buffers(&mut self, schedule: &mut SystemSchedule, world: &mut World) { + for system_index in self.unapplied_systems.ones() { + let system = &mut schedule.systems[system_index]; + #[cfg(feature = "trace")] + let _apply_buffers_span = info_span!("apply_buffers", name = &*system.name()).entered(); + system.apply_buffers(world); + } + + self.unapplied_systems.clear(); + } +} + +fn evaluate_and_fold_conditions(conditions: &mut [BoxedCondition], world: &mut World) -> bool { + // not short-circuiting is intentional + #[allow(clippy::unnecessary_fold)] + conditions + .iter_mut() + .map(|condition| { + #[cfg(feature = "trace")] + let _condition_span = info_span!("condition", name = &*condition.name()).entered(); + condition.run((), world) + }) + .fold(true, |acc, res| acc && res) +} diff --git a/crates/bevy_ecs/src/schedule_v3/graph_utils.rs b/crates/bevy_ecs/src/schedule_v3/graph_utils.rs new file mode 100644 index 0000000000000..b58bad317a959 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/graph_utils.rs @@ -0,0 +1,233 @@ +use std::fmt::Debug; + +use bevy_utils::{ + petgraph::{graphmap::NodeTrait, prelude::*}, + HashMap, HashSet, +}; +use fixedbitset::FixedBitSet; + +use crate::schedule_v3::set::*; + +/// Unique identifier for a system or system set. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) enum NodeId { + System(usize), + Set(usize), +} + +impl NodeId { + /// Returns the internal integer value. + pub fn index(&self) -> usize { + match self { + NodeId::System(index) | NodeId::Set(index) => *index, + } + } + + /// Returns `true` if the identified node is a system. + pub const fn is_system(&self) -> bool { + matches!(self, NodeId::System(_)) + } + + /// Returns `true` if the identified node is a system set. + pub const fn is_set(&self) -> bool { + matches!(self, NodeId::Set(_)) + } +} + +/// Specifies what kind of edge should be added to the dependency graph. +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub(crate) enum DependencyKind { + /// A node that should be preceded. + Before, + /// A node that should be succeeded. + After, +} + +/// An edge to be added to the dependency graph. +#[derive(Clone)] +pub(crate) struct Dependency { + pub(crate) kind: DependencyKind, + pub(crate) set: BoxedSystemSet, +} + +impl Dependency { + pub fn new(kind: DependencyKind, set: BoxedSystemSet) -> Self { + Self { kind, set } + } +} + +/// Configures ambiguity detection for a single system. +#[derive(Clone, Debug, Default)] +pub(crate) enum Ambiguity { + #[default] + Check, + /// Ignore warnings with systems in any of these system sets. May contain duplicates. + IgnoreWithSet(Vec), + /// Ignore all warnings. + IgnoreAll, +} + +#[derive(Clone)] +pub(crate) struct GraphInfo { + pub(crate) sets: Vec, + pub(crate) dependencies: Vec, + pub(crate) ambiguous_with: Ambiguity, +} + +/// Converts 2D row-major pair of indices into a 1D array index. +pub(crate) fn index(row: usize, col: usize, num_cols: usize) -> usize { + debug_assert!(col < num_cols); + (row * num_cols) + col +} + +/// Converts a 1D array index into a 2D row-major pair of indices. +pub(crate) fn row_col(index: usize, num_cols: usize) -> (usize, usize) { + (index / num_cols, index % num_cols) +} + +/// Stores the results of the graph analysis. +pub(crate) struct CheckGraphResults { + /// Boolean reachability matrix for the graph. + pub(crate) reachable: FixedBitSet, + /// Pairs of nodes that have a path connecting them. + pub(crate) connected: HashSet<(V, V)>, + /// Pairs of nodes that don't have a path connecting them. + pub(crate) disconnected: HashSet<(V, V)>, + /// Edges that are redundant because a longer path exists. + pub(crate) transitive_edges: Vec<(V, V)>, + /// Variant of the graph with no transitive edges. + pub(crate) transitive_reduction: DiGraphMap, + /// Variant of the graph with all possible transitive edges. + // TODO: this will very likely be used by "if-needed" ordering + #[allow(dead_code)] + pub(crate) transitive_closure: DiGraphMap, +} + +impl Default for CheckGraphResults { + fn default() -> Self { + Self { + reachable: FixedBitSet::new(), + connected: HashSet::new(), + disconnected: HashSet::new(), + transitive_edges: Vec::new(), + transitive_reduction: DiGraphMap::new(), + transitive_closure: DiGraphMap::new(), + } + } +} + +/// Processes a DAG and computes its: +/// - transitive reduction (along with the set of removed edges) +/// - transitive closure +/// - reachability matrix (as a bitset) +/// - pairs of nodes connected by a path +/// - pairs of nodes not connected by a path +/// +/// The algorithm implemented comes from +/// ["On the calculation of transitive reduction-closure of orders"][1] by Habib, Morvan and Rampon. +/// +/// [1]: https://doi.org/10.1016/0012-365X(93)90164-O +pub(crate) fn check_graph( + graph: &DiGraphMap, + topological_order: &[V], +) -> CheckGraphResults +where + V: NodeTrait + Debug, +{ + if graph.node_count() == 0 { + return CheckGraphResults::default(); + } + + let n = graph.node_count(); + + // build a copy of the graph where the nodes and edges appear in topsorted order + let mut map = HashMap::with_capacity(n); + let mut topsorted = DiGraphMap::::new(); + // iterate nodes in topological order + for (i, &node) in topological_order.iter().enumerate() { + map.insert(node, i); + topsorted.add_node(node); + // insert nodes as successors to their predecessors + for pred in graph.neighbors_directed(node, Direction::Incoming) { + topsorted.add_edge(pred, node, ()); + } + } + + let mut reachable = FixedBitSet::with_capacity(n * n); + let mut connected = HashSet::new(); + let mut disconnected = HashSet::new(); + + let mut transitive_edges = Vec::new(); + let mut transitive_reduction = DiGraphMap::::new(); + let mut transitive_closure = DiGraphMap::::new(); + + let mut visited = FixedBitSet::with_capacity(n); + + // iterate nodes in topological order + for node in topsorted.nodes() { + transitive_reduction.add_node(node); + transitive_closure.add_node(node); + } + + // iterate nodes in reverse topological order + for a in topsorted.nodes().rev() { + let index_a = *map.get(&a).unwrap(); + // iterate their successors in topological order + for b in topsorted.neighbors_directed(a, Direction::Outgoing) { + let index_b = *map.get(&b).unwrap(); + debug_assert!(index_a < index_b); + if !visited[index_b] { + // edge is not redundant + transitive_reduction.add_edge(a, b, ()); + transitive_closure.add_edge(a, b, ()); + reachable.insert(index(index_a, index_b, n)); + + let successors = transitive_closure + .neighbors_directed(b, Direction::Outgoing) + .collect::>(); + for c in successors { + let index_c = *map.get(&c).unwrap(); + debug_assert!(index_b < index_c); + if !visited[index_c] { + visited.insert(index_c); + transitive_closure.add_edge(a, c, ()); + reachable.insert(index(index_a, index_c, n)); + } + } + } else { + // edge is redundant + transitive_edges.push((a, b)); + } + } + + visited.clear(); + } + + // partition pairs of nodes into "connected by path" and "not connected by path" + for i in 0..(n - 1) { + // reachable is upper triangular because the nodes were topsorted + for index in index(i, i + 1, n)..=index(i, n - 1, n) { + let (a, b) = row_col(index, n); + let pair = (topological_order[a], topological_order[b]); + if reachable[index] { + connected.insert(pair); + } else { + disconnected.insert(pair); + } + } + } + + // fill diagonal (nodes reach themselves) + // for i in 0..n { + // reachable.set(index(i, i, n), true); + // } + + CheckGraphResults { + reachable, + connected, + disconnected, + transitive_edges, + transitive_reduction, + transitive_closure, + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/migration.rs b/crates/bevy_ecs/src/schedule_v3/migration.rs new file mode 100644 index 0000000000000..c4932c2fbca7c --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/migration.rs @@ -0,0 +1,38 @@ +use crate::schedule_v3::*; +use crate::world::World; + +/// Temporary "stageless" `App` methods. +pub trait AppExt { + /// Sets the [`Schedule`] that will be modified by default when you call `App::add_system` + /// and similar methods. + /// + /// **Note:** This will create the schedule if it does not already exist. + fn set_default_schedule(&mut self, label: impl ScheduleLabel) -> &mut Self; + /// Applies the function to the [`Schedule`] associated with `label`. + /// + /// **Note:** This will create the schedule if it does not already exist. + fn edit_schedule( + &mut self, + label: impl ScheduleLabel, + f: impl FnMut(&mut Schedule), + ) -> &mut Self; + /// Adds [`State`] and [`NextState`] resources, [`OnEnter`] and [`OnExit`] schedules + /// for each state variant, and an instance of [`apply_state_transition::`] in + /// \ so that transitions happen before `Update`. + fn add_state(&mut self) -> &mut Self; +} + +/// Temporary "stageless" [`World`] methods. +pub trait WorldExt { + /// Runs the [`Schedule`] associated with `label`. + fn run_schedule(&mut self, label: impl ScheduleLabel); +} + +impl WorldExt for World { + fn run_schedule(&mut self, label: impl ScheduleLabel) { + if let Some(mut schedule) = self.resource_mut::().remove(&label) { + schedule.run(self); + self.resource_mut::().insert(label, schedule); + } + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/mod.rs b/crates/bevy_ecs/src/schedule_v3/mod.rs new file mode 100644 index 0000000000000..6a95e28e4a8e0 --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/mod.rs @@ -0,0 +1,471 @@ +mod condition; +mod config; +mod executor; +mod graph_utils; +mod migration; +mod schedule; +mod set; +mod state; + +pub use self::condition::*; +pub use self::config::*; +pub use self::executor::*; +use self::graph_utils::*; +pub use self::migration::*; +pub use self::schedule::*; +pub use self::set::*; +pub use self::state::*; + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicU32, Ordering}; + + pub use crate as bevy_ecs; + pub use crate::schedule_v3::{IntoSystemConfig, IntoSystemSetConfig, Schedule, SystemSet}; + pub use crate::system::{Res, ResMut}; + pub use crate::{prelude::World, system::Resource}; + + #[derive(SystemSet, Clone, Debug, PartialEq, Eq, Hash)] + enum TestSet { + A, + B, + C, + D, + X, + } + + #[derive(Resource, Default)] + struct SystemOrder(Vec); + + #[derive(Resource, Default)] + struct RunConditionBool(pub bool); + + #[derive(Resource, Default)] + struct Counter(pub AtomicU32); + + fn make_exclusive_system(tag: u32) -> impl FnMut(&mut World) { + move |world| world.resource_mut::().0.push(tag) + } + + fn make_function_system(tag: u32) -> impl FnMut(ResMut) { + move |mut resource: ResMut| resource.0.push(tag) + } + + fn named_system(mut resource: ResMut) { + resource.0.push(u32::MAX); + } + + fn named_exclusive_system(world: &mut World) { + world.resource_mut::().0.push(u32::MAX); + } + + fn counting_system(counter: Res) { + counter.0.fetch_add(1, Ordering::Relaxed); + } + + mod system_execution { + use super::*; + + #[test] + fn run_system() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.add_system(make_function_system(0)); + schedule.run(&mut world); + + assert_eq!(world.resource::().0, vec![0]); + } + + #[test] + fn run_exclusive_system() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.add_system(make_exclusive_system(0)); + schedule.run(&mut world); + + assert_eq!(world.resource::().0, vec![0]); + } + + #[test] + #[cfg(not(miri))] + fn parallel_execution() { + use bevy_tasks::{ComputeTaskPool, TaskPool}; + use std::sync::{Arc, Barrier}; + + let mut world = World::default(); + let mut schedule = Schedule::default(); + let thread_count = ComputeTaskPool::init(TaskPool::default).thread_num(); + + let barrier = Arc::new(Barrier::new(thread_count)); + + for _ in 0..thread_count { + let inner = barrier.clone(); + schedule.add_system(move || { + inner.wait(); + }); + } + + schedule.run(&mut world); + } + } + + mod system_ordering { + use super::*; + + #[test] + fn order_systems() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.add_system(named_system); + schedule.add_system(make_function_system(1).before(named_system)); + schedule.add_system( + make_function_system(0) + .after(named_system) + .in_set(TestSet::A), + ); + schedule.run(&mut world); + + assert_eq!(world.resource::().0, vec![1, u32::MAX, 0]); + + world.insert_resource(SystemOrder::default()); + + assert_eq!(world.resource::().0, vec![]); + + // modify the schedule after it's been initialized and test ordering with sets + schedule.configure_set(TestSet::A.after(named_system)); + schedule.add_system( + make_function_system(3) + .before(TestSet::A) + .after(named_system), + ); + schedule.add_system(make_function_system(4).after(TestSet::A)); + schedule.run(&mut world); + + assert_eq!( + world.resource::().0, + vec![1, u32::MAX, 3, 0, 4] + ); + } + + #[test] + fn order_exclusive_systems() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.add_system(named_exclusive_system); + schedule.add_system(make_exclusive_system(1).before(named_exclusive_system)); + schedule.add_system(make_exclusive_system(0).after(named_exclusive_system)); + schedule.run(&mut world); + + assert_eq!(world.resource::().0, vec![1, u32::MAX, 0]); + } + + #[test] + fn add_systems_correct_order() { + #[derive(Resource)] + struct X(Vec); + + let mut world = World::new(); + world.init_resource::(); + + let mut schedule = Schedule::new(); + schedule.add_systems( + ( + make_function_system(0), + make_function_system(1), + make_exclusive_system(2), + make_function_system(3), + ) + .chain(), + ); + + schedule.run(&mut world); + assert_eq!(world.resource::().0, vec![0, 1, 2, 3]); + } + } + + mod conditions { + use super::*; + + #[test] + fn system_with_condition() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + world.init_resource::(); + + schedule.add_system( + make_function_system(0).run_if(|condition: Res| condition.0), + ); + + schedule.run(&mut world); + assert_eq!(world.resource::().0, vec![]); + + world.resource_mut::().0 = true; + schedule.run(&mut world); + assert_eq!(world.resource::().0, vec![0]); + } + + #[test] + fn run_exclusive_system_with_condition() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + world.init_resource::(); + + schedule.add_system( + make_exclusive_system(0).run_if(|condition: Res| condition.0), + ); + + schedule.run(&mut world); + assert_eq!(world.resource::().0, vec![]); + + world.resource_mut::().0 = true; + schedule.run(&mut world); + assert_eq!(world.resource::().0, vec![0]); + } + + #[test] + fn multiple_conditions_on_system() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.add_system(counting_system.run_if(|| false).run_if(|| false)); + schedule.add_system(counting_system.run_if(|| true).run_if(|| false)); + schedule.add_system(counting_system.run_if(|| false).run_if(|| true)); + schedule.add_system(counting_system.run_if(|| true).run_if(|| true)); + + schedule.run(&mut world); + assert_eq!(world.resource::().0.load(Ordering::Relaxed), 1); + } + + #[test] + fn multiple_conditions_on_system_sets() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.configure_set(TestSet::A.run_if(|| false).run_if(|| false)); + schedule.add_system(counting_system.in_set(TestSet::A)); + schedule.configure_set(TestSet::B.run_if(|| true).run_if(|| false)); + schedule.add_system(counting_system.in_set(TestSet::B)); + schedule.configure_set(TestSet::C.run_if(|| false).run_if(|| true)); + schedule.add_system(counting_system.in_set(TestSet::C)); + schedule.configure_set(TestSet::D.run_if(|| true).run_if(|| true)); + schedule.add_system(counting_system.in_set(TestSet::D)); + + schedule.run(&mut world); + assert_eq!(world.resource::().0.load(Ordering::Relaxed), 1); + } + + #[test] + fn systems_nested_in_system_sets() { + let mut world = World::default(); + let mut schedule = Schedule::default(); + + world.init_resource::(); + + schedule.configure_set(TestSet::A.run_if(|| false)); + schedule.add_system(counting_system.in_set(TestSet::A).run_if(|| false)); + schedule.configure_set(TestSet::B.run_if(|| true)); + schedule.add_system(counting_system.in_set(TestSet::B).run_if(|| false)); + schedule.configure_set(TestSet::C.run_if(|| false)); + schedule.add_system(counting_system.in_set(TestSet::C).run_if(|| true)); + schedule.configure_set(TestSet::D.run_if(|| true)); + schedule.add_system(counting_system.in_set(TestSet::D).run_if(|| true)); + + schedule.run(&mut world); + assert_eq!(world.resource::().0.load(Ordering::Relaxed), 1); + } + } + + mod schedule_build_errors { + use super::*; + + #[test] + #[should_panic] + fn dependency_loop() { + let mut schedule = Schedule::new(); + schedule.configure_set(TestSet::X.after(TestSet::X)); + } + + #[test] + fn dependency_cycle() { + let mut world = World::new(); + let mut schedule = Schedule::new(); + + schedule.configure_set(TestSet::A.after(TestSet::B)); + schedule.configure_set(TestSet::B.after(TestSet::A)); + + let result = schedule.initialize(&mut world); + assert!(matches!(result, Err(ScheduleBuildError::DependencyCycle))); + + fn foo() {} + fn bar() {} + + let mut world = World::new(); + let mut schedule = Schedule::new(); + + schedule.add_systems((foo.after(bar), bar.after(foo))); + let result = schedule.initialize(&mut world); + assert!(matches!(result, Err(ScheduleBuildError::DependencyCycle))); + } + + #[test] + #[should_panic] + fn hierarchy_loop() { + let mut schedule = Schedule::new(); + schedule.configure_set(TestSet::X.in_set(TestSet::X)); + } + + #[test] + fn hierarchy_cycle() { + let mut world = World::new(); + let mut schedule = Schedule::new(); + + schedule.configure_set(TestSet::A.in_set(TestSet::B)); + schedule.configure_set(TestSet::B.in_set(TestSet::A)); + + let result = schedule.initialize(&mut world); + assert!(matches!(result, Err(ScheduleBuildError::HierarchyCycle))); + } + + #[test] + fn system_type_set_ambiguity() { + // Define some systems. + fn foo() {} + fn bar() {} + + let mut world = World::new(); + let mut schedule = Schedule::new(); + + // Schedule `bar` to run after `foo`. + schedule.add_system(foo); + schedule.add_system(bar.after(foo)); + + // There's only one `foo`, so it's fine. + let result = schedule.initialize(&mut world); + assert!(result.is_ok()); + + // Schedule another `foo`. + schedule.add_system(foo); + + // When there are multiple instances of `foo`, dependencies on + // `foo` are no longer allowed. Too much ambiguity. + let result = schedule.initialize(&mut world); + assert!(matches!( + result, + Err(ScheduleBuildError::SystemTypeSetAmbiguity(_)) + )); + + // same goes for `ambiguous_with` + let mut schedule = Schedule::new(); + schedule.add_system(foo); + schedule.add_system(bar.ambiguous_with(foo)); + let result = schedule.initialize(&mut world); + assert!(result.is_ok()); + schedule.add_system(foo); + let result = schedule.initialize(&mut world); + assert!(matches!( + result, + Err(ScheduleBuildError::SystemTypeSetAmbiguity(_)) + )); + } + + #[test] + #[should_panic] + fn in_system_type_set() { + fn foo() {} + fn bar() {} + + let mut schedule = Schedule::new(); + schedule.add_system(foo.in_set(bar.into_system_set())); + } + + #[test] + #[should_panic] + fn configure_system_type_set() { + fn foo() {} + let mut schedule = Schedule::new(); + schedule.configure_set(foo.into_system_set()); + } + + #[test] + fn hierarchy_redundancy() { + let mut world = World::new(); + let mut schedule = Schedule::new(); + + schedule.set_build_settings( + ScheduleBuildSettings::new().with_hierarchy_detection(LogLevel::Error), + ); + + // Add `A`. + schedule.configure_set(TestSet::A); + + // Add `B` as child of `A`. + schedule.configure_set(TestSet::B.in_set(TestSet::A)); + + // Add `X` as child of both `A` and `B`. + schedule.configure_set(TestSet::X.in_set(TestSet::A).in_set(TestSet::B)); + + // `X` cannot be the `A`'s child and grandchild at the same time. + let result = schedule.initialize(&mut world); + assert!(matches!( + result, + Err(ScheduleBuildError::HierarchyRedundancy) + )); + } + + #[test] + fn cross_dependency() { + let mut world = World::new(); + let mut schedule = Schedule::new(); + + // Add `B` and give it both kinds of relationships with `A`. + schedule.configure_set(TestSet::B.in_set(TestSet::A)); + schedule.configure_set(TestSet::B.after(TestSet::A)); + let result = schedule.initialize(&mut world); + assert!(matches!( + result, + Err(ScheduleBuildError::CrossDependency(_, _)) + )); + } + + #[test] + fn ambiguity() { + #[derive(Resource)] + struct X; + + fn res_ref(_x: Res) {} + fn res_mut(_x: ResMut) {} + + let mut world = World::new(); + let mut schedule = Schedule::new(); + + schedule.set_build_settings( + ScheduleBuildSettings::new().with_ambiguity_detection(LogLevel::Error), + ); + + schedule.add_systems((res_ref, res_mut)); + let result = schedule.initialize(&mut world); + assert!(matches!(result, Err(ScheduleBuildError::Ambiguity))); + } + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/schedule.rs b/crates/bevy_ecs/src/schedule_v3/schedule.rs new file mode 100644 index 0000000000000..2bd7b00eca72f --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/schedule.rs @@ -0,0 +1,1099 @@ +use std::{ + fmt::{Debug, Write}, + result::Result, +}; + +use bevy_utils::default; +#[cfg(feature = "trace")] +use bevy_utils::tracing::info_span; +use bevy_utils::{ + petgraph::{algo::tarjan_scc, prelude::*}, + thiserror::Error, + tracing::{error, info, warn}, + HashMap, HashSet, +}; + +use fixedbitset::FixedBitSet; + +use crate::{ + self as bevy_ecs, + component::ComponentId, + schedule_v3::*, + system::{BoxedSystem, Resource}, + world::World, +}; + +/// Resource that stores [`Schedule`]s mapped to [`ScheduleLabel`]s. +#[derive(Default, Resource)] +pub struct Schedules { + inner: HashMap, +} + +impl Schedules { + /// Constructs an empty `Schedules` with zero initial capacity. + pub fn new() -> Self { + Self { + inner: HashMap::new(), + } + } + + /// Inserts a labeled schedule into the map. + /// + /// If the map already had an entry for `label`, `schedule` is inserted, + /// and the old schedule is returned. Otherwise, `None` is returned. + pub fn insert(&mut self, label: impl ScheduleLabel, schedule: Schedule) -> Option { + let label: Box = Box::new(label); + if self.inner.contains_key(&label) { + warn!("schedule with label {:?} already exists", label); + } + self.inner.insert(label, schedule) + } + + /// Removes the schedule corresponding to the `label` from the map, returning it if it existed. + pub fn remove(&mut self, label: &dyn ScheduleLabel) -> Option { + if !self.inner.contains_key(label) { + warn!("schedule with label {:?} not found", label); + } + self.inner.remove(label) + } + + /// Returns a reference to the schedule associated with `label`, if it exists. + pub fn get(&self, label: &dyn ScheduleLabel) -> Option<&Schedule> { + self.inner.get(label) + } + + /// Returns a mutable reference to the schedule associated with `label`, if it exists. + pub fn get_mut(&mut self, label: &dyn ScheduleLabel) -> Option<&mut Schedule> { + self.inner.get_mut(label) + } + + /// Iterates the change ticks of all systems in all stored schedules and clamps any older than + /// [`MAX_CHANGE_AGE`](crate::change_detection::MAX_CHANGE_AGE). + /// This prevents overflow and thus prevents false positives. + pub(crate) fn check_change_ticks(&mut self, change_tick: u32) { + #[cfg(feature = "trace")] + let _all_span = info_span!("check stored schedule ticks").entered(); + // label used when trace feature is enabled + #[allow(unused_variables)] + for (label, schedule) in self.inner.iter_mut() { + #[cfg(feature = "trace")] + let name = format!("{:?}", label); + #[cfg(feature = "trace")] + let _one_span = info_span!("check schedule ticks", name = &name).entered(); + schedule.check_change_ticks(change_tick); + } + } +} + +/// A collection of systems, and the metadata and executor needed to run them +/// in a certain order under certain conditions. +pub struct Schedule { + graph: ScheduleGraph, + executable: SystemSchedule, + executor: Box, + executor_initialized: bool, +} + +impl Default for Schedule { + fn default() -> Self { + Self::new() + } +} + +impl Schedule { + /// Constructs an empty `Schedule`. + pub fn new() -> Self { + Self { + graph: ScheduleGraph::new(), + executable: SystemSchedule::new(), + executor: Box::new(MultiThreadedExecutor::new()), + executor_initialized: false, + } + } + + /// Add a system to the schedule. + pub fn add_system

(&mut self, system: impl IntoSystemConfig

) -> &mut Self { + self.graph.add_system(system); + self + } + + /// Add a collection of systems to the schedule. + pub fn add_systems

(&mut self, systems: impl IntoSystemConfigs

) -> &mut Self { + self.graph.add_systems(systems); + self + } + + /// Configure a system set in this schedule. + pub fn configure_set(&mut self, set: impl IntoSystemSetConfig) -> &mut Self { + self.graph.configure_set(set); + self + } + + /// Configure a collection of system sets in this schedule. + pub fn configure_sets(&mut self, sets: impl IntoSystemSetConfigs) -> &mut Self { + self.graph.configure_sets(sets); + self + } + + /// Changes the system set that new systems and system sets will join by default + /// if they aren't already part of one. + pub fn set_default_set(&mut self, set: impl SystemSet) -> &mut Self { + self.graph.set_default_set(set); + self + } + + /// Changes miscellaneous build settings. + pub fn set_build_settings(&mut self, settings: ScheduleBuildSettings) -> &mut Self { + self.graph.settings = settings; + self + } + + /// Returns the schedule's current execution strategy. + pub fn get_executor_kind(&self) -> ExecutorKind { + self.executor.kind() + } + + /// Sets the schedule's execution strategy. + pub fn set_executor_kind(&mut self, executor: ExecutorKind) -> &mut Self { + if executor != self.executor.kind() { + self.executor = match executor { + ExecutorKind::Simple => Box::new(SimpleExecutor::new()), + ExecutorKind::SingleThreaded => Box::new(SingleThreadedExecutor::new()), + ExecutorKind::MultiThreaded => Box::new(MultiThreadedExecutor::new()), + }; + self.executor_initialized = false; + } + self + } + + /// Runs all systems in this schedule on the `world`, using its current execution strategy. + pub fn run(&mut self, world: &mut World) { + world.check_change_ticks(); + self.initialize(world).unwrap(); + // TODO: label + #[cfg(feature = "trace")] + let _span = info_span!("schedule").entered(); + self.executor.run(&mut self.executable, world); + } + + /// Initializes any newly-added systems and conditions, rebuilds the executable schedule, + /// and re-initializes the executor. + pub fn initialize(&mut self, world: &mut World) -> Result<(), ScheduleBuildError> { + if self.graph.changed { + self.graph.initialize(world); + self.graph.update_schedule(&mut self.executable)?; + self.graph.changed = false; + self.executor_initialized = false; + } + + if !self.executor_initialized { + self.executor.init(&self.executable); + self.executor_initialized = true; + } + + Ok(()) + } + + /// Iterates the change ticks of all systems in the schedule and clamps any older than + /// [`MAX_CHANGE_AGE`](crate::change_detection::MAX_CHANGE_AGE). + /// This prevents overflow and thus prevents false positives. + pub(crate) fn check_change_ticks(&mut self, change_tick: u32) { + for system in &mut self.executable.systems { + system.check_change_tick(change_tick); + } + + for conditions in &mut self.executable.system_conditions { + for system in conditions.iter_mut() { + system.check_change_tick(change_tick); + } + } + + for conditions in &mut self.executable.set_conditions { + for system in conditions.iter_mut() { + system.check_change_tick(change_tick); + } + } + } +} + +/// A directed acylic graph structure. +#[derive(Default)] +struct Dag { + /// A directed graph. + graph: DiGraphMap, + /// A cached topological ordering of the graph. + topsort: Vec, +} + +impl Dag { + fn new() -> Self { + Self { + graph: DiGraphMap::new(), + topsort: Vec::new(), + } + } +} + +/// A [`SystemSet`] with metadata, stored in a [`ScheduleGraph`]. +struct SystemSetNode { + inner: BoxedSystemSet, + /// `true` if this system set was modified with `configure_set` + configured: bool, +} + +impl SystemSetNode { + pub fn new(set: BoxedSystemSet) -> Self { + Self { + inner: set, + configured: false, + } + } + + pub fn name(&self) -> String { + format!("{:?}", &self.inner) + } + + pub fn is_system_type(&self) -> bool { + self.inner.is_system_type() + } +} + +/// Metadata for a [`Schedule`]. +#[derive(Default)] +struct ScheduleGraph { + systems: Vec>, + system_conditions: Vec>>, + system_sets: Vec, + system_set_conditions: Vec>>, + system_set_ids: HashMap, + uninit: Vec<(NodeId, usize)>, + hierarchy: Dag, + dependency: Dag, + dependency_flattened: Dag, + ambiguous_with: UnGraphMap, + ambiguous_with_flattened: UnGraphMap, + ambiguous_with_all: HashSet, + default_set: Option, + changed: bool, + settings: ScheduleBuildSettings, +} + +impl ScheduleGraph { + pub fn new() -> Self { + Self { + systems: Vec::new(), + system_conditions: Vec::new(), + system_sets: Vec::new(), + system_set_conditions: Vec::new(), + system_set_ids: HashMap::new(), + uninit: Vec::new(), + hierarchy: Dag::new(), + dependency: Dag::new(), + dependency_flattened: Dag::new(), + ambiguous_with: UnGraphMap::new(), + ambiguous_with_flattened: UnGraphMap::new(), + ambiguous_with_all: HashSet::new(), + default_set: None, + changed: false, + settings: default(), + } + } + + fn set_default_set(&mut self, set: impl SystemSet) { + assert!( + !set.is_system_type(), + "adding arbitrary systems to a system type set is not allowed" + ); + self.default_set = Some(Box::new(set)); + } + + fn add_systems

(&mut self, systems: impl IntoSystemConfigs

) { + let SystemConfigs { systems, chained } = systems.into_configs(); + let mut system_iter = systems.into_iter(); + if chained { + let Some(prev) = system_iter.next() else { return }; + let mut prev_id = self.add_system_inner(prev).unwrap(); + for next in system_iter { + let next_id = self.add_system_inner(next).unwrap(); + self.dependency.graph.add_edge(prev_id, next_id, ()); + prev_id = next_id; + } + } else { + for system in system_iter { + self.add_system_inner(system).unwrap(); + } + } + } + + fn add_system

(&mut self, system: impl IntoSystemConfig

) { + self.add_system_inner(system).unwrap(); + } + + fn add_system_inner

( + &mut self, + system: impl IntoSystemConfig

, + ) -> Result { + let SystemConfig { + system, + mut graph_info, + conditions, + } = system.into_config(); + + let id = NodeId::System(self.systems.len()); + + if graph_info.sets.is_empty() { + if let Some(default) = self.default_set.as_ref() { + graph_info.sets.push(default.dyn_clone()); + } + } + + // graph updates are immediate + self.update_graphs(id, graph_info)?; + + // system init has to be deferred (need `&mut World`) + self.uninit.push((id, 0)); + self.systems.push(Some(system)); + self.system_conditions.push(Some(conditions)); + + Ok(id) + } + + fn configure_sets(&mut self, sets: impl IntoSystemSetConfigs) { + let SystemSetConfigs { sets, chained } = sets.into_configs(); + let mut set_iter = sets.into_iter(); + if chained { + let Some(prev) = set_iter.next() else { return }; + let mut prev_id = self.configure_set_inner(prev).unwrap(); + for next in set_iter { + let next_id = self.configure_set_inner(next).unwrap(); + self.dependency.graph.add_edge(prev_id, next_id, ()); + prev_id = next_id; + } + } else { + for set in set_iter { + self.configure_set_inner(set).unwrap(); + } + } + } + + fn configure_set(&mut self, set: impl IntoSystemSetConfig) { + self.configure_set_inner(set).unwrap(); + } + + fn configure_set_inner( + &mut self, + set: impl IntoSystemSetConfig, + ) -> Result { + let SystemSetConfig { + set, + mut graph_info, + mut conditions, + } = set.into_config(); + + let id = match self.system_set_ids.get(&set) { + Some(&id) => id, + None => self.add_set(set.dyn_clone()), + }; + + let meta = &mut self.system_sets[id.index()]; + let already_configured = std::mem::replace(&mut meta.configured, true); + + // a system set can be configured multiple times, so this "default check" + // should only happen the first time `configure_set` is called on it + if !already_configured && graph_info.sets.is_empty() { + if let Some(default) = self.default_set.as_ref() { + info!("adding system set `{:?}` to default: `{:?}`", set, default); + graph_info.sets.push(default.dyn_clone()); + } + } + + // graph updates are immediate + self.update_graphs(id, graph_info)?; + + // system init has to be deferred (need `&mut World`) + let system_set_conditions = + self.system_set_conditions[id.index()].get_or_insert_with(Vec::new); + self.uninit.push((id, system_set_conditions.len())); + system_set_conditions.append(&mut conditions); + + Ok(id) + } + + fn add_set(&mut self, set: BoxedSystemSet) -> NodeId { + let id = NodeId::Set(self.system_sets.len()); + self.system_sets.push(SystemSetNode::new(set.dyn_clone())); + self.system_set_conditions.push(None); + self.system_set_ids.insert(set, id); + id + } + + fn check_sets( + &mut self, + id: &NodeId, + graph_info: &GraphInfo, + ) -> Result<(), ScheduleBuildError> { + for set in &graph_info.sets { + match self.system_set_ids.get(set) { + Some(set_id) => { + if id == set_id { + return Err(ScheduleBuildError::HierarchyLoop(set.dyn_clone())); + } + } + None => { + self.add_set(set.dyn_clone()); + } + } + } + + Ok(()) + } + + fn check_edges( + &mut self, + id: &NodeId, + graph_info: &GraphInfo, + ) -> Result<(), ScheduleBuildError> { + for Dependency { kind: _, set } in &graph_info.dependencies { + match self.system_set_ids.get(set) { + Some(set_id) => { + if id == set_id { + return Err(ScheduleBuildError::DependencyLoop(set.dyn_clone())); + } + } + None => { + self.add_set(set.dyn_clone()); + } + } + } + + if let Ambiguity::IgnoreWithSet(ambiguous_with) = &graph_info.ambiguous_with { + for set in ambiguous_with { + if !self.system_set_ids.contains_key(set) { + self.add_set(set.dyn_clone()); + } + } + } + + Ok(()) + } + + fn update_graphs( + &mut self, + id: NodeId, + graph_info: GraphInfo, + ) -> Result<(), ScheduleBuildError> { + self.check_sets(&id, &graph_info)?; + self.check_edges(&id, &graph_info)?; + self.changed = true; + + let GraphInfo { + sets, + dependencies, + ambiguous_with, + } = graph_info; + + if !self.hierarchy.graph.contains_node(id) { + self.hierarchy.graph.add_node(id); + } + + for set in sets.into_iter().map(|set| self.system_set_ids[&set]) { + self.hierarchy.graph.add_edge(set, id, ()); + } + + if !self.dependency.graph.contains_node(id) { + self.dependency.graph.add_node(id); + } + + for (kind, set) in dependencies + .into_iter() + .map(|Dependency { kind, set }| (kind, self.system_set_ids[&set])) + { + let (lhs, rhs) = match kind { + DependencyKind::Before => (id, set), + DependencyKind::After => (set, id), + }; + self.dependency.graph.add_edge(lhs, rhs, ()); + } + + match ambiguous_with { + Ambiguity::Check => (), + Ambiguity::IgnoreWithSet(ambigous_with) => { + for set in ambigous_with + .into_iter() + .map(|set| self.system_set_ids[&set]) + { + self.ambiguous_with.add_edge(id, set, ()); + } + } + Ambiguity::IgnoreAll => { + self.ambiguous_with_all.insert(id); + } + } + + Ok(()) + } + + fn initialize(&mut self, world: &mut World) { + for (id, i) in self.uninit.drain(..) { + match id { + NodeId::System(index) => { + self.systems[index].as_mut().unwrap().initialize(world); + if let Some(v) = self.system_conditions[index].as_mut() { + for condition in v.iter_mut() { + condition.initialize(world); + } + } + } + NodeId::Set(index) => { + if let Some(v) = self.system_set_conditions[index].as_mut() { + for condition in v.iter_mut().skip(i) { + condition.initialize(world); + } + } + } + } + } + } + + fn build_schedule(&mut self) -> Result { + // check hierarchy for cycles + let hier_scc = tarjan_scc(&self.hierarchy.graph); + if self.contains_cycles(&hier_scc) { + self.report_cycles(&hier_scc); + return Err(ScheduleBuildError::HierarchyCycle); + } + + self.hierarchy.topsort = hier_scc.into_iter().flatten().rev().collect::>(); + + let hier_results = check_graph(&self.hierarchy.graph, &self.hierarchy.topsort); + if self.contains_hierarchy_conflicts(&hier_results.transitive_edges) { + self.report_hierarchy_conflicts(&hier_results.transitive_edges); + if matches!(self.settings.hierarchy_detection, LogLevel::Error) { + return Err(ScheduleBuildError::HierarchyRedundancy); + } + } + + // remove redundant edges + self.hierarchy.graph = hier_results.transitive_reduction; + + // check dependencies for cycles + let dep_scc = tarjan_scc(&self.dependency.graph); + if self.contains_cycles(&dep_scc) { + self.report_cycles(&dep_scc); + return Err(ScheduleBuildError::DependencyCycle); + } + + self.dependency.topsort = dep_scc.into_iter().flatten().rev().collect::>(); + + // nodes can have dependent XOR hierarchical relationship + let dep_results = check_graph(&self.dependency.graph, &self.dependency.topsort); + for &(a, b) in dep_results.connected.iter() { + if hier_results.connected.contains(&(a, b)) || hier_results.connected.contains(&(b, a)) + { + let name_a = self.get_node_name(&a); + let name_b = self.get_node_name(&b); + return Err(ScheduleBuildError::CrossDependency(name_a, name_b)); + } + } + + // map system sets to all their member systems + let mut systems_in_sets = HashMap::with_capacity(self.system_sets.len()); + // iterate in reverse topological order (bottom-up) + for &id in self.hierarchy.topsort.iter().rev() { + if id.is_system() { + continue; + } + + let set = id; + systems_in_sets.insert(set, Vec::new()); + + for child in self + .hierarchy + .graph + .neighbors_directed(set, Direction::Outgoing) + { + match child { + NodeId::System(_) => { + systems_in_sets.get_mut(&set).unwrap().push(child); + } + NodeId::Set(_) => { + let [sys, child_sys] = + systems_in_sets.get_many_mut([&set, &child]).unwrap(); + sys.extend_from_slice(child_sys); + } + } + } + } + + // can't depend on or be ambiguous with system type sets that have many instances + for (&set, systems) in systems_in_sets.iter() { + let node = &self.system_sets[set.index()]; + if node.is_system_type() { + let ambiguities = self.ambiguous_with.edges(set).count(); + let mut dependencies = 0; + dependencies += self + .dependency + .graph + .edges_directed(set, Direction::Incoming) + .count(); + dependencies += self + .dependency + .graph + .edges_directed(set, Direction::Outgoing) + .count(); + if systems.len() > 1 && (ambiguities > 0 || dependencies > 0) { + return Err(ScheduleBuildError::SystemTypeSetAmbiguity( + node.inner.dyn_clone(), + )); + } + } + } + + // flatten dependency graph + let mut dependency_flattened = DiGraphMap::new(); + for id in self.dependency.graph.nodes() { + if id.is_system() { + dependency_flattened.add_node(id); + } + } + + for (lhs, rhs, _) in self.dependency.graph.all_edges() { + match (lhs, rhs) { + (NodeId::System(_), NodeId::System(_)) => { + dependency_flattened.add_edge(lhs, rhs, ()); + } + (NodeId::Set(_), NodeId::System(_)) => { + for &lhs_ in &systems_in_sets[&lhs] { + dependency_flattened.add_edge(lhs_, rhs, ()); + } + } + (NodeId::System(_), NodeId::Set(_)) => { + for &rhs_ in &systems_in_sets[&rhs] { + dependency_flattened.add_edge(lhs, rhs_, ()); + } + } + (NodeId::Set(_), NodeId::Set(_)) => { + for &lhs_ in &systems_in_sets[&lhs] { + for &rhs_ in &systems_in_sets[&rhs] { + dependency_flattened.add_edge(lhs_, rhs_, ()); + } + } + } + } + } + + // check flattened dependencies for cycles + let flat_scc = tarjan_scc(&dependency_flattened); + if self.contains_cycles(&flat_scc) { + self.report_cycles(&flat_scc); + return Err(ScheduleBuildError::DependencyCycle); + } + + self.dependency_flattened.graph = dependency_flattened; + self.dependency_flattened.topsort = + flat_scc.into_iter().flatten().rev().collect::>(); + + let flat_results = check_graph( + &self.dependency_flattened.graph, + &self.dependency_flattened.topsort, + ); + + // remove redundant edges + self.dependency_flattened.graph = flat_results.transitive_reduction; + + // flatten allowed ambiguities + let mut ambiguous_with_flattened = UnGraphMap::new(); + for (lhs, rhs, _) in self.ambiguous_with.all_edges() { + match (lhs, rhs) { + (NodeId::System(_), NodeId::System(_)) => { + ambiguous_with_flattened.add_edge(lhs, rhs, ()); + } + (NodeId::Set(_), NodeId::System(_)) => { + for &lhs_ in &systems_in_sets[&lhs] { + ambiguous_with_flattened.add_edge(lhs_, rhs, ()); + } + } + (NodeId::System(_), NodeId::Set(_)) => { + for &rhs_ in &systems_in_sets[&rhs] { + ambiguous_with_flattened.add_edge(lhs, rhs_, ()); + } + } + (NodeId::Set(_), NodeId::Set(_)) => { + for &lhs_ in &systems_in_sets[&lhs] { + for &rhs_ in &systems_in_sets[&rhs] { + ambiguous_with_flattened.add_edge(lhs_, rhs_, ()); + } + } + } + } + } + + self.ambiguous_with_flattened = ambiguous_with_flattened; + + // check for conflicts + let mut conflicting_systems = Vec::new(); + for &(a, b) in flat_results.disconnected.iter() { + if self.ambiguous_with_flattened.contains_edge(a, b) + || self.ambiguous_with_all.contains(&a) + || self.ambiguous_with_all.contains(&b) + { + continue; + } + + let system_a = self.systems[a.index()].as_ref().unwrap(); + let system_b = self.systems[b.index()].as_ref().unwrap(); + if system_a.is_exclusive() || system_b.is_exclusive() { + conflicting_systems.push((a, b, Vec::new())); + } else { + let access_a = system_a.component_access(); + let access_b = system_b.component_access(); + if !access_a.is_compatible(access_b) { + let conflicts = access_a.get_conflicts(access_b); + conflicting_systems.push((a, b, conflicts)); + } + } + } + + if self.contains_conflicts(&conflicting_systems) { + self.report_conflicts(&conflicting_systems); + if matches!(self.settings.ambiguity_detection, LogLevel::Error) { + return Err(ScheduleBuildError::Ambiguity); + } + } + + // build the schedule + let dg_system_ids = self.dependency_flattened.topsort.clone(); + let dg_system_idx_map = dg_system_ids + .iter() + .cloned() + .enumerate() + .map(|(i, id)| (id, i)) + .collect::>(); + + let hg_systems = self + .hierarchy + .topsort + .iter() + .cloned() + .enumerate() + .filter(|&(_i, id)| id.is_system()) + .collect::>(); + + let (hg_set_idxs, hg_set_ids): (Vec<_>, Vec<_>) = self + .hierarchy + .topsort + .iter() + .cloned() + .enumerate() + .filter(|&(_i, id)| { + // ignore system sets that have no conditions + // ignore system type sets (already covered, they don't have conditions) + id.is_set() + && self.system_set_conditions[id.index()] + .as_ref() + .filter(|v| !v.is_empty()) + .is_some() + }) + .unzip(); + + let sys_count = self.systems.len(); + let set_count = hg_set_ids.len(); + let node_count = self.systems.len() + self.system_sets.len(); + + // get the number of dependencies and the immediate dependents of each system + // (needed by multi-threaded executor to run systems in the correct order) + let mut system_dependencies = Vec::with_capacity(sys_count); + let mut system_dependents = Vec::with_capacity(sys_count); + for &sys_id in &dg_system_ids { + let num_dependencies = self + .dependency_flattened + .graph + .neighbors_directed(sys_id, Direction::Incoming) + .count(); + + let dependents = self + .dependency_flattened + .graph + .neighbors_directed(sys_id, Direction::Outgoing) + .map(|dep_id| dg_system_idx_map[&dep_id]) + .collect::>(); + + system_dependencies.push(num_dependencies); + system_dependents.push(dependents); + } + + // get the rows and columns of the hierarchy graph's reachability matrix + // (needed to we can evaluate conditions in the correct order) + let mut systems_in_sets = vec![FixedBitSet::with_capacity(sys_count); set_count]; + for (i, &row) in hg_set_idxs.iter().enumerate() { + let bitset = &mut systems_in_sets[i]; + for &(col, sys_id) in &hg_systems { + let idx = dg_system_idx_map[&sys_id]; + let is_descendant = hier_results.reachable[index(row, col, node_count)]; + bitset.set(idx, is_descendant); + } + } + + let mut sets_of_systems = vec![FixedBitSet::with_capacity(set_count); sys_count]; + for &(col, sys_id) in &hg_systems { + let i = dg_system_idx_map[&sys_id]; + let bitset = &mut sets_of_systems[i]; + for (idx, &row) in hg_set_idxs + .iter() + .enumerate() + .take_while(|&(_idx, &row)| row < col) + { + let is_ancestor = hier_results.reachable[index(row, col, node_count)]; + bitset.set(idx, is_ancestor); + } + } + + Ok(SystemSchedule { + systems: Vec::with_capacity(sys_count), + system_conditions: Vec::with_capacity(sys_count), + set_conditions: Vec::with_capacity(set_count), + system_ids: dg_system_ids, + set_ids: hg_set_ids, + system_dependencies, + system_dependents, + sets_of_systems, + systems_in_sets, + }) + } + + fn update_schedule(&mut self, schedule: &mut SystemSchedule) -> Result<(), ScheduleBuildError> { + if !self.uninit.is_empty() { + return Err(ScheduleBuildError::Uninitialized); + } + + // move systems out of old schedule + for ((id, system), conditions) in schedule + .system_ids + .drain(..) + .zip(schedule.systems.drain(..)) + .zip(schedule.system_conditions.drain(..)) + { + self.systems[id.index()] = Some(system); + self.system_conditions[id.index()] = Some(conditions); + } + + for (id, conditions) in schedule + .set_ids + .drain(..) + .zip(schedule.set_conditions.drain(..)) + { + self.system_set_conditions[id.index()] = Some(conditions); + } + + *schedule = self.build_schedule()?; + + // move systems into new schedule + for &id in &schedule.system_ids { + let system = self.systems[id.index()].take().unwrap(); + let conditions = self.system_conditions[id.index()].take().unwrap(); + schedule.systems.push(system); + schedule.system_conditions.push(conditions); + } + + for &id in &schedule.set_ids { + let conditions = self.system_set_conditions[id.index()].take().unwrap(); + schedule.set_conditions.push(conditions); + } + + Ok(()) + } +} + +// methods for reporting errors +impl ScheduleGraph { + fn get_node_name(&self, id: &NodeId) -> String { + match id { + NodeId::System(_) => self.systems[id.index()] + .as_ref() + .unwrap() + .name() + .to_string(), + NodeId::Set(_) => self.system_sets[id.index()].name(), + } + } + + fn get_node_kind(id: &NodeId) -> &'static str { + match id { + NodeId::System(_) => "system", + NodeId::Set(_) => "system set", + } + } + + fn contains_hierarchy_conflicts(&self, transitive_edges: &[(NodeId, NodeId)]) -> bool { + if transitive_edges.is_empty() { + return false; + } + + true + } + + fn report_hierarchy_conflicts(&self, transitive_edges: &[(NodeId, NodeId)]) { + let mut message = String::from("hierarchy contains redundant edge(s)"); + for (parent, child) in transitive_edges { + writeln!( + message, + " -- {:?} '{:?}' cannot be child of set '{:?}', longer path exists", + Self::get_node_kind(child), + self.get_node_name(child), + self.get_node_name(parent), + ) + .unwrap(); + } + + error!("{}", message); + } + + fn contains_cycles(&self, strongly_connected_components: &[Vec]) -> bool { + if strongly_connected_components + .iter() + .all(|scc| scc.len() == 1) + { + return false; + } + + true + } + + fn report_cycles(&self, strongly_connected_components: &[Vec]) { + let components_with_cycles = strongly_connected_components + .iter() + .filter(|scc| scc.len() > 1) + .cloned() + .collect::>(); + + let mut message = format!( + "schedule contains at least {} cycle(s)", + components_with_cycles.len() + ); + + writeln!(message, " -- cycle(s) found within:").unwrap(); + for (i, scc) in components_with_cycles.into_iter().enumerate() { + let names = scc + .iter() + .map(|id| self.get_node_name(id)) + .collect::>(); + writeln!(message, " ---- {i}: {names:?}").unwrap(); + } + + error!("{}", message); + } + + fn contains_conflicts(&self, conflicts: &[(NodeId, NodeId, Vec)]) -> bool { + if conflicts.is_empty() { + return false; + } + + true + } + + fn report_conflicts(&self, ambiguities: &[(NodeId, NodeId, Vec)]) { + let mut string = String::from( + "Some systems with conflicting access have indeterminate execution order. \ + Consider adding `before`, `after`, or `ambiguous_with` relationships between these:\n", + ); + + for (system_a, system_b, conflicts) in ambiguities { + debug_assert!(system_a.is_system()); + debug_assert!(system_b.is_system()); + let name_a = self.get_node_name(system_a); + let name_b = self.get_node_name(system_b); + + writeln!(string, " -- {name_a} and {name_b}").unwrap(); + if !conflicts.is_empty() { + writeln!(string, " conflict on: {conflicts:?}").unwrap(); + } else { + // one or both systems must be exclusive + let world = std::any::type_name::(); + writeln!(string, " conflict on: {world}").unwrap(); + } + } + + warn!("{}", string); + } +} + +/// Category of errors encountered during schedule construction. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ScheduleBuildError { + /// A system set contains itself. + #[error("`{0:?}` contains itself.")] + HierarchyLoop(BoxedSystemSet), + /// The hierarchy of system sets contains a cycle. + #[error("System set hierarchy contains cycle(s).")] + HierarchyCycle, + /// The hierarchy of system sets contains redundant edges. + /// + /// This error is disabled by default, but can be opted-in using [`ScheduleBuildSettings`]. + #[error("System set hierarchy contains redundant edges.")] + HierarchyRedundancy, + /// A system (set) has been told to run before itself. + #[error("`{0:?}` depends on itself.")] + DependencyLoop(BoxedSystemSet), + /// The dependency graph contains a cycle. + #[error("System dependencies contain cycle(s).")] + DependencyCycle, + /// Tried to order a system (set) relative to a system set it belongs to. + #[error("`{0:?}` and `{1:?}` have both `in_set` and `before`-`after` relationships (these might be transitive). This combination is unsolvable as a system cannot run before or after a set it belongs to.")] + CrossDependency(String, String), + /// Tried to order a system (set) relative to all instances of some system function. + #[error("Tried to order against `fn {0:?}` in a schedule that has more than one `{0:?}` instance. `fn {0:?}` is a `SystemTypeSet` and cannot be used for ordering if ambiguous. Use a different set without this restriction.")] + SystemTypeSetAmbiguity(BoxedSystemSet), + /// Systems with conflicting access have indeterminate run order. + /// + /// This error is disabled by default, but can be opted-in using [`ScheduleBuildSettings`]. + #[error("Systems with conflicting access have indeterminate run order.")] + Ambiguity, + /// Tried to run a schedule before all of its systems have been initialized. + #[error("Systems in schedule have not been initialized.")] + Uninitialized, +} + +/// Specifies how schedule construction should respond to detecting a certain kind of issue. +pub enum LogLevel { + /// Occurrences are logged only. + Warn, + /// Occurrences are logged and result in errors. + Error, +} + +/// Specifies miscellaneous settings for schedule construction. +pub struct ScheduleBuildSettings { + ambiguity_detection: LogLevel, + hierarchy_detection: LogLevel, +} + +impl Default for ScheduleBuildSettings { + fn default() -> Self { + Self::new() + } +} + +impl ScheduleBuildSettings { + pub const fn new() -> Self { + Self { + ambiguity_detection: LogLevel::Warn, + hierarchy_detection: LogLevel::Warn, + } + } + + /// Determines whether the presence of ambiguities (systems with conflicting access but indeterminate order) + /// is only logged or also results in an [`Ambiguity`](ScheduleBuildError::Ambiguity) error. + pub fn with_ambiguity_detection(mut self, level: LogLevel) -> Self { + self.ambiguity_detection = level; + self + } + + /// Determines whether the presence of redundant edges in the hierarchy of system sets is only + /// logged or also results in a [`HierarchyRedundancy`](ScheduleBuildError::HierarchyRedundancy) + /// error. + pub fn with_hierarchy_detection(mut self, level: LogLevel) -> Self { + self.hierarchy_detection = level; + self + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/set.rs b/crates/bevy_ecs/src/schedule_v3/set.rs new file mode 100644 index 0000000000000..5b29f31e6a3ae --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/set.rs @@ -0,0 +1,149 @@ +use std::fmt::Debug; +use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; + +pub use bevy_ecs_macros::{ScheduleLabel, SystemSet}; +use bevy_utils::define_boxed_label; +use bevy_utils::label::DynHash; + +use crate::system::{ + ExclusiveSystemParam, ExclusiveSystemParamFunction, IsExclusiveFunctionSystem, + IsFunctionSystem, SystemParam, SystemParamFunction, +}; + +define_boxed_label!(ScheduleLabel); + +pub type BoxedSystemSet = Box; +pub type BoxedScheduleLabel = Box; + +/// Types that identify logical groups of systems. +pub trait SystemSet: DynHash + Debug + Send + Sync + 'static { + /// Returns `true` if this system set is a [`SystemTypeSet`]. + fn is_system_type(&self) -> bool { + false + } + + #[doc(hidden)] + fn dyn_clone(&self) -> Box; +} + +impl PartialEq for dyn SystemSet { + fn eq(&self, other: &Self) -> bool { + self.dyn_eq(other.as_dyn_eq()) + } +} + +impl Eq for dyn SystemSet {} + +impl Hash for dyn SystemSet { + fn hash(&self, state: &mut H) { + self.dyn_hash(state); + } +} + +impl Clone for Box { + fn clone(&self) -> Self { + self.dyn_clone() + } +} + +/// A [`SystemSet`] grouping instances of the same function. +/// +/// This kind of set is automatically populated and thus has some special rules: +/// - You cannot manually add members. +/// - You cannot configure them. +/// - You cannot order something relative to one if it has more than one member. +pub struct SystemTypeSet(PhantomData T>); + +impl SystemTypeSet { + pub(crate) fn new() -> Self { + Self(PhantomData) + } +} + +impl Debug for SystemTypeSet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("SystemTypeSet") + .field(&std::any::type_name::()) + .finish() + } +} + +impl Hash for SystemTypeSet { + fn hash(&self, _state: &mut H) { + // all systems of a given type are the same + } +} +impl Clone for SystemTypeSet { + fn clone(&self) -> Self { + Self(PhantomData) + } +} + +impl Copy for SystemTypeSet {} + +impl PartialEq for SystemTypeSet { + #[inline] + fn eq(&self, _other: &Self) -> bool { + // all systems of a given type are the same + true + } +} + +impl Eq for SystemTypeSet {} + +impl SystemSet for SystemTypeSet { + fn is_system_type(&self) -> bool { + true + } + + fn dyn_clone(&self) -> Box { + Box::new(*self) + } +} + +/// Types that can be converted into a [`SystemSet`]. +pub trait IntoSystemSet: Sized { + type Set: SystemSet; + + fn into_system_set(self) -> Self::Set; +} + +// systems sets +impl IntoSystemSet<()> for S { + type Set = Self; + + #[inline] + fn into_system_set(self) -> Self::Set { + self + } +} + +// systems +impl IntoSystemSet<(IsFunctionSystem, In, Out, Param, Marker)> for F +where + Param: SystemParam, + F: SystemParamFunction, +{ + type Set = SystemTypeSet; + + #[inline] + fn into_system_set(self) -> Self::Set { + SystemTypeSet::new() + } +} + +// exclusive systems +impl IntoSystemSet<(IsExclusiveFunctionSystem, In, Out, Param, Marker)> + for F +where + Param: ExclusiveSystemParam, + F: ExclusiveSystemParamFunction, +{ + type Set = SystemTypeSet; + + #[inline] + fn into_system_set(self) -> Self::Set { + SystemTypeSet::new() + } +} diff --git a/crates/bevy_ecs/src/schedule_v3/state.rs b/crates/bevy_ecs/src/schedule_v3/state.rs new file mode 100644 index 0000000000000..85f357c670f7f --- /dev/null +++ b/crates/bevy_ecs/src/schedule_v3/state.rs @@ -0,0 +1,64 @@ +use std::fmt::Debug; +use std::hash::Hash; +use std::mem; + +use crate as bevy_ecs; +use crate::schedule_v3::{ScheduleLabel, SystemSet, WorldExt}; +use crate::system::Resource; +use crate::world::World; + +/// Types that can define states in a finite-state machine. +pub trait States: 'static + Send + Sync + Clone + PartialEq + Eq + Hash + Debug { + type Iter: Iterator; + + /// Returns an iterator over all the state variants. + fn states() -> Self::Iter; +} + +/// The label of a [`Schedule`](super::Schedule) that runs whenever [`State`] +/// enters this state. +#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)] +pub struct OnEnter(pub S); + +/// The label of a [`Schedule`](super::Schedule) that runs whenever [`State`] +/// exits this state. +#[derive(ScheduleLabel, Clone, Debug, PartialEq, Eq, Hash)] +pub struct OnExit(pub S); + +/// A [`SystemSet`] that will run within \ when this state is active. +/// +/// This is provided for convenience. A more general [`state_equals`](super::state_equals) +/// [condition](super::Condition) also exists for systems that need to run elsewhere. +#[derive(SystemSet, Clone, Debug, PartialEq, Eq, Hash)] +pub struct OnUpdate(pub S); + +/// A finite-state machine whose transitions have associated schedules +/// ([`OnEnter(state)`] and [`OnExit(state)`]). +/// +/// The current state value can be accessed through this resource. To *change* the state, +/// queue a transition in the [`NextState`] resource, and it will be applied by the next +/// [`apply_state_transition::`] system. +#[derive(Resource)] +pub struct State(pub S); + +/// The next state of [`State`]. +/// +/// To queue a transition, just set the contained value to `Some(next_state)`. +#[derive(Resource)] +pub struct NextState(pub Option); + +/// If a new state is queued in [`NextState`], this system: +/// - Takes the new state value from [`NextState`] and updates [`State`]. +/// - Runs the [`OnExit(exited_state)`] schedule. +/// - Runs the [`OnEnter(entered_state)`] schedule. +pub fn apply_state_transition(world: &mut World) { + if world.resource::>().0.is_some() { + let entered_state = world.resource_mut::>().0.take().unwrap(); + let exited_state = mem::replace( + &mut world.resource_mut::>().0, + entered_state.clone(), + ); + world.run_schedule(OnExit(exited_state)); + world.run_schedule(OnEnter(entered_state)); + } +} diff --git a/crates/bevy_ecs/src/system/exclusive_function_system.rs b/crates/bevy_ecs/src/system/exclusive_function_system.rs index cda67f67ce53e..1d5c7fca36781 100644 --- a/crates/bevy_ecs/src/system/exclusive_function_system.rs +++ b/crates/bevy_ecs/src/system/exclusive_function_system.rs @@ -11,7 +11,7 @@ use crate::{ world::{World, WorldId}, }; use bevy_ecs_macros::all_tuples; -use std::{borrow::Cow, marker::PhantomData}; +use std::{any::TypeId, borrow::Cow, marker::PhantomData}; /// A function system that runs with exclusive [`World`] access. /// @@ -72,6 +72,11 @@ where self.system_meta.name.clone() } + #[inline] + fn type_id(&self) -> TypeId { + TypeId::of::() + } + #[inline] fn component_access(&self) -> &Access { self.system_meta.component_access_set.combined_access() @@ -149,9 +154,15 @@ where self.system_meta.name.as_ref(), ); } + fn default_labels(&self) -> Vec { vec![self.func.as_system_label().as_label()] } + + fn default_system_sets(&self) -> Vec> { + let set = crate::schedule_v3::SystemTypeSet::::new(); + vec![Box::new(set)] + } } impl AsSystemLabel<(In, Out, Param, Marker, IsExclusiveFunctionSystem)> diff --git a/crates/bevy_ecs/src/system/function_system.rs b/crates/bevy_ecs/src/system/function_system.rs index 7ec7c24489380..c46cab36cb215 100644 --- a/crates/bevy_ecs/src/system/function_system.rs +++ b/crates/bevy_ecs/src/system/function_system.rs @@ -9,7 +9,7 @@ use crate::{ world::{World, WorldId}, }; use bevy_ecs_macros::all_tuples; -use std::{borrow::Cow, fmt::Debug, marker::PhantomData}; +use std::{any::TypeId, borrow::Cow, fmt::Debug, marker::PhantomData}; /// The metadata of a [`System`]. #[derive(Clone)] @@ -368,6 +368,11 @@ where self.system_meta.name.clone() } + #[inline] + fn type_id(&self) -> TypeId { + TypeId::of::() + } + #[inline] fn component_access(&self) -> &Access { self.system_meta.component_access_set.combined_access() @@ -453,9 +458,15 @@ where self.system_meta.name.as_ref(), ); } + fn default_labels(&self) -> Vec { vec![self.func.as_system_label().as_label()] } + + fn default_system_sets(&self) -> Vec> { + let set = crate::schedule_v3::SystemTypeSet::::new(); + vec![Box::new(set)] + } } /// A [`SystemLabel`] that was automatically generated for a system on the basis of its `TypeId`. diff --git a/crates/bevy_ecs/src/system/system.rs b/crates/bevy_ecs/src/system/system.rs index 43fcdb41dd3cb..a6034f0d53829 100644 --- a/crates/bevy_ecs/src/system/system.rs +++ b/crates/bevy_ecs/src/system/system.rs @@ -5,6 +5,8 @@ use crate::{ archetype::ArchetypeComponentId, change_detection::MAX_CHANGE_AGE, component::ComponentId, query::Access, schedule::SystemLabelId, world::World, }; + +use std::any::TypeId; use std::borrow::Cow; /// An ECS system that can be added to a [`Schedule`](crate::schedule::Schedule) @@ -26,6 +28,8 @@ pub trait System: Send + Sync + 'static { type Out; /// Returns the system's name. fn name(&self) -> Cow<'static, str>; + /// Returns the [`TypeId`] of the underlying system type. + fn type_id(&self) -> TypeId; /// Returns the system's component [`Access`]. fn component_access(&self) -> &Access; /// Returns the system's archetype component [`Access`]. @@ -64,6 +68,10 @@ pub trait System: Send + Sync + 'static { fn default_labels(&self) -> Vec { Vec::new() } + /// Returns the system's default [system sets](crate::schedule_v3::SystemSet). + fn default_system_sets(&self) -> Vec> { + Vec::new() + } /// Gets the system's last change tick fn get_last_change_tick(&self) -> u32; /// Sets the system's last change tick diff --git a/crates/bevy_ecs/src/system/system_piping.rs b/crates/bevy_ecs/src/system/system_piping.rs index 0af8caa5eed6e..efd545a46a30f 100644 --- a/crates/bevy_ecs/src/system/system_piping.rs +++ b/crates/bevy_ecs/src/system/system_piping.rs @@ -5,7 +5,7 @@ use crate::{ system::{IntoSystem, System}, world::World, }; -use std::borrow::Cow; +use std::{any::TypeId, borrow::Cow}; /// A [`System`] created by piping the output of the first system into the input of the second. /// @@ -77,6 +77,10 @@ impl> System for PipeSystem< self.name.clone() } + fn type_id(&self) -> TypeId { + TypeId::of::<(SystemA, SystemB)>() + } + fn archetype_component_access(&self) -> &Access { &self.archetype_component_access } @@ -141,6 +145,18 @@ impl> System for PipeSystem< self.system_a.set_last_change_tick(last_change_tick); self.system_b.set_last_change_tick(last_change_tick); } + + fn default_labels(&self) -> Vec { + let mut labels = self.system_a.default_labels(); + labels.extend(&self.system_b.default_labels()); + labels + } + + fn default_system_sets(&self) -> Vec> { + let mut system_sets = self.system_a.default_system_sets(); + system_sets.extend_from_slice(&self.system_b.default_system_sets()); + system_sets + } } /// An extension trait providing the [`IntoPipeSystem::pipe`] method to pass input from one system into the next. diff --git a/crates/bevy_ecs/src/world/mod.rs b/crates/bevy_ecs/src/world/mod.rs index af4807344660c..f8b41c8b34c43 100644 --- a/crates/bevy_ecs/src/world/mod.rs +++ b/crates/bevy_ecs/src/world/mod.rs @@ -2,7 +2,7 @@ mod entity_ref; mod spawn_batch; mod world_cell; -pub use crate::change_detection::{Mut, Ref}; +pub use crate::change_detection::{Mut, Ref, CHECK_TICK_THRESHOLD}; pub use entity_ref::{EntityMut, EntityRef}; pub use spawn_batch::*; pub use world_cell::*; @@ -64,6 +64,7 @@ pub struct World { pub(crate) archetype_component_access: ArchetypeComponentAccess, pub(crate) change_tick: AtomicU32, pub(crate) last_change_tick: u32, + pub(crate) last_check_tick: u32, } impl Default for World { @@ -81,6 +82,7 @@ impl Default for World { // are detected on first system runs and for direct world queries. change_tick: AtomicU32::new(1), last_change_tick: 0, + last_check_tick: 0, } } } @@ -1589,20 +1591,37 @@ impl World { self.last_change_tick } + /// Iterates all component change ticks and clamps any older than [`MAX_CHANGE_AGE`](crate::change_detection::MAX_CHANGE_AGE). + /// This prevents overflow and thus prevents false positives. + /// + /// **Note:** Does nothing if the [`World`] counter has not been incremented at least [`CHECK_TICK_THRESHOLD`](crate::change_detection::CHECK_TICK_THRESHOLD) + /// times since the previous pass. + // TODO: benchmark and optimize pub fn check_change_ticks(&mut self) { - // Iterate over all component change ticks, clamping their age to max age - // PERF: parallelize let change_tick = self.change_tick(); + if change_tick.wrapping_sub(self.last_check_tick) < CHECK_TICK_THRESHOLD { + return; + } + let Storages { ref mut tables, ref mut sparse_sets, ref mut resources, ref mut non_send_resources, } = self.storages; + + #[cfg(feature = "trace")] + let _span = bevy_utils::tracing::info_span!("check component ticks").entered(); tables.check_change_ticks(change_tick); sparse_sets.check_change_ticks(change_tick); resources.check_change_ticks(change_tick); non_send_resources.check_change_ticks(change_tick); + + if let Some(mut schedules) = self.get_resource_mut::() { + schedules.check_change_ticks(change_tick); + } + + self.last_check_tick = change_tick; } pub fn clear_entities(&mut self) { diff --git a/crates/bevy_macro_utils/src/lib.rs b/crates/bevy_macro_utils/src/lib.rs index 29f15461221c5..890e3b468a310 100644 --- a/crates/bevy_macro_utils/src/lib.rs +++ b/crates/bevy_macro_utils/src/lib.rs @@ -117,6 +117,70 @@ impl BevyManifest { } } +/// Derive a label trait +/// +/// # Args +/// +/// - `input`: The [`syn::DeriveInput`] for struct that is deriving the label trait +/// - `trait_path`: The path [`syn::Path`] to the label trait +pub fn derive_boxed_label(input: syn::DeriveInput, trait_path: &syn::Path) -> TokenStream { + let ident = input.ident; + let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl(); + let mut where_clause = where_clause.cloned().unwrap_or_else(|| syn::WhereClause { + where_token: Default::default(), + predicates: Default::default(), + }); + where_clause.predicates.push( + syn::parse2(quote! { + Self: 'static + Send + Sync + Clone + Eq + ::std::fmt::Debug + ::std::hash::Hash + }) + .unwrap(), + ); + + (quote! { + impl #impl_generics #trait_path for #ident #ty_generics #where_clause { + fn dyn_clone(&self) -> std::boxed::Box { + std::boxed::Box::new(std::clone::Clone::clone(self)) + } + } + }) + .into() +} + +/// Derive a label trait +/// +/// # Args +/// +/// - `input`: The [`syn::DeriveInput`] for struct that is deriving the label trait +/// - `trait_path`: The path [`syn::Path`] to the label trait +pub fn derive_set(input: syn::DeriveInput, trait_path: &syn::Path) -> TokenStream { + let ident = input.ident; + let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl(); + let mut where_clause = where_clause.cloned().unwrap_or_else(|| syn::WhereClause { + where_token: Default::default(), + predicates: Default::default(), + }); + where_clause.predicates.push( + syn::parse2(quote! { + Self: 'static + Send + Sync + Clone + Eq + ::std::fmt::Debug + ::std::hash::Hash + }) + .unwrap(), + ); + + (quote! { + impl #impl_generics #trait_path for #ident #ty_generics #where_clause { + fn is_system_type(&self) -> bool { + false + } + + fn dyn_clone(&self) -> std::boxed::Box { + std::boxed::Box::new(std::clone::Clone::clone(self)) + } + } + }) + .into() +} + /// Derive a label trait /// /// # Args diff --git a/crates/bevy_time/src/fixed_timestep.rs b/crates/bevy_time/src/fixed_timestep.rs index 8a63f67f92bb6..fa355706581bb 100644 --- a/crates/bevy_time/src/fixed_timestep.rs +++ b/crates/bevy_time/src/fixed_timestep.rs @@ -177,6 +177,10 @@ impl System for FixedTimestep { Cow::Borrowed(std::any::type_name::()) } + fn type_id(&self) -> std::any::TypeId { + std::any::TypeId::of::() + } + fn archetype_component_access(&self) -> &Access { self.internal_system.archetype_component_access() } diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index a61ce8d16594d..5a019dbc38455 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -14,6 +14,8 @@ tracing = { version = "0.1", default-features = false, features = ["std"] } instant = { version = "0.1", features = ["wasm-bindgen"] } uuid = { version = "1.1", features = ["v4", "serde"] } hashbrown = { version = "0.12", features = ["serde"] } +petgraph = "0.6" +thiserror = "1.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = {version = "0.2.0", features = ["js"]} diff --git a/crates/bevy_utils/src/label.rs b/crates/bevy_utils/src/label.rs index 6c504b0283a30..d3e816552bb1e 100644 --- a/crates/bevy_utils/src/label.rs +++ b/crates/bevy_utils/src/label.rs @@ -60,6 +60,47 @@ where } } +/// Macro to define a new label trait +/// +/// # Example +/// +/// ``` +/// # use bevy_utils::define_boxed_label; +/// define_boxed_label!(MyNewLabelTrait); +/// ``` +#[macro_export] +macro_rules! define_boxed_label { + ($label_trait_name:ident) => { + /// A strongly-typed label. + pub trait $label_trait_name: + 'static + Send + Sync + ::std::fmt::Debug + ::bevy_utils::label::DynHash + { + #[doc(hidden)] + fn dyn_clone(&self) -> Box; + } + + impl PartialEq for dyn $label_trait_name { + fn eq(&self, other: &Self) -> bool { + self.dyn_eq(other.as_dyn_eq()) + } + } + + impl Eq for dyn $label_trait_name {} + + impl ::std::hash::Hash for dyn $label_trait_name { + fn hash(&self, state: &mut H) { + self.dyn_hash(state); + } + } + + impl Clone for Box { + fn clone(&self) -> Self { + self.dyn_clone() + } + } + }; +} + /// Macro to define a new label trait /// /// # Example diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index e54065eb5f6bf..a6a4aebcb254c 100644 --- a/crates/bevy_utils/src/lib.rs +++ b/crates/bevy_utils/src/lib.rs @@ -15,6 +15,7 @@ pub mod label; mod short_names; pub use short_names::get_short_name; pub mod synccell; +pub mod syncunsafecell; mod default; mod float_ord; @@ -24,6 +25,8 @@ pub use default::default; pub use float_ord::*; pub use hashbrown; pub use instant::{Duration, Instant}; +pub use petgraph; +pub use thiserror; pub use tracing; pub use uuid::Uuid; diff --git a/crates/bevy_utils/src/syncunsafecell.rs b/crates/bevy_utils/src/syncunsafecell.rs new file mode 100644 index 0000000000000..07e2422d89a8a --- /dev/null +++ b/crates/bevy_utils/src/syncunsafecell.rs @@ -0,0 +1,122 @@ +//! A reimplementation of the currently unstable [`std::cell::SyncUnsafeCell`] +//! +//! [`std::cell::SyncUnsafeCell`]: https://doc.rust-lang.org/nightly/std/cell/struct.SyncUnsafeCell.html + +pub use core::cell::UnsafeCell; + +/// [`UnsafeCell`], but [`Sync`]. +/// +/// See [tracking issue](https://github.com/rust-lang/rust/issues/95439) for upcoming native impl, +/// which should replace this one entirely (except `from_mut`). +/// +/// This is just an `UnsafeCell`, except it implements `Sync` +/// if `T` implements `Sync`. +/// +/// `UnsafeCell` doesn't implement `Sync`, to prevent accidental mis-use. +/// You can use `SyncUnsafeCell` instead of `UnsafeCell` to allow it to be +/// shared between threads, if that's intentional. +/// Providing proper synchronization is still the task of the user, +/// making this type just as unsafe to use. +/// +/// See [`UnsafeCell`] for details. +#[repr(transparent)] +pub struct SyncUnsafeCell { + value: UnsafeCell, +} + +// SAFETY: `T` is Sync, caller is responsible for upholding rust safety rules +unsafe impl Sync for SyncUnsafeCell {} + +impl SyncUnsafeCell { + /// Constructs a new instance of `SyncUnsafeCell` which will wrap the specified value. + #[inline] + pub const fn new(value: T) -> Self { + Self { + value: UnsafeCell::new(value), + } + } + + /// Unwraps the value. + #[inline] + pub fn into_inner(self) -> T { + self.value.into_inner() + } +} + +impl SyncUnsafeCell { + /// Gets a mutable pointer to the wrapped value. + /// + /// This can be cast to a pointer of any kind. + /// Ensure that the access is unique (no active references, mutable or not) + /// when casting to `&mut T`, and ensure that there are no mutations + /// or mutable aliases going on when casting to `&T` + #[inline] + pub const fn get(&self) -> *mut T { + self.value.get() + } + + /// Returns a mutable reference to the underlying data. + /// + /// This call borrows the `SyncUnsafeCell` mutably (at compile-time) which + /// guarantees that we possess the only reference. + #[inline] + pub fn get_mut(&mut self) -> &mut T { + self.value.get_mut() + } + + /// Gets a mutable pointer to the wrapped value. + /// + /// See [`UnsafeCell::get`] for details. + #[inline] + pub const fn raw_get(this: *const Self) -> *mut T { + // We can just cast the pointer from `SyncUnsafeCell` to `T` because + // of #[repr(transparent)] on both SyncUnsafeCell and UnsafeCell. + // See UnsafeCell::raw_get. + this as *const T as *mut T + } + + #[inline] + /// Returns a `&SyncUnsafeCell` from a `&mut T`. + pub fn from_mut(t: &mut T) -> &SyncUnsafeCell { + // SAFETY: `&mut` ensures unique access, and `UnsafeCell` and `SyncUnsafeCell` + // have #[repr(transparent)] + unsafe { &*(t as *mut T as *const SyncUnsafeCell) } + } +} + +impl SyncUnsafeCell<[T]> { + /// Returns a `&[SyncUnsafeCell]` from a `&SyncUnsafeCell<[T]>`. + /// # Examples + /// + /// ``` + /// # use bevy_utils::syncunsafecell::SyncUnsafeCell; + /// + /// let slice: &mut [i32] = &mut [1, 2, 3]; + /// let cell_slice: &SyncUnsafeCell<[i32]> = SyncUnsafeCell::from_mut(slice); + /// let slice_cell: &[SyncUnsafeCell] = cell_slice.as_slice_of_cells(); + /// + /// assert_eq!(slice_cell.len(), 3); + /// ``` + pub fn as_slice_of_cells(&self) -> &[SyncUnsafeCell] { + // SAFETY: `UnsafeCell` and `SyncUnsafeCell` have #[repr(transparent)] + // therefore: + // - `SyncUnsafeCell` has the same layout as `T` + // - `SyncUnsafeCell<[T]>` has the same layout as `[T]` + // - `SyncUnsafeCell<[T]>` has the same layout as `[SyncUnsafeCell]` + unsafe { &*(self as *const SyncUnsafeCell<[T]> as *const [SyncUnsafeCell]) } + } +} + +impl Default for SyncUnsafeCell { + /// Creates an `SyncUnsafeCell`, with the `Default` value for T. + fn default() -> SyncUnsafeCell { + SyncUnsafeCell::new(Default::default()) + } +} + +impl From for SyncUnsafeCell { + /// Creates a new `SyncUnsafeCell` containing the given value. + fn from(t: T) -> SyncUnsafeCell { + SyncUnsafeCell::new(t) + } +}