Path: blob/main/crates/wasi-http/src/p3/response.rs
3070 views
use crate::get_content_length;1use crate::p3::bindings::http::types::ErrorCode;2use crate::p3::body::{Body, GuestBody};3use crate::p3::{WasiHttpCtxView, WasiHttpView};4use bytes::Bytes;5use http::{HeaderMap, StatusCode};6use http_body_util::BodyExt as _;7use http_body_util::combinators::UnsyncBoxBody;8use std::sync::Arc;9use wasmtime::AsContextMut;10use wasmtime::error::Context as _;1112/// The concrete type behind a `wasi:http/types.response` resource.13pub struct Response {14/// The status of the response.15pub status: StatusCode,16/// The headers of the response.17pub headers: Arc<HeaderMap>,18/// Response body.19pub(crate) body: Body,20}2122impl TryFrom<Response> for http::Response<Body> {23type Error = http::Error;2425fn try_from(26Response {27status,28headers,29body,30}: Response,31) -> Result<Self, Self::Error> {32let mut res = http::Response::builder().status(status);33*res.headers_mut().unwrap() = Arc::unwrap_or_clone(headers);34res.body(body)35}36}3738impl Response {39/// Convert [Response] into [http::Response].40///41/// The specified [Future] `fut` can be used to communicate42/// a response processing error, if any, to the constructor of the response.43/// For example, if the response was constructed via `wasi:http/types.response#new`,44/// a result sent on `fut` will be forwarded to the guest on the future handle returned.45pub fn into_http<T: WasiHttpView + 'static>(46self,47store: impl AsContextMut<Data = T>,48fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,49) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {50self.into_http_with_getter(store, fut, T::http)51}5253/// Like [`Self::into_http`], but with a custom function for converting `T`54/// to a [`WasiHttpCtxView`].55pub fn into_http_with_getter<T: 'static>(56self,57store: impl AsContextMut<Data = T>,58fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,59getter: fn(&mut T) -> WasiHttpCtxView<'_>,60) -> wasmtime::Result<http::Response<UnsyncBoxBody<Bytes, ErrorCode>>> {61let res = http::Response::try_from(self)?;62let (res, body) = res.into_parts();63let body = match body {64Body::Guest {65contents_rx,66trailers_rx,67result_tx,68} => {69// `Content-Length` header value is validated in `fields` implementation70let content_length =71get_content_length(&res.headers).context("failed to parse `content-length`")?;72GuestBody::new(73store,74contents_rx,75trailers_rx,76result_tx,77fut,78content_length,79ErrorCode::HttpResponseBodySize,80getter,81)82.boxed_unsync()83}84Body::Host { body, result_tx } => {85_ = result_tx.send(Box::new(fut));86body87}88};89Ok(http::Response::from_parts(res, body))90}9192/// Convert [http::Response] into [Response].93pub fn from_http<T>(94res: http::Response<T>,95) -> (96Self,97impl Future<Output = Result<(), ErrorCode>> + Send + 'static,98)99where100T: http_body::Body<Data = Bytes> + Send + 'static,101T::Error: Into<ErrorCode>,102{103let (parts, body) = res.into_parts();104let (result_tx, result_rx) = tokio::sync::oneshot::channel();105106let wasi_response = Response {107status: parts.status,108headers: Arc::new(parts.headers),109body: Body::Host {110body: body.map_err(Into::into).boxed_unsync(),111result_tx,112},113};114115let io_future = async {116let Ok(fut) = result_rx.await else {117return Ok(());118};119Box::into_pin(fut).await120};121122(wasi_response, io_future)123}124}125126#[cfg(test)]127mod tests {128use super::*;129use core::future::Future;130use core::pin::pin;131use core::task::{Context, Poll, Waker};132use http_body_util::Full;133134#[tokio::test]135async fn test_response_from_http() {136let http_response = http::Response::builder()137.status(StatusCode::OK)138.header("x-custom-header", "value123")139.body(Full::new(Bytes::from_static(b"hello wasm")))140.unwrap();141142let (wasi_resp, io_future) = Response::from_http(http_response);143assert_eq!(wasi_resp.status, StatusCode::OK);144assert_eq!(145wasi_resp.headers.get("x-custom-header").unwrap(),146"value123"147);148match wasi_resp.body {149Body::Host { body, result_tx } => {150let collected = body.collect().await;151assert!(collected.is_ok(), "Body stream failed unexpectedly");152let chunks = collected.unwrap().to_bytes();153assert_eq!(chunks, &b"hello wasm"[..]);154_ = result_tx.send(Box::new(async { Ok(()) }));155}156_ => panic!("Response body should be of type Host"),157}158159let mut cx = Context::from_waker(Waker::noop());160let result = pin!(io_future).poll(&mut cx);161assert!(matches!(result, Poll::Ready(Ok(_))));162}163}164165166