Path: blob/main/crates/wasi-io/src/streams.rs
1692 views
use crate::poll::Pollable;1use alloc::boxed::Box;2use anyhow::Result;3use bytes::Bytes;45/// `Pollable::ready()` for `InputStream` and `OutputStream` may return6/// prematurely due to `io::ErrorKind::WouldBlock`.7///8/// To ensure that `blocking_` functions return a valid non-empty result,9/// we use a loop with a maximum iteration limit.10///11/// This constant defines the maximum number of loop attempts allowed.12const MAX_BLOCKING_ATTEMPTS: u8 = 10;1314/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A15/// bytestream which can be read from.16#[async_trait::async_trait]17pub trait InputStream: Pollable {18/// Reads up to `size` bytes, returning a buffer holding these bytes on19/// success.20///21/// This function does not block the current thread and is the equivalent of22/// a non-blocking read. On success all bytes read are returned through23/// `Bytes`, which is no larger than the `size` provided. If the returned24/// list of `Bytes` is empty then no data is ready to be read at this time.25///26/// # Errors27///28/// The [`StreamError`] return value communicates when this stream is29/// closed, when a read fails, or when a trap should be generated.30fn read(&mut self, size: usize) -> StreamResult<Bytes>;3132/// Similar to `read`, except that it blocks until at least one byte can be33/// read.34async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {35if size == 0 {36self.ready().await;37return self.read(size);38}3940let mut i = 0;41loop {42// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.43self.ready().await;44let data = self.read(size)?;45if !data.is_empty() {46return Ok(data);47}48if i >= MAX_BLOCKING_ATTEMPTS {49return Err(StreamError::trap("max blocking attempts exceeded"));50}51i += 1;52}53}5455/// Same as the `read` method except that bytes are skipped.56///57/// Note that this method is non-blocking like `read` and returns the same58/// errors.59fn skip(&mut self, nelem: usize) -> StreamResult<usize> {60let bs = self.read(nelem)?;61Ok(bs.len())62}6364/// Similar to `skip`, except that it blocks until at least one byte can be65/// skipped.66async fn blocking_skip(&mut self, nelem: usize) -> StreamResult<usize> {67let bs = self.blocking_read(nelem).await?;68Ok(bs.len())69}7071/// Cancel any asynchronous work and wait for it to wrap up.72async fn cancel(&mut self) {}73}7475/// Representation of the `error` resource type in the `wasi:io/error`76/// interface.77///78/// This is currently `anyhow::Error` to retain full type information for79/// errors.80pub type Error = anyhow::Error;8182pub type StreamResult<T> = Result<T, StreamError>;8384#[derive(Debug)]85pub enum StreamError {86Closed,87LastOperationFailed(anyhow::Error),88Trap(anyhow::Error),89}9091impl StreamError {92pub fn trap(msg: &str) -> StreamError {93StreamError::Trap(anyhow::anyhow!("{msg}"))94}95}9697impl alloc::fmt::Display for StreamError {98fn fmt(&self, f: &mut alloc::fmt::Formatter<'_>) -> alloc::fmt::Result {99match self {100StreamError::Closed => write!(f, "closed"),101StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"),102StreamError::Trap(e) => write!(f, "trap: {e}"),103}104}105}106107impl core::error::Error for StreamError {108fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {109match self {110StreamError::Closed => None,111StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(),112}113}114}115116impl From<wasmtime::component::ResourceTableError> for StreamError {117fn from(error: wasmtime::component::ResourceTableError) -> Self {118Self::Trap(error.into())119}120}121122/// Host trait for implementing the `wasi:io/streams.output-stream` resource:123/// A bytestream which can be written to.124#[async_trait::async_trait]125pub trait OutputStream: Pollable {126/// Write bytes after obtaining a permit to write those bytes127///128/// Prior to calling [`write`](Self::write) the caller must call129/// [`check_write`](Self::check_write), which resolves to a non-zero permit130///131/// This method must never block. The [`check_write`](Self::check_write)132/// permit indicates the maximum amount of bytes that are permitted to be133/// written in a single [`write`](Self::write) following the134/// [`check_write`](Self::check_write) resolution.135///136/// # Errors137///138/// Returns a [`StreamError`] if:139/// - stream is closed140/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed141/// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)142fn write(&mut self, bytes: Bytes) -> StreamResult<()>;143144/// Trigger a flush of any bytes buffered in this stream implementation.145///146/// This method may be called at any time and must never block.147///148/// After this method is called, [`check_write`](Self::check_write) must149/// pend until flush is complete.150///151/// When [`check_write`](Self::check_write) becomes ready after a flush,152/// that guarantees that all prior writes have been flushed from the153/// implementation successfully, or that any error associated with those154/// writes is reported in the return value of [`flush`](Self::flush) or155/// [`check_write`](Self::check_write)156///157/// # Errors158///159/// Returns a [`StreamError`] if:160/// - stream is closed161/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed162/// - caller performed an illegal operation (e.g. wrote more bytes than were permitted)163fn flush(&mut self) -> StreamResult<()>;164165/// Returns the number of bytes that are ready to be written to this stream.166///167/// Zero bytes indicates that this stream is not currently ready for writing168/// and `ready()` must be awaited first.169///170/// Note that this method does not block.171///172/// # Errors173///174/// Returns an [`StreamError`] if:175/// - stream is closed176/// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed177fn check_write(&mut self) -> StreamResult<usize>;178179/// Perform a write of up to 4096 bytes, and then flush the stream. Block180/// until all of these operations are complete, or an error occurs.181///182/// This is a convenience wrapper around the use of `check-write`,183/// `subscribe`, `write`, and `flush`, and is implemented with the184/// following pseudo-code:185///186/// ```text187/// let pollable = this.subscribe();188/// while !contents.is_empty() {189/// // Wait for the stream to become writable190/// pollable.block();191/// let Ok(n) = this.check-write(); // eliding error handling192/// let len = min(n, contents.len());193/// let (chunk, rest) = contents.split_at(len);194/// this.write(chunk ); // eliding error handling195/// contents = rest;196/// }197/// this.flush();198/// // Wait for completion of `flush`199/// pollable.block();200/// // Check for any errors that arose during `flush`201/// let _ = this.check-write(); // eliding error handling202/// ```203async fn blocking_write_and_flush(&mut self, mut bytes: Bytes) -> StreamResult<()> {204loop {205let permit = self.write_ready().await?;206let len = bytes.len().min(permit);207let chunk = bytes.split_to(len);208self.write(chunk)?;209if bytes.is_empty() {210break;211}212}213214// If the stream encounters an error, return it, but if the stream215// has become closed, do not.216match self.flush() {217Ok(_) => {}218Err(StreamError::Closed) => {}219Err(e) => Err(e)?,220};221match self.write_ready().await {222Ok(_) => {}223Err(StreamError::Closed) => {}224Err(e) => Err(e)?,225};226227Ok(())228}229230/// Repeatedly write a byte to a stream.231/// Important: this write must be non-blocking!232/// Returning an Err which downcasts to a [`StreamError`] will be233/// reported to Wasm as the empty error result. Otherwise, errors will trap.234fn write_zeroes(&mut self, nelem: usize) -> StreamResult<()> {235// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write236// repeatedly from a 'static buffer of zeros.237let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));238self.write(bs)?;239Ok(())240}241242/// Perform a write of up to 4096 zeroes, and then flush the stream.243/// Block until all of these operations are complete, or an error244/// occurs.245///246/// This is a convenience wrapper around the use of `check-write`,247/// `subscribe`, `write-zeroes`, and `flush`, and is implemented with248/// the following pseudo-code:249///250/// ```text251/// let pollable = this.subscribe();252/// while num_zeroes != 0 {253/// // Wait for the stream to become writable254/// pollable.block();255/// let Ok(n) = this.check-write(); // eliding error handling256/// let len = min(n, num_zeroes);257/// this.write-zeroes(len); // eliding error handling258/// num_zeroes -= len;259/// }260/// this.flush();261/// // Wait for completion of `flush`262/// pollable.block();263/// // Check for any errors that arose during `flush`264/// let _ = this.check-write(); // eliding error handling265/// ```266async fn blocking_write_zeroes_and_flush(&mut self, nelem: usize) -> StreamResult<()> {267// TODO: We could optimize this to not allocate one big zeroed buffer, and instead write268// repeatedly from a 'static buffer of zeros.269let bs = Bytes::from_iter(core::iter::repeat(0).take(nelem));270self.blocking_write_and_flush(bs).await271}272273/// Simultaneously waits for this stream to be writable and then returns how274/// much may be written or the last error that happened.275async fn write_ready(&mut self) -> StreamResult<usize> {276let mut i = 0;277loop {278// This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.279self.ready().await;280let n = self.check_write()?;281if n > 0 {282return Ok(n);283}284if i >= MAX_BLOCKING_ATTEMPTS {285return Err(StreamError::trap("max blocking attempts exceeded"));286}287i += 1;288}289}290291/// Cancel any asynchronous work and wait for it to wrap up.292async fn cancel(&mut self) {}293}294295#[async_trait::async_trait]296impl Pollable for Box<dyn OutputStream> {297async fn ready(&mut self) {298(**self).ready().await299}300}301302#[async_trait::async_trait]303impl Pollable for Box<dyn InputStream> {304async fn ready(&mut self) {305(**self).ready().await306}307}308309pub type DynInputStream = Box<dyn InputStream>;310311pub type DynOutputStream = Box<dyn OutputStream>;312313314