Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/message/message_reader.rs
9341 views
1
#[cfg(feature = "multi_threaded")]
2
use crate::message::MessageParIter;
3
use crate::{
4
message::{Message, MessageCursor, MessageIterator, MessageIteratorWithId, Messages},
5
system::{Local, Res, SystemParam, SystemParamValidationError},
6
};
7
8
/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.
9
///
10
/// Use [`PopulatedMessageReader<T>`] to skip the system if there are no messages.
11
///
12
/// # Concurrency
13
///
14
/// Unlike [`MessageWriter<T>`], systems with `MessageReader<T>` param can be executed concurrently
15
/// (but not concurrently with `MessageWriter<T>` or `MessageMutator<T>` systems for the same message type).
16
///
17
/// [`MessageWriter<T>`]: super::MessageWriter
18
#[derive(SystemParam, Debug)]
19
pub struct MessageReader<'w, 's, M: Message> {
20
pub(super) reader: Local<'s, MessageCursor<M>>,
21
#[system_param(validation_message = "Message not initialized")]
22
messages: Res<'w, Messages<M>>,
23
}
24
25
impl<'w, 's, M: Message> MessageReader<'w, 's, M> {
26
/// Iterates over the messages this [`MessageReader`] has not seen yet. This updates the
27
/// [`MessageReader`]'s message counter, which means subsequent message reads will not include messages
28
/// that happened before now.
29
pub fn read(&mut self) -> MessageIterator<'_, M> {
30
self.reader.read(&self.messages)
31
}
32
33
/// Like [`read`](Self::read), except also returning the [`MessageId`](super::MessageId) of the messages.
34
pub fn read_with_id(&mut self) -> MessageIteratorWithId<'_, M> {
35
self.reader.read_with_id(&self.messages)
36
}
37
38
/// Returns a parallel iterator over the messages this [`MessageReader`] has not seen yet.
39
/// See also [`for_each`](MessageParIter::for_each).
40
///
41
/// # Example
42
/// ```
43
/// # use bevy_ecs::prelude::*;
44
/// # use std::sync::atomic::{AtomicUsize, Ordering};
45
///
46
/// #[derive(Message)]
47
/// struct MyMessage {
48
/// value: usize,
49
/// }
50
///
51
/// #[derive(Resource, Default)]
52
/// struct Counter(AtomicUsize);
53
///
54
/// // setup
55
/// let mut world = World::new();
56
/// world.init_resource::<Messages<MyMessage>>();
57
/// world.insert_resource(Counter::default());
58
///
59
/// let mut schedule = Schedule::default();
60
/// schedule.add_systems(|mut messages: MessageReader<MyMessage>, counter: Res<Counter>| {
61
/// messages.par_read().for_each(|MyMessage { value }| {
62
/// counter.0.fetch_add(*value, Ordering::Relaxed);
63
/// });
64
/// });
65
/// for value in 0..100 {
66
/// world.write_message(MyMessage { value });
67
/// }
68
/// schedule.run(&mut world);
69
/// let Counter(counter) = world.remove_resource::<Counter>().unwrap();
70
/// // all messages were processed
71
/// assert_eq!(counter.into_inner(), 4950);
72
/// ```
73
#[cfg(feature = "multi_threaded")]
74
pub fn par_read(&mut self) -> MessageParIter<'_, M> {
75
self.reader.par_read(&self.messages)
76
}
77
78
/// Determines the number of messages available to be read from this [`MessageReader`] without consuming any.
79
pub fn len(&self) -> usize {
80
self.reader.len(&self.messages)
81
}
82
83
/// Returns `true` if there are no messages available to read.
84
///
85
/// # Example
86
///
87
/// The following example shows a useful pattern where some behavior is triggered if new messages are available.
88
/// [`MessageReader::clear()`] is used so the same messages don't re-trigger the behavior the next time the system runs.
89
///
90
/// ```
91
/// # use bevy_ecs::prelude::*;
92
/// #
93
/// #[derive(Message)]
94
/// struct Collision;
95
///
96
/// fn play_collision_sound(mut messages: MessageReader<Collision>) {
97
/// if !messages.is_empty() {
98
/// messages.clear();
99
/// // Play a sound
100
/// }
101
/// }
102
/// # bevy_ecs::system::assert_is_system(play_collision_sound);
103
/// ```
104
pub fn is_empty(&self) -> bool {
105
self.reader.is_empty(&self.messages)
106
}
107
108
/// Consumes all available messages.
109
///
110
/// This means these messages will not appear in calls to [`MessageReader::read()`] or
111
/// [`MessageReader::read_with_id()`] and [`MessageReader::is_empty()`] will return `true`.
112
///
113
/// For usage, see [`MessageReader::is_empty()`].
114
pub fn clear(&mut self) {
115
self.reader.clear(&self.messages);
116
}
117
}
118
119
/// Reads [`Message`]s of type `T` in order and tracks which messages have already been read.
120
/// Skips the system if there no messages.
121
///
122
/// Use [`MessageReader<T>`] to run the system even if there are no messages.
123
///
124
/// Use the [`on_message`](crate::prelude::on_message) run condition to skip the system based on messages that it doesn't read.
125
#[derive(Debug)]
126
pub struct PopulatedMessageReader<'w, 's, M: Message>(MessageReader<'w, 's, M>);
127
128
impl<'w, 's, M: Message> core::ops::Deref for PopulatedMessageReader<'w, 's, M> {
129
type Target = MessageReader<'w, 's, M>;
130
131
fn deref(&self) -> &Self::Target {
132
&self.0
133
}
134
}
135
136
impl<'w, 's, M: Message> core::ops::DerefMut for PopulatedMessageReader<'w, 's, M> {
    /// Mutably borrows the wrapped [`MessageReader`], so its `&mut self` methods can be
    /// called directly on a [`PopulatedMessageReader`].
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
141
142
// SAFETY: relies on MessageReader to uphold soundness requirements
143
unsafe impl<'w, 's, M: Message> SystemParam for PopulatedMessageReader<'w, 's, M> {
144
type State = <MessageReader<'w, 's, M> as SystemParam>::State;
145
type Item<'world, 'state> = PopulatedMessageReader<'world, 'state, M>;
146
147
fn init_state(world: &mut crate::prelude::World) -> Self::State {
148
MessageReader::<M>::init_state(world)
149
}
150
151
fn init_access(
152
state: &Self::State,
153
system_meta: &mut crate::system::SystemMeta,
154
component_access_set: &mut crate::query::FilteredAccessSet,
155
world: &mut crate::prelude::World,
156
) {
157
MessageReader::<M>::init_access(state, system_meta, component_access_set, world);
158
}
159
160
unsafe fn get_param<'world, 'state>(
161
state: &'state mut Self::State,
162
system_meta: &crate::system::SystemMeta,
163
world: crate::world::unsafe_world_cell::UnsafeWorldCell<'world>,
164
change_tick: crate::change_detection::Tick,
165
) -> Self::Item<'world, 'state> {
166
// SAFETY: requirements are upheld by MessageReader's implementation
167
unsafe {
168
PopulatedMessageReader(MessageReader::get_param(
169
state,
170
system_meta,
171
world,
172
change_tick,
173
))
174
}
175
}
176
177
unsafe fn validate_param(
178
state: &mut Self::State,
179
system_meta: &crate::system::SystemMeta,
180
world: crate::world::unsafe_world_cell::UnsafeWorldCell,
181
) -> Result<(), SystemParamValidationError> {
182
// SAFETY: requirements are upheld by MessageReader's implementation
183
unsafe { MessageReader::<M>::validate_param(state, system_meta, world) }?;
184
185
// SAFETY: requirements are upheld by MessageReader's implementation
186
let reader =
187
unsafe { MessageReader::get_param(state, system_meta, world, world.change_tick()) };
188
if reader.is_empty() {
189
Err(SystemParamValidationError::skipped::<Self>(
190
"message queue is empty",
191
))
192
} else {
193
Ok(())
194
}
195
}
196
}
197
198
#[cfg(test)]
mod tests {
    use core::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::message::MessageRegistry;
    use crate::prelude::*;
    use bevy_platform::sync::Arc;

    /// A system taking `PopulatedMessageReader` must be skipped while no message has been
    /// written, and must run once a message is available.
    #[test]
    fn test_populated_message_reader() {
        #[derive(Message)]
        struct TheMessage;

        // Flag flipped by the system body, so the test can observe whether it ran.
        let system_ran = Arc::new(AtomicBool::new(false));
        let ran_in_system = system_ran.clone();

        let mut world = World::new();
        MessageRegistry::register_message::<TheMessage>(&mut world);

        let mut schedule = Schedule::default();
        schedule.add_systems(move |mut _reader: PopulatedMessageReader<TheMessage>| {
            ran_in_system.store(true, Ordering::SeqCst);
        });

        // No message has been written yet: the system must not run.
        schedule.run(&mut world);
        let ran_without_messages = system_ran.load(Ordering::SeqCst);
        assert!(
            !ran_without_messages,
            "system with PopulatedMessageReader should have been skipped"
        );

        // With one message written, the system must run.
        world.write_message(TheMessage);
        schedule.run(&mut world);
        let ran_with_message = system_ran.load(Ordering::SeqCst);
        assert!(
            ran_with_message,
            "system with PopulatedMessageReader should NOT have been skipped"
        );
    }
}
239
240