Path: blob/main/crates/wasi-http/tests/all/p2.rs
3092 views
use crate::body;1use crate::http_server::Server;2use futures::{FutureExt, channel::oneshot, future, stream};3use http_body::Frame;4use http_body_util::{BodyExt, Collected, Empty, StreamBody, combinators::BoxBody};5use hyper::{Method, StatusCode, body::Bytes, server::conn::http1, service::service_fn};6use sha2::{Digest, Sha256};7use std::{collections::HashMap, iter, net::Ipv4Addr, str, sync::Arc};8use tokio::task;9use wasmtime::{10Config, Engine, Result, Store,11component::{Component, Linker, ResourceTable},12error::Context as _,13format_err,14};15use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView, p2::pipe::MemoryOutputPipe};16use wasmtime_wasi_http::{17HttpResult, WasiHttpCtx, WasiHttpView,18bindings::http::types::{ErrorCode, Scheme},19body::HyperOutgoingBody,20io::TokioIo,21types::{self, HostFutureIncomingResponse, IncomingResponse, OutgoingRequestConfig},22};2324type RequestSender = Arc<25dyn Fn(hyper::Request<HyperOutgoingBody>, OutgoingRequestConfig) -> HostFutureIncomingResponse26+ Send27+ Sync,28>;2930struct Ctx {31table: ResourceTable,32wasi: WasiCtx,33http: WasiHttpCtx,34stdout: MemoryOutputPipe,35stderr: MemoryOutputPipe,36send_request: Option<RequestSender>,37rejected_authority: Option<String>,38}3940impl WasiView for Ctx {41fn ctx(&mut self) -> WasiCtxView<'_> {42WasiCtxView {43ctx: &mut self.wasi,44table: &mut self.table,45}46}47}4849impl WasiHttpView for Ctx {50fn ctx(&mut self) -> &mut WasiHttpCtx {51&mut self.http52}5354fn table(&mut self) -> &mut ResourceTable {55&mut self.table56}5758fn send_request(59&mut self,60request: hyper::Request<HyperOutgoingBody>,61config: OutgoingRequestConfig,62) -> HttpResult<HostFutureIncomingResponse> {63if let Some(rejected_authority) = &self.rejected_authority {64let authority = request.uri().authority().map(ToString::to_string).unwrap();65if &authority == rejected_authority {66return Err(ErrorCode::HttpRequestDenied.into());67}68}69if let Some(send_request) = self.send_request.clone() {70Ok(send_request(request, config))71} else {72Ok(types::default_send_request(request, config))73}74}7576fn is_forbidden_header(&mut self, name: &hyper::header::HeaderName) -> bool {77types::DEFAULT_FORBIDDEN_HEADERS.contains(name)78|| name.as_str() == "custom-forbidden-header"79}80}8182fn store(engine: &Engine, server: &Server) -> Store<Ctx> {83let stdout = MemoryOutputPipe::new(4096);84let stderr = MemoryOutputPipe::new(4096);8586// Create our wasi context.87let mut builder = WasiCtx::builder();88builder.stdout(stdout.clone());89builder.stderr(stderr.clone());90builder.env("HTTP_SERVER", &server.addr());91let ctx = Ctx {92table: ResourceTable::new(),93wasi: builder.build(),94http: WasiHttpCtx::new(),95stderr,96stdout,97send_request: None,98rejected_authority: None,99};100101Store::new(&engine, ctx)102}103104impl Drop for Ctx {105fn drop(&mut self) {106let stdout = self.stdout.contents();107if !stdout.is_empty() {108println!("[guest] stdout:\n{}\n===", String::from_utf8_lossy(&stdout));109}110let stderr = self.stderr.contents();111if !stderr.is_empty() {112println!("[guest] stderr:\n{}\n===", String::from_utf8_lossy(&stderr));113}114}115}116117mod async_;118mod sync;119120async fn run_wasi_http(121component_filename: &str,122req: hyper::Request<BoxBody<Bytes, hyper::Error>>,123send_request: Option<RequestSender>,124rejected_authority: Option<String>,125early_drop: bool,126) -> wasmtime::Result<Result<hyper::Response<Collected<Bytes>>, ErrorCode>> {127let stdout = MemoryOutputPipe::new(4096);128let stderr = MemoryOutputPipe::new(4096);129let table = ResourceTable::new();130131let mut config = Config::new();132config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);133config.wasm_component_model(true);134let engine = Engine::new(&config)?;135let component = Component::from_file(&engine, component_filename)?;136137// Create our wasi context.138let mut builder = WasiCtx::builder();139builder.stdout(stdout.clone());140builder.stderr(stderr.clone());141let wasi = builder.build();142let http = WasiHttpCtx::new();143let ctx = Ctx {144table,145wasi,146http,147stderr,148stdout,149send_request,150rejected_authority,151};152let mut store = Store::new(&engine, ctx);153154let mut linker = Linker::new(&engine);155wasmtime_wasi_http::add_to_linker_async(&mut linker)?;156let proxy =157wasmtime_wasi_http::bindings::Proxy::instantiate_async(&mut store, &component, &linker)158.await?;159160let req = store.data_mut().new_incoming_request(Scheme::Http, req)?;161162let (sender, receiver) = tokio::sync::oneshot::channel();163let out = store.data_mut().new_response_outparam(sender)?;164165let receiver = if early_drop {166// Drop the receiver early, emulating a host event like167// timeout occurred so that the processing has to be stopped.168drop(receiver);169None170} else {171Some(receiver)172};173174let handle = wasmtime_wasi::runtime::spawn(async move {175proxy176.wasi_http_incoming_handler()177.call_handle(&mut store, req, out)178.await?;179180Ok::<_, wasmtime::Error>(())181});182183if let Some(r) = receiver {184let resp = match r.await {185Ok(Ok(resp)) => {186let (parts, body) = resp.into_parts();187let collected = BodyExt::collect(body).await?;188Some(Ok(hyper::Response::from_parts(parts, collected)))189}190Ok(Err(e)) => Some(Err(e)),191192// Fall through below to the `resp.expect(...)` which will hopefully193// return a more specific error from `handle.await`.194Err(_) => None,195};196197// Now that the response has been processed, we can wait on the wasm to198// finish without deadlocking.199handle.await.context("Component execution")?;200201Ok(resp.expect("wasm never called set-response-outparam"))202} else {203handle.await.context("Component execution")?;204Ok(Err(ErrorCode::HttpResponseTimeout))205}206}207208#[test_log::test(tokio::test)]209async fn wasi_http_proxy_tests() -> wasmtime::Result<()> {210let req = hyper::Request::builder()211.header("custom-forbidden-header", "yes")212.uri("http://example.com:8080/test-path")213.method(http::Method::GET);214215let resp = run_wasi_http(216test_programs_artifacts::P2_API_PROXY_COMPONENT,217req.body(body::empty())?,218None,219None,220false,221)222.await?;223224match resp {225Ok(resp) => println!("response: {resp:?}"),226Err(e) => panic!("Error given in response: {e:?}"),227};228229Ok(())230}231232#[test_log::test(tokio::test)]233async fn wasi_http_hash_all() -> Result<()> {234do_wasi_http_hash_all(false).await235}236237#[test_log::test(tokio::test)]238async fn wasi_http_hash_all_with_override() -> Result<()> {239do_wasi_http_hash_all(true).await240}241242async fn do_wasi_http_hash_all(override_send_request: bool) -> Result<()> {243let bodies = Arc::new(244[245("/a", "’Twas brillig, and the slithy toves"),246("/b", "Did gyre and gimble in the wabe:"),247("/c", "All mimsy were the borogoves,"),248("/d", "And the mome raths outgrabe."),249]250.into_iter()251.collect::<HashMap<_, _>>(),252);253254let listener = tokio::net::TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 0)).await?;255256let prefix = format!("http://{}", listener.local_addr()?);257258let (_tx, rx) = oneshot::channel::<()>();259260let handle = {261let bodies = bodies.clone();262263move |request: http::request::Parts| {264if let (Method::GET, Some(body)) = (request.method, bodies.get(request.uri.path())) {265Ok::<_, wasmtime::Error>(hyper::Response::new(body::full(Bytes::copy_from_slice(266body.as_bytes(),267))))268} else {269Ok(hyper::Response::builder()270.status(StatusCode::METHOD_NOT_ALLOWED)271.body(body::empty())?)272}273}274};275276let send_request = if override_send_request {277Some(Arc::new(278move |request: hyper::Request<HyperOutgoingBody>,279OutgoingRequestConfig {280between_bytes_timeout,281..282}| {283let response = handle(request.into_parts().0).map(|resp| {284Ok(IncomingResponse {285resp: resp.map(|body| {286body.map_err(wasmtime_wasi_http::hyper_response_error)287.boxed_unsync()288}),289worker: None,290between_bytes_timeout,291})292});293HostFutureIncomingResponse::ready(response)294},295) as RequestSender)296} else {297let server = async move {298loop {299let (stream, _) = listener.accept().await?;300let stream = TokioIo::new(stream);301let handle = handle.clone();302task::spawn(async move {303if let Err(e) = http1::Builder::new()304.keep_alive(true)305.serve_connection(306stream,307service_fn(move |request| {308let handle = handle.clone();309async move { handle(request.into_parts().0) }310}),311)312.await313{314eprintln!("error serving connection: {e:?}");315}316});317318// Help rustc with type inference:319if false {320return Ok::<_, wasmtime::Error>(());321}322}323}324.then(|result| {325if let Err(e) = result {326eprintln!("error listening for connections: {e:?}");327}328future::ready(())329})330.boxed();331332task::spawn(async move {333drop(future::select(server, rx).await);334});335336None337};338339let mut request = hyper::Request::builder()340.method(http::Method::GET)341.uri("http://example.com:8080/hash-all");342for path in bodies.keys() {343request = request.header("url", format!("{prefix}{path}"));344}345let request = request.body(body::empty())?;346347let response = run_wasi_http(348test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,349request,350send_request,351None,352false,353)354.await??;355356assert_eq!(StatusCode::OK, response.status());357let body = response.into_body().to_bytes();358let body = str::from_utf8(&body)?;359for line in body.lines() {360let (url, hash) = line361.split_once(": ")362.ok_or_else(|| format_err!("expected string of form `<url>: <sha-256>`; got {line}"))?;363364let path = url365.strip_prefix(&prefix)366.ok_or_else(|| format_err!("expected string with prefix {prefix}; got {url}"))?;367368let mut hasher = Sha256::new();369hasher.update(370bodies371.get(path)372.ok_or_else(|| format_err!("unexpected path: {path}"))?,373);374375use base64::Engine;376assert_eq!(377hash,378base64::engine::general_purpose::STANDARD_NO_PAD.encode(hasher.finalize())379);380}381382Ok(())383}384385// ensure the runtime rejects the outgoing request386#[test_log::test(tokio::test)]387async fn wasi_http_hash_all_with_reject() -> Result<()> {388let request = hyper::Request::builder()389.method(http::Method::GET)390.uri("http://example.com:8080/hash-all");391let request = request.header("url", format!("http://forbidden.com"));392let request = request.header("url", format!("http://localhost"));393let request = request.body(body::empty())?;394395let response = run_wasi_http(396test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,397request,398None,399Some("forbidden.com".to_string()),400false,401)402.await??;403404let body = response.into_body().to_bytes();405let body = str::from_utf8(&body).unwrap();406for line in body.lines() {407println!("{line}");408if line.contains("forbidden.com") {409assert!(line.contains("HttpRequestDenied"));410}411if line.contains("localhost") {412assert!(!line.contains("HttpRequestDenied"));413}414}415416Ok(())417}418419#[test_log::test(tokio::test)]420async fn wasi_http_echo() -> Result<()> {421do_wasi_http_echo("echo", None).await422}423424#[test_log::test(tokio::test)]425async fn wasi_http_double_echo() -> Result<()> {426let listener = tokio::net::TcpListener::bind((Ipv4Addr::new(127, 0, 0, 1), 0)).await?;427428let prefix = format!("http://{}", listener.local_addr()?);429430let (_tx, rx) = oneshot::channel::<()>();431432let server = async move {433loop {434let (stream, _) = listener.accept().await?;435let stream = TokioIo::new(stream);436task::spawn(async move {437if let Err(e) = http1::Builder::new()438.keep_alive(true)439.serve_connection(440stream,441service_fn(442move |request: hyper::Request<hyper::body::Incoming>| async move {443use http_body_util::BodyExt;444445if let (&Method::POST, "/echo") =446(request.method(), request.uri().path())447{448Ok::<_, wasmtime::Error>(hyper::Response::new(449request.into_body().boxed(),450))451} else {452Ok(hyper::Response::builder()453.status(StatusCode::METHOD_NOT_ALLOWED)454.body(BoxBody::new(455Empty::new().map_err(|_| unreachable!()),456))?)457}458},459),460)461.await462{463eprintln!("error serving connection: {e:?}");464}465});466467// Help rustc with type inference:468if false {469return Ok::<_, wasmtime::Error>(());470}471}472}473.then(|result| {474if let Err(e) = result {475eprintln!("error listening for connections: {e:?}");476}477future::ready(())478})479.boxed();480481task::spawn(async move {482drop(future::select(server, rx).await);483});484485do_wasi_http_echo("double-echo", Some(&format!("{prefix}/echo"))).await486}487488async fn do_wasi_http_echo(uri: &str, url_header: Option<&str>) -> Result<()> {489let body = {490// A sorta-random-ish megabyte491let mut n = 0_u8;492iter::repeat_with(move || {493n = n.wrapping_add(251);494n495})496.take(1024 * 1024)497.collect::<Vec<_>>()498};499500let mut request = hyper::Request::builder()501.method(http::Method::POST)502.uri(format!("http://example.com:8080/{uri}"))503.header("content-type", "application/octet-stream");504505if let Some(url_header) = url_header {506request = request.header("url", url_header);507}508509let request = request.body(BoxBody::new(StreamBody::new(stream::iter(510body.chunks(16 * 1024)511.map(|chunk| Ok::<_, hyper::Error>(Frame::data(Bytes::copy_from_slice(chunk))))512.collect::<Vec<_>>(),513))))?;514515let response = run_wasi_http(516test_programs_artifacts::P2_API_PROXY_STREAMING_COMPONENT,517request,518None,519None,520false,521)522.await??;523524assert_eq!(StatusCode::OK, response.status());525assert_eq!(526response.headers()["content-type"],527"application/octet-stream"528);529let received = Vec::from(response.into_body().to_bytes());530if body != received {531panic!(532"body content mismatch (expected length {}; actual length {})",533body.len(),534received.len()535);536}537538Ok(())539}540541#[test_log::test(tokio::test)]542async fn wasi_http_without_port() -> Result<()> {543let req = hyper::Request::builder()544.method(http::Method::GET)545.uri("https://httpbin.org/get");546547let _response: hyper::Response<_> = run_wasi_http(548test_programs_artifacts::P2_API_PROXY_FORWARD_REQUEST_COMPONENT,549req.body(body::empty())?,550None,551None,552false,553)554.await??;555556// NB: don't test the actual return code of `response`. This is testing a557// live http request against a live server and things happen. If we got this558// far it's already successful that the request was made and the lack of559// port in the URI was handled.560561Ok(())562}563564#[test_log::test(tokio::test)]565async fn wasi_http_no_trap_on_early_drop() -> Result<()> {566let req = hyper::Request::builder()567.uri("http://example.com:8080/early_drop")568.method(http::Method::GET);569570let resp = run_wasi_http(571test_programs_artifacts::P2_API_PROXY_COMPONENT,572req.body(body::empty())?,573None,574None,575true,576)577.await?;578579if let Err(ErrorCode::HttpResponseTimeout) = resp {580Ok(())581} else {582panic!("test expects an error");583}584}585586587