Path: blob/main/crates/bevy_ecs/src/event/mut_iterators.rs
6600 views
#[cfg(feature = "multi_threaded")]1use bevy_ecs::batching::BatchingStrategy;2use bevy_ecs::event::{BufferedEvent, EventCursor, EventId, EventInstance, Events};3use core::{iter::Chain, slice::IterMut};45/// An iterator that yields any unread events from an [`EventMutator`] or [`EventCursor`].6///7/// [`EventMutator`]: super::EventMutator8#[derive(Debug)]9pub struct EventMutIterator<'a, E: BufferedEvent> {10iter: EventMutIteratorWithId<'a, E>,11}1213impl<'a, E: BufferedEvent> Iterator for EventMutIterator<'a, E> {14type Item = &'a mut E;15fn next(&mut self) -> Option<Self::Item> {16self.iter.next().map(|(event, _)| event)17}1819fn size_hint(&self) -> (usize, Option<usize>) {20self.iter.size_hint()21}2223fn count(self) -> usize {24self.iter.count()25}2627fn last(self) -> Option<Self::Item>28where29Self: Sized,30{31self.iter.last().map(|(event, _)| event)32}3334fn nth(&mut self, n: usize) -> Option<Self::Item> {35self.iter.nth(n).map(|(event, _)| event)36}37}3839impl<'a, E: BufferedEvent> ExactSizeIterator for EventMutIterator<'a, E> {40fn len(&self) -> usize {41self.iter.len()42}43}4445/// An iterator that yields any unread events (and their IDs) from an [`EventMutator`] or [`EventCursor`].46///47/// [`EventMutator`]: super::EventMutator48#[derive(Debug)]49pub struct EventMutIteratorWithId<'a, E: BufferedEvent> {50mutator: &'a mut EventCursor<E>,51chain: Chain<IterMut<'a, EventInstance<E>>, IterMut<'a, EventInstance<E>>>,52unread: usize,53}5455impl<'a, E: BufferedEvent> EventMutIteratorWithId<'a, E> {56/// Creates a new iterator that yields any `events` that have not yet been seen by `mutator`.57pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {58let a_index = mutator59.last_event_count60.saturating_sub(events.events_a.start_event_count);61let b_index = mutator62.last_event_count63.saturating_sub(events.events_b.start_event_count);64let a = events.events_a.get_mut(a_index..).unwrap_or_default();65let b = events.events_b.get_mut(b_index..).unwrap_or_default();6667let unread_count = a.len() + b.len();6869mutator.last_event_count = events.event_count - unread_count;70// Iterate the oldest first, then the newer events71let chain = a.iter_mut().chain(b.iter_mut());7273Self {74mutator,75chain,76unread: unread_count,77}78}7980/// Iterate over only the events.81pub fn without_id(self) -> EventMutIterator<'a, E> {82EventMutIterator { iter: self }83}84}8586impl<'a, E: BufferedEvent> Iterator for EventMutIteratorWithId<'a, E> {87type Item = (&'a mut E, EventId<E>);88fn next(&mut self) -> Option<Self::Item> {89match self90.chain91.next()92.map(|instance| (&mut instance.event, instance.event_id))93{94Some(item) => {95#[cfg(feature = "detailed_trace")]96tracing::trace!("EventMutator::iter() -> {}", item.1);97self.mutator.last_event_count += 1;98self.unread -= 1;99Some(item)100}101None => None,102}103}104105fn size_hint(&self) -> (usize, Option<usize>) {106self.chain.size_hint()107}108109fn count(self) -> usize {110self.mutator.last_event_count += self.unread;111self.unread112}113114fn last(self) -> Option<Self::Item>115where116Self: Sized,117{118let EventInstance { event_id, event } = self.chain.last()?;119self.mutator.last_event_count += self.unread;120Some((event, *event_id))121}122123fn nth(&mut self, n: usize) -> Option<Self::Item> {124if let Some(EventInstance { event_id, event }) = self.chain.nth(n) {125self.mutator.last_event_count += n + 1;126self.unread -= n + 1;127Some((event, *event_id))128} else {129self.mutator.last_event_count += self.unread;130self.unread = 0;131None132}133}134}135136impl<'a, E: BufferedEvent> ExactSizeIterator for EventMutIteratorWithId<'a, E> {137fn len(&self) -> usize {138self.unread139}140}141142/// A parallel iterator over `BufferedEvent`s.143#[derive(Debug)]144#[cfg(feature = "multi_threaded")]145pub struct EventMutParIter<'a, E: BufferedEvent> {146mutator: &'a mut EventCursor<E>,147slices: [&'a mut [EventInstance<E>]; 2],148batching_strategy: BatchingStrategy,149#[cfg(not(target_arch = "wasm32"))]150unread: usize,151}152153#[cfg(feature = "multi_threaded")]154impl<'a, E: BufferedEvent> EventMutParIter<'a, E> {155/// Creates a new parallel iterator over `events` that have not yet been seen by `mutator`.156pub fn new(mutator: &'a mut EventCursor<E>, events: &'a mut Events<E>) -> Self {157let a_index = mutator158.last_event_count159.saturating_sub(events.events_a.start_event_count);160let b_index = mutator161.last_event_count162.saturating_sub(events.events_b.start_event_count);163let a = events.events_a.get_mut(a_index..).unwrap_or_default();164let b = events.events_b.get_mut(b_index..).unwrap_or_default();165166let unread_count = a.len() + b.len();167mutator.last_event_count = events.event_count - unread_count;168169Self {170mutator,171slices: [a, b],172batching_strategy: BatchingStrategy::default(),173#[cfg(not(target_arch = "wasm32"))]174unread: unread_count,175}176}177178/// Changes the batching strategy used when iterating.179///180/// For more information on how this affects the resultant iteration, see181/// [`BatchingStrategy`].182pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {183self.batching_strategy = strategy;184self185}186187/// Runs the provided closure for each unread event in parallel.188///189/// Unlike normal iteration, the event order is not guaranteed in any form.190///191/// # Panics192/// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being193/// initialized and run from the ECS scheduler, this should never panic.194///195/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool196pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {197self.for_each_with_id(move |e, _| func(e));198}199200/// Runs the provided closure for each unread event in parallel, like [`for_each`](Self::for_each),201/// but additionally provides the `EventId` to the closure.202///203/// Note that the order of iteration is not guaranteed, but `EventId`s are ordered by send order.204///205/// # Panics206/// If the [`ComputeTaskPool`] is not initialized. If using this from an event reader that is being207/// initialized and run from the ECS scheduler, this should never panic.208///209/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool210#[cfg_attr(211target_arch = "wasm32",212expect(unused_mut, reason = "not mutated on this target")213)]214pub fn for_each_with_id<FN: Fn(&'a mut E, EventId<E>) + Send + Sync + Clone>(215mut self,216func: FN,217) {218#[cfg(target_arch = "wasm32")]219{220self.into_iter().for_each(|(e, i)| func(e, i));221}222223#[cfg(not(target_arch = "wasm32"))]224{225let pool = bevy_tasks::ComputeTaskPool::get();226let thread_count = pool.thread_num();227if thread_count <= 1 {228return self.into_iter().for_each(|(e, i)| func(e, i));229}230231let batch_size = self232.batching_strategy233.calc_batch_size(|| self.len(), thread_count);234let chunks = self.slices.map(|s| s.chunks_mut(batch_size));235236pool.scope(|scope| {237for batch in chunks.into_iter().flatten() {238let func = func.clone();239scope.spawn(async move {240for event in batch {241func(&mut event.event, event.event_id);242}243});244}245});246247// Events are guaranteed to be read at this point.248self.mutator.last_event_count += self.unread;249self.unread = 0;250}251}252253/// Returns the number of [`BufferedEvent`]s to be iterated.254pub fn len(&self) -> usize {255self.slices.iter().map(|s| s.len()).sum()256}257258/// Returns [`true`] if there are no events remaining in this iterator.259pub fn is_empty(&self) -> bool {260self.slices.iter().all(|x| x.is_empty())261}262}263264#[cfg(feature = "multi_threaded")]265impl<'a, E: BufferedEvent> IntoIterator for EventMutParIter<'a, E> {266type IntoIter = EventMutIteratorWithId<'a, E>;267type Item = <Self::IntoIter as Iterator>::Item;268269fn into_iter(self) -> Self::IntoIter {270let EventMutParIter {271mutator: reader,272slices: [a, b],273..274} = self;275let unread = a.len() + b.len();276let chain = a.iter_mut().chain(b);277EventMutIteratorWithId {278mutator: reader,279chain,280unread,281}282}283}284285286