Path: blob/main/crates/bevy_ecs/src/message/messages.rs
9368 views
use crate::{1change_detection::MaybeLocation,2message::{Message, MessageCursor, MessageId, MessageInstance},3resource::Resource,4};5use alloc::vec::Vec;6use core::{7marker::PhantomData,8ops::{Deref, DerefMut},9};10#[cfg(feature = "bevy_reflect")]11use {12crate::reflect::ReflectResource,13bevy_reflect::{std_traits::ReflectDefault, Reflect},14};1516/// A message collection that represents the messages that occurred within the last two17/// [`Messages::update`] calls.18/// Messages can be written to using a [`MessageWriter`]19/// and are typically cheaply read using a [`MessageReader`].20///21/// Each message can be consumed by multiple systems, in parallel,22/// with consumption tracked by the [`MessageReader`] on a per-system basis.23///24/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)25/// is applied between writing and reading systems, there is a risk of a race condition.26/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.27///28/// This collection is meant to be paired with a system that calls29/// [`Messages::update`] exactly once per update/frame.30///31/// [`message_update_system`] is a system that does this, typically initialized automatically using32/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).33/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.34/// Messages will persist across a single frame boundary and so ordering of message producers and35/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).36/// If messages are not handled by the end of the frame after they are updated, they will be37/// dropped silently.38///39/// # Example40///41/// ```42/// use bevy_ecs::message::{Message, Messages};43///44/// #[derive(Message)]45/// struct MyMessage {46/// value: usize47/// }48///49/// // setup50/// let mut messages = Messages::<MyMessage>::default();51/// let mut cursor = messages.get_cursor();52///53/// // run this once per update/frame54/// messages.update();55///56/// // somewhere else: write a message57/// messages.write(MyMessage { value: 1 });58///59/// // somewhere else: read the messages60/// for message in cursor.read(&messages) {61/// assert_eq!(message.value, 1)62/// }63///64/// // messages are only processed once per reader65/// assert_eq!(cursor.read(&messages).count(), 0);66/// ```67///68/// # Details69///70/// [`Messages`] is implemented using a variation of a double buffer strategy.71/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.72/// - [`MessageReader`]s will read messages from both buffers.73/// - [`MessageReader`]s that read at least once per update will never drop messages.74/// - [`MessageReader`]s that read once within two updates might still receive some messages75/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred76/// before those updates.77///78/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.79///80/// An alternative call pattern would be to call [`update`](Messages::update)81/// manually across frames to control when messages are cleared.82/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,83/// but can be done by adding your message as a resource instead of using84/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).85///86/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)87/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)88///89/// [`MessageReader`]: super::MessageReader90/// [`MessageWriter`]: super::MessageWriter91/// [`message_update_system`]: super::message_update_system92#[derive(Debug, Resource)]93#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]94pub struct Messages<M: Message> {95/// Holds the oldest still active messages.96/// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.97pub(crate) messages_a: MessageSequence<M>,98/// Holds the newer messages.99pub(crate) messages_b: MessageSequence<M>,100pub(crate) message_count: usize,101}102103// Derived Default impl would incorrectly require M: Default104impl<M: Message> Default for Messages<M> {105fn default() -> Self {106Self {107messages_a: Default::default(),108messages_b: Default::default(),109message_count: Default::default(),110}111}112}113114impl<M: Message> Messages<M> {115/// Returns the index of the oldest message stored in the message buffer.116pub fn oldest_message_count(&self) -> usize {117self.messages_a.start_message_count118}119120/// Writes an `message` to the current message buffer.121/// [`MessageReader`](super::MessageReader)s can then read the message.122/// This method returns the [ID](`MessageId`) of the written `message`.123#[track_caller]124pub fn write(&mut self, message: M) -> MessageId<M> {125self.write_with_caller(message, MaybeLocation::caller())126}127128pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {129let message_id = MessageId {130id: self.message_count,131caller,132_marker: PhantomData,133};134#[cfg(feature = "detailed_trace")]135tracing::trace!("Messages::write() -> id: {}", message_id);136137let message_instance = MessageInstance {138message_id,139message,140};141142self.messages_b.push(message_instance);143self.message_count += 1;144145message_id146}147148/// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.149/// This is more efficient than writing each message individually.150/// This method returns the [IDs](`MessageId`) of the written `messages`.151#[track_caller]152pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {153let last_count = self.message_count;154155self.extend(messages);156157WriteBatchIds {158last_count,159message_count: self.message_count,160_marker: PhantomData,161}162}163164/// Writes the default value of the message. Useful when the message is an empty struct.165/// This method returns the [ID](`MessageId`) of the written `message`.166#[track_caller]167pub fn write_default(&mut self) -> MessageId<M>168where169M: Default,170{171self.write(Default::default())172}173174/// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.175pub fn get_cursor(&self) -> MessageCursor<M> {176MessageCursor::default()177}178179/// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.180/// It will read all future messages.181pub fn get_cursor_current(&self) -> MessageCursor<M> {182MessageCursor {183last_message_count: self.message_count,184..Default::default()185}186}187188/// Swaps the message buffers and clears the oldest message buffer. In general, this should be189/// called once per frame/update.190///191/// If you need access to the messages that were removed, consider using [`Messages::update_drain`].192pub fn update(&mut self) {193core::mem::swap(&mut self.messages_a, &mut self.messages_b);194self.messages_b.clear();195self.messages_b.start_message_count = self.message_count;196debug_assert_eq!(197self.messages_a.start_message_count + self.messages_a.len(),198self.messages_b.start_message_count199);200}201202/// Swaps the message buffers and drains the oldest message buffer, returning an iterator203/// of all messages that were removed. In general, this should be called once per frame/update.204///205/// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.206#[must_use = "If you do not need the returned messages, call .update() instead."]207pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {208core::mem::swap(&mut self.messages_a, &mut self.messages_b);209let iter = self.messages_b.messages.drain(..);210self.messages_b.start_message_count = self.message_count;211debug_assert_eq!(212self.messages_a.start_message_count + self.messages_a.len(),213self.messages_b.start_message_count214);215216iter.map(|e| e.message)217}218219#[inline]220fn reset_start_message_count(&mut self) {221self.messages_a.start_message_count = self.message_count;222self.messages_b.start_message_count = self.message_count;223}224225/// Removes all messages.226#[inline]227pub fn clear(&mut self) {228self.reset_start_message_count();229self.messages_a.clear();230self.messages_b.clear();231}232233/// Returns the number of messages currently stored in the message buffer.234#[inline]235pub fn len(&self) -> usize {236self.messages_a.len() + self.messages_b.len()237}238239/// Returns true if there are no messages currently stored in the message buffer.240#[inline]241pub fn is_empty(&self) -> bool {242self.len() == 0243}244245/// Creates a draining iterator that removes all messages.246pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {247self.reset_start_message_count();248249// Drain the oldest messages first, then the newest250self.messages_a251.drain(..)252.chain(self.messages_b.drain(..))253.map(|i| i.message)254}255256/// Iterates over messages that happened since the last "update" call.257/// WARNING: You probably don't want to use this call. In most cases you should use an258/// [`MessageReader`]. You should only use this if you know you only need to consume messages259/// between the last `update()` call and your call to `iter_current_update_messages`.260/// If messages happen outside that window, they will not be handled. For example, any messages that261/// happen after this call and before the next `update()` call will be dropped.262///263/// [`MessageReader`]: super::MessageReader264pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {265self.messages_b.iter().map(|i| &i.message)266}267268/// Get a specific message by id if it still exists in the messages buffer.269pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {270if id < self.oldest_message_count() {271return None;272}273274let sequence = self.sequence(id);275let index = id.saturating_sub(sequence.start_message_count);276277sequence278.get(index)279.map(|instance| (&instance.message, instance.message_id))280}281282/// Which message buffer is this message id a part of.283fn sequence(&self, id: usize) -> &MessageSequence<M> {284if id < self.messages_b.start_message_count {285&self.messages_a286} else {287&self.messages_b288}289}290}291292impl<M: Message> Extend<M> for Messages<M> {293#[track_caller]294fn extend<I>(&mut self, iter: I)295where296I: IntoIterator<Item = M>,297{298let old_count = self.message_count;299let mut message_count = self.message_count;300let messages = iter.into_iter().map(|message| {301let message_id = MessageId {302id: message_count,303caller: MaybeLocation::caller(),304_marker: PhantomData,305};306message_count += 1;307MessageInstance {308message_id,309message,310}311});312313self.messages_b.extend(messages);314315if old_count != message_count {316#[cfg(feature = "detailed_trace")]317tracing::trace!(318"Messages::extend() -> ids: ({}..{})",319self.message_count,320message_count321);322}323324self.message_count = message_count;325}326}327328#[derive(Debug)]329#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]330pub(crate) struct MessageSequence<M: Message> {331pub(crate) messages: Vec<MessageInstance<M>>,332pub(crate) start_message_count: usize,333}334335// Derived Default impl would incorrectly require M: Default336impl<M: Message> Default for MessageSequence<M> {337fn default() -> Self {338Self {339messages: Default::default(),340start_message_count: Default::default(),341}342}343}344345impl<M: Message> Deref for MessageSequence<M> {346type Target = Vec<MessageInstance<M>>;347348fn deref(&self) -> &Self::Target {349&self.messages350}351}352353impl<M: Message> DerefMut for MessageSequence<M> {354fn deref_mut(&mut self) -> &mut Self::Target {355&mut self.messages356}357}358359/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.360pub struct WriteBatchIds<M> {361last_count: usize,362message_count: usize,363_marker: PhantomData<M>,364}365366impl<M: Message> Iterator for WriteBatchIds<M> {367type Item = MessageId<M>;368369fn next(&mut self) -> Option<Self::Item> {370if self.last_count >= self.message_count {371return None;372}373374let result = Some(MessageId {375id: self.last_count,376caller: MaybeLocation::caller(),377_marker: PhantomData,378});379380self.last_count += 1;381382result383}384385fn size_hint(&self) -> (usize, Option<usize>) {386let len = <Self as ExactSizeIterator>::len(self);387(len, Some(len))388}389}390391impl<M: Message> ExactSizeIterator for WriteBatchIds<M> {392fn len(&self) -> usize {393self.message_count.saturating_sub(self.last_count)394}395}396397#[cfg(test)]398mod tests {399use crate::message::{Message, Messages};400401#[test]402fn iter_current_update_messages_iterates_over_current_messages() {403#[derive(Message, Clone)]404struct TestMessage;405406let mut test_messages = Messages::<TestMessage>::default();407408// Starting empty409assert_eq!(test_messages.len(), 0);410assert_eq!(test_messages.iter_current_update_messages().count(), 0);411test_messages.update();412413// Writing one message414test_messages.write(TestMessage);415416assert_eq!(test_messages.len(), 1);417assert_eq!(test_messages.iter_current_update_messages().count(), 1);418test_messages.update();419420// Writing two messages on the next frame421test_messages.write(TestMessage);422test_messages.write(TestMessage);423424assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3425assert_eq!(test_messages.iter_current_update_messages().count(), 2);426test_messages.update();427428// Writing zero messages429assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2430assert_eq!(test_messages.iter_current_update_messages().count(), 0);431}432433#[test]434fn write_batch_iter_size_hint() {435#[derive(Message, Clone, Copy)]436struct TestMessage;437438let mut test_messages = Messages::<TestMessage>::default();439let write_batch_ids = test_messages.write_batch([TestMessage; 4]);440let expected_len = 4;441assert_eq!(write_batch_ids.len(), expected_len);442assert_eq!(443write_batch_ids.size_hint(),444(expected_len, Some(expected_len))445);446}447}448449450