Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/p3/cli/host.rs
1692 views
1
use crate::I32Exit;
2
use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3
use crate::p3::DEFAULT_BUFFER_CAPACITY;
4
use crate::p3::bindings::cli::{
5
environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
6
terminal_stdin, terminal_stdout,
7
};
8
use crate::p3::cli::{TerminalInput, TerminalOutput};
9
use anyhow::{Context as _, anyhow};
10
use bytes::BytesMut;
11
use core::pin::Pin;
12
use core::task::{Context, Poll};
13
use std::io::Cursor;
14
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15
use wasmtime::component::{
16
Accessor, Destination, Resource, Source, StreamConsumer, StreamProducer, StreamReader,
17
StreamResult,
18
};
19
use wasmtime::{AsContextMut as _, StoreContextMut};
20
21
struct InputStreamProducer {
22
rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
23
}
24
25
impl<D> StreamProducer<D> for InputStreamProducer {
26
type Item = u8;
27
type Buffer = Cursor<BytesMut>;
28
29
fn poll_produce<'a>(
30
mut self: Pin<&mut Self>,
31
cx: &mut Context<'_>,
32
mut store: StoreContextMut<'a, D>,
33
dst: Destination<'a, Self::Item, Self::Buffer>,
34
finish: bool,
35
) -> Poll<wasmtime::Result<StreamResult>> {
36
// If the destination buffer is empty then this is a request on
37
// behalf of the guest to wait for this input stream to be readable.
38
// The `AsyncRead` trait abstraction does not provide the ability to
39
// await this event so we're forced to basically just lie here and
40
// say we're ready read data later.
41
//
42
// See WebAssembly/component-model#561 for some more information.
43
if dst.remaining(store.as_context_mut()) == Some(0) {
44
return Poll::Ready(Ok(StreamResult::Completed));
45
}
46
47
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
48
let mut buf = ReadBuf::new(dst.remaining());
49
match self.rx.as_mut().poll_read(cx, &mut buf) {
50
Poll::Ready(Ok(())) if buf.filled().is_empty() => {
51
Poll::Ready(Ok(StreamResult::Dropped))
52
}
53
Poll::Ready(Ok(())) => {
54
let n = buf.filled().len();
55
dst.mark_written(n);
56
Poll::Ready(Ok(StreamResult::Completed))
57
}
58
Poll::Ready(Err(..)) => {
59
// TODO: Report the error to the guest
60
Poll::Ready(Ok(StreamResult::Dropped))
61
}
62
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
63
Poll::Pending => Poll::Pending,
64
}
65
}
66
}
67
68
struct OutputStreamConsumer {
69
tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
70
}
71
72
impl<D> StreamConsumer<D> for OutputStreamConsumer {
73
type Item = u8;
74
75
fn poll_consume(
76
mut self: Pin<&mut Self>,
77
cx: &mut Context<'_>,
78
store: StoreContextMut<D>,
79
src: Source<Self::Item>,
80
finish: bool,
81
) -> Poll<wasmtime::Result<StreamResult>> {
82
let mut src = src.as_direct(store);
83
let buf = src.remaining();
84
85
// If the source buffer is empty then this is a request on behalf of
86
// the guest to wait for this output stream to be writable. The
87
// `AsyncWrite` trait abstraction does not provide the ability to await
88
// this event so we're forced to basically just lie here and say we're
89
// ready write data later.
90
//
91
// See WebAssembly/component-model#561 for some more information.
92
if buf.len() == 0 {
93
return Poll::Ready(Ok(StreamResult::Completed));
94
}
95
match self.tx.as_mut().poll_write(cx, buf) {
96
Poll::Ready(Ok(n)) => {
97
src.mark_read(n);
98
Poll::Ready(Ok(StreamResult::Completed))
99
}
100
Poll::Ready(Err(e)) => {
101
// FIXME(WebAssembly/wasi-cli#81) should communicate this
102
// error to the guest somehow.
103
tracing::warn!("dropping stdin error: {e}");
104
Poll::Ready(Ok(StreamResult::Dropped))
105
}
106
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
107
Poll::Pending => Poll::Pending,
108
}
109
}
110
}
111
112
// Empty impl: `terminal_input::Host` needs no methods supplied here.
impl terminal_input::Host for WasiCliCtxView<'_> {}
113
// Empty impl: `terminal_output::Host` needs no methods supplied here.
impl terminal_output::Host for WasiCliCtxView<'_> {}
114
115
impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
116
fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
117
self.table
118
.delete(rep)
119
.context("failed to delete terminal input resource from table")?;
120
Ok(())
121
}
122
}
123
124
impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
125
fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
126
self.table
127
.delete(rep)
128
.context("failed to delete terminal output resource from table")?;
129
Ok(())
130
}
131
}
132
133
impl terminal_stdin::Host for WasiCliCtxView<'_> {
134
fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
135
if self.ctx.stdin.is_terminal() {
136
let fd = self
137
.table
138
.push(TerminalInput)
139
.context("failed to push terminal stdin resource to table")?;
140
Ok(Some(fd))
141
} else {
142
Ok(None)
143
}
144
}
145
}
146
147
impl terminal_stdout::Host for WasiCliCtxView<'_> {
148
fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
149
if self.ctx.stdout.is_terminal() {
150
let fd = self
151
.table
152
.push(TerminalOutput)
153
.context("failed to push terminal stdout resource to table")?;
154
Ok(Some(fd))
155
} else {
156
Ok(None)
157
}
158
}
159
}
160
161
impl terminal_stderr::Host for WasiCliCtxView<'_> {
162
fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
163
if self.ctx.stderr.is_terminal() {
164
let fd = self
165
.table
166
.push(TerminalOutput)
167
.context("failed to push terminal stderr resource to table")?;
168
Ok(Some(fd))
169
} else {
170
Ok(None)
171
}
172
}
173
}
174
175
impl stdin::HostWithStore for WasiCli {
176
async fn get_stdin<U>(store: &Accessor<U, Self>) -> wasmtime::Result<StreamReader<u8>> {
177
let instance = store.instance();
178
store.with(|mut store| {
179
let rx = store.get().ctx.stdin.async_stream();
180
Ok(StreamReader::new(
181
instance,
182
&mut store,
183
InputStreamProducer {
184
rx: Box::into_pin(rx),
185
},
186
))
187
})
188
}
189
}
190
191
// Empty impl: `stdin::Host` needs no methods supplied here. `get_stdin` is
// implemented on `WasiCli` through `stdin::HostWithStore`.
impl stdin::Host for WasiCliCtxView<'_> {}
192
193
impl stdout::HostWithStore for WasiCli {
194
async fn set_stdout<U>(
195
store: &Accessor<U, Self>,
196
data: StreamReader<u8>,
197
) -> wasmtime::Result<()> {
198
store.with(|mut store| {
199
let tx = store.get().ctx.stdout.async_stream();
200
data.pipe(
201
store,
202
OutputStreamConsumer {
203
tx: Box::into_pin(tx),
204
},
205
);
206
Ok(())
207
})
208
}
209
}
210
211
// Empty impl: `stdout::Host` needs no methods supplied here. `set_stdout` is
// implemented on `WasiCli` through `stdout::HostWithStore`.
impl stdout::Host for WasiCliCtxView<'_> {}
212
213
impl stderr::HostWithStore for WasiCli {
214
async fn set_stderr<U>(
215
store: &Accessor<U, Self>,
216
data: StreamReader<u8>,
217
) -> wasmtime::Result<()> {
218
store.with(|mut store| {
219
let tx = store.get().ctx.stderr.async_stream();
220
data.pipe(
221
store,
222
OutputStreamConsumer {
223
tx: Box::into_pin(tx),
224
},
225
);
226
Ok(())
227
})
228
}
229
}
230
231
// Empty impl: `stderr::Host` needs no methods supplied here. `set_stderr` is
// implemented on `WasiCli` through `stderr::HostWithStore`.
impl stderr::Host for WasiCliCtxView<'_> {}
232
233
impl environment::Host for WasiCliCtxView<'_> {
234
fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
235
Ok(self.ctx.environment.clone())
236
}
237
238
fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
239
Ok(self.ctx.arguments.clone())
240
}
241
242
fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
243
Ok(self.ctx.initial_cwd.clone())
244
}
245
}
246
247
impl exit::Host for WasiCliCtxView<'_> {
248
fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
249
let status = match status {
250
Ok(()) => 0,
251
Err(()) => 1,
252
};
253
Err(anyhow!(I32Exit(status)))
254
}
255
256
fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
257
Err(anyhow!(I32Exit(status_code.into())))
258
}
259
}
260
261