Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/p3/cli/host.rs
3073 views
1
use crate::I32Exit;
2
use crate::cli::{IsTerminal, WasiCli, WasiCliCtxView};
3
use crate::p3::DEFAULT_BUFFER_CAPACITY;
4
use crate::p3::bindings::cli::types::ErrorCode;
5
use crate::p3::bindings::cli::{
6
environment, exit, stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr,
7
terminal_stdin, terminal_stdout,
8
};
9
use crate::p3::cli::{TerminalInput, TerminalOutput};
10
use bytes::BytesMut;
11
use core::pin::Pin;
12
use core::task::{Context, Poll};
13
use std::io::{self, Cursor};
14
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15
use tokio::sync::oneshot;
16
use wasmtime::component::{
17
Access, Accessor, Destination, FutureReader, Resource, Source, StreamConsumer, StreamProducer,
18
StreamReader, StreamResult,
19
};
20
use wasmtime::{AsContextMut as _, StoreContextMut, error::Context as _, format_err};
21
22
struct InputStreamProducer {
23
rx: Pin<Box<dyn AsyncRead + Send + Sync>>,
24
result_tx: Option<oneshot::Sender<ErrorCode>>,
25
}
26
27
fn io_error_to_error_code(err: io::Error) -> ErrorCode {
28
match err.kind() {
29
io::ErrorKind::BrokenPipe => ErrorCode::Pipe,
30
other => {
31
tracing::warn!("stdio error: {other}");
32
ErrorCode::Io
33
}
34
}
35
}
36
37
impl<D> StreamProducer<D> for InputStreamProducer {
38
type Item = u8;
39
type Buffer = Cursor<BytesMut>;
40
41
fn poll_produce<'a>(
42
mut self: Pin<&mut Self>,
43
cx: &mut Context<'_>,
44
mut store: StoreContextMut<'a, D>,
45
dst: Destination<'a, Self::Item, Self::Buffer>,
46
finish: bool,
47
) -> Poll<wasmtime::Result<StreamResult>> {
48
// If the destination buffer is empty then this is a request on
49
// behalf of the guest to wait for this input stream to be readable.
50
// The `AsyncRead` trait abstraction does not provide the ability to
51
// await this event so we're forced to basically just lie here and
52
// say we're ready read data later.
53
//
54
// See WebAssembly/component-model#561 for some more information.
55
if dst.remaining(store.as_context_mut()) == Some(0) {
56
return Poll::Ready(Ok(StreamResult::Completed));
57
}
58
59
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
60
let mut buf = ReadBuf::new(dst.remaining());
61
match self.rx.as_mut().poll_read(cx, &mut buf) {
62
Poll::Ready(Ok(())) if buf.filled().is_empty() => {
63
Poll::Ready(Ok(StreamResult::Dropped))
64
}
65
Poll::Ready(Ok(())) => {
66
let n = buf.filled().len();
67
dst.mark_written(n);
68
Poll::Ready(Ok(StreamResult::Completed))
69
}
70
Poll::Ready(Err(e)) => {
71
let _ = self
72
.result_tx
73
.take()
74
.unwrap()
75
.send(io_error_to_error_code(e));
76
Poll::Ready(Ok(StreamResult::Dropped))
77
}
78
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
79
Poll::Pending => Poll::Pending,
80
}
81
}
82
}
83
84
struct OutputStreamConsumer {
85
tx: Pin<Box<dyn AsyncWrite + Send + Sync>>,
86
result_tx: Option<oneshot::Sender<ErrorCode>>,
87
}
88
89
impl<D> StreamConsumer<D> for OutputStreamConsumer {
90
type Item = u8;
91
92
fn poll_consume(
93
mut self: Pin<&mut Self>,
94
cx: &mut Context<'_>,
95
store: StoreContextMut<D>,
96
src: Source<Self::Item>,
97
finish: bool,
98
) -> Poll<wasmtime::Result<StreamResult>> {
99
let mut src = src.as_direct(store);
100
let buf = src.remaining();
101
102
// If the source buffer is empty then this is a request on behalf of
103
// the guest to wait for this output stream to be writable. The
104
// `AsyncWrite` trait abstraction does not provide the ability to await
105
// this event so we're forced to basically just lie here and say we're
106
// ready write data later.
107
//
108
// See WebAssembly/component-model#561 for some more information.
109
if buf.len() == 0 {
110
return Poll::Ready(Ok(StreamResult::Completed));
111
}
112
match self.tx.as_mut().poll_write(cx, buf) {
113
Poll::Ready(Ok(n)) => {
114
src.mark_read(n);
115
Poll::Ready(Ok(StreamResult::Completed))
116
}
117
Poll::Ready(Err(e)) => {
118
let _ = self
119
.result_tx
120
.take()
121
.unwrap()
122
.send(io_error_to_error_code(e));
123
Poll::Ready(Ok(StreamResult::Dropped))
124
}
125
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
126
Poll::Pending => Poll::Pending,
127
}
128
}
129
}
130
131
// Empty impl: no items of `terminal_input::Host` are overridden for the CLI
// context view.
impl terminal_input::Host for WasiCliCtxView<'_> {}
132
// Empty impl: no items of `terminal_output::Host` are overridden for the CLI
// context view.
impl terminal_output::Host for WasiCliCtxView<'_> {}
133
134
impl terminal_input::HostTerminalInput for WasiCliCtxView<'_> {
135
fn drop(&mut self, rep: Resource<TerminalInput>) -> wasmtime::Result<()> {
136
self.table
137
.delete(rep)
138
.context("failed to delete terminal input resource from table")?;
139
Ok(())
140
}
141
}
142
143
impl terminal_output::HostTerminalOutput for WasiCliCtxView<'_> {
144
fn drop(&mut self, rep: Resource<TerminalOutput>) -> wasmtime::Result<()> {
145
self.table
146
.delete(rep)
147
.context("failed to delete terminal output resource from table")?;
148
Ok(())
149
}
150
}
151
152
impl terminal_stdin::Host for WasiCliCtxView<'_> {
153
fn get_terminal_stdin(&mut self) -> wasmtime::Result<Option<Resource<TerminalInput>>> {
154
if self.ctx.stdin.is_terminal() {
155
let fd = self
156
.table
157
.push(TerminalInput)
158
.context("failed to push terminal stdin resource to table")?;
159
Ok(Some(fd))
160
} else {
161
Ok(None)
162
}
163
}
164
}
165
166
impl terminal_stdout::Host for WasiCliCtxView<'_> {
167
fn get_terminal_stdout(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
168
if self.ctx.stdout.is_terminal() {
169
let fd = self
170
.table
171
.push(TerminalOutput)
172
.context("failed to push terminal stdout resource to table")?;
173
Ok(Some(fd))
174
} else {
175
Ok(None)
176
}
177
}
178
}
179
180
impl terminal_stderr::Host for WasiCliCtxView<'_> {
181
fn get_terminal_stderr(&mut self) -> wasmtime::Result<Option<Resource<TerminalOutput>>> {
182
if self.ctx.stderr.is_terminal() {
183
let fd = self
184
.table
185
.push(TerminalOutput)
186
.context("failed to push terminal stderr resource to table")?;
187
Ok(Some(fd))
188
} else {
189
Ok(None)
190
}
191
}
192
}
193
194
impl stdin::HostWithStore for WasiCli {
195
fn read_via_stream<U>(
196
mut store: Access<U, Self>,
197
) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
198
let rx = store.get().ctx.stdin.async_stream();
199
let (result_tx, result_rx) = oneshot::channel();
200
let stream = StreamReader::new(
201
&mut store,
202
InputStreamProducer {
203
rx: Box::into_pin(rx),
204
result_tx: Some(result_tx),
205
},
206
);
207
let future = FutureReader::new(&mut store, async {
208
wasmtime::error::Ok(match result_rx.await {
209
Ok(err) => Err(err),
210
Err(_) => Ok(()),
211
})
212
});
213
Ok((stream, future))
214
}
215
}
216
217
// Empty impl: this file implements `read_via_stream` on `WasiCli` through
// `stdin::HostWithStore`, so nothing is provided here.
impl stdin::Host for WasiCliCtxView<'_> {}
218
219
impl stdout::HostWithStore for WasiCli {
220
async fn write_via_stream<U>(
221
store: &Accessor<U, Self>,
222
data: StreamReader<u8>,
223
) -> wasmtime::Result<Result<(), ErrorCode>> {
224
let (result_tx, result_rx) = oneshot::channel();
225
store.with(|mut store| {
226
let tx = store.get().ctx.stdout.async_stream();
227
data.pipe(
228
store,
229
OutputStreamConsumer {
230
tx: Box::into_pin(tx),
231
result_tx: Some(result_tx),
232
},
233
);
234
});
235
Ok(match result_rx.await {
236
Ok(err) => Err(err),
237
Err(_) => Ok(()),
238
})
239
}
240
}
241
242
// Empty impl: this file implements `write_via_stream` on `WasiCli` through
// `stdout::HostWithStore`, so nothing is provided here.
impl stdout::Host for WasiCliCtxView<'_> {}
243
244
impl stderr::HostWithStore for WasiCli {
245
async fn write_via_stream<U>(
246
store: &Accessor<U, Self>,
247
data: StreamReader<u8>,
248
) -> wasmtime::Result<Result<(), ErrorCode>> {
249
let (result_tx, result_rx) = oneshot::channel();
250
store.with(|mut store| {
251
let tx = store.get().ctx.stderr.async_stream();
252
data.pipe(
253
store,
254
OutputStreamConsumer {
255
tx: Box::into_pin(tx),
256
result_tx: Some(result_tx),
257
},
258
);
259
});
260
Ok(match result_rx.await {
261
Ok(err) => Err(err),
262
Err(_) => Ok(()),
263
})
264
}
265
}
266
267
// Empty impl: this file implements `write_via_stream` on `WasiCli` through
// `stderr::HostWithStore`, so nothing is provided here.
impl stderr::Host for WasiCliCtxView<'_> {}
268
269
impl environment::Host for WasiCliCtxView<'_> {
270
fn get_environment(&mut self) -> wasmtime::Result<Vec<(String, String)>> {
271
Ok(self.ctx.environment.clone())
272
}
273
274
fn get_arguments(&mut self) -> wasmtime::Result<Vec<String>> {
275
Ok(self.ctx.arguments.clone())
276
}
277
278
fn get_initial_cwd(&mut self) -> wasmtime::Result<Option<String>> {
279
Ok(self.ctx.initial_cwd.clone())
280
}
281
}
282
283
impl exit::Host for WasiCliCtxView<'_> {
284
fn exit(&mut self, status: Result<(), ()>) -> wasmtime::Result<()> {
285
let status = match status {
286
Ok(()) => 0,
287
Err(()) => 1,
288
};
289
Err(format_err!(I32Exit(status)))
290
}
291
292
fn exit_with_code(&mut self, status_code: u8) -> wasmtime::Result<()> {
293
Err(format_err!(I32Exit(status_code.into())))
294
}
295
}
296
297