//! Alternative to [`async_executor`] based on [`edge_executor`] by Ivan Markov.
//!
//! It has been vendored along with its tests to update several outdated dependencies.
//!
//! [`async_executor`]: https://github.com/smol-rs/async-executor
//! [`edge_executor`]: https://github.com/ivmarkov/edge-executor

#![expect(unsafe_code, reason = "original implementation relies on unsafe")]
#![expect(
    dead_code,
    reason = "keeping methods from original implementation for transparency"
)]

// TODO: Create a more tailored replacement, possibly integrating [Forte](https://github.com/NthTensor/Forte)

use alloc::rc::Rc;
use core::{
    future::{poll_fn, Future},
    marker::PhantomData,
    task::{Context, Poll},
};

use async_task::{Runnable, Task};
use atomic_waker::AtomicWaker;
use bevy_platform::sync::{Arc, LazyLock};
use futures_lite::FutureExt;

/// An async executor.
///
/// # Examples
///
/// A multi-threaded executor:
///
/// ```ignore
/// use async_channel::unbounded;
/// use easy_parallel::Parallel;
///
/// use edge_executor::{Executor, block_on};
///
/// let ex: Executor = Default::default();
/// let (signal, shutdown) = unbounded::<()>();
///
/// Parallel::new()
///     // Run four executor threads.
///     .each(0..4, |_| block_on(ex.run(shutdown.recv())))
///     // Run the main future on the current thread.
///     .finish(|| block_on(async {
///         println!("Hello world!");
///         drop(signal);
///     }));
/// ```
pub struct Executor<'a, const C: usize = 64> {
    state: LazyLock<Arc<State<C>>>,
    _invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,
}
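// `state` is initialized on first use and shared with the `schedule` closure of every spawned
// task via `Arc`. The `UnsafeCell<&'a ()>` marker keeps the `'a` lifetime invariant;
// `Send`/`Sync` for the executor are provided by the `unsafe impl`s further below.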

impl<'a, const C: usize> Executor<'a, C> {
    /// Creates a new executor.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::Executor;
    ///
    /// let ex: Executor = Default::default();
    /// ```
    pub const fn new() -> Self {
        Self {
            state: LazyLock::new(|| Arc::new(State::new())),
            _invariant: PhantomData,
        }
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::Executor;
    ///
    /// let ex: Executor = Default::default();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    ///
    /// Note that if the executor's queue size is equal to the number of currently
    /// spawned and running tasks, spawning this additional task might cause the executor to panic
    /// later, when the task is scheduled for polling.
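    // Note: the "queue size" above is the `C` const parameter (64 by default); an executor with
    // a larger queue can be created explicitly, e.g. `Executor::<128>::new()`.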
    pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
    where
        F: Future + Send + 'a,
        F::Output: Send + 'a,
    {
        // SAFETY: Original implementation missing safety documentation
        unsafe { self.spawn_unchecked(fut) }
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::Executor;
    ///
    /// let ex: Executor = Default::default();
    /// assert!(!ex.try_tick()); // no tasks to run
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        if let Some(runnable) = self.try_runnable() {
            runnable.run();

            true
        } else {
            false
        }
    }

    /// Runs a single task asynchronously.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::{Executor, block_on};
    ///
    /// let ex: Executor = Default::default();
    ///
    /// let task = ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// block_on(ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.runnable().await.run();
    }

    /// Runs the executor asynchronously until the given future completes.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::{Executor, block_on};
    ///
    /// let ex: Executor = Default::default();
    ///
    /// let task = ex.spawn(async { 1 + 2 });
    /// let res = block_on(ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<F>(&self, fut: F) -> F::Output
    where
        F: Future + Send + 'a,
    {
        // SAFETY: Original implementation missing safety documentation
        unsafe { self.run_unchecked(fut).await }
    }

    /// Waits for the next runnable task to run.
    async fn runnable(&self) -> Runnable {
        poll_fn(|ctx| self.poll_runnable(ctx)).await
    }

    /// Polls the first task scheduled for execution by the executor.
    fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {
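        // Register interest with the waker first, then check the queue: if a task is scheduled
        // between the two steps, the stored waker is woken and the caller is polled again.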
        self.state().waker.register(ctx.waker());

        if let Some(runnable) = self.try_runnable() {
            Poll::Ready(runnable)
        } else {
            Poll::Pending
        }
    }

    /// Pops the first task scheduled for execution by the executor.
    ///
    /// Returns
    /// - `None` - if no task was scheduled for execution
    /// - `Some(Runnable)` - the first task scheduled for execution. Calling `Runnable::run` will
    ///   execute the task. In other words, it will poll its future.
    fn try_runnable(&self) -> Option<Runnable> {
        let runnable;

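        // The two cfg blocks below select between the crossbeam and heapless queue backends
        // declared in `State`; both simply pop the next scheduled `Runnable`, if any.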
        #[cfg(all(
            target_has_atomic = "8",
            target_has_atomic = "16",
            target_has_atomic = "32",
            target_has_atomic = "64",
            target_has_atomic = "ptr"
        ))]
        {
            runnable = self.state().queue.pop();
        }

        #[cfg(not(all(
            target_has_atomic = "8",
            target_has_atomic = "16",
            target_has_atomic = "32",
            target_has_atomic = "64",
            target_has_atomic = "ptr"
        )))]
        {
            runnable = self.state().queue.dequeue();
        }

        runnable
    }

    /// # Safety
    ///
    /// Original implementation missing safety documentation
    unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>
    where
        F: Future,
    {
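        // Each spawned task is scheduled by pushing its `Runnable` onto the shared queue and
        // waking the waker registered by `poll_runnable`, if any, so a pending `tick`/`run`
        // picks it up.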
        let schedule = {
            let state = self.state().clone();

            move |runnable| {
                #[cfg(all(
                    target_has_atomic = "8",
                    target_has_atomic = "16",
                    target_has_atomic = "32",
                    target_has_atomic = "64",
                    target_has_atomic = "ptr"
                ))]
                {
                    state.queue.push(runnable).unwrap();
                }

                #[cfg(not(all(
                    target_has_atomic = "8",
                    target_has_atomic = "16",
                    target_has_atomic = "32",
                    target_has_atomic = "64",
                    target_has_atomic = "ptr"
                )))]
                {
                    state.queue.enqueue(runnable).unwrap();
                }

                if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        };

        // SAFETY: Original implementation missing safety documentation
        let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };

        runnable.schedule();

        task
    }

    /// # Safety
    ///
    /// Original implementation missing safety documentation
    async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output
    where
        F: Future,
    {
        let run_forever = async {
            loop {
                self.tick().await;
            }
        };

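        // `or` resolves with whichever future finishes first; `run_forever` never finishes, so
        // this keeps ticking scheduled tasks until `fut` completes.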
        run_forever.or(fut).await
    }

    /// Returns a reference to the inner state.
    fn state(&self) -> &Arc<State<C>> {
        &self.state
    }
}

impl<'a, const C: usize> Default for Executor<'a, C> {
    fn default() -> Self {
        Self::new()
    }
}

// SAFETY: Original implementation missing safety documentation
unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}
// SAFETY: Original implementation missing safety documentation
unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}

/// A thread-local executor.
///
/// The executor can only be run on the thread that created it.
///
/// # Examples
///
/// ```ignore
/// use edge_executor::{LocalExecutor, block_on};
///
/// let local_ex: LocalExecutor = Default::default();
///
/// block_on(local_ex.run(async {
///     println!("Hello world!");
/// }));
/// ```
pub struct LocalExecutor<'a, const C: usize = 64> {
    executor: Executor<'a, C>,
    _not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,
}
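// `Rc` is neither `Send` nor `Sync`, so the `_not_send` marker keeps a `LocalExecutor` on the
// thread that created it; this is what lets `spawn` accept futures that are not `Send`.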

impl<'a, const C: usize> LocalExecutor<'a, C> {
    /// Creates a single-threaded executor.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::LocalExecutor;
    ///
    /// let local_ex: LocalExecutor = Default::default();
    /// ```
    pub const fn new() -> Self {
        Self {
            executor: Executor::<C>::new(),
            _not_send: PhantomData,
        }
    }

    /// Spawns a task onto the executor.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::LocalExecutor;
    ///
    /// let local_ex: LocalExecutor = Default::default();
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// ```
    ///
    /// Note that if the executor's queue size is equal to the number of currently
    /// spawned and running tasks, spawning this additional task might cause the executor to panic
    /// later, when the task is scheduled for polling.
    pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
    where
        F: Future + 'a,
        F::Output: 'a,
    {
        // SAFETY: Original implementation missing safety documentation
        unsafe { self.executor.spawn_unchecked(fut) }
    }

    /// Attempts to run a task if at least one is scheduled.
    ///
    /// Running a scheduled task means simply polling its future once.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::LocalExecutor;
    ///
    /// let local_ex: LocalExecutor = Default::default();
    /// assert!(!local_ex.try_tick()); // no tasks to run
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// assert!(local_ex.try_tick()); // a task was found
    /// ```
    pub fn try_tick(&self) -> bool {
        self.executor.try_tick()
    }

    /// Runs a single task asynchronously.
    ///
    /// Running a task means simply polling its future once.
    ///
    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::{LocalExecutor, block_on};
    ///
    /// let local_ex: LocalExecutor = Default::default();
    ///
    /// let task = local_ex.spawn(async {
    ///     println!("Hello world");
    /// });
    /// block_on(local_ex.tick()); // runs the task
    /// ```
    pub async fn tick(&self) {
        self.executor.tick().await;
    }

    /// Runs the executor asynchronously until the given future completes.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use edge_executor::{LocalExecutor, block_on};
    ///
    /// let local_ex: LocalExecutor = Default::default();
    ///
    /// let task = local_ex.spawn(async { 1 + 2 });
    /// let res = block_on(local_ex.run(async { task.await * 2 }));
    ///
    /// assert_eq!(res, 6);
    /// ```
    pub async fn run<F>(&self, fut: F) -> F::Output
    where
        F: Future,
    {
        // SAFETY: Original implementation missing safety documentation
        unsafe { self.executor.run_unchecked(fut) }.await
    }
}

impl<'a, const C: usize> Default for LocalExecutor<'a, C> {
    fn default() -> Self {
        Self::new()
    }
}

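/// Shared state between an executor and the `schedule` closures of its spawned tasks.
///
/// On targets with full atomic support the task queue is a heap-allocated
/// `crossbeam_queue::ArrayQueue` with capacity `C`; on targets without it, a statically sized
/// `heapless::mpmc::MpMcQueue` is used instead.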
struct State<const C: usize> {
    #[cfg(all(
        target_has_atomic = "8",
        target_has_atomic = "16",
        target_has_atomic = "32",
        target_has_atomic = "64",
        target_has_atomic = "ptr"
    ))]
    queue: crossbeam_queue::ArrayQueue<Runnable>,
    #[cfg(not(all(
        target_has_atomic = "8",
        target_has_atomic = "16",
        target_has_atomic = "32",
        target_has_atomic = "64",
        target_has_atomic = "ptr"
    )))]
    queue: heapless::mpmc::MpMcQueue<Runnable, C>,
    waker: AtomicWaker,
}

impl<const C: usize> State<C> {
    fn new() -> Self {
        Self {
            #[cfg(all(
                target_has_atomic = "8",
                target_has_atomic = "16",
                target_has_atomic = "32",
                target_has_atomic = "64",
                target_has_atomic = "ptr"
            ))]
            queue: crossbeam_queue::ArrayQueue::new(C),
            #[cfg(not(all(
                target_has_atomic = "8",
                target_has_atomic = "16",
                target_has_atomic = "32",
                target_has_atomic = "64",
                target_has_atomic = "ptr"
            )))]
            queue: heapless::mpmc::MpMcQueue::new(),
            waker: AtomicWaker::new(),
        }
    }
}

#[cfg(test)]
mod different_executor_tests {
    use core::cell::Cell;

    use futures_lite::{
        future::{block_on, pending, poll_once},
        pin,
    };

    use super::LocalExecutor;

    #[test]
    fn shared_queue_slot() {
        block_on(async {
            let was_polled = Cell::new(false);
            let future = async {
                was_polled.set(true);
                pending::<()>().await;
            };

            let ex1: LocalExecutor = Default::default();
            let ex2: LocalExecutor = Default::default();

            // Start the futures for running forever.
            let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));
            pin!(run1);
            pin!(run2);
            assert!(poll_once(run1.as_mut()).await.is_none());
            assert!(poll_once(run2.as_mut()).await.is_none());

            // Spawn the future on executor one and then poll executor two.
            ex1.spawn(future).detach();
            assert!(poll_once(run2).await.is_none());
            assert!(!was_polled.get());

            // Poll the first one.
            assert!(poll_once(run1).await.is_none());
            assert!(was_polled.get());
        });
    }
}

#[cfg(test)]
mod drop_tests {
    use alloc::string::String;
    use core::mem;
    use core::sync::atomic::{AtomicUsize, Ordering};
    use core::task::{Poll, Waker};
    use std::sync::Mutex;

    use bevy_platform::sync::LazyLock;
    use futures_lite::future;

    use super::{Executor, Task};

    #[test]
    fn leaked_executor_leaks_everything() {
        static DROP: AtomicUsize = AtomicUsize::new(0);
        static WAKER: LazyLock<Mutex<Option<Waker>>> = LazyLock::new(Default::default);

        let ex: Executor = Default::default();

        let task = ex.spawn(async {
            let _guard = CallOnDrop(|| {
                DROP.fetch_add(1, Ordering::SeqCst);
            });

            future::poll_fn(|cx| {
                *WAKER.lock().unwrap() = Some(cx.waker().clone());
                Poll::Pending::<()>
            })
            .await;
        });

        future::block_on(ex.tick());
        assert!(WAKER.lock().unwrap().is_some());
        assert_eq!(DROP.load(Ordering::SeqCst), 0);

        mem::forget(ex);
        assert_eq!(DROP.load(Ordering::SeqCst), 0);

        assert!(future::block_on(future::poll_once(task)).is_none());
        assert_eq!(DROP.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn await_task_after_dropping_executor() {
        let s: String = "hello".into();

        let ex: Executor = Default::default();
        let task: Task<&str> = ex.spawn(async { &*s });
        assert!(ex.try_tick());

        drop(ex);
        assert_eq!(future::block_on(task), "hello");
        drop(s);
    }

    #[test]
    fn drop_executor_and_then_drop_finished_task() {
        static DROP: AtomicUsize = AtomicUsize::new(0);

        let ex: Executor = Default::default();
        let task = ex.spawn(async {
            CallOnDrop(|| {
                DROP.fetch_add(1, Ordering::SeqCst);
            })
        });
        assert!(ex.try_tick());

        assert_eq!(DROP.load(Ordering::SeqCst), 0);
        drop(ex);
        assert_eq!(DROP.load(Ordering::SeqCst), 0);
        drop(task);
        assert_eq!(DROP.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn drop_finished_task_and_then_drop_executor() {
        static DROP: AtomicUsize = AtomicUsize::new(0);

        let ex: Executor = Default::default();
        let task = ex.spawn(async {
            CallOnDrop(|| {
                DROP.fetch_add(1, Ordering::SeqCst);
            })
        });
        assert!(ex.try_tick());

        assert_eq!(DROP.load(Ordering::SeqCst), 0);
        drop(task);
        assert_eq!(DROP.load(Ordering::SeqCst), 1);
        drop(ex);
        assert_eq!(DROP.load(Ordering::SeqCst), 1);
    }

    struct CallOnDrop<F: Fn()>(F);

    impl<F: Fn()> Drop for CallOnDrop<F> {
        fn drop(&mut self) {
            (self.0)();
        }
    }
}

#[cfg(test)]
mod local_queue {
    use alloc::boxed::Box;

    use futures_lite::{future, pin};

    use super::Executor;

    #[test]
    fn two_queues() {
        future::block_on(async {
            // Create an executor with two runners.
            let ex: Executor = Default::default();
            let (run1, run2) = (
                ex.run(future::pending::<()>()),
                ex.run(future::pending::<()>()),
            );
            let mut run1 = Box::pin(run1);
            pin!(run2);

            // Poll them both.
            assert!(future::poll_once(run1.as_mut()).await.is_none());
            assert!(future::poll_once(run2.as_mut()).await.is_none());

            // Drop the first one, which should leave the local queue in the `None` state.
            drop(run1);
            assert!(future::poll_once(run2.as_mut()).await.is_none());
        });
    }
}
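
// A minimal usage sketch adapted from the `ignore`d doc examples above: spawn a task, poll it
// with `try_tick`, then drive a dependent future with `run`. The module name and test are an
// illustration against this vendored executor, not part of the upstream `edge_executor` tests.
#[cfg(test)]
mod doc_example_tests {
    use futures_lite::future;

    use super::Executor;

    #[test]
    fn spawn_try_tick_and_run() {
        let ex: Executor = Default::default();

        // Nothing has been spawned yet, so there is nothing to tick.
        assert!(!ex.try_tick());

        // Spawning schedules the task immediately, so a single tick polls it to completion.
        let task = ex.spawn(async { 1 + 2 });
        assert!(ex.try_tick());

        // `run` drives the executor until the given future (awaiting the task) completes.
        let res = future::block_on(ex.run(async { task.await * 2 }));
        assert_eq!(res, 6);
    }
}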