Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-http/tests/all/p2.rs
3092 views
1
use crate::body;
2
use crate::http_server::Server;
3
use futures::{FutureExt, channel::oneshot, future, stream};
4
use http_body::Frame;
5
use http_body_util::{BodyExt, Collected, Empty, StreamBody, combinators::BoxBody};
6
use hyper::{Method, StatusCode, body::Bytes, server::conn::http1, service::service_fn};
7
use sha2::{Digest, Sha256};
8
use std::{collections::HashMap, iter, net::Ipv4Addr, str, sync::Arc};
9
use tokio::task;
10
use wasmtime::{
11
Config, Engine, Result, Store,
12
component::{Component, Linker, ResourceTable},
13
error::Context as _,
14
format_err,
15
};
16
use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView, p2::pipe::MemoryOutputPipe};
17
use wasmtime_wasi_http::{
18
HttpResult, WasiHttpCtx, WasiHttpView,
19
bindings::http::types::{ErrorCode, Scheme},
20
body::HyperOutgoingBody,
21
io::TokioIo,
22
types::{self, HostFutureIncomingResponse, IncomingResponse, OutgoingRequestConfig},
23
};
24
25
type RequestSender = Arc<
26
dyn Fn(hyper::Request<HyperOutgoingBody>, OutgoingRequestConfig) -> HostFutureIncomingResponse
27
+ Send
28
+ Sync,
29
>;
30
31
struct Ctx {
32
table: ResourceTable,
33
wasi: WasiCtx,
34
http: WasiHttpCtx,
35
stdout: MemoryOutputPipe,
36
stderr: MemoryOutputPipe,
37
send_request: Option<RequestSender>,
38
rejected_authority: Option<String>,
39
}
40
41
impl WasiView for Ctx {
42
fn ctx(&mut self) -> WasiCtxView<'_> {
43
WasiCtxView {
44
ctx: &mut self.wasi,
45
table: &mut self.table,
46
}
47
}
48
}
49
50
impl WasiHttpView for Ctx {
51
fn ctx(&mut self) -> &mut WasiHttpCtx {
52
&mut self.http
53
}
54
55
fn table(&mut self) -> &mut ResourceTable {
56
&mut self.table
57
}
58
59
fn send_request(
60
&mut self,
61
request: hyper::Request<HyperOutgoingBody>,
62
config: OutgoingRequestConfig,
63
) -> HttpResult<HostFutureIncomingResponse> {
64
if let Some(rejected_authority) = &self.rejected_authority {
65
let authority = request.uri().authority().map(ToString::to_string).unwrap();
66
if &authority == rejected_authority {
67
return Err(ErrorCode::HttpRequestDenied.into());
68
}
69
}
70
if let Some(send_request) = self.send_request.clone() {
71
Ok(send_request(request, config))
72
} else {
73
Ok(types::default_send_request(request, config))
74
}
75
}
76
77
fn is_forbidden_header(&mut self, name: &hyper::header::HeaderName) -> bool {
78
types::DEFAULT_FORBIDDEN_HEADERS.contains(name)
79
|| name.as_str() == "custom-forbidden-header"
80
}
81
}
82
83
fn store(engine: &Engine, server: &Server) -> Store<Ctx> {
84
let stdout = MemoryOutputPipe::new(4096);
85
let stderr = MemoryOutputPipe::new(4096);
86
87
// Create our wasi context.
88
let mut builder = WasiCtx::builder();
89
builder.stdout(stdout.clone());
90
builder.stderr(stderr.clone());
91
builder.env("HTTP_SERVER", &server.addr());
92
let ctx = Ctx {
93
table: ResourceTable::new(),
94
wasi: builder.build(),
95
http: WasiHttpCtx::new(),
96
stderr,
97
stdout,
98
send_request: None,
99
rejected_authority: None,
100
};
101
102
Store::new(&engine, ctx)
103
}
104
105
impl Drop for Ctx {
106
fn drop(&mut self) {
107
let stdout = self.stdout.contents();
108
if !stdout.is_empty() {
109
println!("[guest] stdout:\n{}\n===", String::from_utf8_lossy(&stdout));
110
}
111
let stderr = self.stderr.contents();
112
if !stderr.is_empty() {
113
println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr));
114
}
115
}
116
}
117
118
mod async_;
119
mod sync;
120
121
async fn run_wasi_http(
122
component_filename: &str,
123
req: hyper::Request<BoxBody<Bytes, hyper::Error>>,
124
send_request: Option<RequestSender>,
125
rejected_authority: Option<String>,
126
early_drop: bool,
127
) -> wasmtime::Result<Result<hyper::Response<Collected<Bytes>>, ErrorCode>> {
128
let stdout = MemoryOutputPipe::new(4096);
129
let stderr = MemoryOutputPipe::new(4096);
130
let table = ResourceTable::new();
131
132
let mut config = Config::new();
133
config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
134
config.wasm_component_model(true);
135
let engine = Engine::new(&config)?;
136
let component = Component::from_file(&engine, component_filename)?;
137
138
// Create our wasi context.
139
let mut builder = WasiCtx::builder();
140
builder.stdout(stdout.clone());
141
builder.stderr(stderr.clone());
142
let wasi = builder.build();
143
let http = WasiHttpCtx::new();
144
let ctx = Ctx {
145
table,
146
wasi,
147
http,
148
stderr,
149
stdout,
150
send_request,
151
rejected_authority,
152
};
153
let mut store = Store::new(&engine, ctx);
154
155
let mut linker = Linker::new(&engine);
156
wasmtime_wasi_http::add_to_linker_async(&mut linker)?;
157
let proxy =
158
wasmtime_wasi_http::bindings::Proxy::instantiate_async(&mut store, &component, &linker)
159
.await?;
160
161
let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;
162
163
let (sender, receiver) = tokio::sync::oneshot::channel();
164
let out = store.data_mut().new_response_outparam(sender)?;
165
166
let receiver = if early_drop {
167
// Drop the receiver early, emulating a host event like
168
// timeout occurred so that the processing has to be stopped.
169
drop(receiver);
170
None
171
} else {
172
Some(receiver)
173
};
174
175
let handle = wasmtime_wasi::runtime::spawn(async move {
176
proxy
177
.wasi_http_incoming_handler()
178
.call_handle(&mut store, req, out)
179
.await?;
180
181
Ok::<_, wasmtime::Error>(())
182
});
183
184
if let Some(r) = receiver {
185
let resp = match r.await {
186
Ok(Ok(resp)) => {
187
let (parts, body) = resp.into_parts();
188
let collected = BodyExt::collect(body).await?;
189
Some(Ok(hyper::Response::from_parts(parts, collected)))
190
}
191
Ok(Err(e)) => Some(Err(e)),
192
193
// Fall through below to the `resp.expect(...)` which will hopefully
194
// return a more specific error from `handle.await`.
195
Err(_) => None,
196
};
197
198
// Now that the response has been processed, we can wait on the wasm to
199
// finish without deadlocking.
200
handle.await.context("Component execution")?;
201
202
Ok(resp.expect("wasm never called set-response-outparam"))
203
} else {
204
handle.await.context("Component execution")?;
205
Ok(Err(ErrorCode::HttpResponseTimeout))
206
}
207
}
208
209
#[test_log::test(tokio::test)]
210
async fn wasi_http_proxy_tests() -> wasmtime::Result<()> {
211
let req = hyper::Request::builder()
212
.header("custom-forbidden-header", "yes")
213
.uri("http://example.com:8080/test-path")
214
.method(http::Method::GET);
215
216
let resp = run_wasi_http(
217
test_programs_artifacts::P2_API_PROXY_COMPONENT,
218
req.body(body::empty())?,
219
None,
220
None,
221
false,
222
)
223
.await?;
224
225
match resp {
226
Ok(resp) => println!("response: {resp:?}"),
227
Err(e) => panic!("Error given in response: {e:?}"),
228
};
229
230
Ok(())
231
}
232
233
#[test_log::test(tokio::test)]
234
async fn wasi_http_hash_all() -> Result<()> {
235
do_wasi_http_hash_all(false).await
236
}
237
238
#[test_log::test(tokio::test)]
239
async fn wasi_http_hash_all_with_override() -> Result<()> {
240
do_wasi_http_hash_all(true).await
241
}
242
243
async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> {
244
let bodies = Arc::new(
245
[
246
("/a", "’Twas brillig, and the slithy toves"),
247
("/b", "Did gyre and gimble in the wabe:"),
248
("/c", "All mimsy were the borogoves,"),
249
("/d", "And the mome raths outgrabe."),
250
]
251
.into_iter()
252
.collect::<HashMap<_, _>>(),
253
);
254
255
let listener = tokio::net::TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 0)).await?;
256
257
let prefix = format!("http://{}", listener.local_addr()?);
258
259
let (_tx, rx) = oneshot::channel::<()>();
260
261
let handle = {
262
let bodies = bodies.clone();
263
264
move |request: http::request::Parts| {
265
if let (Method::GET, Some(body)) = (request.method, bodies.get(request.uri.path())) {
266
Ok::<_, wasmtime::Error>(hyper::Response::new(body::full(Bytes::copy_from_slice(
267
body.as_bytes(),
268
))))
269
} else {
270
Ok(hyper::Response::builder()
271
.status(StatusCode::METHOD_NOT_ALLOWED)
272
.body(body::empty())?)
273
}
274
}
275
};
276
277
let send_request = if override_send_request {
278
Some(Arc::new(
279
move |request: hyper::Request<HyperOutgoingBody>,
280
OutgoingRequestConfig {
281
between_bytes_timeout,
282
..
283
}| {
284
let response = handle(request.into_parts().0).map(|resp| {
285
Ok(IncomingResponse {
286
resp: resp.map(|body| {
287
body.map_err(wasmtime_wasi_http::hyper_response_error)
288
.boxed_unsync()
289
}),
290
worker: None,
291
between_bytes_timeout,
292
})
293
});
294
HostFutureIncomingResponse::ready(response)
295
},
296
) as RequestSender)
297
} else {
298
let server = async move {
299
loop {
300
let (stream, _) = listener.accept().await?;
301
let stream = TokioIo::new(stream);
302
let handle = handle.clone();
303
task::spawn(async move {
304
if let Err(e) = http1::Builder::new()
305
.keep_alive(true)
306
.serve_connection(
307
stream,
308
service_fn(move |request| {
309
let handle = handle.clone();
310
async move { handle(request.into_parts().0) }
311
}),
312
)
313
.await
314
{
315
eprintln!("error serving connection: {e:?}");
316
}
317
});
318
319
// Help rustc with type inference:
320
if false {
321
return Ok::<_, wasmtime::Error>(());
322
}
323
}
324
}
325
.then(|result| {
326
if let Err(e) = result {
327
eprintln!("error listening for connections: {e:?}");
328
}
329
future::ready(())
330
})
331
.boxed();
332
333
task::spawn(async move {
334
drop(future::select(server, rx).await);
335
});
336
337
None
338
};
339
340
let mut request = hyper::Request::builder()
341
.method(http::Method::GET)
342
.uri("http://example.com:8080/hash-all");
343
for path in bodies.keys() {
344
request = request.header("url", format!("{prefix}{path}"));
345
}
346
let request = request.body(body::empty())?;
347
348
let response = run_wasi_http(
349
test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,
350
request,
351
send_request,
352
None,
353
false,
354
)
355
.await??;
356
357
assert_eq!(StatusCode::OK, response.status());
358
let body = response.into_body().to_bytes();
359
let body = str::from_utf8(&body)?;
360
for line in body.lines() {
361
let (url, hash) = line
362
.split_once(": ")
363
.ok_or_else(|| format_err!("expected string of form `<url>: <sha-256>`; got {line}"))?;
364
365
let path = url
366
.strip_prefix(&prefix)
367
.ok_or_else(|| format_err!("expected string with prefix {prefix}; got {url}"))?;
368
369
let mut hasher = Sha256::new();
370
hasher.update(
371
bodies
372
.get(path)
373
.ok_or_else(|| format_err!("unexpected path: {path}"))?,
374
);
375
376
use base64::Engine;
377
assert_eq!(
378
hash,
379
base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize())
380
);
381
}
382
383
Ok(())
384
}
385
386
// ensure the runtime rejects the outgoing request
387
#[test_log::test(tokio::test)]
388
async fn wasi_http_hash_all_with_reject() -> Result<()> {
389
let request = hyper::Request::builder()
390
.method(http::Method::GET)
391
.uri("http://example.com:8080/hash-all");
392
let request = request.header("url", format!("http://forbidden.com"));
393
let request = request.header("url", format!("http://localhost"));
394
let request = request.body(body::empty())?;
395
396
let response = run_wasi_http(
397
test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,
398
request,
399
None,
400
Some("forbidden.com".to_string()),
401
false,
402
)
403
.await??;
404
405
let body = response.into_body().to_bytes();
406
let body = str::from_utf8(&body).unwrap();
407
for line in body.lines() {
408
println!("{line}");
409
if line.contains("forbidden.com") {
410
assert!(line.contains("HttpRequestDenied"));
411
}
412
if line.contains("localhost") {
413
assert!(!line.contains("HttpRequestDenied"));
414
}
415
}
416
417
Ok(())
418
}
419
420
#[test_log::test(tokio::test)]
421
async fn wasi_http_echo() -> Result<()> {
422
do_wasi_http_echo("echo", None).await
423
}
424
425
#[test_log::test(tokio::test)]
426
async fn wasi_http_double_echo() -> Result<()> {
427
let listener = tokio::net::TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 0)).await?;
428
429
let prefix = format!("http://{}", listener.local_addr()?);
430
431
let (_tx, rx) = oneshot::channel::<()>();
432
433
let server = async move {
434
loop {
435
let (stream, _) = listener.accept().await?;
436
let stream = TokioIo::new(stream);
437
task::spawn(async move {
438
if let Err(e) = http1::Builder::new()
439
.keep_alive(true)
440
.serve_connection(
441
stream,
442
service_fn(
443
move |request: hyper::Request<hyper::body::Incoming>| async move {
444
use http_body_util::BodyExt;
445
446
if let (&Method::POST, "/echo") =
447
(request.method(), request.uri().path())
448
{
449
Ok::<_, wasmtime::Error>(hyper::Response::new(
450
request.into_body().boxed(),
451
))
452
} else {
453
Ok(hyper::Response::builder()
454
.status(StatusCode::METHOD_NOT_ALLOWED)
455
.body(BoxBody::new(
456
Empty::new().map_err(|_| unreachable!()),
457
))?)
458
}
459
},
460
),
461
)
462
.await
463
{
464
eprintln!("error serving connection: {e:?}");
465
}
466
});
467
468
// Help rustc with type inference:
469
if false {
470
return Ok::<_, wasmtime::Error>(());
471
}
472
}
473
}
474
.then(|result| {
475
if let Err(e) = result {
476
eprintln!("error listening for connections: {e:?}");
477
}
478
future::ready(())
479
})
480
.boxed();
481
482
task::spawn(async move {
483
drop(future::select(server, rx).await);
484
});
485
486
do_wasi_http_echo("double-echo", Some(&format!("{prefix}/echo"))).await
487
}
488
489
async fn do_wasi_http_echo(uri: &str, url_header: Option<&str>) -> Result<()> {
490
let body = {
491
// A sorta-random-ish megabyte
492
let mut n = 0_u8;
493
iter::repeat_with(move || {
494
n = n.wrapping_add(251);
495
n
496
})
497
.take(1024 * 1024)
498
.collect::<Vec<_>>()
499
};
500
501
let mut request = hyper::Request::builder()
502
.method(http::Method::POST)
503
.uri(format!("http://example.com:8080/{uri}"))
504
.header("content-type", "application/octet-stream");
505
506
if let Some(url_header) = url_header {
507
request = request.header("url", url_header);
508
}
509
510
let request = request.body(BoxBody::new(StreamBody::new(stream::iter(
511
body.chunks(16 * 1024)
512
.map(|chunk| Ok::<_, hyper::Error>(Frame::data(Bytes::copy_from_slice(chunk))))
513
.collect::<Vec<_>>(),
514
))))?;
515
516
let response = run_wasi_http(
517
test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,
518
request,
519
None,
520
None,
521
false,
522
)
523
.await??;
524
525
assert_eq!(StatusCode::OK, response.status());
526
assert_eq!(
527
response.headers()["content-type"],
528
"application/octet-stream"
529
);
530
let received = Vec::from(response.into_body().to_bytes());
531
if body != received {
532
panic!(
533
"body content mismatch (expected length {}; actual length {})",
534
body.len(),
535
received.len()
536
);
537
}
538
539
Ok(())
540
}
541
542
#[test_log::test(tokio::test)]
543
async fn wasi_http_without_port() -> Result<()> {
544
let req = hyper::Request::builder()
545
.method(http::Method::GET)
546
.uri("https://httpbin.org/get");
547
548
let _response: hyper::Response<_> = run_wasi_http(
549
test_programs_artifacts::P2_API_PROXY_FORWARD_REQUEST_COMPONENT,
550
req.body(body::empty())?,
551
None,
552
None,
553
false,
554
)
555
.await??;
556
557
// NB: don't test the actual return code of `response`. This is testing a
558
// live http request against a live server and things happen. If we got this
559
// far it's already successful that the request was made and the lack of
560
// port in the URI was handled.
561
562
Ok(())
563
}
564
565
#[test_log::test(tokio::test)]
566
async fn wasi_http_no_trap_on_early_drop() -> Result<()> {
567
let req = hyper::Request::builder()
568
.uri("http://example.com:8080/early_drop")
569
.method(http::Method::GET);
570
571
let resp = run_wasi_http(
572
test_programs_artifacts::P2_API_PROXY_COMPONENT,
573
req.body(body::empty())?,
574
None,
575
None,
576
true,
577
)
578
.await?;
579
580
if let Err(ErrorCode::HttpResponseTimeout) = resp {
581
Ok(())
582
} else {
583
panic!("test expects an error");
584
}
585
}
586
587