Path: blob/main/crates/wasi-http/src/p3/host/handler.rs
3088 views
use crate::p3::bindings::http::client::{Host, HostWithStore};1use crate::p3::bindings::http::types::{ErrorCode, Request, Response};2use crate::p3::body::{Body, BodyExt as _};3use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};4use core::task::{Context, Poll, Waker};5use http_body_util::BodyExt as _;6use std::sync::Arc;7use tokio::sync::oneshot;8use tokio::task::{self, JoinHandle};9use tracing::debug;10use wasmtime::component::{Accessor, Resource};11use wasmtime::error::Context as _;1213/// A wrapper around [`JoinHandle`], which will [`JoinHandle::abort`] the task14/// when dropped15struct AbortOnDropJoinHandle(JoinHandle<()>);1617impl Drop for AbortOnDropJoinHandle {18fn drop(&mut self) {19self.0.abort();20}21}2223async fn io_task_result(24rx: oneshot::Receiver<(25Arc<AbortOnDropJoinHandle>,26oneshot::Receiver<Result<(), ErrorCode>>,27)>,28) -> Result<(), ErrorCode> {29let Ok((_io, io_result_rx)) = rx.await else {30return Ok(());31};32io_result_rx.await.unwrap_or(Ok(()))33}3435impl HostWithStore for WasiHttp {36async fn send<T>(37store: &Accessor<T, Self>,38req: Resource<Request>,39) -> HttpResult<Resource<Response>> {40// A handle to the I/O task, if spawned, will be sent on this channel41// and kept as part of request body state42let (io_task_tx, io_task_rx) = oneshot::channel();4344// A handle to the I/O task, if spawned, will be sent on this channel45// along with the result receiver46let (io_result_tx, io_result_rx) = oneshot::channel();4748// Response processing result will be sent on this channel49let (res_result_tx, res_result_rx) = oneshot::channel();5051let getter = store.getter();52let fut = store.with(|mut store| {53let WasiHttpCtxView { table, .. } = store.get();54let req = table55.delete(req)56.context("failed to delete request from table")57.map_err(HttpError::trap)?;58let (req, options) =59req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;60HttpResult::Ok(store.get().ctx.send_request(61req.map(|body| body.with_state(io_task_rx).boxed_unsync()),62options.as_deref().copied(),63Box::new(async {64// Forward the response processing result to `WasiHttpCtx` implementation65let Ok(fut) = res_result_rx.await else {66return Ok(());67};68Box::into_pin(fut).await69}),70))71})?;72let (res, io) = Box::into_pin(fut).await?;73let (74http::response::Parts {75status, headers, ..76},77body,78) = res.into_parts();7980let mut io = Box::into_pin(io);81let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {82Poll::Ready(()) => body,83Poll::Pending => {84// I/O driver still needs to be polled, spawn a task and send handles to it85let (tx, rx) = oneshot::channel();86let io = task::spawn(async move {87let res = io.await;88debug!(?res, "`send_request` I/O future finished");89_ = tx.send(res);90});91let io = Arc::new(AbortOnDropJoinHandle(io));92_ = io_result_tx.send((Arc::clone(&io), rx));93_ = io_task_tx.send(Arc::clone(&io));94body.with_state(io).boxed_unsync()95}96};97let res = Response {98status,99headers: Arc::new(headers),100body: Body::Host {101body,102result_tx: res_result_tx,103},104};105store.with(|mut store| {106store107.get()108.table109.push(res)110.context("failed to push response to table")111.map_err(HttpError::trap)112})113}114}115116impl Host for WasiHttpCtxView<'_> {}117118119