Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_ecs/src/world/command_queue.rs
9353 views
1
use crate::{
2
change_detection::MaybeLocation,
3
system::{Command, SystemBuffer, SystemMeta},
4
world::{DeferredWorld, World},
5
};
6
7
use alloc::{boxed::Box, vec::Vec};
8
use bevy_ptr::{OwningPtr, Unaligned};
9
use core::{
10
fmt::Debug,
11
mem::{size_of, MaybeUninit},
12
panic::AssertUnwindSafe,
13
ptr::{addr_of_mut, NonNull},
14
};
15
use log::warn;
16
17
struct CommandMeta {
18
/// SAFETY: The `value` must point to a value of type `T: Command`,
19
/// where `T` is some specific type that was used to produce this metadata.
20
///
21
/// `world` is optional to allow this one function pointer to perform double-duty as a drop.
22
///
23
/// Advances `cursor` by the size of `T` in bytes.
24
consume_command_and_get_size:
25
unsafe fn(value: OwningPtr<Unaligned>, world: Option<NonNull<World>>, cursor: &mut usize),
26
}
27
28
/// Densely and efficiently stores a queue of heterogenous types implementing [`Command`].
29
// NOTE: [`CommandQueue`] is implemented via a `Vec<MaybeUninit<u8>>` instead of a `Vec<Box<dyn Command>>`
30
// as an optimization. Since commands are used frequently in systems as a way to spawn
31
// entities/components/resources, and it's not currently possible to parallelize these
32
// due to mutable [`World`] access, maximizing performance for [`CommandQueue`] is
33
// preferred to simplicity of implementation.
34
pub struct CommandQueue {
35
// This buffer densely stores all queued commands.
36
//
37
// For each command, one `CommandMeta` is stored, followed by zero or more bytes
38
// to store the command itself. To interpret these bytes, a pointer must
39
// be passed to the corresponding `CommandMeta.apply_command_and_get_size` fn pointer.
40
pub(crate) bytes: Vec<MaybeUninit<u8>>,
41
pub(crate) cursor: usize,
42
pub(crate) panic_recovery: Vec<MaybeUninit<u8>>,
43
pub(crate) caller: MaybeLocation,
44
}
45
46
impl Default for CommandQueue {
47
#[track_caller]
48
fn default() -> Self {
49
Self {
50
bytes: Default::default(),
51
cursor: Default::default(),
52
panic_recovery: Default::default(),
53
caller: MaybeLocation::caller(),
54
}
55
}
56
}
57
58
/// Wraps pointers to a [`CommandQueue`], used internally to avoid stacked borrow rules when
59
/// partially applying the world's command queue recursively
60
#[derive(Clone)]
61
pub(crate) struct RawCommandQueue {
62
pub(crate) bytes: NonNull<Vec<MaybeUninit<u8>>>,
63
pub(crate) cursor: NonNull<usize>,
64
pub(crate) panic_recovery: NonNull<Vec<MaybeUninit<u8>>>,
65
}
66
67
// CommandQueue needs to implement Debug manually, rather than deriving it, because the derived impl just prints
68
// [core::mem::maybe_uninit::MaybeUninit<u8>, core::mem::maybe_uninit::MaybeUninit<u8>, ..] for every byte in the vec,
69
// which gets extremely verbose very quickly, while also providing no useful information.
70
// It is not possible to soundly print the values of the contained bytes, as some of them may be padding or uninitialized (#4863)
71
// So instead, the manual impl just prints the length of vec.
72
impl Debug for CommandQueue {
73
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74
f.debug_struct("CommandQueue")
75
.field("len_bytes", &self.bytes.len())
76
.field("caller", &self.caller)
77
.finish_non_exhaustive()
78
}
79
}
80
81
// SAFETY: All commands [`Command`] implement [`Send`]
82
unsafe impl Send for CommandQueue {}
83
84
// SAFETY: `&CommandQueue` never gives access to the inner commands.
85
unsafe impl Sync for CommandQueue {}
86
87
impl CommandQueue {
88
/// Push a [`Command`] onto the queue.
89
#[inline]
90
pub fn push(&mut self, command: impl Command) {
91
// SAFETY: self is guaranteed to live for the lifetime of this method
92
unsafe {
93
self.get_raw().push(command);
94
}
95
}
96
97
/// Execute the queued [`Command`]s in the world after applying any commands in the world's internal queue.
98
/// This clears the queue.
99
#[inline]
100
pub fn apply(&mut self, world: &mut World) {
101
// flush the world's internal queue
102
world.flush_commands();
103
104
// SAFETY: A reference is always a valid pointer
105
unsafe {
106
self.get_raw().apply_or_drop_queued(Some(world.into()));
107
}
108
}
109
110
/// Take all commands from `other` and append them to `self`, leaving `other` empty
111
pub fn append(&mut self, other: &mut CommandQueue) {
112
self.bytes.append(&mut other.bytes);
113
}
114
115
/// Returns false if there are any commands in the queue
116
#[inline]
117
pub fn is_empty(&self) -> bool {
118
self.cursor >= self.bytes.len()
119
}
120
121
/// Returns a [`RawCommandQueue`] instance sharing the underlying command queue.
122
pub(crate) fn get_raw(&mut self) -> RawCommandQueue {
123
// SAFETY: self is always valid memory
124
unsafe {
125
RawCommandQueue {
126
bytes: NonNull::new_unchecked(addr_of_mut!(self.bytes)),
127
cursor: NonNull::new_unchecked(addr_of_mut!(self.cursor)),
128
panic_recovery: NonNull::new_unchecked(addr_of_mut!(self.panic_recovery)),
129
}
130
}
131
}
132
}
133
134
impl RawCommandQueue {
135
/// Returns a new `RawCommandQueue` instance, this must be manually dropped.
136
pub(crate) fn new() -> Self {
137
// SAFETY: Pointers returned by `Box::into_raw` are guaranteed to be non null
138
unsafe {
139
Self {
140
bytes: NonNull::new_unchecked(Box::into_raw(Box::default())),
141
cursor: NonNull::new_unchecked(Box::into_raw(Box::new(0usize))),
142
panic_recovery: NonNull::new_unchecked(Box::into_raw(Box::default())),
143
}
144
}
145
}
146
147
/// Returns true if the queue is empty.
148
///
149
/// # Safety
150
///
151
/// * Caller ensures that `bytes` and `cursor` point to valid memory
152
pub unsafe fn is_empty(&self) -> bool {
153
// SAFETY: Pointers are guaranteed to be valid by requirements on `.clone_unsafe`
154
(unsafe { *self.cursor.as_ref() }) >= (unsafe { self.bytes.as_ref() }).len()
155
}
156
157
/// Push a [`Command`] onto the queue.
158
///
159
/// # Safety
160
///
161
/// * Caller ensures that `self` has not outlived the underlying queue
162
#[inline]
163
pub unsafe fn push<C: Command>(&mut self, command: C) {
164
// Stores a command alongside its metadata.
165
// `repr(C)` prevents the compiler from reordering the fields,
166
// while `repr(packed)` prevents the compiler from inserting padding bytes.
167
#[repr(C, packed)]
168
struct Packed<C: Command> {
169
meta: CommandMeta,
170
command: C,
171
}
172
173
let meta = CommandMeta {
174
consume_command_and_get_size: |command, world, cursor| {
175
*cursor += size_of::<C>();
176
177
// SAFETY: According to the invariants of `CommandMeta.consume_command_and_get_size`,
178
// `command` must point to a value of type `C`.
179
let command: C = unsafe { command.read_unaligned() };
180
match world {
181
// Apply command to the provided world...
182
Some(mut world) => {
183
// SAFETY: Caller ensures pointer is not null
184
let world = unsafe { world.as_mut() };
185
command.apply(world);
186
// The command may have queued up world commands, which we flush here to ensure they are also picked up.
187
// If the current command queue already the World Command queue, this will still behave appropriately because the global cursor
188
// is still at the current `stop`, ensuring only the newly queued Commands will be applied.
189
world.flush();
190
}
191
// ...or discard it.
192
None => drop(command),
193
}
194
},
195
};
196
197
// SAFETY: There are no outstanding references to self.bytes
198
let bytes = unsafe { self.bytes.as_mut() };
199
200
let old_len = bytes.len();
201
202
// Reserve enough bytes for both the metadata and the command itself.
203
bytes.reserve(size_of::<Packed<C>>());
204
205
// Pointer to the bytes at the end of the buffer.
206
// SAFETY: We know it is within bounds of the allocation, due to the call to `.reserve()`.
207
let ptr = unsafe { bytes.as_mut_ptr().add(old_len) };
208
209
// Write the metadata into the buffer, followed by the command.
210
// We are using a packed struct to write them both as one operation.
211
// SAFETY: `ptr` must be non-null, since it is within a non-null buffer.
212
// The call to `reserve()` ensures that the buffer has enough space to fit a value of type `C`,
213
// and it is valid to write any bit pattern since the underlying buffer is of type `MaybeUninit<u8>`.
214
unsafe {
215
ptr.cast::<Packed<C>>()
216
.write_unaligned(Packed { meta, command });
217
}
218
219
// Extend the length of the buffer to include the data we just wrote.
220
// SAFETY: The new length is guaranteed to fit in the vector's capacity,
221
// due to the call to `.reserve()` above.
222
unsafe {
223
bytes.set_len(old_len + size_of::<Packed<C>>());
224
}
225
}
226
227
/// If `world` is [`Some`], this will apply the queued [commands](`Command`).
228
/// If `world` is [`None`], this will drop the queued [commands](`Command`) (without applying them).
229
/// This clears the queue.
230
///
231
/// # Safety
232
///
233
/// * Caller ensures that `self` has not outlived the underlying queue
234
#[inline]
235
pub(crate) unsafe fn apply_or_drop_queued(&mut self, world: Option<NonNull<World>>) {
236
// SAFETY: If this is the command queue on world, world will not be dropped as we have a mutable reference
237
// If this is not the command queue on world we have exclusive ownership and self will not be mutated
238
let start = *self.cursor.as_ref();
239
let stop = self.bytes.as_ref().len();
240
let mut local_cursor = start;
241
// SAFETY: we are setting the global cursor to the current length to prevent the executing commands from applying
242
// the remaining commands currently in this list. This is safe.
243
*self.cursor.as_mut() = stop;
244
245
while local_cursor < stop {
246
// SAFETY: The cursor is either at the start of the buffer, or just after the previous command.
247
// Since we know that the cursor is in bounds, it must point to the start of a new command.
248
let meta = unsafe {
249
self.bytes
250
.as_mut()
251
.as_mut_ptr()
252
.add(local_cursor)
253
.cast::<CommandMeta>()
254
.read_unaligned()
255
};
256
257
// Advance to the bytes just after `meta`, which represent a type-erased command.
258
local_cursor += size_of::<CommandMeta>();
259
// Construct an owned pointer to the command.
260
// SAFETY: It is safe to transfer ownership out of `self.bytes`, since the increment of `cursor` above
261
// guarantees that nothing stored in the buffer will get observed after this function ends.
262
// `cmd` points to a valid address of a stored command, so it must be non-null.
263
let cmd = unsafe {
264
OwningPtr::<Unaligned>::new(NonNull::new_unchecked(
265
self.bytes.as_mut().as_mut_ptr().add(local_cursor).cast(),
266
))
267
};
268
let f = AssertUnwindSafe(|| {
269
// SAFETY: The data underneath the cursor must correspond to the type erased in metadata,
270
// since they were stored next to each other by `.push()`.
271
// For ZSTs, the type doesn't matter as long as the pointer is non-null.
272
// This also advances the cursor past the command. For ZSTs, the cursor will not move.
273
// At this point, it will either point to the next `CommandMeta`,
274
// or the cursor will be out of bounds and the loop will end.
275
unsafe { (meta.consume_command_and_get_size)(cmd, world, &mut local_cursor) };
276
});
277
278
#[cfg(feature = "std")]
279
{
280
let result = std::panic::catch_unwind(f);
281
282
if let Err(payload) = result {
283
// local_cursor now points to the location _after_ the panicked command.
284
// Add the remaining commands that _would have_ been applied to the
285
// panic_recovery queue.
286
//
287
// This uses `current_stop` instead of `stop` to account for any commands
288
// that were queued _during_ this panic.
289
//
290
// This is implemented in such a way that if apply_or_drop_queued() are nested recursively in,
291
// an applied Command, the correct command order will be retained.
292
let panic_recovery = self.panic_recovery.as_mut();
293
let bytes = self.bytes.as_mut();
294
let current_stop = bytes.len();
295
panic_recovery.extend_from_slice(&bytes[local_cursor..current_stop]);
296
bytes.set_len(start);
297
*self.cursor.as_mut() = start;
298
299
// This was the "top of the apply stack". If we are _not_ at the top of the apply stack,
300
// when we call`resume_unwind" the caller "closer to the top" will catch the unwind and do this check,
301
// until we reach the top.
302
if start == 0 {
303
bytes.append(panic_recovery);
304
}
305
std::panic::resume_unwind(payload);
306
}
307
}
308
309
#[cfg(not(feature = "std"))]
310
(f)();
311
}
312
313
// Reset the buffer: all commands past the original `start` cursor have been applied.
314
// SAFETY: we are setting the length of bytes to the original length, minus the length of the original
315
// list of commands being considered. All bytes remaining in the Vec are still valid, unapplied commands.
316
unsafe {
317
self.bytes.as_mut().set_len(start);
318
*self.cursor.as_mut() = start;
319
};
320
}
321
}
322
323
impl Drop for CommandQueue {
324
fn drop(&mut self) {
325
if !self.bytes.is_empty() {
326
if let Some(caller) = self.caller.into_option() {
327
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply? caller:{caller:?}");
328
} else {
329
warn!("CommandQueue has un-applied commands being dropped. Did you forget to call SystemState::apply?");
330
}
331
}
332
// SAFETY: A reference is always a valid pointer
333
unsafe { self.get_raw().apply_or_drop_queued(None) };
334
}
335
}
336
337
impl SystemBuffer for CommandQueue {
338
#[inline]
339
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
340
#[cfg(feature = "trace")]
341
let _span_guard = _system_meta.commands_span.enter();
342
self.apply(world);
343
}
344
345
#[inline]
346
fn queue(&mut self, _system_meta: &SystemMeta, mut world: DeferredWorld) {
347
world.commands().append(self);
348
}
349
}
350
351
#[cfg(test)]
352
mod test {
353
use super::*;
354
use crate::{component::Component, resource::Resource};
355
use alloc::{borrow::ToOwned, string::String, sync::Arc};
356
use core::{
357
panic::AssertUnwindSafe,
358
sync::atomic::{AtomicU32, Ordering},
359
};
360
361
#[cfg(miri)]
362
use alloc::format;
363
364
struct DropCheck(Arc<AtomicU32>);
365
366
impl DropCheck {
367
fn new() -> (Self, Arc<AtomicU32>) {
368
let drops = Arc::new(AtomicU32::new(0));
369
(Self(drops.clone()), drops)
370
}
371
}
372
373
impl Drop for DropCheck {
374
fn drop(&mut self) {
375
self.0.fetch_add(1, Ordering::Relaxed);
376
}
377
}
378
379
impl Command for DropCheck {
380
fn apply(self, _: &mut World) {}
381
}
382
383
#[test]
384
fn test_command_queue_inner_drop() {
385
let mut queue = CommandQueue::default();
386
387
let (dropcheck_a, drops_a) = DropCheck::new();
388
let (dropcheck_b, drops_b) = DropCheck::new();
389
390
queue.push(dropcheck_a);
391
queue.push(dropcheck_b);
392
393
assert_eq!(drops_a.load(Ordering::Relaxed), 0);
394
assert_eq!(drops_b.load(Ordering::Relaxed), 0);
395
396
let mut world = World::new();
397
queue.apply(&mut world);
398
399
assert_eq!(drops_a.load(Ordering::Relaxed), 1);
400
assert_eq!(drops_b.load(Ordering::Relaxed), 1);
401
}
402
403
/// Asserts that inner [commands](`Command`) are dropped on early drop of [`CommandQueue`].
404
/// Originally identified as an issue in [#10676](https://github.com/bevyengine/bevy/issues/10676)
405
#[test]
406
fn test_command_queue_inner_drop_early() {
407
let mut queue = CommandQueue::default();
408
409
let (dropcheck_a, drops_a) = DropCheck::new();
410
let (dropcheck_b, drops_b) = DropCheck::new();
411
412
queue.push(dropcheck_a);
413
queue.push(dropcheck_b);
414
415
assert_eq!(drops_a.load(Ordering::Relaxed), 0);
416
assert_eq!(drops_b.load(Ordering::Relaxed), 0);
417
418
drop(queue);
419
420
assert_eq!(drops_a.load(Ordering::Relaxed), 1);
421
assert_eq!(drops_b.load(Ordering::Relaxed), 1);
422
}
423
424
#[derive(Component)]
425
struct A;
426
427
struct SpawnCommand;
428
429
impl Command for SpawnCommand {
430
fn apply(self, world: &mut World) {
431
world.spawn(A);
432
}
433
}
434
435
#[test]
436
fn test_command_queue_inner() {
437
let mut queue = CommandQueue::default();
438
439
queue.push(SpawnCommand);
440
queue.push(SpawnCommand);
441
442
let mut world = World::new();
443
queue.apply(&mut world);
444
445
assert_eq!(world.query::<&A>().query(&world).count(), 2);
446
447
// The previous call to `apply` cleared the queue.
448
// This call should do nothing.
449
queue.apply(&mut world);
450
assert_eq!(world.query::<&A>().query(&world).count(), 2);
451
}
452
453
#[expect(
454
dead_code,
455
reason = "The inner string is used to ensure that, when the PanicCommand gets pushed to the queue, some data is written to the `bytes` vector."
456
)]
457
struct PanicCommand(String);
458
impl Command for PanicCommand {
459
fn apply(self, _: &mut World) {
460
panic!("command is panicking");
461
}
462
}
463
464
#[test]
465
fn test_command_queue_inner_panic_safe() {
466
std::panic::set_hook(Box::new(|_| {}));
467
468
let mut queue = CommandQueue::default();
469
470
queue.push(PanicCommand("I panic!".to_owned()));
471
queue.push(SpawnCommand);
472
473
let mut world = World::new();
474
475
let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
476
queue.apply(&mut world);
477
}));
478
479
// Even though the first command panicked, it's still ok to push
480
// more commands.
481
queue.push(SpawnCommand);
482
queue.push(SpawnCommand);
483
queue.apply(&mut world);
484
assert_eq!(world.query::<&A>().query(&world).count(), 3);
485
}
486
487
#[test]
488
fn test_command_queue_inner_nested_panic_safe() {
489
std::panic::set_hook(Box::new(|_| {}));
490
491
#[derive(Resource, Default)]
492
struct Order(Vec<usize>);
493
494
let mut world = World::new();
495
world.init_resource::<Order>();
496
497
fn add_index(index: usize) -> impl Command {
498
move |world: &mut World| world.resource_mut::<Order>().0.push(index)
499
}
500
world.commands().queue(add_index(1));
501
world.commands().queue(|world: &mut World| {
502
world.commands().queue(add_index(2));
503
world.commands().queue(PanicCommand("I panic!".to_owned()));
504
world.commands().queue(add_index(3));
505
world.flush_commands();
506
});
507
world.commands().queue(add_index(4));
508
509
let _ = std::panic::catch_unwind(AssertUnwindSafe(|| {
510
world.flush_commands();
511
}));
512
513
world.commands().queue(add_index(5));
514
world.flush_commands();
515
assert_eq!(&world.resource::<Order>().0, &[1, 2, 3, 4, 5]);
516
}
517
518
// NOTE: `CommandQueue` is `Send` because `Command` is send.
519
// If the `Command` trait gets reworked to be non-send, `CommandQueue`
520
// should be reworked.
521
// This test asserts that Command types are send.
522
fn assert_is_send_impl(_: impl Send) {}
523
fn assert_is_send(command: impl Command) {
524
assert_is_send_impl(command);
525
}
526
527
#[test]
528
fn test_command_is_send() {
529
assert_is_send(SpawnCommand);
530
}
531
532
#[expect(
533
dead_code,
534
reason = "This struct is used to test how the CommandQueue reacts to padding added by rust's compiler."
535
)]
536
struct CommandWithPadding(u8, u16);
537
impl Command for CommandWithPadding {
538
fn apply(self, _: &mut World) {}
539
}
540
541
#[cfg(miri)]
542
#[test]
543
fn test_uninit_bytes() {
544
let mut queue = CommandQueue::default();
545
queue.push(CommandWithPadding(0, 0));
546
let _ = format!("{:?}", queue.bytes);
547
}
548
}
549
550