Path: blob/main/crates/test-programs/src/http.rs
1693 views
use crate::wasi::http::{outgoing_handler, types as http_types};1use crate::wasi::io::streams;2use anyhow::{Result, anyhow};3use std::fmt;45pub struct Response {6pub status: http_types::StatusCode,7pub headers: Vec<(String, Vec<u8>)>,8pub body: Vec<u8>,9}10impl fmt::Debug for Response {11fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {12let mut out = f.debug_struct("Response");13out.field("status", &self.status)14.field("headers", &self.headers);15if let Ok(body) = std::str::from_utf8(&self.body) {16out.field("body", &body);17} else {18out.field("body", &self.body);19}20out.finish()21}22}2324impl Response {25pub fn header(&self, name: &str) -> Option<&Vec<u8>> {26self.headers27.iter()28.find_map(|(k, v)| if k == name { Some(v) } else { None })29}30}3132pub fn request(33method: http_types::Method,34scheme: http_types::Scheme,35authority: &str,36path_with_query: &str,37body: Option<&[u8]>,38additional_headers: Option<&[(String, Vec<u8>)]>,39connect_timeout: Option<u64>,40first_by_timeout: Option<u64>,41between_bytes_timeout: Option<u64>,42) -> Result<Response> {43fn header_val(v: &str) -> Vec<u8> {44v.to_string().into_bytes()45}46let headers = http_types::Headers::from_list(47&[48&[49("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),50("Content-type".to_string(), header_val("application/json")),51],52additional_headers.unwrap_or(&[]),53]54.concat(),55)?;5657let request = http_types::OutgoingRequest::new(headers);5859request60.set_method(&method)61.map_err(|()| anyhow!("failed to set method"))?;62request63.set_scheme(Some(&scheme))64.map_err(|()| anyhow!("failed to set scheme"))?;65request66.set_authority(Some(authority))67.map_err(|()| anyhow!("failed to set authority"))?;68request69.set_path_with_query(Some(&path_with_query))70.map_err(|()| anyhow!("failed to set path_with_query"))?;7172let outgoing_body = request73.body()74.map_err(|_| anyhow!("outgoing request write failed"))?;7576let options = http_types::RequestOptions::new();77options78.set_connect_timeout(connect_timeout)79.map_err(|()| anyhow!("failed to set connect_timeout"))?;80options81.set_first_byte_timeout(first_by_timeout)82.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;83options84.set_between_bytes_timeout(between_bytes_timeout)85.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;86let options = Some(options);8788let future_response = outgoing_handler::handle(request, options)?;8990if let Some(mut buf) = body {91let request_body = outgoing_body92.write()93.map_err(|_| anyhow!("outgoing request write failed"))?;9495let pollable = request_body.subscribe();96while !buf.is_empty() {97pollable.block();9899let permit = match request_body.check_write() {100Ok(n) => n,101Err(_) => anyhow::bail!("output stream error"),102};103104let len = buf.len().min(permit as usize);105let (chunk, rest) = buf.split_at(len);106buf = rest;107108match request_body.write(chunk) {109Err(_) => anyhow::bail!("output stream error"),110_ => {}111}112}113114match request_body.flush() {115Err(_) => anyhow::bail!("output stream error"),116_ => {}117}118119pollable.block();120121match request_body.check_write() {122Ok(_) => {}123Err(_) => anyhow::bail!("output stream error"),124};125}126http_types::OutgoingBody::finish(outgoing_body, None)?;127128let incoming_response = match future_response.get() {129Some(result) => result.map_err(|()| anyhow!("response already taken"))?,130None => {131let pollable = future_response.subscribe();132pollable.block();133future_response134.get()135.expect("incoming response available")136.map_err(|()| anyhow!("response already taken"))?137}138}?;139140drop(future_response);141142let status = incoming_response.status();143144let headers_handle = incoming_response.headers();145let headers = headers_handle.entries();146drop(headers_handle);147148let incoming_body = incoming_response149.consume()150.map_err(|()| anyhow!("incoming response has no body stream"))?;151152drop(incoming_response);153154let input_stream = incoming_body.stream().unwrap();155let input_stream_pollable = input_stream.subscribe();156157let mut body = Vec::new();158loop {159input_stream_pollable.block();160161let mut body_chunk = match input_stream.read(1024 * 1024) {162Ok(c) => c,163Err(streams::StreamError::Closed) => break,164Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?,165};166167if !body_chunk.is_empty() {168body.append(&mut body_chunk);169}170}171172Ok(Response {173status,174headers,175body,176})177}178179180