GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/src/commands/serve.rs
use crate::common::{Profile, RunCommon, RunTarget};
use anyhow::{Result, bail};
use clap::Parser;
use http::{Response, StatusCode};
use std::convert::Infallible;
use std::io::IsTerminal;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use std::{
    path::PathBuf,
    sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};
use tokio::io::{self, AsyncWrite};
use tokio::sync::Notify;
use wasmtime::component::{Component, Linker, ResourceTable};
use wasmtime::{Engine, Store, StoreLimits, UpdateDeadline};
use wasmtime_wasi::p2::{StreamError, StreamResult};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
use wasmtime_wasi_http::bindings::ProxyPre;
use wasmtime_wasi_http::bindings::http::types::{ErrorCode, Scheme};
use wasmtime_wasi_http::io::TokioIo;
use wasmtime_wasi_http::{
    DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS, DEFAULT_OUTGOING_BODY_CHUNK_SIZE, WasiHttpCtx,
    WasiHttpView, body::HyperOutgoingBody,
};

#[cfg(feature = "wasi-config")]
use wasmtime_wasi_config::{WasiConfig, WasiConfigVariables};
#[cfg(feature = "wasi-keyvalue")]
use wasmtime_wasi_keyvalue::{WasiKeyValue, WasiKeyValueCtx, WasiKeyValueCtxBuilder};
#[cfg(feature = "wasi-nn")]
use wasmtime_wasi_nn::wit::WasiNnCtx;

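/// Per-request host state: `handle_request` below builds one `Store<Host>`
/// per incoming request, carrying that request's WASI context, resource
/// table, and wasi-http context.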
struct Host {
    table: wasmtime::component::ResourceTable,
    ctx: WasiCtx,
    http: WasiHttpCtx,
    http_outgoing_body_buffer_chunks: Option<usize>,
    http_outgoing_body_chunk_size: Option<usize>,

    limits: StoreLimits,

    #[cfg(feature = "wasi-nn")]
    nn: Option<WasiNnCtx>,

    #[cfg(feature = "wasi-config")]
    wasi_config: Option<WasiConfigVariables>,

    #[cfg(feature = "wasi-keyvalue")]
    wasi_keyvalue: Option<WasiKeyValueCtx>,

    #[cfg(feature = "profiling")]
    guest_profiler: Option<Arc<wasmtime::GuestProfiler>>,
}

impl WasiView for Host {
    fn ctx(&mut self) -> WasiCtxView<'_> {
        WasiCtxView {
            ctx: &mut self.ctx,
            table: &mut self.table,
        }
    }
}

impl WasiHttpView for Host {
    fn ctx(&mut self) -> &mut WasiHttpCtx {
        &mut self.http
    }
    fn table(&mut self) -> &mut ResourceTable {
        &mut self.table
    }

    fn outgoing_body_buffer_chunks(&mut self) -> usize {
        self.http_outgoing_body_buffer_chunks
            .unwrap_or(DEFAULT_OUTGOING_BODY_BUFFER_CHUNKS)
    }

    fn outgoing_body_chunk_size(&mut self) -> usize {
        self.http_outgoing_body_chunk_size
            .unwrap_or(DEFAULT_OUTGOING_BODY_CHUNK_SIZE)
    }
}

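/// By default bind to all IPv4 interfaces on port 8080.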
const DEFAULT_ADDR: std::net::SocketAddr = std::net::SocketAddr::new(
    std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
    8080,
);

/// Runs a WebAssembly module
#[derive(Parser)]
pub struct ServeCommand {
    #[command(flatten)]
    run: RunCommon,

    /// Socket address for the web server to bind to.
    #[arg(long, value_name = "SOCKADDR", default_value_t = DEFAULT_ADDR)]
    addr: SocketAddr,

    /// Socket address that, when connected to, will initiate a graceful
    /// shutdown.
    ///
    /// Note that graceful shutdown is also supported on ctrl-c.
    #[arg(long, value_name = "SOCKADDR")]
    shutdown_addr: Option<SocketAddr>,

    /// Disable log prefixes of wasi-http handlers.
    /// If unspecified, logs will be prefixed with 'stdout|stderr [{req_id}] :: '.
    #[arg(long)]
    no_logging_prefix: bool,

    /// The WebAssembly component to run.
    #[arg(value_name = "WASM", required = true)]
    component: PathBuf,
}

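// Illustrative invocation (the component path is a placeholder):
//
//     wasmtime serve --addr 127.0.0.1:8080 proxy.wasm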
impl ServeCommand {
    /// Start a server to run the given wasi-http proxy component
    pub fn execute(mut self) -> Result<()> {
        self.run.common.init_logging()?;

        // We force CLI errors before starting to listen for connections so
        // that we don't accidentally delay them until the first request.

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("Cannot enable wasi-nn when the binary is not compiled with this feature.");
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("wasi-threads does not support components yet")
        }

        // The serve command requires both wasi-http and the component model, so
        // we enable those by default here.
        if self.run.common.wasi.http.replace(true) == Some(false) {
            bail!("wasi-http is required for the serve command, and must not be disabled");
        }
        if self.run.common.wasm.component_model.replace(true) == Some(false) {
            bail!("components are required for the serve command, and must not be disabled");
        }

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_time()
            .enable_io()
            .build()?;

        runtime.block_on(self.serve())?;

        Ok(())
    }

    fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
        let mut builder = WasiCtxBuilder::new();
        self.run.configure_wasip2(&mut builder)?;

        builder.env("REQUEST_ID", req_id.to_string());

        let stdout_prefix: String;
        let stderr_prefix: String;
        if self.no_logging_prefix {
            stdout_prefix = "".to_string();
            stderr_prefix = "".to_string();
        } else {
            stdout_prefix = format!("stdout [{req_id}] :: ");
            stderr_prefix = format!("stderr [{req_id}] :: ");
        }
        builder.stdout(LogStream::new(stdout_prefix, Output::Stdout));
        builder.stderr(LogStream::new(stderr_prefix, Output::Stderr));

        let mut host = Host {
            table: wasmtime::component::ResourceTable::new(),
            ctx: builder.build(),
            http: WasiHttpCtx::new(),
            http_outgoing_body_buffer_chunks: self.run.common.wasi.http_outgoing_body_buffer_chunks,
            http_outgoing_body_chunk_size: self.run.common.wasi.http_outgoing_body_chunk_size,

            limits: StoreLimits::default(),

            #[cfg(feature = "wasi-nn")]
            nn: None,
            #[cfg(feature = "wasi-config")]
            wasi_config: None,
            #[cfg(feature = "wasi-keyvalue")]
            wasi_keyvalue: None,
            #[cfg(feature = "profiling")]
            guest_profiler: None,
        };

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(feature = "wasi-nn")]
            {
                let graphs = self
                    .run
                    .common
                    .wasi
                    .nn_graph
                    .iter()
                    .map(|g| (g.format.clone(), g.dir.clone()))
                    .collect::<Vec<_>>();
                let (backends, registry) = wasmtime_wasi_nn::preload(&graphs)?;
                host.nn.replace(WasiNnCtx::new(backends, registry));
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(feature = "wasi-config")]
            {
                let vars = WasiConfigVariables::from_iter(
                    self.run
                        .common
                        .wasi
                        .config_var
                        .iter()
                        .map(|v| (v.key.clone(), v.value.clone())),
                );
                host.wasi_config.replace(vars);
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(feature = "wasi-keyvalue")]
            {
                let ctx = WasiKeyValueCtxBuilder::new()
                    .in_memory_data(
                        self.run
                            .common
                            .wasi
                            .keyvalue_in_memory_data
                            .iter()
                            .map(|v| (v.key.clone(), v.value.clone())),
                    )
                    .build();
                host.wasi_keyvalue.replace(ctx);
            }
        }

        let mut store = Store::new(engine, host);

        store.data_mut().limits = self.run.store_limits();
        store.limiter(|t| &mut t.limits);

        // If fuel has been configured, we want to add the configured
        // fuel amount to this store.
        if let Some(fuel) = self.run.common.wasm.fuel {
            store.set_fuel(fuel)?;
        }

        Ok(store)
    }

    fn add_to_linker(&self, linker: &mut Linker<Host>) -> Result<()> {
        self.run.validate_p3_option()?;
        let cli = self.run.validate_cli_enabled()?;

        // Repurpose the `-Scli` flag of `wasmtime run` for `wasmtime serve`
        // to serve as a signal to enable all WASI interfaces instead of just
        // those in the `proxy` world. If `-Scli` is present then add all
        // `command` APIs and then additionally add in the required HTTP APIs.
        //
        // If `-Scli` isn't passed then use the `add_to_linker_async`
        // bindings which add just those interfaces that the proxy world
        // uses.
        if cli == Some(true) {
            self.run.add_wasmtime_wasi_to_linker(linker)?;
            wasmtime_wasi_http::add_only_http_to_linker_async(linker)?;
        } else {
            wasmtime_wasi_http::add_to_linker_async(linker)?;
        }

        if self.run.common.wasi.nn == Some(true) {
            #[cfg(not(feature = "wasi-nn"))]
            {
                bail!("support for wasi-nn was disabled at compile time");
            }
            #[cfg(feature = "wasi-nn")]
            {
                wasmtime_wasi_nn::wit::add_to_linker(linker, |h: &mut Host| {
                    let ctx = h.nn.as_mut().unwrap();
                    wasmtime_wasi_nn::wit::WasiNnView::new(&mut h.table, ctx)
                })?;
            }
        }

        if self.run.common.wasi.config == Some(true) {
            #[cfg(not(feature = "wasi-config"))]
            {
                bail!("support for wasi-config was disabled at compile time");
            }
            #[cfg(feature = "wasi-config")]
            {
                wasmtime_wasi_config::add_to_linker(linker, |h| {
                    WasiConfig::from(h.wasi_config.as_ref().unwrap())
                })?;
            }
        }

        if self.run.common.wasi.keyvalue == Some(true) {
            #[cfg(not(feature = "wasi-keyvalue"))]
            {
                bail!("support for wasi-keyvalue was disabled at compile time");
            }
            #[cfg(feature = "wasi-keyvalue")]
            {
                wasmtime_wasi_keyvalue::add_to_linker(linker, |h: &mut Host| {
                    WasiKeyValue::new(h.wasi_keyvalue.as_ref().unwrap(), &mut h.table)
                })?;
            }
        }

        if self.run.common.wasi.threads == Some(true) {
            bail!("support for wasi-threads is not available with components");
        }

        if self.run.common.wasi.http == Some(false) {
            bail!("support for wasi-http must be enabled for `serve` subcommand");
        }

        Ok(())
    }

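    /// Main accept loop: configures the engine, loads the component, binds
    /// the listener, and spawns one HTTP/1 connection task per client until
    /// a graceful-shutdown signal arrives.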
    async fn serve(mut self) -> Result<()> {
        use hyper::server::conn::http1;

        let mut config = self
            .run
            .common
            .config(use_pooling_allocator_by_default().unwrap_or(None))?;
        config.wasm_component_model(true);
        config.async_support(true);

        if self.run.common.wasm.timeout.is_some() {
            config.epoch_interruption(true);
        }

        match self.run.profile {
            Some(Profile::Native(s)) => {
                config.profiler(s);
            }
            Some(Profile::Guest { .. }) => {
                config.epoch_interruption(true);
            }
            None => {}
        }

        let engine = Engine::new(&config)?;
        let mut linker = Linker::new(&engine);

        self.add_to_linker(&mut linker)?;

        let component = match self.run.load_module(&engine, &self.component)? {
            RunTarget::Core(_) => bail!("The serve command currently requires a component"),
            RunTarget::Component(c) => c,
        };

        let instance = linker.instantiate_pre(&component)?;
        let instance = ProxyPre::new(instance)?;

        // Spawn background task(s) waiting for graceful shutdown signals. This
        // always listens for ctrl-c but additionally can listen for a TCP
        // connection to the specified address.
        let shutdown = Arc::new(GracefulShutdown::default());
        tokio::task::spawn({
            let shutdown = shutdown.clone();
            async move {
                tokio::signal::ctrl_c().await.unwrap();
                shutdown.requested.notify_one();
            }
        });
        if let Some(addr) = self.shutdown_addr {
            let listener = tokio::net::TcpListener::bind(addr).await?;
            eprintln!(
                "Listening for shutdown on tcp://{}/",
                listener.local_addr()?
            );
            let shutdown = shutdown.clone();
            tokio::task::spawn(async move {
                let _ = listener.accept().await;
                shutdown.requested.notify_one();
            });
        }

        let socket = match &self.addr {
            SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
            SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
        };
        // Conditionally enable `SO_REUSEADDR` depending on the current
        // platform. On Unix we want this to be able to rebind an address in
        // the `TIME_WAIT` state, which can happen when a server is killed with
        // active TCP connections and then restarted. On Windows, though, if
        // `SO_REUSEADDR` is specified then it enables multiple applications to
        // bind the port at the same time, which is not something we want. Hence
        // this is conditionally set based on the platform (and deviates from
        // Tokio's default of always-on).
        socket.set_reuseaddr(!cfg!(windows))?;
        socket.bind(self.addr)?;
        let listener = socket.listen(100)?;

        eprintln!("Serving HTTP on http://{}/", listener.local_addr()?);

        log::info!("Listening on {}", self.addr);

        let handler = ProxyHandler::new(self, engine, instance);

        loop {
            // Wait for a socket, but also "race" against shutdown to break out
            // of this loop. Once the graceful shutdown signal is received then
            // this loop exits immediately.
            let (stream, _) = tokio::select! {
                _ = shutdown.requested.notified() => break,
                v = listener.accept() => v?,
            };
            let comp = component.clone();
            let stream = TokioIo::new(stream);
            let h = handler.clone();
            let shutdown_guard = shutdown.clone().increment();
            tokio::task::spawn(async move {
                if let Err(e) = http1::Builder::new()
                    .keep_alive(true)
                    .serve_connection(
                        stream,
                        hyper::service::service_fn(move |req| {
                            let comp = comp.clone();
                            let h = h.clone();
                            async move {
                                use http_body_util::{BodyExt, Full};
                                fn to_errorcode(_: Infallible) -> ErrorCode {
                                    unreachable!()
                                }
                                match handle_request(h, req, comp).await {
                                    Ok(r) => Ok::<_, Infallible>(r),
                                    Err(e) => {
                                        eprintln!("error: {e:?}");
                                        let error_html = "\
<!doctype html>
<html>
<head>
    <title>500 Internal Server Error</title>
</head>
<body>
    <center>
        <h1>500 Internal Server Error</h1>
        <hr>
        wasmtime
    </center>
</body>
</html>";
                                        Ok(Response::builder()
                                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                                            .header("Content-Type", "text/html; charset=UTF-8")
                                            .body(
                                                Full::new(bytes::Bytes::from(error_html))
                                                    .map_err(to_errorcode)
                                                    .boxed(),
                                            )
                                            .unwrap())
                                    }
                                }
                            }
                        }),
                    )
                    .await
                {
                    eprintln!("error: {e:?}");
                }
                drop(shutdown_guard);
            });
        }

        // Upon exiting the loop we'll no longer accept incoming connections,
        // but there may still be outstanding connections processing in child
        // tasks. If there are, wait for those to complete before shutting
        // down completely. Also enable short-circuiting this wait with a
        // second ctrl-c signal.
        if shutdown.close() {
            return Ok(());
        }
        eprintln!("Waiting for child tasks to exit, ctrl-c again to quit sooner...");
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {}
            _ = shutdown.complete.notified() => {}
        }

        Ok(())
    }
}

/// Helper structure to manage graceful shutdown in the accept loop above.
#[derive(Default)]
struct GracefulShutdown {
    /// Async notification that shutdown has been requested.
    requested: Notify,
    /// Async notification that shutdown has completed, signaled when
    /// `notify_when_done` is `true` and `active_tasks` reaches 0.
    complete: Notify,
    /// Internal state related to what's in progress when shutdown is requested.
    state: Mutex<GracefulShutdownState>,
}

#[derive(Default)]
struct GracefulShutdownState {
    active_tasks: u32,
    notify_when_done: bool,
}

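// Lifecycle: the accept loop calls `increment()` for each spawned connection
// task, calls `close()` once shutdown is requested, and then awaits
// `complete` if any tasks are still outstanding.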
impl GracefulShutdown {
    /// Increments the number of active tasks and returns a guard which, when
    /// dropped, decrements the count and notifies `complete` if shutdown has
    /// been requested and this was the last remaining task.
    fn increment(self: Arc<Self>) -> impl Drop {
        struct Guard(Arc<GracefulShutdown>);

        let mut state = self.state.lock().unwrap();
        assert!(!state.notify_when_done);
        state.active_tasks += 1;
        drop(state);

        return Guard(self);

        impl Drop for Guard {
            fn drop(&mut self) {
                let mut state = self.0.state.lock().unwrap();
                state.active_tasks -= 1;
                if state.notify_when_done && state.active_tasks == 0 {
                    self.0.complete.notify_one();
                }
            }
        }
    }

    /// Flags this state as done spawning tasks and returns whether there are
    /// no more child tasks remaining.
    fn close(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        state.notify_when_done = true;
        state.active_tasks == 0
    }
}

/// When executing with a timeout enabled, this is how frequently epoch
/// interrupts will be executed to check for timeouts. If guest profiling
/// is enabled, the guest epoch period will be used.
const EPOCH_INTERRUPT_PERIOD: Duration = Duration::from_millis(50);

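/// Background thread that increments the engine's epoch at a fixed interval,
/// driving the store's epoch-deadline callback; the thread is stopped and
/// joined on drop.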
struct EpochThread {
    shutdown: Arc<AtomicBool>,
    handle: Option<std::thread::JoinHandle<()>>,
}

impl EpochThread {
    fn spawn(interval: std::time::Duration, engine: Engine) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handle = {
            let shutdown = Arc::clone(&shutdown);
            let handle = std::thread::spawn(move || {
                while !shutdown.load(Ordering::Relaxed) {
                    std::thread::sleep(interval);
                    engine.increment_epoch();
                }
            });
            Some(handle)
        };

        EpochThread { shutdown, handle }
    }
}

impl Drop for EpochThread {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            self.shutdown.store(true, Ordering::Relaxed);
            handle.join().unwrap();
        }
    }
}

type WriteProfile = Box<dyn FnOnce(&mut Store<Host>) + Send>;

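/// Configures epoch-based interruption for one request's store: guest
/// profiling when enabled, otherwise a plain timeout check. Returns a
/// callback for writing out any recorded profile along with the epoch
/// ticker thread, if one was spawned.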
fn setup_epoch_handler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    // Profiling enabled
    if let Some(Profile::Guest { interval, path }) = &cmd.run.profile {
        #[cfg(feature = "profiling")]
        return setup_guest_profiler(cmd, store, path.clone(), *interval, component.clone());
        #[cfg(not(feature = "profiling"))]
        {
            let _ = (path, interval);
            bail!("support for profiling disabled at compile time!");
        }
    }

    // Profiling disabled, but there's a global request timeout
    let epoch_thread = if let Some(timeout) = cmd.run.common.wasm.timeout {
        let start = Instant::now();
        store.epoch_deadline_callback(move |_store| {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
            Ok(UpdateDeadline::Continue(1))
        });
        store.set_epoch_deadline(1);
        let engine = store.engine().clone();
        Some(EpochThread::spawn(EPOCH_INTERRUPT_PERIOD, engine))
    } else {
        None
    };

    Ok((Box::new(|_store| {}), epoch_thread))
}

#[cfg(feature = "profiling")]
fn setup_guest_profiler(
    cmd: &ServeCommand,
    store: &mut Store<Host>,
    path: String,
    interval: Duration,
    component: Component,
) -> Result<(WriteProfile, Option<EpochThread>)> {
    use wasmtime::{AsContext, GuestProfiler, StoreContext, StoreContextMut};

    let module_name = "<main>";

    store.data_mut().guest_profiler = Some(Arc::new(GuestProfiler::new_component(
        module_name,
        interval,
        component,
        std::iter::empty(),
    )));

    fn sample(
        mut store: StoreContextMut<Host>,
        f: impl FnOnce(&mut GuestProfiler, StoreContext<Host>),
    ) {
        let mut profiler = store.data_mut().guest_profiler.take().unwrap();
        f(
            Arc::get_mut(&mut profiler).expect("profiling doesn't support threads yet"),
            store.as_context(),
        );
        store.data_mut().guest_profiler = Some(profiler);
    }

    // Hostcall entry/exit, etc.
    store.call_hook(|store, kind| {
        sample(store, |profiler, store| profiler.call_hook(store, kind));
        Ok(())
    });

    let start = Instant::now();
    let timeout = cmd.run.common.wasm.timeout;
    store.epoch_deadline_callback(move |store| {
        sample(store, |profiler, store| {
            profiler.sample(store, std::time::Duration::ZERO)
        });

        // Originally epoch counting was used here; this is problematic in
        // a lot of cases due to there being a lot of time (e.g. in hostcalls)
        // when we are not expected to get sample hits.
        if let Some(timeout) = timeout {
            if start.elapsed() > timeout {
                bail!("Timeout expired");
            }
        }

        Ok(UpdateDeadline::Continue(1))
    });

    store.set_epoch_deadline(1);
    let engine = store.engine().clone();
    let epoch_thread = Some(EpochThread::spawn(interval, engine));

    let write_profile = Box::new(move |store: &mut Store<Host>| {
        let profiler = Arc::try_unwrap(store.data_mut().guest_profiler.take().unwrap())
            .expect("profiling doesn't support threads yet");
        if let Err(e) = std::fs::File::create(&path)
            .map_err(anyhow::Error::new)
            .and_then(|output| profiler.finish(std::io::BufWriter::new(output)))
        {
            eprintln!("failed writing profile at {path}: {e:#}");
        } else {
            eprintln!();
            eprintln!("Profile written to: {path}");
            eprintln!("View this profile at https://profiler.firefox.com/.");
        }
    });

    Ok((write_profile, epoch_thread))
}

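/// State shared by all connection tasks via `ProxyHandler` clones: the CLI
/// options, the engine, the pre-instantiated proxy, and a counter for
/// allocating request IDs.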
struct ProxyHandlerInner {
    cmd: ServeCommand,
    engine: Engine,
    instance_pre: ProxyPre<Host>,
    next_id: AtomicU64,
}

impl ProxyHandlerInner {
    fn next_req_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
}

#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);

impl ProxyHandler {
    fn new(cmd: ServeCommand, engine: Engine, instance_pre: ProxyPre<Host>) -> Self {
        Self(Arc::new(ProxyHandlerInner {
            cmd,
            engine,
            instance_pre,
            next_id: AtomicU64::from(0),
        }))
    }
}

type Request = hyper::Request<hyper::body::Incoming>;

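/// Handles one request: builds a fresh `Store`, converts the hyper request
/// into WASI HTTP resources, instantiates the pre-linked proxy, and runs the
/// guest's `wasi:http/incoming-handler` in a spawned task while awaiting the
/// response on a oneshot channel.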
async fn handle_request(
    ProxyHandler(inner): ProxyHandler,
    req: Request,
    component: Component,
) -> Result<hyper::Response<HyperOutgoingBody>> {
    let (sender, receiver) = tokio::sync::oneshot::channel();

    let req_id = inner.next_req_id();

    log::info!(
        "Request {req_id} handling {} to {}",
        req.method(),
        req.uri()
    );

    let mut store = inner.cmd.new_store(&inner.engine, req_id)?;

    let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
    let out = store.data_mut().new_response_outparam(sender)?;
    let proxy = inner.instance_pre.instantiate_async(&mut store).await?;

    let comp = component.clone();
    let task = tokio::task::spawn(async move {
        let (write_profile, epoch_thread) = setup_epoch_handler(&inner.cmd, &mut store, comp)?;

        if let Err(e) = proxy
            .wasi_http_incoming_handler()
            .call_handle(&mut store, req, out)
            .await
        {
            log::error!("[{req_id}] :: {e:?}");
            return Err(e);
        }

        write_profile(&mut store);
        drop(epoch_thread);

        Ok(())
    });

    let result = match receiver.await {
        Ok(Ok(resp)) => Ok(resp),
        Ok(Err(e)) => Err(e.into()),
        Err(_) => {
            // An error in the receiver (`RecvError`) only indicates that the
            // task exited before a response was sent (i.e., the sender was
            // dropped); it does not describe the underlying cause of failure.
            // Instead we retrieve and propagate the error from inside the task
            // which should more clearly tell the user what went wrong. Note
            // that we assume the task has already exited at this point so the
            // `await` should resolve immediately.
            let e = match task.await {
                Ok(Ok(())) => {
                    bail!("guest never invoked `response-outparam::set` method")
                }
                Ok(Err(e)) => e,
                Err(e) => e.into(),
            };
            Err(e.context("guest never invoked `response-outparam::set` method"))
        }
    };

    result
}

#[derive(Clone)]
enum Output {
    Stdout,
    Stderr,
}

impl Output {
    fn write_all(&self, buf: &[u8]) -> io::Result<()> {
        use std::io::Write;

        match self {
            Output::Stdout => std::io::stdout().write_all(buf),
            Output::Stderr => std::io::stderr().write_all(buf),
        }
    }
}

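/// Stream wrapper that prefixes each line the guest writes, so e.g. a
/// guest's `println!("hello")` appears as `stdout [0] :: hello` (unless
/// `--no-logging-prefix` is passed).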
#[derive(Clone)]
struct LogStream {
    output: Output,
    state: Arc<LogStreamState>,
}

struct LogStreamState {
    prefix: String,
    needs_prefix_on_next_write: AtomicBool,
}

impl LogStream {
    fn new(prefix: String, output: Output) -> LogStream {
        LogStream {
            output,
            state: Arc::new(LogStreamState {
                prefix,
                needs_prefix_on_next_write: AtomicBool::new(true),
            }),
        }
    }

    fn write_all(&mut self, mut bytes: &[u8]) -> io::Result<()> {
        while !bytes.is_empty() {
            if self
                .state
                .needs_prefix_on_next_write
                .load(Ordering::Relaxed)
            {
                self.output.write_all(self.state.prefix.as_bytes())?;
                self.state
                    .needs_prefix_on_next_write
                    .store(false, Ordering::Relaxed);
            }
            match bytes.iter().position(|b| *b == b'\n') {
                Some(i) => {
                    let (a, b) = bytes.split_at(i + 1);
                    bytes = b;
                    self.output.write_all(a)?;
                    self.state
                        .needs_prefix_on_next_write
                        .store(true, Ordering::Relaxed);
                }
                None => {
                    self.output.write_all(bytes)?;
                    break;
                }
            }
        }

        Ok(())
    }
}

impl wasmtime_wasi::cli::StdoutStream for LogStream {
    fn p2_stream(&self) -> Box<dyn wasmtime_wasi::p2::OutputStream> {
        Box::new(self.clone())
    }
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        Box::new(self.clone())
    }
}

impl wasmtime_wasi::cli::IsTerminal for LogStream {
    fn is_terminal(&self) -> bool {
        match &self.output {
            Output::Stdout => std::io::stdout().is_terminal(),
            Output::Stderr => std::io::stderr().is_terminal(),
        }
    }
}

impl wasmtime_wasi::p2::OutputStream for LogStream {
    fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
        self.write_all(&bytes)
            .map_err(|e| StreamError::LastOperationFailed(e.into()))?;
        Ok(())
    }

    fn flush(&mut self) -> StreamResult<()> {
        Ok(())
    }

    fn check_write(&mut self) -> StreamResult<usize> {
        Ok(1024 * 1024)
    }
}

#[async_trait::async_trait]
impl wasmtime_wasi::p2::Pollable for LogStream {
    async fn ready(&mut self) {}
}

impl AsyncWrite for LogStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Poll::Ready(self.write_all(buf).map(|_| buf.len()))
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// The pooling allocator is tailor-made for the `wasmtime serve` use case, so
/// try to use it when we can. The main cost of the pooling allocator, however,
/// is the virtual memory required to run it. Not all systems support the same
/// amount of virtual memory; for example, some aarch64 and riscv64
/// configurations only support 39 bits of virtual address space.
///
/// The pooling allocator, by default, will request 1000 linear memories each
/// sized at 6G per linear memory. This is 6T of virtual memory, which ends up
/// being about 42 bits of the address space. This exceeds the 39 bit limit of
/// some systems, so there the pooling allocator will fail by default.
///
/// This function attempts to dynamically determine the hint for the pooling
/// allocator. This returns `Some(true)` if the pooling allocator should be used
/// by default, or `None` or an error otherwise.
///
/// The method for testing this is to allocate a 0-sized 64-bit linear memory
/// with a maximum size that's N bits large where we force all memories to be
/// static. This should attempt to acquire N bits of the virtual address space.
/// If successful, that should mean that the pooling allocator is OK to use, but
/// if it fails then the pooling allocator is not used and the normal mmap-based
/// implementation is used instead.
fn use_pooling_allocator_by_default() -> Result<Option<bool>> {
    use wasmtime::{Config, Memory, MemoryType};
    const BITS_TO_TEST: u32 = 42;
    let mut config = Config::new();
    config.wasm_memory64(true);
    config.memory_reservation(1 << BITS_TO_TEST);
    let engine = Engine::new(&config)?;
    let mut store = Store::new(&engine, ());
    // NB: the maximum size is in wasm pages to take out the 16-bits of wasm
    // page size here from the maximum size.
    let ty = MemoryType::new64(0, Some(1 << (BITS_TO_TEST - 16)));
    if Memory::new(&mut store, ty).is_ok() {
        Ok(Some(true))
    } else {
        Ok(None)
    }
}