Path: blob/main/crates/wasi-http/tests/all/http_server.rs
3104 views
use http::header::CONTENT_LENGTH;1use hyper::service::service_fn;2use hyper::{Request, Response};3use std::future::Future;4use std::net::{SocketAddr, TcpStream};5use std::thread::JoinHandle;6use tokio::net::TcpListener;7use tracing::{debug, trace, warn};8use wasmtime::{Result, error::Context as _};9use wasmtime_wasi_http::io::TokioIo;1011async fn test(12req: Request<hyper::body::Incoming>,13) -> http::Result<Response<hyper::body::Incoming>> {14debug!(?req, "preparing mocked response for request");15let method = req.method().to_string();16let uri = req.uri().to_string();17let resp = Response::builder()18.header("x-wasmtime-test-method", method)19.header("x-wasmtime-test-uri", uri);20let resp = if let Some(content_length) = req.headers().get(CONTENT_LENGTH) {21resp.header(CONTENT_LENGTH, content_length)22} else {23resp24};25let body = req.into_body();26resp.body(body)27}2829pub struct Server {30conns: usize,31addr: SocketAddr,32worker: Option<JoinHandle<()>>,33}3435impl Server {36fn new<F>(37conns: usize,38run: impl Fn(TokioIo<tokio::net::TcpStream>) -> F + Send + 'static,39) -> Result<Self>40where41F: Future<Output = Result<()>>,42{43let thread = std::thread::spawn(|| -> Result<_> {44let rt = tokio::runtime::Builder::new_current_thread()45.enable_all()46.build()47.context("failed to start tokio runtime")?;48let listener = rt.block_on(async move {49let addr = SocketAddr::from(([127, 0, 0, 1], 0));50TcpListener::bind(addr).await.context("failed to bind")51})?;52Ok((rt, listener))53});54let (rt, listener) = thread.join().unwrap()?;55let addr = listener.local_addr().context("failed to get local addr")?;56let worker = std::thread::spawn(move || {57debug!("dedicated thread to start listening");58rt.block_on(async move {59for i in 0..conns {60debug!(i, "preparing to accept connection");61match listener.accept().await {62Ok((stream, ..)) => {63debug!(i, "accepted connection");64if let Err(err) = run(TokioIo::new(stream)).await {65warn!(i, ?err, "failed to serve connection");66}67}68Err(err) => {69warn!(i, ?err, "failed to accept connection");70}71};72}73})74});75Ok(Self {76conns,77worker: Some(worker),78addr,79})80}8182pub fn http1(conns: usize) -> Result<Self> {83debug!("initializing http1 server");84Self::new(conns, |io| async move {85let mut builder = hyper::server::conn::http1::Builder::new();86let http = builder.keep_alive(false).pipeline_flush(true);8788debug!("preparing to bind connection to service");89let conn = http.serve_connection(io, service_fn(test)).await;90trace!("connection result {:?}", conn);91conn?;92Ok(())93})94}9596pub fn http2(conns: usize) -> Result<Self> {97debug!("initializing http2 server");98Self::new(conns, |io| async move {99let mut builder = hyper::server::conn::http2::Builder::new(TokioExecutor);100let http = builder.max_concurrent_streams(20);101102debug!("preparing to bind connection to service");103let conn = http.serve_connection(io, service_fn(test)).await;104trace!("connection result {:?}", conn);105if let Err(e) = &conn {106let message = e.to_string();107if message.contains("connection closed before reading preface")108|| message.contains("unspecific protocol error detected")109{110return Ok(());111}112}113conn?;114Ok(())115})116}117118pub fn addr(&self) -> String {119format!("localhost:{}", self.addr.port())120}121}122123impl Drop for Server {124fn drop(&mut self) {125debug!("shutting down http1 server");126for _ in 0..self.conns {127// Force a connection to happen in case one hasn't happened already.128let _ = TcpStream::connect(&self.addr);129}130self.worker.take().unwrap().join().unwrap();131}132}133134#[derive(Clone)]135/// An Executor that uses the tokio runtime.136struct TokioExecutor;137138impl<F> hyper::rt::Executor<F> for TokioExecutor139where140F: Future + Send + 'static,141F::Output: Send + 'static,142{143fn execute(&self, fut: F) {144tokio::task::spawn(fut);145}146}147148149