Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/test-programs/src/http.rs
1693 views
1
use crate::wasi::http::{outgoing_handler, types as http_types};
2
use crate::wasi::io::streams;
3
use anyhow::{Result, anyhow};
4
use std::fmt;
5
6
pub struct Response {
7
pub status: http_types::StatusCode,
8
pub headers: Vec<(String, Vec<u8>)>,
9
pub body: Vec<u8>,
10
}
11
impl fmt::Debug for Response {
12
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
13
let mut out = f.debug_struct("Response");
14
out.field("status", &self.status)
15
.field("headers", &self.headers);
16
if let Ok(body) = std::str::from_utf8(&self.body) {
17
out.field("body", &body);
18
} else {
19
out.field("body", &self.body);
20
}
21
out.finish()
22
}
23
}
24
25
impl Response {
26
pub fn header(&self, name: &str) -> Option<&Vec<u8>> {
27
self.headers
28
.iter()
29
.find_map(|(k, v)| if k == name { Some(v) } else { None })
30
}
31
}
32
33
pub fn request(
34
method: http_types::Method,
35
scheme: http_types::Scheme,
36
authority: &str,
37
path_with_query: &str,
38
body: Option<&[u8]>,
39
additional_headers: Option<&[(String, Vec<u8>)]>,
40
connect_timeout: Option<u64>,
41
first_by_timeout: Option<u64>,
42
between_bytes_timeout: Option<u64>,
43
) -> Result<Response> {
44
fn header_val(v: &str) -> Vec<u8> {
45
v.to_string().into_bytes()
46
}
47
let headers = http_types::Headers::from_list(
48
&[
49
&[
50
("User-agent".to_string(), header_val("WASI-HTTP/0.0.1")),
51
("Content-type".to_string(), header_val("application/json")),
52
],
53
additional_headers.unwrap_or(&[]),
54
]
55
.concat(),
56
)?;
57
58
let request = http_types::OutgoingRequest::new(headers);
59
60
request
61
.set_method(&method)
62
.map_err(|()| anyhow!("failed to set method"))?;
63
request
64
.set_scheme(Some(&scheme))
65
.map_err(|()| anyhow!("failed to set scheme"))?;
66
request
67
.set_authority(Some(authority))
68
.map_err(|()| anyhow!("failed to set authority"))?;
69
request
70
.set_path_with_query(Some(&path_with_query))
71
.map_err(|()| anyhow!("failed to set path_with_query"))?;
72
73
let outgoing_body = request
74
.body()
75
.map_err(|_| anyhow!("outgoing request write failed"))?;
76
77
let options = http_types::RequestOptions::new();
78
options
79
.set_connect_timeout(connect_timeout)
80
.map_err(|()| anyhow!("failed to set connect_timeout"))?;
81
options
82
.set_first_byte_timeout(first_by_timeout)
83
.map_err(|()| anyhow!("failed to set first_byte_timeout"))?;
84
options
85
.set_between_bytes_timeout(between_bytes_timeout)
86
.map_err(|()| anyhow!("failed to set between_bytes_timeout"))?;
87
let options = Some(options);
88
89
let future_response = outgoing_handler::handle(request, options)?;
90
91
if let Some(mut buf) = body {
92
let request_body = outgoing_body
93
.write()
94
.map_err(|_| anyhow!("outgoing request write failed"))?;
95
96
let pollable = request_body.subscribe();
97
while !buf.is_empty() {
98
pollable.block();
99
100
let permit = match request_body.check_write() {
101
Ok(n) => n,
102
Err(_) => anyhow::bail!("output stream error"),
103
};
104
105
let len = buf.len().min(permit as usize);
106
let (chunk, rest) = buf.split_at(len);
107
buf = rest;
108
109
match request_body.write(chunk) {
110
Err(_) => anyhow::bail!("output stream error"),
111
_ => {}
112
}
113
}
114
115
match request_body.flush() {
116
Err(_) => anyhow::bail!("output stream error"),
117
_ => {}
118
}
119
120
pollable.block();
121
122
match request_body.check_write() {
123
Ok(_) => {}
124
Err(_) => anyhow::bail!("output stream error"),
125
};
126
}
127
http_types::OutgoingBody::finish(outgoing_body, None)?;
128
129
let incoming_response = match future_response.get() {
130
Some(result) => result.map_err(|()| anyhow!("response already taken"))?,
131
None => {
132
let pollable = future_response.subscribe();
133
pollable.block();
134
future_response
135
.get()
136
.expect("incoming response available")
137
.map_err(|()| anyhow!("response already taken"))?
138
}
139
}?;
140
141
drop(future_response);
142
143
let status = incoming_response.status();
144
145
let headers_handle = incoming_response.headers();
146
let headers = headers_handle.entries();
147
drop(headers_handle);
148
149
let incoming_body = incoming_response
150
.consume()
151
.map_err(|()| anyhow!("incoming response has no body stream"))?;
152
153
drop(incoming_response);
154
155
let input_stream = incoming_body.stream().unwrap();
156
let input_stream_pollable = input_stream.subscribe();
157
158
let mut body = Vec::new();
159
loop {
160
input_stream_pollable.block();
161
162
let mut body_chunk = match input_stream.read(1024 * 1024) {
163
Ok(c) => c,
164
Err(streams::StreamError::Closed) => break,
165
Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?,
166
};
167
168
if !body_chunk.is_empty() {
169
body.append(&mut body_chunk);
170
}
171
}
172
173
Ok(Response {
174
status,
175
headers,
176
body,
177
})
178
}
179
180