Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-http/tests/all/p3/mod.rs
3139 views
1
use crate::http_server::Server;
2
use bytes::Bytes;
3
use flate2::Compression;
4
use flate2::write::{DeflateDecoder, DeflateEncoder};
5
use futures::SinkExt;
6
use futures::channel::oneshot;
7
use http::HeaderValue;
8
use http_body::Body;
9
use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody};
10
use std::io::Write;
11
use std::path::Path;
12
use std::pin::Pin;
13
use std::task::{Context, Poll};
14
use test_programs_artifacts::*;
15
use tokio::{fs, try_join};
16
use wasm_compose::composer::ComponentComposer;
17
use wasm_compose::config::{Config, Dependency, Instantiation, InstantiationArg};
18
use wasmtime::component::{Component, Linker, ResourceTable};
19
use wasmtime::{Result, Store, ToWasmtimeResult as _, error::Context as _, format_err};
20
use wasmtime_wasi::p3::bindings::Command;
21
use wasmtime_wasi::{TrappableError, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
22
use wasmtime_wasi_http::p3::bindings::Service;
23
use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode;
24
use wasmtime_wasi_http::p3::{
25
self, Request, RequestOptions, WasiHttpCtx, WasiHttpCtxView, WasiHttpView,
26
};
27
use wasmtime_wasi_http::types::DEFAULT_FORBIDDEN_HEADERS;
28
29
// Expands `foreach_p3_http!` with `assert_test_exists` as its argument.
// NOTE(review): presumably both macros come from the `test_programs_artifacts`
// glob import and this checks that each p3 HTTP test program has a test
// function in this module — confirm against the macro definitions.
foreach_p3_http!(assert_test_exists);
30
31
struct TestHttpCtx {
32
request_body_tx: Option<oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>>,
33
}
34
35
impl WasiHttpCtx for TestHttpCtx {
36
fn is_forbidden_header(&mut self, name: &http::header::HeaderName) -> bool {
37
name.as_str() == "custom-forbidden-header" || DEFAULT_FORBIDDEN_HEADERS.contains(name)
38
}
39
40
fn send_request(
41
&mut self,
42
request: http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,
43
options: Option<RequestOptions>,
44
fut: Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
45
) -> Box<
46
dyn Future<
47
Output = Result<
48
(
49
http::Response<UnsyncBoxBody<Bytes, ErrorCode>>,
50
Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,
51
),
52
TrappableError<ErrorCode>,
53
>,
54
> + Send,
55
> {
56
_ = fut;
57
if let Some("p3-test") = request.uri().authority().map(|v| v.as_str()) {
58
_ = self
59
.request_body_tx
60
.take()
61
.unwrap()
62
.send(request.into_body());
63
Box::new(async {
64
Ok((
65
http::Response::new(Default::default()),
66
Box::new(async { Ok(()) }) as Box<dyn Future<Output = _> + Send>,
67
))
68
})
69
} else {
70
Box::new(async move {
71
use http_body_util::BodyExt;
72
73
let (res, io) = p3::default_send_request(request, options).await?;
74
Ok((
75
res.map(BodyExt::boxed_unsync),
76
Box::new(io) as Box<dyn Future<Output = _> + Send>,
77
))
78
})
79
}
80
}
81
}
82
83
struct Ctx {
84
table: ResourceTable,
85
wasi: WasiCtx,
86
http: TestHttpCtx,
87
}
88
89
impl Ctx {
90
fn new(request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>) -> Self {
91
Self {
92
table: ResourceTable::default(),
93
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
94
http: TestHttpCtx {
95
request_body_tx: Some(request_body_tx),
96
},
97
}
98
}
99
}
100
101
impl WasiView for Ctx {
102
fn ctx(&mut self) -> WasiCtxView<'_> {
103
WasiCtxView {
104
ctx: &mut self.wasi,
105
table: &mut self.table,
106
}
107
}
108
}
109
110
impl WasiHttpView for Ctx {
111
fn http(&mut self) -> WasiHttpCtxView<'_> {
112
WasiHttpCtxView {
113
ctx: &mut self.http,
114
table: &mut self.table,
115
}
116
}
117
}
118
119
async fn run_cli(path: &str, server: &Server) -> wasmtime::Result<()> {
120
let engine = test_programs_artifacts::engine(|config| {
121
config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
122
config.wasm_component_model_async(true);
123
});
124
let component = Component::from_file(&engine, path)?;
125
let mut store = Store::new(
126
&engine,
127
Ctx {
128
wasi: wasmtime_wasi::WasiCtx::builder()
129
.env("HTTP_SERVER", server.addr())
130
.build(),
131
..Ctx::new(oneshot::channel().0)
132
},
133
);
134
let mut linker = Linker::new(&engine);
135
wasmtime_wasi::p2::add_to_linker_async(&mut linker)
136
.context("failed to link `wasi:[email protected]`")?;
137
wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:[email protected]`")?;
138
wasmtime_wasi_http::p3::add_to_linker(&mut linker)
139
.context("failed to link `wasi:[email protected]`")?;
140
let command = Command::instantiate_async(&mut store, &component, &linker).await?;
141
store
142
.run_concurrent(async |store| command.wasi_cli_run().call_run(store).await)
143
.await
144
.context("failed to call `wasi:cli/run#run`")?
145
.context("guest trapped")?
146
.0
147
.map_err(|()| format_err!("`wasi:cli/run#run` failed"))
148
}
149
150
async fn run_http<E: Into<ErrorCode> + 'static>(
151
component_filename: &str,
152
req: http::Request<impl Body<Data = Bytes, Error = E> + Send + Sync + 'static>,
153
request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>,
154
) -> wasmtime::Result<Result<http::Response<Collected<Bytes>>, Option<ErrorCode>>> {
155
let engine = test_programs_artifacts::engine(|config| {
156
config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
157
config.wasm_component_model_async(true);
158
});
159
let component = Component::from_file(&engine, component_filename)?;
160
161
let mut store = Store::new(&engine, Ctx::new(request_body_tx));
162
163
let mut linker = Linker::new(&engine);
164
wasmtime_wasi::p2::add_to_linker_async(&mut linker)
165
.context("failed to link `wasi:[email protected]`")?;
166
wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:[email protected]`")?;
167
wasmtime_wasi_http::p3::add_to_linker(&mut linker)
168
.context("failed to link `wasi:[email protected]`")?;
169
let service = Service::instantiate_async(&mut store, &component, &linker).await?;
170
let (req, io) = Request::from_http(req);
171
let (tx, rx) = tokio::sync::oneshot::channel();
172
let ((handle_result, ()), res) = try_join!(
173
async move {
174
store
175
.run_concurrent(async |store| {
176
try_join!(
177
async {
178
let (res, task) = match service.handle(store, req).await? {
179
Ok(pair) => pair,
180
Err(err) => return Ok(Err(Some(err))),
181
};
182
_ = tx
183
.send(store.with(|store| res.into_http(store, async { Ok(()) }))?);
184
task.block(store).await;
185
Ok(Ok(()))
186
},
187
async { io.await.context("failed to consume request body") }
188
)
189
})
190
.await?
191
},
192
async move {
193
let res = rx.await?;
194
let (parts, body) = res.into_parts();
195
let body = body.collect().await.context("failed to collect body")?;
196
wasmtime::error::Ok(http::Response::from_parts(parts, body))
197
}
198
)?;
199
200
Ok(handle_result.map(|()| res))
201
}
202
203
#[test_log::test(tokio::test(flavor = "multi_thread"))]
204
async fn p3_http_outbound_request_get() -> wasmtime::Result<()> {
205
let server = Server::http1(1)?;
206
run_cli(P3_HTTP_OUTBOUND_REQUEST_GET_COMPONENT, &server).await
207
}
208
209
#[test_log::test(tokio::test(flavor = "multi_thread"))]
210
async fn p3_http_outbound_request_timeout() -> wasmtime::Result<()> {
211
let server = Server::http1(1)?;
212
run_cli(P3_HTTP_OUTBOUND_REQUEST_TIMEOUT_COMPONENT, &server).await
213
}
214
215
#[test_log::test(tokio::test(flavor = "multi_thread"))]
216
async fn p3_http_outbound_request_post() -> wasmtime::Result<()> {
217
let server = Server::http1(1)?;
218
run_cli(P3_HTTP_OUTBOUND_REQUEST_POST_COMPONENT, &server).await
219
}
220
221
#[test_log::test(tokio::test(flavor = "multi_thread"))]
222
async fn p3_http_outbound_request_large_post() -> wasmtime::Result<()> {
223
let server = Server::http1(1)?;
224
run_cli(P3_HTTP_OUTBOUND_REQUEST_LARGE_POST_COMPONENT, &server).await
225
}
226
227
#[test_log::test(tokio::test(flavor = "multi_thread"))]
228
async fn p3_http_outbound_request_put() -> wasmtime::Result<()> {
229
let server = Server::http1(1)?;
230
run_cli(P3_HTTP_OUTBOUND_REQUEST_PUT_COMPONENT, &server).await
231
}
232
233
#[test_log::test(tokio::test(flavor = "multi_thread"))]
234
async fn p3_http_outbound_request_invalid_version() -> wasmtime::Result<()> {
235
let server = Server::http2(1)?;
236
run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_VERSION_COMPONENT, &server).await
237
}
238
239
#[test_log::test(tokio::test(flavor = "multi_thread"))]
240
async fn p3_http_outbound_request_invalid_header() -> wasmtime::Result<()> {
241
let server = Server::http2(1)?;
242
run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_HEADER_COMPONENT, &server).await
243
}
244
245
#[test_log::test(tokio::test(flavor = "multi_thread"))]
246
async fn p3_http_outbound_request_unknown_method() -> wasmtime::Result<()> {
247
let server = Server::http1(1)?;
248
run_cli(P3_HTTP_OUTBOUND_REQUEST_UNKNOWN_METHOD_COMPONENT, &server).await
249
}
250
251
#[test_log::test(tokio::test(flavor = "multi_thread"))]
252
async fn p3_http_outbound_request_unsupported_scheme() -> wasmtime::Result<()> {
253
let server = Server::http1(1)?;
254
run_cli(
255
P3_HTTP_OUTBOUND_REQUEST_UNSUPPORTED_SCHEME_COMPONENT,
256
&server,
257
)
258
.await
259
}
260
261
#[test_log::test(tokio::test(flavor = "multi_thread"))]
262
async fn p3_http_outbound_request_invalid_port() -> wasmtime::Result<()> {
263
let server = Server::http1(1)?;
264
run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_PORT_COMPONENT, &server).await
265
}
266
267
#[test_log::test(tokio::test(flavor = "multi_thread"))]
268
async fn p3_http_outbound_request_invalid_dnsname() -> wasmtime::Result<()> {
269
let server = Server::http1(1)?;
270
run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_DNSNAME_COMPONENT, &server).await
271
}
272
273
#[test_log::test(tokio::test(flavor = "multi_thread"))]
274
async fn p3_http_outbound_request_response_build() -> wasmtime::Result<()> {
275
let server = Server::http1(1)?;
276
run_cli(P3_HTTP_OUTBOUND_REQUEST_RESPONSE_BUILD_COMPONENT, &server).await
277
}
278
279
#[test_log::test(tokio::test(flavor = "multi_thread"))]
280
async fn p3_http_outbound_request_content_length() -> wasmtime::Result<()> {
281
let server = Server::http1(3)?;
282
run_cli(P3_HTTP_OUTBOUND_REQUEST_CONTENT_LENGTH_COMPONENT, &server).await
283
}
284
285
#[test_log::test(tokio::test(flavor = "multi_thread"))]
286
async fn p3_http_outbound_request_missing_path_and_query() -> wasmtime::Result<()> {
287
let server = Server::http1(1)?;
288
run_cli(
289
P3_HTTP_OUTBOUND_REQUEST_MISSING_PATH_AND_QUERY_COMPONENT,
290
&server,
291
)
292
.await
293
}
294
295
#[test_log::test(tokio::test(flavor = "multi_thread"))]
296
async fn wasi_http_proxy_tests() -> wasmtime::Result<()> {
297
let req = http::Request::builder()
298
.uri("http://example.com:8080/test-path")
299
.method(http::Method::GET);
300
301
let res = run_http(
302
P3_API_PROXY_COMPONENT,
303
req.body(Empty::new())?,
304
oneshot::channel().0,
305
)
306
.await?;
307
308
match res {
309
Ok(res) => println!("response: {res:?}"),
310
Err(err) => panic!("Error given in response: {err:?}"),
311
};
312
313
Ok(())
314
}
315
316
#[test_log::test(tokio::test(flavor = "multi_thread"))]
317
async fn p3_http_echo() -> Result<()> {
318
test_http_echo(P3_HTTP_ECHO_COMPONENT, false, false).await
319
}
320
321
#[test_log::test(tokio::test(flavor = "multi_thread"))]
322
async fn p3_http_echo_host_to_host() -> Result<()> {
323
test_http_echo(P3_HTTP_ECHO_COMPONENT, false, true).await
324
}
325
326
#[test_log::test(tokio::test(flavor = "multi_thread"))]
327
async fn p3_http_middleware() -> Result<()> {
328
test_http_middleware(false).await
329
}
330
331
#[test_log::test(tokio::test(flavor = "multi_thread"))]
332
async fn p3_http_middleware_host_to_host() {
333
let error = format!("{:?}", test_http_middleware(true).await.unwrap_err());
334
335
let expected = "cannot read from and write to intra-component future with non-numeric payload";
336
337
assert!(
338
error.contains(expected),
339
"expected `{expected}`; got `{error}`"
340
);
341
}
342
343
async fn test_http_middleware(host_to_host: bool) -> Result<()> {
344
let tempdir = tempfile::tempdir()?;
345
let echo = &fs::read(P3_HTTP_ECHO_COMPONENT).await?;
346
let middleware = &fs::read(P3_HTTP_MIDDLEWARE_COMPONENT).await?;
347
348
let path = tempdir.path().join("temp.wasm");
349
fs::write(&path, compose(middleware, echo).await?).await?;
350
test_http_echo(&path.to_str().unwrap(), true, host_to_host).await
351
}
352
353
async fn compose(a: &[u8], b: &[u8]) -> Result<Vec<u8>> {
354
let dir = tempfile::tempdir()?;
355
356
let a_file = dir.path().join("a.wasm");
357
fs::write(&a_file, a).await?;
358
359
let b_file = dir.path().join("b.wasm");
360
fs::write(&b_file, b).await?;
361
362
// The middleware imports `wasi:http/handler` which matches the echo's
363
// `wasi:http/handler` export, so wasm-compose can link them automatically.
364
ComponentComposer::new(
365
&a_file,
366
&wasm_compose::config::Config {
367
dir: dir.path().to_owned(),
368
definitions: vec![b_file.to_owned()],
369
..Default::default()
370
},
371
)
372
.compose()
373
.to_wasmtime_result()
374
}
375
376
#[test_log::test(tokio::test(flavor = "multi_thread"))]
377
async fn p3_http_middleware_with_chain() -> Result<()> {
378
test_http_middleware_with_chain(false).await
379
}
380
381
#[test_log::test(tokio::test(flavor = "multi_thread"))]
382
async fn p3_http_middleware_with_chain_host_to_host() -> Result<()> {
383
test_http_middleware_with_chain(true).await
384
}
385
386
async fn test_http_middleware_with_chain(host_to_host: bool) -> Result<()> {
387
let dir = tempfile::tempdir()?;
388
let path = dir.path().join("temp.wasm");
389
390
fs::copy(P3_HTTP_ECHO_COMPONENT, &dir.path().join("chain-http.wasm")).await?;
391
392
let bytes = ComponentComposer::new(
393
Path::new(P3_HTTP_MIDDLEWARE_WITH_CHAIN_COMPONENT),
394
&Config {
395
dir: dir.path().to_owned(),
396
definitions: Vec::new(),
397
search_paths: Vec::new(),
398
skip_validation: false,
399
import_components: false,
400
disallow_imports: false,
401
dependencies: [(
402
"local:local/chain-http".to_owned(),
403
Dependency {
404
path: P3_HTTP_ECHO_COMPONENT.into(),
405
},
406
)]
407
.into_iter()
408
.collect(),
409
instantiations: [(
410
"root".to_owned(),
411
Instantiation {
412
dependency: Some("local:local/chain-http".to_owned()),
413
arguments: [(
414
"local:local/chain-http".to_owned(),
415
InstantiationArg {
416
instance: "local:local/chain-http".into(),
417
export: Some("wasi:http/[email protected]".into()),
418
},
419
)]
420
.into_iter()
421
.collect(),
422
},
423
)]
424
.into_iter()
425
.collect(),
426
},
427
)
428
.compose()
429
.to_wasmtime_result()?;
430
fs::write(&path, &bytes).await?;
431
432
test_http_echo(&path.to_str().unwrap(), true, host_to_host).await
433
}
434
435
async fn test_http_echo(component: &str, use_compression: bool, host_to_host: bool) -> Result<()> {
436
_ = env_logger::try_init();
437
438
let body = b"And the mome raths outgrabe";
439
440
// Prepare the raw body, optionally compressed if that's what we're
441
// testing.
442
let raw_body = if use_compression {
443
let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast());
444
encoder.write_all(body).unwrap();
445
Bytes::from(encoder.finish().unwrap())
446
} else {
447
Bytes::copy_from_slice(body)
448
};
449
450
// Prepare the http_body body, modeled here as a channel with the body
451
// chunk above buffered up followed by some trailers. Note that trailers
452
// are always here to test that code paths throughout the components.
453
let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);
454
455
// Build the `http::Request`, optionally specifying compression-related
456
// headers.
457
let mut request = http::Request::builder()
458
.uri("http://localhost/")
459
.method(http::Method::GET)
460
.header("foo", "bar");
461
if use_compression {
462
request = request
463
.header("content-encoding", "deflate")
464
.header("accept-encoding", "nonexistent-encoding, deflate");
465
}
466
if host_to_host {
467
request = request.header("x-host-to-host", "true");
468
}
469
470
// Send this request to wasm and assert that success comes back.
471
//
472
// Note that this will read the entire body internally and wait for
473
// everything to get collected before proceeding to below.
474
let response = futures::join!(
475
run_http(
476
component,
477
request.body(http_body_util::StreamBody::new(body_rx))?,
478
oneshot::channel().0
479
),
480
async {
481
body_tx
482
.send(Ok(http_body::Frame::data(raw_body)))
483
.await
484
.unwrap();
485
body_tx
486
.send(Ok(http_body::Frame::trailers({
487
let mut trailers = http::HeaderMap::new();
488
assert!(
489
trailers
490
.insert("fizz", http::HeaderValue::from_static("buzz"))
491
.is_none()
492
);
493
trailers
494
})))
495
.await
496
.unwrap();
497
drop(body_tx);
498
}
499
)
500
.0?
501
.unwrap();
502
assert!(response.status().as_u16() == 200);
503
504
// Our input header should be echo'd back.
505
assert_eq!(
506
response.headers().get("foo"),
507
Some(&HeaderValue::from_static("bar"))
508
);
509
510
// The compression headers should be set if `use_compression` was turned
511
// on.
512
if use_compression {
513
assert_eq!(
514
response.headers().get("content-encoding"),
515
Some(&HeaderValue::from_static("deflate"))
516
);
517
assert!(response.headers().get("content-length").is_none());
518
}
519
520
// Trailers should be echo'd back as well.
521
let trailers = response.body().trailers().expect("trailers missing");
522
assert_eq!(
523
trailers.get("fizz"),
524
Some(&HeaderValue::from_static("buzz"))
525
);
526
527
// And our body should match our original input body as well.
528
let (_, collected_body) = response.into_parts();
529
let collected_body = collected_body.to_bytes();
530
531
let response_body = if use_compression {
532
let mut decoder = DeflateDecoder::new(Vec::new());
533
decoder.write_all(&collected_body)?;
534
decoder.finish()?
535
} else {
536
collected_body.to_vec()
537
};
538
assert_eq!(response_body, body.as_slice());
539
Ok(())
540
}
541
542
#[test_log::test(tokio::test(flavor = "multi_thread"))]
543
async fn p3_http_proxy() -> Result<()> {
544
let body = b"And the mome raths outgrabe";
545
546
let raw_body = Bytes::copy_from_slice(body);
547
548
let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);
549
550
// Tell the guest to forward the request to `http://p3-test/`, which we
551
// handle specially in `TestHttpCtx::send_request` above, sending the
552
// request body to the oneshot sender we specify below and then immediately
553
// returning a dummy response. We won't start sending the request body
554
// until after the guest has exited and we've dropped the store.
555
556
let request = http::Request::builder()
557
.uri("http://localhost/")
558
.method(http::Method::GET)
559
.header("url", "http://p3-test/");
560
561
let (request_body_tx, request_body_rx) = oneshot::channel();
562
let response = run_http(
563
P3_HTTP_PROXY_COMPONENT,
564
request.body(http_body_util::StreamBody::new(body_rx))?,
565
request_body_tx,
566
)
567
.await?
568
.unwrap();
569
assert!(response.status().as_u16() == 200);
570
571
// The guest has exited and the store has been dropped; now we finally send
572
// the request body and assert that we've received the entire thing.
573
574
let ((), request_body) = futures::join!(
575
async {
576
body_tx
577
.send(Ok(http_body::Frame::data(raw_body)))
578
.await
579
.unwrap();
580
drop(body_tx);
581
},
582
async {
583
request_body_rx
584
.await
585
.unwrap()
586
.collect()
587
.await
588
.unwrap()
589
.to_bytes()
590
}
591
);
592
593
assert_eq!(request_body, body.as_slice());
594
Ok(())
595
}
596
597
// Custom body wrapper that sends a configurable frame at EOS while reporting is_end_stream() = true
598
struct BodyWithFrameAtEos {
599
inner: http_body_util::StreamBody<
600
futures::channel::mpsc::Receiver<Result<http_body::Frame<Bytes>, ErrorCode>>,
601
>,
602
final_frame: Option<Bytes>,
603
sent_final: bool,
604
at_eos: bool,
605
}
606
607
impl http_body::Body for BodyWithFrameAtEos {
608
type Data = Bytes;
609
type Error = ErrorCode;
610
611
fn poll_frame(
612
mut self: Pin<&mut Self>,
613
cx: &mut Context<'_>,
614
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
615
// First, poll the underlying body
616
let this = &mut *self;
617
match Pin::new(&mut this.inner).poll_frame(cx) {
618
Poll::Ready(None) if !this.sent_final => {
619
// When the underlying body ends, send the configured final frame
620
// This simulates HTTP implementations that send frames at EOS
621
this.sent_final = true;
622
this.at_eos = true;
623
if let Some(data) = this.final_frame.take() {
624
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
625
} else {
626
Poll::Ready(None)
627
}
628
}
629
other => other,
630
}
631
}
632
633
fn is_end_stream(&self) -> bool {
634
// Report end of stream once we've reached it
635
// This ensures is_end_stream() = true when we send the final frame
636
self.at_eos
637
}
638
}
639
640
#[test_log::test(tokio::test(flavor = "multi_thread"))]
641
async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> {
642
_ = env_logger::try_init();
643
644
// This test verifies the fix which handles the case where a zero-length frame is
645
// received when is_end_stream() is true. Without the fix, the StreamProducer would
646
// crash when the WASM guest tries to read such a frame.
647
648
let body = b"test";
649
let raw_body = Bytes::copy_from_slice(body);
650
651
let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);
652
653
let wrapped_body = BodyWithFrameAtEos {
654
inner: http_body_util::StreamBody::new(body_rx),
655
final_frame: Some(Bytes::new()), // Send empty frame at EOS
656
sent_final: false,
657
at_eos: false,
658
};
659
660
let request = http::Request::builder()
661
.uri("http://localhost/")
662
.method(http::Method::GET);
663
664
// Use the echo component which actually reads from the stream
665
let response = futures::join!(
666
run_http(
667
P3_HTTP_ECHO_COMPONENT,
668
request.body(wrapped_body)?,
669
oneshot::channel().0
670
),
671
async {
672
body_tx
673
.send(Ok(http_body::Frame::data(raw_body)))
674
.await
675
.unwrap();
676
drop(body_tx);
677
}
678
)
679
.0?
680
.unwrap();
681
682
assert_eq!(response.status().as_u16(), 200);
683
684
// Verify the body was echoed correctly (empty frames should be filtered out by the fix)
685
let (_, collected_body) = response.into_parts();
686
let collected_body = collected_body.to_bytes();
687
assert_eq!(collected_body, body.as_slice());
688
Ok(())
689
}
690
691
#[test_log::test(tokio::test(flavor = "multi_thread"))]
692
async fn p3_http_data_frame_at_end_of_stream() -> Result<()> {
693
_ = env_logger::try_init();
694
695
// This test verifies that when is_end_stream() is true but the frame contains data,
696
// we still process the data.
697
698
let body = b"test";
699
let final_data = b" final";
700
let raw_body = Bytes::copy_from_slice(body);
701
let final_frame = Bytes::copy_from_slice(final_data);
702
703
let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);
704
705
let wrapped_body = BodyWithFrameAtEos {
706
inner: http_body_util::StreamBody::new(body_rx),
707
final_frame: Some(final_frame), // Send data frame at EOS with is_end_stream() = true
708
sent_final: false,
709
at_eos: false,
710
};
711
712
let request = http::Request::builder()
713
.uri("http://localhost/")
714
.method(http::Method::GET);
715
716
// Use the echo component which actually reads from the stream
717
let response = futures::join!(
718
run_http(
719
P3_HTTP_ECHO_COMPONENT,
720
request.body(wrapped_body)?,
721
oneshot::channel().0
722
),
723
async {
724
body_tx
725
.send(Ok(http_body::Frame::data(raw_body)))
726
.await
727
.unwrap();
728
drop(body_tx);
729
}
730
)
731
.0?
732
.unwrap();
733
734
assert_eq!(response.status().as_u16(), 200);
735
736
// Verify the body was echoed correctly (the final frame's data should not be lost)
737
let (_, collected_body) = response.into_parts();
738
let collected_body = collected_body.to_bytes();
739
let expected = [body.as_slice(), final_data.as_slice()].concat();
740
assert_eq!(collected_body, expected.as_slice());
741
Ok(())
742
}
743
744