Path: blob/main/crates/bevy_ecs/src/message/message_reader.rs
9341 views
#[cfg(feature = "multi_threaded")]1use crate::message::MessageParIter;2use crate::{3message::{Message, MessageCursor, MessageIterator, MessageIteratorWithId, Messages},4system::{Local, Res, SystemParam, SystemParamValidationError},5};67/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.8///9/// Use [`PopulatedMessageReader<T>`] to skip the system if there are no messages.10///11/// # Concurrency12///13/// Unlike [`MessageWriter<T>`], systems with `MessageReader<T>` param can be executed concurrently14/// (but not concurrently with `MessageWriter<T>` or `MessageMutator<T>` systems for the same message type).15///16/// [`MessageWriter<T>`]: super::MessageWriter17#[derive(SystemParam, Debug)]18pub struct MessageReader<'w, 's, M: Message> {19pub(super) reader: Local<'s, MessageCursor<M>>,20#[system_param(validation_message = "Message not initialized")]21messages: Res<'w, Messages<M>>,22}2324impl<'w, 's, M: Message> MessageReader<'w, 's, M> {25/// Iterates over the messages this [`MessageReader`] has not seen yet. This updates the26/// [`MessageReader`]'s message counter, which means subsequent message reads will not include messages27/// that happened before now.28pub fn read(&mut self) -> MessageIterator<'_, M> {29self.reader.read(&self.messages)30}3132/// Like [`read`](Self::read), except also returning the [`MessageId`](super::MessageId) of the messages.33pub fn read_with_id(&mut self) -> MessageIteratorWithId<'_, M> {34self.reader.read_with_id(&self.messages)35}3637/// Returns a parallel iterator over the messages this [`MessageReader`] has not seen yet.38/// See also [`for_each`](MessageParIter::for_each).39///40/// # Example41/// ```42/// # use bevy_ecs::prelude::*;43/// # use std::sync::atomic::{AtomicUsize, Ordering};44///45/// #[derive(Message)]46/// struct MyMessage {47/// value: usize,48/// }49///50/// #[derive(Resource, Default)]51/// struct Counter(AtomicUsize);52///53/// // setup54/// let mut world = World::new();55/// world.init_resource::<Messages<MyMessage>>();56/// world.insert_resource(Counter::default());57///58/// let mut schedule = Schedule::default();59/// schedule.add_systems(|mut messages: MessageReader<MyMessage>, counter: Res<Counter>| {60/// messages.par_read().for_each(|MyMessage { value }| {61/// counter.0.fetch_add(*value, Ordering::Relaxed);62/// });63/// });64/// for value in 0..100 {65/// world.write_message(MyMessage { value });66/// }67/// schedule.run(&mut world);68/// let Counter(counter) = world.remove_resource::<Counter>().unwrap();69/// // all messages were processed70/// assert_eq!(counter.into_inner(), 4950);71/// ```72#[cfg(feature = "multi_threaded")]73pub fn par_read(&mut self) -> MessageParIter<'_, M> {74self.reader.par_read(&self.messages)75}7677/// Determines the number of messages available to be read from this [`MessageReader`] without consuming any.78pub fn len(&self) -> usize {79self.reader.len(&self.messages)80}8182/// Returns `true` if there are no messages available to read.83///84/// # Example85///86/// The following example shows a useful pattern where some behavior is triggered if new messages are available.87/// [`MessageReader::clear()`] is used so the same messages don't re-trigger the behavior the next time the system runs.88///89/// ```90/// # use bevy_ecs::prelude::*;91/// #92/// #[derive(Message)]93/// struct Collision;94///95/// fn play_collision_sound(mut messages: MessageReader<Collision>) {96/// if !messages.is_empty() {97/// messages.clear();98/// // Play a sound99/// }100/// }101/// # bevy_ecs::system::assert_is_system(play_collision_sound);102/// ```103pub fn is_empty(&self) -> bool {104self.reader.is_empty(&self.messages)105}106107/// Consumes all available messages.108///109/// This means these messages will not appear in calls to [`MessageReader::read()`] or110/// [`MessageReader::read_with_id()`] and [`MessageReader::is_empty()`] will return `true`.111///112/// For usage, see [`MessageReader::is_empty()`].113pub fn clear(&mut self) {114self.reader.clear(&self.messages);115}116}117118/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.119/// Skips the system if there no messages.120///121/// Use [`MessageReader<T>`] to run the system even if there are no messages.122///123/// Use the [`on_message`](crate::prelude::on_message) run condition to skip the system based on messages that it doesn't read.124#[derive(Debug)]125pub struct PopulatedMessageReader<'w, 's, M: Message>(MessageReader<'w, 's, M>);126127impl<'w, 's, M: Message> core::ops::Deref for PopulatedMessageReader<'w, 's, M> {128type Target = MessageReader<'w, 's, M>;129130fn deref(&self) -> &Self::Target {131&self.0132}133}134135impl<'w, 's, M: Message> core::ops::DerefMut for PopulatedMessageReader<'w, 's, M> {136fn deref_mut(&mut self) -> &mut Self::Target {137&mut self.0138}139}140141// SAFETY: relies on MessageReader to uphold soundness requirements142unsafe impl<'w, 's, M: Message> SystemParam for PopulatedMessageReader<'w, 's, M> {143type State = <MessageReader<'w, 's, M> as SystemParam>::State;144type Item<'world, 'state> = PopulatedMessageReader<'world, 'state, M>;145146fn init_state(world: &mut crate::prelude::World) -> Self::State {147MessageReader::<M>::init_state(world)148}149150fn init_access(151state: &Self::State,152system_meta: &mut crate::system::SystemMeta,153component_access_set: &mut crate::query::FilteredAccessSet,154world: &mut crate::prelude::World,155) {156MessageReader::<M>::init_access(state, system_meta, component_access_set, world);157}158159unsafe fn get_param<'world, 'state>(160state: &'state mut Self::State,161system_meta: &crate::system::SystemMeta,162world: crate::world::unsafe_world_cell::UnsafeWorldCell<'world>,163change_tick: crate::change_detection::Tick,164) -> Self::Item<'world, 'state> {165// SAFETY: requirements are upheld by MessageReader's implementation166unsafe {167PopulatedMessageReader(MessageReader::get_param(168state,169system_meta,170world,171change_tick,172))173}174}175176unsafe fn validate_param(177state: &mut Self::State,178system_meta: &crate::system::SystemMeta,179world: crate::world::unsafe_world_cell::UnsafeWorldCell,180) -> Result<(), SystemParamValidationError> {181// SAFETY: requirements are upheld by MessageReader's implementation182unsafe { MessageReader::<M>::validate_param(state, system_meta, world) }?;183184// SAFETY: requirements are upheld by MessageReader's implementation185let reader =186unsafe { MessageReader::get_param(state, system_meta, world, world.change_tick()) };187if reader.is_empty() {188Err(SystemParamValidationError::skipped::<Self>(189"message queue is empty",190))191} else {192Ok(())193}194}195}196197#[cfg(test)]198mod tests {199use core::sync::atomic::{AtomicBool, Ordering};200201use super::*;202use crate::message::MessageRegistry;203use crate::prelude::*;204use bevy_platform::sync::Arc;205206#[test]207fn test_populated_message_reader() {208let system_ran = Arc::new(AtomicBool::new(false));209210let mut world = World::new();211MessageRegistry::register_message::<TheMessage>(&mut world);212213let mut schedule = Schedule::default();214schedule.add_systems({215let system_ran = system_ran.clone();216move |mut _reader: PopulatedMessageReader<TheMessage>| {217system_ran.store(true, Ordering::SeqCst);218}219});220221schedule.run(&mut world);222assert!(223!system_ran.load(Ordering::SeqCst),224"system with PopulatedMessageReader should have been skipped"225);226227world.write_message(TheMessage);228schedule.run(&mut world);229assert!(230system_ran.load(Ordering::SeqCst),231"system with PopulatedMessageReader should NOT have been skipped"232);233234#[derive(Message)]235struct TheMessage;236}237}238239240