// Copyright 2024 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::future::Future;5use std::pin::Pin;6use std::sync::Arc;7use std::sync::OnceLock;89#[cfg(any(target_os = "android", target_os = "linux"))]10use base::warn;11#[cfg(any(target_os = "android", target_os = "linux"))]12use base::AsRawDescriptors;13#[cfg(any(target_os = "android", target_os = "linux"))]14use base::RawDescriptor;15use serde::Deserialize;16use serde_keyvalue::argh::FromArgValue;17use serde_keyvalue::ErrorKind;18use serde_keyvalue::KeyValueDeserializer;1920use crate::common_executor;21use crate::common_executor::RawExecutor;22#[cfg(any(target_os = "android", target_os = "linux"))]23use crate::sys::linux;24#[cfg(windows)]25use crate::sys::windows;26use crate::sys::ExecutorKindSys;27use crate::AsyncResult;28use crate::IntoAsync;29use crate::IoSource;3031cfg_if::cfg_if! {32if #[cfg(feature = "tokio")] {33use crate::tokio_executor::TokioExecutor;34use crate::tokio_executor::TokioTaskHandle;35}36}3738#[derive(Clone, Copy, Debug, PartialEq, Eq)]39pub enum ExecutorKind {40SysVariants(ExecutorKindSys),41#[cfg(feature = "tokio")]42Tokio,43}4445impl From<ExecutorKindSys> for ExecutorKind {46fn from(e: ExecutorKindSys) -> ExecutorKind {47ExecutorKind::SysVariants(e)48}49}5051/// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.52/// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and53/// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.54static DEFAULT_EXECUTOR_KIND: OnceLock<ExecutorKind> = OnceLock::new();5556impl Default for ExecutorKind {57fn default() -> Self {58#[cfg(any(target_os = "android", target_os = "linux"))]59let default_fn = || ExecutorKindSys::Fd.into();60#[cfg(windows)]61let default_fn = || ExecutorKindSys::Handle.into();62*DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)63}64}6566/// The error type for [`Executor::set_default_executor_kind()`].67#[derive(thiserror::Error, Debug)]68pub enum SetDefaultExecutorKindError {69/// The default executor kind is set more than once.70#[error("The default executor kind is already set to {0:?}")]71SetMoreThanOnce(ExecutorKind),7273#[cfg(any(target_os = "android", target_os = "linux"))]74/// io_uring is unavailable. The reason might be the lack of the kernel support,75/// but is not limited to that.76#[error("io_uring is unavailable: {0}")]77UringUnavailable(linux::uring_executor::Error),78}7980impl FromArgValue for ExecutorKind {81fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {82// `from_arg_value` returns a `String` as error, but our deserializer API defines its own83// error type. Perform parsing from a closure so we can easily map returned errors.84let builder = move || {85let mut des = KeyValueDeserializer::from(value);8687let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {88#[cfg(any(target_os = "android", target_os = "linux"))]89("epoll", None) => ExecutorKindSys::Fd.into(),90#[cfg(any(target_os = "android", target_os = "linux"))]91("uring", None) => ExecutorKindSys::Uring.into(),92#[cfg(windows)]93("handle", None) => ExecutorKindSys::Handle.into(),94#[cfg(windows)]95("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),96#[cfg(windows)]97("overlapped", Some(',')) => {98if des.parse_identifier()? != "concurrency" {99let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());100return Err(des.error_here(kind));101}102if des.next_char() != Some('=') {103return Err(des.error_here(ErrorKind::ExpectedEqual));104}105let concurrency = des.parse_number()?;106ExecutorKindSys::Overlapped {107concurrency: Some(concurrency),108}109.into()110}111#[cfg(feature = "tokio")]112("tokio", None) => ExecutorKind::Tokio,113(_identifier, _next) => {114let kind = ErrorKind::SerdeError("unexpected kind".to_string());115return Err(des.error_here(kind));116}117};118des.finish()?;119Ok(kind)120};121122builder().map_err(|e| e.to_string())123}124}125126impl serde::Serialize for ExecutorKind {127fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>128where129S: serde::Serializer,130{131match self {132ExecutorKind::SysVariants(sv) => sv.serialize(serializer),133#[cfg(feature = "tokio")]134ExecutorKind::Tokio => "tokio".serialize(serializer),135}136}137}138139impl<'de> Deserialize<'de> for ExecutorKind {140fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>141where142D: serde::Deserializer<'de>,143{144let string = String::deserialize(deserializer)?;145ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)146}147}148149/// Reference to a task managed by the executor.150///151/// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to152/// continue running the background.153///154/// `await`ing the `TaskHandle` waits for the task to finish and yields its result.155pub enum TaskHandle<R> {156#[cfg(any(target_os = "android", target_os = "linux"))]157Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),158#[cfg(any(target_os = "android", target_os = "linux"))]159Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),160#[cfg(windows)]161Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),162#[cfg(feature = "tokio")]163Tokio(TokioTaskHandle<R>),164}165166impl<R: Send + 'static> TaskHandle<R> {167pub fn detach(self) {168match self {169#[cfg(any(target_os = "android", target_os = "linux"))]170TaskHandle::Fd(f) => f.detach(),171#[cfg(any(target_os = "android", target_os = "linux"))]172TaskHandle::Uring(u) => u.detach(),173#[cfg(windows)]174TaskHandle::Handle(h) => h.detach(),175#[cfg(feature = "tokio")]176TaskHandle::Tokio(t) => t.detach(),177}178}179180// Cancel the task and wait for it to stop. Returns the result of the task if it was already181// finished.182pub async fn cancel(self) -> Option<R> {183match self {184#[cfg(any(target_os = "android", target_os = "linux"))]185TaskHandle::Fd(f) => f.cancel().await,186#[cfg(any(target_os = "android", target_os = "linux"))]187TaskHandle::Uring(u) => u.cancel().await,188#[cfg(windows)]189TaskHandle::Handle(h) => h.cancel().await,190#[cfg(feature = "tokio")]191TaskHandle::Tokio(t) => t.cancel().await,192}193}194}195196impl<R: 'static> Future for TaskHandle<R> {197type Output = R;198199fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {200match self.get_mut() {201#[cfg(any(target_os = "android", target_os = "linux"))]202TaskHandle::Fd(f) => Pin::new(f).poll(cx),203#[cfg(any(target_os = "android", target_os = "linux"))]204TaskHandle::Uring(u) => Pin::new(u).poll(cx),205#[cfg(windows)]206TaskHandle::Handle(h) => Pin::new(h).poll(cx),207#[cfg(feature = "tokio")]208TaskHandle::Tokio(t) => Pin::new(t).poll(cx),209}210}211}212213pub(crate) trait ExecutorTrait {214fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;215216fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>217where218F: Future + Send + 'static,219F::Output: Send + 'static;220221fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>222where223F: FnOnce() -> R + Send + 'static,224R: Send + 'static;225226fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>227where228F: Future + 'static,229F::Output: 'static;230231fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;232}233234/// An executor for scheduling tasks that poll futures to completion.235///236/// All asynchronous operations must run within an executor, which is capable of spawning futures as237/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.238///239/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only240/// create a new reference, not a new executor.241///242/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be243/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &244/// interface duplication, but as far as we understand that is unavoidable.245///246/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>247/// for further details.248///249/// # Examples250///251/// Concurrently wait for multiple files to become readable/writable and then read/write the data.252///253/// ```254/// use std::cmp::min;255/// use std::error::Error;256/// use std::fs::{File, OpenOptions};257///258/// use cros_async::{AsyncResult, Executor, IoSource, complete3};259/// const CHUNK_SIZE: usize = 32;260///261/// // Write all bytes from `data` to `f`.262/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {263/// while data.len() > 0 {264/// let (count, mut buf) = f.write_from_vec(None, data).await?;265///266/// data = buf.split_off(count);267/// }268///269/// Ok(())270/// }271///272/// // Transfer `len` bytes of data from `from` to `to`.273/// async fn transfer_data(274/// from: IoSource<File>,275/// to: IoSource<File>,276/// len: usize,277/// ) -> AsyncResult<usize> {278/// let mut rem = len;279///280/// while rem > 0 {281/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];282/// let (count, mut data) = from.read_to_vec(None, buf).await?;283///284/// if count == 0 {285/// // End of file. Return the number of bytes transferred.286/// return Ok(len - rem);287/// }288///289/// data.truncate(count);290/// write_file(&to, data).await?;291///292/// rem = rem.saturating_sub(count);293/// }294///295/// Ok(len)296/// }297///298/// #[cfg(any(target_os = "android", target_os = "linux"))]299/// # fn do_it() -> Result<(), Box<dyn Error>> {300/// let ex = Executor::new()?;301///302/// let (rx, tx) = base::linux::pipe()?;303/// let zero = File::open("/dev/zero")?;304/// let zero_bytes = CHUNK_SIZE * 7;305/// let zero_to_pipe = transfer_data(306/// ex.async_from(zero)?,307/// ex.async_from(tx.try_clone()?)?,308/// zero_bytes,309/// );310///311/// let rand = File::open("/dev/urandom")?;312/// let rand_bytes = CHUNK_SIZE * 19;313/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);314///315/// let null = OpenOptions::new().write(true).open("/dev/null")?;316/// let null_bytes = zero_bytes + rand_bytes;317/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);318///319/// ex.run_until(complete3(320/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },321/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },322/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },323/// ))?;324///325/// # Ok(())326/// # }327/// #[cfg(any(target_os = "android", target_os = "linux"))]328/// # do_it().unwrap();329/// ```330#[derive(Clone)]331pub enum Executor {332#[cfg(any(target_os = "android", target_os = "linux"))]333Fd(Arc<RawExecutor<linux::EpollReactor>>),334#[cfg(any(target_os = "android", target_os = "linux"))]335Uring(Arc<RawExecutor<linux::UringReactor>>),336#[cfg(windows)]337Handle(Arc<RawExecutor<windows::HandleReactor>>),338#[cfg(windows)]339Overlapped(Arc<RawExecutor<windows::HandleReactor>>),340#[cfg(feature = "tokio")]341Tokio(TokioExecutor),342}343344impl Executor {345/// Create a new `Executor`.346pub fn new() -> AsyncResult<Self> {347Executor::with_executor_kind(ExecutorKind::default())348}349350/// Create a new `Executor` of the given `ExecutorKind`.351pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {352Ok(match kind {353#[cfg(any(target_os = "android", target_os = "linux"))]354ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),355#[cfg(any(target_os = "android", target_os = "linux"))]356ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {357Executor::Uring(RawExecutor::new()?)358}359#[cfg(windows)]360ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {361Executor::Handle(RawExecutor::new()?)362}363#[cfg(windows)]364ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {365let reactor = match concurrency {366Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,367None => windows::HandleReactor::new()?,368};369Executor::Overlapped(RawExecutor::new_with(reactor)?)370}371#[cfg(feature = "tokio")]372ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),373})374}375376/// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.377pub fn set_default_executor_kind(378executor_kind: ExecutorKind,379) -> Result<(), SetDefaultExecutorKindError> {380#[cfg(any(target_os = "android", target_os = "linux"))]381if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {382linux::uring_executor::check_uring_availability()383.map_err(SetDefaultExecutorKindError::UringUnavailable)?;384if !crate::is_uring_stable() {385warn!(386"Enabling io_uring executor on the kernel version where io_uring is unstable"387);388}389}390DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|391// `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.392SetDefaultExecutorKindError::SetMoreThanOnce(393*DEFAULT_EXECUTOR_KIND394.get()395.expect("Failed to get DEFAULT_EXECUTOR_KIND"),396))397}398399/// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned400/// `IoSource` to directly start async operations without needing a separate reference to the401/// executor.402pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {403match self {404#[cfg(any(target_os = "android", target_os = "linux"))]405Executor::Fd(ex) => ex.async_from(f),406#[cfg(any(target_os = "android", target_os = "linux"))]407Executor::Uring(ex) => ex.async_from(f),408#[cfg(windows)]409Executor::Handle(ex) => ex.async_from(f),410#[cfg(windows)]411Executor::Overlapped(ex) => ex.async_from(f),412#[cfg(feature = "tokio")]413Executor::Tokio(ex) => ex.async_from(f),414}415}416417/// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the418/// If the executor is not overlapped, then Handle source is returned.419/// returned `IoSource` to directly start async operations without needing a separate reference420/// to the executor.421#[cfg(windows)]422pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {423match self {424Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(425f, ex, false,426)?)),427_ => self.async_from(f),428}429}430431/// Spawn a new future for this executor to run to completion. Callers may use the returned432/// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel433/// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the434/// future associated with it use `TaskHandle::detach`.435///436/// # Examples437///438/// ```439/// # use cros_async::AsyncResult;440/// # fn example_spawn() -> AsyncResult<()> {441/// # use std::thread;442///443/// # use cros_async::Executor;444/// use futures::executor::block_on;445///446/// # let ex = Executor::new()?;447///448/// # // Spawn a thread that runs the executor.449/// # let ex2 = ex.clone();450/// # thread::spawn(move || ex2.run());451///452/// let task = ex.spawn(async { 7 + 13 });453///454/// let result = block_on(task);455/// assert_eq!(result, 20);456/// # Ok(())457/// # }458///459/// # example_spawn().unwrap();460/// ```461pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>462where463F: Future + Send + 'static,464F::Output: Send + 'static,465{466match self {467#[cfg(any(target_os = "android", target_os = "linux"))]468Executor::Fd(ex) => ex.spawn(f),469#[cfg(any(target_os = "android", target_os = "linux"))]470Executor::Uring(ex) => ex.spawn(f),471#[cfg(windows)]472Executor::Handle(ex) => ex.spawn(f),473#[cfg(windows)]474Executor::Overlapped(ex) => ex.spawn(f),475#[cfg(feature = "tokio")]476Executor::Tokio(ex) => ex.spawn(f),477}478}479480/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without481/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same482/// thread where `run()` or `run_until()` is called.483///484/// # Panics485///486/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was487/// added by calling `spawn_local` from a different thread.488///489/// # Examples490///491/// ```492/// # use cros_async::AsyncResult;493/// # fn example_spawn_local() -> AsyncResult<()> {494/// # use cros_async::Executor;495///496/// # let ex = Executor::new()?;497///498/// let task = ex.spawn_local(async { 7 + 13 });499///500/// let result = ex.run_until(task)?;501/// assert_eq!(result, 20);502/// Ok(())503/// # }504///505/// # example_spawn_local().unwrap();506/// ```507pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>508where509F: Future + 'static,510F::Output: 'static,511{512match self {513#[cfg(any(target_os = "android", target_os = "linux"))]514Executor::Fd(ex) => ex.spawn_local(f),515#[cfg(any(target_os = "android", target_os = "linux"))]516Executor::Uring(ex) => ex.spawn_local(f),517#[cfg(windows)]518Executor::Handle(ex) => ex.spawn_local(f),519#[cfg(windows)]520Executor::Overlapped(ex) => ex.spawn_local(f),521#[cfg(feature = "tokio")]522Executor::Tokio(ex) => ex.spawn_local(f),523}524}525526/// Run the provided closure on a dedicated thread where blocking is allowed.527///528/// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping529/// the returned `TaskHandle` may not cancel the operation if it was already started on a530/// worker thread.531///532/// # Panics533///534/// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not535/// already completed.536///537/// # Examples538///539/// ```edition2018540/// # use cros_async::Executor;541///542/// # async fn do_it(ex: &Executor) {543/// let res = ex.spawn_blocking(move || {544/// // Do some CPU-intensive or blocking work here.545///546/// 42547/// }).await;548///549/// assert_eq!(res, 42);550/// # }551///552/// # let ex = Executor::new().unwrap();553/// # ex.run_until(do_it(&ex)).unwrap();554/// ```555pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>556where557F: FnOnce() -> R + Send + 'static,558R: Send + 'static,559{560match self {561#[cfg(any(target_os = "android", target_os = "linux"))]562Executor::Fd(ex) => ex.spawn_blocking(f),563#[cfg(any(target_os = "android", target_os = "linux"))]564Executor::Uring(ex) => ex.spawn_blocking(f),565#[cfg(windows)]566Executor::Handle(ex) => ex.spawn_blocking(f),567#[cfg(windows)]568Executor::Overlapped(ex) => ex.spawn_blocking(f),569#[cfg(feature = "tokio")]570Executor::Tokio(ex) => ex.spawn_blocking(f),571}572}573574/// Run the executor indefinitely, driving all spawned futures to completion. This method will575/// block the current thread and only return in the case of an error.576///577/// # Panics578///579/// Once this method has been called on a thread, it may only be called on that thread from that580/// point on. Attempting to call it from another thread will panic.581///582/// # Examples583///584/// ```585/// # use cros_async::AsyncResult;586/// # fn example_run() -> AsyncResult<()> {587/// use std::thread;588///589/// use cros_async::Executor;590/// use futures::executor::block_on;591///592/// let ex = Executor::new()?;593///594/// // Spawn a thread that runs the executor.595/// let ex2 = ex.clone();596/// thread::spawn(move || ex2.run());597///598/// let task = ex.spawn(async { 7 + 13 });599///600/// let result = block_on(task);601/// assert_eq!(result, 20);602/// # Ok(())603/// # }604///605/// # example_run().unwrap();606/// ```607pub fn run(&self) -> AsyncResult<()> {608self.run_until(std::future::pending())609}610611/// Drive all futures spawned in this executor until `f` completes. This method will block the612/// current thread only until `f` is complete and there may still be unfinished futures in the613/// executor.614///615/// # Panics616///617/// Once this method has been called on a thread, from then onwards it may only be called on618/// that thread. Attempting to call it from another thread will panic.619///620/// # Examples621///622/// ```623/// # use cros_async::AsyncResult;624/// # fn example_run_until() -> AsyncResult<()> {625/// use cros_async::Executor;626///627/// let ex = Executor::new()?;628///629/// let task = ex.spawn_local(async { 7 + 13 });630///631/// let result = ex.run_until(task)?;632/// assert_eq!(result, 20);633/// # Ok(())634/// # }635///636/// # example_run_until().unwrap();637/// ```638pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {639match self {640#[cfg(any(target_os = "android", target_os = "linux"))]641Executor::Fd(ex) => ex.run_until(f),642#[cfg(any(target_os = "android", target_os = "linux"))]643Executor::Uring(ex) => ex.run_until(f),644#[cfg(windows)]645Executor::Handle(ex) => ex.run_until(f),646#[cfg(windows)]647Executor::Overlapped(ex) => ex.run_until(f),648#[cfg(feature = "tokio")]649Executor::Tokio(ex) => ex.run_until(f),650}651}652}653654#[cfg(any(target_os = "android", target_os = "linux"))]655impl AsRawDescriptors for Executor {656fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {657match self {658Executor::Fd(ex) => ex.as_raw_descriptors(),659Executor::Uring(ex) => ex.as_raw_descriptors(),660#[cfg(feature = "tokio")]661Executor::Tokio(ex) => ex.as_raw_descriptors(),662}663}664}665666667