//! This module provides an "ambient Tokio runtime"1//! [`with_ambient_tokio_runtime`]. Embedders of wasmtime-wasi may do so from2//! synchronous Rust, and not use tokio directly. The implementation of3//! wasmtime-wasi requires a tokio executor in a way that is [deeply tied to4//! its5//! design](https://github.com/bytecodealliance/wasmtime/issues/7973#issuecomment-1960513214).6//! When used from a synchronous wasmtime context, this module provides the7//! wrapper function [`in_tokio`] used throughout the shim implementations of8//! synchronous component binding `Host` traits in terms of the async ones.9//!10//! This module also provides a thin wrapper on tokio's tasks.11//! [`AbortOnDropJoinHandle`], which is exactly like a12//! [`tokio::task::JoinHandle`] except for the obvious behavioral change. This13//! whole crate, and any child crates which spawn tasks as part of their14//! implementations, should please use this crate's [`spawn`] and15//! [`spawn_blocking`] over tokio's. so we wanted the type name to stick out16//! if someone misses it.17//!18//! Each of these facilities should be used by dependencies of wasmtime-wasi19//! which when implementing component bindings.2021use std::future::Future;22use std::pin::Pin;23use std::sync::LazyLock;24use std::task::{Context, Poll, Waker};2526pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {27tokio::runtime::Builder::new_multi_thread()28.enable_time()29.enable_io()30.build()31.unwrap()32});3334/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when35/// the handle is dropped.36///37/// This behavior makes it easier to tie a worker task to the lifetime of a Resource38/// by keeping this handle owned by the Resource.39#[derive(Debug)]40pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);41impl<T> AbortOnDropJoinHandle<T> {42/// Abort the task and wait for it to finish. Optionally returns the result43/// of the task if it ran to completion prior to being aborted.44pub async fn cancel(mut self) -> Option<T> {45self.0.abort();4647match (&mut self.0).await {48Ok(value) => Some(value),49Err(err) if err.is_cancelled() => None,50Err(err) => std::panic::resume_unwind(err.into_panic()),51}52}53}54impl<T> Drop for AbortOnDropJoinHandle<T> {55fn drop(&mut self) {56self.0.abort()57}58}59impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {60type Target = tokio::task::JoinHandle<T>;61fn deref(&self) -> &Self::Target {62&self.063}64}65impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {66fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {67&mut self.068}69}70impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {71fn from(jh: tokio::task::JoinHandle<T>) -> Self {72AbortOnDropJoinHandle(jh)73}74}75impl<T> Future for AbortOnDropJoinHandle<T> {76type Output = T;77fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {78match Pin::new(&mut self.as_mut().0).poll(cx) {79Poll::Pending => Poll::Pending,80Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),81}82}83}8485pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>86where87F: Future + Send + 'static,88F::Output: Send + 'static,89{90let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));91AbortOnDropJoinHandle(j)92}9394pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>95where96F: FnOnce() -> R + Send + 'static,97R: Send + 'static,98{99let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));100AbortOnDropJoinHandle(j)101}102103pub fn in_tokio<F: Future>(f: F) -> F::Output {104match tokio::runtime::Handle::try_current() {105Ok(h) => {106let _enter = h.enter();107h.block_on(f)108}109// The `yield_now` here is non-obvious and if you're reading this110// you're likely curious about why it's here. This is currently required111// to get some features of "sync mode" working correctly, such as with112// the CLI. To illustrate why this is required, consider a program113// organized as:114//115// * A program has a `pollable` that it's waiting on.116// * This `pollable` is always ready .117// * Actually making the corresponding operation ready, however,118// requires some background work on Tokio's part.119// * The program is looping on "wait for readiness" coupled with120// performing the operation.121//122// In this situation this program ends up infinitely looping in waiting123// for pollables. The reason appears to be that when we enter the tokio124// runtime here it doesn't necessary yield to background work because125// the provided future `f` is ready immediately. The future `f` will run126// through the list of pollables and determine one of them is ready.127//128// Historically this happened with UDP sockets. A test send a datagram129// from one socket to another and the other socket infinitely didn't130// receive the data. This appeared to be because the server socket was131// waiting on `READABLE | WRITABLE` (which is itself a bug but ignore132// that) and the socket was currently in the "writable" state but never133// ended up receiving a notification for the "readable" state. Moving134// the socket to "readable" would require Tokio to perform some135// background work via epoll/kqueue/handle events but if the future136// provided here is always ready, then that never happened.137//138// Thus the `yield_now()` is an attempt to force Tokio to go do some139// background work eventually and look at new interest masks for140// example. This is a bit of a kludge but everything's already a bit141// wonky in synchronous mode anyway. Note that this is hypothesized to142// not be an issue in async mode because async mode typically has the143// Tokio runtime in a separate thread or otherwise participating in a144// larger application, it's only here in synchronous mode where we145// effectively own the runtime that we need some special care.146Err(_) => {147let _enter = RUNTIME.enter();148RUNTIME.block_on(async move {149tokio::task::yield_now().await;150f.await151})152}153}154}155156/// Executes the closure `f` with an "ambient Tokio runtime" which basically157/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.158///159/// If a `Handle` is already available, e.g. in async contexts, then `f` is run160/// immediately. Otherwise for synchronous contexts this crate's fallback161/// runtime is configured and then `f` is executed.162pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {163match tokio::runtime::Handle::try_current() {164Ok(_) => f(),165Err(_) => {166let _enter = RUNTIME.enter();167f()168}169}170}171172/// Attempts to get the result of a `future`.173///174/// This function does not block and will poll the provided future once. If the175/// result is here then `Some` is returned, otherwise `None` is returned.176///177/// Note that by polling `future` this means that `future` must be re-polled178/// later if it's to wake up a task.179pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>180where181F: Future,182{183let mut task = Context::from_waker(Waker::noop());184match future.poll(&mut task) {185Poll::Ready(result) => Some(result),186Poll::Pending => None,187}188}189190191