Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/event/iterators.rs
6600 views
1
#[cfg(feature = "multi_threaded")]
2
use bevy_ecs::batching::BatchingStrategy;
3
use bevy_ecs::event::{BufferedEvent, EventCursor, EventId, EventInstance, Events};
4
use core::{iter::Chain, slice::Iter};
5
6
/// An iterator that yields any unread events from an [`EventReader`](super::EventReader) or [`EventCursor`].
7
#[derive(Debug)]
8
pub struct EventIterator<'a, E: BufferedEvent> {
9
iter: EventIteratorWithId<'a, E>,
10
}
11
12
// Every method forwards to the wrapped `EventIteratorWithId` and discards the id half of
// the returned pair, so the overrides of `count`, `last` and `nth` on the inner iterator
// are the ones that run.
impl<'a, E: BufferedEvent> Iterator for EventIterator<'a, E> {
    type Item = &'a E;

    fn next(&mut self) -> Option<Self::Item> {
        let (event, _id) = self.iter.next()?;
        Some(event)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }

    fn count(self) -> usize {
        let Self { iter } = self;
        iter.count()
    }

    fn last(self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        let (event, _id) = self.iter.last()?;
        Some(event)
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let (event, _id) = self.iter.nth(n)?;
        Some(event)
    }
}
37
38
impl<'a, E: BufferedEvent> ExactSizeIterator for EventIterator<'a, E> {
    /// Number of events still to be yielded, as reported by the wrapped iterator.
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.iter)
    }
}
43
44
/// An iterator that yields any unread events (and their IDs) from an [`EventReader`](super::EventReader) or [`EventCursor`].
45
#[derive(Debug)]
46
pub struct EventIteratorWithId<'a, E: BufferedEvent> {
47
reader: &'a mut EventCursor<E>,
48
chain: Chain<Iter<'a, EventInstance<E>>, Iter<'a, EventInstance<E>>>,
49
unread: usize,
50
}
51
52
impl<'a, E: BufferedEvent> EventIteratorWithId<'a, E> {
53
/// Creates a new iterator that yields any `events` that have not yet been seen by `reader`.
54
pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
55
let a_index = reader
56
.last_event_count
57
.saturating_sub(events.events_a.start_event_count);
58
let b_index = reader
59
.last_event_count
60
.saturating_sub(events.events_b.start_event_count);
61
let a = events.events_a.get(a_index..).unwrap_or_default();
62
let b = events.events_b.get(b_index..).unwrap_or_default();
63
64
let unread_count = a.len() + b.len();
65
// Ensure `len` is implemented correctly
66
debug_assert_eq!(unread_count, reader.len(events));
67
reader.last_event_count = events.event_count - unread_count;
68
// Iterate the oldest first, then the newer events
69
let chain = a.iter().chain(b.iter());
70
71
Self {
72
reader,
73
chain,
74
unread: unread_count,
75
}
76
}
77
78
/// Iterate over only the events.
79
pub fn without_id(self) -> EventIterator<'a, E> {
80
EventIterator { iter: self }
81
}
82
}
83
84
impl<'a, E: BufferedEvent> Iterator for EventIteratorWithId<'a, E> {
85
type Item = (&'a E, EventId<E>);
86
fn next(&mut self) -> Option<Self::Item> {
87
match self
88
.chain
89
.next()
90
.map(|instance| (&instance.event, instance.event_id))
91
{
92
Some(item) => {
93
#[cfg(feature = "detailed_trace")]
94
tracing::trace!("EventReader::iter() -> {}", item.1);
95
self.reader.last_event_count += 1;
96
self.unread -= 1;
97
Some(item)
98
}
99
None => None,
100
}
101
}
102
103
fn size_hint(&self) -> (usize, Option<usize>) {
104
self.chain.size_hint()
105
}
106
107
fn count(self) -> usize {
108
self.reader.last_event_count += self.unread;
109
self.unread
110
}
111
112
fn last(self) -> Option<Self::Item>
113
where
114
Self: Sized,
115
{
116
let EventInstance { event_id, event } = self.chain.last()?;
117
self.reader.last_event_count += self.unread;
118
Some((event, *event_id))
119
}
120
121
fn nth(&mut self, n: usize) -> Option<Self::Item> {
122
if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {
123
self.reader.last_event_count += n + 1;
124
self.unread -= n + 1;
125
Some((event, *event_id))
126
} else {
127
self.reader.last_event_count += self.unread;
128
self.unread = 0;
129
None
130
}
131
}
132
}
133
134
impl<'a, E: BufferedEvent> ExactSizeIterator for EventIteratorWithId<'a, E> {
    /// Number of events still to be yielded, read from the tracked `unread` counter.
    fn len(&self) -> usize {
        let Self { unread, .. } = self;
        *unread
    }
}
139
140
/// Parallel iterator over unread `BufferedEvent`s.
///
/// Only available with the `multi_threaded` feature.
#[cfg(feature = "multi_threaded")]
#[derive(Debug)]
pub struct EventParIter<'a, E: BufferedEvent> {
    // Cursor whose `last_event_count` is advanced once the events have been processed.
    reader: &'a mut EventCursor<E>,
    // The unread portions of the two event buffers.
    slices: [&'a [EventInstance<E>]; 2],
    // Determines the batch size used when splitting the slices across tasks.
    batching_strategy: BatchingStrategy,
    // Total number of unread events; not present on `wasm32` targets.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}
150
151
#[cfg(feature = "multi_threaded")]
impl<'a, E: BufferedEvent> EventParIter<'a, E> {
    /// Creates a new parallel iterator over `events` that have not yet been seen by `reader`.
    ///
    /// As a side effect, `reader.last_event_count` is reset to
    /// `events.event_count` minus the number of unread events found.
    pub fn new(reader: &'a mut EventCursor<E>, events: &'a Events<E>) -> Self {
        // Offset of the first unread event inside each buffer (saturating at zero).
        let a_index = reader
            .last_event_count
            .saturating_sub(events.events_a.start_event_count);
        let b_index = reader
            .last_event_count
            .saturating_sub(events.events_b.start_event_count);
        // An out-of-range offset means nothing is unread in that buffer: use an empty slice.
        let a = events.events_a.get(a_index..).unwrap_or_default();
        let b = events.events_b.get(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        // Ensure `len` is implemented correctly
        debug_assert_eq!(unread_count, reader.len(events));
        reader.last_event_count = events.event_count - unread_count;

        Self {
            reader,
            slices: [a, b],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread: unread_count,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs the provided closure for each unread event in parallel.
    ///
    /// Unlike normal iteration, the event order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |e, _| func(e));
    }

    /// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `EventId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.
    ///
    /// On `wasm32` targets, or when the task pool reports at most one thread, the events are
    /// processed sequentially on the calling thread instead.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a E, EventId<E>) + Send + Sync + Clone>(mut self, func: FN) {
        #[cfg(target_arch = "wasm32")]
        {
            self.into_iter().for_each(|(e, i)| func(e, i));
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // Sequential fallback; the iterator advances the cursor itself.
                return self.into_iter().for_each(|(e, i)| func(e, i));
            }

            // `chunks_exact` panics on a chunk size of zero, so never go below one event
            // per batch, whatever the configured strategy computes.
            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count)
                .max(1);
            let chunks = self.slices.map(|s| s.chunks_exact(batch_size));
            // The tail of each slice that does not fill a whole batch gets its own task.
            let remainders = chunks.each_ref().map(core::slice::ChunksExact::remainder);

            pool.scope(|scope| {
                for batch in chunks.into_iter().flatten().chain(remainders) {
                    let func = func.clone();
                    scope.spawn(async move {
                        for event in batch {
                            func(&event.event, event.event_id);
                        }
                    });
                }
            });

            // Events are guaranteed to be read at this point.
            self.reader.last_event_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`BufferedEvent`]s to be iterated.
    pub fn len(&self) -> usize {
        self.slices.iter().map(|s| s.len()).sum()
    }

    /// Returns [`true`] if there are no events remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        self.slices.iter().all(|x| x.is_empty())
    }
}
261
262
// Sequential fallback: turns the parallel iterator into the ordinary id-yielding iterator
// over the same two slices, handing over the same cursor.
#[cfg(feature = "multi_threaded")]
impl<'a, E: BufferedEvent> IntoIterator for EventParIter<'a, E> {
    type IntoIter = EventIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    fn into_iter(self) -> Self::IntoIter {
        let [first, second] = self.slices;
        let remaining = first.len() + second.len();

        EventIteratorWithId {
            reader: self.reader,
            chain: first.iter().chain(second),
            unread: remaining,
        }
    }
}
282
283