// Copyright 2020 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34//! An Executor and future combinators based on operations that block on file descriptors.5//!6//! This crate is meant to be used with the `futures-rs` crate that provides further combinators7//! and utility functions to combine and manage futures. All futures will run until they block on a8//! file descriptor becoming readable or writable. Facilities are provided to register future9//! wakers based on such events.10//!11//! # Running top-level futures.12//!13//! Use helper functions based the desired behavior of your application.14//!15//! ## Completing one of several futures.16//!17//! If there are several top level tasks that should run until any one completes, use the "select"18//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`19//! function will return when the first future completes. The uncompleted futures will also be20//! returned so they can be run further or otherwise cleaned up. These functions are inspired by21//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and22//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),23//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).24//!25//! ## Completing all of several futures.26//!27//! If there are several top level tasks that all need to be completed, use the "complete" family28//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`29//! function will return only once all the futures passed to it have completed. These functions are30//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based31//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),32//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and33//! [`complete5`](fn.complete5.html).34//!35//! # Implementing new FD-based futures.36//!37//! For URing implementations should provide an implementation of the `IoSource` trait.38//! For the FD executor, new futures can use the existing ability to poll a source to build async39//! functionality on top of.40//!41//! # Implementations42//!43//! Currently there are two paths for using the asynchronous IO. One uses a WaitContext and drives44//! futures based on the FDs signaling they are ready for the opteration. This method will exist so45//! long as kernels < 5.4 are supported.46//! The other method submits operations to io_uring and is signaled when they complete. This is more47//! efficient, but only supported on kernel 5.4+.48//! If `IoSource::new` is used to interface with async IO, then the correct backend will be chosen49//! automatically.50//!51//! # Examples52//!53//! See the docs for `IoSource` if support for kernels <5.4 is required. Focus on `UringSource` if54//! all systems have support for io_uring.5556mod async_types;57pub mod audio_streams_async;58mod blocking;59mod common_executor;60mod complete;61mod event;62mod executor;63mod io_ext;64mod io_source;65pub mod mem;66mod queue;67mod select;68pub mod sync;69pub mod sys;70mod timer;71#[cfg(feature = "tokio")]72mod tokio_executor;73mod waker;7475use std::future::Future;76use std::pin::Pin;77use std::task::Poll;7879pub use async_types::*;80pub use base::Event;81#[cfg(any(target_os = "android", target_os = "linux"))]82pub use blocking::sys::linux::block_on::block_on;83pub use blocking::unblock;84pub use blocking::unblock_disarm;85pub use blocking::BlockingPool;86pub use blocking::CancellableBlockingPool;87pub use blocking::TimeoutAction;88pub use event::EventAsync;89pub use executor::Executor;90pub use executor::ExecutorKind;91pub(crate) use executor::ExecutorTrait;92pub use executor::TaskHandle;93#[cfg(windows)]94pub use futures::executor::block_on;95use futures::stream::FuturesUnordered;96pub use io_ext::AsyncError;97pub use io_ext::AsyncResult;98pub use io_ext::AsyncWrapper;99pub use io_ext::IntoAsync;100pub use io_source::IoSource;101pub use mem::BackingMemory;102pub use mem::MemRegion;103pub use mem::MemRegionIter;104pub use mem::VecIoWrapper;105use remain::sorted;106pub use select::SelectResult;107#[cfg(any(target_os = "android", target_os = "linux"))]108pub use sys::linux::uring_executor::is_uring_stable;109use thiserror::Error as ThisError;110pub use timer::TimerAsync;111112#[sorted]113#[derive(ThisError, Debug)]114pub enum Error {115/// Error from EventAsync116#[error("Failure in EventAsync: {0}")]117EventAsync(base::Error),118/// Error from the handle executor.119#[cfg(windows)]120#[error("Failure in the handle executor: {0}")]121HandleExecutor(sys::windows::handle_executor::Error),122#[error("IO error: {0}")]123Io(std::io::Error),124/// Error from the polled(FD) source, which includes error from the FD executor.125#[cfg(any(target_os = "android", target_os = "linux"))]126#[error("An error with a poll source: {0}")]127PollSource(sys::linux::poll_source::Error),128/// Error from Timer.129#[error("Failure in Timer: {0}")]130Timer(base::Error),131/// Error from TimerFd.132#[error("Failure in TimerAsync: {0}")]133TimerAsync(AsyncError),134/// Error from the uring executor.135#[cfg(any(target_os = "android", target_os = "linux"))]136#[error("Failure in the uring executor: {0}")]137URingExecutor(sys::linux::uring_executor::Error),138}139pub type Result<T> = std::result::Result<T, Error>;140141/// Heterogeneous collection of `async_task:Task` that are running in a "detached" state.142///143/// We keep them around to ensure they are dropped before the executor they are running on.144pub(crate) struct DetachedTasks(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>);145146impl DetachedTasks {147pub(crate) fn new() -> Self {148DetachedTasks(FuturesUnordered::new())149}150151pub(crate) fn push<R: Send + 'static>(&self, task: async_task::Task<R>) {152// Convert to fallible, otherwise poll could panic if the `Runnable` is dropped early.153let task = task.fallible();154self.0.push(Box::pin(async {155let _ = task.await;156}));157}158159/// Polls all the tasks, dropping any that complete.160pub(crate) fn poll(&mut self, cx: &mut std::task::Context) {161use futures::Stream;162while let Poll::Ready(Some(_)) = Pin::new(&mut self.0).poll_next(cx) {}163}164}165166// Select helpers to run until any future completes.167168/// Creates a combinator that runs the two given futures until one completes, returning a tuple169/// containing the result of the finished future and the still pending future.170///171/// # Example172///173/// ```174/// use cros_async::{SelectResult, select2, block_on};175/// use futures::future::pending;176/// use futures::pin_mut;177///178/// let first = async {5};179/// let second = async {let () = pending().await;};180/// pin_mut!(first);181/// pin_mut!(second);182/// match block_on(select2(first, second)) {183/// (SelectResult::Finished(5), SelectResult::Pending(_second)) => (),184/// _ => panic!("Select didn't return the first future"),185/// };186/// ```187pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(188f1: F1,189f2: F2,190) -> (SelectResult<F1>, SelectResult<F2>) {191select::Select2::new(f1, f2).await192}193194/// Creates a combinator that runs the three given futures until one or more completes, returning a195/// tuple containing the result of the finished future(s) and the still pending future(s).196///197/// # Example198///199/// ```200/// use cros_async::{SelectResult, select3, block_on};201/// use futures::future::pending;202/// use futures::pin_mut;203///204/// let first = async {4};205/// let second = async {let () = pending().await;};206/// let third = async {5};207/// pin_mut!(first);208/// pin_mut!(second);209/// pin_mut!(third);210/// match block_on(select3(first, second, third)) {211/// (SelectResult::Finished(4),212/// SelectResult::Pending(_second),213/// SelectResult::Finished(5)) => (),214/// _ => panic!("Select didn't return the futures"),215/// };216/// ```217pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(218f1: F1,219f2: F2,220f3: F3,221) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {222select::Select3::new(f1, f2, f3).await223}224225/// Creates a combinator that runs the four given futures until one or more completes, returning a226/// tuple containing the result of the finished future(s) and the still pending future(s).227///228/// # Example229///230/// ```231/// use cros_async::{SelectResult, select4, block_on};232/// use futures::future::pending;233/// use futures::pin_mut;234///235/// let first = async {4};236/// let second = async {let () = pending().await;};237/// let third = async {5};238/// let fourth = async {let () = pending().await;};239/// pin_mut!(first);240/// pin_mut!(second);241/// pin_mut!(third);242/// pin_mut!(fourth);243/// match block_on(select4(first, second, third, fourth)) {244/// (SelectResult::Finished(4), SelectResult::Pending(_second),245/// SelectResult::Finished(5), SelectResult::Pending(_fourth)) => (),246/// _ => panic!("Select didn't return the futures"),247/// };248/// ```249pub async fn select4<250F1: Future + Unpin,251F2: Future + Unpin,252F3: Future + Unpin,253F4: Future + Unpin,254>(255f1: F1,256f2: F2,257f3: F3,258f4: F4,259) -> (260SelectResult<F1>,261SelectResult<F2>,262SelectResult<F3>,263SelectResult<F4>,264) {265select::Select4::new(f1, f2, f3, f4).await266}267268/// Creates a combinator that runs the five given futures until one or more completes, returning a269/// tuple containing the result of the finished future(s) and the still pending future(s).270///271/// # Example272///273/// ```274/// use cros_async::{SelectResult, select5, block_on};275/// use futures::future::pending;276/// use futures::pin_mut;277///278/// let first = async {4};279/// let second = async {let () = pending().await;};280/// let third = async {5};281/// let fourth = async {let () = pending().await;};282/// let fifth = async {6};283/// pin_mut!(first);284/// pin_mut!(second);285/// pin_mut!(third);286/// pin_mut!(fourth);287/// pin_mut!(fifth);288/// match block_on(select5(first, second, third, fourth, fifth)) {289/// (SelectResult::Finished(4), SelectResult::Pending(_second),290/// SelectResult::Finished(5), SelectResult::Pending(_fourth),291/// SelectResult::Finished(6)) => (),292/// _ => panic!("Select didn't return the futures"),293/// };294/// ```295pub async fn select5<296F1: Future + Unpin,297F2: Future + Unpin,298F3: Future + Unpin,299F4: Future + Unpin,300F5: Future + Unpin,301>(302f1: F1,303f2: F2,304f3: F3,305f4: F4,306f5: F5,307) -> (308SelectResult<F1>,309SelectResult<F2>,310SelectResult<F3>,311SelectResult<F4>,312SelectResult<F5>,313) {314select::Select5::new(f1, f2, f3, f4, f5).await315}316317/// Creates a combinator that runs the six given futures until one or more completes, returning a318/// tuple containing the result of the finished future(s) and the still pending future(s).319///320/// # Example321///322/// ```323/// use cros_async::{SelectResult, select6, block_on};324/// use futures::future::pending;325/// use futures::pin_mut;326///327/// let first = async {1};328/// let second = async {let () = pending().await;};329/// let third = async {3};330/// let fourth = async {let () = pending().await;};331/// let fifth = async {5};332/// let sixth = async {6};333/// pin_mut!(first);334/// pin_mut!(second);335/// pin_mut!(third);336/// pin_mut!(fourth);337/// pin_mut!(fifth);338/// pin_mut!(sixth);339/// match block_on(select6(first, second, third, fourth, fifth, sixth)) {340/// (SelectResult::Finished(1), SelectResult::Pending(_second),341/// SelectResult::Finished(3), SelectResult::Pending(_fourth),342/// SelectResult::Finished(5), SelectResult::Finished(6)) => (),343/// _ => panic!("Select didn't return the futures"),344/// };345/// ```346pub async fn select6<347F1: Future + Unpin,348F2: Future + Unpin,349F3: Future + Unpin,350F4: Future + Unpin,351F5: Future + Unpin,352F6: Future + Unpin,353>(354f1: F1,355f2: F2,356f3: F3,357f4: F4,358f5: F5,359f6: F6,360) -> (361SelectResult<F1>,362SelectResult<F2>,363SelectResult<F3>,364SelectResult<F4>,365SelectResult<F5>,366SelectResult<F6>,367) {368select::Select6::new(f1, f2, f3, f4, f5, f6).await369}370371pub async fn select7<372F1: Future + Unpin,373F2: Future + Unpin,374F3: Future + Unpin,375F4: Future + Unpin,376F5: Future + Unpin,377F6: Future + Unpin,378F7: Future + Unpin,379>(380f1: F1,381f2: F2,382f3: F3,383f4: F4,384f5: F5,385f6: F6,386f7: F7,387) -> (388SelectResult<F1>,389SelectResult<F2>,390SelectResult<F3>,391SelectResult<F4>,392SelectResult<F5>,393SelectResult<F6>,394SelectResult<F7>,395) {396select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await397}398399pub async fn select8<400F1: Future + Unpin,401F2: Future + Unpin,402F3: Future + Unpin,403F4: Future + Unpin,404F5: Future + Unpin,405F6: Future + Unpin,406F7: Future + Unpin,407F8: Future + Unpin,408>(409f1: F1,410f2: F2,411f3: F3,412f4: F4,413f5: F5,414f6: F6,415f7: F7,416f8: F8,417) -> (418SelectResult<F1>,419SelectResult<F2>,420SelectResult<F3>,421SelectResult<F4>,422SelectResult<F5>,423SelectResult<F6>,424SelectResult<F7>,425SelectResult<F8>,426) {427select::Select8::new(f1, f2, f3, f4, f5, f6, f7, f8).await428}429430pub async fn select9<431F1: Future + Unpin,432F2: Future + Unpin,433F3: Future + Unpin,434F4: Future + Unpin,435F5: Future + Unpin,436F6: Future + Unpin,437F7: Future + Unpin,438F8: Future + Unpin,439F9: Future + Unpin,440>(441f1: F1,442f2: F2,443f3: F3,444f4: F4,445f5: F5,446f6: F6,447f7: F7,448f8: F8,449f9: F9,450) -> (451SelectResult<F1>,452SelectResult<F2>,453SelectResult<F3>,454SelectResult<F4>,455SelectResult<F5>,456SelectResult<F6>,457SelectResult<F7>,458SelectResult<F8>,459SelectResult<F9>,460) {461select::Select9::new(f1, f2, f3, f4, f5, f6, f7, f8, f9).await462}463464pub async fn select10<465F1: Future + Unpin,466F2: Future + Unpin,467F3: Future + Unpin,468F4: Future + Unpin,469F5: Future + Unpin,470F6: Future + Unpin,471F7: Future + Unpin,472F8: Future + Unpin,473F9: Future + Unpin,474F10: Future + Unpin,475>(476f1: F1,477f2: F2,478f3: F3,479f4: F4,480f5: F5,481f6: F6,482f7: F7,483f8: F8,484f9: F9,485f10: F10,486) -> (487SelectResult<F1>,488SelectResult<F2>,489SelectResult<F3>,490SelectResult<F4>,491SelectResult<F5>,492SelectResult<F6>,493SelectResult<F7>,494SelectResult<F8>,495SelectResult<F9>,496SelectResult<F10>,497) {498select::Select10::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10).await499}500501pub async fn select11<502F1: Future + Unpin,503F2: Future + Unpin,504F3: Future + Unpin,505F4: Future + Unpin,506F5: Future + Unpin,507F6: Future + Unpin,508F7: Future + Unpin,509F8: Future + Unpin,510F9: Future + Unpin,511F10: Future + Unpin,512F11: Future + Unpin,513>(514f1: F1,515f2: F2,516f3: F3,517f4: F4,518f5: F5,519f6: F6,520f7: F7,521f8: F8,522f9: F9,523f10: F10,524f11: F11,525) -> (526SelectResult<F1>,527SelectResult<F2>,528SelectResult<F3>,529SelectResult<F4>,530SelectResult<F5>,531SelectResult<F6>,532SelectResult<F7>,533SelectResult<F8>,534SelectResult<F9>,535SelectResult<F10>,536SelectResult<F11>,537) {538select::Select11::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11).await539}540541pub async fn select12<542F1: Future + Unpin,543F2: Future + Unpin,544F3: Future + Unpin,545F4: Future + Unpin,546F5: Future + Unpin,547F6: Future + Unpin,548F7: Future + Unpin,549F8: Future + Unpin,550F9: Future + Unpin,551F10: Future + Unpin,552F11: Future + Unpin,553F12: Future + Unpin,554>(555f1: F1,556f2: F2,557f3: F3,558f4: F4,559f5: F5,560f6: F6,561f7: F7,562f8: F8,563f9: F9,564f10: F10,565f11: F11,566f12: F12,567) -> (568SelectResult<F1>,569SelectResult<F2>,570SelectResult<F3>,571SelectResult<F4>,572SelectResult<F5>,573SelectResult<F6>,574SelectResult<F7>,575SelectResult<F8>,576SelectResult<F9>,577SelectResult<F10>,578SelectResult<F11>,579SelectResult<F12>,580) {581select::Select12::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12).await582}583584// Combination helpers to run until all futures are complete.585586/// Creates a combinator that runs the two given futures to completion, returning a tuple of the587/// outputs each yields.588///589/// # Example590///591/// ```592/// use cros_async::{complete2, block_on};593///594/// let first = async {5};595/// let second = async {6};596/// assert_eq!(block_on(complete2(first, second)), (5,6));597/// ```598pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)599where600F1: Future,601F2: Future,602{603complete::Complete2::new(f1, f2).await604}605606/// Creates a combinator that runs the three given futures to completion, returning a tuple of the607/// outputs each yields.608///609/// # Example610///611/// ```612/// use cros_async::{complete3, block_on};613///614/// let first = async {5};615/// let second = async {6};616/// let third = async {7};617/// assert_eq!(block_on(complete3(first, second, third)), (5,6,7));618/// ```619pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)620where621F1: Future,622F2: Future,623F3: Future,624{625complete::Complete3::new(f1, f2, f3).await626}627628/// Creates a combinator that runs the four given futures to completion, returning a tuple of the629/// outputs each yields.630///631/// # Example632///633/// ```634/// use cros_async::{complete4, block_on};635///636/// let first = async {5};637/// let second = async {6};638/// let third = async {7};639/// let fourth = async {8};640/// assert_eq!(block_on(complete4(first, second, third, fourth)), (5,6,7,8));641/// ```642pub async fn complete4<F1, F2, F3, F4>(643f1: F1,644f2: F2,645f3: F3,646f4: F4,647) -> (F1::Output, F2::Output, F3::Output, F4::Output)648where649F1: Future,650F2: Future,651F3: Future,652F4: Future,653{654complete::Complete4::new(f1, f2, f3, f4).await655}656657/// Creates a combinator that runs the five given futures to completion, returning a tuple of the658/// outputs each yields.659///660/// # Example661///662/// ```663/// use cros_async::{complete5, block_on};664///665/// let first = async {5};666/// let second = async {6};667/// let third = async {7};668/// let fourth = async {8};669/// let fifth = async {9};670/// assert_eq!(block_on(complete5(first, second, third, fourth, fifth)),671/// (5,6,7,8,9));672/// ```673pub async fn complete5<F1, F2, F3, F4, F5>(674f1: F1,675f2: F2,676f3: F3,677f4: F4,678f5: F5,679) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)680where681F1: Future,682F2: Future,683F3: Future,684F4: Future,685F5: Future,686{687complete::Complete5::new(f1, f2, f3, f4, f5).await688}689690691