Path: blob/main/crates/wasi-http/src/p3/body.rs
3088 views
use crate::p3::bindings::http::types::{ErrorCode, Fields, Trailers};1use crate::p3::{WasiHttp, WasiHttpCtxView};2use bytes::Bytes;3use core::iter;4use core::num::NonZeroUsize;5use core::pin::Pin;6use core::task::{Context, Poll, ready};7use http::HeaderMap;8use http_body::Body as _;9use http_body_util::combinators::UnsyncBoxBody;10use std::any::{Any, TypeId};11use std::io::Cursor;12use std::sync::Arc;13use tokio::sync::{mpsc, oneshot};14use tokio_util::sync::PollSender;15use wasmtime::component::{16Access, Destination, FutureConsumer, FutureReader, Resource, Source, StreamConsumer,17StreamProducer, StreamReader, StreamResult,18};19use wasmtime::error::Context as _;20use wasmtime::{AsContextMut, StoreContextMut};2122/// The concrete type behind a `wasi:http/types.body` resource.23pub(crate) enum Body {24/// Body constructed by the guest25Guest {26/// The body stream27contents_rx: Option<StreamReader<u8>>,28/// Future, on which guest will write result and optional trailers29trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,30/// Channel, on which transmission result will be written31result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,32},33/// Body constructed by the host.34Host {35/// The [`http_body::Body`]36body: UnsyncBoxBody<Bytes, ErrorCode>,37/// Channel, on which transmission result will be written38result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,39},40}4142/// [FutureConsumer] implementation for future passed to `consume-body`.43struct BodyResultConsumer(44Option<oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>>,45);4647impl<D> FutureConsumer<D> for BodyResultConsumer48where49D: 'static,50{51type Item = Result<(), ErrorCode>;5253fn poll_consume(54mut self: Pin<&mut Self>,55_: &mut Context<'_>,56store: StoreContextMut<D>,57mut src: Source<'_, Self::Item>,58_: bool,59) -> Poll<wasmtime::Result<()>> {60let mut res = None;61src.read(store, &mut res).context("failed to read result")?;62let res = res.context("result value missing")?;63let tx = self.0.take().context("polled after returning `Ready`")?;64_ = tx.send(Box::new(async { res }));65Poll::Ready(Ok(()))66}67}6869impl Body {70/// Implementation of `consume-body` shared between requests and responses71pub(crate) fn consume<T>(72self,73mut store: Access<'_, T, WasiHttp>,74fut: FutureReader<Result<(), ErrorCode>>,75getter: fn(&mut T) -> WasiHttpCtxView<'_>,76) -> (77StreamReader<u8>,78FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,79) {80match self {81Body::Guest {82contents_rx: Some(contents_rx),83trailers_rx,84result_tx,85} => {86fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));87(contents_rx, trailers_rx)88}89Body::Guest {90contents_rx: None,91trailers_rx,92result_tx,93} => {94fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));95(StreamReader::new(&mut store, iter::empty()), trailers_rx)96}97Body::Host { body, result_tx } => {98fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));99let (trailers_tx, trailers_rx) = oneshot::channel();100(101StreamReader::new(102&mut store,103HostBodyStreamProducer {104body,105trailers: Some(trailers_tx),106getter,107},108),109FutureReader::new(&mut store, trailers_rx),110)111}112}113}114115/// Implementation of `drop` shared between requests and responses116pub(crate) fn drop(self, mut store: impl AsContextMut) {117if let Body::Guest {118contents_rx,119mut trailers_rx,120..121} = self122{123if let Some(mut contents_rx) = contents_rx {124contents_rx.close(&mut store);125}126trailers_rx.close(store);127}128}129}130131/// [StreamConsumer] implementation for bodies originating in the guest with `Content-Length`132/// header set.133struct LimitedGuestBodyConsumer {134contents_tx: PollSender<Result<Bytes, ErrorCode>>,135error_tx: Option<oneshot::Sender<ErrorCode>>,136make_error: fn(Option<u64>) -> ErrorCode,137/// Limit of bytes to be sent138limit: u64,139/// Number of bytes sent140sent: u64,141// `true` when the other side of `contents_tx` was unexpectedly closed142closed: bool,143}144145impl LimitedGuestBodyConsumer {146/// Sends the error constructed by [Self::make_error] on both error channels.147/// Does nothing if an error has already been sent on [Self::error_tx].148fn send_error(&mut self, sent: Option<u64>) {149if let Some(error_tx) = self.error_tx.take() {150_ = error_tx.send((self.make_error)(sent));151self.contents_tx.abort_send();152if let Some(tx) = self.contents_tx.get_ref() {153_ = tx.try_send(Err((self.make_error)(sent)))154}155self.contents_tx.close();156}157}158}159160impl Drop for LimitedGuestBodyConsumer {161fn drop(&mut self) {162if !self.closed && self.limit != self.sent {163self.send_error(Some(self.sent))164}165}166}167168impl<D> StreamConsumer<D> for LimitedGuestBodyConsumer {169type Item = u8;170171fn poll_consume(172mut self: Pin<&mut Self>,173cx: &mut Context<'_>,174store: StoreContextMut<D>,175src: Source<Self::Item>,176finish: bool,177) -> Poll<wasmtime::Result<StreamResult>> {178debug_assert!(!self.closed);179let mut src = src.as_direct(store);180let buf = src.remaining();181let n = buf.len();182183// Perform `content-length` check early and precompute the next value184let Ok(sent) = n.try_into() else {185self.send_error(None);186return Poll::Ready(Ok(StreamResult::Dropped));187};188let Some(sent) = self.sent.checked_add(sent) else {189self.send_error(None);190return Poll::Ready(Ok(StreamResult::Dropped));191};192if sent > self.limit {193self.send_error(Some(sent));194return Poll::Ready(Ok(StreamResult::Dropped));195}196match self.contents_tx.poll_reserve(cx) {197Poll::Ready(Ok(())) => {198let buf = Bytes::copy_from_slice(buf);199match self.contents_tx.send_item(Ok(buf)) {200Ok(()) => {201src.mark_read(n);202// Record new `content-length` only on successful send203self.sent = sent;204Poll::Ready(Ok(StreamResult::Completed))205}206Err(..) => {207self.closed = true;208Poll::Ready(Ok(StreamResult::Dropped))209}210}211}212Poll::Ready(Err(..)) => {213self.closed = true;214Poll::Ready(Ok(StreamResult::Dropped))215}216Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),217Poll::Pending => Poll::Pending,218}219}220}221222/// [StreamConsumer] implementation for bodies originating in the guest without `Content-Length`223/// header set.224struct UnlimitedGuestBodyConsumer(PollSender<Result<Bytes, ErrorCode>>);225226impl<D> StreamConsumer<D> for UnlimitedGuestBodyConsumer {227type Item = u8;228229fn poll_consume(230mut self: Pin<&mut Self>,231cx: &mut Context<'_>,232store: StoreContextMut<D>,233src: Source<Self::Item>,234finish: bool,235) -> Poll<wasmtime::Result<StreamResult>> {236match self.0.poll_reserve(cx) {237Poll::Ready(Ok(())) => {238let mut src = src.as_direct(store);239let buf = src.remaining();240let n = buf.len();241let buf = Bytes::copy_from_slice(buf);242match self.0.send_item(Ok(buf)) {243Ok(()) => {244src.mark_read(n);245Poll::Ready(Ok(StreamResult::Completed))246}247Err(..) => Poll::Ready(Ok(StreamResult::Dropped)),248}249}250Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)),251Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),252Poll::Pending => Poll::Pending,253}254}255}256257/// [http_body::Body] implementation for bodies originating in the guest.258pub(crate) struct GuestBody {259contents_rx: Option<mpsc::Receiver<Result<Bytes, ErrorCode>>>,260trailers_rx: Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>,261content_length: Option<u64>,262}263264impl GuestBody {265/// Construct a new [GuestBody]266pub(crate) fn new<T: 'static>(267mut store: impl AsContextMut<Data = T>,268contents_rx: Option<StreamReader<u8>>,269trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,270result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,271result_fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,272content_length: Option<u64>,273make_error: fn(Option<u64>) -> ErrorCode,274getter: fn(&mut T) -> WasiHttpCtxView<'_>,275) -> Self {276let (trailers_http_tx, trailers_http_rx) = oneshot::channel();277trailers_rx.pipe(278&mut store,279GuestTrailerConsumer {280tx: Some(trailers_http_tx),281getter,282},283);284285let contents_rx = if let Some(rx) = contents_rx {286let (http_tx, http_rx) = mpsc::channel(1);287let contents_tx = PollSender::new(http_tx);288if let Some(limit) = content_length {289let (error_tx, error_rx) = oneshot::channel();290_ = result_tx.send(Box::new(async move {291if let Ok(err) = error_rx.await {292return Err(err);293};294result_fut.await295}));296rx.pipe(297store,298LimitedGuestBodyConsumer {299contents_tx,300error_tx: Some(error_tx),301make_error,302limit,303sent: 0,304closed: false,305},306);307} else {308_ = result_tx.send(Box::new(result_fut));309rx.pipe(store, UnlimitedGuestBodyConsumer(contents_tx));310};311Some(http_rx)312} else {313_ = result_tx.send(Box::new(result_fut));314None315};316Self {317trailers_rx: Some(trailers_http_rx),318contents_rx,319content_length,320}321}322}323324impl http_body::Body for GuestBody {325type Data = Bytes;326type Error = ErrorCode;327328fn poll_frame(329mut self: Pin<&mut Self>,330cx: &mut Context<'_>,331) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {332if let Some(contents_rx) = self.contents_rx.as_mut() {333// `contents_rx` has not been closed yet, poll it334while let Some(res) = ready!(contents_rx.poll_recv(cx)) {335match res {336Ok(buf) => {337if let Some(n) = self.content_length.as_mut() {338// Subtract frame length from `content_length`,339// [LimitedGuestBodyConsumer] already performs the validation, so340// just keep count as optimization for341// `is_end_stream` and `size_hint`342*n = n.saturating_sub(buf.len().try_into().unwrap_or(u64::MAX));343}344return Poll::Ready(Some(Ok(http_body::Frame::data(buf))));345}346Err(err) => {347return Poll::Ready(Some(Err(err)));348}349}350}351// Record that `contents_rx` is closed352self.contents_rx = None;353}354355let Some(trailers_rx) = self.trailers_rx.as_mut() else {356// `trailers_rx` has already terminated - this is the end of stream357return Poll::Ready(None);358};359360let res = ready!(Pin::new(trailers_rx).poll(cx));361// Record that `trailers_rx` has terminated362self.trailers_rx = None;363match res {364Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(365Arc::unwrap_or_clone(trailers),366)))),367Ok(Ok(None)) => Poll::Ready(None),368Ok(Err(err)) => Poll::Ready(Some(Err(err))),369Err(..) => Poll::Ready(None),370}371}372373fn is_end_stream(&self) -> bool {374if let Some(contents_rx) = self.contents_rx.as_ref() {375if !contents_rx.is_empty()376|| !contents_rx.is_closed()377|| self.content_length.is_some_and(|n| n > 0)378{379// `contents_rx` might still produce data frames380return false;381}382}383if let Some(trailers_rx) = self.trailers_rx.as_ref() {384if !trailers_rx.is_terminated() {385// `trailers_rx` has not terminated yet386return false;387}388}389390// no data left391return true;392}393394fn size_hint(&self) -> http_body::SizeHint {395if let Some(n) = self.content_length {396http_body::SizeHint::with_exact(n)397} else {398http_body::SizeHint::default()399}400}401}402403/// [FutureConsumer] implementation for trailers originating in the guest.404struct GuestTrailerConsumer<T> {405tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,406getter: fn(&mut T) -> WasiHttpCtxView<'_>,407}408409impl<D> FutureConsumer<D> for GuestTrailerConsumer<D>410where411D: 'static,412{413type Item = Result<Option<Resource<Trailers>>, ErrorCode>;414415fn poll_consume(416mut self: Pin<&mut Self>,417_: &mut Context<'_>,418mut store: StoreContextMut<D>,419mut src: Source<'_, Self::Item>,420_: bool,421) -> Poll<wasmtime::Result<()>> {422let mut res = None;423src.read(&mut store, &mut res)424.context("failed to read result")?;425let res = match res.context("result value missing")? {426Ok(Some(trailers)) => {427let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());428let trailers = table429.delete(trailers)430.context("failed to delete trailers")?;431Ok(Some(Arc::from(trailers)))432}433Ok(None) => Ok(None),434Err(err) => Err(err),435};436_ = self.tx.take().unwrap().send(res);437Poll::Ready(Ok(()))438}439}440441/// [StreamProducer] implementation for bodies originating in the host.442pub(crate) struct HostBodyStreamProducer<T> {443pub(crate) body: UnsyncBoxBody<Bytes, ErrorCode>,444trailers: Option<oneshot::Sender<Result<Option<Resource<Trailers>>, ErrorCode>>>,445getter: fn(&mut T) -> WasiHttpCtxView<'_>,446}447448impl<T> Drop for HostBodyStreamProducer<T> {449fn drop(&mut self) {450self.close(Ok(None))451}452}453454impl<T> HostBodyStreamProducer<T> {455fn close(&mut self, res: Result<Option<Resource<Trailers>>, ErrorCode>) {456if let Some(tx) = self.trailers.take() {457_ = tx.send(res);458}459}460}461462impl<D> StreamProducer<D> for HostBodyStreamProducer<D>463where464D: 'static,465{466type Item = u8;467type Buffer = Cursor<Bytes>;468469fn poll_produce<'a>(470mut self: Pin<&mut Self>,471cx: &mut Context<'_>,472mut store: StoreContextMut<'a, D>,473mut dst: Destination<'a, Self::Item, Self::Buffer>,474finish: bool,475) -> Poll<wasmtime::Result<StreamResult>> {476let res = 'result: {477let cap = match dst.remaining(&mut store).map(NonZeroUsize::new) {478Some(Some(cap)) => Some(cap),479Some(None) => {480// On 0-length the best we can do is check that underlying stream has not481// reached the end yet482if self.body.is_end_stream() {483break 'result Ok(None);484} else {485return Poll::Ready(Ok(StreamResult::Completed));486}487}488None => None,489};490match Pin::new(&mut self.body).poll_frame(cx) {491Poll::Ready(Some(Ok(frame))) => {492match frame.into_data().map_err(http_body::Frame::into_trailers) {493Ok(mut frame) => {494// Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream,495// so we have to check for the body's end-of-stream indicator here too496if frame.len() == 0 && self.body.is_end_stream() {497break 'result Ok(None);498}499500if let Some(cap) = cap {501let n = frame.len();502let cap = cap.into();503if n > cap {504// data frame does not fit in destination, fill it and buffer the rest505dst.set_buffer(Cursor::new(frame.split_off(cap)));506let mut dst = dst.as_direct(store, cap);507dst.remaining().copy_from_slice(&frame);508dst.mark_written(cap);509} else {510// copy the whole frame into the destination511let mut dst = dst.as_direct(store, n);512dst.remaining()[..n].copy_from_slice(&frame);513dst.mark_written(n);514}515} else {516dst.set_buffer(Cursor::new(frame));517}518return Poll::Ready(Ok(StreamResult::Completed));519}520Err(Ok(trailers)) => {521let trailers = (self.getter)(store.data_mut())522.table523.push(Fields::new_mutable(trailers))524.context("failed to push trailers to table")?;525break 'result Ok(Some(trailers));526}527Err(Err(..)) => break 'result Err(ErrorCode::HttpProtocolError),528}529}530Poll::Ready(Some(Err(err))) => break 'result Err(err),531Poll::Ready(None) => break 'result Ok(None),532Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),533Poll::Pending => return Poll::Pending,534}535};536self.close(res);537Poll::Ready(Ok(StreamResult::Dropped))538}539540fn try_into(me: Pin<Box<Self>>, ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {541if ty == TypeId::of::<Self>() {542let me = Pin::into_inner(me);543Ok(me)544} else {545Err(me)546}547}548}549550/// A wrapper around [http_body::Body], which allows attaching arbitrary state to it551pub(crate) struct BodyWithState<T, U> {552body: T,553_state: U,554}555556impl<T, U> http_body::Body for BodyWithState<T, U>557where558T: http_body::Body + Unpin,559U: Unpin,560{561type Data = T::Data;562type Error = T::Error;563564#[inline]565fn poll_frame(566self: Pin<&mut Self>,567cx: &mut Context<'_>,568) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {569Pin::new(&mut self.get_mut().body).poll_frame(cx)570}571572#[inline]573fn is_end_stream(&self) -> bool {574self.body.is_end_stream()575}576577#[inline]578fn size_hint(&self) -> http_body::SizeHint {579self.body.size_hint()580}581}582583/// A wrapper around [http_body::Body], which validates `Content-Length`584pub(crate) struct BodyWithContentLength<T, E> {585body: T,586error_tx: Option<oneshot::Sender<E>>,587make_error: fn(Option<u64>) -> E,588/// Limit of bytes to be sent589limit: u64,590/// Number of bytes sent591sent: u64,592}593594impl<T, E> BodyWithContentLength<T, E> {595/// Sends the error constructed by [Self::make_error] on [Self::error_tx].596/// Does nothing if an error has already been sent on [Self::error_tx].597fn send_error<V>(&mut self, sent: Option<u64>) -> Poll<Option<Result<V, E>>> {598if let Some(error_tx) = self.error_tx.take() {599_ = error_tx.send((self.make_error)(sent));600}601Poll::Ready(Some(Err((self.make_error)(sent))))602}603}604605impl<T, E> http_body::Body for BodyWithContentLength<T, E>606where607T: http_body::Body<Data = Bytes, Error = E> + Unpin,608{609type Data = T::Data;610type Error = T::Error;611612#[inline]613fn poll_frame(614mut self: Pin<&mut Self>,615cx: &mut Context<'_>,616) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {617match ready!(Pin::new(&mut self.as_mut().body).poll_frame(cx)) {618Some(Ok(frame)) => {619let Some(data) = frame.data_ref() else {620return Poll::Ready(Some(Ok(frame)));621};622let Ok(sent) = data.len().try_into() else {623return self.send_error(None);624};625let Some(sent) = self.sent.checked_add(sent) else {626return self.send_error(None);627};628if sent > self.limit {629return self.send_error(Some(sent));630}631self.sent = sent;632Poll::Ready(Some(Ok(frame)))633}634Some(Err(err)) => Poll::Ready(Some(Err(err))),635None if self.limit != self.sent => {636// short write637let sent = self.sent;638self.send_error(Some(sent))639}640None => Poll::Ready(None),641}642}643644#[inline]645fn is_end_stream(&self) -> bool {646self.body.is_end_stream()647}648649#[inline]650fn size_hint(&self) -> http_body::SizeHint {651let n = self.limit.saturating_sub(self.sent);652let mut hint = self.body.size_hint();653if hint.lower() >= n {654hint.set_exact(n)655} else if let Some(max) = hint.upper() {656hint.set_upper(n.min(max))657} else {658hint.set_upper(n)659}660hint661}662}663664pub(crate) trait BodyExt {665fn with_state<T>(self, state: T) -> BodyWithState<Self, T>666where667Self: Sized,668{669BodyWithState {670body: self,671_state: state,672}673}674675fn with_content_length<E>(676self,677limit: u64,678error_tx: oneshot::Sender<E>,679make_error: fn(Option<u64>) -> E,680) -> BodyWithContentLength<Self, E>681where682Self: Sized,683{684BodyWithContentLength {685body: self,686error_tx: Some(error_tx),687make_error,688limit,689sent: 0,690}691}692}693694impl<T> BodyExt for T {}695696697