Path: blob/main/crates/wasi/src/p3/cli/host.rs
1692 views
use crate::I32Exit;1use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};2use crate::p3::DEFAULT_BUFFER_CAPACITY;3use crate::p3::bindings::cli::{4environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,5terminal_stdin, terminal_stdout,6};7use crate::p3::cli::{TerminalInput, TerminalOutput};8use anyhow::{Context as _, anyhow};9use bytes::BytesMut;10use core::pin::Pin;11use core::task::{Context, Poll};12use std::io::Cursor;13use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};14use wasmtime::component::{15Accessor, Destination, Resource, Source, StreamConsumer, StreamProducer, StreamReader,16StreamResult,17};18use wasmtime::{AsContextMut as _, StoreContextMut};1920struct InputStreamProducer {21rx: Pin<Box<dyn AsyncRead + Send + Sync>>,22}2324impl<D> StreamProducer<D> for InputStreamProducer {25type Item = u8;26type Buffer = Cursor<BytesMut>;2728fn poll_produce<'a>(29mut self: Pin<&mut Self>,30cx: &mut Context<'_>,31mut store: StoreContextMut<'a, D>,32dst: Destination<'a, Self::Item, Self::Buffer>,33finish: bool,34) -> Poll<wasmtime::Result<StreamResult>> {35// If the destination buffer is empty then this is a request on36// behalf of the guest to wait for this input stream to be readable.37// The `AsyncRead` trait abstraction does not provide the ability to38// await this event so we're forced to basically just lie here and39// say we're ready read data later.40//41// See WebAssembly/component-model#561 for some more information.42if dst.remaining(store.as_context_mut()) == Some(0) {43return Poll::Ready(Ok(StreamResult::Completed));44}4546let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);47let mut buf = ReadBuf::new(dst.remaining());48match self.rx.as_mut().poll_read(cx, &mut buf) {49Poll::Ready(Ok(())) if buf.filled().is_empty() => {50Poll::Ready(Ok(StreamResult::Dropped))51}52Poll::Ready(Ok(())) => {53let n = buf.filled().len();54dst.mark_written(n);55Poll::Ready(Ok(StreamResult::Completed))56}57Poll::Ready(Err(..)) => {58// TODO: Report the error to the guest59Poll::Ready(Ok(StreamResult::Dropped))60}61Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),62Poll::Pending => Poll::Pending,63}64}65}6667struct OutputStreamConsumer {68tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,69}7071impl<D> StreamConsumer<D> for OutputStreamConsumer {72type Item = u8;7374fn poll_consume(75mut self: Pin<&mut Self>,76cx: &mut Context<'_>,77store: StoreContextMut<D>,78src: Source<Self::Item>,79finish: bool,80) -> Poll<wasmtime::Result<StreamResult>> {81let mut src = src.as_direct(store);82let buf = src.remaining();8384// If the source buffer is empty then this is a request on behalf of85// the guest to wait for this output stream to be writable. The86// `AsyncWrite` trait abstraction does not provide the ability to await87// this event so we're forced to basically just lie here and say we're88// ready write data later.89//90// See WebAssembly/component-model#561 for some more information.91if buf.len() == 0 {92return Poll::Ready(Ok(StreamResult::Completed));93}94match self.tx.as_mut().poll_write(cx, buf) {95Poll::Ready(Ok(n)) => {96src.mark_read(n);97Poll::Ready(Ok(StreamResult::Completed))98}99Poll::Ready(Err(e)) => {100// FIXME(WebAssembly/wasi-cli#81) should communicate this101// error to the guest somehow.102tracing::warn!("dropping stdin error: {e}");103Poll::Ready(Ok(StreamResult::Dropped))104}105Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),106Poll::Pending => Poll::Pending,107}108}109}110111impl terminal_input::Host for WasiCliCtxView<'_> {}112impl terminal_output::Host for WasiCliCtxView<'_> {}113114impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {115fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {116self.table117.delete(rep)118.context("failed to delete terminal input resource from table")?;119Ok(())120}121}122123impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {124fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {125self.table126.delete(rep)127.context("failed to delete terminal output resource from table")?;128Ok(())129}130}131132impl terminal_stdin::Host for WasiCliCtxView<'_> {133fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {134if self.ctx.stdin.is_terminal() {135let fd = self136.table137.push(TerminalInput)138.context("failed to push terminal stdin resource to table")?;139Ok(Some(fd))140} else {141Ok(None)142}143}144}145146impl terminal_stdout::Host for WasiCliCtxView<'_> {147fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {148if self.ctx.stdout.is_terminal() {149let fd = self150.table151.push(TerminalOutput)152.context("failed to push terminal stdout resource to table")?;153Ok(Some(fd))154} else {155Ok(None)156}157}158}159160impl terminal_stderr::Host for WasiCliCtxView<'_> {161fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {162if self.ctx.stderr.is_terminal() {163let fd = self164.table165.push(TerminalOutput)166.context("failed to push terminal stderr resource to table")?;167Ok(Some(fd))168} else {169Ok(None)170}171}172}173174impl stdin::HostWithStore for WasiCli {175async fn get_stdin<U>(store: &Accessor<U, Self>) -> wasmtime::Result<StreamReader<u8>> {176let instance = store.instance();177store.with(|mut store| {178let rx = store.get().ctx.stdin.async_stream();179Ok(StreamReader::new(180instance,181&mut store,182InputStreamProducer {183rx: Box::into_pin(rx),184},185))186})187}188}189190impl stdin::Host for WasiCliCtxView<'_> {}191192impl stdout::HostWithStore for WasiCli {193async fn set_stdout<U>(194store: &Accessor<U, Self>,195data: StreamReader<u8>,196) -> wasmtime::Result<()> {197store.with(|mut store| {198let tx = store.get().ctx.stdout.async_stream();199data.pipe(200store,201OutputStreamConsumer {202tx: Box::into_pin(tx),203},204);205Ok(())206})207}208}209210impl stdout::Host for WasiCliCtxView<'_> {}211212impl stderr::HostWithStore for WasiCli {213async fn set_stderr<U>(214store: &Accessor<U, Self>,215data: StreamReader<u8>,216) -> wasmtime::Result<()> {217store.with(|mut store| {218let tx = store.get().ctx.stderr.async_stream();219data.pipe(220store,221OutputStreamConsumer {222tx: Box::into_pin(tx),223},224);225Ok(())226})227}228}229230impl stderr::Host for WasiCliCtxView<'_> {}231232impl environment::Host for WasiCliCtxView<'_> {233fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {234Ok(self.ctx.environment.clone())235}236237fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {238Ok(self.ctx.arguments.clone())239}240241fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {242Ok(self.ctx.initial_cwd.clone())243}244}245246impl exit::Host for WasiCliCtxView<'_> {247fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {248let status = match status {249Ok(()) => 0,250Err(()) => 1,251};252Err(anyhow!(I32Exit(status)))253}254255fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {256Err(anyhow!(I32Exit(status_code.into())))257}258}259260261