GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/runtime.rs
//! This module provides an "ambient Tokio runtime" through
//! [`with_ambient_tokio_runtime`]. Embedders of wasmtime-wasi may embed it from
//! synchronous Rust and never use tokio directly. The implementation of
//! wasmtime-wasi requires a tokio executor in a way that is [deeply tied to
//! its
//! design](https://github.com/bytecodealliance/wasmtime/issues/7973#issuecomment-1960513214).
//! When used from a synchronous wasmtime context, this module provides the
//! wrapper function [`in_tokio`], which is used throughout the shim
//! implementations of the synchronous component binding `Host` traits in
//! terms of the async ones.
//!
//! This module also provides a thin wrapper around tokio's tasks:
//! [`AbortOnDropJoinHandle`], which is exactly like a
//! [`tokio::task::JoinHandle`] except that it aborts the task when the handle
//! is dropped. This whole crate, and any child crates which spawn tasks as
//! part of their implementations, should use this module's [`spawn`] and
//! [`spawn_blocking`] instead of tokio's. The type name is deliberately
//! conspicuous so that it sticks out if someone misses this.
//!
//! Each of these facilities should be used by dependencies of wasmtime-wasi
//! when implementing component bindings.

use std::future::Future;
use std::pin::Pin;
use std::sync::LazyLock;
use std::task::{Context, Poll, Waker};

pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .enable_io()
        .build()
        .unwrap()
});

/// Exactly like a [`tokio::task::JoinHandle`], except that it aborts the task when
/// the handle is dropped.
///
/// This behavior makes it easier to tie a worker task to the lifetime of a Resource
/// by keeping this handle owned by the Resource.
#[derive(Debug)]
pub struct AbortOnDropJoinHandle<T>(tokio::task::JoinHandle<T>);
impl<T> AbortOnDropJoinHandle<T> {
    /// Abort the task and wait for it to finish. Optionally returns the result
    /// of the task if it ran to completion prior to being aborted.
    pub async fn cancel(mut self) -> Option<T> {
        self.0.abort();

        match (&mut self.0).await {
            Ok(value) => Some(value),
            Err(err) if err.is_cancelled() => None,
            Err(err) => std::panic::resume_unwind(err.into_panic()),
        }
    }
}
impl<T> Drop for AbortOnDropJoinHandle<T> {
    fn drop(&mut self) {
        self.0.abort()
    }
}
impl<T> std::ops::Deref for AbortOnDropJoinHandle<T> {
    type Target = tokio::task::JoinHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<T> std::ops::DerefMut for AbortOnDropJoinHandle<T> {
    fn deref_mut(&mut self) -> &mut tokio::task::JoinHandle<T> {
        &mut self.0
    }
}
impl<T> From<tokio::task::JoinHandle<T>> for AbortOnDropJoinHandle<T> {
    fn from(jh: tokio::task::JoinHandle<T>) -> Self {
        AbortOnDropJoinHandle(jh)
    }
}
impl<T> Future for AbortOnDropJoinHandle<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.as_mut().0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(r) => Poll::Ready(r.expect("child task panicked")),
        }
    }
}

/// Spawns the future `f` on the ambient Tokio runtime, returning a handle that
/// aborts the task when dropped.
pub fn spawn<F>(f: F) -> AbortOnDropJoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let j = with_ambient_tokio_runtime(|| tokio::task::spawn(f));
    AbortOnDropJoinHandle(j)
}

/// Runs the blocking closure `f` through `tokio::task::spawn_blocking` on the
/// ambient Tokio runtime, returning an [`AbortOnDropJoinHandle`].
pub fn spawn_blocking<F, R>(f: F) -> AbortOnDropJoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let j = with_ambient_tokio_runtime(|| tokio::task::spawn_blocking(f));
    AbortOnDropJoinHandle(j)
}

/// Runs the future `f` to completion, blocking the current thread.
///
/// If a Tokio runtime is already current, its handle is used. Otherwise `f`
/// runs on this crate's fallback runtime.
pub fn in_tokio<F: Future>(f: F) -> F::Output {
    match tokio::runtime::Handle::try_current() {
        Ok(h) => {
            let _enter = h.enter();
            h.block_on(f)
        }
        // The `yield_now` here is non-obvious and if you're reading this
        // you're likely curious about why it's here. This is currently required
        // to get some features of "sync mode" working correctly, such as with
        // the CLI. To illustrate why this is required, consider a program
        // organized as:
        //
        // * A program has a `pollable` that it's waiting on.
        // * This `pollable` is always ready.
        // * Actually making the corresponding operation ready, however,
        //   requires some background work on Tokio's part.
        // * The program is looping on "wait for readiness" coupled with
        //   performing the operation.
        //
        // In this situation this program ends up infinitely looping in waiting
        // for pollables. The reason appears to be that when we enter the tokio
        // runtime here it doesn't necessarily yield to background work because
        // the provided future `f` is ready immediately. The future `f` will run
        // through the list of pollables and determine one of them is ready.
        //
        // Historically this happened with UDP sockets. A test sent a datagram
        // from one socket to another and the other socket never received the
        // data. This appeared to be because the server socket was
        // waiting on `READABLE | WRITABLE` (which is itself a bug but ignore
        // that) and the socket was currently in the "writable" state but never
        // ended up receiving a notification for the "readable" state. Moving
        // the socket to "readable" would require Tokio to perform some
        // background work via epoll/kqueue/handle events but if the future
        // provided here is always ready, then that never happened.
        //
        // Thus the `yield_now()` is an attempt to force Tokio to go do some
        // background work eventually and look at new interest masks for
        // example. This is a bit of a kludge but everything's already a bit
        // wonky in synchronous mode anyway. Note that this is hypothesized to
        // not be an issue in async mode because async mode typically has the
        // Tokio runtime in a separate thread or otherwise participating in a
        // larger application. It's only here in synchronous mode, where we
        // effectively own the runtime, that we need some special care.
        Err(_) => {
            let _enter = RUNTIME.enter();
            RUNTIME.block_on(async move {
                tokio::task::yield_now().await;
                f.await
            })
        }
    }
}

/// Executes the closure `f` with an "ambient Tokio runtime" which basically
/// means that if code in `f` tries to get a runtime `Handle` it'll succeed.
///
/// If a `Handle` is already available, e.g. in async contexts, then `f` is run
/// immediately. Otherwise for synchronous contexts this crate's fallback
/// runtime is configured and then `f` is executed.
pub fn with_ambient_tokio_runtime<R>(f: impl FnOnce() -> R) -> R {
    match tokio::runtime::Handle::try_current() {
        Ok(_) => f(),
        Err(_) => {
            let _enter = RUNTIME.enter();
            f()
        }
    }
}

/// Attempts to get the result of a `future`.
///
/// This function does not block and will poll the provided future once. If the
/// result is ready then `Some` is returned, otherwise `None` is returned.
///
/// Note that `future` is polled with a no-op waker, so it must be re-polled
/// later with a real waker if it's to wake up a task.
pub fn poll_noop<F>(future: Pin<&mut F>) -> Option<F::Output>
where
    F: Future,
{
    let mut task = Context::from_waker(Waker::noop());
    match future.poll(&mut task) {
        Poll::Ready(result) => Some(result),
        Poll::Pending => None,
    }
}
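
// A minimal usage sketch of the helpers above, written as unit tests so that it
// stays out of the public API. This module is an illustration only and is not
// part of the upstream file; the module and test names are hypothetical. It
// assumes the tests run from plain synchronous code, with no Tokio runtime
// already current on the test thread.
#[cfg(test)]
mod usage_sketch {
    use super::{in_tokio, poll_noop, spawn};
    use std::future;
    use std::pin::pin;

    #[test]
    fn poll_noop_polls_once_without_blocking() {
        // An immediately ready future yields `Some`.
        let ready = pin!(future::ready(7));
        assert_eq!(poll_noop(ready), Some(7));

        // A future that never completes yields `None` instead of blocking.
        let pending = pin!(future::pending::<()>());
        assert_eq!(poll_noop(pending), None);
    }

    #[test]
    fn spawn_and_cancel_from_sync_code() {
        // With no current runtime, `in_tokio` falls back to `RUNTIME`, and
        // `spawn` then finds that runtime's handle when the future is polled.
        let value = in_tokio(async { spawn(async { 1 + 1 }).await });
        assert_eq!(value, 2);

        // Cancelling a task that can never finish reports `None`.
        let cancelled = in_tokio(async { spawn(future::pending::<()>()).cancel().await });
        assert_eq!(cancelled, None);
    }
}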