Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-io/src/impls.rs
1692 views
1
use crate::bindings::wasi::io::{error, poll, streams};
2
use crate::poll::{DynFuture, DynPollable, MakeFuture, subscribe};
3
use crate::streams::{DynInputStream, DynOutputStream, StreamError, StreamResult};
4
use alloc::collections::BTreeMap;
5
use alloc::string::String;
6
use alloc::vec::Vec;
7
use anyhow::{Result, anyhow};
8
use core::future::Future;
9
use core::pin::Pin;
10
use core::task::{Context, Poll};
11
use wasmtime::component::{Resource, ResourceTable};
12
13
impl poll::Host for ResourceTable {
14
async fn poll(&mut self, pollables: Vec<Resource<DynPollable>>) -> Result<Vec<u32>> {
15
type ReadylistIndex = u32;
16
17
if pollables.is_empty() {
18
return Err(anyhow!("empty poll list"));
19
}
20
21
let mut table_futures: BTreeMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = BTreeMap::new();
22
23
for (ix, p) in pollables.iter().enumerate() {
24
let ix: u32 = ix.try_into()?;
25
26
let pollable = self.get(p)?;
27
let (_, list) = table_futures
28
.entry(pollable.index)
29
.or_insert((pollable.make_future, Vec::new()));
30
list.push(ix);
31
}
32
33
let mut futures: Vec<(DynFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
34
for (entry, (make_future, readylist_indices)) in self.iter_entries(table_futures) {
35
let entry = entry?;
36
futures.push((make_future(entry), readylist_indices));
37
}
38
39
struct PollList<'a> {
40
futures: Vec<(DynFuture<'a>, Vec<ReadylistIndex>)>,
41
}
42
impl<'a> Future for PollList<'a> {
43
type Output = Vec<u32>;
44
45
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46
let mut any_ready = false;
47
let mut results = Vec::new();
48
for (fut, readylist_indicies) in self.futures.iter_mut() {
49
match fut.as_mut().poll(cx) {
50
Poll::Ready(()) => {
51
results.extend_from_slice(readylist_indicies);
52
any_ready = true;
53
}
54
Poll::Pending => {}
55
}
56
}
57
if any_ready {
58
Poll::Ready(results)
59
} else {
60
Poll::Pending
61
}
62
}
63
}
64
65
Ok(PollList { futures }.await)
66
}
67
}
68
69
impl crate::bindings::wasi::io::poll::HostPollable for ResourceTable {
70
async fn block(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
71
let pollable = self.get(&pollable)?;
72
let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
73
ready.await;
74
Ok(())
75
}
76
async fn ready(&mut self, pollable: Resource<DynPollable>) -> Result<bool> {
77
let pollable = self.get(&pollable)?;
78
let ready = (pollable.make_future)(self.get_any_mut(pollable.index)?);
79
futures::pin_mut!(ready);
80
Ok(matches!(
81
futures::future::poll_immediate(ready).await,
82
Some(())
83
))
84
}
85
fn drop(&mut self, pollable: Resource<DynPollable>) -> Result<()> {
86
let pollable = self.delete(pollable)?;
87
if let Some(delete) = pollable.remove_index_on_delete {
88
delete(self, pollable.index)?;
89
}
90
Ok(())
91
}
92
}
93
94
impl error::Host for ResourceTable {}
95
96
impl streams::Host for ResourceTable {
97
fn convert_stream_error(&mut self, err: StreamError) -> Result<streams::StreamError> {
98
match err {
99
StreamError::Closed => Ok(streams::StreamError::Closed),
100
StreamError::LastOperationFailed(e) => {
101
Ok(streams::StreamError::LastOperationFailed(self.push(e)?))
102
}
103
StreamError::Trap(e) => Err(e),
104
}
105
}
106
}
107
108
impl error::HostError for ResourceTable {
109
fn drop(&mut self, err: Resource<streams::Error>) -> Result<()> {
110
self.delete(err)?;
111
Ok(())
112
}
113
114
fn to_debug_string(&mut self, err: Resource<streams::Error>) -> Result<String> {
115
Ok(alloc::format!("{:?}", self.get(&err)?))
116
}
117
}
118
119
impl streams::HostOutputStream for ResourceTable {
120
async fn drop(&mut self, stream: Resource<DynOutputStream>) -> Result<()> {
121
self.delete(stream)?.cancel().await;
122
Ok(())
123
}
124
125
fn check_write(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<u64> {
126
let bytes = self.get_mut(&stream)?.check_write()?;
127
Ok(bytes as u64)
128
}
129
130
fn write(&mut self, stream: Resource<DynOutputStream>, bytes: Vec<u8>) -> StreamResult<()> {
131
self.get_mut(&stream)?.write(bytes.into())?;
132
Ok(())
133
}
134
135
fn subscribe(&mut self, stream: Resource<DynOutputStream>) -> Result<Resource<DynPollable>> {
136
subscribe(self, stream)
137
}
138
139
async fn blocking_write_and_flush(
140
&mut self,
141
stream: Resource<DynOutputStream>,
142
bytes: Vec<u8>,
143
) -> StreamResult<()> {
144
if bytes.len() > 4096 {
145
return Err(StreamError::trap(
146
"Buffer too large for blocking-write-and-flush (expected at most 4096)",
147
));
148
}
149
150
self.get_mut(&stream)?
151
.blocking_write_and_flush(bytes.into())
152
.await
153
}
154
155
async fn blocking_write_zeroes_and_flush(
156
&mut self,
157
stream: Resource<DynOutputStream>,
158
len: u64,
159
) -> StreamResult<()> {
160
if len > 4096 {
161
return Err(StreamError::trap(
162
"Buffer too large for blocking-write-zeroes-and-flush (expected at most 4096)",
163
));
164
}
165
166
self.get_mut(&stream)?
167
.blocking_write_zeroes_and_flush(len as usize)
168
.await
169
}
170
171
fn write_zeroes(&mut self, stream: Resource<DynOutputStream>, len: u64) -> StreamResult<()> {
172
self.get_mut(&stream)?.write_zeroes(len as usize)?;
173
Ok(())
174
}
175
176
fn flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
177
self.get_mut(&stream)?.flush()?;
178
Ok(())
179
}
180
181
async fn blocking_flush(&mut self, stream: Resource<DynOutputStream>) -> StreamResult<()> {
182
let s = self.get_mut(&stream)?;
183
s.flush()?;
184
s.write_ready().await?;
185
Ok(())
186
}
187
188
fn splice(
189
&mut self,
190
dest: Resource<DynOutputStream>,
191
src: Resource<DynInputStream>,
192
len: u64,
193
) -> StreamResult<u64> {
194
let len = len.try_into().unwrap_or(usize::MAX);
195
196
let permit = {
197
let output = self.get_mut(&dest)?;
198
output.check_write()?
199
};
200
let len = len.min(permit);
201
if len == 0 {
202
return Ok(0);
203
}
204
205
let contents = self.get_mut(&src)?.read(len)?;
206
207
let len = contents.len();
208
if len == 0 {
209
return Ok(0);
210
}
211
212
let output = self.get_mut(&dest)?;
213
output.write(contents)?;
214
Ok(len.try_into().expect("usize can fit in u64"))
215
}
216
217
async fn blocking_splice(
218
&mut self,
219
dest: Resource<DynOutputStream>,
220
src: Resource<DynInputStream>,
221
len: u64,
222
) -> StreamResult<u64> {
223
let len = len.try_into().unwrap_or(usize::MAX);
224
225
let permit = {
226
let output = self.get_mut(&dest)?;
227
output.write_ready().await?
228
};
229
let len = len.min(permit);
230
if len == 0 {
231
return Ok(0);
232
}
233
234
let contents = self.get_mut(&src)?.blocking_read(len).await?;
235
236
let len = contents.len();
237
if len == 0 {
238
return Ok(0);
239
}
240
241
let output = self.get_mut(&dest)?;
242
output.blocking_write_and_flush(contents).await?;
243
Ok(len.try_into().expect("usize can fit in u64"))
244
}
245
}
246
247
impl streams::HostInputStream for ResourceTable {
248
async fn drop(&mut self, stream: Resource<DynInputStream>) -> Result<()> {
249
self.delete(stream)?.cancel().await;
250
Ok(())
251
}
252
253
fn read(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<Vec<u8>> {
254
let len = len.try_into().unwrap_or(usize::MAX);
255
let bytes = self.get_mut(&stream)?.read(len)?;
256
debug_assert!(bytes.len() <= len);
257
Ok(bytes.into())
258
}
259
260
async fn blocking_read(
261
&mut self,
262
stream: Resource<DynInputStream>,
263
len: u64,
264
) -> StreamResult<Vec<u8>> {
265
let len = len.try_into().unwrap_or(usize::MAX);
266
let bytes = self.get_mut(&stream)?.blocking_read(len).await?;
267
debug_assert!(bytes.len() <= len);
268
Ok(bytes.into())
269
}
270
271
fn skip(&mut self, stream: Resource<DynInputStream>, len: u64) -> StreamResult<u64> {
272
let len = len.try_into().unwrap_or(usize::MAX);
273
let written = self.get_mut(&stream)?.skip(len)?;
274
Ok(written.try_into().expect("usize always fits in u64"))
275
}
276
277
async fn blocking_skip(
278
&mut self,
279
stream: Resource<DynInputStream>,
280
len: u64,
281
) -> StreamResult<u64> {
282
let len = len.try_into().unwrap_or(usize::MAX);
283
let written = self.get_mut(&stream)?.blocking_skip(len).await?;
284
Ok(written.try_into().expect("usize always fits in u64"))
285
}
286
287
fn subscribe(&mut self, stream: Resource<DynInputStream>) -> Result<Resource<DynPollable>> {
288
crate::poll::subscribe(self, stream)
289
}
290
}
291
292