Path: blob/main/crates/wasi-http/src/p3/request.rs
3071 views
use crate::get_content_length;1use crate::p3::bindings::http::types::ErrorCode;2use crate::p3::body::{Body, BodyExt as _, GuestBody};3use crate::p3::{WasiHttpCtxView, WasiHttpView};4use bytes::Bytes;5use core::time::Duration;6use http::header::HOST;7use http::uri::{Authority, PathAndQuery, Scheme};8use http::{HeaderMap, HeaderValue, Method, Uri};9use http_body_util::BodyExt as _;10use http_body_util::combinators::UnsyncBoxBody;11use std::sync::Arc;12use tokio::sync::oneshot;13use tracing::debug;14use wasmtime::AsContextMut;1516/// The concrete type behind a `wasi:http/types.request-options` resource.17#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]18pub struct RequestOptions {19/// How long to wait for a connection to be established.20pub connect_timeout: Option<Duration>,21/// How long to wait for the first byte of the response body.22pub first_byte_timeout: Option<Duration>,23/// How long to wait between frames of the response body.24pub between_bytes_timeout: Option<Duration>,25}2627/// The concrete type behind a `wasi:http/types.request` resource.28pub struct Request {29/// The method of the request.30pub method: Method,31/// The scheme of the request.32pub scheme: Option<Scheme>,33/// The authority of the request.34pub authority: Option<Authority>,35/// The path and query of the request.36pub path_with_query: Option<PathAndQuery>,37/// The request headers.38pub headers: Arc<HeaderMap>,39/// Request options.40pub options: Option<Arc<RequestOptions>>,41/// Request body.42pub(crate) body: Body,43}4445impl Request {46/// Construct a new [Request]47///48/// This returns a [Future] that the will be used to communicate49/// a request processing error, if any.50///51/// Requests constructed this way will not perform any `Content-Length` validation.52pub fn new(53method: Method,54scheme: Option<Scheme>,55authority: Option<Authority>,56path_with_query: Option<PathAndQuery>,57headers: impl Into<Arc<HeaderMap>>,58options: Option<Arc<RequestOptions>>,59body: impl Into<UnsyncBoxBody<Bytes, ErrorCode>>,60) -> (61Self,62impl Future<Output = Result<(), ErrorCode>> + Send + 'static,63) {64let (tx, rx) = oneshot::channel();65(66Self {67method,68scheme,69authority,70path_with_query,71headers: headers.into(),72options,73body: Body::Host {74body: body.into(),75result_tx: tx,76},77},78async {79let Ok(fut) = rx.await else { return Ok(()) };80Box::into_pin(fut).await81},82)83}8485/// Construct a new [Request] from [http::Request].86///87/// This returns a [Future] that will be used to communicate88/// a request processing error, if any.89///90/// Requests constructed this way will not perform any `Content-Length` validation.91pub fn from_http<T>(92req: http::Request<T>,93) -> (94Self,95impl Future<Output = Result<(), ErrorCode>> + Send + 'static,96)97where98T: http_body::Body<Data = Bytes> + Send + 'static,99T::Error: Into<ErrorCode>,100{101let (102http::request::Parts {103method,104uri,105headers,106..107},108body,109) = req.into_parts();110let http::uri::Parts {111scheme,112authority,113path_and_query,114..115} = uri.into_parts();116Self::new(117method,118scheme,119authority,120path_and_query,121headers,122None,123body.map_err(Into::into).boxed_unsync(),124)125}126127/// Convert this [`Request`] into an [`http::Request<UnsyncBoxBody<Bytes, ErrorCode>>`].128///129/// The specified future `fut` can be used to communicate a request processing130/// error, if any, back to the caller (e.g., if this request was constructed131/// through `wasi:http/types.request#new`).132pub fn into_http<T: WasiHttpView + 'static>(133self,134store: impl AsContextMut<Data = T>,135fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,136) -> Result<137(138http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,139Option<Arc<RequestOptions>>,140),141ErrorCode,142> {143self.into_http_with_getter(store, fut, T::http)144}145146/// Like [`Self::into_http`], but uses a custom getter for obtaining the [`WasiHttpCtxView`].147pub fn into_http_with_getter<T: 'static>(148self,149mut store: impl AsContextMut<Data = T>,150fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,151getter: fn(&mut T) -> WasiHttpCtxView<'_>,152) -> Result<153(154http::Request<UnsyncBoxBody<Bytes, ErrorCode>>,155Option<Arc<RequestOptions>>,156),157ErrorCode,158> {159let Request {160method,161scheme,162authority,163path_with_query,164headers,165options,166body,167} = self;168// `Content-Length` header value is validated in `fields` implementation169let content_length = match get_content_length(&headers) {170Ok(content_length) => content_length,171Err(err) => {172body.drop(&mut store);173return Err(ErrorCode::InternalError(Some(format!("{err:#}"))));174}175};176// This match must appear before any potential errors handled with '?'177// (or errors have to explicitly be addressed and drop the body, as above),178// as otherwise the Body::Guest resources will not be cleaned up when dropped.179// see: https://github.com/bytecodealliance/wasmtime/pull/11440#discussion_r2326139381180// for additional context.181let body = match body {182Body::Guest {183contents_rx,184trailers_rx,185result_tx,186} => GuestBody::new(187&mut store,188contents_rx,189trailers_rx,190result_tx,191fut,192content_length,193ErrorCode::HttpRequestBodySize,194getter,195)196.boxed_unsync(),197Body::Host { body, result_tx } => {198if let Some(limit) = content_length {199let (http_result_tx, http_result_rx) = oneshot::channel();200_ = result_tx.send(Box::new(async move {201if let Ok(err) = http_result_rx.await {202return Err(err);203};204fut.await205}));206body.with_content_length(limit, http_result_tx, ErrorCode::HttpRequestBodySize)207.boxed_unsync()208} else {209_ = result_tx.send(Box::new(fut));210body211}212}213};214let mut headers = Arc::unwrap_or_clone(headers);215let mut store = store.as_context_mut();216let WasiHttpCtxView { ctx, .. } = getter(store.data_mut());217if ctx.set_host_header() {218let host = if let Some(authority) = authority.as_ref() {219HeaderValue::try_from(authority.as_str())220.map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?221} else {222HeaderValue::from_static("")223};224headers.insert(HOST, host);225}226let scheme = match scheme {227None => ctx.default_scheme().ok_or(ErrorCode::HttpProtocolError)?,228Some(scheme) if ctx.is_supported_scheme(&scheme) => scheme,229Some(..) => return Err(ErrorCode::HttpProtocolError),230};231let mut uri = Uri::builder().scheme(scheme);232if let Some(authority) = authority {233uri = uri.authority(authority)234};235if let Some(path_with_query) = path_with_query {236uri = uri.path_and_query(path_with_query)237};238let uri = uri.build().map_err(|err| {239debug!(?err, "failed to build request URI");240ErrorCode::HttpRequestUriInvalid241})?;242let mut req = http::Request::builder();243*req.headers_mut().unwrap() = headers;244let req = req245.method(method)246.uri(uri)247.body(body)248.map_err(|err| ErrorCode::InternalError(Some(err.to_string())))?;249let (req, body) = req.into_parts();250Ok((http::Request::from_parts(req, body), options))251}252}253254/// The default implementation of how an outgoing request is sent.255///256/// This implementation is used by the `wasi:http/handler` interface257/// default implementation.258///259/// The returned [Future] can be used to communicate260/// a request processing error, if any, to the constructor of the request.261/// For example, if the request was constructed via `wasi:http/types.request#new`,262/// a result resolved from it will be forwarded to the guest on the future handle returned.263///264/// This function performs no `Content-Length` validation.265#[cfg(feature = "default-send-request")]266pub async fn default_send_request(267mut req: http::Request<impl http_body::Body<Data = Bytes, Error = ErrorCode> + Send + 'static>,268options: Option<RequestOptions>,269) -> Result<270(271http::Response<impl http_body::Body<Data = Bytes, Error = ErrorCode>>,272impl Future<Output = Result<(), ErrorCode>> + Send,273),274ErrorCode,275> {276use core::future::poll_fn;277use core::pin::{Pin, pin};278use core::task::{Poll, ready};279use tokio::io::{AsyncRead, AsyncWrite};280use tokio::net::TcpStream;281282trait TokioStream: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static {283fn boxed(self) -> Box<dyn TokioStream>284where285Self: Sized,286{287Box::new(self)288}289}290impl<T> TokioStream for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static {}291292fn dns_error(rcode: String, info_code: u16) -> ErrorCode {293ErrorCode::DnsError(crate::p3::bindings::http::types::DnsErrorPayload {294rcode: Some(rcode),295info_code: Some(info_code),296})297}298299let uri = req.uri();300let authority = uri.authority().ok_or(ErrorCode::HttpRequestUriInvalid)?;301let use_tls = uri.scheme() == Some(&Scheme::HTTPS);302let authority = if authority.port().is_some() {303authority.to_string()304} else {305let port = if use_tls { 443 } else { 80 };306format!("{authority}:{port}")307};308309let connect_timeout = options310.and_then(311|RequestOptions {312connect_timeout, ..313}| connect_timeout,314)315.unwrap_or(Duration::from_secs(600));316317let first_byte_timeout = options318.and_then(319|RequestOptions {320first_byte_timeout, ..321}| first_byte_timeout,322)323.unwrap_or(Duration::from_secs(600));324325let between_bytes_timeout = options326.and_then(327|RequestOptions {328between_bytes_timeout,329..330}| between_bytes_timeout,331)332.unwrap_or(Duration::from_secs(600));333334let stream = match tokio::time::timeout(connect_timeout, TcpStream::connect(&authority)).await {335Ok(Ok(stream)) => stream,336Ok(Err(err)) if err.kind() == std::io::ErrorKind::AddrNotAvailable => {337return Err(dns_error("address not available".to_string(), 0));338}339Ok(Err(err))340if err341.to_string()342.starts_with("failed to lookup address information") =>343{344return Err(dns_error("address not available".to_string(), 0));345}346Ok(Err(err)) => {347tracing::warn!(?err, "connection refused");348return Err(ErrorCode::ConnectionRefused);349}350Err(..) => return Err(ErrorCode::ConnectionTimeout),351};352let stream = if use_tls {353use rustls::pki_types::ServerName;354355// derived from https://github.com/rustls/rustls/blob/main/examples/src/bin/simpleclient.rs356let root_cert_store = rustls::RootCertStore {357roots: webpki_roots::TLS_SERVER_ROOTS.into(),358};359let config = rustls::ClientConfig::builder()360.with_root_certificates(root_cert_store)361.with_no_client_auth();362let connector = tokio_rustls::TlsConnector::from(std::sync::Arc::new(config));363let mut parts = authority.split(":");364let host = parts.next().unwrap_or(&authority);365let domain = ServerName::try_from(host)366.map_err(|e| {367tracing::warn!("dns lookup error: {e:?}");368dns_error("invalid dns name".to_string(), 0)369})?370.to_owned();371let stream = connector.connect(domain, stream).await.map_err(|e| {372tracing::warn!("tls protocol error: {e:?}");373ErrorCode::TlsProtocolError374})?;375stream.boxed()376} else {377stream.boxed()378};379let (mut sender, conn) = tokio::time::timeout(380connect_timeout,381// TODO: we should plumb the builder through the http context, and use it here382hyper::client::conn::http1::Builder::new().handshake(crate::io::TokioIo::new(stream)),383)384.await385.map_err(|_| ErrorCode::ConnectionTimeout)?386.map_err(ErrorCode::from_hyper_request_error)?;387388// at this point, the request contains the scheme and the authority, but389// the http packet should only include those if addressing a proxy, so390// remove them here, since SendRequest::send_request does not do it for us391*req.uri_mut() = http::Uri::builder()392.path_and_query(393req.uri()394.path_and_query()395.map(|p| p.as_str())396.unwrap_or("/"),397)398.build()399.expect("comes from valid request");400401let send = async move {402use core::task::Context;403404/// Wrapper around [hyper::body::Incoming] used to405/// account for request option timeout configuration406struct IncomingResponseBody {407incoming: hyper::body::Incoming,408timeout: tokio::time::Interval,409}410impl http_body::Body for IncomingResponseBody {411type Data = <hyper::body::Incoming as http_body::Body>::Data;412type Error = ErrorCode;413414fn poll_frame(415mut self: Pin<&mut Self>,416cx: &mut Context<'_>,417) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {418match Pin::new(&mut self.as_mut().incoming).poll_frame(cx) {419Poll::Ready(None) => Poll::Ready(None),420Poll::Ready(Some(Err(err))) => {421Poll::Ready(Some(Err(ErrorCode::from_hyper_response_error(err))))422}423Poll::Ready(Some(Ok(frame))) => {424self.timeout.reset();425Poll::Ready(Some(Ok(frame)))426}427Poll::Pending => {428ready!(self.timeout.poll_tick(cx));429Poll::Ready(Some(Err(ErrorCode::ConnectionReadTimeout)))430}431}432}433fn is_end_stream(&self) -> bool {434self.incoming.is_end_stream()435}436fn size_hint(&self) -> http_body::SizeHint {437self.incoming.size_hint()438}439}440441let res = tokio::time::timeout(first_byte_timeout, sender.send_request(req))442.await443.map_err(|_| ErrorCode::ConnectionReadTimeout)?444.map_err(ErrorCode::from_hyper_request_error)?;445let mut timeout = tokio::time::interval(between_bytes_timeout);446timeout.reset();447Ok(res.map(|incoming| IncomingResponseBody { incoming, timeout }))448};449let mut send = pin!(send);450let mut conn = Some(conn);451// Wait for response while driving connection I/O452let res = poll_fn(|cx| match send.as_mut().poll(cx) {453Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)),454Poll::Ready(Err(err)) => Poll::Ready(Err(err)),455Poll::Pending => {456// Response is not ready, poll `hyper` connection to drive I/O if it has not completed yet457let Some(fut) = conn.as_mut() else {458// `hyper` connection already completed459return Poll::Pending;460};461let res = ready!(Pin::new(fut).poll(cx));462// `hyper` connection completed, record that to prevent repeated poll463conn = None;464match res {465// `hyper` connection has successfully completed, optimistically poll for response466Ok(()) => send.as_mut().poll(cx),467// `hyper` connection has failed, return the error468Err(err) => Poll::Ready(Err(ErrorCode::from_hyper_request_error(err))),469}470}471})472.await?;473Ok((res, async move {474let Some(conn) = conn.take() else {475// `hyper` connection has already completed476return Ok(());477};478conn.await.map_err(ErrorCode::from_hyper_response_error)479}))480}481482#[cfg(test)]483mod tests {484use super::*;485use crate::p3::DefaultWasiHttpCtx;486use core::future::Future;487use core::pin::pin;488use core::str::FromStr;489use core::task::{Context, Poll, Waker};490use http_body_util::{BodyExt, Empty, Full};491use wasmtime::Result;492use wasmtime::{Engine, Store};493use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};494495struct TestCtx {496table: ResourceTable,497wasi: WasiCtx,498http: DefaultWasiHttpCtx,499}500501impl TestCtx {502fn new() -> Self {503Self {504table: ResourceTable::default(),505wasi: WasiCtxBuilder::new().build(),506http: DefaultWasiHttpCtx,507}508}509}510511impl WasiView for TestCtx {512fn ctx(&mut self) -> WasiCtxView<'_> {513WasiCtxView {514ctx: &mut self.wasi,515table: &mut self.table,516}517}518}519520impl WasiHttpView for TestCtx {521fn http(&mut self) -> WasiHttpCtxView<'_> {522WasiHttpCtxView {523ctx: &mut self.http,524table: &mut self.table,525}526}527}528529#[tokio::test]530async fn test_request_into_http_schemes() -> Result<()> {531let schemes = vec![Some(Scheme::HTTP), Some(Scheme::HTTPS), None];532let engine = Engine::default();533534for scheme in schemes {535let (req, fut) = Request::new(536Method::POST,537scheme.clone(),538Some(Authority::from_static("example.com")),539Some(PathAndQuery::from_static("/path?query=1")),540HeaderMap::new(),541None,542Full::new(Bytes::from_static(b"body"))543.map_err(|x| match x {})544.boxed_unsync(),545);546let mut store = Store::new(&engine, TestCtx::new());547let (http_req, options) = req.into_http(&mut store, async { Ok(()) }).unwrap();548assert_eq!(options, None);549assert_eq!(http_req.method(), Method::POST);550let expected_scheme = scheme.unwrap_or(Scheme::HTTPS); // default scheme551assert_eq!(552http_req.uri(),553&http::Uri::from_str(&format!(554"{}://example.com/path?query=1",555expected_scheme.as_str()556))557.unwrap()558);559let body_bytes = http_req.into_body().collect().await?;560assert_eq!(body_bytes.to_bytes(), b"body".as_slice());561let mut cx = Context::from_waker(Waker::noop());562let result = pin!(fut).poll(&mut cx);563assert!(matches!(result, Poll::Ready(Ok(()))));564}565566Ok(())567}568569#[tokio::test]570async fn test_request_into_http_uri_error() -> Result<()> {571let (req, fut) = Request::new(572Method::GET,573Some(Scheme::HTTP),574Some(Authority::from_static("example.com")),575None, // <-- should fail, must be Some(_) when authority is set576HeaderMap::new(),577None,578Empty::new().map_err(|x| match x {}).boxed_unsync(),579);580let mut store = Store::new(&Engine::default(), TestCtx::new());581let result = req.into_http(&mut store, async {582Err(ErrorCode::InternalError(Some("uh oh".to_string())))583});584assert!(matches!(result, Err(ErrorCode::HttpRequestUriInvalid)));585let mut cx = Context::from_waker(Waker::noop());586let result = pin!(fut).poll(&mut cx);587assert!(matches!(588result,589Poll::Ready(Err(ErrorCode::InternalError(Some(_))))590));591592Ok(())593}594}595596597