//! Alternative to `async_executor` based on [`edge_executor`] by Ivan Markov.1//!2//! It has been vendored along with its tests to update several outdated dependencies.3//!4//! [`async_executor`]: https://github.com/smol-rs/async-executor5//! [`edge_executor`]: https://github.com/ivmarkov/edge-executor67#![expect(unsafe_code, reason = "original implementation relies on unsafe")]8#![expect(9dead_code,10reason = "keeping methods from original implementation for transparency"11)]1213// TODO: Create a more tailored replacement, possibly integrating [Fotre](https://github.com/NthTensor/Forte)1415use alloc::rc::Rc;16use core::{17future::{poll_fn, Future},18marker::PhantomData,19task::{Context, Poll},20};2122use async_task::{Runnable, Task};23use atomic_waker::AtomicWaker;24use bevy_platform::sync::{Arc, LazyLock};25use futures_lite::FutureExt;2627/// An async executor.28///29/// # Examples30///31/// A multi-threaded executor:32///33/// ```ignore34/// use async_channel::unbounded;35/// use easy_parallel::Parallel;36///37/// use edge_executor::{Executor, block_on};38///39/// let ex: Executor = Default::default();40/// let (signal, shutdown) = unbounded::<()>();41///42/// Parallel::new()43/// // Run four executor threads.44/// .each(0..4, |_| block_on(ex.run(shutdown.recv())))45/// // Run the main future on the current thread.46/// .finish(|| block_on(async {47/// println!("Hello world!");48/// drop(signal);49/// }));50/// ```51pub struct Executor<'a, const C: usize = 64> {52state: LazyLock<Arc<State<C>>>,53_invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,54}5556impl<'a, const C: usize> Executor<'a, C> {57/// Creates a new executor.58///59/// # Examples60///61/// ```ignore62/// use edge_executor::Executor;63///64/// let ex: Executor = Default::default();65/// ```66pub const fn new() -> Self {67Self {68state: LazyLock::new(|| Arc::new(State::new())),69_invariant: PhantomData,70}71}7273/// Spawns a task onto the executor.74///75/// # Examples76///77/// ```ignore78/// use edge_executor::Executor;79///80/// let ex: Executor = Default::default();81///82/// let task = ex.spawn(async {83/// println!("Hello world");84/// });85/// ```86///87/// Note that if the executor's queue size is equal to the number of currently88/// spawned and running tasks, spawning this additional task might cause the executor to panic89/// later, when the task is scheduled for polling.90pub fn spawn<F>(&self, fut: F) -> Task<F::Output>91where92F: Future + Send + 'a,93F::Output: Send + 'a,94{95// SAFETY: Original implementation missing safety documentation96unsafe { self.spawn_unchecked(fut) }97}9899/// Attempts to run a task if at least one is scheduled.100///101/// Running a scheduled task means simply polling its future once.102///103/// # Examples104///105/// ```ignore106/// use edge_executor::Executor;107///108/// let ex: Executor = Default::default();109/// assert!(!ex.try_tick()); // no tasks to run110///111/// let task = ex.spawn(async {112/// println!("Hello world");113/// });114/// assert!(ex.try_tick()); // a task was found115/// ```116pub fn try_tick(&self) -> bool {117if let Some(runnable) = self.try_runnable() {118runnable.run();119120true121} else {122false123}124}125126/// Runs a single task asynchronously.127///128/// Running a task means simply polling its future once.129///130/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.131///132/// # Examples133///134/// ```ignore135/// use edge_executor::{Executor, block_on};136///137/// let ex: Executor = Default::default();138///139/// let task = ex.spawn(async {140/// println!("Hello world");141/// });142/// block_on(ex.tick()); // runs the task143/// ```144pub async fn tick(&self) {145self.runnable().await.run();146}147148/// Runs the executor asynchronously until the given future completes.149///150/// # Examples151///152/// ```ignore153/// use edge_executor::{Executor, block_on};154///155/// let ex: Executor = Default::default();156///157/// let task = ex.spawn(async { 1 + 2 });158/// let res = block_on(ex.run(async { task.await * 2 }));159///160/// assert_eq!(res, 6);161/// ```162pub async fn run<F>(&self, fut: F) -> F::Output163where164F: Future + Send + 'a,165{166// SAFETY: Original implementation missing safety documentation167unsafe { self.run_unchecked(fut).await }168}169170/// Waits for the next runnable task to run.171async fn runnable(&self) -> Runnable {172poll_fn(|ctx| self.poll_runnable(ctx)).await173}174175/// Polls the first task scheduled for execution by the executor.176fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {177self.state().waker.register(ctx.waker());178179if let Some(runnable) = self.try_runnable() {180Poll::Ready(runnable)181} else {182Poll::Pending183}184}185186/// Pops the first task scheduled for execution by the executor.187///188/// Returns189/// - `None` - if no task was scheduled for execution190/// - `Some(Runnnable)` - the first task scheduled for execution. Calling `Runnable::run` will191/// execute the task. In other words, it will poll its future.192fn try_runnable(&self) -> Option<Runnable> {193let runnable;194195#[cfg(all(196target_has_atomic = "8",197target_has_atomic = "16",198target_has_atomic = "32",199target_has_atomic = "64",200target_has_atomic = "ptr"201))]202{203runnable = self.state().queue.pop();204}205206#[cfg(not(all(207target_has_atomic = "8",208target_has_atomic = "16",209target_has_atomic = "32",210target_has_atomic = "64",211target_has_atomic = "ptr"212)))]213{214runnable = self.state().queue.dequeue();215}216217runnable218}219220/// # Safety221///222/// Original implementation missing safety documentation223unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>224where225F: Future,226{227let schedule = {228let state = self.state().clone();229230move |runnable| {231#[cfg(all(232target_has_atomic = "8",233target_has_atomic = "16",234target_has_atomic = "32",235target_has_atomic = "64",236target_has_atomic = "ptr"237))]238{239state.queue.push(runnable).unwrap();240}241242#[cfg(not(all(243target_has_atomic = "8",244target_has_atomic = "16",245target_has_atomic = "32",246target_has_atomic = "64",247target_has_atomic = "ptr"248)))]249{250state.queue.enqueue(runnable).unwrap();251}252253if let Some(waker) = state.waker.take() {254waker.wake();255}256}257};258259// SAFETY: Original implementation missing safety documentation260let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };261262runnable.schedule();263264task265}266267/// # Safety268///269/// Original implementation missing safety documentation270async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output271where272F: Future,273{274let run_forever = async {275loop {276self.tick().await;277}278};279280run_forever.or(fut).await281}282283/// Returns a reference to the inner state.284fn state(&self) -> &Arc<State<C>> {285&self.state286}287}288289impl<'a, const C: usize> Default for Executor<'a, C> {290fn default() -> Self {291Self::new()292}293}294295// SAFETY: Original implementation missing safety documentation296unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}297// SAFETY: Original implementation missing safety documentation298unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}299300/// A thread-local executor.301///302/// The executor can only be run on the thread that created it.303///304/// # Examples305///306/// ```ignore307/// use edge_executor::{LocalExecutor, block_on};308///309/// let local_ex: LocalExecutor = Default::default();310///311/// block_on(local_ex.run(async {312/// println!("Hello world!");313/// }));314/// ```315pub struct LocalExecutor<'a, const C: usize = 64> {316executor: Executor<'a, C>,317_not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,318}319320impl<'a, const C: usize> LocalExecutor<'a, C> {321/// Creates a single-threaded executor.322///323/// # Examples324///325/// ```ignore326/// use edge_executor::LocalExecutor;327///328/// let local_ex: LocalExecutor = Default::default();329/// ```330pub const fn new() -> Self {331Self {332executor: Executor::<C>::new(),333_not_send: PhantomData,334}335}336337/// Spawns a task onto the executor.338///339/// # Examples340///341/// ```ignore342/// use edge_executor::LocalExecutor;343///344/// let local_ex: LocalExecutor = Default::default();345///346/// let task = local_ex.spawn(async {347/// println!("Hello world");348/// });349/// ```350///351/// Note that if the executor's queue size is equal to the number of currently352/// spawned and running tasks, spawning this additional task might cause the executor to panic353/// later, when the task is scheduled for polling.354pub fn spawn<F>(&self, fut: F) -> Task<F::Output>355where356F: Future + 'a,357F::Output: 'a,358{359// SAFETY: Original implementation missing safety documentation360unsafe { self.executor.spawn_unchecked(fut) }361}362363/// Attempts to run a task if at least one is scheduled.364///365/// Running a scheduled task means simply polling its future once.366///367/// # Examples368///369/// ```ignore370/// use edge_executor::LocalExecutor;371///372/// let local_ex: LocalExecutor = Default::default();373/// assert!(!local_ex.try_tick()); // no tasks to run374///375/// let task = local_ex.spawn(async {376/// println!("Hello world");377/// });378/// assert!(local_ex.try_tick()); // a task was found379/// ```380pub fn try_tick(&self) -> bool {381self.executor.try_tick()382}383384/// Runs a single task asynchronously.385///386/// Running a task means simply polling its future once.387///388/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.389///390/// # Examples391///392/// ```ignore393/// use edge_executor::{LocalExecutor, block_on};394///395/// let local_ex: LocalExecutor = Default::default();396///397/// let task = local_ex.spawn(async {398/// println!("Hello world");399/// });400/// block_on(local_ex.tick()); // runs the task401/// ```402pub async fn tick(&self) {403self.executor.tick().await;404}405406/// Runs the executor asynchronously until the given future completes.407///408/// # Examples409///410/// ```ignore411/// use edge_executor::{LocalExecutor, block_on};412///413/// let local_ex: LocalExecutor = Default::default();414///415/// let task = local_ex.spawn(async { 1 + 2 });416/// let res = block_on(local_ex.run(async { task.await * 2 }));417///418/// assert_eq!(res, 6);419/// ```420pub async fn run<F>(&self, fut: F) -> F::Output421where422F: Future,423{424// SAFETY: Original implementation missing safety documentation425unsafe { self.executor.run_unchecked(fut) }.await426}427}428429impl<'a, const C: usize> Default for LocalExecutor<'a, C> {430fn default() -> Self {431Self::new()432}433}434435struct State<const C: usize> {436#[cfg(all(437target_has_atomic = "8",438target_has_atomic = "16",439target_has_atomic = "32",440target_has_atomic = "64",441target_has_atomic = "ptr"442))]443queue: crossbeam_queue::ArrayQueue<Runnable>,444#[cfg(not(all(445target_has_atomic = "8",446target_has_atomic = "16",447target_has_atomic = "32",448target_has_atomic = "64",449target_has_atomic = "ptr"450)))]451queue: heapless::mpmc::MpMcQueue<Runnable, C>,452waker: AtomicWaker,453}454455impl<const C: usize> State<C> {456fn new() -> Self {457Self {458#[cfg(all(459target_has_atomic = "8",460target_has_atomic = "16",461target_has_atomic = "32",462target_has_atomic = "64",463target_has_atomic = "ptr"464))]465queue: crossbeam_queue::ArrayQueue::new(C),466#[cfg(not(all(467target_has_atomic = "8",468target_has_atomic = "16",469target_has_atomic = "32",470target_has_atomic = "64",471target_has_atomic = "ptr"472)))]473queue: heapless::mpmc::MpMcQueue::new(),474waker: AtomicWaker::new(),475}476}477}478479#[cfg(test)]480mod different_executor_tests {481use core::cell::Cell;482483use bevy_tasks::{block_on, futures_lite::{pending, poll_once}};484use futures_lite::pin;485486use super::LocalExecutor;487488#[test]489fn shared_queue_slot() {490block_on(async {491let was_polled = Cell::new(false);492let future = async {493was_polled.set(true);494pending::<()>().await;495};496497let ex1: LocalExecutor = Default::default();498let ex2: LocalExecutor = Default::default();499500// Start the futures for running forever.501let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));502pin!(run1);503pin!(run2);504assert!(poll_once(run1.as_mut()).await.is_none());505assert!(poll_once(run2.as_mut()).await.is_none());506507// Spawn the future on executor one and then poll executor two.508ex1.spawn(future).detach();509assert!(poll_once(run2).await.is_none());510assert!(!was_polled.get());511512// Poll the first one.513assert!(poll_once(run1).await.is_none());514assert!(was_polled.get());515});516}517}518519#[cfg(test)]520mod drop_tests {521use alloc::string::String;522use core::mem;523use core::sync::atomic::{AtomicUsize, Ordering};524use core::task::{Poll, Waker};525use std::sync::Mutex;526527use bevy_platform::sync::LazyLock;528use futures_lite::future;529530use super::{Executor, Task};531532#[test]533fn leaked_executor_leaks_everything() {534static DROP: AtomicUsize = AtomicUsize::new(0);535static WAKER: LazyLock<Mutex<Option<Waker>>> = LazyLock::new(Default::default);536537let ex: Executor = Default::default();538539let task = ex.spawn(async {540let _guard = CallOnDrop(|| {541DROP.fetch_add(1, Ordering::SeqCst);542});543544future::poll_fn(|cx| {545*WAKER.lock().unwrap() = Some(cx.waker().clone());546Poll::Pending::<()>547})548.await;549});550551future::block_on(ex.tick());552assert!(WAKER.lock().unwrap().is_some());553assert_eq!(DROP.load(Ordering::SeqCst), 0);554555mem::forget(ex);556assert_eq!(DROP.load(Ordering::SeqCst), 0);557558assert!(future::block_on(future::poll_once(task)).is_none());559assert_eq!(DROP.load(Ordering::SeqCst), 0);560}561562#[test]563fn await_task_after_dropping_executor() {564let s: String = "hello".into();565566let ex: Executor = Default::default();567let task: Task<&str> = ex.spawn(async { &*s });568assert!(ex.try_tick());569570drop(ex);571assert_eq!(future::block_on(task), "hello");572drop(s);573}574575#[test]576fn drop_executor_and_then_drop_finished_task() {577static DROP: AtomicUsize = AtomicUsize::new(0);578579let ex: Executor = Default::default();580let task = ex.spawn(async {581CallOnDrop(|| {582DROP.fetch_add(1, Ordering::SeqCst);583})584});585assert!(ex.try_tick());586587assert_eq!(DROP.load(Ordering::SeqCst), 0);588drop(ex);589assert_eq!(DROP.load(Ordering::SeqCst), 0);590drop(task);591assert_eq!(DROP.load(Ordering::SeqCst), 1);592}593594#[test]595fn drop_finished_task_and_then_drop_executor() {596static DROP: AtomicUsize = AtomicUsize::new(0);597598let ex: Executor = Default::default();599let task = ex.spawn(async {600CallOnDrop(|| {601DROP.fetch_add(1, Ordering::SeqCst);602})603});604assert!(ex.try_tick());605606assert_eq!(DROP.load(Ordering::SeqCst), 0);607drop(task);608assert_eq!(DROP.load(Ordering::SeqCst), 1);609drop(ex);610assert_eq!(DROP.load(Ordering::SeqCst), 1);611}612613struct CallOnDrop<F: Fn()>(F);614615impl<F: Fn()> Drop for CallOnDrop<F> {616fn drop(&mut self) {617(self.0)();618}619}620}621622#[cfg(test)]623mod local_queue {624use alloc::boxed::Box;625626use futures_lite::{future, pin};627628use super::Executor;629630#[test]631fn two_queues() {632future::block_on(async {633// Create an executor with two runners.634let ex: Executor = Default::default();635let (run1, run2) = (636ex.run(future::pending::<()>()),637ex.run(future::pending::<()>()),638);639let mut run1 = Box::pin(run1);640pin!(run2);641642// Poll them both.643assert!(future::poll_once(run1.as_mut()).await.is_none());644assert!(future::poll_once(run2.as_mut()).await.is_none());645646// Drop the first one, which should leave the local queue in the `None` state.647drop(run1);648assert!(future::poll_once(run2.as_mut()).await.is_none());649});650}651}652653654