Path: blob/main/crates/wasi-http/src/handler.rs
//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.

#[cfg(feature = "p3")]
use crate::p3;
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::VecDeque;
use std::collections::btree_map::{BTreeMap, Entry};
use std::future;
use std::pin::{Pin, pin};
use std::sync::{
    Arc, Mutex,
    atomic::{
        AtomicBool, AtomicU64, AtomicUsize,
        Ordering::{Relaxed, SeqCst},
    },
};
use std::task::Poll;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use wasmtime::AsContextMut;
use wasmtime::component::Accessor;
use wasmtime::{Result, Store, StoreContextMut, format_err};

/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
pub mod p2 {
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        pub use wasi::*;
    }
}

/// Represents either a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` pre-instance.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/proxy@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/service@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}

impl<T: 'static> ProxyPre<T> {
    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
    where
        T: Send,
    {
        Ok(match self {
            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
            #[cfg(feature = "p3")]
            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
        })
    }
}

/// Represents either a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` instance.
pub enum Proxy {
    /// A `wasi:http/proxy@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/service@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}

/// Represents a task to run using a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` instance.
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;

/// Async MPMC channel where each item is delivered to at most one consumer.
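///
/// Roughly, a producer/consumer pair uses it like this (illustrative sketch
/// only; the `"job"` payload is arbitrary):
///
/// ```ignore
/// let queue = Arc::new(Queue::default());
///
/// // Any number of producers may push items...
/// queue.push("job");
///
/// // ...and any number of consumers may race to pop them; each pushed item
/// // is received by exactly one consumer.
/// let consumer = Arc::clone(&queue);
/// tokio::spawn(async move {
///     let item = consumer.pop().await;
///     assert_eq!(item, "job");
/// });
/// ```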
struct Queue<T> {
    queue: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self {
            queue: Default::default(),
            notify: Default::default(),
        }
    }
}

impl<T> Queue<T> {
    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.notify.notify_one();
    }

    fn try_pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }

    async fn pop(&self) -> T {
        // This code comes from the Unbound MPMC Channel example in [the
        // `tokio::sync::Notify`
        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).

        let mut notified = pin!(self.notify.notified());

        loop {
            notified.as_mut().enable();
            if let Some(item) = self.try_pop() {
                return item;
            }
            notified.as_mut().await;
            notified.set(self.notify.notified());
        }
    }
}

/// Bundles a [`Store`] with a callback to write a profile (if configured).
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}

/// Represents the application-specific state of a web server.
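///
/// # Example
///
/// A minimal implementation might look roughly like the following sketch.
/// `MyState` and `MyData` are hypothetical application types, not part of
/// this crate, and the specific timeouts and limits are placeholders:
///
/// ```ignore
/// struct MyState {
///     engine: wasmtime::Engine,
/// }
///
/// struct MyData {
///     // Per-`Store` WASI and HTTP context would live here.
/// }
///
/// impl HandlerState for MyState {
///     type StoreData = MyData;
///
///     fn new_store(&self, _req_id: Option<u64>) -> Result<StoreBundle<MyData>> {
///         Ok(StoreBundle {
///             store: Store::new(&self.engine, MyData {}),
///             // No profiling configured in this sketch.
///             write_profile: Box::new(|_| {}),
///         })
///     }
///
///     fn request_timeout(&self) -> Duration {
///         Duration::from_secs(30)
///     }
///
///     fn idle_instance_timeout(&self) -> Duration {
///         Duration::from_secs(10)
///     }
///
///     // Handle exactly one request per instance, i.e. no instance reuse.
///     fn max_instance_reuse_count(&self) -> usize {
///         1
///     }
///
///     fn max_instance_concurrent_reuse_count(&self) -> usize {
///         1
///     }
///
///     fn handle_worker_error(&self, error: wasmtime::Error) {
///         eprintln!("worker error: {error:?}");
///     }
/// }
/// ```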
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong. See that function's documentation for details.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance around before dropping it.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error.
    fn handle_worker_error(&self, error: wasmtime::Error);
}

struct ProxyHandlerInner<S: HandlerState> {
    state: S,
    instance_pre: ProxyPre<S::StoreData>,
    next_id: AtomicU64,
    task_queue: Queue<TaskFn<S::StoreData>>,
    worker_count: AtomicUsize,
}

/// Helper utility to track the start times of tasks accepted by a worker.
///
/// This is used to ensure that timeouts are enforced even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
/// to the guest either busy looping or being blocked on a synchronous call to a
/// host function which has exclusive access to the `Store`.
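///
/// Conceptually it is a multiset of `Instant`s; roughly, the bookkeeping
/// behaves as in this sketch (illustrative only):
///
/// ```ignore
/// let mut times = StartTimes::default();
/// let a = Instant::now();
/// let b = a + Duration::from_secs(1);
/// times.add(a);
/// times.add(a); // duplicates are counted
/// times.add(b);
/// assert_eq!(times.earliest(), Some(a));
/// times.remove(a);
/// times.remove(a);
/// assert_eq!(times.earliest(), Some(b));
/// ```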
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    fn add(&mut self, time: Instant) {
        *self.0.entry(time).or_insert(0) += 1;
    }

    fn remove(&mut self, time: Instant) {
        let Entry::Occupied(mut entry) = self.0.entry(time) else {
            unreachable!()
        };
        match *entry.get() {
            0 => unreachable!(),
            1 => {
                entry.remove();
            }
            _ => {
                *entry.get_mut() -= 1;
            }
        }
    }

    fn earliest(&self) -> Option<Instant> {
        self.0.first_key_value().map(|(&k, _)| k)
    }
}

struct Worker<S>
where
    S: HandlerState,
{
    handler: ProxyHandler<S>,
    available: bool,
}

impl<S> Worker<S>
where
    S: HandlerState,
{
    fn set_available(&mut self, available: bool) {
        if available != self.available {
            self.available = available;
            if available {
                self.handler.0.worker_count.fetch_add(1, Relaxed);
            } else {
                // Here we use `SeqCst` to ensure the load/store is ordered
                // correctly with respect to the `Queue::is_empty` check we do
                // below.
                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
                // This addresses what would otherwise be a race condition in
                // `ProxyHandler::spawn` where it only starts a worker if the
                // available worker count is zero. If we decrement the count to
                // zero right after `ProxyHandler::spawn` checks it, then no
                // worker will be started; thus it becomes our responsibility to
                // start a worker here instead.
                if count == 1 && !self.handler.0.task_queue.is_empty() {
                    self.handler.start_worker(None, None);
                }
            }
        }
    }

    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        if let Err(error) = self.run_(task, req_id).await {
            self.handler.0.state.handle_worker_error(error);
        }
    }

    async fn run_(
        &mut self,
        task: Option<TaskFn<S::StoreData>>,
        req_id: Option<u64>,
    ) -> Result<()> {
        // NB: The code that follows is rather subtle in that it is structured
        // carefully to provide a few key invariants related to how instance
        // reuse and request timeouts interact:
        //
        // - A task must never be allowed to run for more than 2x the request
        //   timeout, if any.
        //
        // - Every task we accept here must be allowed to run for at least 1x
        //   the request timeout, if any.
        //
        // - When more than one task is run concurrently in the same instance,
        //   we must stop accepting new tasks as soon as any existing task
        //   reaches the request timeout. This serves to cap the amount of time
        //   we need to keep the instance alive before _all_ tasks have either
        //   completed or timed out.
        //
        // As of this writing, there's an additional wrinkle that makes
        // guaranteeing those invariants particularly tricky: per #11869 and
        // #11870, busy guest loops, epoch interruption, and host functions
        // registered using `Linker::func_{wrap,new}_async` all require
        // blocking, exclusive access to the `Store`, which effectively prevents
        // the `StoreContextMut::run_concurrent` event loop from making
        // progress. That, in turn, prevents any concurrent tasks from
        // executing, and also prevents the `AsyncFnOnce` passed to
        // `run_concurrent` from being polled. Consequently, we must rely on a
        // "second line of defense" to ensure tasks are timed out promptly,
        // which is to check for timeouts _outside_ the `run_concurrent` future.
        // Once the aforementioned issues have been addressed, we'll be able to
        // remove that check and its associated baggage.

        let handler = &self.handler.0;

        let StoreBundle {
            mut store,
            write_profile,
        } = handler.state.new_store(req_id)?;

        let request_timeout = handler.state.request_timeout();
        let idle_instance_timeout = handler.state.idle_instance_timeout();
        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
        let max_instance_concurrent_reuse_count =
            handler.state.max_instance_concurrent_reuse_count();

        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
        let accept_concurrent = AtomicBool::new(true);
        let task_start_times = Mutex::new(StartTimes::default());

        let mut future = pin!(store.run_concurrent(async |accessor| {
            let mut reuse_count = 0;
            let mut timed_out = false;
            let mut futures = FuturesUnordered::new();

            let accept_task = |task: TaskFn<S::StoreData>,
                               futures: &mut FuturesUnordered<_>,
                               reuse_count: &mut usize| {
                // Set `accept_concurrent` to false, conservatively assuming
                // that the new task will be CPU-bound, at least to begin with.
                // Only once the `StoreContextMut::run_concurrent` event loop
                // returns `Pending` will we set `accept_concurrent` back to
                // true and consider accepting more tasks.
                //
                // This approach avoids taking on more than one CPU-bound task
                // at a time, which would hurt throughput vs. leaving the
                // additional tasks for other workers to handle.
                accept_concurrent.store(false, Relaxed);
                *reuse_count += 1;

                let start_time = Instant::now().checked_add(request_timeout);
                if let Some(start_time) = start_time {
                    task_start_times.lock().unwrap().add(start_time);
                }

                futures.push(tokio::time::timeout(request_timeout, async move {
                    (task)(accessor, proxy).await;
                    start_time
                }));
            };

            if let Some(task) = task {
                accept_task(task, &mut futures, &mut reuse_count);
            }

            let handler = self.handler.clone();
            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
                let new_task = {
                    let future_count = futures.len();
                    let mut next_future = pin!(async {
                        if futures.is_empty() {
                            future::pending().await
                        } else {
                            futures.next().await.unwrap()
                        }
                    });
                    let mut next_task = pin!(tokio::time::timeout(
                        if future_count == 0 {
                            idle_instance_timeout
                        } else {
                            Duration::MAX
                        },
                        handler.0.task_queue.pop()
                    ));
                    // Poll any existing tasks, and if they're all `Pending`
                    // _and_ we haven't reached any reuse limits yet, poll for a
                    // new task from the queue.
                    //
                    // Note that the order of operations here is important. By
                    // polling `next_future` first, we'll discover any tasks
                    // that may have timed out, at which point we'll stop
                    // accepting new tasks altogether (see below for details).
                    // This is especially important in the case where the task
                    // was blocked on a synchronous call to a host function
                    // which has exclusive access to the `Store`; once that call
                    // finishes, the first thing we need to do is time out the
                    // task. If we were to poll for a new task first, then we'd
                    // have to wait for _that_ task to finish or time out before
                    // we could kill the instance.
                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
                        Poll::Pending => {
                            // Note that `Pending` here doesn't necessarily mean
                            // all tasks are blocked on I/O. They might simply
                            // be waiting for some deferred work to be done by
                            // the next turn of the
                            // `StoreContextMut::run_concurrent` event loop.
                            // Therefore, we check `accept_concurrent` here and
                            // only advertise we have capacity for another task
                            // if either we have no tasks at all or all our
                            // tasks really are blocked on I/O.
                            self.set_available(
                                reuse_count < max_instance_reuse_count
                                    && future_count < max_instance_concurrent_reuse_count
                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
                            );

                            if self.available {
                                next_task.as_mut().poll(cx).map(Some)
                            } else {
                                Poll::Pending
                            }
                        }
                        Poll::Ready(Ok(start_time)) => {
                            // Task completed; carry on!
                            if let Some(start_time) = start_time {
                                task_start_times.lock().unwrap().remove(start_time);
                            }
                            Poll::Ready(None)
                        }
                        Poll::Ready(Err(_)) => {
                            // Task timed out; stop accepting new tasks, but
                            // continue polling any other in-progress tasks
                            // until they have either finished or timed out.
                            // This effectively kicks off a "graceful shutdown"
                            // of the worker, allowing any other concurrent
                            // tasks time to finish before we drop the instance.
                            //
                            // TODO: We should also send a cancel request to the
                            // timed-out task to give it a chance to shut down
                            // gracefully (and delay dropping the instance for a
                            // reasonable amount of time), but as of this
                            // writing Wasmtime does not yet provide an API for
                            // doing that. See issue #11833.
                            timed_out = true;
                            reuse_count = max_instance_reuse_count;
                            Poll::Ready(None)
                        }
                    })
                    .await
                };

                match new_task {
                    Some(Ok(task)) => {
                        accept_task(task, &mut futures, &mut reuse_count);
                    }
                    Some(Err(_)) => break,
                    None => {}
                }
            }

            accessor.with(|mut access| write_profile(access.as_context_mut()));

            if timed_out {
                Err(format_err!("guest timed out"))
            } else {
                wasmtime::error::Ok(())
            }
        }));

        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));

        future::poll_fn(|cx| {
            let poll = future.as_mut().poll(cx);
            if poll.is_pending() {
                // If the future returns `Pending`, that's either because it's
                // idle (in which case it can definitely accept a new task) or
                // because all its tasks are awaiting I/O, in which case it may
                // have capacity for additional tasks to run concurrently.
                //
                // However, if one of the tasks is blocked on a sync call to a
                // host function which has exclusive access to the `Store`, the
                // `StoreContextMut::run_concurrent` event loop will be unable
                // to make progress until that call finishes. Similarly, if the
                // task loops indefinitely, subject only to epoch interruption,
                // the event loop will also be stuck. Either way, any task
                // timeouts created inside the `AsyncFnOnce` we passed to
                // `run_concurrent` won't have a chance to trigger.
                // Consequently, we need to _also_ enforce timeouts here,
                // outside the event loop.
                //
                // Therefore, we check if the oldest outstanding task has been
                // running for at least `request_timeout*2`, which is the
                // maximum time needed for any other concurrent tasks to
                // complete or time out, at which point we can safely discard
                // the instance. If that deadline has not yet arrived, we
                // schedule a wakeup to occur when it does.
                //
                // We uphold the "never kill an instance with a task which has
                // been running for less than the request timeout" invariant
                // here by noting that this timeout will only trigger if the
                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
                // to run for at least the past `request_timeout` amount of
                // time, meaning it can't possibly have accepted a task newer
                // than that.
                if let Some(deadline) = task_start_times
                    .lock()
                    .unwrap()
                    .earliest()
                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
                {
                    sleep.as_mut().reset(deadline.into());
                    // Note that this will schedule a wakeup for later if the
                    // deadline has not yet arrived:
                    if sleep.as_mut().poll(cx).is_ready() {
                        // Deadline has been reached; kill the instance with an
                        // error.
                        return Poll::Ready(Err(format_err!("guest timed out")));
                    }
                }

                // Otherwise, if no timeouts have elapsed, we set
                // `accept_concurrent` to true and, if it wasn't already true
                // before, poll the future one more time so it can ask for
                // another task if appropriate.
                if !accept_concurrent.swap(true, Relaxed) {
                    return future.as_mut().poll(cx);
                }
            }

            poll
        })
        .await?
    }
}

impl<S> Drop for Worker<S>
where
    S: HandlerState,
{
    fn drop(&mut self) {
        self.set_available(false);
    }
}

/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one. See
/// [`Self::spawn`] for details.
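///
/// # Example
///
/// A rough sketch of how a server might wire this up; `state` is an
/// application-defined [`HandlerState`] implementation, and `component` and
/// `linker` are assumed to have been created elsewhere:
///
/// ```ignore
/// // Pre-instantiate the p2 proxy world and wrap it in a `ProxyPre`.
/// let instance_pre = linker.instantiate_pre(&component)?;
/// let proxy_pre = ProxyPre::P2(p2::bindings::ProxyPre::new(instance_pre)?);
///
/// let handler = ProxyHandler::new(state, proxy_pre);
///
/// // For each incoming request:
/// let req_id = handler.next_req_id();
/// handler.spawn(
///     Some(req_id),
///     Box::new(move |accessor, proxy| {
///         Box::pin(async move {
///             // Call the guest's exported handler via `proxy` here, e.g. by
///             // matching on `Proxy::P2`/`Proxy::P3` and invoking the
///             // generated bindings.
///         })
///     }),
/// );
/// ```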
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);

impl<S: HandlerState> Clone for ProxyHandler<S> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<S> ProxyHandler<S>
where
    S: HandlerState,
{
    /// Create a new `ProxyHandler` with the specified application state and
    /// pre-instance.
    pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            state,
            instance_pre,
            next_id: AtomicU64::from(0),
            task_queue: Default::default(),
            worker_count: AtomicUsize::from(0),
        }))
    }

    /// Push a task to the task queue for this handler.
    ///
    /// This will either spawn a new background worker to run the task or
    /// deliver it to an already-running worker.
    ///
    /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
    /// new worker is started for this task. It is intended to be used as a
    /// "request identifier" corresponding to that task and can be used e.g. to
    /// prefix all logging from the `Store` with that identifier. Note that a
    /// non-`None` value only makes sense when `<S as
    /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
    /// will not match subsequent tasks handled by the worker.
    pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
        match self.0.state.max_instance_reuse_count() {
            0 => panic!("`max_instance_reuse_count` must be at least 1"),
            _ => {
                if self.0.worker_count.load(Relaxed) == 0 {
                    // There are no available workers; skip the queue and pass
                    // the task directly to the worker, which improves
                    // performance as measured by `wasmtime-server-rps.sh` by
                    // about 15%.
                    self.start_worker(Some(task), req_id);
                } else {
                    self.0.task_queue.push(task);
                    // Start a new worker to handle the task if the last worker
                    // just went unavailable. See also `Worker::set_available`
                    // for what happens if the available worker count goes to
                    // zero right after we check it here, and note that we only
                    // check the count _after_ we've pushed the task to the
                    // queue. We use `SeqCst` here to ensure that we get an
                    // updated view of `worker_count` as it exists after the
                    // `Queue::push` above.
                    //
                    // The upshot is that at least one (or more) of the
                    // following will happen:
                    //
                    // - An existing worker will accept the task
                    // - We'll start a new worker here to accept the task
                    // - `Worker::set_available` will start a new worker to accept the task
                    //
                    // I.e. it should not be possible for the task to be
                    // orphaned indefinitely in the queue without being
                    // accepted.
                    if self.0.worker_count.load(SeqCst) == 0 {
                        self.start_worker(None, None);
                    }
                }
            }
        }
    }

    /// Generate a unique request ID.
    pub fn next_req_id(&self) -> u64 {
        self.0.next_id.fetch_add(1, Relaxed)
    }

    /// Return a reference to the application state.
    pub fn state(&self) -> &S {
        &self.0.state
    }

    /// Return a reference to the pre-instance.
    pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
        &self.0.instance_pre
    }

    fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        tokio::spawn(
            Worker {
                handler: self.clone(),
                available: false,
            }
            .run(task, req_id),
        );
    }
}