use crate::p2;
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, empty};
use wasmtime::component::{HasData, ResourceTable};
use wasmtime_wasi_io::streams::{InputStream, OutputStream};
mod empty;
mod file;
mod locked_async;
mod mem;
mod stdout;
mod worker_thread_stdin;
pub use self::file::{InputFile, OutputFile};
pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
#[doc(no_inline)]
pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
/// Marker type that implements [`HasData`], selecting [`WasiCliCtxView`] as
/// its associated `Data`.
///
/// It is a unit struct and stores no state of its own.
pub struct WasiCli;
// Ties the `WasiCli` marker to the borrowed view type: for every lifetime
// `'a` the associated data is a `WasiCliCtxView<'a>`.
impl HasData for WasiCli {
type Data<'a> = WasiCliCtxView<'a>;
}
/// Implemented by types that can hand out a [`WasiCliCtxView`].
///
/// The `Send` supertrait means every implementor must be sendable across
/// threads.
pub trait WasiCliView: Send {
/// Mutably borrows `self` and returns a view whose lifetime is tied to that
/// borrow.
fn cli(&mut self) -> WasiCliCtxView<'_>;
}
/// A pair of mutable borrows, both valid for `'a`: the CLI context and a
/// [`ResourceTable`].
pub struct WasiCliCtxView<'a> {
/// Mutable access to the CLI context (environment, arguments, working
/// directory and stdio handles).
pub ctx: &'a mut WasiCliCtx,
/// Mutable access to a resource table.
// NOTE(review): presumably the table in which stream resources are
// registered — confirm against the host implementations.
pub table: &'a mut ResourceTable,
}
/// State backing the CLI-related host functionality: environment, arguments,
/// initial working directory and the three stdio handles.
///
/// All fields are crate-private.
pub struct WasiCliCtx {
/// Pairs of strings.
// NOTE(review): presumably `(variable name, value)` — confirm against the
// code that populates this field.
pub(crate) environment: Vec<(String, String)>,
/// Argument strings, in order.
pub(crate) arguments: Vec<String>,
/// Optional initial working directory; `None` when unset.
pub(crate) initial_cwd: Option<String>,
/// Source used to create stdin streams.
pub(crate) stdin: Box<dyn StdinStream>,
/// Sink used to create stdout streams.
pub(crate) stdout: Box<dyn StdoutStream>,
/// Sink used to create stderr streams; same trait as `stdout`.
pub(crate) stderr: Box<dyn StdoutStream>,
}
impl Default for WasiCliCtx {
fn default() -> WasiCliCtx {
WasiCliCtx {
environment: Vec::new(),
arguments: Vec::new(),
initial_cwd: None,
stdin: Box::new(empty()),
stdout: Box::new(empty()),
stderr: Box::new(empty()),
}
}
}
/// Capability shared by the stdio stream types: `StdinStream` and
/// `StdoutStream` both require it as a supertrait.
pub trait IsTerminal {
/// Reports whether this value is connected to a terminal.
// NOTE(review): the exact criteria (e.g. a TTY check) are defined by each
// implementor and are not visible here — confirm.
fn is_terminal(&self) -> bool;
}
/// A source of standard input that can hand out readers on demand.
///
/// Implementors must also be [`IsTerminal`] and `Send`.
pub trait StdinStream: IsTerminal + Send {
    /// Produces a boxed [`AsyncRead`] for this stdin source.
    ///
    /// NOTE(review): the tests in this module expect streams created from the
    /// same source to share one read position — confirm that every
    /// implementor is required to uphold this.
    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;

    /// Produces an [`InputStream`] for this stdin source.
    ///
    /// The default implementation pins the boxed reader returned by
    /// [`Self::async_stream`] and wraps it in [`p2::pipe::AsyncReadStream`].
    /// Implementors may override it.
    fn p2_stream(&self) -> Box<dyn InputStream> {
        let reader = Pin::from(self.async_stream());
        Box::new(p2::pipe::AsyncReadStream::new(reader))
    }
}
/// A sink for standard output that can hand out writers on demand.
///
/// Implementors must also be [`IsTerminal`] and `Send`. `WasiCliCtx` uses
/// this trait for both its `stdout` and `stderr` fields.
pub trait StdoutStream: IsTerminal + Send {
    /// Produces a boxed [`AsyncWrite`] for this output sink.
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;

