Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/message/messages.rs
9368 views
1
use crate::{
2
change_detection::MaybeLocation,
3
message::{Message, MessageCursor, MessageId, MessageInstance},
4
resource::Resource,
5
};
6
use alloc::vec::Vec;
7
use core::{
8
marker::PhantomData,
9
ops::{Deref, DerefMut},
10
};
11
#[cfg(feature = "bevy_reflect")]
12
use {
13
crate::reflect::ReflectResource,
14
bevy_reflect::{std_traits::ReflectDefault, Reflect},
15
};
16
17
/// A message collection that represents the messages that occurred within the last two
18
/// [`Messages::update`] calls.
19
/// Messages can be written to using a [`MessageWriter`]
20
/// and are typically cheaply read using a [`MessageReader`].
21
///
22
/// Each message can be consumed by multiple systems, in parallel,
23
/// with consumption tracked by the [`MessageReader`] on a per-system basis.
24
///
25
/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)
26
/// is applied between writing and reading systems, there is a risk of a race condition.
27
/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.
28
///
29
/// This collection is meant to be paired with a system that calls
30
/// [`Messages::update`] exactly once per update/frame.
31
///
32
/// [`message_update_system`] is a system that does this, typically initialized automatically using
33
/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
34
/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.
35
/// Messages will persist across a single frame boundary and so ordering of message producers and
36
/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).
37
/// If messages are not handled by the end of the frame after they are updated, they will be
38
/// dropped silently.
39
///
40
/// # Example
41
///
42
/// ```
43
/// use bevy_ecs::message::{Message, Messages};
44
///
45
/// #[derive(Message)]
46
/// struct MyMessage {
47
/// value: usize
48
/// }
49
///
50
/// // setup
51
/// let mut messages = Messages::<MyMessage>::default();
52
/// let mut cursor = messages.get_cursor();
53
///
54
/// // run this once per update/frame
55
/// messages.update();
56
///
57
/// // somewhere else: write a message
58
/// messages.write(MyMessage { value: 1 });
59
///
60
/// // somewhere else: read the messages
61
/// for message in cursor.read(&messages) {
62
/// assert_eq!(message.value, 1)
63
/// }
64
///
65
/// // messages are only processed once per reader
66
/// assert_eq!(cursor.read(&messages).count(), 0);
67
/// ```
68
///
69
/// # Details
70
///
71
/// [`Messages`] is implemented using a variation of a double buffer strategy.
72
/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.
73
/// - [`MessageReader`]s will read messages from both buffers.
74
/// - [`MessageReader`]s that read at least once per update will never drop messages.
75
/// - [`MessageReader`]s that read once within two updates might still receive some messages
76
/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred
77
/// before those updates.
78
///
79
/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.
80
///
81
/// An alternative call pattern would be to call [`update`](Messages::update)
82
/// manually across frames to control when messages are cleared.
83
/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,
84
/// but can be done by adding your message as a resource instead of using
85
/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).
86
///
87
/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)
88
/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)
89
///
90
/// [`MessageReader`]: super::MessageReader
91
/// [`MessageWriter`]: super::MessageWriter
92
/// [`message_update_system`]: super::message_update_system
93
#[derive(Debug, Resource)]
94
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]
95
pub struct Messages<M: Message> {
96
/// Holds the oldest still active messages.
97
/// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.
98
pub(crate) messages_a: MessageSequence<M>,
99
/// Holds the newer messages.
100
pub(crate) messages_b: MessageSequence<M>,
101
pub(crate) message_count: usize,
102
}
103
104
// Derived Default impl would incorrectly require M: Default
105
impl<M: Message> Default for Messages<M> {
106
fn default() -> Self {
107
Self {
108
messages_a: Default::default(),
109
messages_b: Default::default(),
110
message_count: Default::default(),
111
}
112
}
113
}
114
115
impl<M: Message> Messages<M> {
116
/// Returns the index of the oldest message stored in the message buffer.
117
pub fn oldest_message_count(&self) -> usize {
118
self.messages_a.start_message_count
119
}
120
121
/// Writes an `message` to the current message buffer.
122
/// [`MessageReader`](super::MessageReader)s can then read the message.
123
/// This method returns the [ID](`MessageId`) of the written `message`.
124
#[track_caller]
125
pub fn write(&mut self, message: M) -> MessageId<M> {
126
self.write_with_caller(message, MaybeLocation::caller())
127
}
128
129
pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {
130
let message_id = MessageId {
131
id: self.message_count,
132
caller,
133
_marker: PhantomData,
134
};
135
#[cfg(feature = "detailed_trace")]
136
tracing::trace!("Messages::write() -> id: {}", message_id);
137
138
let message_instance = MessageInstance {
139
message_id,
140
message,
141
};
142
143
self.messages_b.push(message_instance);
144
self.message_count += 1;
145
146
message_id
147
}
148
149
/// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.
150
/// This is more efficient than writing each message individually.
151
/// This method returns the [IDs](`MessageId`) of the written `messages`.
152
#[track_caller]
153
pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {
154
let last_count = self.message_count;
155
156
self.extend(messages);
157
158
WriteBatchIds {
159
last_count,
160
message_count: self.message_count,
161
_marker: PhantomData,
162
}
163
}
164
165
/// Writes the default value of the message. Useful when the message is an empty struct.
166
/// This method returns the [ID](`MessageId`) of the written `message`.
167
#[track_caller]
168
pub fn write_default(&mut self) -> MessageId<M>
169
where
170
M: Default,
171
{
172
self.write(Default::default())
173
}
174
175
/// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.
176
pub fn get_cursor(&self) -> MessageCursor<M> {
177
MessageCursor::default()
178
}
179
180
/// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.
181
/// It will read all future messages.
182
pub fn get_cursor_current(&self) -> MessageCursor<M> {
183
MessageCursor {
184
last_message_count: self.message_count,
185
..Default::default()
186
}
187
}
188
189
/// Swaps the message buffers and clears the oldest message buffer. In general, this should be
190
/// called once per frame/update.
191
///
192
/// If you need access to the messages that were removed, consider using [`Messages::update_drain`].
193
pub fn update(&mut self) {
194
core::mem::swap(&mut self.messages_a, &mut self.messages_b);
195
self.messages_b.clear();
196
self.messages_b.start_message_count = self.message_count;
197
debug_assert_eq!(
198
self.messages_a.start_message_count + self.messages_a.len(),
199
self.messages_b.start_message_count
200
);
201
}
202
203
/// Swaps the message buffers and drains the oldest message buffer, returning an iterator
204
/// of all messages that were removed. In general, this should be called once per frame/update.
205
///
206
/// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.
207
#[must_use = "If you do not need the returned messages, call .update() instead."]
208
pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {
209
core::mem::swap(&mut self.messages_a, &mut self.messages_b);
210
let iter = self.messages_b.messages.drain(..);
211
self.messages_b.start_message_count = self.message_count;
212
debug_assert_eq!(
213
self.messages_a.start_message_count + self.messages_a.len(),
214
self.messages_b.start_message_count
215
);
216
217
iter.map(|e| e.message)
218
}
219
220
#[inline]
221
fn reset_start_message_count(&mut self) {
222
self.messages_a.start_message_count = self.message_count;
223
self.messages_b.start_message_count = self.message_count;
224
}
225
226
/// Removes all messages.
227
#[inline]
228
pub fn clear(&mut self) {
229
self.reset_start_message_count();
230
self.messages_a.clear();
231
self.messages_b.clear();
232
}
233
234
/// Returns the number of messages currently stored in the message buffer.
235
#[inline]
236
pub fn len(&self) -> usize {
237
self.messages_a.len() + self.messages_b.len()
238
}
239
240
/// Returns true if there are no messages currently stored in the message buffer.
241
#[inline]
242
pub fn is_empty(&self) -> bool {
243
self.len() == 0
244
}
245
246
/// Creates a draining iterator that removes all messages.
247
pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {
248
self.reset_start_message_count();
249
250
// Drain the oldest messages first, then the newest
251
self.messages_a
252
.drain(..)
253
.chain(self.messages_b.drain(..))
254
.map(|i| i.message)
255
}
256
257
/// Iterates over messages that happened since the last "update" call.
258
/// WARNING: You probably don't want to use this call. In most cases you should use an
259
/// [`MessageReader`]. You should only use this if you know you only need to consume messages
260
/// between the last `update()` call and your call to `iter_current_update_messages`.
261
/// If messages happen outside that window, they will not be handled. For example, any messages that
262
/// happen after this call and before the next `update()` call will be dropped.
263
///
264
/// [`MessageReader`]: super::MessageReader
265
pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {
266
self.messages_b.iter().map(|i| &i.message)
267
}
268
269
/// Get a specific message by id if it still exists in the messages buffer.
270
pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {
271
if id < self.oldest_message_count() {
272
return None;
273
}
274
275
let sequence = self.sequence(id);
276
let index = id.saturating_sub(sequence.start_message_count);
277
278
sequence
279
.get(index)
280
.map(|instance| (&instance.message, instance.message_id))
281
}
282
283
/// Which message buffer is this message id a part of.
284
fn sequence(&self, id: usize) -> &MessageSequence<M> {
285
if id < self.messages_b.start_message_count {
286
&self.messages_a
287
} else {
288
&self.messages_b
289
}
290
}
291
}
292
293
impl<M: Message> Extend<M> for Messages<M> {
294
#[track_caller]
295
fn extend<I>(&mut self, iter: I)
296
where
297
I: IntoIterator<Item = M>,
298
{
299
let old_count = self.message_count;
300
let mut message_count = self.message_count;
301
let messages = iter.into_iter().map(|message| {
302
let message_id = MessageId {
303
id: message_count,
304
caller: MaybeLocation::caller(),
305
_marker: PhantomData,
306
};
307
message_count += 1;
308
MessageInstance {
309
message_id,
310
message,
311
}
312
});
313
314
self.messages_b.extend(messages);
315
316
if old_count != message_count {
317
#[cfg(feature = "detailed_trace")]
318
tracing::trace!(
319
"Messages::extend() -> ids: ({}..{})",
320
self.message_count,
321
message_count
322
);
323
}
324
325
self.message_count = message_count;
326
}
327
}
328
329
#[derive(Debug)]
330
#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]
331
pub(crate) struct MessageSequence<M: Message> {
332
pub(crate) messages: Vec<MessageInstance<M>>,
333
pub(crate) start_message_count: usize,
334
}
335
336
// Derived Default impl would incorrectly require M: Default
337
impl<M: Message> Default for MessageSequence<M> {
338
fn default() -> Self {
339
Self {
340
messages: Default::default(),
341
start_message_count: Default::default(),
342
}
343
}
344
}
345
346
impl<M: Message> Deref for MessageSequence<M> {
347
type Target = Vec<MessageInstance<M>>;
348
349
fn deref(&self) -> &Self::Target {
350
&self.messages
351
}
352
}
353
354
impl<M: Message> DerefMut for MessageSequence<M> {
355
fn deref_mut(&mut self) -> &mut Self::Target {
356
&mut self.messages
357
}
358
}
359
360
/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.
361
pub struct WriteBatchIds<M> {
362
last_count: usize,
363
message_count: usize,
364
_marker: PhantomData<M>,
365
}
366
367
impl<M: Message> Iterator for WriteBatchIds<M> {
368
type Item = MessageId<M>;
369
370
fn next(&mut self) -> Option<Self::Item> {
371
if self.last_count >= self.message_count {
372
return None;
373
}
374
375
let result = Some(MessageId {
376
id: self.last_count,
377
caller: MaybeLocation::caller(),
378
_marker: PhantomData,
379
});
380
381
self.last_count += 1;
382
383
result
384
}
385
386
fn size_hint(&self) -> (usize, Option<usize>) {
387
let len = <Self as ExactSizeIterator>::len(self);
388
(len, Some(len))
389
}
390
}
391
392
impl<M: Message> ExactSizeIterator for WriteBatchIds<M> {
393
fn len(&self) -> usize {
394
self.message_count.saturating_sub(self.last_count)
395
}
396
}
397
398
#[cfg(test)]
399
mod tests {
400
use crate::message::{Message, Messages};
401
402
#[test]
403
fn iter_current_update_messages_iterates_over_current_messages() {
404
#[derive(Message, Clone)]
405
struct TestMessage;
406
407
let mut test_messages = Messages::<TestMessage>::default();
408
409
// Starting empty
410
assert_eq!(test_messages.len(), 0);
411
assert_eq!(test_messages.iter_current_update_messages().count(), 0);
412
test_messages.update();
413
414
// Writing one message
415
test_messages.write(TestMessage);
416
417
assert_eq!(test_messages.len(), 1);
418
assert_eq!(test_messages.iter_current_update_messages().count(), 1);
419
test_messages.update();
420
421
// Writing two messages on the next frame
422
test_messages.write(TestMessage);
423
test_messages.write(TestMessage);
424
425
assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3
426
assert_eq!(test_messages.iter_current_update_messages().count(), 2);
427
test_messages.update();
428
429
// Writing zero messages
430
assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2
431
assert_eq!(test_messages.iter_current_update_messages().count(), 0);
432
}
433
434
#[test]
435
fn write_batch_iter_size_hint() {
436
#[derive(Message, Clone, Copy)]
437
struct TestMessage;
438
439
let mut test_messages = Messages::<TestMessage>::default();
440
let write_batch_ids = test_messages.write_batch([TestMessage; 4]);
441
let expected_len = 4;
442
assert_eq!(write_batch_ids.len(), expected_len);
443
assert_eq!(
444
write_batch_ids.size_hint(),
445
(expected_len, Some(expected_len))
446
);
447
}
448
}
449
450