Path: blob/main/crates/wasi-http/tests/all/p3/mod.rs
3139 views
use crate::http_server::Server;1use bytes::Bytes;2use flate2::Compression;3use flate2::write::{DeflateDecoder, DeflateEncoder};4use futures::SinkExt;5use futures::channel::oneshot;6use http::HeaderValue;7use http_body::Body;8use http_body_util::{BodyExt as _, Collected, Empty, combinators::UnsyncBoxBody};9use std::io::Write;10use std::path::Path;11use std::pin::Pin;12use std::task::{Context, Poll};13use test_programs_artifacts::*;14use tokio::{fs, try_join};15use wasm_compose::composer::ComponentComposer;16use wasm_compose::config::{Config, Dependency, Instantiation, InstantiationArg};17use wasmtime::component::{Component, Linker, ResourceTable};18use wasmtime::{Result, Store, ToWasmtimeResult as _, error::Context as _, format_err};19use wasmtime_wasi::p3::bindings::Command;20use wasmtime_wasi::{TrappableError, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};21use wasmtime_wasi_http::p3::bindings::Service;22use wasmtime_wasi_http::p3::bindings::http::types::ErrorCode;23use wasmtime_wasi_http::p3::{24self, Request, RequestOptions, WasiHttpCtx, WasiHttpCtxView, WasiHttpView,25};26use wasmtime_wasi_http::types::DEFAULT_FORBIDDEN_HEADERS;2728foreach_p3_http!(assert_test_exists);2930struct TestHttpCtx {31request_body_tx: Option<oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>>,32}3334impl WasiHttpCtx for TestHttpCtx {35fn is_forbidden_header(&mut self, name: &http::header::HeaderName) -> bool {36name.as_str() == "custom-forbidden-header" || DEFAULT_FORBIDDEN_HEADERS.contains(name)37}3839fn send_request(40&mut self,41request: http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,42options: Option<RequestOptions>,43fut: Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,44) -> Box<45dyn Future<46Output = Result<47(48http::Response<UnsyncBoxBody<Bytes, ErrorCode>>,49Box<dyn Future<Output = Result<(), ErrorCode>> + Send>,50),51TrappableError<ErrorCode>,52>,53> + Send,54> {55_ = fut;56if let Some("p3-test") = request.uri().authority().map(|v| v.as_str()) {57_ = self58.request_body_tx59.take()60.unwrap()61.send(request.into_body());62Box::new(async {63Ok((64http::Response::new(Default::default()),65Box::new(async { Ok(()) }) as Box<dyn Future<Output = _> + Send>,66))67})68} else {69Box::new(async move {70use http_body_util::BodyExt;7172let (res, io) = p3::default_send_request(request, options).await?;73Ok((74res.map(BodyExt::boxed_unsync),75Box::new(io) as Box<dyn Future<Output = _> + Send>,76))77})78}79}80}8182struct Ctx {83table: ResourceTable,84wasi: WasiCtx,85http: TestHttpCtx,86}8788impl Ctx {89fn new(request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>) -> Self {90Self {91table: ResourceTable::default(),92wasi: WasiCtxBuilder::new().inherit_stdio().build(),93http: TestHttpCtx {94request_body_tx: Some(request_body_tx),95},96}97}98}99100impl WasiView for Ctx {101fn ctx(&mut self) -> WasiCtxView<'_> {102WasiCtxView {103ctx: &mut self.wasi,104table: &mut self.table,105}106}107}108109impl WasiHttpView for Ctx {110fn http(&mut self) -> WasiHttpCtxView<'_> {111WasiHttpCtxView {112ctx: &mut self.http,113table: &mut self.table,114}115}116}117118async fn run_cli(path: &str, server: &Server) -> wasmtime::Result<()> {119let engine = test_programs_artifacts::engine(|config| {120config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);121config.wasm_component_model_async(true);122});123let component = Component::from_file(&engine, path)?;124let mut store = Store::new(125&engine,126Ctx {127wasi: wasmtime_wasi::WasiCtx::builder()128.env("HTTP_SERVER", server.addr())129.build(),130..Ctx::new(oneshot::channel().0)131},132);133let mut linker = Linker::new(&engine);134wasmtime_wasi::p2::add_to_linker_async(&mut linker)135.context("failed to link `wasi:[email protected]`")?;136wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:[email protected]`")?;137wasmtime_wasi_http::p3::add_to_linker(&mut linker)138.context("failed to link `wasi:[email protected]`")?;139let command = Command::instantiate_async(&mut store, &component, &linker).await?;140store141.run_concurrent(async |store| command.wasi_cli_run().call_run(store).await)142.await143.context("failed to call `wasi:cli/run#run`")?144.context("guest trapped")?145.0146.map_err(|()| format_err!("`wasi:cli/run#run` failed"))147}148149async fn run_http<E: Into<ErrorCode> + 'static>(150component_filename: &str,151req: http::Request<impl Body<Data = Bytes, Error = E> + Send + Sync + 'static>,152request_body_tx: oneshot::Sender<UnsyncBoxBody<Bytes, ErrorCode>>,153) -> wasmtime::Result<Result<http::Response<Collected<Bytes>>, Option<ErrorCode>>> {154let engine = test_programs_artifacts::engine(|config| {155config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);156config.wasm_component_model_async(true);157});158let component = Component::from_file(&engine, component_filename)?;159160let mut store = Store::new(&engine, Ctx::new(request_body_tx));161162let mut linker = Linker::new(&engine);163wasmtime_wasi::p2::add_to_linker_async(&mut linker)164.context("failed to link `wasi:[email protected]`")?;165wasmtime_wasi::p3::add_to_linker(&mut linker).context("failed to link `wasi:[email protected]`")?;166wasmtime_wasi_http::p3::add_to_linker(&mut linker)167.context("failed to link `wasi:[email protected]`")?;168let service = Service::instantiate_async(&mut store, &component, &linker).await?;169let (req, io) = Request::from_http(req);170let (tx, rx) = tokio::sync::oneshot::channel();171let ((handle_result, ()), res) = try_join!(172async move {173store174.run_concurrent(async |store| {175try_join!(176async {177let (res, task) = match service.handle(store, req).await? {178Ok(pair) => pair,179Err(err) => return Ok(Err(Some(err))),180};181_ = tx182.send(store.with(|store| res.into_http(store, async { Ok(()) }))?);183task.block(store).await;184Ok(Ok(()))185},186async { io.await.context("failed to consume request body") }187)188})189.await?190},191async move {192let res = rx.await?;193let (parts, body) = res.into_parts();194let body = body.collect().await.context("failed to collect body")?;195wasmtime::error::Ok(http::Response::from_parts(parts, body))196}197)?;198199Ok(handle_result.map(|()| res))200}201202#[test_log::test(tokio::test(flavor = "multi_thread"))]203async fn p3_http_outbound_request_get() -> wasmtime::Result<()> {204let server = Server::http1(1)?;205run_cli(P3_HTTP_OUTBOUND_REQUEST_GET_COMPONENT, &server).await206}207208#[test_log::test(tokio::test(flavor = "multi_thread"))]209async fn p3_http_outbound_request_timeout() -> wasmtime::Result<()> {210let server = Server::http1(1)?;211run_cli(P3_HTTP_OUTBOUND_REQUEST_TIMEOUT_COMPONENT, &server).await212}213214#[test_log::test(tokio::test(flavor = "multi_thread"))]215async fn p3_http_outbound_request_post() -> wasmtime::Result<()> {216let server = Server::http1(1)?;217run_cli(P3_HTTP_OUTBOUND_REQUEST_POST_COMPONENT, &server).await218}219220#[test_log::test(tokio::test(flavor = "multi_thread"))]221async fn p3_http_outbound_request_large_post() -> wasmtime::Result<()> {222let server = Server::http1(1)?;223run_cli(P3_HTTP_OUTBOUND_REQUEST_LARGE_POST_COMPONENT, &server).await224}225226#[test_log::test(tokio::test(flavor = "multi_thread"))]227async fn p3_http_outbound_request_put() -> wasmtime::Result<()> {228let server = Server::http1(1)?;229run_cli(P3_HTTP_OUTBOUND_REQUEST_PUT_COMPONENT, &server).await230}231232#[test_log::test(tokio::test(flavor = "multi_thread"))]233async fn p3_http_outbound_request_invalid_version() -> wasmtime::Result<()> {234let server = Server::http2(1)?;235run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_VERSION_COMPONENT, &server).await236}237238#[test_log::test(tokio::test(flavor = "multi_thread"))]239async fn p3_http_outbound_request_invalid_header() -> wasmtime::Result<()> {240let server = Server::http2(1)?;241run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_HEADER_COMPONENT, &server).await242}243244#[test_log::test(tokio::test(flavor = "multi_thread"))]245async fn p3_http_outbound_request_unknown_method() -> wasmtime::Result<()> {246let server = Server::http1(1)?;247run_cli(P3_HTTP_OUTBOUND_REQUEST_UNKNOWN_METHOD_COMPONENT, &server).await248}249250#[test_log::test(tokio::test(flavor = "multi_thread"))]251async fn p3_http_outbound_request_unsupported_scheme() -> wasmtime::Result<()> {252let server = Server::http1(1)?;253run_cli(254P3_HTTP_OUTBOUND_REQUEST_UNSUPPORTED_SCHEME_COMPONENT,255&server,256)257.await258}259260#[test_log::test(tokio::test(flavor = "multi_thread"))]261async fn p3_http_outbound_request_invalid_port() -> wasmtime::Result<()> {262let server = Server::http1(1)?;263run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_PORT_COMPONENT, &server).await264}265266#[test_log::test(tokio::test(flavor = "multi_thread"))]267async fn p3_http_outbound_request_invalid_dnsname() -> wasmtime::Result<()> {268let server = Server::http1(1)?;269run_cli(P3_HTTP_OUTBOUND_REQUEST_INVALID_DNSNAME_COMPONENT, &server).await270}271272#[test_log::test(tokio::test(flavor = "multi_thread"))]273async fn p3_http_outbound_request_response_build() -> wasmtime::Result<()> {274let server = Server::http1(1)?;275run_cli(P3_HTTP_OUTBOUND_REQUEST_RESPONSE_BUILD_COMPONENT, &server).await276}277278#[test_log::test(tokio::test(flavor = "multi_thread"))]279async fn p3_http_outbound_request_content_length() -> wasmtime::Result<()> {280let server = Server::http1(3)?;281run_cli(P3_HTTP_OUTBOUND_REQUEST_CONTENT_LENGTH_COMPONENT, &server).await282}283284#[test_log::test(tokio::test(flavor = "multi_thread"))]285async fn p3_http_outbound_request_missing_path_and_query() -> wasmtime::Result<()> {286let server = Server::http1(1)?;287run_cli(288P3_HTTP_OUTBOUND_REQUEST_MISSING_PATH_AND_QUERY_COMPONENT,289&server,290)291.await292}293294#[test_log::test(tokio::test(flavor = "multi_thread"))]295async fn wasi_http_proxy_tests() -> wasmtime::Result<()> {296let req = http::Request::builder()297.uri("http://example.com:8080/test-path")298.method(http::Method::GET);299300let res = run_http(301P3_API_PROXY_COMPONENT,302req.body(Empty::new())?,303oneshot::channel().0,304)305.await?;306307match res {308Ok(res) => println!("response: {res:?}"),309Err(err) => panic!("Error given in response: {err:?}"),310};311312Ok(())313}314315#[test_log::test(tokio::test(flavor = "multi_thread"))]316async fn p3_http_echo() -> Result<()> {317test_http_echo(P3_HTTP_ECHO_COMPONENT, false, false).await318}319320#[test_log::test(tokio::test(flavor = "multi_thread"))]321async fn p3_http_echo_host_to_host() -> Result<()> {322test_http_echo(P3_HTTP_ECHO_COMPONENT, false, true).await323}324325#[test_log::test(tokio::test(flavor = "multi_thread"))]326async fn p3_http_middleware() -> Result<()> {327test_http_middleware(false).await328}329330#[test_log::test(tokio::test(flavor = "multi_thread"))]331async fn p3_http_middleware_host_to_host() {332let error = format!("{:?}", test_http_middleware(true).await.unwrap_err());333334let expected = "cannot read from and write to intra-component future with non-numeric payload";335336assert!(337error.contains(expected),338"expected `{expected}`; got `{error}`"339);340}341342async fn test_http_middleware(host_to_host: bool) -> Result<()> {343let tempdir = tempfile::tempdir()?;344let echo = &fs::read(P3_HTTP_ECHO_COMPONENT).await?;345let middleware = &fs::read(P3_HTTP_MIDDLEWARE_COMPONENT).await?;346347let path = tempdir.path().join("temp.wasm");348fs::write(&path, compose(middleware, echo).await?).await?;349test_http_echo(&path.to_str().unwrap(), true, host_to_host).await350}351352async fn compose(a: &[u8], b: &[u8]) -> Result<Vec<u8>> {353let dir = tempfile::tempdir()?;354355let a_file = dir.path().join("a.wasm");356fs::write(&a_file, a).await?;357358let b_file = dir.path().join("b.wasm");359fs::write(&b_file, b).await?;360361// The middleware imports `wasi:http/handler` which matches the echo's362// `wasi:http/handler` export, so wasm-compose can link them automatically.363ComponentComposer::new(364&a_file,365&wasm_compose::config::Config {366dir: dir.path().to_owned(),367definitions: vec![b_file.to_owned()],368..Default::default()369},370)371.compose()372.to_wasmtime_result()373}374375#[test_log::test(tokio::test(flavor = "multi_thread"))]376async fn p3_http_middleware_with_chain() -> Result<()> {377test_http_middleware_with_chain(false).await378}379380#[test_log::test(tokio::test(flavor = "multi_thread"))]381async fn p3_http_middleware_with_chain_host_to_host() -> Result<()> {382test_http_middleware_with_chain(true).await383}384385async fn test_http_middleware_with_chain(host_to_host: bool) -> Result<()> {386let dir = tempfile::tempdir()?;387let path = dir.path().join("temp.wasm");388389fs::copy(P3_HTTP_ECHO_COMPONENT, &dir.path().join("chain-http.wasm")).await?;390391let bytes = ComponentComposer::new(392Path::new(P3_HTTP_MIDDLEWARE_WITH_CHAIN_COMPONENT),393&Config {394dir: dir.path().to_owned(),395definitions: Vec::new(),396search_paths: Vec::new(),397skip_validation: false,398import_components: false,399disallow_imports: false,400dependencies: [(401"local:local/chain-http".to_owned(),402Dependency {403path: P3_HTTP_ECHO_COMPONENT.into(),404},405)]406.into_iter()407.collect(),408instantiations: [(409"root".to_owned(),410Instantiation {411dependency: Some("local:local/chain-http".to_owned()),412arguments: [(413"local:local/chain-http".to_owned(),414InstantiationArg {415instance: "local:local/chain-http".into(),416export: Some("wasi:http/[email protected]".into()),417},418)]419.into_iter()420.collect(),421},422)]423.into_iter()424.collect(),425},426)427.compose()428.to_wasmtime_result()?;429fs::write(&path, &bytes).await?;430431test_http_echo(&path.to_str().unwrap(), true, host_to_host).await432}433434async fn test_http_echo(component: &str, use_compression: bool, host_to_host: bool) -> Result<()> {435_ = env_logger::try_init();436437let body = b"And the mome raths outgrabe";438439// Prepare the raw body, optionally compressed if that's what we're440// testing.441let raw_body = if use_compression {442let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast());443encoder.write_all(body).unwrap();444Bytes::from(encoder.finish().unwrap())445} else {446Bytes::copy_from_slice(body)447};448449// Prepare the http_body body, modeled here as a channel with the body450// chunk above buffered up followed by some trailers. Note that trailers451// are always here to test that code paths throughout the components.452let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);453454// Build the `http::Request`, optionally specifying compression-related455// headers.456let mut request = http::Request::builder()457.uri("http://localhost/")458.method(http::Method::GET)459.header("foo", "bar");460if use_compression {461request = request462.header("content-encoding", "deflate")463.header("accept-encoding", "nonexistent-encoding, deflate");464}465if host_to_host {466request = request.header("x-host-to-host", "true");467}468469// Send this request to wasm and assert that success comes back.470//471// Note that this will read the entire body internally and wait for472// everything to get collected before proceeding to below.473let response = futures::join!(474run_http(475component,476request.body(http_body_util::StreamBody::new(body_rx))?,477oneshot::channel().0478),479async {480body_tx481.send(Ok(http_body::Frame::data(raw_body)))482.await483.unwrap();484body_tx485.send(Ok(http_body::Frame::trailers({486let mut trailers = http::HeaderMap::new();487assert!(488trailers489.insert("fizz", http::HeaderValue::from_static("buzz"))490.is_none()491);492trailers493})))494.await495.unwrap();496drop(body_tx);497}498)499.0?500.unwrap();501assert!(response.status().as_u16() == 200);502503// Our input header should be echo'd back.504assert_eq!(505response.headers().get("foo"),506Some(&HeaderValue::from_static("bar"))507);508509// The compression headers should be set if `use_compression` was turned510// on.511if use_compression {512assert_eq!(513response.headers().get("content-encoding"),514Some(&HeaderValue::from_static("deflate"))515);516assert!(response.headers().get("content-length").is_none());517}518519// Trailers should be echo'd back as well.520let trailers = response.body().trailers().expect("trailers missing");521assert_eq!(522trailers.get("fizz"),523Some(&HeaderValue::from_static("buzz"))524);525526// And our body should match our original input body as well.527let (_, collected_body) = response.into_parts();528let collected_body = collected_body.to_bytes();529530let response_body = if use_compression {531let mut decoder = DeflateDecoder::new(Vec::new());532decoder.write_all(&collected_body)?;533decoder.finish()?534} else {535collected_body.to_vec()536};537assert_eq!(response_body, body.as_slice());538Ok(())539}540541#[test_log::test(tokio::test(flavor = "multi_thread"))]542async fn p3_http_proxy() -> Result<()> {543let body = b"And the mome raths outgrabe";544545let raw_body = Bytes::copy_from_slice(body);546547let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);548549// Tell the guest to forward the request to `http://p3-test/`, which we550// handle specially in `TestHttpCtx::send_request` above, sending the551// request body to the oneshot sender we specify below and then immediately552// returning a dummy response. We won't start sending the request body553// until after the guest has exited and we've dropped the store.554555let request = http::Request::builder()556.uri("http://localhost/")557.method(http::Method::GET)558.header("url", "http://p3-test/");559560let (request_body_tx, request_body_rx) = oneshot::channel();561let response = run_http(562P3_HTTP_PROXY_COMPONENT,563request.body(http_body_util::StreamBody::new(body_rx))?,564request_body_tx,565)566.await?567.unwrap();568assert!(response.status().as_u16() == 200);569570// The guest has exited and the store has been dropped; now we finally send571// the request body and assert that we've received the entire thing.572573let ((), request_body) = futures::join!(574async {575body_tx576.send(Ok(http_body::Frame::data(raw_body)))577.await578.unwrap();579drop(body_tx);580},581async {582request_body_rx583.await584.unwrap()585.collect()586.await587.unwrap()588.to_bytes()589}590);591592assert_eq!(request_body, body.as_slice());593Ok(())594}595596// Custom body wrapper that sends a configurable frame at EOS while reporting is_end_stream() = true597struct BodyWithFrameAtEos {598inner: http_body_util::StreamBody<599futures::channel::mpsc::Receiver<Result<http_body::Frame<Bytes>, ErrorCode>>,600>,601final_frame: Option<Bytes>,602sent_final: bool,603at_eos: bool,604}605606impl http_body::Body for BodyWithFrameAtEos {607type Data = Bytes;608type Error = ErrorCode;609610fn poll_frame(611mut self: Pin<&mut Self>,612cx: &mut Context<'_>,613) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {614// First, poll the underlying body615let this = &mut *self;616match Pin::new(&mut this.inner).poll_frame(cx) {617Poll::Ready(None) if !this.sent_final => {618// When the underlying body ends, send the configured final frame619// This simulates HTTP implementations that send frames at EOS620this.sent_final = true;621this.at_eos = true;622if let Some(data) = this.final_frame.take() {623Poll::Ready(Some(Ok(http_body::Frame::data(data))))624} else {625Poll::Ready(None)626}627}628other => other,629}630}631632fn is_end_stream(&self) -> bool {633// Report end of stream once we've reached it634// This ensures is_end_stream() = true when we send the final frame635self.at_eos636}637}638639#[test_log::test(tokio::test(flavor = "multi_thread"))]640async fn p3_http_empty_frame_at_end_of_stream() -> Result<()> {641_ = env_logger::try_init();642643// This test verifies the fix which handles the case where a zero-length frame is644// received when is_end_stream() is true. Without the fix, the StreamProducer would645// crash when the WASM guest tries to read such a frame.646647let body = b"test";648let raw_body = Bytes::copy_from_slice(body);649650let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);651652let wrapped_body = BodyWithFrameAtEos {653inner: http_body_util::StreamBody::new(body_rx),654final_frame: Some(Bytes::new()), // Send empty frame at EOS655sent_final: false,656at_eos: false,657};658659let request = http::Request::builder()660.uri("http://localhost/")661.method(http::Method::GET);662663// Use the echo component which actually reads from the stream664let response = futures::join!(665run_http(666P3_HTTP_ECHO_COMPONENT,667request.body(wrapped_body)?,668oneshot::channel().0669),670async {671body_tx672.send(Ok(http_body::Frame::data(raw_body)))673.await674.unwrap();675drop(body_tx);676}677)678.0?679.unwrap();680681assert_eq!(response.status().as_u16(), 200);682683// Verify the body was echoed correctly (empty frames should be filtered out by the fix)684let (_, collected_body) = response.into_parts();685let collected_body = collected_body.to_bytes();686assert_eq!(collected_body, body.as_slice());687Ok(())688}689690#[test_log::test(tokio::test(flavor = "multi_thread"))]691async fn p3_http_data_frame_at_end_of_stream() -> Result<()> {692_ = env_logger::try_init();693694// This test verifies that when is_end_stream() is true but the frame contains data,695// we still process the data.696697let body = b"test";698let final_data = b" final";699let raw_body = Bytes::copy_from_slice(body);700let final_frame = Bytes::copy_from_slice(final_data);701702let (mut body_tx, body_rx) = futures::channel::mpsc::channel::<Result<_, ErrorCode>>(1);703704let wrapped_body = BodyWithFrameAtEos {705inner: http_body_util::StreamBody::new(body_rx),706final_frame: Some(final_frame), // Send data frame at EOS with is_end_stream() = true707sent_final: false,708at_eos: false,709};710711let request = http::Request::builder()712.uri("http://localhost/")713.method(http::Method::GET);714715// Use the echo component which actually reads from the stream716let response = futures::join!(717run_http(718P3_HTTP_ECHO_COMPONENT,719request.body(wrapped_body)?,720oneshot::channel().0721),722async {723body_tx724.send(Ok(http_body::Frame::data(raw_body)))725.await726.unwrap();727drop(body_tx);728}729)730.0?731.unwrap();732733assert_eq!(response.status().as_u16(), 200);734735// Verify the body was echoed correctly (the final frame's data should not be lost)736let (_, collected_body) = response.into_parts();737let collected_body = collected_body.to_bytes();738let expected = [body.as_slice(), final_data.as_slice()].concat();739assert_eq!(collected_body, expected.as_slice());740Ok(())741}742743744