GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_tasks/src/single_threaded_task_pool.rs
use alloc::{string::String, vec::Vec};
use bevy_platform::sync::Arc;
use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};

use crate::executor::LocalExecutor;
use crate::{block_on, Task};

crate::cfg::std! {
    if {
        use std::thread_local;

        use crate::executor::LocalExecutor as Executor;

        thread_local! {
            static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
        }
    } else {

        // Because we do not have thread-locals without std, we cannot use LocalExecutor here.
        use crate::executor::Executor;

        static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
    }
}

/// Used to create a [`TaskPool`].
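///
/// A minimal usage sketch (illustrative): on this single-threaded pool the
/// builder methods are no-ops kept for API parity with the multithreaded pool.
///
/// ```
/// use bevy_tasks::TaskPoolBuilder;
///
/// let pool = TaskPoolBuilder::new().num_threads(4).build();
/// assert_eq!(pool.thread_num(), 1);
/// ```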
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {}

/// A dummy struct provided for wasm support so that the single-threaded pool exposes
/// the same API as the multithreaded task pool. In the multithreaded task pool this
/// struct is used to spawn tasks on a specific thread, but the wasm task pool just
/// calls `wasm_bindgen_futures::spawn_local`, which runs tasks on the main thread,
/// so this [`ThreadExecutor`] does nothing.
#[derive(Default)]
pub struct ThreadExecutor<'a>(PhantomData<&'a ()>);
impl<'a> ThreadExecutor<'a> {
    /// Creates a new `ThreadExecutor`
    pub fn new() -> Self {
        Self::default()
    }
}

impl TaskPoolBuilder {
    /// Creates a new `TaskPoolBuilder` instance
    pub fn new() -> Self {
        Self::default()
    }

    /// No-op on the single-threaded task pool
    pub fn num_threads(self, _num_threads: usize) -> Self {
        self
    }

    /// No-op on the single-threaded task pool
    pub fn stack_size(self, _stack_size: usize) -> Self {
        self
    }

    /// No-op on the single-threaded task pool
    pub fn thread_name(self, _thread_name: String) -> Self {
        self
    }

    /// No-op on the single-threaded task pool
    pub fn on_thread_spawn(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// No-op on the single-threaded task pool
    pub fn on_thread_destroy(self, _f: impl Fn() + Send + Sync + 'static) -> Self {
        self
    }

    /// Creates a new [`TaskPool`]
    pub fn build(self) -> TaskPool {
        TaskPool::new_internal()
    }
}

/// A thread pool for executing tasks. Tasks are futures that are automatically driven by
/// the pool on threads owned by the pool; in this case, the main thread only.
#[derive(Debug, Default, Clone)]
pub struct TaskPool {}

impl TaskPool {
    /// Just create a new `ThreadExecutor` for wasm
    pub fn get_thread_executor() -> Arc<ThreadExecutor<'static>> {
        Arc::new(ThreadExecutor::new())
    }

    /// Create a `TaskPool` with the default configuration.
    pub fn new() -> Self {
        TaskPoolBuilder::new().build()
    }

    fn new_internal() -> Self {
        Self {}
    }

    /// Return the number of threads owned by the task pool
    pub fn thread_num(&self) -> usize {
        1
    }

    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
    /// passing a scope object into it. The scope object provided to the callback can be used
    /// to spawn tasks. This function will await the completion of all tasks before returning.
    ///
    /// This is similar to `rayon::scope` and `crossbeam::scope`.
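    ///
    /// A minimal usage sketch (illustrative): the call blocks until both spawned
    /// futures have completed and returns their outputs.
    ///
    /// ```
    /// use bevy_tasks::TaskPool;
    ///
    /// let pool = TaskPool::new();
    /// let results = pool.scope(|s| {
    ///     s.spawn(async { 1 });
    ///     s.spawn(async { 2 });
    /// });
    /// assert_eq!(results.into_iter().sum::<i32>(), 3);
    /// ```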
    pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
    where
        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
        T: Send + 'static,
    {
        self.scope_with_executor(false, None, f)
    }

    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
    /// passing a scope object into it. The scope object provided to the callback can be used
    /// to spawn tasks. This function will await the completion of all tasks before returning.
    ///
    /// This is similar to `rayon::scope` and `crossbeam::scope`.
    #[expect(unsafe_code, reason = "Required to transmute lifetimes.")]
    pub fn scope_with_executor<'env, F, T>(
        &self,
        _tick_task_pool_executor: bool,
        _thread_executor: Option<&ThreadExecutor>,
        f: F,
    ) -> Vec<T>
    where
        F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
        T: Send + 'static,
    {
        // SAFETY: This safety comment applies to all references transmuted to 'env.
        // Any futures spawned with these references need to return before this function completes.
        // This is guaranteed because we drive all the futures spawned onto the Scope
        // to completion in this function. However, Rust has no way of knowing this, so we
        // transmute the lifetimes to 'env here to appease the compiler, as it is unable to validate safety.
        // Any usages of the references passed into `Scope` must be accessed through
        // the transmuted reference for the rest of this function.

        let executor = LocalExecutor::new();
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };

        let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };

        let pending_tasks: Cell<usize> = Cell::new(0);
        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };

        let mut scope = Scope {
            executor_ref,
            pending_tasks,
            results_ref,
            scope: PhantomData,
            env: PhantomData,
        };

        // SAFETY: As above, all futures must complete in this function so we can change the lifetime
        let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };

        f(scope_ref);

        // Wait until the scope is complete
        block_on(executor.run(async {
            while pending_tasks.get() != 0 {
                futures_lite::future::yield_now().await;
            }
        }));

        results
            .take()
            .into_iter()
            .map(|result| result.unwrap())
            .collect()
    }

    /// Spawns a static future onto the thread pool. The returned [`Task`] is a future that can be
    /// polled to retrieve the output of the original future. Dropping the task will attempt to
    /// cancel it. It can also be "detached", allowing it to continue running without having to be
    /// polled by the end-user.
    ///
    /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead.
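    ///
    /// A minimal usage sketch (illustrative), assuming the task's output is
    /// retrieved by blocking on it with [`block_on`]:
    ///
    /// ```
    /// use bevy_tasks::{block_on, TaskPool};
    ///
    /// let pool = TaskPool::new();
    /// let task = pool.spawn(async { 21 * 2 });
    /// assert_eq!(block_on(task), 42);
    /// ```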
    pub fn spawn<T>(
        &self,
        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
    ) -> Task<T>
    where
        T: 'static + MaybeSend + MaybeSync,
    {
        crate::cfg::switch! {{
            crate::cfg::web => {
                Task::wrap_future(future)
            }
            crate::cfg::std => {
                LOCAL_EXECUTOR.with(|executor| {
                    let task = executor.spawn(future);
                    // Loop until all tasks are done
                    while executor.try_tick() {}

                    Task::new(task)
                })
            }
            _ => {
                let task = LOCAL_EXECUTOR.spawn(future);
                // Loop until all tasks are done
                while LOCAL_EXECUTOR.try_tick() {}

                Task::new(task)
            }
        }}
    }

    /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
    pub fn spawn_local<T>(
        &self,
        future: impl Future<Output = T> + 'static + MaybeSend + MaybeSync,
    ) -> Task<T>
    where
        T: 'static + MaybeSend + MaybeSync,
    {
        self.spawn(future)
    }

    /// Runs a function with the local executor. Typically used to tick
    /// the local executor on the main thread as it needs to share time with
    /// other things.
    ///
    /// ```
    /// use bevy_tasks::TaskPool;
    ///
    /// TaskPool::new().with_local_executor(|local_executor| {
    ///     local_executor.try_tick();
    /// });
    /// ```
    pub fn with_local_executor<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&Executor) -> R,
    {
        crate::cfg::switch! {{
            crate::cfg::std => {
                LOCAL_EXECUTOR.with(f)
            }
            _ => {
                f(&LOCAL_EXECUTOR)
            }
        }}
    }
}

/// A `TaskPool` scope for running one or more non-`'static` futures.
///
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
    executor_ref: &'scope LocalExecutor<'scope>,
    // The number of pending tasks spawned on the scope
    pending_tasks: &'scope Cell<usize>,
    // Vector to gather results of all futures spawned during scope run
    results_ref: &'env RefCell<Vec<Option<T>>>,

    // make `Scope` invariant over 'scope and 'env
    scope: PhantomData<&'scope mut &'scope ()>,
    env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
    /// Spawns a scoped future onto the executor. The scope *must* outlive
    /// the provided future. The results of the future will be returned as a part of
    /// [`TaskPool::scope`]'s return value.
    ///
    /// On the single-threaded task pool, this just calls [`Scope::spawn_on_scope`].
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns a scoped future onto the executor. The scope *must* outlive
    /// the provided future. The results of the future will be returned as a part of
    /// [`TaskPool::scope`]'s return value.
    ///
    /// On the single-threaded task pool, this just calls [`Scope::spawn_on_scope`].
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn_on_external<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        self.spawn_on_scope(f);
    }

    /// Spawns a scoped future that runs on the thread the scope was called from. The
    /// scope *must* outlive the provided future. The results of the future will be
    /// returned as a part of [`TaskPool::scope`]'s return value.
    ///
    /// For more information, see [`TaskPool::scope`].
    pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
        // increment the number of pending tasks
        let pending_tasks = self.pending_tasks;
        pending_tasks.update(|i| i + 1);

        // add a spot to keep the result, and record the index
        let results_ref = self.results_ref;
        let mut results = results_ref.borrow_mut();
        let task_number = results.len();
        results.push(None);
        drop(results);

        // create the job closure
        let f = async move {
            let result = f.await;

            // store the result in the allocated slot
            let mut results = results_ref.borrow_mut();
            results[task_number] = Some(result);
            drop(results);

            // decrement the pending tasks count
            pending_tasks.update(|i| i - 1);
        };

        // spawn the job itself
        self.executor_ref.spawn(f).detach();
    }
}

crate::cfg::std! {
    if {
        // With std, tasks are spawned onto a thread-local executor and never cross
        // threads, so no `Send`/`Sync` bounds are required.
        pub trait MaybeSend {}
        impl<T> MaybeSend for T {}

        pub trait MaybeSync {}
        impl<T> MaybeSync for T {}
    } else {
        // Without std the executor is a shared `static`, so spawned futures must be
        // `Send` and `Sync`.
        pub trait MaybeSend: Send {}
        impl<T: Send> MaybeSend for T {}

        pub trait MaybeSync: Sync {}
        impl<T: Sync> MaybeSync for T {}
    }
}

#[cfg(test)]
mod test {
    use std::{time, thread};

    use super::*;

    /// This test creates a scope with a single task that goes to sleep for a
    /// nontrivial amount of time. At one point, the scope would (incorrectly)
    /// return early under these conditions, causing a crash.
    ///
    /// The correct behavior is for the scope to block until the receiver is
    /// woken by the external thread.
    #[test]
    fn scoped_spawn() {
        let (sender, receiver) = async_channel::unbounded();
        let task_pool = TaskPool {};
        let thread = thread::spawn(move || {
            let duration = time::Duration::from_millis(50);
            thread::sleep(duration);
            let _ = sender.send_blocking(0);
        });
        task_pool.scope(|scope| {
            scope.spawn(async {
                receiver.recv().await
            });
        });
        thread.join().unwrap();
    }
}