// Source: GitHub repository bytecodealliance/wasmtime
// Path: blob/main/crates/wasi-io/src/streams.rs
1
use crate::poll::Pollable;
2
use alloc::boxed::Box;
3
use anyhow::Result;
4
use bytes::Bytes;
5
6
/// Maximum number of loop attempts the `blocking_` helpers are allowed.
///
/// `Pollable::ready()` for `InputStream` and `OutputStream` may return
/// prematurely due to `io::ErrorKind::WouldBlock`.
///
/// To ensure that `blocking_` functions return a valid non-empty result,
/// they poll in a loop, and this constant bounds how many times that loop
/// may run before giving up.
const MAX_BLOCKING_ATTEMPTS: u8 = 10;
14
15
/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
16
/// bytestream which can be read from.
17
#[async_trait::async_trait]
18
pub trait InputStream: Pollable {
19
/// Reads up to `size` bytes, returning a buffer holding these bytes on
20
/// success.
21
///
22
/// This function does not block the current thread and is the equivalent of
23
/// a non-blocking read. On success all bytes read are returned through
24
/// `Bytes`, which is no larger than the `size` provided. If the returned
25
/// list of `Bytes` is empty then no data is ready to be read at this time.
26
///
27
/// # Errors
28
///
29
/// The [`StreamError`] return value communicates when this stream is
30
/// closed, when a read fails, or when a trap should be generated.
31
fn read(&mut self, size: usize) -> StreamResult<Bytes>;
32
33
/// Similar to `read`, except that it blocks until at least one byte can be
34
/// read.
35
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
36
if size == 0 {
37
self.ready().await;
38
return self.read(size);
39
}
40
41
let mut i = 0;
42
loop {
43
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44
self.ready().await;
45
let data = self.read(size)?;
46
if !data.is_empty() {
47
return Ok(data);
48
}
49
if i >= MAX_BLOCKING_ATTEMPTS {
50
return Err(StreamError::trap("max blocking attempts exceeded"));
51
}
52
i += 1;
53
}
54
}
55
56
/// Same as the `read` method except that bytes are skipped.
57
///
58
/// Note that this method is non-blocking like `read` and returns the same
59
/// errors.
60
fn skip(&mut self, nelem: usize) -> StreamResult<usize> {
61
let bs = self.read(nelem)?;
62
Ok(bs.len())
63
}
64
65
/// Similar to `skip`, except that it blocks until at least one byte can be
66
/// skipped.
67
async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {
68
let bs = self.blocking_read(nelem).await?;
69
Ok(bs.len())
70
}
71
72
/// Cancel any asynchronous work and wait for it to wrap up.
73
async fn cancel(&mut self) {}
74
}
75
76
/// Representation of the `error` resource type in the `wasi:io/error`
77
/// interface.
78
///
79
/// This is currently `anyhow::Error` to retain full type information for
80
/// errors.
81
pub type Error = anyhow::Error;
82
83
/// Result alias used by the stream traits in this file: the error type is
/// always [`StreamError`].
pub type StreamResult<T> = Result<T, StreamError>;
84
85
#[derive(Debug)]
86
pub enum StreamError {
87
Closed,
88
LastOperationFailed(anyhow::Error),
89
Trap(anyhow::Error),
90
}
91
92
impl StreamError {
93
pub fn trap(msg: &str) -> StreamError {
94
StreamError::Trap(anyhow::anyhow!("{msg}"))
95
}
96
}
97
98
impl alloc::fmt::Display for StreamError {
99
fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {
100
match self {
101
StreamError::Closed => write!(f, "closed"),
102
StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),
103
StreamError::Trap(e) => write!(f, "trap: {e}"),
104
}
105
}
106
}
107
108
impl core::error::Error for StreamError {
109
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
110
match self {
111
StreamError::Closed => None,
112
StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),
113
}
114
}
115
}
116
117
impl From<wasmtime::component::ResourceTableError> for StreamError {
118
fn from(error: wasmtime::component::ResourceTableError) -> Self {
119
Self::Trap(error.into())
120
}
121
}
122
123
/// Host trait for implementing the `wasi:io/streams.output-stream` resource:
124
/// A bytestream which can be written to.
125
#[async_trait::async_trait]
126
pub trait OutputStream: Pollable {
127
/// Write bytes after obtaining a permit to write those bytes
128
///
129
/// Prior to calling [`write`](Self::write) the caller must call
130
/// [`check_write`](Self::check_write), which resolves to a non-zero permit
131
///
132
/// This method must never block. The [`check_write`](Self::check_write)
133
/// permit indicates the maximum amount of bytes that are permitted to be
134
/// written in a single [`write`](Self::write) following the
135
/// [`check_write`](Self::check_write) resolution.
136
///
137
/// # Errors
138
///
139
/// Returns a [`StreamError`] if:
140
/// - stream is closed
141
/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
142
/// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
143
fn write(&mut self, bytes: Bytes) -> StreamResult<()>;
144
145
/// Trigger a flush of any bytes buffered in this stream implementation.
146
///
147
/// This method may be called at any time and must never block.
148
///
149
/// After this method is called, [`check_write`](Self::check_write) must
150
/// pend until flush is complete.
151
///
152
/// When [`check_write`](Self::check_write) becomes ready after a flush,
153
/// that guarantees that all prior writes have been flushed from the
154
/// implementation successfully, or that any error associated with those
155
/// writes is reported in the return value of [`flush`](Self::flush) or
156
/// [`check_write`](Self::check_write)
157
///
158
/// # Errors
159
///
160
/// Returns a [`StreamError`] if:
161
/// - stream is closed
162
/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
163
/// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)
164
fn flush(&mut self) -> StreamResult<()>;
165
166
/// Returns the number of bytes that are ready to be written to this stream.
167
///
168
/// Zero bytes indicates that this stream is not currently ready for writing
169
/// and `ready()` must be awaited first.
170
///
171
/// Note that this method does not block.
172
///
173
/// # Errors
174
///
175
/// Returns an [`StreamError`] if:
176
/// - stream is closed
177
/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed
178
fn check_write(&mut self) -> StreamResult<usize>;
179
180
/// Perform a write of up to 4096 bytes, and then flush the stream. Block
181
/// until all of these operations are complete, or an error occurs.
182
///
183
/// This is a convenience wrapper around the use of `check-write`,
184
/// `subscribe`, `write`, and `flush`, and is implemented with the
185
/// following pseudo-code:
186
///
187
/// ```text
188
/// let pollable = this.subscribe();
189
/// while !contents.is_empty() {
190
/// // Wait for the stream to become writable
191
/// pollable.block();
192
/// let Ok(n) = this.check-write(); // eliding error handling
193
/// let len = min(n, contents.len());
194
/// let (chunk, rest) = contents.split_at(len);
195
/// this.write(chunk ); // eliding error handling
196
/// contents = rest;
197
/// }
198
/// this.flush();
199
/// // Wait for completion of `flush`
200
/// pollable.block();
201
/// // Check for any errors that arose during `flush`
202
/// let _ = this.check-write(); // eliding error handling
203
/// ```
204
async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {
205
loop {
206
let permit = self.write_ready().await?;
207
let len = bytes.len().min(permit);
208
let chunk = bytes.split_to(len);
209
self.write(chunk)?;
210
if bytes.is_empty() {
211
break;
212
}
213
}
214
215
// If the stream encounters an error, return it, but if the stream
216
// has become closed, do not.
217
match self.flush() {
218
Ok(_) => {}
219
Err(StreamError::Closed) => {}
220
Err(e) => Err(e)?,
221
};
222
match self.write_ready().await {
223
Ok(_) => {}
224
Err(StreamError::Closed) => {}
225
Err(e) => Err(e)?,
226
};
227
228
Ok(())
229
}
230
231
/// Repeatedly write a byte to a stream.
232
/// Important: this write must be non-blocking!
233
/// Returning an Err which downcasts to a [`StreamError`] will be
234
/// reported to Wasm as the empty error result. Otherwise, errors will trap.
235
fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {
236
// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
237
// repeatedly from a 'static buffer of zeros.
238
let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
239
self.write(bs)?;
240
Ok(())
241
}
242
243
/// Perform a write of up to 4096 zeroes, and then flush the stream.
244
/// Block until all of these operations are complete, or an error
245
/// occurs.
246
///
247
/// This is a convenience wrapper around the use of `check-write`,
248
/// `subscribe`, `write-zeroes`, and `flush`, and is implemented with
249
/// the following pseudo-code:
250
///
251
/// ```text
252
/// let pollable = this.subscribe();
253
/// while num_zeroes != 0 {
254
/// // Wait for the stream to become writable
255
/// pollable.block();
256
/// let Ok(n) = this.check-write(); // eliding error handling
257
/// let len = min(n, num_zeroes);
258
/// this.write-zeroes(len); // eliding error handling
259
/// num_zeroes -= len;
260
/// }
261
/// this.flush();
262
/// // Wait for completion of `flush`
263
/// pollable.block();
264
/// // Check for any errors that arose during `flush`
265
/// let _ = this.check-write(); // eliding error handling
266
/// ```
267
async fn blocking_write_zeroes_and_flush(&mut self, nelem: usize) -> StreamResult<()> {
268
// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write
269
// repeatedly from a 'static buffer of zeros.
270
let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));
271
self.blocking_write_and_flush(bs).await
272
}
273
274
/// Simultaneously waits for this stream to be writable and then returns how
275
/// much may be written or the last error that happened.
276
async fn write_ready(&mut self) -> StreamResult<usize> {
277
let mut i = 0;
278
loop {
279
// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
280
self.ready().await;
281
let n = self.check_write()?;
282
if n > 0 {
283
return Ok(n);
284
}
285
if i >= MAX_BLOCKING_ATTEMPTS {
286
return Err(StreamError::trap("max blocking attempts exceeded"));
287
}
288
i += 1;
289
}
290
}
291
292
/// Cancel any asynchronous work and wait for it to wrap up.
293
async fn cancel(&mut self) {}
294
}
295
296
#[async_trait::async_trait]
297
impl Pollable for Box<dyn OutputStream> {
298
async fn ready(&mut self) {
299
(**self).ready().await
300
}
301
}
302
303
#[async_trait::async_trait]
304
impl Pollable for Box<dyn InputStream> {
305
async fn ready(&mut self) {
306
(**self).ready().await
307
}
308
}
309
310
/// Boxed, dynamically dispatched [`InputStream`] trait object.
pub type DynInputStream = Box<dyn InputStream>;
311
312
/// Boxed, dynamically dispatched [`OutputStream`] trait object.
pub type DynOutputStream = Box<dyn OutputStream>;
313
314