Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/src/commands/serve.rs
3064 views
1
use crate::common::{Profile, RunCommon, RunTarget};
2
use bytes::Bytes;
3
use clap::Parser;
4
use futures::future::FutureExt;
5
use http::{Response, StatusCode};
6
use http_body_util::BodyExt as _;
7
use http_body_util::combinators::UnsyncBoxBody;
8
use std::convert::Infallible;
9
use std::net::SocketAddr;
10
use std::pin::Pin;
11
use std::task::{Context, Poll};
12
use std::{
13
path::PathBuf,
14
sync::{
15
Arc, Mutex,
16
atomic::{AtomicBool, Ordering},
17
},
18
time::Duration,
19
};
20
use tokio::io::{self, AsyncWrite};
21
use tokio::sync::Notify;
22
use wasmtime::component::{Component, Linker, ResourceTable};
23
use wasmtime::{
24
Engine, Result, Store, StoreContextMut, StoreLimits, UpdateDeadline, bail, error::Context as _,
25
};
26
use wasmtime_cli_flags::opt::WasmtimeOptionValue;
27
use wasmtime_wasi::p2::{StreamError, StreamResult};
28
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
29
#[cfg(feature = "component-model-async")]
30
use wasmtime_wasi_http::handler::p2::bindings as p2;
31
use wasmtime_wasi_http::handler::{HandlerState, Proxy, ProxyHandler, ProxyPre, StoreBundle};
32
use wasmtime_wasi_http::io::TokioIo;
33
use wasmtime_wasi_http::{
34
DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
35
WasiHttpView,
36
};
37
38
#[cfg(feature = "wasi-config")]
39
use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
40
#[cfg(feature = "wasi-keyvalue")]
41
use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
42
#[cfg(feature = "wasi-nn")]
43
use wasmtime_wasi_nn::wit::WasiNnCtx;
44
45
/// Default maximum number of requests served by a single WASIp3 component
/// instance before it is dropped (see `--max-instance-reuse-count`).
const DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT: usize = 128;
/// Default for WASIp2 components: instances are not reused across requests.
const DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT: usize = 1;
/// Default number of concurrent requests dispatched to one WASIp3 instance
/// (WASIp2 instances cannot be invoked concurrently).
const DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT: usize = 16;
48
49
/// Per-instance store data for `wasmtime serve`: the WASI/WASI-HTTP contexts
/// plus optional feature-gated contexts (nn, config, keyvalue, profiling).
struct Host {
    table: wasmtime::component::ResourceTable,
    ctx: WasiCtx,
    http: WasiHttpCtx,
    // CLI-configured overrides for outgoing body buffering; `None` means the
    // wasmtime-wasi-http defaults are used (see `WasiHttpView` impl below).
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    #[cfg(feature = "component-model-async")]
    p3_http: crate::common::DefaultP3Ctx,

    // Resource limits installed on the store via `Store::limiter`.
    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}
