// Source: GitHub repository bevyengine/bevy
// Path: blob/main/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
1
use alloc::{boxed::Box, vec::Vec};
2
use bevy_platform::cell::SyncUnsafeCell;
3
use bevy_platform::sync::Arc;
4
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
5
use concurrent_queue::ConcurrentQueue;
6
use core::{any::Any, panic::AssertUnwindSafe};
7
use fixedbitset::FixedBitSet;
8
#[cfg(feature = "std")]
9
use std::eprintln;
10
use std::sync::{Mutex, MutexGuard};
11
12
#[cfg(feature = "trace")]
13
use tracing::{info_span, Span};
14
15
use crate::{
16
error::{ErrorContext, ErrorHandler, Result},
17
prelude::Resource,
18
schedule::{
19
is_apply_deferred, ConditionWithAccess, ExecutorKind, SystemExecutor, SystemSchedule,
20
SystemWithAccess,
21
},
22
system::{RunSystemError, ScheduleSystem},
23
world::{unsafe_world_cell::UnsafeWorldCell, World},
24
};
25
#[cfg(feature = "hotpatching")]
26
use crate::{prelude::DetectChanges, HotPatchChanges};
27
28
use super::__rust_begin_short_backtrace;
29
30
/// Borrowed data used by the [`MultiThreadedExecutor`].
31
struct Environment<'env, 'sys> {
32
executor: &'env MultiThreadedExecutor,
33
systems: &'sys [SyncUnsafeCell<SystemWithAccess>],
34
conditions: SyncUnsafeCell<Conditions<'sys>>,
35
world_cell: UnsafeWorldCell<'env>,
36
}
37
38
struct Conditions<'a> {
39
system_conditions: &'a mut [Vec<ConditionWithAccess>],
40
set_conditions: &'a mut [Vec<ConditionWithAccess>],
41
sets_with_conditions_of_systems: &'a [FixedBitSet],
42
systems_in_sets_with_conditions: &'a [FixedBitSet],
43
}
44
45
impl<'env, 'sys> Environment<'env, 'sys> {
46
fn new(
47
executor: &'env MultiThreadedExecutor,
48
schedule: &'sys mut SystemSchedule,
49
world: &'env mut World,
50
) -> Self {
51
Environment {
52
executor,
53
systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),
54
conditions: SyncUnsafeCell::new(Conditions {
55
system_conditions: &mut schedule.system_conditions,
56
set_conditions: &mut schedule.set_conditions,
57
sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,
58
systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,
59
}),
60
world_cell: world.as_unsafe_world_cell(),
61
}
62
}
63
}
64
65
/// Per-system data used by the [`MultiThreadedExecutor`].
66
// Copied here because it can't be read from the system when it's running.
67
struct SystemTaskMetadata {
68
/// The set of systems whose `component_access_set()` conflicts with this one.
69
conflicting_systems: FixedBitSet,
70
/// The set of systems whose `component_access_set()` conflicts with this system's conditions.
71
/// Note that this is separate from `conflicting_systems` to handle the case where
72
/// a system is skipped by an earlier system set condition or system stepping,
73
/// and needs access to run its conditions but not for itself.
74
condition_conflicting_systems: FixedBitSet,
75
/// Indices of the systems that directly depend on the system.
76
dependents: Vec<usize>,
77
/// Is `true` if the system does not access `!Send` data.
78
is_send: bool,
79
/// Is `true` if the system is exclusive.
80
is_exclusive: bool,
81
}
82
83
/// The completion event a system task pushes onto the executor's
/// `system_completion` queue when it finishes.
struct SystemResult {
    /// Index of the system that finished.
    system_index: usize,
}
87
88
/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.
89
pub struct MultiThreadedExecutor {
90
/// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.
91
state: Mutex<ExecutorState>,
92
/// Queue of system completion events.
93
system_completion: ConcurrentQueue<SystemResult>,
94
/// Setting when true applies deferred system buffers after all systems have run
95
apply_final_deferred: bool,
96
/// When set, tells the executor that a thread has panicked.
97
panic_payload: Mutex<Option<Box<dyn Any + Send>>>,
98
starting_systems: FixedBitSet,
99
/// Cached tracing span
100
#[cfg(feature = "trace")]
101
executor_span: Span,
102
}
103
104
/// The state of the executor while running.
105
pub struct ExecutorState {
106
/// Metadata for scheduling and running system tasks.
107
system_task_metadata: Vec<SystemTaskMetadata>,
108
/// The set of systems whose `component_access_set()` conflicts with this system set's conditions.
109
set_condition_conflicting_systems: Vec<FixedBitSet>,
110
/// Returns `true` if a system with non-`Send` access is running.
111
local_thread_running: bool,
112
/// Returns `true` if an exclusive system is running.
113
exclusive_running: bool,
114
/// The number of systems that are running.
115
num_running_systems: usize,
116
/// The number of dependencies each system has that have not completed.
117
num_dependencies_remaining: Vec<usize>,
118
/// System sets whose conditions have been evaluated.
119
evaluated_sets: FixedBitSet,
120
/// Systems that have no remaining dependencies and are waiting to run.
121
ready_systems: FixedBitSet,
122
/// copy of `ready_systems`
123
ready_systems_copy: FixedBitSet,
124
/// Systems that are running.
125
running_systems: FixedBitSet,
126
/// Systems that got skipped.
127
skipped_systems: FixedBitSet,
128
/// Systems whose conditions have been evaluated and were run or skipped.
129
completed_systems: FixedBitSet,
130
/// Systems that have run but have not had their buffers applied.
131
unapplied_systems: FixedBitSet,
132
}
133
134
/// References to data required by the executor.
135
/// This is copied to each system task so that can invoke the executor when they complete.
136
// These all need to outlive 'scope in order to be sent to new tasks,
137
// and keeping them all in a struct means we can use lifetime elision.
138
#[derive(Copy, Clone)]
139
struct Context<'scope, 'env, 'sys> {
140
environment: &'env Environment<'env, 'sys>,
141
scope: &'scope Scope<'scope, 'env, ()>,
142
error_handler: ErrorHandler,
143
}
144
145
impl Default for MultiThreadedExecutor {
146
fn default() -> Self {
147
Self::new()
148
}
149
}
150
151
impl SystemExecutor for MultiThreadedExecutor {
152
fn kind(&self) -> ExecutorKind {
153
ExecutorKind::MultiThreaded
154
}
155
156
fn init(&mut self, schedule: &SystemSchedule) {
157
let state = self.state.get_mut().unwrap();
158
// pre-allocate space
159
let sys_count = schedule.system_ids.len();
160
let set_count = schedule.set_ids.len();
161
162
self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));
163
self.starting_systems = FixedBitSet::with_capacity(sys_count);
164
state.evaluated_sets = FixedBitSet::with_capacity(set_count);
165
state.ready_systems = FixedBitSet::with_capacity(sys_count);
166
state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);
167
state.running_systems = FixedBitSet::with_capacity(sys_count);
168
state.completed_systems = FixedBitSet::with_capacity(sys_count);
169
state.skipped_systems = FixedBitSet::with_capacity(sys_count);
170
state.unapplied_systems = FixedBitSet::with_capacity(sys_count);
171
172
state.system_task_metadata = Vec::with_capacity(sys_count);
173
for index in 0..sys_count {
174
state.system_task_metadata.push(SystemTaskMetadata {
175
conflicting_systems: FixedBitSet::with_capacity(sys_count),
176
condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
177
dependents: schedule.system_dependents[index].clone(),
178
is_send: schedule.systems[index].system.is_send(),
179
is_exclusive: schedule.systems[index].system.is_exclusive(),
180
});
181
if schedule.system_dependencies[index] == 0 {
182
self.starting_systems.insert(index);
183
}
184
}
185
186
{
187
#[cfg(feature = "trace")]
188
let _span = info_span!("calculate conflicting systems").entered();
189
for index1 in 0..sys_count {
190
let system1 = &schedule.systems[index1];
191
for index2 in 0..index1 {
192
let system2 = &schedule.systems[index2];
193
if !system2.access.is_compatible(&system1.access) {
194
state.system_task_metadata[index1]
195
.conflicting_systems
196
.insert(index2);
197
state.system_task_metadata[index2]
198
.conflicting_systems
199
.insert(index1);
200
}
201
}
202
203
for index2 in 0..sys_count {
204
let system2 = &schedule.systems[index2];
205
if schedule.system_conditions[index1]
206
.iter()
207
.any(|condition| !system2.access.is_compatible(&condition.access))
208
{
209
state.system_task_metadata[index1]
210
.condition_conflicting_systems
211
.insert(index2);
212
}
213
}
214
}
215
216
state.set_condition_conflicting_systems.clear();
217
state.set_condition_conflicting_systems.reserve(set_count);
218
for set_idx in 0..set_count {
219
let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
220
for sys_index in 0..sys_count {
221
let system = &schedule.systems[sys_index];
222
if schedule.set_conditions[set_idx]
223
.iter()
224
.any(|condition| !system.access.is_compatible(&condition.access))
225
{
226
conflicting_systems.insert(sys_index);
227
}
228
}
229
state
230
.set_condition_conflicting_systems
231
.push(conflicting_systems);
232
}
233
}
234
235
state.num_dependencies_remaining = Vec::with_capacity(sys_count);
236
}
237
238
fn run(
239
&mut self,
240
schedule: &mut SystemSchedule,
241
world: &mut World,
242
_skip_systems: Option<&FixedBitSet>,
243
error_handler: ErrorHandler,
244
) {
245
let state = self.state.get_mut().unwrap();
246
// reset counts
247
if schedule.systems.is_empty() {
248
return;
249
}
250
state.num_running_systems = 0;
251
state
252
.num_dependencies_remaining
253
.clone_from(&schedule.system_dependencies);
254
state.ready_systems.clone_from(&self.starting_systems);
255
256
// If stepping is enabled, make sure we skip those systems that should
257
// not be run.
258
#[cfg(feature = "bevy_debug_stepping")]
259
if let Some(skipped_systems) = _skip_systems {
260
debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());
261
// mark skipped systems as completed
262
state.completed_systems |= skipped_systems;
263
264
// signal the dependencies for each of the skipped systems, as
265
// though they had run
266
for system_index in skipped_systems.ones() {
267
state.signal_dependents(system_index);
268
state.ready_systems.remove(system_index);
269
}
270
}
271
272
let thread_executor = world
273
.get_resource::<MainThreadExecutor>()
274
.map(|e| e.0.clone());
275
let thread_executor = thread_executor.as_deref();
276
277
let environment = &Environment::new(self, schedule, world);
278
279
ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
280
false,
281
thread_executor,
282
|scope| {
283
let context = Context {
284
environment,
285
scope,
286
error_handler,
287
};
288
289
// The first tick won't need to process finished systems, but we still need to run the loop in
290
// tick_executor() in case a system completes while the first tick still holds the mutex.
291
context.tick_executor();
292
},
293
);
294
295
// End the borrows of self and world in environment by copying out the reference to systems.
296
let systems = environment.systems;
297
298
let state = self.state.get_mut().unwrap();
299
if self.apply_final_deferred {
300
// Do one final apply buffers after all systems have completed
301
// Commands should be applied while on the scope's thread, not the executor's thread
302
let res = apply_deferred(&state.unapplied_systems, systems, world);
303
if let Err(payload) = res {
304
let panic_payload = self.panic_payload.get_mut().unwrap();
305
*panic_payload = Some(payload);
306
}
307
state.unapplied_systems.clear();
308
}
309
310
// check to see if there was a panic
311
let payload = self.panic_payload.get_mut().unwrap();
312
if let Some(payload) = payload.take() {
313
std::panic::resume_unwind(payload);
314
}
315
316
debug_assert!(state.ready_systems.is_clear());
317
debug_assert!(state.running_systems.is_clear());
318
state.evaluated_sets.clear();
319
state.skipped_systems.clear();
320
state.completed_systems.clear();
321
}
322
323
fn set_apply_final_deferred(&mut self, value: bool) {
324
self.apply_final_deferred = value;
325
}
326
}
327
328
impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
329
fn system_completed(
330
&self,
331
system_index: usize,
332
res: Result<(), Box<dyn Any + Send>>,
333
system: &ScheduleSystem,
334
) {
335
// tell the executor that the system finished
336
self.environment
337
.executor
338
.system_completion
339
.push(SystemResult { system_index })
340
.unwrap_or_else(|error| unreachable!("{}", error));
341
if let Err(payload) = res {
342
#[cfg(feature = "std")]
343
#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
344
{
345
eprintln!("Encountered a panic in system `{}`!", system.name());
346
}
347
// set the payload to propagate the error
348
{
349
let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();
350
*panic_payload = Some(payload);
351
}
352
}
353
self.tick_executor();
354
}
355
356
#[expect(
357
clippy::mut_from_ref,
358
reason = "Field is only accessed here and is guarded by lock with a documented safety comment"
359
)]
360
fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {
361
let guard = self.environment.executor.state.try_lock().ok()?;
362
// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and
363
// is synchronized by the lock on the executor state.
364
let conditions = unsafe { &mut *self.environment.conditions.get() };
365
Some((conditions, guard))
366
}
367
368
fn tick_executor(&self) {
369
// Ensure that the executor handles any events pushed to the system_completion queue by this thread.
370
// If this thread acquires the lock, the executor runs after the push() and they are processed.
371
// If this thread does not acquire the lock, then the is_empty() check on the other thread runs
372
// after the lock is released, which is after try_lock() failed, which is after the push()
373
// on this thread, so the is_empty() check will see the new events and loop.
374
loop {
375
let Some((conditions, mut guard)) = self.try_lock() else {
376
return;
377
};
378
guard.tick(self, conditions);
379
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
380
drop(guard);
381
if self.environment.executor.system_completion.is_empty() {
382
return;
383
}
384
}
385
}
386
}
387
388
impl MultiThreadedExecutor {
389
/// Creates a new `multi_threaded` executor for use with a [`Schedule`].
390
///
391
/// [`Schedule`]: crate::schedule::Schedule
392
pub fn new() -> Self {
393
Self {
394
state: Mutex::new(ExecutorState::new()),
395
system_completion: ConcurrentQueue::unbounded(),
396
starting_systems: FixedBitSet::new(),
397
apply_final_deferred: true,
398
panic_payload: Mutex::new(None),
399
#[cfg(feature = "trace")]
400
executor_span: info_span!("multithreaded executor"),
401
}
402
}
403
}
404
405
impl ExecutorState {
406
fn new() -> Self {
407
Self {
408
system_task_metadata: Vec::new(),
409
set_condition_conflicting_systems: Vec::new(),
410
num_running_systems: 0,
411
num_dependencies_remaining: Vec::new(),
412
local_thread_running: false,
413
exclusive_running: false,
414
evaluated_sets: FixedBitSet::new(),
415
ready_systems: FixedBitSet::new(),
416
ready_systems_copy: FixedBitSet::new(),
417
running_systems: FixedBitSet::new(),
418
skipped_systems: FixedBitSet::new(),
419
completed_systems: FixedBitSet::new(),
420
unapplied_systems: FixedBitSet::new(),
421
}
422
}
423
424
fn tick(&mut self, context: &Context, conditions: &mut Conditions) {
425
#[cfg(feature = "trace")]
426
let _span = context.environment.executor.executor_span.enter();
427
428
for result in context.environment.executor.system_completion.try_iter() {
429
self.finish_system_and_handle_dependents(result);
430
}
431
432
// SAFETY:
433
// - `finish_system_and_handle_dependents` has updated the currently running systems.
434
// - `rebuild_active_access` locks access for all currently running systems.
435
unsafe {
436
self.spawn_system_tasks(context, conditions);
437
}
438
}
439
440
/// # Safety
441
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
442
/// have been mutably borrowed (such as the systems currently running).
443
/// - `world_cell` must have permission to access all world data (not counting
444
/// any world data that is claimed by systems currently running on this executor).
445
unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {
446
if self.exclusive_running {
447
return;
448
}
449
450
#[cfg(feature = "hotpatching")]
451
#[expect(
452
clippy::undocumented_unsafe_blocks,
453
reason = "This actually could result in UB if a system tries to mutate
454
`HotPatchChanges`. We allow this as the resource only exists with the `hotpatching` feature.
455
and `hotpatching` should never be enabled in release."
456
)]
457
#[cfg(feature = "hotpatching")]
458
let hotpatch_tick = unsafe {
459
context
460
.environment
461
.world_cell
462
.get_resource_ref::<HotPatchChanges>()
463
}
464
.map(|r| r.last_changed())
465
.unwrap_or_default();
466
467
// can't borrow since loop mutably borrows `self`
468
let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);
469
470
// Skipping systems may cause their dependents to become ready immediately.
471
// If that happens, we need to run again immediately or we may fail to spawn those dependents.
472
let mut check_for_new_ready_systems = true;
473
while check_for_new_ready_systems {
474
check_for_new_ready_systems = false;
475
476
ready_systems.clone_from(&self.ready_systems);
477
478
for system_index in ready_systems.ones() {
479
debug_assert!(!self.running_systems.contains(system_index));
480
// SAFETY: Caller assured that these systems are not running.
481
// Therefore, no other reference to this system exists and there is no aliasing.
482
let system =
483
&mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
484
485
#[cfg(feature = "hotpatching")]
486
if hotpatch_tick.is_newer_than(
487
system.get_last_run(),
488
context.environment.world_cell.change_tick(),
489
) {
490
system.refresh_hotpatch();
491
}
492
493
if !self.can_run(system_index, conditions) {
494
// NOTE: exclusive systems with ambiguities are susceptible to
495
// being significantly displaced here (compared to single-threaded order)
496
// if systems after them in topological order can run
497
// if that becomes an issue, `break;` if exclusive system
498
continue;
499
}
500
501
self.ready_systems.remove(system_index);
502
503
// SAFETY: `can_run` returned true, which means that:
504
// - There can be no systems running whose accesses would conflict with any conditions.
505
if unsafe {
506
!self.should_run(
507
system_index,
508
system,
509
conditions,
510
context.environment.world_cell,
511
context.error_handler,
512
)
513
} {
514
self.skip_system_and_signal_dependents(system_index);
515
// signal_dependents may have set more systems to ready.
516
check_for_new_ready_systems = true;
517
continue;
518
}
519
520
self.running_systems.insert(system_index);
521
self.num_running_systems += 1;
522
523
if self.system_task_metadata[system_index].is_exclusive {
524
// SAFETY: `can_run` returned true for this system,
525
// which means no systems are currently borrowed.
526
unsafe {
527
self.spawn_exclusive_system_task(context, system_index);
528
}
529
check_for_new_ready_systems = false;
530
break;
531
}
532
533
// SAFETY:
534
// - Caller ensured no other reference to this system exists.
535
// - `system_task_metadata[system_index].is_exclusive` is `false`,
536
// so `System::is_exclusive` returned `false` when we called it.
537
// - `can_run` returned true, so no systems with conflicting world access are running.
538
unsafe {
539
self.spawn_system_task(context, system_index);
540
}
541
}
542
}
543
544
// give back
545
self.ready_systems_copy = ready_systems;
546
}
547
548
fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {
549
let system_meta = &self.system_task_metadata[system_index];
550
if system_meta.is_exclusive && self.num_running_systems > 0 {
551
return false;
552
}
553
554
if !system_meta.is_send && self.local_thread_running {
555
return false;
556
}
557
558
// TODO: an earlier out if world's archetypes did not change
559
for set_idx in conditions.sets_with_conditions_of_systems[system_index]
560
.difference(&self.evaluated_sets)
561
{
562
if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
563
return false;
564
}
565
}
566
567
if !system_meta
568
.condition_conflicting_systems
569
.is_disjoint(&self.running_systems)
570
{
571
return false;
572
}
573
574
if !self.skipped_systems.contains(system_index)
575
&& !system_meta
576
.conflicting_systems
577
.is_disjoint(&self.running_systems)
578
{
579
return false;
580
}
581
582
true
583
}
584
585
/// # Safety
586
/// * `world` must have permission to read any world data required by
587
/// the system's conditions: this includes conditions for the system
588
/// itself, and conditions for any of the system's sets.
589
unsafe fn should_run(
590
&mut self,
591
system_index: usize,
592
system: &mut ScheduleSystem,
593
conditions: &mut Conditions,
594
world: UnsafeWorldCell,
595
error_handler: ErrorHandler,
596
) -> bool {
597
let mut should_run = !self.skipped_systems.contains(system_index);
598
599
for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {
600
if self.evaluated_sets.contains(set_idx) {
601
continue;
602
}
603
604
// Evaluate the system set's conditions.
605
// SAFETY:
606
// - The caller ensures that `world` has permission to read any data
607
// required by the conditions.
608
let set_conditions_met = unsafe {
609
evaluate_and_fold_conditions(
610
&mut conditions.set_conditions[set_idx],
611
world,
612
error_handler,
613
system,
614
true,
615
)
616
};
617
618
if !set_conditions_met {
619
self.skipped_systems
620
.union_with(&conditions.systems_in_sets_with_conditions[set_idx]);
621
}
622
623
should_run &= set_conditions_met;
624
self.evaluated_sets.insert(set_idx);
625
}
626
627
// Evaluate the system's conditions.
628
// SAFETY:
629
// - The caller ensures that `world` has permission to read any data
630
// required by the conditions.
631
let system_conditions_met = unsafe {
632
evaluate_and_fold_conditions(
633
&mut conditions.system_conditions[system_index],
634
world,
635
error_handler,
636
system,
637
false,
638
)
639
};
640
641
if !system_conditions_met {
642
self.skipped_systems.insert(system_index);
643
}
644
645
should_run &= system_conditions_met;
646
647
if should_run {
648
// SAFETY:
649
// - The caller ensures that `world` has permission to read any data
650
// required by the system.
651
let valid_params = match unsafe { system.validate_param_unsafe(world) } {
652
Ok(()) => true,
653
Err(e) => {
654
if !e.skipped {
655
error_handler(
656
e.into(),
657
ErrorContext::System {
658
name: system.name(),
659
last_run: system.get_last_run(),
660
},
661
);
662
}
663
false
664
}
665
};
666
if !valid_params {
667
self.skipped_systems.insert(system_index);
668
}
669
670
should_run &= valid_params;
671
}
672
673
should_run
674
}
675
676
/// # Safety
677
/// - Caller must not alias systems that are running.
678
/// - `is_exclusive` must have returned `false` for the specified system.
679
/// - `world` must have permission to access the world data
680
/// used by the specified system.
681
unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
682
// SAFETY: this system is not running, no other reference exists
683
let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
684
// Move the full context object into the new future.
685
let context = *context;
686
687
let system_meta = &self.system_task_metadata[system_index];
688
689
let task = async move {
690
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
691
// SAFETY:
692
// - The caller ensures that we have permission to
693
// access the world data used by the system.
694
// - `is_exclusive` returned false
695
unsafe {
696
if let Err(RunSystemError::Failed(err)) =
697
__rust_begin_short_backtrace::run_unsafe(
698
system,
699
context.environment.world_cell,
700
)
701
{
702
(context.error_handler)(
703
err,
704
ErrorContext::System {
705
name: system.name(),
706
last_run: system.get_last_run(),
707
},
708
);
709
}
710
};
711
}));
712
context.system_completed(system_index, res, system);
713
};
714
715
if system_meta.is_send {
716
context.scope.spawn(task);
717
} else {
718
self.local_thread_running = true;
719
context.scope.spawn_on_external(task);
720
}
721
}
722
723
/// # Safety
724
/// Caller must ensure no systems are currently borrowed.
725
unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {
726
// SAFETY: this system is not running, no other reference exists
727
let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;
728
// Move the full context object into the new future.
729
let context = *context;
730
731
if is_apply_deferred(&**system) {
732
// TODO: avoid allocation
733
let unapplied_systems = self.unapplied_systems.clone();
734
self.unapplied_systems.clear();
735
let task = async move {
736
// SAFETY: `can_run` returned true for this system, which means
737
// that no other systems currently have access to the world.
738
let world = unsafe { context.environment.world_cell.world_mut() };
739
let res = apply_deferred(&unapplied_systems, context.environment.systems, world);
740
context.system_completed(system_index, res, system);
741
};
742
743
context.scope.spawn_on_scope(task);
744
} else {
745
let task = async move {
746
// SAFETY: `can_run` returned true for this system, which means
747
// that no other systems currently have access to the world.
748
let world = unsafe { context.environment.world_cell.world_mut() };
749
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
750
if let Err(RunSystemError::Failed(err)) =
751
__rust_begin_short_backtrace::run(system, world)
752
{
753
(context.error_handler)(
754
err,
755
ErrorContext::System {
756
name: system.name(),
757
last_run: system.get_last_run(),
758
},
759
);
760
}
761
}));
762
context.system_completed(system_index, res, system);
763
};
764
765
context.scope.spawn_on_scope(task);
766
}
767
768
self.exclusive_running = true;
769
self.local_thread_running = true;
770
}
771
772
fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {
773
let SystemResult { system_index, .. } = result;
774
775
if self.system_task_metadata[system_index].is_exclusive {
776
self.exclusive_running = false;
777
}
778
779
if !self.system_task_metadata[system_index].is_send {
780
self.local_thread_running = false;
781
}
782
783
debug_assert!(self.num_running_systems >= 1);
784
self.num_running_systems -= 1;
785
self.running_systems.remove(system_index);
786
self.completed_systems.insert(system_index);
787
self.unapplied_systems.insert(system_index);
788
789
self.signal_dependents(system_index);
790
}
791
792
fn skip_system_and_signal_dependents(&mut self, system_index: usize) {
793
self.completed_systems.insert(system_index);
794
self.signal_dependents(system_index);
795
}
796
797
fn signal_dependents(&mut self, system_index: usize) {
798
for &dep_idx in &self.system_task_metadata[system_index].dependents {
799
let remaining = &mut self.num_dependencies_remaining[dep_idx];
800
debug_assert!(*remaining >= 1);
801
*remaining -= 1;
802
if *remaining == 0 && !self.completed_systems.contains(dep_idx) {
803
self.ready_systems.insert(dep_idx);
804
}
805
}
806
}
807
}
808
809
fn apply_deferred(
810
unapplied_systems: &FixedBitSet,
811
systems: &[SyncUnsafeCell<SystemWithAccess>],
812
world: &mut World,
813
) -> Result<(), Box<dyn Any + Send>> {
814
for system_index in unapplied_systems.ones() {
815
// SAFETY: none of these systems are running, no other references exist
816
let system = &mut unsafe { &mut *systems[system_index].get() }.system;
817
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
818
system.apply_deferred(world);
819
}));
820
if let Err(payload) = res {
821
#[cfg(feature = "std")]
822
#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]
823
{
824
eprintln!(
825
"Encountered a panic when applying buffers for system `{}`!",
826
system.name()
827
);
828
}
829
return Err(payload);
830
}
831
}
832
Ok(())
833
}
834
835
/// # Safety
836
/// - `world` must have permission to read any world data
837
/// required by `conditions`.
838
unsafe fn evaluate_and_fold_conditions(
839
conditions: &mut [ConditionWithAccess],
840
world: UnsafeWorldCell,
841
error_handler: ErrorHandler,
842
for_system: &ScheduleSystem,
843
on_set: bool,
844
) -> bool {
845
#[expect(
846
clippy::unnecessary_fold,
847
reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."
848
)]
849
conditions
850
.iter_mut()
851
.map(|ConditionWithAccess { condition, .. }| {
852
// SAFETY:
853
// - The caller ensures that `world` has permission to read any data
854
// required by the condition.
855
unsafe { condition.validate_param_unsafe(world) }
856
.map_err(From::from)
857
.and_then(|()| {
858
// SAFETY:
859
// - The caller ensures that `world` has permission to read any data
860
// required by the condition.
861
unsafe {
862
__rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world)
863
}
864
})
865
.unwrap_or_else(|err| {
866
if let RunSystemError::Failed(err) = err {
867
error_handler(
868
err,
869
ErrorContext::RunCondition {
870
name: condition.name(),
871
last_run: condition.get_last_run(),
872
system: for_system.name(),
873
on_set,
874
},
875
);
876
};
877
false
878
})
879
})
880
.fold(true, |acc, res| acc && res)
881
}
882
883
/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread
884
#[derive(Resource, Clone)]
885
pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);
886
887
impl Default for MainThreadExecutor {
888
fn default() -> Self {
889
Self::new()
890
}
891
}
892
893
impl MainThreadExecutor {
894
/// Creates a new executor that can be used to run systems on the main thread.
895
pub fn new() -> Self {
896
MainThreadExecutor(TaskPool::get_thread_executor())
897
}
898
}
899
900
#[cfg(test)]
mod tests {
    use crate::{
        prelude::Resource,
        schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},
        system::Commands,
        world::World,
    };

    /// Marker resource inserted by the dependent system in
    /// `skipped_systems_notify_dependents`.
    #[derive(Resource)]
    struct R;

    /// A system chained after an always-skipped system must still run.
    #[test]
    fn skipped_systems_notify_dependents() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(
            (
                (|| {}).run_if(|| false),
                // This system depends on a system that is always skipped.
                |mut commands: Commands| {
                    commands.insert_resource(R);
                },
            )
                .chain(),
        );
        schedule.run(&mut world);
        assert!(world.get_resource::<R>().is_some());
    }

    /// Regression test for a weird bug flagged by MIRI in
    /// `spawn_exclusive_system_task`, related to a `&mut World` being captured
    /// inside an `async` block and somehow remaining alive even after its last use.
    #[test]
    fn check_spawn_exclusive_system_task_miri() {
        let mut world = World::new();
        let mut schedule = Schedule::default();
        schedule.set_executor_kind(ExecutorKind::MultiThreaded);
        schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());
        schedule.run(&mut world);
    }
}
943
944