    /// Produces an [`OutputStream`] for this output sink.
    ///
    /// The default implementation pins the boxed writer returned by
    /// [`Self::async_stream`] and wraps it in [`p2::pipe::AsyncWriteStream`].
    /// Implementors may override it.
    fn p2_stream(&self) -> Box<dyn OutputStream> {
        let writer = Pin::from(self.async_stream());
        // NOTE(review): 8192 is the first argument of `AsyncWriteStream::new`,
        // presumably a write budget / buffer size in bytes — confirm, and
        // consider giving it a named constant.
        Box::new(p2::pipe::AsyncWriteStream::new(8192, writer))
    }
}
/// Forwards [`IsTerminal`] through a shared reference: `&T` answers exactly
/// as `T` does. `T` may be unsized (for example a trait object).
impl<T: ?Sized + IsTerminal> IsTerminal for &T {
    fn is_terminal(&self) -> bool {
        (**self).is_terminal()
    }
}
/// Lets a shared reference to a stdin source act as a stdin source itself.
///
/// `T: Sync` is what makes `&T` satisfy the `Send` supertrait of
/// [`StdinStream`].
impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn InputStream> {
        (**self).p2_stream()
    }
}
/// Lets a shared reference to an output sink act as an output sink itself.
///
/// `T: Sync` is what makes `&T` satisfy the `Send` supertrait of
/// [`StdoutStream`].
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn OutputStream> {
        (**self).p2_stream()
    }
}
/// Forwards [`IsTerminal`] through a mutable reference: `&mut T` answers
/// exactly as `T` does. `T` may be unsized.
impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
    fn is_terminal(&self) -> bool {
        (**self).is_terminal()
    }
}
/// Lets a mutable reference to a stdin source act as a stdin source itself.
///
/// NOTE(review): `&mut T` is `Send` whenever `T: Send`, so the extra `Sync`
/// bound looks stricter than the `Send` supertrait needs — confirm it is
/// intentional.
impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn InputStream> {
        (**self).p2_stream()
    }
}
/// Lets a mutable reference to an output sink act as an output sink itself.
///
/// NOTE(review): `&mut T` is `Send` whenever `T: Send`, so the extra `Sync`
/// bound looks stricter than the `Send` supertrait needs — confirm it is
/// intentional.
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn OutputStream> {
        (**self).p2_stream()
    }
}
/// Forwards [`IsTerminal`] through a `Box`: `Box<T>` answers exactly as the
/// boxed `T` does. `T` may be unsized (for example a trait object).
impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
    fn is_terminal(&self) -> bool {
        (**self).is_terminal()
    }
}
/// Lets a boxed stdin source act as a stdin source itself.
///
/// NOTE(review): `Box<T>` is `Send` whenever `T: Send`, so the extra `Sync`
/// bound looks stricter than the `Send` supertrait needs (it excludes a plain
/// `Box<dyn StdinStream>`, for instance) — confirm it is intentional.
impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn InputStream> {
        (**self).p2_stream()
    }
}
/// Lets a boxed output sink act as an output sink itself.
///
/// NOTE(review): `Box<T>` is `Send` whenever `T: Send`, so the extra `Sync`
/// bound looks stricter than the `Send` supertrait needs (it excludes a plain
/// `Box<dyn StdoutStream>`, for instance) — confirm it is intentional.
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn OutputStream> {
        (**self).p2_stream()
    }
}
/// Forwards [`IsTerminal`] through an `Arc`: `Arc<T>` answers exactly as the
/// shared `T` does. `T` may be unsized (for example a trait object).
impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
    fn is_terminal(&self) -> bool {
        (**self).is_terminal()
    }
}
/// Lets a reference-counted stdin source act as a stdin source itself.
///
/// `Arc<T>` is `Send` only when `T` is both `Send` and `Sync`; `Send` comes
/// from the [`StdinStream`] supertrait and `Sync` from the explicit bound.
impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
    fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn InputStream> {
        (**self).p2_stream()
    }
}
/// Lets a reference-counted output sink act as an output sink itself.
///
/// `Arc<T>` is `Send` only when `T` is both `Send` and `Sync`; `Send` comes
/// from the [`StdoutStream`] supertrait and `Sync` from the explicit bound.
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
    fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
        (**self).async_stream()
    }

    // Delegated explicitly so that `T`'s own `p2_stream` is used rather than
    // this trait's default adapter.
    fn p2_stream(&self) -> Box<dyn OutputStream> {
        (**self).p2_stream()
    }
}
// Unit tests for the stdin/stdout stream adapters of this module.
#[cfg(test)]
mod test {
use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
use crate::p2::{self, OutputStream};
use anyhow::Result;
use bytes::Bytes;
use tokio::io::AsyncReadExt;
// Two `p2_stream()` views are created from a single `MemoryInputPipe` and
// read alternately. The assertions require every read to continue where the
// previous read (made through the *other* view) stopped, i.e. both views
// advance one shared position in the input.
#[test]
fn memory_stdin_stream() {
let pipe =
p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
let mut view1 = pipe.p2_stream();
let mut view2 = pipe.p2_stream();
let read1 = view1.read(10).expect("read first 10 bytes");
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
let read2 = view2.read(10).expect("read second 10 bytes");
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
let read3 = view1.read(10).expect("read third 10 bytes");
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
let read4 = view2.read(10).expect("read fourth 10 bytes");
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
}
// Same shared-position check as `memory_stdin_stream`, but the input is a
// temporary file opened with tokio and wrapped in `AsyncStdinStream`.
#[tokio::test]
async fn async_stdin_stream() {
// Create a file with known contents inside a fresh temporary directory.
let dir = tempfile::tempdir().unwrap();
let mut path = std::path::PathBuf::from(dir.path());
path.push("file");
std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
let file = tokio::fs::File::open(&path)
.await
.expect("open created file");
let stdin_stream = super::AsyncStdinStream::new(file);
use super::StdinStream;
let mut view1 = stdin_stream.p2_stream();
let mut view2 = stdin_stream.p2_stream();
// Wait for the first view to report readiness before the non-async `read`
// calls below.
// NOTE(review): presumably this lets the underlying reader buffer the file
// so that all four reads succeed without further waiting — confirm.
view1.ready().await;
let read1 = view1.read(10).expect("read first 10 bytes");
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
let read2 = view2.read(10).expect("read second 10 bytes");
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
let read3 = view1.read(10).expect("read third 10 bytes");
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
let read4 = view2.read(10).expect("read fourth 10 bytes");
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
}
// A spawned task writes "x" through an `AsyncStdoutStream` backed by one end
// of an in-memory duplex pipe and flushes it. The test body reads the byte
// from the other end and then awaits the task, so the test only finishes if
// the write-and-flush helper returns.
#[tokio::test]
async fn async_stdout_stream_unblocks() {
let (mut read, write) = tokio::io::duplex(32);
let stdout = AsyncStdoutStream::new(32, write);
let task = tokio::task::spawn(async move {
let mut stream = stdout.p2_stream();
blocking_write_and_flush(&mut *stream, "x".into())
.await
.unwrap();
});
let mut buf = [0; 100];
let n = read.read(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"x");
task.await.unwrap();
}
// Test helper: writes all of `bytes` to `s`, then flushes.
//
// Each loop iteration awaits `write_ready` to learn how many bytes may be
// written, splits at most that many bytes off the front of `bytes`, and
// writes them. Any error from the stream is propagated with `?`.
async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
while !bytes.is_empty() {
let permit = s.write_ready().await?;
let len = bytes.len().min(permit);
let chunk = bytes.split_to(len);
s.write(chunk)?;
}
s.flush()?;
// NOTE(review): presumably this final `write_ready` waits until the flush
// requested above has completed — confirm against `OutputStream`'s contract.
s.write_ready().await?;
Ok(())
}
}