Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/event/mut_iterators.rs
6600 views
1
#[cfg(feature = "multi_threaded")]
2
use bevy_ecs::batching::BatchingStrategy;
3
use bevy_ecs::event::{BufferedEvent, EventCursor, EventId, EventInstance, Events};
4
use core::{iter::Chain, slice::IterMut};
5
6
/// An iterator that yields any unread events from an [`EventMutator`] or [`EventCursor`].
7
///
8
/// [`EventMutator`]: super::EventMutator
9
#[derive(Debug)]
10
pub struct EventMutIterator<'a, E: BufferedEvent> {
11
iter: EventMutIteratorWithId<'a, E>,
12
}
13
14
impl<'a, E> Iterator for EventMutIterator<'a, E>
where
    E: BufferedEvent,
{
    type Item = &'a mut E;

    /// Pulls the next `(event, id)` pair from the inner iterator and keeps only the event.
    fn next(&mut self) -> Option<Self::Item> {
        let (event, _id) = self.iter.next()?;
        Some(event)
    }

    /// Forwards to the inner iterator's hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.iter)
    }

    /// Consumes the iterator; the inner iterator's `count` does the bookkeeping.
    fn count(self) -> usize {
        let Self { iter } = self;
        iter.count()
    }

    /// Returns the final event produced by the inner iterator, without its ID.
    fn last(self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        let (event, _id) = self.iter.last()?;
        Some(event)
    }

    /// Forwards to the inner iterator's `nth` and strips the ID from the result.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let (event, _id) = self.iter.nth(n)?;
        Some(event)
    }
}
39
40
impl<'a, E> ExactSizeIterator for EventMutIterator<'a, E>
where
    E: BufferedEvent,
{
    /// Reports the length of the wrapped ID-carrying iterator.
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.iter)
    }
}
45
46
/// An iterator that yields any unread events (and their IDs) from an [`EventMutator`] or [`EventCursor`].
47
///
48
/// [`EventMutator`]: super::EventMutator
49
#[derive(Debug)]
50
pub struct EventMutIteratorWithId<'a, E: BufferedEvent> {
51
mutator: &'a mut EventCursor<E>,
52
chain: Chain<IterMut<'a, EventInstance<E>>, IterMut<'a, EventInstance<E>>>,
53
unread: usize,
54
}
55
56
impl<'a, E: BufferedEvent> EventMutIteratorWithId<'a, E> {
57
/// Creates a new iterator that yields any `events` that have not yet been seen by `mutator`.
58
pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
59
let a_index = mutator
60
.last_event_count
61
.saturating_sub(events.events_a.start_event_count);
62
let b_index = mutator
63
.last_event_count
64
.saturating_sub(events.events_b.start_event_count);
65
let a = events.events_a.get_mut(a_index..).unwrap_or_default();
66
let b = events.events_b.get_mut(b_index..).unwrap_or_default();
67
68
let unread_count = a.len() + b.len();
69
70
mutator.last_event_count = events.event_count - unread_count;
71
// Iterate the oldest first, then the newer events
72
let chain = a.iter_mut().chain(b.iter_mut());
73
74
Self {
75
mutator,
76
chain,
77
unread: unread_count,
78
}
79
}
80
81
/// Iterate over only the events.
82
pub fn without_id(self) -> EventMutIterator<'a, E> {
83
EventMutIterator { iter: self }
84
}
85
}
86
87
impl<'a, E: BufferedEvent> Iterator for EventMutIteratorWithId<'a, E> {
88
type Item = (&'a mut E, EventId<E>);
89
fn next(&mut self) -> Option<Self::Item> {
90
match self
91
.chain
92
.next()
93
.map(|instance| (&mut instance.event, instance.event_id))
94
{
95
Some(item) => {
96
#[cfg(feature = "detailed_trace")]
97
tracing::trace!("EventMutator::iter() -> {}", item.1);
98
self.mutator.last_event_count += 1;
99
self.unread -= 1;
100
Some(item)
101
}
102
None => None,
103
}
104
}
105
106
fn size_hint(&self) -> (usize, Option<usize>) {
107
self.chain.size_hint()
108
}
109
110
fn count(self) -> usize {
111
self.mutator.last_event_count += self.unread;
112
self.unread
113
}
114
115
fn last(self) -> Option<Self::Item>
116
where
117
Self: Sized,
118
{
119
let EventInstance { event_id, event } = self.chain.last()?;
120
self.mutator.last_event_count += self.unread;
121
Some((event, *event_id))
122
}
123
124
fn nth(&mut self, n: usize) -> Option<Self::Item> {
125
if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
126
self.mutator.last_event_count += n + 1;
127
self.unread -= n + 1;
128
Some((event, *event_id))
129
} else {
130
self.mutator.last_event_count += self.unread;
131
self.unread = 0;
132
None
133
}
134
}
135
}
136
137
impl<'a, E> ExactSizeIterator for EventMutIteratorWithId<'a, E>
where
    E: BufferedEvent,
{
    /// Reports the running `unread` counter, i.e. the number of events not yet yielded.
    fn len(&self) -> usize {
        self.unread
    }
}
142
143
/// A parallel iterator over `BufferedEvent`s.
#[derive(Debug)]
#[cfg(feature = "multi_threaded")]
pub struct EventMutParIter<'a, E>
where
    E: BufferedEvent,
{
    /// Cursor whose `last_event_count` is advanced once the events have been visited.
    mutator: &'a mut EventCursor<E>,
    /// The two slices of unread events.
    slices: [&'a mut [EventInstance<E>]; 2],
    /// Strategy used to compute the batch size when events are split into batches.
    batching_strategy: BatchingStrategy,
    /// Number of unread events captured at construction time.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}
153
154
#[cfg(feature = "multi_threaded")]
impl<'a, E> EventMutParIter<'a, E>
where
    E: BufferedEvent,
{
    /// Creates a new parallel iterator over `events` that have not yet been seen by `mutator`.
    pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {
        // Translate the cursor position into an offset inside each buffer; the
        // saturating subtraction clamps to zero when the buffer starts later.
        let seen = mutator.last_event_count;
        let older_start = seen.saturating_sub(events.events_a.start_event_count);
        let newer_start = seen.saturating_sub(events.events_b.start_event_count);

        // An offset past the end of a buffer produces an empty slice.
        let older = events.events_a.get_mut(older_start..).unwrap_or_default();
        let newer = events.events_b.get_mut(newer_start..).unwrap_or_default();

        let unread = older.len() + newer.len();

        // Re-anchor the cursor so that exactly `unread` events lie ahead of it.
        mutator.last_event_count = events.event_count - unread;

        Self {
            mutator,
            slices: [older, newer],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(self, strategy: BatchingStrategy) -> Self {
        Self {
            batching_strategy: strategy,
            ..self
        }
    }

    /// Runs the provided closure for each unread event in parallel.
    ///
    /// Unlike normal iteration, the event order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
        // Adapt the ID-less callback to the ID-aware entry point.
        self.for_each_with_id(move |event, _id| func(event));
    }

    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `EventId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a mut E, EventId<E>) + Send + Sync + Clone>(
        mut self,
        func: FN,
    ) {
        // On wasm32 the events are visited sequentially through the regular iterator.
        #[cfg(target_arch = "wasm32")]
        {
            for (event, id) in self {
                func(event, id);
            }
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // With at most one thread, fall back to the sequential iterator.
                for (event, id) in self {
                    func(event, id);
                }
                return;
            }

            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count);

            // Split both slices into batches; batches of the first slice come first.
            let [older, newer] = self.slices;
            let batches = older
                .chunks_mut(batch_size)
                .chain(newer.chunks_mut(batch_size));

            pool.scope(|scope| {
                for batch in batches {
                    // Each spawned task owns its own clone of the callback.
                    let func = func.clone();
                    scope.spawn(async move {
                        for instance in batch {
                            func(&mut instance.event, instance.event_id);
                        }
                    });
                }
            });

            // Events are guaranteed to be read at this point.
            self.mutator.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`BufferedEvent`]s to be iterated.
    pub fn len(&self) -> usize {
        let [older, newer] = &self.slices;
        older.len() + newer.len()
    }

    /// Returns [`true`] if there are no events remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        let [older, newer] = &self.slices;
        older.is_empty() && newer.is_empty()
    }
}
264
265
#[cfg(feature = "multi_threaded")]
impl<'a, E> IntoIterator for EventMutParIter<'a, E>
where
    E: BufferedEvent,
{
    type IntoIter = EventMutIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    /// Converts the parallel iterator into the sequential ID-carrying iterator over the
    /// same two slices, reusing the same cursor.
    fn into_iter(self) -> Self::IntoIter {
        let [older, newer] = self.slices;
        // Recomputed from the slices because the `unread` field is absent on wasm32.
        let unread = older.len() + newer.len();
        EventMutIteratorWithId {
            mutator: self.mutator,
            chain: older.iter_mut().chain(newer),
            unread,
        }
    }
}
285
286