Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_tasks/src/edge_executor.rs
9354 views
1
//! Alternative to `async_executor` based on [`edge_executor`] by Ivan Markov.
2
//!
3
//! It has been vendored along with its tests to update several outdated dependencies.
4
//!
5
//! [`async_executor`]: https://github.com/smol-rs/async-executor
6
//! [`edge_executor`]: https://github.com/ivmarkov/edge-executor
7
8
#![expect(unsafe_code, reason = "original implementation relies on unsafe")]
9
#![expect(
10
dead_code,
11
reason = "keeping methods from original implementation for transparency"
12
)]
13
14
// TODO: Create a more tailored replacement, possibly integrating [Fotre](https://github.com/NthTensor/Forte)
15
16
use alloc::rc::Rc;
17
use core::{
18
future::{poll_fn, Future},
19
marker::PhantomData,
20
task::{Context, Poll},
21
};
22
23
use async_task::{Runnable, Task};
24
use atomic_waker::AtomicWaker;
25
use bevy_platform::sync::{Arc, LazyLock};
26
use futures_lite::FutureExt;
27
28
/// An async executor.
29
///
30
/// # Examples
31
///
32
/// A multi-threaded executor:
33
///
34
/// ```ignore
35
/// use async_channel::unbounded;
36
/// use easy_parallel::Parallel;
37
///
38
/// use edge_executor::{Executor, block_on};
39
///
40
/// let ex: Executor = Default::default();
41
/// let (signal, shutdown) = unbounded::<()>();
42
///
43
/// Parallel::new()
44
/// // Run four executor threads.
45
/// .each(0..4, |_| block_on(ex.run(shutdown.recv())))
46
/// // Run the main future on the current thread.
47
/// .finish(|| block_on(async {
48
/// println!("Hello world!");
49
/// drop(signal);
50
/// }));
51
/// ```
52
pub struct Executor<'a, const C: usize = 64> {
53
state: LazyLock<Arc<State<C>>>,
54
_invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,
55
}
56
57
impl<'a, const C: usize> Executor<'a, C> {
58
/// Creates a new executor.
59
///
60
/// # Examples
61
///
62
/// ```ignore
63
/// use edge_executor::Executor;
64
///
65
/// let ex: Executor = Default::default();
66
/// ```
67
pub const fn new() -> Self {
68
Self {
69
state: LazyLock::new(|| Arc::new(State::new())),
70
_invariant: PhantomData,
71
}
72
}
73
74
/// Spawns a task onto the executor.
75
///
76
/// # Examples
77
///
78
/// ```ignore
79
/// use edge_executor::Executor;
80
///
81
/// let ex: Executor = Default::default();
82
///
83
/// let task = ex.spawn(async {
84
/// println!("Hello world");
85
/// });
86
/// ```
87
///
88
/// Note that if the executor's queue size is equal to the number of currently
89
/// spawned and running tasks, spawning this additional task might cause the executor to panic
90
/// later, when the task is scheduled for polling.
91
pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
92
where
93
F: Future + Send + 'a,
94
F::Output: Send + 'a,
95
{
96
// SAFETY: Original implementation missing safety documentation
97
unsafe { self.spawn_unchecked(fut) }
98
}
99
100
/// Attempts to run a task if at least one is scheduled.
101
///
102
/// Running a scheduled task means simply polling its future once.
103
///
104
/// # Examples
105
///
106
/// ```ignore
107
/// use edge_executor::Executor;
108
///
109
/// let ex: Executor = Default::default();
110
/// assert!(!ex.try_tick()); // no tasks to run
111
///
112
/// let task = ex.spawn(async {
113
/// println!("Hello world");
114
/// });
115
/// assert!(ex.try_tick()); // a task was found
116
/// ```
117
pub fn try_tick(&self) -> bool {
118
if let Some(runnable) = self.try_runnable() {
119
runnable.run();
120
121
true
122
} else {
123
false
124
}
125
}
126
127
/// Runs a single task asynchronously.
128
///
129
/// Running a task means simply polling its future once.
130
///
131
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
132
///
133
/// # Examples
134
///
135
/// ```ignore
136
/// use edge_executor::{Executor, block_on};
137
///
138
/// let ex: Executor = Default::default();
139
///
140
/// let task = ex.spawn(async {
141
/// println!("Hello world");
142
/// });
143
/// block_on(ex.tick()); // runs the task
144
/// ```
145
pub async fn tick(&self) {
146
self.runnable().await.run();
147
}
148
149
/// Runs the executor asynchronously until the given future completes.
150
///
151
/// # Examples
152
///
153
/// ```ignore
154
/// use edge_executor::{Executor, block_on};
155
///
156
/// let ex: Executor = Default::default();
157
///
158
/// let task = ex.spawn(async { 1 + 2 });
159
/// let res = block_on(ex.run(async { task.await * 2 }));
160
///
161
/// assert_eq!(res, 6);
162
/// ```
163
pub async fn run<F>(&self, fut: F) -> F::Output
164
where
165
F: Future + Send + 'a,
166
{
167
// SAFETY: Original implementation missing safety documentation
168
unsafe { self.run_unchecked(fut).await }
169
}
170
171
/// Waits for the next runnable task to run.
172
async fn runnable(&self) -> Runnable {
173
poll_fn(|ctx| self.poll_runnable(ctx)).await
174
}
175
176
/// Polls the first task scheduled for execution by the executor.
177
fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {
178
self.state().waker.register(ctx.waker());
179
180
if let Some(runnable) = self.try_runnable() {
181
Poll::Ready(runnable)
182
} else {
183
Poll::Pending
184
}
185
}
186
187
/// Pops the first task scheduled for execution by the executor.
188
///
189
/// Returns
190
/// - `None` - if no task was scheduled for execution
191
/// - `Some(Runnable)` - the first task scheduled for execution. Calling `Runnable::run` will
192
/// execute the task. In other words, it will poll its future.
193
fn try_runnable(&self) -> Option<Runnable> {
194
let runnable;
195
196
#[cfg(all(
197
target_has_atomic = "8",
198
target_has_atomic = "16",
199
target_has_atomic = "32",
200
target_has_atomic = "64",
201
target_has_atomic = "ptr"
202
))]
203
{
204
runnable = self.state().queue.pop();
205
}
206
207
#[cfg(not(all(
208
target_has_atomic = "8",
209
target_has_atomic = "16",
210
target_has_atomic = "32",
211
target_has_atomic = "64",
212
target_has_atomic = "ptr"
213
)))]
214
{
215
runnable = self.state().queue.dequeue();
216
}
217
218
runnable
219
}
220
221
/// # Safety
222
///
223
/// Original implementation missing safety documentation
224
unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>
225
where
226
F: Future,
227
{
228
let schedule = {
229
let state = self.state().clone();
230
231
move |runnable| {
232
#[cfg(all(
233
target_has_atomic = "8",
234
target_has_atomic = "16",
235
target_has_atomic = "32",
236
target_has_atomic = "64",
237
target_has_atomic = "ptr"
238
))]
239
{
240
state.queue.push(runnable).unwrap();
241
}
242
243
#[cfg(not(all(
244
target_has_atomic = "8",
245
target_has_atomic = "16",
246
target_has_atomic = "32",
247
target_has_atomic = "64",
248
target_has_atomic = "ptr"
249
)))]
250
{
251
state.queue.enqueue(runnable).unwrap();
252
}
253
254
if let Some(waker) = state.waker.take() {
255
waker.wake();
256
}
257
}
258
};
259
260
// SAFETY: Original implementation missing safety documentation
261
let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };
262
263
runnable.schedule();
264
265
task
266
}
267
268
/// # Safety
269
///
270
/// Original implementation missing safety documentation
271
async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output
272
where
273
F: Future,
274
{
275
let run_forever = async {
276
loop {
277
self.tick().await;
278
}
279
};
280
281
run_forever.or(fut).await
282
}
283
284
/// Returns a reference to the inner state.
285
fn state(&self) -> &Arc<State<C>> {
286
&self.state
287
}
288
}
289
290
impl<'a, const C: usize> Default for Executor<'a, C> {
291
fn default() -> Self {
292
Self::new()
293
}
294
}
295
296
// SAFETY: Original implementation missing safety documentation
297
unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}
298
// SAFETY: Original implementation missing safety documentation
299
unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}
300
301
/// A thread-local executor.
302
///
303
/// The executor can only be run on the thread that created it.
304
///
305
/// # Examples
306
///
307
/// ```ignore
308
/// use edge_executor::{LocalExecutor, block_on};
309
///
310
/// let local_ex: LocalExecutor = Default::default();
311
///
312
/// block_on(local_ex.run(async {
313
/// println!("Hello world!");
314
/// }));
315
/// ```
316
pub struct LocalExecutor<'a, const C: usize = 64> {
317
executor: Executor<'a, C>,
318
_not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,
319
}
320
321
impl<'a, const C: usize> LocalExecutor<'a, C> {
322
/// Creates a single-threaded executor.
323
///
324
/// # Examples
325
///
326
/// ```ignore
327
/// use edge_executor::LocalExecutor;
328
///
329
/// let local_ex: LocalExecutor = Default::default();
330
/// ```
331
pub const fn new() -> Self {
332
Self {
333
executor: Executor::<C>::new(),
334
_not_send: PhantomData,
335
}
336
}
337
338
/// Spawns a task onto the executor.
339
///
340
/// # Examples
341
///
342
/// ```ignore
343
/// use edge_executor::LocalExecutor;
344
///
345
/// let local_ex: LocalExecutor = Default::default();
346
///
347
/// let task = local_ex.spawn(async {
348
/// println!("Hello world");
349
/// });
350
/// ```
351
///
352
/// Note that if the executor's queue size is equal to the number of currently
353
/// spawned and running tasks, spawning this additional task might cause the executor to panic
354
/// later, when the task is scheduled for polling.
355
pub fn spawn<F>(&self, fut: F) -> Task<F::Output>
356
where
357
F: Future + 'a,
358
F::Output: 'a,
359
{
360
// SAFETY: Original implementation missing safety documentation
361
unsafe { self.executor.spawn_unchecked(fut) }
362
}
363
364
/// Attempts to run a task if at least one is scheduled.
365
///
366
/// Running a scheduled task means simply polling its future once.
367
///
368
/// # Examples
369
///
370
/// ```ignore
371
/// use edge_executor::LocalExecutor;
372
///
373
/// let local_ex: LocalExecutor = Default::default();
374
/// assert!(!local_ex.try_tick()); // no tasks to run
375
///
376
/// let task = local_ex.spawn(async {
377
/// println!("Hello world");
378
/// });
379
/// assert!(local_ex.try_tick()); // a task was found
380
/// ```
381
pub fn try_tick(&self) -> bool {
382
self.executor.try_tick()
383
}
384
385
/// Runs a single task asynchronously.
386
///
387
/// Running a task means simply polling its future once.
388
///
389
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
390
///
391
/// # Examples
392
///
393
/// ```ignore
394
/// use edge_executor::{LocalExecutor, block_on};
395
///
396
/// let local_ex: LocalExecutor = Default::default();
397
///
398
/// let task = local_ex.spawn(async {
399
/// println!("Hello world");
400
/// });
401
/// block_on(local_ex.tick()); // runs the task
402
/// ```
403
pub async fn tick(&self) {
404
self.executor.tick().await;
405
}
406
407
/// Runs the executor asynchronously until the given future completes.
408
///
409
/// # Examples
410
///
411
/// ```ignore
412
/// use edge_executor::{LocalExecutor, block_on};
413
///
414
/// let local_ex: LocalExecutor = Default::default();
415
///
416
/// let task = local_ex.spawn(async { 1 + 2 });
417
/// let res = block_on(local_ex.run(async { task.await * 2 }));
418
///
419
/// assert_eq!(res, 6);
420
/// ```
421
pub async fn run<F>(&self, fut: F) -> F::Output
422
where
423
F: Future,
424
{
425
// SAFETY: Original implementation missing safety documentation
426
unsafe { self.executor.run_unchecked(fut) }.await
427
}
428
}
429
430
impl<'a, const C: usize> Default for LocalExecutor<'a, C> {
431
fn default() -> Self {
432
Self::new()
433
}
434
}
435
436
struct State<const C: usize> {
437
#[cfg(all(
438
target_has_atomic = "8",
439
target_has_atomic = "16",
440
target_has_atomic = "32",
441
target_has_atomic = "64",
442
target_has_atomic = "ptr"
443
))]
444
queue: crossbeam_queue::ArrayQueue<Runnable>,
445
#[cfg(not(all(
446
target_has_atomic = "8",
447
target_has_atomic = "16",
448
target_has_atomic = "32",
449
target_has_atomic = "64",
450
target_has_atomic = "ptr"
451
)))]
452
queue: heapless::mpmc::Queue<Runnable, C>,
453
waker: AtomicWaker,
454
}
455
456
impl<const C: usize> State<C> {
457
fn new() -> Self {
458
Self {
459
#[cfg(all(
460
target_has_atomic = "8",
461
target_has_atomic = "16",
462
target_has_atomic = "32",
463
target_has_atomic = "64",
464
target_has_atomic = "ptr"
465
))]
466
queue: crossbeam_queue::ArrayQueue::new(C),
467
#[cfg(not(all(
468
target_has_atomic = "8",
469
target_has_atomic = "16",
470
target_has_atomic = "32",
471
target_has_atomic = "64",
472
target_has_atomic = "ptr"
473
)))]
474
#[allow(deprecated)]
475
queue: heapless::mpmc::Queue::new(),
476
waker: AtomicWaker::new(),
477
}
478
}
479
}
480
481
#[cfg(test)]
482
mod different_executor_tests {
483
use core::cell::Cell;
484
485
use bevy_tasks::{
486
block_on,
487
futures_lite::{pending, poll_once},
488
};
489
use futures_lite::pin;
490
491
use super::LocalExecutor;
492
493
#[test]
494
fn shared_queue_slot() {
495
block_on(async {
496
let was_polled = Cell::new(false);
497
let future = async {
498
was_polled.set(true);
499
pending::<()>().await;
500
};
501
502
let ex1: LocalExecutor = Default::default();
503
let ex2: LocalExecutor = Default::default();
504
505
// Start the futures for running forever.
506
let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));
507
pin!(run1);
508
pin!(run2);
509
assert!(poll_once(run1.as_mut()).await.is_none());
510
assert!(poll_once(run2.as_mut()).await.is_none());
511
512
// Spawn the future on executor one and then poll executor two.
513
ex1.spawn(future).detach();
514
assert!(poll_once(run2).await.is_none());
515
assert!(!was_polled.get());
516
517
// Poll the first one.
518
assert!(poll_once(run1).await.is_none());
519
assert!(was_polled.get());
520
});
521
}
522
}
523
524
#[cfg(test)]
525
mod drop_tests {
526
use alloc::string::String;
527
use core::mem;
528
use core::sync::atomic::{AtomicUsize, Ordering};
529
use core::task::{Poll, Waker};
530
use std::sync::Mutex;
531
532
use bevy_platform::sync::LazyLock;
533
use futures_lite::future;
534
535
use super::{Executor, Task};
536
537
#[test]
538
fn leaked_executor_leaks_everything() {
539
static DROP: AtomicUsize = AtomicUsize::new(0);
540
static WAKER: LazyLock<Mutex<Option<Waker>>> = LazyLock::new(Default::default);
541
542
let ex: Executor = Default::default();
543
544
let task = ex.spawn(async {
545
let _guard = CallOnDrop(|| {
546
DROP.fetch_add(1, Ordering::SeqCst);
547
});
548
549
future::poll_fn(|cx| {
550
*WAKER.lock().unwrap() = Some(cx.waker().clone());
551
Poll::Pending::<()>
552
})
553
.await;
554
});
555
556
future::block_on(ex.tick());
557
assert!(WAKER.lock().unwrap().is_some());
558
assert_eq!(DROP.load(Ordering::SeqCst), 0);
559
560
mem::forget(ex);
561
assert_eq!(DROP.load(Ordering::SeqCst), 0);
562
563
assert!(future::block_on(future::poll_once(task)).is_none());
564
assert_eq!(DROP.load(Ordering::SeqCst), 0);
565
}
566
567
#[test]
568
fn await_task_after_dropping_executor() {
569
let s: String = "hello".into();
570
571
let ex: Executor = Default::default();
572
let task: Task<&str> = ex.spawn(async { &*s });
573
assert!(ex.try_tick());
574
575
drop(ex);
576
assert_eq!(future::block_on(task), "hello");
577
drop(s);
578
}
579
580
#[test]
581
fn drop_executor_and_then_drop_finished_task() {
582
static DROP: AtomicUsize = AtomicUsize::new(0);
583
584
let ex: Executor = Default::default();
585
let task = ex.spawn(async {
586
CallOnDrop(|| {
587
DROP.fetch_add(1, Ordering::SeqCst);
588
})
589
});
590
assert!(ex.try_tick());
591
592
assert_eq!(DROP.load(Ordering::SeqCst), 0);
593
drop(ex);
594
assert_eq!(DROP.load(Ordering::SeqCst), 0);
595
drop(task);
596
assert_eq!(DROP.load(Ordering::SeqCst), 1);
597
}
598
599
#[test]
600
fn drop_finished_task_and_then_drop_executor() {
601
static DROP: AtomicUsize = AtomicUsize::new(0);
602
603
let ex: Executor = Default::default();
604
let task = ex.spawn(async {
605
CallOnDrop(|| {
606
DROP.fetch_add(1, Ordering::SeqCst);
607
})
608
});
609
assert!(ex.try_tick());
610
611
assert_eq!(DROP.load(Ordering::SeqCst), 0);
612
drop(task);
613
assert_eq!(DROP.load(Ordering::SeqCst), 1);
614
drop(ex);
615
assert_eq!(DROP.load(Ordering::SeqCst), 1);
616
}
617
618
struct CallOnDrop<F: Fn()>(F);
619
620
impl<F: Fn()> Drop for CallOnDrop<F> {
621
fn drop(&mut self) {
622
(self.0)();
623
}
624
}
625
}
626
627
#[cfg(test)]
628
mod local_queue {
629
use alloc::boxed::Box;
630
631
use futures_lite::{future, pin};
632
633
use super::Executor;
634
635
#[test]
636
fn two_queues() {
637
future::block_on(async {
638
// Create an executor with two runners.
639
let ex: Executor = Default::default();
640
let (run1, run2) = (
641
ex.run(future::pending::<()>()),
642
ex.run(future::pending::<()>()),
643
);
644
let mut run1 = Box::pin(run1);
645
pin!(run2);
646
647
// Poll them both.
648
assert!(future::poll_once(run1.as_mut()).await.is_none());
649
assert!(future::poll_once(run2.as_mut()).await.is_none());
650
651
// Drop the first one, which should leave the local queue in the `None` state.
652
drop(run1);
653
assert!(future::poll_once(run2.as_mut()).await.is_none());
654
});
655
}
656
}
657
658