Path: blob/main/crates/wasi/src/p3/cli/host.rs
3073 views
use crate::I32Exit;1use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};2use crate::p3::DEFAULT_BUFFER_CAPACITY;3use crate::p3::bindings::cli::types::ErrorCode;4use crate::p3::bindings::cli::{5environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,6terminal_stdin, terminal_stdout,7};8use crate::p3::cli::{TerminalInput, TerminalOutput};9use bytes::BytesMut;10use core::pin::Pin;11use core::task::{Context, Poll};12use std::io::{self, Cursor};13use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};14use tokio::sync::oneshot;15use wasmtime::component::{16Access, Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,17StreamReader, StreamResult,18};19use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};2021struct InputStreamProducer {22rx: Pin<Box<dyn AsyncRead + Send + Sync>>,23result_tx: Option<oneshot::Sender<ErrorCode>>,24}2526fn io_error_to_error_code(err: io::Error) -> ErrorCode {27match err.kind() {28io::ErrorKind::BrokenPipe => ErrorCode::Pipe,29other => {30tracing::warn!("stdio error: {other}");31ErrorCode::Io32}33}34}3536impl<D> StreamProducer<D> for InputStreamProducer {37type Item = u8;38type Buffer = Cursor<BytesMut>;3940fn poll_produce<'a>(41mut self: Pin<&mut Self>,42cx: &mut Context<'_>,43mut store: StoreContextMut<'a, D>,44dst: Destination<'a, Self::Item, Self::Buffer>,45finish: bool,46) -> Poll<wasmtime::Result<StreamResult>> {47// If the destination buffer is empty then this is a request on48// behalf of the guest to wait for this input stream to be readable.49// The `AsyncRead` trait abstraction does not provide the ability to50// await this event so we're forced to basically just lie here and51// say we're ready read data later.52//53// See WebAssembly/component-model#561 for some more information.54if dst.remaining(store.as_context_mut()) == Some(0) {55return Poll::Ready(Ok(StreamResult::Completed));56}5758let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);59let mut buf = ReadBuf::new(dst.remaining());60match self.rx.as_mut().poll_read(cx, &mut buf) {61Poll::Ready(Ok(())) if buf.filled().is_empty() => {62Poll::Ready(Ok(StreamResult::Dropped))63}64Poll::Ready(Ok(())) => {65let n = buf.filled().len();66dst.mark_written(n);67Poll::Ready(Ok(StreamResult::Completed))68}69Poll::Ready(Err(e)) => {70let _ = self71.result_tx72.take()73.unwrap()74.send(io_error_to_error_code(e));75Poll::Ready(Ok(StreamResult::Dropped))76}77Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),78Poll::Pending => Poll::Pending,79}80}81}8283struct OutputStreamConsumer {84tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,85result_tx: Option<oneshot::Sender<ErrorCode>>,86}8788impl<D> StreamConsumer<D> for OutputStreamConsumer {89type Item = u8;9091fn poll_consume(92mut self: Pin<&mut Self>,93cx: &mut Context<'_>,94store: StoreContextMut<D>,95src: Source<Self::Item>,96finish: bool,97) -> Poll<wasmtime::Result<StreamResult>> {98let mut src = src.as_direct(store);99let buf = src.remaining();100101// If the source buffer is empty then this is a request on behalf of102// the guest to wait for this output stream to be writable. The103// `AsyncWrite` trait abstraction does not provide the ability to await104// this event so we're forced to basically just lie here and say we're105// ready write data later.106//107// See WebAssembly/component-model#561 for some more information.108if buf.len() == 0 {109return Poll::Ready(Ok(StreamResult::Completed));110}111match self.tx.as_mut().poll_write(cx, buf) {112Poll::Ready(Ok(n)) => {113src.mark_read(n);114Poll::Ready(Ok(StreamResult::Completed))115}116Poll::Ready(Err(e)) => {117let _ = self118.result_tx119.take()120.unwrap()121.send(io_error_to_error_code(e));122Poll::Ready(Ok(StreamResult::Dropped))123}124Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),125Poll::Pending => Poll::Pending,126}127}128}129130impl terminal_input::Host for WasiCliCtxView<'_> {}131impl terminal_output::Host for WasiCliCtxView<'_> {}132133impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {134fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {135self.table136.delete(rep)137.context("failed to delete terminal input resource from table")?;138Ok(())139}140}141142impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {143fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {144self.table145.delete(rep)146.context("failed to delete terminal output resource from table")?;147Ok(())148}149}150151impl terminal_stdin::Host for WasiCliCtxView<'_> {152fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {153if self.ctx.stdin.is_terminal() {154let fd = self155.table156.push(TerminalInput)157.context("failed to push terminal stdin resource to table")?;158Ok(Some(fd))159} else {160Ok(None)161}162}163}164165impl terminal_stdout::Host for WasiCliCtxView<'_> {166fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {167if self.ctx.stdout.is_terminal() {168let fd = self169.table170.push(TerminalOutput)171.context("failed to push terminal stdout resource to table")?;172Ok(Some(fd))173} else {174Ok(None)175}176}177}178179impl terminal_stderr::Host for WasiCliCtxView<'_> {180fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {181if self.ctx.stderr.is_terminal() {182let fd = self183.table184.push(TerminalOutput)185.context("failed to push terminal stderr resource to table")?;186Ok(Some(fd))187} else {188Ok(None)189}190}191}192193impl stdin::HostWithStore for WasiCli {194fn read_via_stream<U>(195mut store: Access<U, Self>,196) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {197let rx = store.get().ctx.stdin.async_stream();198let (result_tx, result_rx) = oneshot::channel();199let stream = StreamReader::new(200&mut store,201InputStreamProducer {202rx: Box::into_pin(rx),203result_tx: Some(result_tx),204},205);206let future = FutureReader::new(&mut store, async {207wasmtime::error::Ok(match result_rx.await {208Ok(err) => Err(err),209Err(_) => Ok(()),210})211});212Ok((stream, future))213}214}215216impl stdin::Host for WasiCliCtxView<'_> {}217218impl stdout::HostWithStore for WasiCli {219async fn write_via_stream<U>(220store: &Accessor<U, Self>,221data: StreamReader<u8>,222) -> wasmtime::Result<Result<(), ErrorCode>> {223let (result_tx, result_rx) = oneshot::channel();224store.with(|mut store| {225let tx = store.get().ctx.stdout.async_stream();226data.pipe(227store,228OutputStreamConsumer {229tx: Box::into_pin(tx),230result_tx: Some(result_tx),231},232);233});234Ok(match result_rx.await {235Ok(err) => Err(err),236Err(_) => Ok(()),237})238}239}240241impl stdout::Host for WasiCliCtxView<'_> {}242243impl stderr::HostWithStore for WasiCli {244async fn write_via_stream<U>(245store: &Accessor<U, Self>,246data: StreamReader<u8>,247) -> wasmtime::Result<Result<(), ErrorCode>> {248let (result_tx, result_rx) = oneshot::channel();249store.with(|mut store| {250let tx = store.get().ctx.stderr.async_stream();251data.pipe(252store,253OutputStreamConsumer {254tx: Box::into_pin(tx),255result_tx: Some(result_tx),256},257);258});259Ok(match result_rx.await {260Ok(err) => Err(err),261Err(_) => Ok(()),262})263}264}265266impl stderr::Host for WasiCliCtxView<'_> {}267268impl environment::Host for WasiCliCtxView<'_> {269fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {270Ok(self.ctx.environment.clone())271}272273fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {274Ok(self.ctx.arguments.clone())275}276277fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {278Ok(self.ctx.initial_cwd.clone())279}280}281282impl exit::Host for WasiCliCtxView<'_> {283fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {284let status = match status {285Ok(()) => 0,286Err(()) => 1,287};288Err(format_err!(I32Exit(status)))289}290291fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {292Err(format_err!(I32Exit(status_code.into())))293}294}295296297