73
74
// Gives wasmtime-wasi access to this store's WASI context and resource table.
impl WasiView for Host {
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.ctx,
            table: &mut self.table,
        }
    }
}
82
83
impl WasiHttpView for Host {
84
fn ctx(&mut self) -> &mut WasiHttpCtx {
85
&mut self.http
86
}
87
fn table(&mut self) -> &mut ResourceTable {
88
&mut self.table
89
}
90
91
fn outgoing_body_buffer_chunks(&mut self) -> usize {
92
self.http_outgoing_body_buffer_chunks
93
.unwrap_or_else(|| DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
94
}
95
96
fn outgoing_body_chunk_size(&mut self) -> usize {
97
self.http_outgoing_body_chunk_size
98
.unwrap_or_else(|| DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
99
}
100
}
101
102
// Gives the WASIp3 HTTP implementation access to its context and the shared
// resource table; only compiled when component-model-async is enabled.
#[cfg(feature = "component-model-async")]
impl wasmtime_wasi_http::p3::WasiHttpView for Host {
    fn http(&mut self) -> wasmtime_wasi_http::p3::WasiHttpCtxView<'_> {
        wasmtime_wasi_http::p3::WasiHttpCtxView {
            table: &mut self.table,
            ctx: &mut self.p3_http,
        }
    }
}
111
112
/// Default bind address for the HTTP server: all IPv4 interfaces, port 8080.
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    // `UNSPECIFIED` is the named constant for 0.0.0.0.
    std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
    8080,
);
116
117
/// Clap value parser for human-readable durations (e.g. `1s`, `500ms`).
///
/// `Duration::parse` resolves to `WasmtimeOptionValue::parse` via the trait
/// imported from `wasmtime_cli_flags` above.
fn parse_duration(s: &str) -> Result<Duration, String> {
    Duration::parse(Some(s)).map_err(|e| e.to_string())
}
120
121
/// Runs a WebAssembly module
122
#[derive(Parser)]
123
pub struct ServeCommand {
124
#[command(flatten)]
125
run: RunCommon,
126
127
/// Socket address for the web server to bind to.
128
#[arg(long , value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
129
addr: SocketAddr,
130
131
/// Socket address where, when connected to, will initiate a graceful
132
/// shutdown.
133
///
134
/// Note that graceful shutdown is also supported on ctrl-c.
135
#[arg(long, value_name = "SOCKADDR")]
136
shutdown_addr: Option<SocketAddr>,
137
138
/// Disable log prefixes of wasi-http handlers.
139
/// if unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '
140
#[arg(long)]
141
no_logging_prefix: bool,
142
143
/// The WebAssembly component to run.
144
#[arg(value_name = "WASM", required = true)]
145
component: PathBuf,
146
147
/// Maximum number of requests to send to a single component instance before
148
/// dropping it.
149
///
150
/// This defaults to 1 for WASIp2 components and 128 for WASIp3 components.
151
#[arg(long)]
152
max_instance_reuse_count: Option<usize>,
153
154
/// Maximum number of concurrent requests to send to a single component
155
/// instance.
156
///
157
/// This defaults to 1 for WASIp2 components and 16 for WASIp3 components.
158
/// Note that setting it to more than 1 will have no effect for WASIp2
159
/// components since they cannot be called concurrently.
160
#[arg(long)]
161
max_instance_concurrent_reuse_count: Option<usize>,
162
163
/// Time to hold an idle component instance for possible reuse before
164
/// dropping it.
165
///
166
/// A number with no suffix or with an `s` suffix is interpreted as seconds;
167
/// other accepted suffixes include `ms` (milliseconds), `us` or `μs`
168
/// (microseconds), and `ns` (nanoseconds).
169
#[arg(long, default_value = "1s", value_parser = parse_duration)]
170
idle_instance_timeout: Duration,
171
}
172
173
impl ServeCommand {
174
/// Start a server to run the given wasi-http proxy component
pub fn execute(mut self) -> Result<()> {
    self.run.common.init_logging()?;

    // We force cli errors before starting to listen for connections so then
    // we don't accidentally delay them to the first request.

    if self.run.common.wasi.nn == Some(true) {
        #[cfg(not(feature = "wasi-nn"))]
        {
            bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
        }
    }

    if self.run.common.wasi.threads == Some(true) {
        bail!("wasi-threads does not support components yet")
    }

    // The serve command requires both wasi-http and the component model, so
    // we enable those by default here. `replace(true)` both turns the option
    // on and reports whether the user had explicitly disabled it.
    if self.run.common.wasi.http.replace(true) == Some(false) {
        bail!("wasi-http is required for the serve command, and must not be disabled");
    }
    if self.run.common.wasm.component_model.replace(true) == Some(false) {
        bail!("components are required for the serve command, and must not be disabled");
    }

    // Build a multi-threaded Tokio runtime and drive the async server on it.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .enable_io()
        .build()?;

    runtime.block_on(self.serve())?;

    Ok(())
}
210
211
/// Builds a fresh `Store<Host>` for one component instance: configures WASI
/// stdio (with optional per-request log prefixes), the HTTP context, any
/// feature-gated contexts, store limits, and fuel.
///
/// `req_id` is `Some` when the store is dedicated to a single request; it is
/// then exposed to the guest via the `REQUEST_ID` environment variable and
/// used to prefix stdout/stderr lines.
fn new_store(&self, engine: &Engine, req_id: Option<u64>) -> Result<Store<Host>> {
    let mut builder = WasiCtxBuilder::new();
    self.run.configure_wasip2(&mut builder)?;

    if let Some(req_id) = req_id {
        builder.env("REQUEST_ID", req_id.to_string());
    }

    // Prefix guest output with the request id unless disabled on the CLI or
    // the store is not tied to a single request.
    let stdout_prefix: String;
    let stderr_prefix: String;
    match req_id {
        Some(req_id) if !self.no_logging_prefix => {
            stdout_prefix = format!("stdout [{req_id}] :: ");
            stderr_prefix = format!("stderr [{req_id}] :: ");
        }
        _ => {
            stdout_prefix = "".to_string();
            stderr_prefix = "".to_string();
        }
    }
    builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
    builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

    let mut host = Host {
        table: wasmtime::component::ResourceTable::new(),
        ctx: builder.build(),
        http: WasiHttpCtx::new(),
        http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
        http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

        limits: StoreLimits::default(),

        // Feature-gated contexts start empty and are filled in below only
        // when the corresponding `-S` option was passed.
        #[cfg(feature = "wasi-nn")]
        nn: None,
        #[cfg(feature = "wasi-config")]
        wasi_config: None,
        #[cfg(feature = "wasi-keyvalue")]
        wasi_keyvalue: None,
        #[cfg(feature = "profiling")]
        guest_profiler: None,
        #[cfg(feature = "component-model-async")]
        p3_http: crate::common::DefaultP3Ctx,
    };

    if self.run.common.wasi.nn == Some(true) {
        #[cfg(feature = "wasi-nn")]
        {
            let graphs = self
                .run
                .common
                .wasi
                .nn_graph
                .iter()
                .map(|g| (g.format.clone(), g.dir.clone()))
                .collect::<Vec<_>>();
            let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
            host.nn.replace(WasiNnCtx::new(backends, registry));
        }
    }

    if self.run.common.wasi.config == Some(true) {
        #[cfg(feature = "wasi-config")]
        {
            let vars = WasiConfigVariables::from_iter(
                self.run
                    .common
                    .wasi
                    .config_var
                    .iter()
                    .map(|v| (v.key.clone(), v.value.clone())),
            );
            host.wasi_config.replace(vars);
        }
    }

    if self.run.common.wasi.keyvalue == Some(true) {
        #[cfg(feature = "wasi-keyvalue")]
        {
            let ctx = WasiKeyValueCtxBuilder::new()
                .in_memory_data(
                    self.run
                        .common
                        .wasi
                        .keyvalue_in_memory_data
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                )
                .build();
            host.wasi_keyvalue.replace(ctx);
        }
    }

    let mut store = Store::new(engine, host);

    store.data_mut().limits = self.run.store_limits();
    store.limiter(|t| &mut t.limits);

    // If fuel has been configured, we want to add the configured
    // fuel amount to this store.
    if let Some(fuel) = self.run.common.wasm.fuel {
        store.set_fuel(fuel)?;
    }

    Ok(store)
}
316
317
/// Populates `linker` with the WASI interfaces the component may import,
/// honoring the `-S` CLI flags; errors when a requested interface was
/// disabled at compile time.
fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
    self.run.validate_p3_option()?;
    let cli = self.run.validate_cli_enabled()?;

    // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
    // to serve as a signal to enable all WASI interfaces instead of just
    // those in the `proxy` world. If `-Scli` is present then add all
    // `command` APIs and then additionally add in the required HTTP APIs.
    //
    // If `-Scli` isn't passed then use the `add_to_linker_async`
    // bindings which adds just those interfaces that the proxy interface
    // uses.
    if cli == Some(true) {
        self.run.add_wasmtime_wasi_to_linker(linker)?;
        wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        #[cfg(feature = "component-model-async")]
        if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
            wasmtime_wasi_http::p3::add_to_linker(linker)?;
        }
    } else {
        wasmtime_wasi_http::add_to_linker_async(linker)?;
        #[cfg(feature = "component-model-async")]
        if self.run.common.wasi.p3.unwrap_or(crate::common::P3_DEFAULT) {
            wasmtime_wasi_http::p3::add_to_linker(linker)?;
            wasmtime_wasi::p3::clocks::add_to_linker(linker)?;
            wasmtime_wasi::p3::random::add_to_linker(linker)?;
            wasmtime_wasi::p3::cli::add_to_linker(linker)?;
        }
    }

    if self.run.common.wasi.nn == Some(true) {
        #[cfg(not(feature = "wasi-nn"))]
        {
            bail!("support for wasi-nn was disabled at compile time");
        }
        #[cfg(feature = "wasi-nn")]
        {
            // `unwrap` is safe at call time: `new_store` populates `nn`
            // whenever `-Snn` was passed, which gates this branch too.
            wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                let ctx = h.nn.as_mut().unwrap();
                wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
            })?;
        }
    }

    if self.run.common.wasi.config == Some(true) {
        #[cfg(not(feature = "wasi-config"))]
        {
            bail!("support for wasi-config was disabled at compile time");
        }
        #[cfg(feature = "wasi-config")]
        {
            wasmtime_wasi_config::add_to_linker(linker, |h| {
                WasiConfig::from(h.wasi_config.as_ref().unwrap())
            })?;
        }
    }

    if self.run.common.wasi.keyvalue == Some(true) {
        #[cfg(not(feature = "wasi-keyvalue"))]
        {
            bail!("support for wasi-keyvalue was disabled at compile time");
        }
        #[cfg(feature = "wasi-keyvalue")]
        {
            wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
            })?;
        }
    }

    if self.run.common.wasi.threads == Some(true) {
        bail!("support for wasi-threads is not available with components");
    }

    if self.run.common.wasi.http == Some(false) {
        bail!("support for wasi-http must be enabled for `serve` subcommand");
    }

    Ok(())
}
397
398
/// The async body of the server: builds the engine and linker, pre-links the
/// component (preferring WASIp3 bindings when available), installs graceful
/// shutdown handlers, binds the listening socket, and runs the accept loop
/// until shutdown is requested.
async fn serve(mut self) -> Result<()> {
    use hyper::server::conn::http1;

    let mut config = self
        .run
        .common
        .config(use_pooling_allocator_by_default().unwrap_or(None))?;
    config.wasm_component_model(true);

    // Epoch interruption is needed both for request timeouts and for
    // guest-profiler sampling.
    if self.run.common.wasm.timeout.is_some() {
        config.epoch_interruption(true);
    }

    match self.run.profile {
        Some(Profile::Native(s)) => {
            config.profiler(s);
        }
        Some(Profile::Guest { .. }) => {
            config.epoch_interruption(true);
        }
        None => {}
    }

    let engine = Engine::new(&config)?;
    let mut linker = Linker::new(&engine);

    self.add_to_linker(&mut linker)?;

    let component = match self.run.load_module(&engine, &self.component)? {
        RunTarget::Core(_) => bail!("The serve command currently requires a component"),
        RunTarget::Component(c) => c,
    };

    // Prefer p3 `Service` bindings when the component exports them, falling
    // back to the p2 proxy world otherwise.
    let instance = linker.instantiate_pre(&component)?;
    #[cfg(feature = "component-model-async")]
    let instance = match wasmtime_wasi_http::p3::bindings::ServicePre::new(instance.clone()) {
        Ok(pre) => ProxyPre::P3(pre),
        Err(_) => ProxyPre::P2(p2::ProxyPre::new(instance)?),
    };
    #[cfg(not(feature = "component-model-async"))]
    let instance = ProxyPre::P2(p2::ProxyPre::new(instance)?);

    // Spawn background task(s) waiting for graceful shutdown signals. This
    // always listens for ctrl-c but additionally can listen for a TCP
    // connection to the specified address.
    let shutdown = Arc::new(GracefulShutdown::default());
    tokio::task::spawn({
        let shutdown = shutdown.clone();
        async move {
            tokio::signal::ctrl_c().await.unwrap();
            shutdown.requested.notify_one();
        }
    });
    if let Some(addr) = self.shutdown_addr {
        let listener = tokio::net::TcpListener::bind(addr).await?;
        eprintln!(
            "Listening for shutdown on tcp://{}/",
            listener.local_addr()?
        );
        let shutdown = shutdown.clone();
        tokio::task::spawn(async move {
            let _ = listener.accept().await;
            shutdown.requested.notify_one();
        });
    }

    let socket = match &self.addr {
        SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
        SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
    };
    // Conditionally enable `SO_REUSEADDR` depending on the current
    // platform. On Unix we want this to be able to rebind an address in
    // the `TIME_WAIT` state which can happen then a server is killed with
    // active TCP connections and then restarted. On Windows though if
    // `SO_REUSEADDR` is specified then it enables multiple applications to
    // bind the port at the same time which is not something we want. Hence
    // this is conditionally set based on the platform (and deviates from
    // Tokio's default from always-on).
    socket.set_reuseaddr(!cfg!(windows))?;
    socket.bind(self.addr)?;
    let listener = socket.listen(100)?;

    eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

    log::info!("Listening on {}", self.addr);

    // A background thread drives epoch increments when either guest
    // profiling or a request timeout needs them; dropping the handle at the
    // end of this function stops the thread.
    let epoch_interval = if let Some(Profile::Guest { interval, .. }) = self.run.profile {
        Some(interval)
    } else if let Some(t) = self.run.common.wasm.timeout {
        Some(EPOCH_INTERRUPT_PERIOD.min(t))
    } else {
        None
    };
    let _epoch_thread = epoch_interval.map(|t| EpochThread::spawn(t, engine.clone()));

    let max_instance_reuse_count = self.max_instance_reuse_count.unwrap_or_else(|| {
        if let ProxyPre::P3(_) = &instance {
            DEFAULT_WASIP3_MAX_INSTANCE_REUSE_COUNT
        } else {
            DEFAULT_WASIP2_MAX_INSTANCE_REUSE_COUNT
        }
    });

    // p2 instances cannot be invoked concurrently, so clamp to 1 there.
    let max_instance_concurrent_reuse_count = if let ProxyPre::P3(_) = &instance {
        self.max_instance_concurrent_reuse_count
            .unwrap_or(DEFAULT_WASIP3_MAX_INSTANCE_CONCURRENT_REUSE_COUNT)
    } else {
        1
    };

    let handler = ProxyHandler::new(
        HostHandlerState {
            cmd: self,
            engine,
            component,
            max_instance_reuse_count,
            max_instance_concurrent_reuse_count,
        },
        instance,
    );

    loop {
        // Wait for a socket, but also "race" against shutdown to break out
        // of this loop. Once the graceful shutdown signal is received then
        // this loop exits immediately.
        let (stream, _) = tokio::select! {
            _ = shutdown.requested.notified() => break,
            v = listener.accept() => v?,
        };

        // The Nagle algorithm can impose a significant latency penalty
        // (e.g. 40ms on Linux) on guests which write small, intermittent
        // response body chunks (e.g. SSE streams). Here we disable that
        // algorithm and rely on the guest to buffer if appropriate to avoid
        // TCP fragmentation.
        stream.set_nodelay(true)?;

        let stream = TokioIo::new(stream);
        let h = handler.clone();
        let shutdown_guard = shutdown.clone().increment();
        tokio::task::spawn(async move {
            if let Err(e) = http1::Builder::new()
                .keep_alive(true)
                .serve_connection(
                    stream,
                    hyper::service::service_fn(move |req| {
                        let h = h.clone();
                        async move {
                            use http_body_util::{BodyExt, Full};
                            // Any error from the guest is reported as a
                            // plain 500 page rather than tearing down the
                            // connection.
                            match handle_request(h, req).await {
                                Ok(r) => Ok::<_, Infallible>(r),
                                Err(e) => {
                                    eprintln!("error: {e:?}");
                                    let error_html = "\
<!doctype html>
<html>
<head>
<title>500 Internal Server Error</title>
</head>
<body>
<center>
<h1>500 Internal Server Error</h1>
<hr>
wasmtime
</center>
</body>
</html>";
                                    Ok(Response::builder()
                                        .status(StatusCode::INTERNAL_SERVER_ERROR)
                                        .header("Content-Type", "text/html; charset=UTF-8")
                                        .body(
                                            Full::new(bytes::Bytes::from(error_html))
                                                .map_err(|_| unreachable!())
                                                .boxed_unsync(),
                                        )
                                        .unwrap())
                                }
                            }
                        }
                    }),
                )
                .await
            {
                eprintln!("error: {e:?}");
            }
            drop(shutdown_guard);
        });
    }

    // Upon exiting the loop we'll no longer process any more incoming
    // connections but there may still be outstanding connections
    // processing in child tasks. If there are wait for those to complete
    // before shutting down completely. Also enable short-circuiting this
    // wait with a second ctrl-c signal.
    if shutdown.close() {
        return Ok(());
    }
    eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = shutdown.complete.notified() => {}
    }

    Ok(())
}
603
}
604
605
/// Shared state handed to `ProxyHandler`: the parsed CLI command plus the
/// engine/component needed to create per-instance stores, and the resolved
/// instance-reuse limits.
struct HostHandlerState {
    cmd: ServeCommand,
    engine: Engine,
    component: Component,
    max_instance_reuse_count: usize,
    max_instance_concurrent_reuse_count: usize,
}
612
613
impl HandlerState for HostHandlerState {
    type StoreData = Host;

