use crate::{1batching::BatchingStrategy,2component::Tick,3entity::{EntityEquivalent, UniqueEntityEquivalentVec},4world::unsafe_world_cell::UnsafeWorldCell,5};67use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};89use alloc::vec::Vec;1011/// A parallel iterator over query results of a [`Query`](crate::system::Query).12///13/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and14/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.15pub struct QueryParIter<'w, 's, D: QueryData, F: QueryFilter> {16pub(crate) world: UnsafeWorldCell<'w>,17pub(crate) state: &'s QueryState<D, F>,18pub(crate) last_run: Tick,19pub(crate) this_run: Tick,20pub(crate) batching_strategy: BatchingStrategy,21}2223impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {24/// Changes the batching strategy used when iterating.25///26/// For more information on how this affects the resultant iteration, see27/// [`BatchingStrategy`].28pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {29self.batching_strategy = strategy;30self31}3233/// Runs `func` on each query result in parallel.34///35/// # Panics36/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being37/// initialized and run from the ECS scheduler, this should never panic.38///39/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool40#[inline]41pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {42self.for_each_init(|| {}, |_, item| func(item));43}4445/// Runs `func` on each query result in parallel on a value returned by `init`.46///47/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.48/// Callers should avoid using this function as if it were a parallel version49/// of [`Iterator::fold`].50///51/// # Example52///53/// ```54/// use bevy_utils::Parallel;55/// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};56/// #[derive(Component)]57/// struct T;58/// fn system(query: Query<&T>){59/// let mut queue: Parallel<usize> = Parallel::default();60/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;61/// query.par_iter().for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {62/// **local_queue += 1;63/// });64///65/// // collect value from every thread66/// let entity_count: usize = queue.iter_mut().map(|v| *v).sum();67/// }68/// ```69///70/// # Panics71/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being72/// initialized and run from the ECS scheduler, this should never panic.73///74/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool75#[inline]76pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)77where78FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,79INIT: Fn() -> T + Sync + Send + Clone,80{81let func = |mut init, item| {82func(&mut init, item);83init84};85#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]86{87let init = init();88// SAFETY:89// This method can only be called once per instance of QueryParIter,90// which ensures that mutable queries cannot be executed multiple times at once.91// Mutable instances of QueryParIter can only be created via an exclusive borrow of a92// Query or a World, which ensures that multiple aliasing QueryParIters cannot exist93// at the same time.94unsafe {95self.state96.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)97.into_iter()98.fold(init, func);99}100}101#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]102{103let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();104if thread_count <= 1 {105let init = init();106// SAFETY: See the safety comment above.107unsafe {108self.state109.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)110.into_iter()111.fold(init, func);112}113} else {114// Need a batch size of at least 1.115let batch_size = self.get_batch_size(thread_count).max(1);116// SAFETY: See the safety comment above.117unsafe {118self.state.par_fold_init_unchecked_manual(119init,120self.world,121batch_size,122func,123self.last_run,124self.this_run,125);126}127}128}129}130131#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]132fn get_batch_size(&self, thread_count: usize) -> u32 {133let max_items = || {134let id_iter = self.state.matched_storage_ids.iter();135if self.state.is_dense {136// SAFETY: We only access table metadata.137let tables = unsafe { &self.world.world_metadata().storages().tables };138id_iter139// SAFETY: The if check ensures that matched_storage_ids stores TableIds140.map(|id| unsafe { tables[id.table_id].entity_count() })141.max()142} else {143let archetypes = &self.world.archetypes();144id_iter145// SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds146.map(|id| unsafe { archetypes[id.archetype_id].len() })147.max()148}149.map(|v| v as usize)150.unwrap_or(0)151};152self.batching_strategy153.calc_batch_size(max_items, thread_count) as u32154}155}156157/// A parallel iterator over the unique query items generated from an [`Entity`] list.158///159/// This struct is created by the [`Query::par_iter_many`] method.160///161/// [`Entity`]: crate::entity::Entity162/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many163pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent> {164pub(crate) world: UnsafeWorldCell<'w>,165pub(crate) state: &'s QueryState<D, F>,166pub(crate) entity_list: Vec<E>,167pub(crate) last_run: Tick,168pub(crate) this_run: Tick,169pub(crate) batching_strategy: BatchingStrategy,170}171172impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>173QueryParManyIter<'w, 's, D, F, E>174{175/// Changes the batching strategy used when iterating.176///177/// For more information on how this affects the resultant iteration, see178/// [`BatchingStrategy`].179pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {180self.batching_strategy = strategy;181self182}183184/// Runs `func` on each query result in parallel.185///186/// # Panics187/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being188/// initialized and run from the ECS scheduler, this should never panic.189///190/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool191#[inline]192pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {193self.for_each_init(|| {}, |_, item| func(item));194}195196/// Runs `func` on each query result in parallel on a value returned by `init`.197///198/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.199/// Callers should avoid using this function as if it were a parallel version200/// of [`Iterator::fold`].201///202/// # Example203///204/// ```205/// use bevy_utils::Parallel;206/// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};207/// # use core::slice;208/// use bevy_platform::prelude::Vec;209/// # fn some_expensive_operation(_item: &T) -> usize {210/// # 0211/// # }212///213/// #[derive(Component)]214/// struct T;215///216/// #[derive(Resource)]217/// struct V(Vec<Entity>);218///219/// impl<'a> IntoIterator for &'a V {220/// // ...221/// # type Item = &'a Entity;222/// # type IntoIter = slice::Iter<'a, Entity>;223/// #224/// # fn into_iter(self) -> Self::IntoIter {225/// # self.0.iter()226/// # }227/// }228///229/// fn system(query: Query<&T>, entities: Res<V>){230/// let mut queue: Parallel<usize> = Parallel::default();231/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;232/// query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {233/// **local_queue += some_expensive_operation(item);234/// });235///236/// // collect value from every thread237/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();238/// }239/// ```240///241/// # Panics242/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being243/// initialized and run from the ECS scheduler, this should never panic.244///245/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool246#[inline]247pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)248where249FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,250INIT: Fn() -> T + Sync + Send + Clone,251{252let func = |mut init, item| {253func(&mut init, item);254init255};256#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]257{258let init = init();259// SAFETY:260// This method can only be called once per instance of QueryParManyIter,261// which ensures that mutable queries cannot be executed multiple times at once.262// Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a263// Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist264// at the same time.265unsafe {266self.state267.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)268.iter_many_inner(&self.entity_list)269.fold(init, func);270}271}272#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]273{274let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();275if thread_count <= 1 {276let init = init();277// SAFETY: See the safety comment above.278unsafe {279self.state280.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)281.iter_many_inner(&self.entity_list)282.fold(init, func);283}284} else {285// Need a batch size of at least 1.286let batch_size = self.get_batch_size(thread_count).max(1);287// SAFETY: See the safety comment above.288unsafe {289self.state.par_many_fold_init_unchecked_manual(290init,291self.world,292&self.entity_list,293batch_size,294func,295self.last_run,296self.this_run,297);298}299}300}301}302303#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]304fn get_batch_size(&self, thread_count: usize) -> u32 {305self.batching_strategy306.calc_batch_size(|| self.entity_list.len(), thread_count) as u32307}308}309310/// A parallel iterator over the unique query items generated from an [`EntitySet`].311///312/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.313///314/// [`EntitySet`]: crate::entity::EntitySet315/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique316/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut317pub struct QueryParManyUniqueIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>318{319pub(crate) world: UnsafeWorldCell<'w>,320pub(crate) state: &'s QueryState<D, F>,321pub(crate) entity_list: UniqueEntityEquivalentVec<E>,322pub(crate) last_run: Tick,323pub(crate) this_run: Tick,324pub(crate) batching_strategy: BatchingStrategy,325}326327impl<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>328QueryParManyUniqueIter<'w, 's, D, F, E>329{330/// Changes the batching strategy used when iterating.331///332/// For more information on how this affects the resultant iteration, see333/// [`BatchingStrategy`].334pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {335self.batching_strategy = strategy;336self337}338339/// Runs `func` on each query result in parallel.340///341/// # Panics342/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being343/// initialized and run from the ECS scheduler, this should never panic.344///345/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool346#[inline]347pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {348self.for_each_init(|| {}, |_, item| func(item));349}350351/// Runs `func` on each query result in parallel on a value returned by `init`.352///353/// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.354/// Callers should avoid using this function as if it were a parallel version355/// of [`Iterator::fold`].356///357/// # Example358///359/// ```360/// use bevy_utils::Parallel;361/// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};362/// # use core::slice;363/// # use crate::bevy_ecs::entity::UniqueEntityIter;364/// # fn some_expensive_operation(_item: &T) -> usize {365/// # 0366/// # }367///368/// #[derive(Component)]369/// struct T;370///371/// #[derive(Resource)]372/// struct V(UniqueEntityVec);373///374/// impl<'a> IntoIterator for &'a V {375/// // ...376/// # type Item = &'a Entity;377/// # type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;378/// #379/// # fn into_iter(self) -> Self::IntoIter {380/// # self.0.iter()381/// # }382/// }383///384/// fn system(query: Query<&T>, entities: Res<V>){385/// let mut queue: Parallel<usize> = Parallel::default();386/// // queue.borrow_local_mut() will get or create a thread_local queue for each task/thread;387/// query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(),|local_queue, item| {388/// **local_queue += some_expensive_operation(item);389/// });390///391/// // collect value from every thread392/// let final_value: usize = queue.iter_mut().map(|v| *v).sum();393/// }394/// ```395///396/// # Panics397/// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being398/// initialized and run from the ECS scheduler, this should never panic.399///400/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool401#[inline]402pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)403where404FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,405INIT: Fn() -> T + Sync + Send + Clone,406{407let func = |mut init, item| {408func(&mut init, item);409init410};411#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]412{413let init = init();414// SAFETY:415// This method can only be called once per instance of QueryParManyUniqueIter,416// which ensures that mutable queries cannot be executed multiple times at once.417// Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a418// Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist419// at the same time.420unsafe {421self.state422.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)423.iter_many_unique_inner(self.entity_list)424.fold(init, func);425}426}427#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]428{429let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();430if thread_count <= 1 {431let init = init();432// SAFETY: See the safety comment above.433unsafe {434self.state435.query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)436.iter_many_unique_inner(self.entity_list)437.fold(init, func);438}439} else {440// Need a batch size of at least 1.441let batch_size = self.get_batch_size(thread_count).max(1);442// SAFETY: See the safety comment above.443unsafe {444self.state.par_many_unique_fold_init_unchecked_manual(445init,446self.world,447&self.entity_list,448batch_size,449func,450self.last_run,451self.this_run,452);453}454}455}456}457458#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]459fn get_batch_size(&self, thread_count: usize) -> u32 {460self.batching_strategy461.calc_batch_size(|| self.entity_list.len(), thread_count) as u32462}463}464465466