GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/query/par_iter.rs
use crate::{
    batching::BatchingStrategy,
    component::Tick,
    entity::{EntityEquivalent, UniqueEntityEquivalentVec},
    world::unsafe_world_cell::UnsafeWorldCell,
};

use super::{QueryData, QueryFilter, QueryItem, QueryState, ReadOnlyQueryData};

use alloc::vec::Vec;

/// A parallel iterator over query results of a [`Query`](crate::system::Query).
///
/// This struct is created by the [`Query::par_iter`](crate::system::Query::par_iter) and
/// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut) methods.
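///
/// # Example
///
/// A minimal sketch of consuming a parallel iterator returned by
/// [`Query::par_iter`](crate::system::Query::par_iter); the `Position` component is
/// purely illustrative:
///
/// ```
/// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
/// #[derive(Component)]
/// struct Position { x: f32, y: f32 }
///
/// fn system(query: Query<&Position>) {
///     query.par_iter().for_each(|position| {
///         // each matching entity is visited in parallel on the ComputeTaskPool
///         let _ = position.x + position.y;
///     });
/// }
/// ```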
pub struct QueryParIter<'w, 's, D: QueryData, F: QueryFilter> {
    pub(crate) world: UnsafeWorldCell<'w>,
    pub(crate) state: &'s QueryState<D, F>,
    pub(crate) last_run: Tick,
    pub(crate) this_run: Tick,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, D: QueryData, F: QueryFilter> QueryParIter<'w, 's, D, F> {
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
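    ///
    /// # Example
    ///
    /// A minimal sketch of overriding the default strategy, assuming the `fixed`
    /// constructor on [`BatchingStrategy`]; the `T` component is illustrative:
    ///
    /// ```
    /// use crate::{bevy_ecs::batching::BatchingStrategy, bevy_ecs::prelude::Component, bevy_ecs::system::Query};
    /// #[derive(Component)]
    /// struct T;
    ///
    /// fn system(query: Query<&T>) {
    ///     query
    ///         .par_iter()
    ///         .batching_strategy(BatchingStrategy::fixed(1024))
    ///         .for_each(|_item| {
    ///             // batches handed to the task pool now contain up to 1024 items each
    ///         });
    /// }
    /// ```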
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
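    /// # Example
    ///
    /// A minimal sketch of mutating every matching entity in parallel via
    /// [`Query::par_iter_mut`](crate::system::Query::par_iter_mut); the `Counter`
    /// component is illustrative:
    ///
    /// ```
    /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
    /// #[derive(Component)]
    /// struct Counter(usize);
    ///
    /// fn system(mut query: Query<&mut Counter>) {
    ///     query.par_iter_mut().for_each(|mut counter| {
    ///         counter.0 += 1;
    ///     });
    /// }
    /// ```
    ///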
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_init(|| {}, |_, item| func(item));
    }

    /// Runs `func` on each query result in parallel on a value returned by `init`.
    ///
    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
    /// Callers should avoid using this function as if it were a parallel version
    /// of [`Iterator::fold`].
    ///
    /// # Example
    ///
    /// ```
    /// use bevy_utils::Parallel;
    /// use crate::{bevy_ecs::prelude::Component, bevy_ecs::system::Query};
    /// #[derive(Component)]
    /// struct T;
    ///
    /// fn system(query: Query<&T>) {
    ///     let mut queue: Parallel<usize> = Parallel::default();
    ///     // queue.borrow_local_mut() will get or create a thread-local queue for each task/thread
    ///     query.par_iter().for_each_init(|| queue.borrow_local_mut(), |local_queue, item| {
    ///         **local_queue += 1;
    ///     });
    ///
    ///     // collect the values from every thread
    ///     let entity_count: usize = queue.iter_mut().map(|v| *v).sum();
    /// }
    /// ```
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
    where
        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
        INIT: Fn() -> T + Sync + Send + Clone,
    {
        let func = |mut init, item| {
            func(&mut init, item);
            init
        };
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        {
            let init = init();
            // SAFETY:
            // This method can only be called once per instance of QueryParIter,
            // which ensures that mutable queries cannot be executed multiple times at once.
            // Mutable instances of QueryParIter can only be created via an exclusive borrow of a
            // Query or a World, which ensures that multiple aliasing QueryParIters cannot exist
            // at the same time.
            unsafe {
                self.state
                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                    .into_iter()
                    .fold(init, func);
            }
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
            if thread_count <= 1 {
                let init = init();
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state
                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                        .into_iter()
                        .fold(init, func);
                }
            } else {
                // Need a batch size of at least 1.
                let batch_size = self.get_batch_size(thread_count).max(1);
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state.par_fold_init_unchecked_manual(
                        init,
                        self.world,
                        batch_size,
                        func,
                        self.last_run,
                        self.this_run,
                    );
                }
            }
        }
    }

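    /// Computes the batch size used to split matched entities into parallel tasks,
    /// based on the largest matched table or archetype and the configured
    /// [`BatchingStrategy`].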
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    fn get_batch_size(&self, thread_count: usize) -> u32 {
        let max_items = || {
            let id_iter = self.state.matched_storage_ids.iter();
            if self.state.is_dense {
                // SAFETY: We only access table metadata.
                let tables = unsafe { &self.world.world_metadata().storages().tables };
                id_iter
                    // SAFETY: The if check ensures that matched_storage_ids stores TableIds
                    .map(|id| unsafe { tables[id.table_id].entity_count() })
                    .max()
            } else {
                let archetypes = &self.world.archetypes();
                id_iter
                    // SAFETY: The if check ensures that matched_storage_ids stores ArchetypeIds
                    .map(|id| unsafe { archetypes[id.archetype_id].len() })
                    .max()
            }
            .map(|v| v as usize)
            .unwrap_or(0)
        };
        self.batching_strategy
            .calc_batch_size(max_items, thread_count) as u32
    }
}

/// A parallel iterator over the query items generated from an [`Entity`] list.
///
/// This struct is created by the [`Query::par_iter_many`] method.
///
/// [`Entity`]: crate::entity::Entity
/// [`Query::par_iter_many`]: crate::system::Query::par_iter_many
pub struct QueryParManyIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent> {
    pub(crate) world: UnsafeWorldCell<'w>,
    pub(crate) state: &'s QueryState<D, F>,
    pub(crate) entity_list: Vec<E>,
    pub(crate) last_run: Tick,
    pub(crate) this_run: Tick,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, D: ReadOnlyQueryData, F: QueryFilter, E: EntityEquivalent + Sync>
    QueryParManyIter<'w, 's, D, F, E>
{
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
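    /// # Example
    ///
    /// A minimal sketch that restricts parallel iteration to an entity list collected
    /// beforehand; the `T` component and the collection step are illustrative:
    ///
    /// ```
    /// use crate::{bevy_ecs::prelude::{Component, Entity}, bevy_ecs::system::Query};
    /// use bevy_platform::prelude::Vec;
    /// #[derive(Component)]
    /// struct T;
    ///
    /// fn system(query: Query<(Entity, &T)>) {
    ///     // gather some subset of entities (here simply all of them)
    ///     let targets: Vec<Entity> = query.iter().map(|(entity, _)| entity).collect();
    ///     query.par_iter_many(&targets).for_each(|(_entity, _item)| {
    ///         // only the listed entities are visited, in parallel
    ///     });
    /// }
    /// ```
    ///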
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_init(|| {}, |_, item| func(item));
    }

    /// Runs `func` on each query result in parallel on a value returned by `init`.
    ///
    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
    /// Callers should avoid using this function as if it were a parallel version
    /// of [`Iterator::fold`].
    ///
    /// # Example
    ///
    /// ```
    /// use bevy_utils::Parallel;
    /// use crate::{bevy_ecs::prelude::{Component, Res, Resource, Entity}, bevy_ecs::system::Query};
    /// # use core::slice;
    /// use bevy_platform::prelude::Vec;
    /// # fn some_expensive_operation(_item: &T) -> usize {
    /// #     0
    /// # }
    ///
    /// #[derive(Component)]
    /// struct T;
    ///
    /// #[derive(Resource)]
    /// struct V(Vec<Entity>);
    ///
    /// impl<'a> IntoIterator for &'a V {
    ///     // ...
    /// #     type Item = &'a Entity;
    /// #     type IntoIter = slice::Iter<'a, Entity>;
    /// #
    /// #     fn into_iter(self) -> Self::IntoIter {
    /// #         self.0.iter()
    /// #     }
    /// }
    ///
    /// fn system(query: Query<&T>, entities: Res<V>) {
    ///     let mut queue: Parallel<usize> = Parallel::default();
    ///     // queue.borrow_local_mut() will get or create a thread-local queue for each task/thread
    ///     query.par_iter_many(&entities).for_each_init(|| queue.borrow_local_mut(), |local_queue, item| {
    ///         **local_queue += some_expensive_operation(item);
    ///     });
    ///
    ///     // collect the values from every thread
    ///     let final_value: usize = queue.iter_mut().map(|v| *v).sum();
    /// }
    /// ```
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
    where
        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
        INIT: Fn() -> T + Sync + Send + Clone,
    {
        let func = |mut init, item| {
            func(&mut init, item);
            init
        };
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        {
            let init = init();
            // SAFETY:
            // This method can only be called once per instance of QueryParManyIter,
            // which ensures that mutable queries cannot be executed multiple times at once.
            // Mutable instances of QueryParManyIter can only be created via an exclusive borrow of a
            // Query or a World, which ensures that multiple aliasing QueryParManyIters cannot exist
            // at the same time.
            unsafe {
                self.state
                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                    .iter_many_inner(&self.entity_list)
                    .fold(init, func);
            }
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
            if thread_count <= 1 {
                let init = init();
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state
                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                        .iter_many_inner(&self.entity_list)
                        .fold(init, func);
                }
            } else {
                // Need a batch size of at least 1.
                let batch_size = self.get_batch_size(thread_count).max(1);
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state.par_many_fold_init_unchecked_manual(
                        init,
                        self.world,
                        &self.entity_list,
                        batch_size,
                        func,
                        self.last_run,
                        self.this_run,
                    );
                }
            }
        }
    }

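    /// Computes the batch size from the length of the entity list and the configured
    /// [`BatchingStrategy`].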
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    fn get_batch_size(&self, thread_count: usize) -> u32 {
        self.batching_strategy
            .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
    }
}

/// A parallel iterator over the unique query items generated from an [`EntitySet`].
///
/// This struct is created by the [`Query::par_iter_many_unique`] and [`Query::par_iter_many_unique_mut`] methods.
///
/// [`EntitySet`]: crate::entity::EntitySet
/// [`Query::par_iter_many_unique`]: crate::system::Query::par_iter_many_unique
/// [`Query::par_iter_many_unique_mut`]: crate::system::Query::par_iter_many_unique_mut
pub struct QueryParManyUniqueIter<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
{
    pub(crate) world: UnsafeWorldCell<'w>,
    pub(crate) state: &'s QueryState<D, F>,
    pub(crate) entity_list: UniqueEntityEquivalentVec<E>,
    pub(crate) last_run: Tick,
    pub(crate) this_run: Tick,
    pub(crate) batching_strategy: BatchingStrategy,
}

impl<'w, 's, D: QueryData, F: QueryFilter, E: EntityEquivalent + Sync>
    QueryParManyUniqueIter<'w, 's, D, F, E>
{
    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs `func` on each query result in parallel.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each<FN: Fn(QueryItem<'w, 's, D>) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_init(|| {}, |_, item| func(item));
    }

    /// Runs `func` on each query result in parallel on a value returned by `init`.
    ///
    /// `init` may be called multiple times per thread, and the values returned may be discarded between tasks on any given thread.
    /// Callers should avoid using this function as if it were a parallel version
    /// of [`Iterator::fold`].
    ///
    /// # Example
    ///
    /// ```
    /// use bevy_utils::Parallel;
    /// use crate::{bevy_ecs::{prelude::{Component, Res, Resource, Entity}, entity::UniqueEntityVec, system::Query}};
    /// # use core::slice;
    /// # use crate::bevy_ecs::entity::UniqueEntityIter;
    /// # fn some_expensive_operation(_item: &T) -> usize {
    /// #     0
    /// # }
    ///
    /// #[derive(Component)]
    /// struct T;
    ///
    /// #[derive(Resource)]
    /// struct V(UniqueEntityVec);
    ///
    /// impl<'a> IntoIterator for &'a V {
    ///     // ...
    /// #     type Item = &'a Entity;
    /// #     type IntoIter = UniqueEntityIter<slice::Iter<'a, Entity>>;
    /// #
    /// #     fn into_iter(self) -> Self::IntoIter {
    /// #         self.0.iter()
    /// #     }
    /// }
    ///
    /// fn system(query: Query<&T>, entities: Res<V>) {
    ///     let mut queue: Parallel<usize> = Parallel::default();
    ///     // queue.borrow_local_mut() will get or create a thread-local queue for each task/thread
    ///     query.par_iter_many_unique(&entities).for_each_init(|| queue.borrow_local_mut(), |local_queue, item| {
    ///         **local_queue += some_expensive_operation(item);
    ///     });
    ///
    ///     // collect the values from every thread
    ///     let final_value: usize = queue.iter_mut().map(|v| *v).sum();
    /// }
    /// ```
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a query that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[inline]
    pub fn for_each_init<FN, INIT, T>(self, init: INIT, func: FN)
    where
        FN: Fn(&mut T, QueryItem<'w, 's, D>) + Send + Sync + Clone,
        INIT: Fn() -> T + Sync + Send + Clone,
    {
        let func = |mut init, item| {
            func(&mut init, item);
            init
        };
        #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
        {
            let init = init();
            // SAFETY:
            // This method can only be called once per instance of QueryParManyUniqueIter,
            // which ensures that mutable queries cannot be executed multiple times at once.
            // Mutable instances of QueryParManyUniqueIter can only be created via an exclusive borrow of a
            // Query or a World, which ensures that multiple aliasing QueryParManyUniqueIters cannot exist
            // at the same time.
            unsafe {
                self.state
                    .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                    .iter_many_unique_inner(self.entity_list)
                    .fold(init, func);
            }
        }
        #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
        {
            let thread_count = bevy_tasks::ComputeTaskPool::get().thread_num();
            if thread_count <= 1 {
                let init = init();
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state
                        .query_unchecked_manual_with_ticks(self.world, self.last_run, self.this_run)
                        .iter_many_unique_inner(self.entity_list)
                        .fold(init, func);
                }
            } else {
                // Need a batch size of at least 1.
                let batch_size = self.get_batch_size(thread_count).max(1);
                // SAFETY: See the safety comment above.
                unsafe {
                    self.state.par_many_unique_fold_init_unchecked_manual(
                        init,
                        self.world,
                        &self.entity_list,
                        batch_size,
                        func,
                        self.last_run,
                        self.this_run,
                    );
                }
            }
        }
    }

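    /// Computes the batch size from the length of the unique entity list and the
    /// configured [`BatchingStrategy`].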
    #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
    fn get_batch_size(&self, thread_count: usize) -> u32 {
        self.batching_strategy
            .calc_batch_size(|| self.entity_list.len(), thread_count) as u32
    }
}