    /// Creates a store (and its profile-flush callback) for a new component
    /// instance; `req_id` is present when the store serves a single request.
    fn new_store(&self, req_id: Option<u64>) -> Result<StoreBundle<Host>> {
        let mut store = self.cmd.new_store(&self.engine, req_id)?;
        let write_profile = setup_epoch_handler(&self.cmd, &mut store, self.component.clone())?;

        Ok(StoreBundle {
            store,
            write_profile,
        })
    }

    // With no `--wasm timeout` configured requests may run indefinitely.
    fn request_timeout(&self) -> Duration {
        self.cmd.run.common.wasm.timeout.unwrap_or(Duration::MAX)
    }

    fn idle_instance_timeout(&self) -> Duration {
        self.cmd.idle_instance_timeout
    }

    fn max_instance_reuse_count(&self) -> usize {
        self.max_instance_reuse_count
    }

    fn max_instance_concurrent_reuse_count(&self) -> usize {
        self.max_instance_concurrent_reuse_count
    }

    fn handle_worker_error(&self, error: wasmtime::Error) {
        eprintln!("worker error: {error}");
    }
}
646
647
/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}
658
659
/// Mutex-protected interior of `GracefulShutdown`.
#[derive(Default)]
struct GracefulShutdownState {
    // Number of live connection-serving tasks.
    active_tasks: u32,
    // Set by `close()` once no more tasks will be spawned.
    notify_when_done: bool,
}
664
665
impl GracefulShutdown {
    /// Increments the number of active tasks and returns a guard which
    /// decrements the count when dropped, notifying `complete` if that was
    /// the last task after `close` has been called.
    fn increment(self: Arc<Self>) -> impl Drop {
        struct Guard(Arc<GracefulShutdown>);

        let mut state = self.state.lock().unwrap();
        // New tasks must not be spawned once shutdown has begun.
        assert!(!state.notify_when_done);
        state.active_tasks += 1;
        drop(state);

        return Guard(self);

        impl Drop for Guard {
            fn drop(&mut self) {
                let mut state = self.0.state.lock().unwrap();
                state.active_tasks -= 1;
                if state.notify_when_done && state.active_tasks == 0 {
                    self.0.complete.notify_one();
                }
            }
        }
    }

