Path: blob/main/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
9351 views
use alloc::{boxed::Box, vec::Vec};1use bevy_platform::cell::SyncUnsafeCell;2use bevy_platform::sync::Arc;3use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};4use concurrent_queue::ConcurrentQueue;5use core::{any::Any, panic::AssertUnwindSafe};6use fixedbitset::FixedBitSet;7#[cfg(feature = "std")]8use std::eprintln;9use std::sync::{Mutex, MutexGuard};1011#[cfg(feature = "trace")]12use tracing::{info_span, Span};1314use crate::{15error::{ErrorContext, ErrorHandler, Result},16prelude::Resource,17schedule::{18is_apply_deferred, ConditionWithAccess, ExecutorKind, SystemExecutor, SystemSchedule,19SystemWithAccess,20},21system::{RunSystemError, ScheduleSystem},22world::{unsafe_world_cell::UnsafeWorldCell, World},23};24#[cfg(feature = "hotpatching")]25use crate::{prelude::DetectChanges, HotPatchChanges};2627use super::__rust_begin_short_backtrace;2829/// Borrowed data used by the [`MultiThreadedExecutor`].30struct Environment<'env, 'sys> {31executor: &'env MultiThreadedExecutor,32systems: &'sys [SyncUnsafeCell<SystemWithAccess>],33conditions: SyncUnsafeCell<Conditions<'sys>>,34world_cell: UnsafeWorldCell<'env>,35}3637struct Conditions<'a> {38system_conditions: &'a mut [Vec<ConditionWithAccess>],39set_conditions: &'a mut [Vec<ConditionWithAccess>],40sets_with_conditions_of_systems: &'a [FixedBitSet],41systems_in_sets_with_conditions: &'a [FixedBitSet],42}4344impl<'env, 'sys> Environment<'env, 'sys> {45fn new(46executor: &'env MultiThreadedExecutor,47schedule: &'sys mut SystemSchedule,48world: &'env mut World,49) -> Self {50Environment {51executor,52systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(),53conditions: SyncUnsafeCell::new(Conditions {54system_conditions: &mut schedule.system_conditions,55set_conditions: &mut schedule.set_conditions,56sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems,57systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions,58}),59world_cell: world.as_unsafe_world_cell(),60}61}62}6364/// Per-system data used by the [`MultiThreadedExecutor`].65// Copied here because it can't be read from the system when it's running.66struct SystemTaskMetadata {67/// The set of systems whose `component_access_set()` conflicts with this one.68conflicting_systems: FixedBitSet,69/// The set of systems whose `component_access_set()` conflicts with this system's conditions.70/// Note that this is separate from `conflicting_systems` to handle the case where71/// a system is skipped by an earlier system set condition or system stepping,72/// and needs access to run its conditions but not for itself.73condition_conflicting_systems: FixedBitSet,74/// Indices of the systems that directly depend on the system.75dependents: Vec<usize>,76/// Is `true` if the system does not access `!Send` data.77is_send: bool,78/// Is `true` if the system is exclusive.79is_exclusive: bool,80}8182/// The result of running a system that is sent across a channel.83struct SystemResult {84system_index: usize,85}8687/// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel.88pub struct MultiThreadedExecutor {89/// The running state, protected by a mutex so that a reference to the executor can be shared across tasks.90state: Mutex<ExecutorState>,91/// Queue of system completion events.92system_completion: ConcurrentQueue<SystemResult>,93/// Setting when true applies deferred system buffers after all systems have run94apply_final_deferred: bool,95/// When set, tells the executor that a thread has panicked.96panic_payload: Mutex<Option<Box<dyn Any + Send>>>,97starting_systems: FixedBitSet,98/// Cached tracing span99#[cfg(feature = "trace")]100executor_span: Span,101}102103/// The state of the executor while running.104pub struct ExecutorState {105/// Metadata for scheduling and running system tasks.106system_task_metadata: Vec<SystemTaskMetadata>,107/// The set of systems whose `component_access_set()` conflicts with this system set's conditions.108set_condition_conflicting_systems: Vec<FixedBitSet>,109/// Returns `true` if a system with non-`Send` access is running.110local_thread_running: bool,111/// Returns `true` if an exclusive system is running.112exclusive_running: bool,113/// The number of systems that are running.114num_running_systems: usize,115/// The number of dependencies each system has that have not completed.116num_dependencies_remaining: Vec<usize>,117/// System sets whose conditions have been evaluated.118evaluated_sets: FixedBitSet,119/// Systems that have no remaining dependencies and are waiting to run.120ready_systems: FixedBitSet,121/// copy of `ready_systems`122ready_systems_copy: FixedBitSet,123/// Systems that are running.124running_systems: FixedBitSet,125/// Systems that got skipped.126skipped_systems: FixedBitSet,127/// Systems whose conditions have been evaluated and were run or skipped.128completed_systems: FixedBitSet,129/// Systems that have run but have not had their buffers applied.130unapplied_systems: FixedBitSet,131}132133/// References to data required by the executor.134/// This is copied to each system task so that can invoke the executor when they complete.135// These all need to outlive 'scope in order to be sent to new tasks,136// and keeping them all in a struct means we can use lifetime elision.137#[derive(Copy, Clone)]138struct Context<'scope, 'env, 'sys> {139environment: &'env Environment<'env, 'sys>,140scope: &'scope Scope<'scope, 'env, ()>,141error_handler: ErrorHandler,142}143144impl Default for MultiThreadedExecutor {145fn default() -> Self {146Self::new()147}148}149150impl SystemExecutor for MultiThreadedExecutor {151fn kind(&self) -> ExecutorKind {152ExecutorKind::MultiThreaded153}154155fn init(&mut self, schedule: &SystemSchedule) {156let state = self.state.get_mut().unwrap();157// pre-allocate space158let sys_count = schedule.system_ids.len();159let set_count = schedule.set_ids.len();160161self.system_completion = ConcurrentQueue::bounded(sys_count.max(1));162self.starting_systems = FixedBitSet::with_capacity(sys_count);163state.evaluated_sets = FixedBitSet::with_capacity(set_count);164state.ready_systems = FixedBitSet::with_capacity(sys_count);165state.ready_systems_copy = FixedBitSet::with_capacity(sys_count);166state.running_systems = FixedBitSet::with_capacity(sys_count);167state.completed_systems = FixedBitSet::with_capacity(sys_count);168state.skipped_systems = FixedBitSet::with_capacity(sys_count);169state.unapplied_systems = FixedBitSet::with_capacity(sys_count);170171state.system_task_metadata = Vec::with_capacity(sys_count);172for index in 0..sys_count {173state.system_task_metadata.push(SystemTaskMetadata {174conflicting_systems: FixedBitSet::with_capacity(sys_count),175condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),176dependents: schedule.system_dependents[index].clone(),177is_send: schedule.systems[index].system.is_send(),178is_exclusive: schedule.systems[index].system.is_exclusive(),179});180if schedule.system_dependencies[index] == 0 {181self.starting_systems.insert(index);182}183}184185{186#[cfg(feature = "trace")]187let _span = info_span!("calculate conflicting systems").entered();188for index1 in 0..sys_count {189let system1 = &schedule.systems[index1];190for index2 in 0..index1 {191let system2 = &schedule.systems[index2];192if !system2.access.is_compatible(&system1.access) {193state.system_task_metadata[index1]194.conflicting_systems195.insert(index2);196state.system_task_metadata[index2]197.conflicting_systems198.insert(index1);199}200}201202for index2 in 0..sys_count {203let system2 = &schedule.systems[index2];204if schedule.system_conditions[index1]205.iter()206.any(|condition| !system2.access.is_compatible(&condition.access))207{208state.system_task_metadata[index1]209.condition_conflicting_systems210.insert(index2);211}212}213}214215state.set_condition_conflicting_systems.clear();216state.set_condition_conflicting_systems.reserve(set_count);217for set_idx in 0..set_count {218let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);219for sys_index in 0..sys_count {220let system = &schedule.systems[sys_index];221if schedule.set_conditions[set_idx]222.iter()223.any(|condition| !system.access.is_compatible(&condition.access))224{225conflicting_systems.insert(sys_index);226}227}228state229.set_condition_conflicting_systems230.push(conflicting_systems);231}232}233234state.num_dependencies_remaining = Vec::with_capacity(sys_count);235}236237fn run(238&mut self,239schedule: &mut SystemSchedule,240world: &mut World,241_skip_systems: Option<&FixedBitSet>,242error_handler: ErrorHandler,243) {244let state = self.state.get_mut().unwrap();245// reset counts246if schedule.systems.is_empty() {247return;248}249state.num_running_systems = 0;250state251.num_dependencies_remaining252.clone_from(&schedule.system_dependencies);253state.ready_systems.clone_from(&self.starting_systems);254255// If stepping is enabled, make sure we skip those systems that should256// not be run.257#[cfg(feature = "bevy_debug_stepping")]258if let Some(skipped_systems) = _skip_systems {259debug_assert_eq!(skipped_systems.len(), state.completed_systems.len());260// mark skipped systems as completed261state.completed_systems |= skipped_systems;262263// signal the dependencies for each of the skipped systems, as264// though they had run265for system_index in skipped_systems.ones() {266state.signal_dependents(system_index);267state.ready_systems.remove(system_index);268}269}270271let thread_executor = world272.get_resource::<MainThreadExecutor>()273.map(|e| e.0.clone());274let thread_executor = thread_executor.as_deref();275276let environment = &Environment::new(self, schedule, world);277278ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(279false,280thread_executor,281|scope| {282let context = Context {283environment,284scope,285error_handler,286};287288// The first tick won't need to process finished systems, but we still need to run the loop in289// tick_executor() in case a system completes while the first tick still holds the mutex.290context.tick_executor();291},292);293294// End the borrows of self and world in environment by copying out the reference to systems.295let systems = environment.systems;296297let state = self.state.get_mut().unwrap();298if self.apply_final_deferred {299// Do one final apply buffers after all systems have completed300// Commands should be applied while on the scope's thread, not the executor's thread301let res = apply_deferred(&state.unapplied_systems, systems, world);302if let Err(payload) = res {303let panic_payload = self.panic_payload.get_mut().unwrap();304*panic_payload = Some(payload);305}306state.unapplied_systems.clear();307}308309// check to see if there was a panic310let payload = self.panic_payload.get_mut().unwrap();311if let Some(payload) = payload.take() {312std::panic::resume_unwind(payload);313}314315debug_assert!(state.ready_systems.is_clear());316debug_assert!(state.running_systems.is_clear());317state.evaluated_sets.clear();318state.skipped_systems.clear();319state.completed_systems.clear();320}321322fn set_apply_final_deferred(&mut self, value: bool) {323self.apply_final_deferred = value;324}325}326327impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {328fn system_completed(329&self,330system_index: usize,331res: Result<(), Box<dyn Any + Send>>,332system: &ScheduleSystem,333) {334// tell the executor that the system finished335self.environment336.executor337.system_completion338.push(SystemResult { system_index })339.unwrap_or_else(|error| unreachable!("{}", error));340if let Err(payload) = res {341#[cfg(feature = "std")]342#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]343{344eprintln!("Encountered a panic in system `{}`!", system.name());345}346// set the payload to propagate the error347{348let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap();349*panic_payload = Some(payload);350}351}352self.tick_executor();353}354355#[expect(356clippy::mut_from_ref,357reason = "Field is only accessed here and is guarded by lock with a documented safety comment"358)]359fn try_lock<'a>(&'a self) -> Option<(&'a mut Conditions<'sys>, MutexGuard<'a, ExecutorState>)> {360let guard = self.environment.executor.state.try_lock().ok()?;361// SAFETY: This is an exclusive access as no other location fetches conditions mutably, and362// is synchronized by the lock on the executor state.363let conditions = unsafe { &mut *self.environment.conditions.get() };364Some((conditions, guard))365}366367fn tick_executor(&self) {368// Ensure that the executor handles any events pushed to the system_completion queue by this thread.369// If this thread acquires the lock, the executor runs after the push() and they are processed.370// If this thread does not acquire the lock, then the is_empty() check on the other thread runs371// after the lock is released, which is after try_lock() failed, which is after the push()372// on this thread, so the is_empty() check will see the new events and loop.373loop {374let Some((conditions, mut guard)) = self.try_lock() else {375return;376};377guard.tick(self, conditions);378// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.379drop(guard);380if self.environment.executor.system_completion.is_empty() {381return;382}383}384}385}386387impl MultiThreadedExecutor {388/// Creates a new `multi_threaded` executor for use with a [`Schedule`].389///390/// [`Schedule`]: crate::schedule::Schedule391pub fn new() -> Self {392Self {393state: Mutex::new(ExecutorState::new()),394system_completion: ConcurrentQueue::unbounded(),395starting_systems: FixedBitSet::new(),396apply_final_deferred: true,397panic_payload: Mutex::new(None),398#[cfg(feature = "trace")]399executor_span: info_span!("multithreaded executor"),400}401}402}403404impl ExecutorState {405fn new() -> Self {406Self {407system_task_metadata: Vec::new(),408set_condition_conflicting_systems: Vec::new(),409num_running_systems: 0,410num_dependencies_remaining: Vec::new(),411local_thread_running: false,412exclusive_running: false,413evaluated_sets: FixedBitSet::new(),414ready_systems: FixedBitSet::new(),415ready_systems_copy: FixedBitSet::new(),416running_systems: FixedBitSet::new(),417skipped_systems: FixedBitSet::new(),418completed_systems: FixedBitSet::new(),419unapplied_systems: FixedBitSet::new(),420}421}422423fn tick(&mut self, context: &Context, conditions: &mut Conditions) {424#[cfg(feature = "trace")]425let _span = context.environment.executor.executor_span.enter();426427for result in context.environment.executor.system_completion.try_iter() {428self.finish_system_and_handle_dependents(result);429}430431// SAFETY:432// - `finish_system_and_handle_dependents` has updated the currently running systems.433// - `rebuild_active_access` locks access for all currently running systems.434unsafe {435self.spawn_system_tasks(context, conditions);436}437}438439/// # Safety440/// - Caller must ensure that `self.ready_systems` does not contain any systems that441/// have been mutably borrowed (such as the systems currently running).442/// - `world_cell` must have permission to access all world data (not counting443/// any world data that is claimed by systems currently running on this executor).444unsafe fn spawn_system_tasks(&mut self, context: &Context, conditions: &mut Conditions) {445if self.exclusive_running {446return;447}448449#[cfg(feature = "hotpatching")]450#[expect(451clippy::undocumented_unsafe_blocks,452reason = "This actually could result in UB if a system tries to mutate453`HotPatchChanges`. We allow this as the resource only exists with the `hotpatching` feature.454and `hotpatching` should never be enabled in release."455)]456#[cfg(feature = "hotpatching")]457let hotpatch_tick = unsafe {458context459.environment460.world_cell461.get_resource_ref::<HotPatchChanges>()462}463.map(|r| r.last_changed())464.unwrap_or_default();465466// can't borrow since loop mutably borrows `self`467let mut ready_systems = core::mem::take(&mut self.ready_systems_copy);468469// Skipping systems may cause their dependents to become ready immediately.470// If that happens, we need to run again immediately or we may fail to spawn those dependents.471let mut check_for_new_ready_systems = true;472while check_for_new_ready_systems {473check_for_new_ready_systems = false;474475ready_systems.clone_from(&self.ready_systems);476477for system_index in ready_systems.ones() {478debug_assert!(!self.running_systems.contains(system_index));479// SAFETY: Caller assured that these systems are not running.480// Therefore, no other reference to this system exists and there is no aliasing.481let system =482&mut unsafe { &mut *context.environment.systems[system_index].get() }.system;483484#[cfg(feature = "hotpatching")]485if hotpatch_tick.is_newer_than(486system.get_last_run(),487context.environment.world_cell.change_tick(),488) {489system.refresh_hotpatch();490}491492if !self.can_run(system_index, conditions) {493// NOTE: exclusive systems with ambiguities are susceptible to494// being significantly displaced here (compared to single-threaded order)495// if systems after them in topological order can run496// if that becomes an issue, `break;` if exclusive system497continue;498}499500self.ready_systems.remove(system_index);501502// SAFETY: `can_run` returned true, which means that:503// - There can be no systems running whose accesses would conflict with any conditions.504if unsafe {505!self.should_run(506system_index,507system,508conditions,509context.environment.world_cell,510context.error_handler,511)512} {513self.skip_system_and_signal_dependents(system_index);514// signal_dependents may have set more systems to ready.515check_for_new_ready_systems = true;516continue;517}518519self.running_systems.insert(system_index);520self.num_running_systems += 1;521522if self.system_task_metadata[system_index].is_exclusive {523// SAFETY: `can_run` returned true for this system,524// which means no systems are currently borrowed.525unsafe {526self.spawn_exclusive_system_task(context, system_index);527}528check_for_new_ready_systems = false;529break;530}531532// SAFETY:533// - Caller ensured no other reference to this system exists.534// - `system_task_metadata[system_index].is_exclusive` is `false`,535// so `System::is_exclusive` returned `false` when we called it.536// - `can_run` returned true, so no systems with conflicting world access are running.537unsafe {538self.spawn_system_task(context, system_index);539}540}541}542543// give back544self.ready_systems_copy = ready_systems;545}546547fn can_run(&mut self, system_index: usize, conditions: &mut Conditions) -> bool {548let system_meta = &self.system_task_metadata[system_index];549if system_meta.is_exclusive && self.num_running_systems > 0 {550return false;551}552553if !system_meta.is_send && self.local_thread_running {554return false;555}556557// TODO: an earlier out if world's archetypes did not change558for set_idx in conditions.sets_with_conditions_of_systems[system_index]559.difference(&self.evaluated_sets)560{561if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {562return false;563}564}565566if !system_meta567.condition_conflicting_systems568.is_disjoint(&self.running_systems)569{570return false;571}572573if !self.skipped_systems.contains(system_index)574&& !system_meta575.conflicting_systems576.is_disjoint(&self.running_systems)577{578return false;579}580581true582}583584/// # Safety585/// * `world` must have permission to read any world data required by586/// the system's conditions: this includes conditions for the system587/// itself, and conditions for any of the system's sets.588unsafe fn should_run(589&mut self,590system_index: usize,591system: &mut ScheduleSystem,592conditions: &mut Conditions,593world: UnsafeWorldCell,594error_handler: ErrorHandler,595) -> bool {596let mut should_run = !self.skipped_systems.contains(system_index);597598for set_idx in conditions.sets_with_conditions_of_systems[system_index].ones() {599if self.evaluated_sets.contains(set_idx) {600continue;601}602603// Evaluate the system set's conditions.604// SAFETY:605// - The caller ensures that `world` has permission to read any data606// required by the conditions.607let set_conditions_met = unsafe {608evaluate_and_fold_conditions(609&mut conditions.set_conditions[set_idx],610world,611error_handler,612system,613true,614)615};616617if !set_conditions_met {618self.skipped_systems619.union_with(&conditions.systems_in_sets_with_conditions[set_idx]);620}621622should_run &= set_conditions_met;623self.evaluated_sets.insert(set_idx);624}625626// Evaluate the system's conditions.627// SAFETY:628// - The caller ensures that `world` has permission to read any data629// required by the conditions.630let system_conditions_met = unsafe {631evaluate_and_fold_conditions(632&mut conditions.system_conditions[system_index],633world,634error_handler,635system,636false,637)638};639640if !system_conditions_met {641self.skipped_systems.insert(system_index);642}643644should_run &= system_conditions_met;645646if should_run {647// SAFETY:648// - The caller ensures that `world` has permission to read any data649// required by the system.650let valid_params = match unsafe { system.validate_param_unsafe(world) } {651Ok(()) => true,652Err(e) => {653if !e.skipped {654error_handler(655e.into(),656ErrorContext::System {657name: system.name(),658last_run: system.get_last_run(),659},660);661}662false663}664};665if !valid_params {666self.skipped_systems.insert(system_index);667}668669should_run &= valid_params;670}671672should_run673}674675/// # Safety676/// - Caller must not alias systems that are running.677/// - `is_exclusive` must have returned `false` for the specified system.678/// - `world` must have permission to access the world data679/// used by the specified system.680unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {681// SAFETY: this system is not running, no other reference exists682let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;683// Move the full context object into the new future.684let context = *context;685686let system_meta = &self.system_task_metadata[system_index];687688let task = async move {689let res = std::panic::catch_unwind(AssertUnwindSafe(|| {690// SAFETY:691// - The caller ensures that we have permission to692// access the world data used by the system.693// - `is_exclusive` returned false694unsafe {695if let Err(RunSystemError::Failed(err)) =696__rust_begin_short_backtrace::run_unsafe(697system,698context.environment.world_cell,699)700{701(context.error_handler)(702err,703ErrorContext::System {704name: system.name(),705last_run: system.get_last_run(),706},707);708}709};710}));711context.system_completed(system_index, res, system);712};713714if system_meta.is_send {715context.scope.spawn(task);716} else {717self.local_thread_running = true;718context.scope.spawn_on_external(task);719}720}721722/// # Safety723/// Caller must ensure no systems are currently borrowed.724unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) {725// SAFETY: this system is not running, no other reference exists726let system = &mut unsafe { &mut *context.environment.systems[system_index].get() }.system;727// Move the full context object into the new future.728let context = *context;729730if is_apply_deferred(&**system) {731// TODO: avoid allocation732let unapplied_systems = self.unapplied_systems.clone();733self.unapplied_systems.clear();734let task = async move {735// SAFETY: `can_run` returned true for this system, which means736// that no other systems currently have access to the world.737let world = unsafe { context.environment.world_cell.world_mut() };738let res = apply_deferred(&unapplied_systems, context.environment.systems, world);739context.system_completed(system_index, res, system);740};741742context.scope.spawn_on_scope(task);743} else {744let task = async move {745// SAFETY: `can_run` returned true for this system, which means746// that no other systems currently have access to the world.747let world = unsafe { context.environment.world_cell.world_mut() };748let res = std::panic::catch_unwind(AssertUnwindSafe(|| {749if let Err(RunSystemError::Failed(err)) =750__rust_begin_short_backtrace::run(system, world)751{752(context.error_handler)(753err,754ErrorContext::System {755name: system.name(),756last_run: system.get_last_run(),757},758);759}760}));761context.system_completed(system_index, res, system);762};763764context.scope.spawn_on_scope(task);765}766767self.exclusive_running = true;768self.local_thread_running = true;769}770771fn finish_system_and_handle_dependents(&mut self, result: SystemResult) {772let SystemResult { system_index, .. } = result;773774if self.system_task_metadata[system_index].is_exclusive {775self.exclusive_running = false;776}777778if !self.system_task_metadata[system_index].is_send {779self.local_thread_running = false;780}781782debug_assert!(self.num_running_systems >= 1);783self.num_running_systems -= 1;784self.running_systems.remove(system_index);785self.completed_systems.insert(system_index);786self.unapplied_systems.insert(system_index);787788self.signal_dependents(system_index);789}790791fn skip_system_and_signal_dependents(&mut self, system_index: usize) {792self.completed_systems.insert(system_index);793self.signal_dependents(system_index);794}795796fn signal_dependents(&mut self, system_index: usize) {797for &dep_idx in &self.system_task_metadata[system_index].dependents {798let remaining = &mut self.num_dependencies_remaining[dep_idx];799debug_assert!(*remaining >= 1);800*remaining -= 1;801if *remaining == 0 && !self.completed_systems.contains(dep_idx) {802self.ready_systems.insert(dep_idx);803}804}805}806}807808fn apply_deferred(809unapplied_systems: &FixedBitSet,810systems: &[SyncUnsafeCell<SystemWithAccess>],811world: &mut World,812) -> Result<(), Box<dyn Any + Send>> {813for system_index in unapplied_systems.ones() {814// SAFETY: none of these systems are running, no other references exist815let system = &mut unsafe { &mut *systems[system_index].get() }.system;816let res = std::panic::catch_unwind(AssertUnwindSafe(|| {817system.apply_deferred(world);818}));819if let Err(payload) = res {820#[cfg(feature = "std")]821#[expect(clippy::print_stderr, reason = "Allowed behind `std` feature gate.")]822{823eprintln!(824"Encountered a panic when applying buffers for system `{}`!",825system.name()826);827}828return Err(payload);829}830}831Ok(())832}833834/// # Safety835/// - `world` must have permission to read any world data836/// required by `conditions`.837unsafe fn evaluate_and_fold_conditions(838conditions: &mut [ConditionWithAccess],839world: UnsafeWorldCell,840error_handler: ErrorHandler,841for_system: &ScheduleSystem,842on_set: bool,843) -> bool {844#[expect(845clippy::unnecessary_fold,846reason = "Short-circuiting here would prevent conditions from mutating their own state as needed."847)]848conditions849.iter_mut()850.map(|ConditionWithAccess { condition, .. }| {851// SAFETY:852// - The caller ensures that `world` has permission to read any data853// required by the condition.854unsafe { condition.validate_param_unsafe(world) }855.map_err(From::from)856.and_then(|()| {857// SAFETY:858// - The caller ensures that `world` has permission to read any data859// required by the condition.860unsafe {861__rust_begin_short_backtrace::readonly_run_unsafe(&mut **condition, world)862}863})864.unwrap_or_else(|err| {865if let RunSystemError::Failed(err) = err {866error_handler(867err,868ErrorContext::RunCondition {869name: condition.name(),870last_run: condition.get_last_run(),871system: for_system.name(),872on_set,873},874);875};876false877})878})879.fold(true, |acc, res| acc && res)880}881882/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread883#[derive(Resource, Clone)]884pub struct MainThreadExecutor(pub Arc<ThreadExecutor<'static>>);885886impl Default for MainThreadExecutor {887fn default() -> Self {888Self::new()889}890}891892impl MainThreadExecutor {893/// Creates a new executor that can be used to run systems on the main thread.894pub fn new() -> Self {895MainThreadExecutor(TaskPool::get_thread_executor())896}897}898899#[cfg(test)]900mod tests {901use crate::{902prelude::Resource,903schedule::{ExecutorKind, IntoScheduleConfigs, Schedule},904system::Commands,905world::World,906};907908#[derive(Resource)]909struct R;910911#[test]912fn skipped_systems_notify_dependents() {913let mut world = World::new();914let mut schedule = Schedule::default();915schedule.set_executor_kind(ExecutorKind::MultiThreaded);916schedule.add_systems(917(918(|| {}).run_if(|| false),919// This system depends on a system that is always skipped.920|mut commands: Commands| {921commands.insert_resource(R);922},923)924.chain(),925);926schedule.run(&mut world);927assert!(world.get_resource::<R>().is_some());928}929930/// Regression test for a weird bug flagged by MIRI in931/// `spawn_exclusive_system_task`, related to a `&mut World` being captured932/// inside an `async` block and somehow remaining alive even after its last use.933#[test]934fn check_spawn_exclusive_system_task_miri() {935let mut world = World::new();936let mut schedule = Schedule::default();937schedule.set_executor_kind(ExecutorKind::MultiThreaded);938schedule.add_systems(((|_: Commands| {}), |_: Commands| {}).chain());939schedule.run(&mut world);940}941}942943944