Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-http/tests/all/http_server.rs
3104 views
1
use http::header::CONTENT_LENGTH;
2
use hyper::service::service_fn;
3
use hyper::{Request, Response};
4
use std::future::Future;
5
use std::net::{SocketAddr, TcpStream};
6
use std::thread::JoinHandle;
7
use tokio::net::TcpListener;
8
use tracing::{debug, trace, warn};
9
use wasmtime::{Result, error::Context as _};
10
use wasmtime_wasi_http::io::TokioIo;
11
12
async fn test(
13
req: Request<hyper::body::Incoming>,
14
) -> http::Result<Response<hyper::body::Incoming>> {
15
debug!(?req, "preparing mocked response for request");
16
let method = req.method().to_string();
17
let uri = req.uri().to_string();
18
let resp = Response::builder()
19
.header("x-wasmtime-test-method", method)
20
.header("x-wasmtime-test-uri", uri);
21
let resp = if let Some(content_length) = req.headers().get(CONTENT_LENGTH) {
22
resp.header(CONTENT_LENGTH, content_length)
23
} else {
24
resp
25
};
26
let body = req.into_body();
27
resp.body(body)
28
}
29
30
/// A local HTTP server running on a dedicated worker thread.
///
/// Instances are created through the `http1` / `http2` constructors and joined
/// when dropped.
pub struct Server {
    /// Local socket address the listener is bound to.
    addr: SocketAddr,
    /// Number of connections the worker thread accepts before it finishes.
    conns: usize,
    /// Handle of the worker thread; taken (and joined) on drop.
    worker: Option<JoinHandle<()>>,
}
35
36
impl Server {
37
fn new<F>(
38
conns: usize,
39
run: impl Fn(TokioIo<tokio::net::TcpStream>) -> F + Send + 'static,
40
) -> Result<Self>
41
where
42
F: Future<Output = Result<()>>,
43
{
44
let thread = std::thread::spawn(|| -> Result<_> {
45
let rt = tokio::runtime::Builder::new_current_thread()
46
.enable_all()
47
.build()
48
.context("failed to start tokio runtime")?;
49
let listener = rt.block_on(async move {
50
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
51
TcpListener::bind(addr).await.context("failed to bind")
52
})?;
53
Ok((rt, listener))
54
});
55
let (rt, listener) = thread.join().unwrap()?;
56
let addr = listener.local_addr().context("failed to get local addr")?;
57
let worker = std::thread::spawn(move || {
58
debug!("dedicated thread to start listening");
59
rt.block_on(async move {
60
for i in 0..conns {
61
debug!(i, "preparing to accept connection");
62
match listener.accept().await {
63
Ok((stream, ..)) => {
64
debug!(i, "accepted connection");
65
if let Err(err) = run(TokioIo::new(stream)).await {
66
warn!(i, ?err, "failed to serve connection");
67
}
68
}
69
Err(err) => {
70
warn!(i, ?err, "failed to accept connection");
71
}
72
};
73
}
74
})
75
});
76
Ok(Self {
77
conns,
78
worker: Some(worker),
79
addr,
80
})
81
}
82
83
pub fn http1(conns: usize) -> Result<Self> {
84
debug!("initializing http1 server");
85
Self::new(conns, |io| async move {
86
let mut builder = hyper::server::conn::http1::Builder::new();
87
let http = builder.keep_alive(false).pipeline_flush(true);
88
89
debug!("preparing to bind connection to service");
90
let conn = http.serve_connection(io, service_fn(test)).await;
91
trace!("connection result {:?}", conn);
92
conn?;
93
Ok(())
94
})
95
}
96
97
pub fn http2(conns: usize) -> Result<Self> {
98
debug!("initializing http2 server");
99
Self::new(conns, |io| async move {
100
let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);
101
let http = builder.max_concurrent_streams(20);
102
103
debug!("preparing to bind connection to service");
104
let conn = http.serve_connection(io, service_fn(test)).await;
105
trace!("connection result {:?}", conn);
106
if let Err(e) = &conn {
107
let message = e.to_string();
108
if message.contains("connection closed before reading preface")
109
|| message.contains("unspecific protocol error detected")
110
{
111
return Ok(());
112
}
113
}
114
conn?;
115
Ok(())
116
})
117
}
118
119
pub fn addr(&self) -> String {
120
format!("localhost:{}", self.addr.port())
121
}
122
}
123
124
impl Drop for Server {
125
fn drop(&mut self) {
126
debug!("shutting down http1 server");
127
for _ in 0..self.conns {
128
// Force a connection to happen in case one hasn't happened already.
129
let _ = TcpStream::connect(&self.addr);
130
}
131
self.worker.take().unwrap().join().unwrap();
132
}
133
}
134
135
/// An executor that hands futures to the tokio runtime.
///
/// Passed to the HTTP/2 connection builder, which needs an executor for the
/// futures it spawns.
#[derive(Clone)]
struct TokioExecutor;
138
139
impl<F> hyper::rt::Executor<F> for TokioExecutor
140
where
141
F: Future + Send + 'static,
142
F::Output: Send + 'static,
143
{
144
fn execute(&self, fut: F) {
145
tokio::task::spawn(fut);
146
}
147
}
148
149