    /// Flags this state as done spawning tasks and returns whether there are no
    /// more child tasks remaining.
    fn close(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        state.notify_when_done = true;
        state.active_tasks == 0
    }
}
696
697
/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);
701
702
/// Owns a background thread that periodically increments the engine epoch;
/// the thread is signaled to stop and joined when this value is dropped.
struct EpochThread {
    shutdown: Arc<AtomicBool>,
    handle: Option<std::thread::JoinHandle<()>>,
}
706
707
impl EpochThread {
708
fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
709
let shutdown = Arc::new(AtomicBool::new(false));
710
let handle = {
711
let shutdown = Arc::clone(&shutdown);
712
let handle = std::thread::spawn(move || {
713
while !shutdown.load(Ordering::Relaxed) {
714
std::thread::sleep(interval);
715
engine.increment_epoch();
716
}
717
});
718
Some(handle)
719
};
720
721
EpochThread { shutdown, handle }
722
}
723
}
724
725
impl Drop for EpochThread {
    // Signal the epoch thread to stop, then wait for it to exit (it checks
    // the flag after each sleep, so this blocks at most one interval).
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.shutdown.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}
733
734
/// Callback invoked with the store after use to flush any collected guest
/// profile data; a no-op when profiling is not enabled.
type WriteProfile = Box<dyn FnOnce(StoreContextMut<Host>) + Send>;
735
736
/// Configures epoch-based behavior for `store`: installs the guest profiler
/// when `--profile guest` was requested, otherwise arms async-yield epoch
/// handling when a request timeout is configured. Returns the callback used
/// to write out profile data (a no-op when not profiling).
fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<WriteProfile> {
    // Profiling Enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled but there's a global request timeout
    if cmd.run.common.wasm.timeout.is_some() {
        store.epoch_deadline_async_yield_and_update(1);
    }

    Ok(Box::new(|_store| {}))
}
759
760
/// Installs a `GuestProfiler` on `store`, sampling on call hooks and epoch
/// deadlines, and returns a callback that writes the collected profile to
/// `path` when the store is done.
#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<WriteProfile> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        store.engine(),
        module_name,
        interval,
        component,
        std::iter::empty(),
    )?));

    // Temporarily takes the profiler out of the store so `f` can borrow the
    // store immutably while mutating the profiler, then puts it back.
    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    // Take a sample on every epoch tick and keep executing (deadline of 1
    // re-arms the next tick).
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);

    let write_profile = Box::new(move |mut store: StoreContextMut<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(wasmtime::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok(write_profile)
}
824
825
/// Shorthand for an incoming hyper request with a streaming body.
type Request = hyper::Request<hyper::body::Incoming>;
826
827
/// Dispatches one HTTP request to the component via `handler`, awaiting the
/// response sent back on a oneshot channel by the guest's p2
/// `response-outparam` or the p3 `handle` return value.
async fn handle_request(
    handler: ProxyHandler<HostHandlerState>,
    req: Request,
) -> Result<hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>> {
    use tokio::sync::oneshot;

    let req_id = handler.next_req_id();

    log::info!(
        "Request {req_id} handling {} to {}",
        req.method(),
        req.uri()
    );

    // Here we must declare different channel types for p2 and p3 since p2's
    // `WasiHttpView::new_response_outparam` expects a specific kind of sender
    // that uses `p2::http::types::ErrorCode`, and we don't want to have to
    // convert from the p3 `ErrorCode` to the p2 one, only to convert again to
    // `wasmtime::Error`.

    type P2Response = Result<
        hyper::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
        p2::http::types::ErrorCode,
    >;
    type P3Response = hyper::Response<UnsyncBoxBody<Bytes, wasmtime::Error>>;

    enum Sender {
        P2(oneshot::Sender<P2Response>),
        P3(oneshot::Sender<P3Response>),
    }

    enum Receiver {
        P2(oneshot::Receiver<P2Response>),
        P3(oneshot::Receiver<P3Response>),
    }

    // Pick the channel flavor matching the pre-instantiated bindings.
    let (tx, rx) = match handler.instance_pre() {
        ProxyPre::P2(_) => {
            let (tx, rx) = oneshot::channel();
            (Sender::P2(tx), Receiver::P2(rx))
        }
        ProxyPre::P3(_) => {
            let (tx, rx) = oneshot::channel();
            (Sender::P3(tx), Receiver::P3(rx))
        }
    };

    handler.spawn(
        // Only tag the instance with a request id when instances are
        // request-scoped (no reuse); reused instances serve many ids.
        if handler.state().max_instance_reuse_count() == 1 {
            Some(req_id)
        } else {
            None
        },
        Box::new(move |store, proxy| {
            Box::pin(
                async move {
                    match proxy {
                        Proxy::P2(proxy) => {
                            let Sender::P2(tx) = tx else { unreachable!() };
                            let (req, out) = store.with(move |mut store| {
                                let req = store
                                    .data_mut()
                                    .new_incoming_request(p2::http::types::Scheme::Http, req)?;
                                let out = store.data_mut().new_response_outparam(tx)?;
                                wasmtime::error::Ok((req, out))
                            })?;

                            proxy
                                .wasi_http_incoming_handler()
                                .call_handle(store, req, out)
                                .await
                        }
                        Proxy::P3(proxy) => {
                            use wasmtime_wasi_http::p3::bindings::http::types::{
                                ErrorCode, Request,
                            };

                            let Sender::P3(tx) = tx else { unreachable!() };
                            let (req, body) = req.into_parts();
                            let body = body.map_err(ErrorCode::from_hyper_request_error);
                            let req = http::Request::from_parts(req, body);
                            let (request, request_io_result) = Request::from_http(req);
                            let (res, task) = proxy.handle(store, request).await??;
                            let res = store
                                .with(|mut store| res.into_http(&mut store, request_io_result))?;
                            _ = tx.send(res.map(|body| body.map_err(|e| e.into()).boxed_unsync()));

                            // Wait for the task to finish.
                            task.block(store).await;
                            Ok(())
                        }
                    }
                }
                // Guest errors are logged here; the HTTP-level 500 is
                // produced by the caller when the channel yields an error.
                .map(move |result| {
                    if let Err(error) = result {
                        eprintln!("[{req_id}] :: {error:?}");
                    }
                }),
            )
        }),
    );

    Ok(match rx {
        Receiver::P2(rx) => rx
            .await
            .context("guest never invoked `response-outparam::set` method")?
            .map_err(|e| wasmtime::Error::from(e))?
            .map(|body| body.map_err(|e| e.into()).boxed_unsync()),
        Receiver::P3(rx) => rx.await?,
    })
}
938
939
/// Selects which standard stream guest log output is forwarded to.
#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    /// Writes `buf` in its entirety to the selected standard stream.
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        if let Output::Stdout = self {
            std::io::stdout().write_all(buf)
        } else {
            std::io::stderr().write_all(buf)
        }
    }
}
955
956
/// A cloneable writer that forwards guest stdout/stderr to the host process,
/// prepending a per-request prefix at the start of each line. Clones share
/// the same prefix state via `Arc`.
#[derive(Clone)]
struct LogStream {
    output: Output,
    state: Arc<LogStreamState>,
}
961
962
/// State shared by all clones of one `LogStream`.
struct LogStreamState {
    // Text prepended to each output line, e.g. "stdout [42] :: ".
    prefix: String,
    // True when the next write should begin with the prefix (i.e. the
    // previous write ended at a newline, or nothing was written yet).
    needs_prefix_on_next_write: AtomicBool,
}
966
967
impl LogStream {
968
fn new(prefix: String, output: Output) -> LogStream {
969
LogStream {
970
output,
971
state: Arc::new(LogStreamState {
972
prefix,
973
needs_prefix_on_next_write: AtomicBool::new(true),
974
}),
975
}
976
}
977
978
fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
979
while !bytes.is_empty() {
980
if self
981
.state
982
.needs_prefix_on_next_write
983
.load(Ordering::Relaxed)
984
{
985
self.output.write_all(self.state.prefix.as_bytes())?;
986
self.state
987
.needs_prefix_on_next_write
988
.store(false, Ordering::Relaxed);
989
}
990
match bytes.iter().position(|b| *b == b'\n') {
991
Some(i) => {
992
let (a, b) = bytes.split_at(i + 1);
993
bytes = b;
994
self.output.write_all(a)?;
995
self.state
996
.needs_prefix_on_next_write
997
.store(true, Ordering::Relaxed);
998
}
999
None => {
1000
self.output.write_all(bytes)?;
1001
break;
1002
}
1003
}
1004
}
1005
1006
Ok(())
1007
}
1008
}
1009
1010
// Lets `WasiCtxBuilder::stdout`/`stderr` accept a `LogStream`; both the p2
// stream and the async stream are cheap clones sharing prefix state.
impl wasmtime_wasi::cli::StdoutStream for LogStream {
    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
        Box::new(self.clone())
    }
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        Box::new(self.clone())
    }
}
1018
1019
// Reports terminal-ness of the real host stream this log stream wraps, so
// guests can detect whether their output is a TTY.
impl wasmtime_wasi::cli::IsTerminal for LogStream {
    fn is_terminal(&self) -> bool {
        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}
1027
1028
// p2 output-stream implementation: writes are synchronous and the stream is
// always ready to accept a generous amount of data.
impl wasmtime_wasi::p2::OutputStream for LogStream {
    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
        self.write_all(&bytes)
            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
        Ok(())
    }

