//! Alternative to `async_executor` based on [`edge_executor`] by Ivan Markov.1//!2//! It has been vendored along with its tests to update several outdated dependencies.3//!4//! [`async_executor`]: https://github.com/smol-rs/async-executor5//! [`edge_executor`]: https://github.com/ivmarkov/edge-executor67#![expect(unsafe_code, reason = "original implementation relies on unsafe")]8#![expect(9dead_code,10reason = "keeping methods from original implementation for transparency"11)]1213// TODO: Create a more tailored replacement, possibly integrating [Fotre](https://github.com/NthTensor/Forte)1415use alloc::rc::Rc;16use core::{17future::{poll_fn, Future},18marker::PhantomData,19task::{Context, Poll},20};2122use async_task::{Runnable, Task};23use atomic_waker::AtomicWaker;24use bevy_platform::sync::{Arc, LazyLock};25use futures_lite::FutureExt;2627/// An async executor.28///29/// # Examples30///31/// A multi-threaded executor:32///33/// ```ignore34/// use async_channel::unbounded;35/// use easy_parallel::Parallel;36///37/// use edge_executor::{Executor, block_on};38///39/// let ex: Executor = Default::default();40/// let (signal, shutdown) = unbounded::<()>();41///42/// Parallel::new()43/// // Run four executor threads.44/// .each(0..4, |_| block_on(ex.run(shutdown.recv())))45/// // Run the main future on the current thread.46/// .finish(|| block_on(async {47/// println!("Hello world!");48/// drop(signal);49/// }));50/// ```51pub struct Executor<'a, const C: usize = 64> {52state: LazyLock<Arc<State<C>>>,53_invariant: PhantomData<core::cell::UnsafeCell<&'a ()>>,54}5556impl<'a, const C: usize> Executor<'a, C> {57/// Creates a new executor.58///59/// # Examples60///61/// ```ignore62/// use edge_executor::Executor;63///64/// let ex: Executor = Default::default();65/// ```66pub const fn new() -> Self {67Self {68state: LazyLock::new(|| Arc::new(State::new())),69_invariant: PhantomData,70}71}7273/// Spawns a task onto the executor.74///75/// # Examples76///77/// ```ignore78/// use edge_executor::Executor;79///80/// let ex: Executor = Default::default();81///82/// let task = ex.spawn(async {83/// println!("Hello world");84/// });85/// ```86///87/// Note that if the executor's queue size is equal to the number of currently88/// spawned and running tasks, spawning this additional task might cause the executor to panic89/// later, when the task is scheduled for polling.90pub fn spawn<F>(&self, fut: F) -> Task<F::Output>91where92F: Future + Send + 'a,93F::Output: Send + 'a,94{95// SAFETY: Original implementation missing safety documentation96unsafe { self.spawn_unchecked(fut) }97}9899/// Attempts to run a task if at least one is scheduled.100///101/// Running a scheduled task means simply polling its future once.102///103/// # Examples104///105/// ```ignore106/// use edge_executor::Executor;107///108/// let ex: Executor = Default::default();109/// assert!(!ex.try_tick()); // no tasks to run110///111/// let task = ex.spawn(async {112/// println!("Hello world");113/// });114/// assert!(ex.try_tick()); // a task was found115/// ```116pub fn try_tick(&self) -> bool {117if let Some(runnable) = self.try_runnable() {118runnable.run();119120true121} else {122false123}124}125126/// Runs a single task asynchronously.127///128/// Running a task means simply polling its future once.129///130/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.131///132/// # Examples133///134/// ```ignore135/// use edge_executor::{Executor, block_on};136///137/// let ex: Executor = Default::default();138///139/// let task = ex.spawn(async {140/// println!("Hello world");141/// });142/// block_on(ex.tick()); // runs the task143/// ```144pub async fn tick(&self) {145self.runnable().await.run();146}147148/// Runs the executor asynchronously until the given future completes.149///150/// # Examples151///152/// ```ignore153/// use edge_executor::{Executor, block_on};154///155/// let ex: Executor = Default::default();156///157/// let task = ex.spawn(async { 1 + 2 });158/// let res = block_on(ex.run(async { task.await * 2 }));159///160/// assert_eq!(res, 6);161/// ```162pub async fn run<F>(&self, fut: F) -> F::Output163where164F: Future + Send + 'a,165{166// SAFETY: Original implementation missing safety documentation167unsafe { self.run_unchecked(fut).await }168}169170/// Waits for the next runnable task to run.171async fn runnable(&self) -> Runnable {172poll_fn(|ctx| self.poll_runnable(ctx)).await173}174175/// Polls the first task scheduled for execution by the executor.176fn poll_runnable(&self, ctx: &Context<'_>) -> Poll<Runnable> {177self.state().waker.register(ctx.waker());178179if let Some(runnable) = self.try_runnable() {180Poll::Ready(runnable)181} else {182Poll::Pending183}184}185186/// Pops the first task scheduled for execution by the executor.187///188/// Returns189/// - `None` - if no task was scheduled for execution190/// - `Some(Runnable)` - the first task scheduled for execution. Calling `Runnable::run` will191/// execute the task. In other words, it will poll its future.192fn try_runnable(&self) -> Option<Runnable> {193let runnable;194195#[cfg(all(196target_has_atomic = "8",197target_has_atomic = "16",198target_has_atomic = "32",199target_has_atomic = "64",200target_has_atomic = "ptr"201))]202{203runnable = self.state().queue.pop();204}205206#[cfg(not(all(207target_has_atomic = "8",208target_has_atomic = "16",209target_has_atomic = "32",210target_has_atomic = "64",211target_has_atomic = "ptr"212)))]213{214runnable = self.state().queue.dequeue();215}216217runnable218}219220/// # Safety221///222/// Original implementation missing safety documentation223unsafe fn spawn_unchecked<F>(&self, fut: F) -> Task<F::Output>224where225F: Future,226{227let schedule = {228let state = self.state().clone();229230move |runnable| {231#[cfg(all(232target_has_atomic = "8",233target_has_atomic = "16",234target_has_atomic = "32",235target_has_atomic = "64",236target_has_atomic = "ptr"237))]238{239state.queue.push(runnable).unwrap();240}241242#[cfg(not(all(243target_has_atomic = "8",244target_has_atomic = "16",245target_has_atomic = "32",246target_has_atomic = "64",247target_has_atomic = "ptr"248)))]249{250state.queue.enqueue(runnable).unwrap();251}252253if let Some(waker) = state.waker.take() {254waker.wake();255}256}257};258259// SAFETY: Original implementation missing safety documentation260let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) };261262runnable.schedule();263264task265}266267/// # Safety268///269/// Original implementation missing safety documentation270async unsafe fn run_unchecked<F>(&self, fut: F) -> F::Output271where272F: Future,273{274let run_forever = async {275loop {276self.tick().await;277}278};279280run_forever.or(fut).await281}282283/// Returns a reference to the inner state.284fn state(&self) -> &Arc<State<C>> {285&self.state286}287}288289impl<'a, const C: usize> Default for Executor<'a, C> {290fn default() -> Self {291Self::new()292}293}294295// SAFETY: Original implementation missing safety documentation296unsafe impl<'a, const C: usize> Send for Executor<'a, C> {}297// SAFETY: Original implementation missing safety documentation298unsafe impl<'a, const C: usize> Sync for Executor<'a, C> {}299300/// A thread-local executor.301///302/// The executor can only be run on the thread that created it.303///304/// # Examples305///306/// ```ignore307/// use edge_executor::{LocalExecutor, block_on};308///309/// let local_ex: LocalExecutor = Default::default();310///311/// block_on(local_ex.run(async {312/// println!("Hello world!");313/// }));314/// ```315pub struct LocalExecutor<'a, const C: usize = 64> {316executor: Executor<'a, C>,317_not_send: PhantomData<core::cell::UnsafeCell<&'a Rc<()>>>,318}319320impl<'a, const C: usize> LocalExecutor<'a, C> {321/// Creates a single-threaded executor.322///323/// # Examples324///325/// ```ignore326/// use edge_executor::LocalExecutor;327///328/// let local_ex: LocalExecutor = Default::default();329/// ```330pub const fn new() -> Self {331Self {332executor: Executor::<C>::new(),333_not_send: PhantomData,334}335}336337/// Spawns a task onto the executor.338///339/// # Examples340///341/// ```ignore342/// use edge_executor::LocalExecutor;343///344/// let local_ex: LocalExecutor = Default::default();345///346/// let task = local_ex.spawn(async {347/// println!("Hello world");348/// });349/// ```350///351/// Note that if the executor's queue size is equal to the number of currently352/// spawned and running tasks, spawning this additional task might cause the executor to panic353/// later, when the task is scheduled for polling.354pub fn spawn<F>(&self, fut: F) -> Task<F::Output>355where356F: Future + 'a,357F::Output: 'a,358{359// SAFETY: Original implementation missing safety documentation360unsafe { self.executor.spawn_unchecked(fut) }361}362363/// Attempts to run a task if at least one is scheduled.364///365/// Running a scheduled task means simply polling its future once.366///367/// # Examples368///369/// ```ignore370/// use edge_executor::LocalExecutor;371///372/// let local_ex: LocalExecutor = Default::default();373/// assert!(!local_ex.try_tick()); // no tasks to run374///375/// let task = local_ex.spawn(async {376/// println!("Hello world");377/// });378/// assert!(local_ex.try_tick()); // a task was found379/// ```380pub fn try_tick(&self) -> bool {381self.executor.try_tick()382}383384/// Runs a single task asynchronously.385///386/// Running a task means simply polling its future once.387///388/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.389///390/// # Examples391///392/// ```ignore393/// use edge_executor::{LocalExecutor, block_on};394///395/// let local_ex: LocalExecutor = Default::default();396///397/// let task = local_ex.spawn(async {398/// println!("Hello world");399/// });400/// block_on(local_ex.tick()); // runs the task401/// ```402pub async fn tick(&self) {403self.executor.tick().await;404}405406/// Runs the executor asynchronously until the given future completes.407///408/// # Examples409///410/// ```ignore411/// use edge_executor::{LocalExecutor, block_on};412///413/// let local_ex: LocalExecutor = Default::default();414///415/// let task = local_ex.spawn(async { 1 + 2 });416/// let res = block_on(local_ex.run(async { task.await * 2 }));417///418/// assert_eq!(res, 6);419/// ```420pub async fn run<F>(&self, fut: F) -> F::Output421where422F: Future,423{424// SAFETY: Original implementation missing safety documentation425unsafe { self.executor.run_unchecked(fut) }.await426}427}428429impl<'a, const C: usize> Default for LocalExecutor<'a, C> {430fn default() -> Self {431Self::new()432}433}434435struct State<const C: usize> {436#[cfg(all(437target_has_atomic = "8",438target_has_atomic = "16",439target_has_atomic = "32",440target_has_atomic = "64",441target_has_atomic = "ptr"442))]443queue: crossbeam_queue::ArrayQueue<Runnable>,444#[cfg(not(all(445target_has_atomic = "8",446target_has_atomic = "16",447target_has_atomic = "32",448target_has_atomic = "64",449target_has_atomic = "ptr"450)))]451queue: heapless::mpmc::Queue<Runnable, C>,452waker: AtomicWaker,453}454455impl<const C: usize> State<C> {456fn new() -> Self {457Self {458#[cfg(all(459target_has_atomic = "8",460target_has_atomic = "16",461target_has_atomic = "32",462target_has_atomic = "64",463target_has_atomic = "ptr"464))]465queue: crossbeam_queue::ArrayQueue::new(C),466#[cfg(not(all(467target_has_atomic = "8",468target_has_atomic = "16",469target_has_atomic = "32",470target_has_atomic = "64",471target_has_atomic = "ptr"472)))]473#[allow(deprecated)]474queue: heapless::mpmc::Queue::new(),475waker: AtomicWaker::new(),476}477}478}479480#[cfg(test)]481mod different_executor_tests {482use core::cell::Cell;483484use bevy_tasks::{485block_on,486futures_lite::{pending, poll_once},487};488use futures_lite::pin;489490use super::LocalExecutor;491492#[test]493fn shared_queue_slot() {494block_on(async {495let was_polled = Cell::new(false);496let future = async {497was_polled.set(true);498pending::<()>().await;499};500501let ex1: LocalExecutor = Default::default();502let ex2: LocalExecutor = Default::default();503504// Start the futures for running forever.505let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));506pin!(run1);507pin!(run2);508assert!(poll_once(run1.as_mut()).await.is_none());509assert!(poll_once(run2.as_mut()).await.is_none());510511// Spawn the future on executor one and then poll executor two.512ex1.spawn(future).detach();513assert!(poll_once(run2).await.is_none());514assert!(!was_polled.get());515516// Poll the first one.517assert!(poll_once(run1).await.is_none());518assert!(was_polled.get());519});520}521}522523#[cfg(test)]524mod drop_tests {525use alloc::string::String;526use core::mem;527use core::sync::atomic::{AtomicUsize, Ordering};528use core::task::{Poll, Waker};529use std::sync::Mutex;530531use bevy_platform::sync::LazyLock;532use futures_lite::future;533534use super::{Executor, Task};535536#[test]537fn leaked_executor_leaks_everything() {538static DROP: AtomicUsize = AtomicUsize::new(0);539static WAKER: LazyLock<Mutex<Option<Waker>>> = LazyLock::new(Default::default);540541let ex: Executor = Default::default();542543let task = ex.spawn(async {544let _guard = CallOnDrop(|| {545DROP.fetch_add(1, Ordering::SeqCst);546});547548future::poll_fn(|cx| {549*WAKER.lock().unwrap() = Some(cx.waker().clone());550Poll::Pending::<()>551})552.await;553});554555future::block_on(ex.tick());556assert!(WAKER.lock().unwrap().is_some());557assert_eq!(DROP.load(Ordering::SeqCst), 0);558559mem::forget(ex);560assert_eq!(DROP.load(Ordering::SeqCst), 0);561562assert!(future::block_on(future::poll_once(task)).is_none());563assert_eq!(DROP.load(Ordering::SeqCst), 0);564}565566#[test]567fn await_task_after_dropping_executor() {568let s: String = "hello".into();569570let ex: Executor = Default::default();571let task: Task<&str> = ex.spawn(async { &*s });572assert!(ex.try_tick());573574drop(ex);575assert_eq!(future::block_on(task), "hello");576drop(s);577}578579#[test]580fn drop_executor_and_then_drop_finished_task() {581static DROP: AtomicUsize = AtomicUsize::new(0);582583let ex: Executor = Default::default();584let task = ex.spawn(async {585CallOnDrop(|| {586DROP.fetch_add(1, Ordering::SeqCst);587})588});589assert!(ex.try_tick());590591assert_eq!(DROP.load(Ordering::SeqCst), 0);592drop(ex);593assert_eq!(DROP.load(Ordering::SeqCst), 0);594drop(task);595assert_eq!(DROP.load(Ordering::SeqCst), 1);596}597598#[test]599fn drop_finished_task_and_then_drop_executor() {600static DROP: AtomicUsize = AtomicUsize::new(0);601602let ex: Executor = Default::default();603let task = ex.spawn(async {604CallOnDrop(|| {605DROP.fetch_add(1, Ordering::SeqCst);606})607});608assert!(ex.try_tick());609610assert_eq!(DROP.load(Ordering::SeqCst), 0);611drop(task);612assert_eq!(DROP.load(Ordering::SeqCst), 1);613drop(ex);614assert_eq!(DROP.load(Ordering::SeqCst), 1);615}616617struct CallOnDrop<F: Fn()>(F);618619impl<F: Fn()> Drop for CallOnDrop<F> {620fn drop(&mut self) {621(self.0)();622}623}624}625626#[cfg(test)]627mod local_queue {628use alloc::boxed::Box;629630use futures_lite::{future, pin};631632use super::Executor;633634#[test]635fn two_queues() {636future::block_on(async {637// Create an executor with two runners.638let ex: Executor = Default::default();639let (run1, run2) = (640ex.run(future::pending::<()>()),641ex.run(future::pending::<()>()),642);643let mut run1 = Box::pin(run1);644pin!(run2);645646// Poll them both.647assert!(future::poll_once(run1.as_mut()).await.is_none());648assert!(future::poll_once(run2.as_mut()).await.is_none());649650// Drop the first one, which should leave the local queue in the `None` state.651drop(run1);652assert!(future::poll_once(run2.as_mut()).await.is_none());653});654}655}656657658