Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-http/src/p3/host/handler.rs
3088 views
1
use crate::p3::bindings::http::client::{Host, HostWithStore};
2
use crate::p3::bindings::http::types::{ErrorCode, Request, Response};
3
use crate::p3::body::{Body, BodyExt as _};
4
use crate::p3::{HttpError, HttpResult, WasiHttp, WasiHttpCtxView};
5
use core::task::{Context, Poll, Waker};
6
use http_body_util::BodyExt as _;
7
use std::sync::Arc;
8
use tokio::sync::oneshot;
9
use tokio::task::{self, JoinHandle};
10
use tracing::debug;
11
use wasmtime::component::{Accessor, Resource};
12
use wasmtime::error::Context as _;
13
14
/// A wrapper around [`JoinHandle`], which will [`JoinHandle::abort`] the task
15
/// when dropped
16
struct AbortOnDropJoinHandle(JoinHandle<()>);
17
18
impl Drop for AbortOnDropJoinHandle {
19
fn drop(&mut self) {
20
self.0.abort();
21
}
22
}
23
24
async fn io_task_result(
25
rx: oneshot::Receiver<(
26
Arc<AbortOnDropJoinHandle>,
27
oneshot::Receiver<Result<(), ErrorCode>>,
28
)>,
29
) -> Result<(), ErrorCode> {
30
let Ok((_io, io_result_rx)) = rx.await else {
31
return Ok(());
32
};
33
io_result_rx.await.unwrap_or(Ok(()))
34
}
35
36
impl HostWithStore for WasiHttp {
37
async fn send<T>(
38
store: &Accessor<T, Self>,
39
req: Resource<Request>,
40
) -> HttpResult<Resource<Response>> {
41
// A handle to the I/O task, if spawned, will be sent on this channel
42
// and kept as part of request body state
43
let (io_task_tx, io_task_rx) = oneshot::channel();
44
45
// A handle to the I/O task, if spawned, will be sent on this channel
46
// along with the result receiver
47
let (io_result_tx, io_result_rx) = oneshot::channel();
48
49
// Response processing result will be sent on this channel
50
let (res_result_tx, res_result_rx) = oneshot::channel();
51
52
let getter = store.getter();
53
let fut = store.with(|mut store| {
54
let WasiHttpCtxView { table, .. } = store.get();
55
let req = table
56
.delete(req)
57
.context("failed to delete request from table")
58
.map_err(HttpError::trap)?;
59
let (req, options) =
60
req.into_http_with_getter(&mut store, io_task_result(io_result_rx), getter)?;
61
HttpResult::Ok(store.get().ctx.send_request(
62
req.map(|body| body.with_state(io_task_rx).boxed_unsync()),
63
options.as_deref().copied(),
64
Box::new(async {
65
// Forward the response processing result to `WasiHttpCtx` implementation
66
let Ok(fut) = res_result_rx.await else {
67
return Ok(());
68
};
69
Box::into_pin(fut).await
70
}),
71
))
72
})?;
73
let (res, io) = Box::into_pin(fut).await?;
74
let (
75
http::response::Parts {
76
status, headers, ..
77
},
78
body,
79
) = res.into_parts();
80
81
let mut io = Box::into_pin(io);
82
let body = match io.as_mut().poll(&mut Context::from_waker(Waker::noop()))? {
83
Poll::Ready(()) => body,
84
Poll::Pending => {
85
// I/O driver still needs to be polled, spawn a task and send handles to it
86
let (tx, rx) = oneshot::channel();
87
let io = task::spawn(async move {
88
let res = io.await;
89
debug!(?res, "`send_request` I/O future finished");
90
_ = tx.send(res);
91
});
92
let io = Arc::new(AbortOnDropJoinHandle(io));
93
_ = io_result_tx.send((Arc::clone(&io), rx));
94
_ = io_task_tx.send(Arc::clone(&io));
95
body.with_state(io).boxed_unsync()
96
}
97
};
98
let res = Response {
99
status,
100
headers: Arc::new(headers),
101
body: Body::Host {
102
body,
103
result_tx: res_result_tx,
104
},
105
};
106
store.with(|mut store| {
107
store
108
.get()
109
.table
110
.push(res)
111
.context("failed to push response to table")
112
.map_err(HttpError::trap)
113
})
114
}
115
}
116
117
// The impl body is empty: `Host` is implemented for `WasiHttpCtxView` without
// defining any methods here.
impl Host for WasiHttpCtxView<'_> {}
118
119