    // Host stdio is unbuffered here, so flushing is a no-op.
    fn flush(&mut self) -> StreamResult<()> {
        Ok(())
    }

    // Advertise a fixed 1 MiB write budget.
    fn check_write(&mut self) -> StreamResult<usize> {
        Ok(1024 * 1024)
    }
}
1043
1044
// The stream is always writable, so readiness resolves immediately.
#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    async fn ready(&mut self) {}
}
1048
1049
// Async writer used by the p3 stdio path; writes complete synchronously and
// never return `Pending`.
impl AsyncWrite for LogStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // `write_all` consumes the whole buffer, so report its full length.
        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
1064
1065
/// The pooling allocator is tailor made for the `wasmtime serve` use case, so
1066
/// try to use it when we can. The main cost of the pooling allocator, however,
1067
/// is the virtual memory required to run it. Not all systems support the same
1068
/// amount of virtual memory, for example some aarch64 and riscv64 configuration
1069
/// only support 39 bits of virtual address space.
1070
///
1071
/// The pooling allocator, by default, will request 1000 linear memories each
1072
/// sized at 6G per linear memory. This is 6T of virtual memory which ends up
1073
/// being about 42 bits of the address space. This exceeds the 39 bit limit of
1074
/// some systems, so there the pooling allocator will fail by default.
1075
///
1076
/// This function attempts to dynamically determine the hint for the pooling
1077
/// allocator. This returns `Some(true)` if the pooling allocator should be used
1078
/// by default, or `None` or an error otherwise.
1079
///
1080
/// The method for testing this is to allocate a 0-sized 64-bit linear memory
1081
/// with a maximum size that's N bits large where we force all memories to be
1082
/// static. This should attempt to acquire N bits of the virtual address space.
1083
/// If successful that should mean that the pooling allocator is OK to use, but
1084
/// if it fails then the pooling allocator is not used and the normal mmap-based
1085
/// implementation is used instead.
1086
fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
1087
use wasmtime::{Config, Memory, MemoryType};
1088
const BITS_TO_TEST: u32 = 42;
1089
let mut config = Config::new();
1090
config.wasm_memory64(true);
1091
config.memory_reservation(1 << BITS_TO_TEST);
1092
let engine = Engine::new(&config)?;
1093
let mut store = Store::new(&engine, ());
1094
// NB: the maximum size is in wasm pages to take out the 16-bits of wasm
1095
// page size here from the maximum size.
1096
let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
1097
if Memory::new(&mut store, ty).is_ok() {
1098
Ok(Some(true))
1099
} else {
1100
Ok(None)
1101
}
1102
}
1103
1104