From a8102e93c621a1b3dc7e2c7102b687b348d36a54 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 8 Apr 2024 14:57:46 +0800 Subject: [PATCH 01/13] init --- crates/bevy_ecs/src/query/iter.rs | 48 ----------- crates/bevy_ecs/src/query/par_iter.rs | 31 ++++++- crates/bevy_ecs/src/query/state.rs | 29 ++++--- crates/bevy_pbr/src/render/mesh.rs | 28 +++---- crates/bevy_render/src/view/visibility/mod.rs | 81 ++++++++++--------- crates/bevy_utils/src/parallel_queue.rs | 48 +++++++++++ 6 files changed, 151 insertions(+), 114 deletions(-) diff --git a/crates/bevy_ecs/src/query/iter.rs b/crates/bevy_ecs/src/query/iter.rs index 2cfcf37031c1d..b78612ae75797 100644 --- a/crates/bevy_ecs/src/query/iter.rs +++ b/crates/bevy_ecs/src/query/iter.rs @@ -41,54 +41,6 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryIter<'w, 's, D, F> { } } - /// Executes the equivalent of [`Iterator::for_each`] over a contiguous segment - /// from a table. - /// - /// # Safety - /// - all `rows` must be in `[0, table.entity_count)`. - /// - `table` must match D and F - /// - Both `D::IS_DENSE` and `F::IS_DENSE` must be true. - #[inline] - #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] - pub(super) unsafe fn for_each_in_table_range( - &mut self, - func: &mut Func, - table: &'w Table, - rows: Range, - ) where - Func: FnMut(D::Item<'w>), - { - // SAFETY: Caller assures that D::IS_DENSE and F::IS_DENSE are true, that table matches D and F - // and all indices in rows are in range. - unsafe { - self.fold_over_table_range((), &mut |_, item| func(item), table, rows); - } - } - - /// Executes the equivalent of [`Iterator::for_each`] over a contiguous segment - /// from an archetype. - /// - /// # Safety - /// - all `indices` must be in `[0, archetype.len())`. - /// - `archetype` must match D and F - /// - Either `D::IS_DENSE` or `F::IS_DENSE` must be false. 
- #[inline] - #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] - pub(super) unsafe fn for_each_in_archetype_range( - &mut self, - func: &mut Func, - archetype: &'w Archetype, - rows: Range, - ) where - Func: FnMut(D::Item<'w>), - { - // SAFETY: Caller assures that either D::IS_DENSE or F::IS_DENSE are false, that archetype matches D and F - // and all indices in rows are in range. - unsafe { - self.fold_over_archetype_range((), &mut |_, item| func(item), archetype, rows); - } - } - /// Executes the equivalent of [`Iterator::fold`] over a contiguous segment /// from an table. /// diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index 1d18b69ca0469..887b40faef5a9 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -109,6 +109,28 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool #[inline] pub fn for_each) + Send + Sync + Clone>(self, func: FN) { + self.for_each_with_init(|| {}, |_, item| func(item)) + } + + /// Runs `func` on each query result in parallel on a value returned by `init`. + /// + /// Like rayon, `init` function will be called only when necessary for a value to + /// be paired with the group of items in each bevy's task. + /// + /// # Panics + /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being + /// initialized and run from the ECS scheduler, this should never panic. 
+ /// + /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool + pub fn for_each_with_init(self, init: INIT, func: FN) + where + FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone, + INIT: Fn() -> T + Sync + Send + Clone, + { + let func = |mut init, item| { + func(&mut init, item); + init + }; #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))] { // SAFETY: @@ -118,9 +140,10 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist // at the same time. unsafe { + let init = init(); self.state .iter_unchecked_manual(self.world, self.last_run, self.this_run) - .for_each(func); + .fold(init, func); } } #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] @@ -129,16 +152,18 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { if thread_count <= 1 { // SAFETY: See the safety comment above. unsafe { + let init = init(); self.state .iter_unchecked_manual(self.world, self.last_run, self.this_run) - .for_each(func); + .fold(init, func); } } else { // Need a batch size of at least 1. let batch_size = self.get_batch_size(thread_count).max(1); // SAFETY: See the safety comment above. 
unsafe { - self.state.par_for_each_unchecked_manual( + self.state.par_fold_init_unchecked_manual( + init, self.world, batch_size, func, diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 84fd805fa6ca9..c952c9d27af20 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1374,17 +1374,18 @@ impl QueryState { /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] - pub(crate) unsafe fn par_for_each_unchecked_manual< - 'w, - FN: Fn(D::Item<'w>) + Send + Sync + Clone, - >( + pub(crate) unsafe fn par_fold_init_unchecked_manual<'w, T, FN, INIT>( &self, + init_accum: INIT, world: UnsafeWorldCell<'w>, batch_size: usize, func: FN, last_run: Tick, this_run: Tick, - ) { + ) where + FN: Fn(T, D::Item<'w>) -> T + Send + Sync + Clone, + INIT: Fn() -> T + Sync + Send + Clone, + { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual use arrayvec::ArrayVec; @@ -1403,19 +1404,27 @@ impl QueryState { } let queue = std::mem::take(queue); let mut func = func.clone(); + let init_accum = init_accum.clone(); scope.spawn(async move { #[cfg(feature = "trace")] let _span = self.par_iter_span.enter(); let mut iter = self.iter_unchecked_manual(world, last_run, this_run); + let mut accum = init_accum(); for storage_id in queue { if D::IS_DENSE && F::IS_DENSE { let id = storage_id.table_id; let table = &world.storages().tables.get(id).debug_checked_unwrap(); - iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()); + accum = iter.fold_over_table_range( + accum, + &mut func, + table, + 0..table.entity_count(), + ); } else { let id = storage_id.archetype_id; let archetype = world.archetypes().get(id).debug_checked_unwrap(); - 
iter.for_each_in_archetype_range( + accum = iter.fold_over_archetype_range( + accum, &mut func, archetype, 0..archetype.len(), @@ -1429,21 +1438,23 @@ impl QueryState { let submit_single = |count, storage_id: StorageId| { for offset in (0..count).step_by(batch_size) { let mut func = func.clone(); + let init_accum = init_accum.clone(); let len = batch_size.min(count - offset); let batch = offset..offset + len; scope.spawn(async move { #[cfg(feature = "trace")] let _span = self.par_iter_span.enter(); + let accum = init_accum(); if D::IS_DENSE && F::IS_DENSE { let id = storage_id.table_id; let table = world.storages().tables.get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) - .for_each_in_table_range(&mut func, table, batch); + .fold_over_table_range(accum, &mut func, table, batch); } else { let id = storage_id.archetype_id; let archetype = world.archetypes().get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) - .for_each_in_archetype_range(&mut func, archetype, batch); + .fold_over_archetype_range(accum, &mut func, archetype, batch); } }); } diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 8319b30b68a96..51ef85b2539b5 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -284,8 +284,10 @@ pub fn extract_meshes( )>, >, ) { - meshes_query.par_iter().for_each( - |( + meshes_query.par_iter().for_each_with_init( + || thread_local_queues.guard(), + |queue, + ( entity, view_visibility, transform, @@ -317,18 +319,16 @@ pub fn extract_meshes( previous_transform: (&previous_transform).into(), flags: flags.bits(), }; - thread_local_queues.scope(|queue| { - queue.push(( - entity, - RenderMeshInstance { - mesh_asset_id: handle.id(), - transforms, - shadow_caster: !not_shadow_caster, - material_bind_group_id: AtomicMaterialBindGroupId::default(), - automatic_batching: !no_automatic_batching, - }, - )); - }); + queue.push(( + entity, 
+ RenderMeshInstance { + mesh_asset_id: handle.id(), + transforms, + shadow_caster: !not_shadow_caster, + material_bind_group_id: AtomicMaterialBindGroupId::default(), + automatic_batching: !no_automatic_batching, + }, + )); }, ); diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index ee8ec2e7d4640..c156db81a8331 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -395,52 +395,53 @@ pub fn check_visibility( let view_mask = maybe_view_mask.copied().unwrap_or_default(); visible_entities.entities.clear(); - visible_aabb_query.par_iter_mut().for_each(|query_item| { - let ( - entity, - inherited_visibility, - mut view_visibility, - maybe_entity_mask, - maybe_model_aabb, - transform, - no_frustum_culling, - ) = query_item; - - // Skip computing visibility for entities that are configured to be hidden. - // ViewVisibility has already been reset in `reset_view_visibility`. - if !inherited_visibility.get() { - return; - } + visible_aabb_query.par_iter_mut().for_each_with_init( + || thread_queues.guard(), + |queue, query_item| { + let ( + entity, + inherited_visibility, + mut view_visibility, + maybe_entity_mask, + maybe_model_aabb, + transform, + no_frustum_culling, + ) = query_item; + + // Skip computing visibility for entities that are configured to be hidden. + // ViewVisibility has already been reset in `reset_view_visibility`. 
+ if !inherited_visibility.get() { + return; + } - let entity_mask = maybe_entity_mask.copied().unwrap_or_default(); - if !view_mask.intersects(&entity_mask) { - return; - } + let entity_mask = maybe_entity_mask.copied().unwrap_or_default(); + if !view_mask.intersects(&entity_mask) { + return; + } - // If we have an aabb, do frustum culling - if !no_frustum_culling { - if let Some(model_aabb) = maybe_model_aabb { - let model = transform.affine(); - let model_sphere = Sphere { - center: model.transform_point3a(model_aabb.center), - radius: transform.radius_vec3a(model_aabb.half_extents), - }; - // Do quick sphere-based frustum culling - if !frustum.intersects_sphere(&model_sphere, false) { - return; - } - // Do aabb-based frustum culling - if !frustum.intersects_obb(model_aabb, &model, true, false) { - return; + // If we have an aabb, do frustum culling + if !no_frustum_culling { + if let Some(model_aabb) = maybe_model_aabb { + let model = transform.affine(); + let model_sphere = Sphere { + center: model.transform_point3a(model_aabb.center), + radius: transform.radius_vec3a(model_aabb.half_extents), + }; + // Do quick sphere-based frustum culling + if !frustum.intersects_sphere(&model_sphere, false) { + return; + } + // Do aabb-based frustum culling + if !frustum.intersects_obb(model_aabb, &model, true, false) { + return; + } } } - } - view_visibility.set(); - thread_queues.scope(|queue| { + view_visibility.set(); queue.push(entity); - }); - }); + }, + ); visible_entities.entities.clear(); thread_queues.drain_into(&mut visible_entities.entities); diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index a0e342caff72d..9c8bfcb960462 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -1,4 +1,9 @@ use core::cell::Cell; +use std::{ + mem, + ops::{Deref, DerefMut}, + ptr::drop_in_place, +}; use thread_local::ThreadLocal; /// A cohesive set of thread-local values of a given type. 
@@ -9,6 +14,37 @@ pub struct Parallel { locals: ThreadLocal>, } +/// A scope guard of a `Parallel`, when this struct is dropped ,the value will writeback to its `Parallel` +pub struct ParallelGuard<'a, T: Send + Default> { + value: T, + parallel: &'a Parallel, +} +impl<'a, T: Send + Default> Drop for ParallelGuard<'a, T> { + fn drop(&mut self) { + let cell = self.parallel.locals.get().unwrap(); + let mut value = T::default(); + std::mem::swap(&mut value, &mut self.value); + cell.set(value); + // SAFETY: + // value is longer needed + unsafe { + drop_in_place(&mut self.value); + } + } +} +impl<'a, T: Send + Default> Deref for ParallelGuard<'a, T> { + type Target = T; + #[inline] + fn deref(&self) -> &Self::Target { + &self.value + } +} +impl<'a, T: Send + Default> DerefMut for ParallelGuard<'a, T> { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.value + } +} impl Parallel { /// Gets a mutable iterator over all of the per-thread queues. pub fn iter_mut(&mut self) -> impl Iterator { @@ -32,6 +68,18 @@ impl Parallel { cell.set(value); ret } + + /// Get the guard of Parallel + /// + /// If there is no thread-local value, it will be initialized to it's default. 
+ pub fn guard<'a>(&'a self) -> ParallelGuard<'a, T> { + let cell = self.locals.get_or_default(); + let value = cell.take(); + ParallelGuard { + value, + parallel: self, + } + } } impl Parallel From a85544911dea4bfbf093f1cd0752af3eea784b21 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 8 Apr 2024 14:58:17 +0800 Subject: [PATCH 02/13] - --- crates/bevy_utils/src/parallel_queue.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 9c8bfcb960462..9d6e45efb9e07 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -1,6 +1,5 @@ use core::cell::Cell; use std::{ - mem, ops::{Deref, DerefMut}, ptr::drop_in_place, }; From 8339ebf57cbcda211a2fa7d026a803298bb06d23 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 8 Apr 2024 16:09:31 +0800 Subject: [PATCH 03/13] fix cli --- crates/bevy_ecs/src/query/par_iter.rs | 2 +- crates/bevy_utils/src/parallel_queue.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index 887b40faef5a9..04efbe5869c48 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -109,7 +109,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool #[inline] pub fn for_each) + Send + Sync + Clone>(self, func: FN) { - self.for_each_with_init(|| {}, |_, item| func(item)) + self.for_each_with_init(|| {}, |_, item| func(item)); } /// Runs `func` on each query result in parallel on a value returned by `init`. 
diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 9d6e45efb9e07..14980d89b2649 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -71,7 +71,7 @@ impl Parallel { /// Get the guard of Parallel /// /// If there is no thread-local value, it will be initialized to it's default. - pub fn guard<'a>(&'a self) -> ParallelGuard<'a, T> { + pub fn guard(&self) -> ParallelGuard<'_, T> { let cell = self.locals.get_or_default(); let value = cell.take(); ParallelGuard { From fe1b9fc55666916e466fe6279def703560c1bb9e Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 8 Apr 2024 16:55:17 +0800 Subject: [PATCH 04/13] rename --- crates/bevy_ecs/src/query/par_iter.rs | 4 ++-- crates/bevy_pbr/src/render/mesh.rs | 2 +- crates/bevy_render/src/view/visibility/mod.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index 04efbe5869c48..9680c10d1e3c6 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -109,7 +109,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool #[inline] pub fn for_each) + Send + Sync + Clone>(self, func: FN) { - self.for_each_with_init(|| {}, |_, item| func(item)); + self.for_each_init(|| {}, |_, item| func(item)); } /// Runs `func` on each query result in parallel on a value returned by `init`. @@ -122,7 +122,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// initialized and run from the ECS scheduler, this should never panic. 
/// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool - pub fn for_each_with_init(self, init: INIT, func: FN) + pub fn for_each_init(self, init: INIT, func: FN) where FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone, INIT: Fn() -> T + Sync + Send + Clone, diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 51ef85b2539b5..76faad978a878 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -284,7 +284,7 @@ pub fn extract_meshes( )>, >, ) { - meshes_query.par_iter().for_each_with_init( + meshes_query.par_iter().for_each_init( || thread_local_queues.guard(), |queue, ( diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index c156db81a8331..82c1fadd846f0 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -395,7 +395,7 @@ pub fn check_visibility( let view_mask = maybe_view_mask.copied().unwrap_or_default(); visible_entities.entities.clear(); - visible_aabb_query.par_iter_mut().for_each_with_init( + visible_aabb_query.par_iter_mut().for_each_init( || thread_queues.guard(), |queue, query_item| { let ( From efc6faf6b3143826a292accd37386de184c43649 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 9 Apr 2024 23:36:51 +0800 Subject: [PATCH 05/13] address review --- crates/bevy_ecs/src/query/iter.rs | 2 +- crates/bevy_ecs/src/query/par_iter.rs | 5 +- crates/bevy_ecs/src/query/state.rs | 2 +- crates/bevy_pbr/src/render/mesh.rs | 2 +- crates/bevy_render/src/view/visibility/mod.rs | 2 +- crates/bevy_utils/src/parallel_queue.rs | 55 +++---------------- 6 files changed, 16 insertions(+), 52 deletions(-) diff --git a/crates/bevy_ecs/src/query/iter.rs b/crates/bevy_ecs/src/query/iter.rs index b78612ae75797..ae6eab9162321 100644 --- a/crates/bevy_ecs/src/query/iter.rs +++ b/crates/bevy_ecs/src/query/iter.rs @@ -704,7 +704,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> 
QueryIterationCursor<'w, 's, D, F> { } // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: - // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::par_for_each_unchecked_manual + // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::par_fold_init_unchecked_manual /// # Safety /// `tables` and `archetypes` must belong to the same world that the [`QueryIterationCursor`] /// was initialized for. diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index 9680c10d1e3c6..70d107bd74c9a 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -122,6 +122,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// initialized and run from the ECS scheduler, this should never panic. /// /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool + #[inline] pub fn for_each_init(self, init: INIT, func: FN) where FN: Fn(&mut T, QueryItem<'w, D>) + Send + Sync + Clone, @@ -133,6 +134,7 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { }; #[cfg(any(target_arch = "wasm32", not(feature = "multi-threaded")))] { + let init = init(); // SAFETY: // This method can only be called once per instance of QueryParIter, // which ensures that mutable queries cannot be executed multiple times at once. @@ -140,7 +142,6 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist // at the same time. unsafe { - let init = init(); self.state .iter_unchecked_manual(self.world, self.last_run, self.this_run) .fold(init, func); @@ -150,9 +151,9 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { { let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num(); if thread_count <= 1 { + let init = init(); // SAFETY: See the safety comment above. 
unsafe { - let init = init(); self.state .iter_unchecked_manual(self.world, self.last_run, self.this_run) .fold(init, func); diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index c952c9d27af20..1b4e79eb265a7 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1387,7 +1387,7 @@ impl QueryState { INIT: Fn() -> T + Sync + Send + Clone, { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: - // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual + // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter,QueryState::par_fold_init_unchecked_manual use arrayvec::ArrayVec; bevy_tasks::ComputeTaskPool::get().scope(|scope| { diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 76faad978a878..b76d4f6f3f4ac 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -285,7 +285,7 @@ pub fn extract_meshes( >, ) { meshes_query.par_iter().for_each_init( - || thread_local_queues.guard(), + || thread_local_queues.borrow_mut(), |queue, ( entity, diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index 82c1fadd846f0..1069575d46d07 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -396,7 +396,7 @@ pub fn check_visibility( visible_entities.entities.clear(); visible_aabb_query.par_iter_mut().for_each_init( - || thread_queues.guard(), + || thread_queues.borrow_mut(), |queue, query_item| { let ( entity, diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 14980d89b2649..b4356b16bf0c4 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -1,7 +1,6 @@ -use 
core::cell::Cell; use std::{ - ops::{Deref, DerefMut}, - ptr::drop_in_place, + cell::{RefCell, RefMut}, + ops::DerefMut, }; use thread_local::ThreadLocal; @@ -10,40 +9,11 @@ use thread_local::ThreadLocal; /// Mutable references can be fetched if `T: Default` via [`Parallel::scope`]. #[derive(Default)] pub struct Parallel { - locals: ThreadLocal>, + locals: ThreadLocal>, } /// A scope guard of a `Parallel`, when this struct is dropped ,the value will writeback to its `Parallel` -pub struct ParallelGuard<'a, T: Send + Default> { - value: T, - parallel: &'a Parallel, -} -impl<'a, T: Send + Default> Drop for ParallelGuard<'a, T> { - fn drop(&mut self) { - let cell = self.parallel.locals.get().unwrap(); - let mut value = T::default(); - std::mem::swap(&mut value, &mut self.value); - cell.set(value); - // SAFETY: - // value is longer needed - unsafe { - drop_in_place(&mut self.value); - } - } -} -impl<'a, T: Send + Default> Deref for ParallelGuard<'a, T> { - type Target = T; - #[inline] - fn deref(&self) -> &Self::Target { - &self.value - } -} -impl<'a, T: Send + Default> DerefMut for ParallelGuard<'a, T> { - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.value - } -} + impl Parallel { /// Gets a mutable iterator over all of the per-thread queues. pub fn iter_mut(&mut self) -> impl Iterator { @@ -61,23 +31,16 @@ impl Parallel { /// /// If there is no thread-local value, it will be initialized to it's default. pub fn scope(&self, f: impl FnOnce(&mut T) -> R) -> R { - let cell = self.locals.get_or_default(); - let mut value = cell.take(); - let ret = f(&mut value); - cell.set(value); + let mut cell = self.locals.get_or_default().borrow_mut(); + let ret = f(cell.deref_mut()); ret } - /// Get the guard of Parallel + /// Mutably borrows the thread-local value. /// /// If there is no thread-local value, it will be initialized to it's default. 
- pub fn guard(&self) -> ParallelGuard<'_, T> { - let cell = self.locals.get_or_default(); - let value = cell.take(); - ParallelGuard { - value, - parallel: self, - } + pub fn borrow_mut(&self) -> RefMut<'_, T> { + self.locals.get_or_default().borrow_mut() } } From 0eca9bce2872e21f6b18d9c2b80774e321e22232 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 10 Apr 2024 00:00:41 +0800 Subject: [PATCH 06/13] add example --- crates/bevy_ecs/src/query/par_iter.rs | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index 70d107bd74c9a..fd1cb95df6040 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -114,8 +114,28 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// Runs `func` on each query result in parallel on a value returned by `init`. /// - /// Like rayon, `init` function will be called only when necessary for a value to + /// `init` function will be called only when necessary for a value to /// be paired with the group of items in each bevy's task. + /// its useful to init a thread-local value for each task. + /// + /// # Example + /// + /// ``` + /// use bevy_ecs::preclude::*; + /// use bevy_utils::Parallel; + /// #[derive(Component)] + /// struct T; + /// fn system(query: Query<&T>){ + /// let queue: Parallel = Parallel::default(); + /// // queue.borrow_mut() will get or create a thread_local queue for each task/thread; + /// query.par_iter().for_each_init(|| queue.borrow_mut(),|local_queue,item| { + /// local_queue += 1; + /// }); + /// + /// // collect value from every thread + /// let entity_count = queue.drain().sum(); + /// } + /// ``` /// /// # Panics /// If the [`ComputeTaskPool`] is not initialized. 
If using this from a query that is being From 084f3761594810dc842b388e48c24c5a4c15a4b9 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 10 Apr 2024 00:20:39 +0800 Subject: [PATCH 07/13] fix doc --- crates/bevy_ecs/src/query/par_iter.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index fd1cb95df6040..d0724b969a067 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -121,19 +121,19 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// # Example /// /// ``` - /// use bevy_ecs::preclude::*; /// use bevy_utils::Parallel; + /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query}; /// #[derive(Component)] /// struct T; /// fn system(query: Query<&T>){ - /// let queue: Parallel = Parallel::default(); + /// let mut queue: Parallel = Parallel::default(); /// // queue.borrow_mut() will get or create a thread_local queue for each task/thread; /// query.par_iter().for_each_init(|| queue.borrow_mut(),|local_queue,item| { - /// local_queue += 1; + /// **local_queue += 1; /// }); /// /// // collect value from every thread - /// let entity_count = queue.drain().sum(); + /// let entity_count: usize = queue.iter_mut().map(|v| *v).sum(); /// } /// ``` /// From 71b0872cb2539adc2b3b1f856880de79628b4af8 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 10 Apr 2024 00:35:14 +0800 Subject: [PATCH 08/13] rename --- crates/bevy_ecs/src/query/par_iter.rs | 4 ++-- crates/bevy_pbr/src/render/mesh.rs | 2 +- crates/bevy_render/src/view/visibility/mod.rs | 2 +- crates/bevy_utils/src/parallel_queue.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index d0724b969a067..b3a75ae832e08 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -127,8 +127,8 @@ impl<'w, 's, D: 
QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// struct T; /// fn system(query: Query<&T>){ /// let mut queue: Parallel = Parallel::default(); - /// // queue.borrow_mut() will get or create a thread_local queue for each task/thread; - /// query.par_iter().for_each_init(|| queue.borrow_mut(),|local_queue,item| { + /// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread; + /// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue,item| { /// **local_queue += 1; /// }); /// diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index b76d4f6f3f4ac..f72424543cd02 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -285,7 +285,7 @@ pub fn extract_meshes( >, ) { meshes_query.par_iter().for_each_init( - || thread_local_queues.borrow_mut(), + || thread_local_queues.borrow_local_mut(), |queue, ( entity, diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index 1069575d46d07..405f6ac1b563a 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -396,7 +396,7 @@ pub fn check_visibility( visible_entities.entities.clear(); visible_aabb_query.par_iter_mut().for_each_init( - || thread_queues.borrow_mut(), + || thread_queues.borrow_local_mut(), |queue, query_item| { let ( entity, diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index b4356b16bf0c4..1fb8355f591df 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -39,7 +39,7 @@ impl Parallel { /// Mutably borrows the thread-local value. /// /// If there is no thread-local value, it will be initialized to it's default. 
- pub fn borrow_mut(&self) -> RefMut<'_, T> { + pub fn borrow_local_mut(&self) -> RefMut<'_, T> { self.locals.get_or_default().borrow_mut() } } From 395632201f4702fcd4b489b0034eca7bd6bf2cf5 Mon Sep 17 00:00:00 2001 From: re0312 <45868716+re0312@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:50:05 +0800 Subject: [PATCH 09/13] Update crates/bevy_utils/src/parallel_queue.rs Co-authored-by: James Liu --- crates/bevy_utils/src/parallel_queue.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 1fb8355f591df..18334e1d97ba2 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -13,7 +13,6 @@ pub struct Parallel { } /// A scope guard of a `Parallel`, when this struct is dropped ,the value will writeback to its `Parallel` - impl Parallel { /// Gets a mutable iterator over all of the per-thread queues. pub fn iter_mut(&mut self) -> impl Iterator { From d1d11e49e9bf20f605594c593a259bf5133907d8 Mon Sep 17 00:00:00 2001 From: re0312 <45868716+re0312@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:50:23 +0800 Subject: [PATCH 10/13] Update crates/bevy_utils/src/parallel_queue.rs Co-authored-by: James Liu --- crates/bevy_utils/src/parallel_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index 18334e1d97ba2..e71a1325afeb8 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -38,7 +38,7 @@ impl Parallel { /// Mutably borrows the thread-local value. /// /// If there is no thread-local value, it will be initialized to it's default. 
- pub fn borrow_local_mut(&self) -> RefMut<'_, T> { + pub fn borrow_local_mut(&self) -> impl DerefMut + '_ { self.locals.get_or_default().borrow_mut() } } From 9d07f903e9208312e07123f3bd8964bac05b5c44 Mon Sep 17 00:00:00 2001 From: re0312 <45868716+re0312@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:50:32 +0800 Subject: [PATCH 11/13] Update crates/bevy_ecs/src/query/par_iter.rs Co-authored-by: James Liu --- crates/bevy_ecs/src/query/par_iter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/query/par_iter.rs b/crates/bevy_ecs/src/query/par_iter.rs index b3a75ae832e08..dc1494b25f940 100644 --- a/crates/bevy_ecs/src/query/par_iter.rs +++ b/crates/bevy_ecs/src/query/par_iter.rs @@ -114,9 +114,9 @@ impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> { /// Runs `func` on each query result in parallel on a value returned by `init`. /// - /// `init` function will be called only when necessary for a value to - /// be paired with the group of items in each bevy's task. - /// its useful to init a thread-local value for each task. + /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread. + /// Callers should avoid using this function as if it were a parallel version + /// of [`Iterator::fold`]. 
/// /// # Example /// From 6c13f127b034995df0c3e564a90f8e5e026e275d Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 10 Apr 2024 09:59:18 +0800 Subject: [PATCH 12/13] fix cli --- crates/bevy_utils/src/parallel_queue.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/bevy_utils/src/parallel_queue.rs b/crates/bevy_utils/src/parallel_queue.rs index e71a1325afeb8..3bf62b5ab43f2 100644 --- a/crates/bevy_utils/src/parallel_queue.rs +++ b/crates/bevy_utils/src/parallel_queue.rs @@ -1,7 +1,4 @@ -use std::{ - cell::{RefCell, RefMut}, - ops::DerefMut, -}; +use std::{cell::RefCell, ops::DerefMut}; use thread_local::ThreadLocal; /// A cohesive set of thread-local values of a given type. From e17283d3c990a1c7d06133a0958dae4a6a4a3458 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 23 Apr 2024 16:26:46 +0800 Subject: [PATCH 13/13] resolve conflict --- crates/bevy_pbr/src/render/mesh.rs | 56 +++++++++---------- crates/bevy_render/src/view/visibility/mod.rs | 22 ++++---- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/crates/bevy_pbr/src/render/mesh.rs b/crates/bevy_pbr/src/render/mesh.rs index 1546656f77154..959c77ed6881a 100644 --- a/crates/bevy_pbr/src/render/mesh.rs +++ b/crates/bevy_pbr/src/render/mesh.rs @@ -596,7 +596,7 @@ pub fn extract_meshes_for_cpu_building( >, ) { meshes_query.par_iter().for_each_init( - || thread_local_queues.borrow_local_mut(), + || render_mesh_instance_queues.borrow_local_mut(), |queue, ( entity, @@ -623,23 +623,19 @@ pub fn extract_meshes_for_cpu_building( no_automatic_batching, ); - render_mesh_instance_queues.scope(|queue| { - let transform = transform.affine(); - queue.push(( - entity, - RenderMeshInstanceCpu { - transforms: MeshTransforms { - transform: (&transform).into(), - previous_transform: (&previous_transform - .map(|t| t.0) - .unwrap_or(transform)) - .into(), - flags: mesh_flags.bits(), - }, - shared, + let transform = transform.affine(); + queue.push(( + entity, + RenderMeshInstanceCpu { + 
transforms: MeshTransforms { + transform: (&transform).into(), + previous_transform: (&previous_transform.map(|t| t.0).unwrap_or(transform)) + .into(), + flags: mesh_flags.bits(), }, - )); - }); + shared, + }, + )); }, ); @@ -685,8 +681,10 @@ pub fn extract_meshes_for_gpu_building( )>, >, ) { - meshes_query.par_iter().for_each( - |( + meshes_query.par_iter().for_each_init( + || render_mesh_instance_queues.borrow_local_mut(), + |queue, + ( entity, view_visibility, transform, @@ -715,17 +713,15 @@ pub fn extract_meshes_for_gpu_building( let lightmap_uv_rect = lightmap::pack_lightmap_uv_rect(lightmap.map(|lightmap| lightmap.uv_rect)); - render_mesh_instance_queues.scope(|queue| { - queue.push(( - entity, - RenderMeshInstanceGpuBuilder { - shared, - transform: (&transform.affine()).into(), - lightmap_uv_rect, - mesh_flags, - }, - )); - }); + queue.push(( + entity, + RenderMeshInstanceGpuBuilder { + shared, + transform: (&transform.affine()).into(), + lightmap_uv_rect, + mesh_flags, + }, + )); }, ); diff --git a/crates/bevy_render/src/view/visibility/mod.rs b/crates/bevy_render/src/view/visibility/mod.rs index e1f8b71f0efdb..aa2982ca423a1 100644 --- a/crates/bevy_render/src/view/visibility/mod.rs +++ b/crates/bevy_render/src/view/visibility/mod.rs @@ -453,16 +453,18 @@ pub fn check_visibility( let view_mask = maybe_view_mask.copied().unwrap_or_default(); - visible_aabb_query.par_iter_mut().for_each(|query_item| { - let ( - entity, - inherited_visibility, - mut view_visibility, - maybe_entity_mask, - maybe_model_aabb, - transform, - no_frustum_culling, - ) = query_item; + visible_aabb_query.par_iter_mut().for_each_init( + || thread_queues.borrow_local_mut(), + |queue, query_item| { + let ( + entity, + inherited_visibility, + mut view_visibility, + maybe_entity_mask, + maybe_model_aabb, + transform, + no_frustum_culling, + ) = query_item; // Skip computing visibility for entities that are configured to be hidden. 
// ViewVisibility has already been reset in `reset_view_visibility`.