//! Provides utilities useful for dispatching incoming HTTP requests to
//! `wasi:http/handler` guest instances.

#[cfg(feature = "p3")]
use crate::p3;
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::VecDeque;
use std::collections::btree_map::{BTreeMap, Entry};
use std::future;
use std::pin::{Pin, pin};
use std::sync::{
    Arc, Mutex,
    atomic::{
        AtomicBool, AtomicU64, AtomicUsize,
        Ordering::{Relaxed, SeqCst},
    },
};
use std::task::Poll;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use wasmtime::AsContextMut;
use wasmtime::component::Accessor;
use wasmtime::{Result, Store, StoreContextMut, format_err};
/// Alternative p2 bindings generated with `exports: { default: async | store }`
/// so we can use `TypedFunc::call_concurrent` with both p2 and p3 instances.
pub mod p2 {
    #[expect(missing_docs, reason = "bindgen-generated code")]
    pub mod bindings {
        wasmtime::component::bindgen!({
            path: "wit",
            world: "wasi:http/proxy",
            imports: { default: tracing },
            exports: { default: async | store },
            require_store_data_send: true,
            with: {
                // http is in this crate
                "wasi:http": crate::bindings::http,
                // Upstream package dependencies
                "wasi:io": wasmtime_wasi::p2::bindings::io,
            }
        });

        pub use wasi::*;
    }
}

/// Represents either a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` pre-instance.
pub enum ProxyPre<T: 'static> {
    /// A `wasi:http/proxy@0.2.x` pre-instance.
    P2(p2::bindings::ProxyPre<T>),
    /// A `wasi:http/service@0.3.x` pre-instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::ServicePre<T>),
}

impl<T: 'static> ProxyPre<T> {
    async fn instantiate_async(&self, store: impl AsContextMut<Data = T>) -> Result<Proxy>
    where
        T: Send,
    {
        Ok(match self {
            Self::P2(pre) => Proxy::P2(pre.instantiate_async(store).await?),
            #[cfg(feature = "p3")]
            Self::P3(pre) => Proxy::P3(pre.instantiate_async(store).await?),
        })
    }
}

/// Represents either a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` instance.
pub enum Proxy {
    /// A `wasi:http/proxy@0.2.x` instance.
    P2(p2::bindings::Proxy),
    /// A `wasi:http/service@0.3.x` instance.
    #[cfg(feature = "p3")]
    P3(p3::bindings::Service),
}

/// Represents a task to run using a `wasi:http/proxy@0.2.x` or
/// `wasi:http/service@0.3.x` instance.
pub type TaskFn<T> = Box<
    dyn for<'a> FnOnce(&'a Accessor<T>, &'a Proxy) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>
        + Send,
>;
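
// A minimal sketch (not part of the original source) of constructing a
// `TaskFn`: the outer closure is boxed, and so is the future it returns.
// `MyData` and `dispatch` are hypothetical embedder items.
//
// ```ignore
// let task: TaskFn<MyData> = Box::new(|accessor, proxy| {
//     Box::pin(async move { dispatch(accessor, proxy).await })
// });
// ```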

/// Async MPMC channel where each item is delivered to at most one consumer.
struct Queue<T> {
    queue: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Self {
            queue: Default::default(),
            notify: Default::default(),
        }
    }
}

impl<T> Queue<T> {
    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
        self.notify.notify_one();
    }

    fn try_pop(&self) -> Option<T> {
        self.queue.lock().unwrap().pop_front()
    }

    async fn pop(&self) -> T {
        // This code comes from the unbound MPMC channel example in [the
        // `tokio::sync::Notify`
        // docs](https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html).

        let mut notified = pin!(self.notify.notified());

        loop {
            notified.as_mut().enable();
            if let Some(item) = self.try_pop() {
                return item;
            }
            notified.as_mut().await;
            notified.set(self.notify.notified());
        }
    }
}
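
// Illustrative test (not from the original source; assumes tokio's `rt` and
// `macros` features are available in dev-dependencies) demonstrating the
// queue's FIFO, deliver-at-most-once semantics.
#[cfg(test)]
mod queue_tests {
    use super::Queue;

    #[tokio::test]
    async fn delivers_each_item_once_in_order() {
        let queue = Queue::default();
        queue.push(1);
        queue.push(2);
        // Each `pop` removes exactly one item, in push order.
        assert_eq!(queue.pop().await, 1);
        assert_eq!(queue.pop().await, 2);
        assert!(queue.is_empty());
    }
}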

/// Bundles a [`Store`] with a callback to write a profile (if configured).
pub struct StoreBundle<T: 'static> {
    /// The [`Store`] to use to handle requests.
    pub store: Store<T>,
    /// Callback to write a profile (if enabled) once all requests have been
    /// handled.
    pub write_profile: Box<dyn FnOnce(StoreContextMut<T>) + Send>,
}

/// Represents the application-specific state of a web server.
pub trait HandlerState: 'static + Sync + Send {
    /// The type of the associated data for [`Store`]s created using
    /// [`Self::new_store`].
    type StoreData: Send;

    /// Create a new [`Store`] for handling one or more requests.
    ///
    /// The `req_id` parameter is the value passed in the call to
    /// [`ProxyHandler::spawn`] that created the worker to which the new `Store`
    /// will belong. See that function's documentation for details.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Self::StoreData>>;

    /// Maximum time allowed to handle a request.
    ///
    /// In practice, a guest may be allowed to run up to 2x this time in the
    /// case of instance reuse to avoid penalizing concurrent requests being
    /// handled by the same instance.
    fn request_timeout(&self) -> Duration;

    /// Maximum time to keep an idle instance around before dropping it.
    fn idle_instance_timeout(&self) -> Duration;

    /// Maximum number of requests to handle using a single instance before
    /// dropping it.
    fn max_instance_reuse_count(&self) -> usize;

    /// Maximum number of requests to handle concurrently using a single
    /// instance.
    fn max_instance_concurrent_reuse_count(&self) -> usize;

    /// Called when a worker exits with an error.
    fn handle_worker_error(&self, error: wasmtime::Error);
}
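
// A minimal sketch of a `HandlerState` implementation (hypothetical, not part
// of the original source): `MyData` is an embedder-defined store data type,
// profiling is disabled via a no-op callback, and instance reuse is turned
// off by returning 1 from both reuse-count methods.
//
// ```ignore
// struct MyState {
//     engine: wasmtime::Engine,
// }
//
// impl HandlerState for MyState {
//     type StoreData = MyData;
//
//     fn new_store(&self, _req_id: Option<u64>) -> Result<StoreBundle<MyData>> {
//         Ok(StoreBundle {
//             store: Store::new(&self.engine, MyData::default()),
//             write_profile: Box::new(|_| {}), // profiling disabled
//         })
//     }
//
//     fn request_timeout(&self) -> Duration {
//         Duration::from_secs(30)
//     }
//     fn idle_instance_timeout(&self) -> Duration {
//         Duration::from_secs(10)
//     }
//     fn max_instance_reuse_count(&self) -> usize {
//         1
//     }
//     fn max_instance_concurrent_reuse_count(&self) -> usize {
//         1
//     }
//     fn handle_worker_error(&self, error: wasmtime::Error) {
//         eprintln!("worker error: {error:?}");
//     }
// }
// ```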

struct ProxyHandlerInner<S: HandlerState> {
    state: S,
    instance_pre: ProxyPre<S::StoreData>,
    next_id: AtomicU64,
    task_queue: Queue<TaskFn<S::StoreData>>,
    worker_count: AtomicUsize,
}

/// Helper utility to track the start times of tasks accepted by a worker.
///
/// This is used to ensure that timeouts are enforced even when the
/// `StoreContextMut::run_concurrent` event loop is unable to make progress due
/// to the guest either busy looping or being blocked on a synchronous call to a
/// host function which has exclusive access to the `Store`.
#[derive(Default)]
struct StartTimes(BTreeMap<Instant, usize>);

impl StartTimes {
    fn add(&mut self, time: Instant) {
        *self.0.entry(time).or_insert(0) += 1;
    }

    fn remove(&mut self, time: Instant) {
        let Entry::Occupied(mut entry) = self.0.entry(time) else {
            unreachable!()
        };
        match *entry.get() {
            0 => unreachable!(),
            1 => {
                entry.remove();
            }
            _ => {
                *entry.get_mut() -= 1;
            }
        }
    }

    fn earliest(&self) -> Option<Instant> {
        self.0.first_key_value().map(|(&k, _)| k)
    }
}
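
// Illustrative test (not from the original source) of the multiset semantics:
// duplicate instants are counted, and `earliest` reports the smallest key
// still present.
#[cfg(test)]
mod start_times_tests {
    use super::StartTimes;
    use std::time::Instant;

    #[test]
    fn counts_duplicate_instants() {
        let mut times = StartTimes::default();
        let now = Instant::now();
        times.add(now);
        times.add(now);
        assert_eq!(times.earliest(), Some(now));
        // Removing one copy leaves the other in place.
        times.remove(now);
        assert_eq!(times.earliest(), Some(now));
        times.remove(now);
        assert_eq!(times.earliest(), None);
    }
}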

struct Worker<S>
where
    S: HandlerState,
{
    handler: ProxyHandler<S>,
    available: bool,
}

impl<S> Worker<S>
where
    S: HandlerState,
{
    fn set_available(&mut self, available: bool) {
        if available != self.available {
            self.available = available;
            if available {
                self.handler.0.worker_count.fetch_add(1, Relaxed);
            } else {
                // Here we use `SeqCst` to ensure the load/store is ordered
                // correctly with respect to the `Queue::is_empty` check we do
                // below.
                let count = self.handler.0.worker_count.fetch_sub(1, SeqCst);
                // This addresses what would otherwise be a race condition in
                // `ProxyHandler::spawn` where it only starts a worker if the
                // available worker count is zero. If we decrement the count to
                // zero right after `ProxyHandler::spawn` checks it, then no
                // worker will be started; thus it becomes our responsibility to
                // start a worker here instead.
                if count == 1 && !self.handler.0.task_queue.is_empty() {
                    self.handler.start_worker(None, None);
                }
            }
        }
    }

    async fn run(mut self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        if let Err(error) = self.run_(task, req_id).await {
            self.handler.0.state.handle_worker_error(error);
        }
    }

    async fn run_(
        &mut self,
        task: Option<TaskFn<S::StoreData>>,
        req_id: Option<u64>,
    ) -> Result<()> {
        // NB: The code that follows is rather subtle in that it is structured
        // carefully to provide a few key invariants related to how instance
        // reuse and request timeouts interact:
        //
        // - A task must never be allowed to run for more than 2x the request
        // timeout, if any.
        //
        // - Every task we accept here must be allowed to run for at least 1x
        // the request timeout, if any.
        //
        // - When more than one task is run concurrently in the same instance,
        // we must stop accepting new tasks as soon as any existing task reaches
        // the request timeout. This serves to cap the amount of time we need
        // to keep the instance alive before _all_ tasks have either completed
        // or timed out.
        //
        // As of this writing, there's an additional wrinkle that makes
        // guaranteeing those invariants particularly tricky: per #11869 and
        // #11870, busy guest loops, epoch interruption, and host functions
        // registered using `Linker::func_{wrap,new}_async` all require
        // blocking, exclusive access to the `Store`, which effectively prevents
        // the `StoreContextMut::run_concurrent` event loop from making
        // progress. That, in turn, prevents any concurrent tasks from
        // executing, and also prevents the `AsyncFnOnce` passed to
        // `run_concurrent` from being polled. Consequently, we must rely on a
        // "second line of defense" to ensure tasks are timed out promptly,
        // which is to check for timeouts _outside_ the `run_concurrent` future.
        // Once the aforementioned issues have been addressed, we'll be able to
        // remove that check and its associated baggage.

        let handler = &self.handler.0;

        let StoreBundle {
            mut store,
            write_profile,
        } = handler.state.new_store(req_id)?;

        let request_timeout = handler.state.request_timeout();
        let idle_instance_timeout = handler.state.idle_instance_timeout();
        let max_instance_reuse_count = handler.state.max_instance_reuse_count();
        let max_instance_concurrent_reuse_count =
            handler.state.max_instance_concurrent_reuse_count();

        let proxy = &handler.instance_pre.instantiate_async(&mut store).await?;
        let accept_concurrent = AtomicBool::new(true);
        let task_start_times = Mutex::new(StartTimes::default());

        let mut future = pin!(store.run_concurrent(async |accessor| {
            let mut reuse_count = 0;
            let mut timed_out = false;
            let mut futures = FuturesUnordered::new();

            let accept_task = |task: TaskFn<S::StoreData>,
                               futures: &mut FuturesUnordered<_>,
                               reuse_count: &mut usize| {
                // Set `accept_concurrent` to false, conservatively assuming
                // that the new task will be CPU-bound, at least to begin with.
                // Only once the `StoreContextMut::run_concurrent` event loop
                // returns `Pending` will we set `accept_concurrent` back to
                // true and consider accepting more tasks.
                //
                // This approach avoids taking on more than one CPU-bound task
                // at a time, which would hurt throughput vs. leaving the
                // additional tasks for other workers to handle.
                accept_concurrent.store(false, Relaxed);
                *reuse_count += 1;

                let start_time = Instant::now().checked_add(request_timeout);
                if let Some(start_time) = start_time {
                    task_start_times.lock().unwrap().add(start_time);
                }

                futures.push(tokio::time::timeout(request_timeout, async move {
                    (task)(accessor, proxy).await;
                    start_time
                }));
            };

            if let Some(task) = task {
                accept_task(task, &mut futures, &mut reuse_count);
            }

            let handler = self.handler.clone();
            while !(futures.is_empty() && reuse_count >= max_instance_reuse_count) {
                let new_task = {
                    let future_count = futures.len();
                    let mut next_future = pin!(async {
                        if futures.is_empty() {
                            future::pending().await
                        } else {
                            futures.next().await.unwrap()
                        }
                    });
                    let mut next_task = pin!(tokio::time::timeout(
                        if future_count == 0 {
                            idle_instance_timeout
                        } else {
                            Duration::MAX
                        },
                        handler.0.task_queue.pop()
                    ));
                    // Poll any existing tasks, and if they're all `Pending`
                    // _and_ we haven't reached any reuse limits yet, poll for a
                    // new task from the queue.
                    //
                    // Note that the order of operations here is important. By
                    // polling `next_future` first, we'll discover any tasks
                    // that may have timed out, at which point we'll stop
                    // accepting new tasks altogether (see below for details).
                    // This is especially important in the case where the task
                    // was blocked on a synchronous call to a host function
                    // which has exclusive access to the `Store`; once that call
                    // finishes, the first thing we need to do is time out the
                    // task. If we were to poll for a new task first, then we'd
                    // have to wait for _that_ task to finish or time out before
                    // we could kill the instance.
                    future::poll_fn(|cx| match next_future.as_mut().poll(cx) {
                        Poll::Pending => {
                            // Note that `Pending` here doesn't necessarily mean
                            // all tasks are blocked on I/O. They might simply
                            // be waiting for some deferred work to be done by
                            // the next turn of the
                            // `StoreContextMut::run_concurrent` event loop.
                            // Therefore, we check `accept_concurrent` here and
                            // only advertise we have capacity for another task
                            // if either we have no tasks at all or all our
                            // tasks really are blocked on I/O.
                            self.set_available(
                                reuse_count < max_instance_reuse_count
                                    && future_count < max_instance_concurrent_reuse_count
                                    && (future_count == 0 || accept_concurrent.load(Relaxed)),
                            );

                            if self.available {
                                next_task.as_mut().poll(cx).map(Some)
                            } else {
                                Poll::Pending
                            }
                        }
                        Poll::Ready(Ok(start_time)) => {
                            // Task completed; carry on!
                            if let Some(start_time) = start_time {
                                task_start_times.lock().unwrap().remove(start_time);
                            }
                            Poll::Ready(None)
                        }
                        Poll::Ready(Err(_)) => {
                            // Task timed out; stop accepting new tasks, but
                            // continue polling any other, in-progress tasks
                            // until they have either finished or timed out.
                            // This effectively kicks off a "graceful shutdown"
                            // of the worker, allowing any other concurrent
                            // tasks time to finish before we drop the
                            // instance.
                            //
                            // TODO: We should also send a cancel request to the
                            // timed-out task to give it a chance to shut down
                            // gracefully (and delay dropping the instance for a
                            // reasonable amount of time), but as of this
                            // writing Wasmtime does not yet provide an API for
                            // doing that. See issue #11833.
                            timed_out = true;
                            reuse_count = max_instance_reuse_count;
                            Poll::Ready(None)
                        }
                    })
                    .await
                };

                match new_task {
                    Some(Ok(task)) => {
                        accept_task(task, &mut futures, &mut reuse_count);
                    }
                    Some(Err(_)) => break,
                    None => {}
                }
            }

            accessor.with(|mut access| write_profile(access.as_context_mut()));

            if timed_out {
                Err(format_err!("guest timed out"))
            } else {
                wasmtime::error::Ok(())
            }
        }));

        let mut sleep = pin!(tokio::time::sleep(Duration::MAX));

        future::poll_fn(|cx| {
            let poll = future.as_mut().poll(cx);
            if poll.is_pending() {
                // If the future returns `Pending`, that's either because it's
                // idle (in which case it can definitely accept a new task) or
                // because all its tasks are awaiting I/O, in which case it may
                // have capacity for additional tasks to run concurrently.
                //
                // However, if one of the tasks is blocked on a sync call to a
                // host function which has exclusive access to the `Store`, the
                // `StoreContextMut::run_concurrent` event loop will be unable
                // to make progress until that call finishes. Similarly, if the
                // task loops indefinitely, subject only to epoch interruption,
                // the event loop will also be stuck. Either way, any task
                // timeouts created inside the `AsyncFnOnce` we passed to
                // `run_concurrent` won't have a chance to trigger.
                // Consequently, we need to _also_ enforce timeouts here,
                // outside the event loop.
                //
                // Therefore, we check if the oldest outstanding task has been
                // running for at least `request_timeout*2`, which is the
                // maximum time needed for any other concurrent tasks to
                // complete or time out, at which point we can safely discard
                // the instance. If that deadline has not yet arrived, we
                // schedule a wakeup to occur when it does.
                //
                // We uphold the "never kill an instance with a task which has
                // been running for less than the request timeout" invariant
                // here by noting that this timeout will only trigger if the
                // `AsyncFnOnce` we passed to `run_concurrent` has been unable
                // to run for at least the past `request_timeout` amount of
                // time, meaning it can't possibly have accepted a task newer
                // than that.
                if let Some(deadline) = task_start_times
                    .lock()
                    .unwrap()
                    .earliest()
                    .and_then(|v| v.checked_add(request_timeout.saturating_mul(2)))
                {
                    sleep.as_mut().reset(deadline.into());
                    // Note that this will schedule a wakeup for later if the
                    // deadline has not yet arrived:
                    if sleep.as_mut().poll(cx).is_ready() {
                        // Deadline has been reached; kill the instance with an
                        // error.
                        return Poll::Ready(Err(format_err!("guest timed out")));
                    }
                }

                // Otherwise, if no timeouts have elapsed, we set
                // `accept_concurrent` to true and, if it wasn't already true
                // before, poll the future one more time so it can ask for
                // another task if appropriate.
                if !accept_concurrent.swap(true, Relaxed) {
                    return future.as_mut().poll(cx);
                }
            }

            poll
        })
        .await?
    }
}

impl<S> Drop for Worker<S>
where
    S: HandlerState,
{
    fn drop(&mut self) {
        self.set_available(false);
    }
}

/// Represents the state of a web server.
///
/// Note that this supports optional instance reuse, enabled when
/// `S::max_instance_reuse_count()` returns a number greater than one. See
/// [`Self::spawn`] for details.
pub struct ProxyHandler<S: HandlerState>(Arc<ProxyHandlerInner<S>>);

impl<S: HandlerState> Clone for ProxyHandler<S> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<S> ProxyHandler<S>
where
    S: HandlerState,
{
    /// Create a new `ProxyHandler` with the specified application state and
    /// pre-instance.
    pub fn new(state: S, instance_pre: ProxyPre<S::StoreData>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            state,
            instance_pre,
            next_id: AtomicU64::from(0),
            task_queue: Default::default(),
            worker_count: AtomicUsize::from(0),
        }))
    }

    /// Push a task to the task queue for this handler.
    ///
    /// This will either spawn a new background worker to run the task or
    /// deliver it to an already-running worker.
    ///
    /// The `req_id` will be passed to `<S as HandlerState>::new_store` _if_ a
    /// new worker is started for this task. It is intended to be used as a
    /// "request identifier" corresponding to that task and can be used e.g. to
    /// prefix all logging from the `Store` with that identifier. Note that a
    /// non-`None` value only makes sense when `<S as
    /// HandlerState>::max_instance_reuse_count == 1`; otherwise the identifier
    /// will not match subsequent tasks handled by the worker.
    pub fn spawn(&self, req_id: Option<u64>, task: TaskFn<S::StoreData>) {
        match self.0.state.max_instance_reuse_count() {
            0 => panic!("`max_instance_reuse_count` must be at least 1"),
            _ => {
                if self.0.worker_count.load(Relaxed) == 0 {
                    // There are no available workers; skip the queue and pass
                    // the task directly to the worker, which improves
                    // performance as measured by `wasmtime-server-rps.sh` by
                    // about 15%.
                    self.start_worker(Some(task), req_id);
                } else {
                    self.0.task_queue.push(task);
                    // Start a new worker to handle the task if the last worker
                    // just went unavailable. See also `Worker::set_available`
                    // for what happens if the available worker count goes to
                    // zero right after we check it here, and note that we only
                    // check the count _after_ we've pushed the task to the
                    // queue. We use `SeqCst` here to ensure that we get an
                    // updated view of `worker_count` as it exists after the
                    // `Queue::push` above.
                    //
                    // The upshot is that at least one (or more) of the
                    // following will happen:
                    //
                    // - An existing worker will accept the task
                    // - We'll start a new worker here to accept the task
                    // - `Worker::set_available` will start a new worker to
                    //   accept the task
                    //
                    // I.e. it should not be possible for the task to be
                    // orphaned indefinitely in the queue without being
                    // accepted.
                    if self.0.worker_count.load(SeqCst) == 0 {
                        self.start_worker(None, None);
                    }
                }
            }
        }
    }
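
    // A minimal sketch (hypothetical embedder code, not part of the original
    // source) of submitting a request: mint an ID, then box a task that
    // receives the `Accessor` and `Proxy` of whichever worker accepts it.
    //
    // ```ignore
    // let req_id = handler.next_req_id();
    // handler.spawn(
    //     Some(req_id),
    //     Box::new(move |accessor, proxy| {
    //         Box::pin(async move {
    //             // Dispatch the incoming request via `proxy` here.
    //         })
    //     }),
    // );
    // ```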

    /// Generate a unique request ID.
    pub fn next_req_id(&self) -> u64 {
        self.0.next_id.fetch_add(1, Relaxed)
    }

    /// Return a reference to the application state.
    pub fn state(&self) -> &S {
        &self.0.state
    }

    /// Return a reference to the pre-instance.
    pub fn instance_pre(&self) -> &ProxyPre<S::StoreData> {
        &self.0.instance_pre
    }

    fn start_worker(&self, task: Option<TaskFn<S::StoreData>>, req_id: Option<u64>) {
        tokio::spawn(
            Worker {
                handler: self.clone(),
                available: false,
            }
            .run(task, req_id),
        );
    }
}