Path: blob/main/crates/bevy_tasks/src/single_threaded_task_pool.rs
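//! A single-threaded [`TaskPool`] implementation, used where the multi-threaded
//! task pool is unavailable (for example on wasm, where tasks are driven on the
//! main thread, or on `no_std` targets without thread-locals).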
use alloc::{string::String, vec::Vec};
use bevy_platform::sync::Arc;
use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};

use crate::executor::LocalExecutor;
use crate::{block_on, Task};

crate::cfg::std! {
    if {
        use std::thread_local;

        use crate::executor::LocalExecutor as Executor;

        thread_local! {
            static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
        }
    } else {
        // Because we do not have thread-locals without std, we cannot use LocalExecutor here.
        use crate::executor::Executor;

        static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
    }
}

/// Used to create a [`TaskPool`].
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {}

/// This is a dummy struct for wasm support to provide the same api as with the multithreaded
/// task pool. In the case of the multithreaded task pool this struct is used to spawn
/// tasks on a specific thread. But the wasm task pool just calls
/// `wasm_bindgen_futures::spawn_local` for spawning which just runs tasks on the main thread
/// and so the [`ThreadExecutor`] does nothing.
#[derive(Default)]
pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);

impl<'a> ThreadExecutor<'a> {
    /// Creates a new `ThreadExecutor`
    pub fn new() -> Self {
        Self::default()
    }
}

impl TaskPoolBuilder {
    /// Creates a new `TaskPoolBuilder` instance
    pub fn new() -> Self {
        Self::default()
    }

    /// No op on the single threaded task pool
    pub fn num_threads(self, _num_threads: usize) -> Self {
        self
    }

    /// No op on the single threaded task pool
    pub fn stack_size(self, _stack_size: usize) -> Self {
        self
    }

    /// No op on the single threaded task pool
    pub fn thread_name(self, _thread_name: String) -> Self {
        self
    }

    /// No op on the single threaded task pool
    pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// No op on the single threaded task pool
    pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// Creates a new [`TaskPool`]
    pub fn build(self) -> TaskPool {
        TaskPool::new_internal()
    }
}

/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
/// the pool on threads owned by the pool. In this case - main thread only.
#[derive(Debug, Default, Clone)]
pub struct TaskPool {}

impl TaskPool {
    /// Just create a new `ThreadExecutor` for wasm
    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
        Arc::new(ThreadExecutor::new())
    }

    /// Create a `TaskPool` with the default configuration.
    pub fn new() -> Self {
        TaskPoolBuilder::new().build()
    }

    fn new_internal() -> Self {
        Self {}
    }

    /// Return the number of threads owned by the task pool
    pub fn thread_num(&self) -> usize {
        1
    }

    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
    /// passing a scope object into it. The scope object provided to the callback can be used
    /// to spawn tasks. This function will await the completion of all tasks before returning.
    ///
    /// This is similar to `rayon::scope` and `crossbeam::scope`
    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
    where
        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
        T: Send + 'static,
    {
        self.scope_with_executor(false, None, f)
    }

    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
    /// passing a scope object into it. The scope object provided to the callback can be used
    /// to spawn tasks. This function will await the completion of all tasks before returning.
    ///
    /// This is similar to `rayon::scope` and `crossbeam::scope`
    #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
    pub fn scope_with_executor<'env, F, T>(
        &self,
        _tick_task_pool_executor: bool,
        _thread_executor: Option<&ThreadExecutor>,
        f: F,
    ) -> Vec<T>
    where
        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
        T: Send + 'static,
    {
        // SAFETY: This safety comment applies to all references transmuted to 'env.
        // Any futures spawned with these references need to return before this function completes.
        // This is guaranteed because we drive all the futures spawned onto the Scope
        // to completion in this function. However, rust has no way of knowing this so we
        // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
        // Any usages of the references passed into `Scope` must be accessed through
        // the transmuted reference for the rest of this function.

        let executor = LocalExecutor::new();
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };

        let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };

        let pending_tasks: Cell<usize> = Cell::new(0);
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };

        let mut scope = Scope {
            executor_ref,
            pending_tasks,
            results_ref,
            scope: PhantomData,
            env: PhantomData,
        };

        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };

        f(scope_ref);

        // Wait until the scope is complete
        block_on(executor.run(async {
            while pending_tasks.get() != 0 {
                futures_lite::future::yield_now().await;
            }
        }));

        results
            .take()
            .into_iter()
            .map(|result| result.unwrap())
            .collect()
    }

    /// Spawns a static future onto the thread pool. The returned Task is a future, which can be polled
    /// to retrieve the output of the original future. Dropping the task will attempt to cancel it.
    /// It can also be "detached", allowing it to continue running without having to be polled by the
    /// end-user.
    ///
    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
    pub fn spawn<T>(
        &self,
        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
    ) -> Task<T>
    where
        T: 'static + MaybeSend + MaybeSync,
    {
        crate::cfg::switch! {{
            crate::cfg::web => {
                Task::wrap_future(future)
            }
            crate::cfg::std => {
                LOCAL_EXECUTOR.with(|executor| {
                    let task = executor.spawn(future);
                    // Loop until all tasks are done
                    while executor.try_tick() {}

                    Task::new(task)
                })
            }
            _ => {
                let task = LOCAL_EXECUTOR.spawn(future);
                // Loop until all tasks are done
                while LOCAL_EXECUTOR.try_tick() {}

                Task::new(task)
            }
        }}
    }

    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
    pub fn spawn_local<T>(
        &self,
        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
    ) -> Task<T>
    where
        T: 'static + MaybeSend + MaybeSync,
    {
        self.spawn(future)
    }

    /// Runs a function with the local executor. Typically used to tick
    /// the local executor on the main thread as it needs to share time with
    /// other things.
    ///
    /// ```
    /// use bevy_tasks::TaskPool;
    ///
    /// TaskPool::new().with_local_executor(|local_executor| {
    ///     local_executor.try_tick();
    /// });
    /// ```
    pub fn with_local_executor<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&Executor) -> R,
    {
        crate::cfg::switch! {{
            crate::cfg::std => {
                LOCAL_EXECUTOR.with(f)
            }
            _ => {
                f(&LOCAL_EXECUTOR)
            }
        }}
    }
}

/// A `TaskPool` scope for running one or more non-`'static` futures.
///
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
    executor_ref: &'scope LocalExecutor<'scope>,
    // The number of pending tasks spawned on the scope
    pending_tasks: &'scope Cell<usize>,
    // Vector to gather results of all futures spawned during scope run
    results_ref: &'env RefCell<Vec<Option<T>>>,

    // make `Scope` invariant over 'scope and 'env
    scope: PhantomData<&'scope mut &'scope ()>,
    env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
    /// Spawns a scoped future onto the executor. The scope *must* outlive
    /// the provided future. The results of the future will be returned as a part of
    /// [`TaskPool::scope`]'s return value.
    ///
    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns a scoped future onto the executor. The scope *must* outlive
    /// the provided future. The results of the future will be returned as a part of
    /// [`TaskPool::scope`]'s return value.
    ///
    /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`].
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns a scoped future that runs on the thread the scope called from. The
    /// scope *must* outlive the provided future. The results of the future will be
    /// returned as a part of [`TaskPool::scope`]'s return value.
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        // increment the number of pending tasks
        let pending_tasks = self.pending_tasks;
        pending_tasks.update(|i| i + 1);

        // add a spot to keep the result, and record the index
        let results_ref = self.results_ref;
        let mut results = results_ref.borrow_mut();
        let task_number = results.len();
        results.push(None);
        drop(results);

        // create the job closure
        let f = async move {
            let result = f.await;

            // store the result in the allocated slot
            let mut results = results_ref.borrow_mut();
            results[task_number] = Some(result);
            drop(results);

            // decrement the pending tasks count
            pending_tasks.update(|i| i - 1);
        };

        // spawn the job itself
        self.executor_ref.spawn(f).detach();
    }
}

crate::cfg::std! {
    if {
        // With std, tasks run on the thread-local executor, so they do not need to be Send/Sync.
        pub trait MaybeSend {}
        impl<T> MaybeSend for T {}

        pub trait MaybeSync {}
        impl<T> MaybeSync for T {}
    } else {
        // Without std, the executor is a shared static, so tasks must be Send/Sync.
        pub trait MaybeSend: Send {}
        impl<T: Send> MaybeSend for T {}

        pub trait MaybeSync: Sync {}
        impl<T: Sync> MaybeSync for T {}
    }
}

#[cfg(test)]
mod test {
    use std::{time, thread};

    use super::*;

    /// This test creates a scope with a single task that goes to sleep for a
    /// nontrivial amount of time. At one point, the scope would (incorrectly)
    /// return early under these conditions, causing a crash.
    ///
    /// The correct behavior is for the scope to block until the receiver is
    /// woken by the external thread.
    #[test]
    fn scoped_spawn() {
        let (sender, receiver) = async_channel::unbounded();
        let task_pool = TaskPool {};
        let _thread = thread::spawn(move || {
            let duration = time::Duration::from_millis(50);
            thread::sleep(duration);
            // Deliver the value from this non-async thread.
            let _ = sender.send_blocking(0);
        });
        task_pool.scope(|scope| {
            scope.spawn(async { receiver.recv().await });
        });
    }
}
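
// The module below is an illustrative usage sketch, not part of the upstream file:
// it shows how `TaskPool::scope` and `TaskPool::spawn` defined above are typically
// used. The module name `usage_example`, the test names, and the concrete values are
// invented for demonstration; everything else relies only on items from this module.
#[cfg(test)]
mod usage_example {
    use super::*;

    /// `scope` drives every spawned future to completion before returning, and the
    /// results come back in spawn order (see `spawn_on_scope`, which reserves one
    /// result slot per task).
    #[test]
    fn scope_collects_results_in_spawn_order() {
        let task_pool = TaskPool::new();
        let results = task_pool.scope(|scope| {
            for i in 0..4 {
                scope.spawn(async move { i * 2 });
            }
        });
        assert_eq!(results, vec![0, 2, 4, 6]);
    }

    /// Scoped futures may borrow data that lives outside the scope, which is what
    /// distinguishes `scope` from `spawn`.
    #[test]
    fn scope_can_borrow_local_data() {
        let task_pool = TaskPool::new();
        let values = [1, 2, 3];
        let sums = task_pool.scope(|scope| {
            // The future borrows `values`, which outlives the scope call.
            scope.spawn(async { values.iter().sum::<i32>() });
        });
        assert_eq!(sums, vec![6]);
    }

    /// On the single threaded pool, `spawn` ticks the local executor until it is idle,
    /// so the returned `Task` can usually be resolved immediately with `block_on`.
    #[test]
    fn spawn_returns_an_awaitable_task() {
        let task_pool = TaskPool::new();
        let task = task_pool.spawn(async { 21 + 21 });
        assert_eq!(block_on(task), 42);
    }
}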