Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/cli.rs
1691 views
1
use crate::p2;
2
use std::pin::Pin;
3
use std::sync::Arc;
4
use tokio::io::{AsyncRead, AsyncWrite, empty};
5
use wasmtime::component::{HasData, ResourceTable};
6
use wasmtime_wasi_io::streams::{InputStream, OutputStream};
7
8
mod empty;
9
mod file;
10
mod locked_async;
11
mod mem;
12
mod stdout;
13
mod worker_thread_stdin;
14
15
pub use self::file::{InputFile, OutputFile};
16
pub use self::locked_async::{AsyncStdinStream, AsyncStdoutStream};
17
18
// Convenience reexport for stdio types so tokio doesn't have to be imported
19
// itself.
20
#[doc(no_inline)]
21
pub use tokio::io::{Stderr, Stdin, Stdout, stderr, stdin, stdout};
22
23
/// A helper struct which implements [`HasData`] for the `wasi:cli` APIs.
24
///
25
/// This can be useful when directly calling `add_to_linker` functions directly,
26
/// such as [`wasmtime_wasi::p2::bindings::cli::environment::add_to_linker`] as
27
/// the `D` type parameter. See [`HasData`] for more information about the type
28
/// parameter's purpose.
29
///
30
/// When using this type you can skip the [`WasiCliView`] trait, for
31
/// example.
32
///
33
/// # Examples
34
///
35
/// ```
36
/// use wasmtime::component::{Linker, ResourceTable};
37
/// use wasmtime::{Engine, Result, Config};
38
/// use wasmtime_wasi::cli::*;
39
///
40
/// struct MyStoreState {
41
/// table: ResourceTable,
42
/// cli: WasiCliCtx,
43
/// }
44
///
45
/// fn main() -> Result<()> {
46
/// let mut config = Config::new();
47
/// config.async_support(true);
48
/// let engine = Engine::new(&config)?;
49
/// let mut linker = Linker::new(&engine);
50
///
51
/// wasmtime_wasi::p2::bindings::cli::environment::add_to_linker::<MyStoreState, WasiCli>(
52
/// &mut linker,
53
/// |state| WasiCliCtxView {
54
/// table: &mut state.table,
55
/// ctx: &mut state.cli,
56
/// },
57
/// )?;
58
/// Ok(())
59
/// }
60
/// ```
61
pub struct WasiCli;
62
63
impl HasData for WasiCli {
64
type Data<'a> = WasiCliCtxView<'a>;
65
}
66
67
/// Provides a "view" of `wasi:cli`-related context used to implement host
68
/// traits.
69
pub trait WasiCliView: Send {
70
fn cli(&mut self) -> WasiCliCtxView<'_>;
71
}
72
73
pub struct WasiCliCtxView<'a> {
74
pub ctx: &'a mut WasiCliCtx,
75
pub table: &'a mut ResourceTable,
76
}
77
78
pub struct WasiCliCtx {
79
pub(crate) environment: Vec<(String, String)>,
80
pub(crate) arguments: Vec<String>,
81
pub(crate) initial_cwd: Option<String>,
82
pub(crate) stdin: Box<dyn StdinStream>,
83
pub(crate) stdout: Box<dyn StdoutStream>,
84
pub(crate) stderr: Box<dyn StdoutStream>,
85
}
86
87
impl Default for WasiCliCtx {
88
fn default() -> WasiCliCtx {
89
WasiCliCtx {
90
environment: Vec::new(),
91
arguments: Vec::new(),
92
initial_cwd: None,
93
stdin: Box::new(empty()),
94
stdout: Box::new(empty()),
95
stderr: Box::new(empty()),
96
}
97
}
98
}
99
100
pub trait IsTerminal {
101
/// Returns whether this stream is backed by a TTY.
102
fn is_terminal(&self) -> bool;
103
}
104
105
/// A trait used to represent the standard input to a guest program.
106
///
107
/// Note that there are many built-in implementations of this trait for various
108
/// types such as [`tokio::io::Stdin`], [`tokio::io::Empty`], and
109
/// [`p2::pipe::MemoryInputPipe`].
110
pub trait StdinStream: IsTerminal + Send {
111
/// Creates a fresh stream which is reading stdin.
112
///
113
/// Note that the returned stream must share state with all other streams
114
/// previously created. Guests may create multiple handles to the same stdin
115
/// and they should all be synchronized in their progress through the
116
/// program's input.
117
///
118
/// Note that this means that if one handle becomes ready for reading they
119
/// all become ready for reading. Subsequently if one is read from it may
120
/// mean that all the others are no longer ready for reading. This is
121
/// basically a consequence of the way the WIT APIs are designed today.
122
fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync>;
123
124
/// Same as [`Self::async_stream`] except that a WASIp2 [`InputStream`] is
125
/// returned.
126
///
127
/// Note that this has a default implementation which uses
128
/// [`p2::pipe::AsyncReadStream`] as an adapter, but this can be overridden
129
/// if there's a more specialized implementation available.
130
fn p2_stream(&self) -> Box<dyn InputStream> {
131
Box::new(p2::pipe::AsyncReadStream::new(Pin::from(
132
self.async_stream(),
133
)))
134
}
135
}
136
137
/// Similar to [`StdinStream`], except for output.
138
///
139
/// This is used both for a guest stdin and a guest stdout.
140
///
141
/// Note that there are many built-in implementations of this trait for various
142
/// types such as [`tokio::io::Stdout`], [`tokio::io::Empty`], and
143
/// [`p2::pipe::MemoryOutputPipe`].
144
pub trait StdoutStream: IsTerminal + Send {
145
/// Returns a fresh new stream which can write to this output stream.
146
///
147
/// Note that all output streams should output to the same logical source.
148
/// This means that it's possible for each independent stream to acquire a
149
/// separate "permit" to write and then act on that permit. Note that
150
/// additionally at this time once a permit is "acquired" there's no way to
151
/// release it, for example you can wait for readiness and then never
152
/// actually write in WASI. This means that acquisition of a permit for one
153
/// stream cannot discount the size of a permit another stream could
154
/// obtain.
155
///
156
/// Implementations must be able to handle this
157
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync>;
158
159
/// Same as [`Self::async_stream`] except that a WASIp2 [`OutputStream`] is
160
/// returned.
161
///
162
/// Note that this has a default implementation which uses
163
/// [`p2::pipe::AsyncWriteStream`] as an adapter, but this can be overridden
164
/// if there's a more specialized implementation available.
165
fn p2_stream(&self) -> Box<dyn OutputStream> {
166
Box::new(p2::pipe::AsyncWriteStream::new(
167
8192, // FIXME: extract this to a constant.
168
Pin::from(self.async_stream()),
169
))
170
}
171
}
172
173
// Forward `&T => T`
174
impl<T: ?Sized + IsTerminal> IsTerminal for &T {
175
fn is_terminal(&self) -> bool {
176
T::is_terminal(self)
177
}
178
}
179
impl<T: ?Sized + StdinStream + Sync> StdinStream for &T {
180
fn p2_stream(&self) -> Box<dyn InputStream> {
181
T::p2_stream(self)
182
}
183
fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
184
T::async_stream(self)
185
}
186
}
187
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &T {
188
fn p2_stream(&self) -> Box<dyn OutputStream> {
189
T::p2_stream(self)
190
}
191
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
192
T::async_stream(self)
193
}
194
}
195
196
// Forward `&mut T => T`
197
impl<T: ?Sized + IsTerminal> IsTerminal for &mut T {
198
fn is_terminal(&self) -> bool {
199
T::is_terminal(self)
200
}
201
}
202
impl<T: ?Sized + StdinStream + Sync> StdinStream for &mut T {
203
fn p2_stream(&self) -> Box<dyn InputStream> {
204
T::p2_stream(self)
205
}
206
fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
207
T::async_stream(self)
208
}
209
}
210
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for &mut T {
211
fn p2_stream(&self) -> Box<dyn OutputStream> {
212
T::p2_stream(self)
213
}
214
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
215
T::async_stream(self)
216
}
217
}
218
219
// Forward `Box<T> => T`
220
impl<T: ?Sized + IsTerminal> IsTerminal for Box<T> {
221
fn is_terminal(&self) -> bool {
222
T::is_terminal(self)
223
}
224
}
225
impl<T: ?Sized + StdinStream + Sync> StdinStream for Box<T> {
226
fn p2_stream(&self) -> Box<dyn InputStream> {
227
T::p2_stream(self)
228
}
229
fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
230
T::async_stream(self)
231
}
232
}
233
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Box<T> {
234
fn p2_stream(&self) -> Box<dyn OutputStream> {
235
T::p2_stream(self)
236
}
237
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
238
T::async_stream(self)
239
}
240
}
241
242
// Forward `Arc<T> => T`
243
impl<T: ?Sized + IsTerminal> IsTerminal for Arc<T> {
244
fn is_terminal(&self) -> bool {
245
T::is_terminal(self)
246
}
247
}
248
impl<T: ?Sized + StdinStream + Sync> StdinStream for Arc<T> {
249
fn p2_stream(&self) -> Box<dyn InputStream> {
250
T::p2_stream(self)
251
}
252
fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {
253
T::async_stream(self)
254
}
255
}
256
impl<T: ?Sized + StdoutStream + Sync> StdoutStream for Arc<T> {
257
fn p2_stream(&self) -> Box<dyn OutputStream> {
258
T::p2_stream(self)
259
}
260
fn async_stream(&self) -> Box<dyn AsyncWrite + Send + Sync> {
261
T::async_stream(self)
262
}
263
}
264
265
#[cfg(test)]
266
mod test {
267
use crate::cli::{AsyncStdoutStream, StdinStream, StdoutStream};
268
use crate::p2::{self, OutputStream};
269
use anyhow::Result;
270
use bytes::Bytes;
271
use tokio::io::AsyncReadExt;
272
273
#[test]
274
fn memory_stdin_stream() {
275
// A StdinStream has the property that there are multiple
276
// InputStreams created, using the stream() method which are each
277
// views on the same shared state underneath. Consuming input on one
278
// stream results in consuming that input on all streams.
279
//
280
// The simplest way to measure this is to check if the MemoryInputPipe
281
// impl of StdinStream follows this property.
282
283
let pipe =
284
p2::pipe::MemoryInputPipe::new("the quick brown fox jumped over the three lazy dogs");
285
286
let mut view1 = pipe.p2_stream();
287
let mut view2 = pipe.p2_stream();
288
289
let read1 = view1.read(10).expect("read first 10 bytes");
290
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
291
let read2 = view2.read(10).expect("read second 10 bytes");
292
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
293
let read3 = view1.read(10).expect("read third 10 bytes");
294
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
295
let read4 = view2.read(10).expect("read fourth 10 bytes");
296
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
297
}
298
299
#[tokio::test]
300
async fn async_stdin_stream() {
301
// A StdinStream has the property that there are multiple
302
// InputStreams created, using the stream() method which are each
303
// views on the same shared state underneath. Consuming input on one
304
// stream results in consuming that input on all streams.
305
//
306
// AsyncStdinStream is a slightly more complex impl of StdinStream
307
// than the MemoryInputPipe above. We can create an AsyncReadStream
308
// from a file on the disk, and an AsyncStdinStream from that common
309
// stream, then check that the same property holds as above.
310
311
let dir = tempfile::tempdir().unwrap();
312
let mut path = std::path::PathBuf::from(dir.path());
313
path.push("file");
314
std::fs::write(&path, "the quick brown fox jumped over the three lazy dogs").unwrap();
315
316
let file = tokio::fs::File::open(&path)
317
.await
318
.expect("open created file");
319
let stdin_stream = super::AsyncStdinStream::new(file);
320
321
use super::StdinStream;
322
323
let mut view1 = stdin_stream.p2_stream();
324
let mut view2 = stdin_stream.p2_stream();
325
326
view1.ready().await;
327
328
let read1 = view1.read(10).expect("read first 10 bytes");
329
assert_eq!(read1, "the quick ".as_bytes(), "first 10 bytes");
330
let read2 = view2.read(10).expect("read second 10 bytes");
331
assert_eq!(read2, "brown fox ".as_bytes(), "second 10 bytes");
332
let read3 = view1.read(10).expect("read third 10 bytes");
333
assert_eq!(read3, "jumped ove".as_bytes(), "third 10 bytes");
334
let read4 = view2.read(10).expect("read fourth 10 bytes");
335
assert_eq!(read4, "r the thre".as_bytes(), "fourth 10 bytes");
336
}
337
338
#[tokio::test]
339
async fn async_stdout_stream_unblocks() {
340
let (mut read, write) = tokio::io::duplex(32);
341
let stdout = AsyncStdoutStream::new(32, write);
342
343
let task = tokio::task::spawn(async move {
344
let mut stream = stdout.p2_stream();
345
blocking_write_and_flush(&mut *stream, "x".into())
346
.await
347
.unwrap();
348
});
349
350
let mut buf = [0; 100];
351
let n = read.read(&mut buf).await.unwrap();
352
assert_eq!(&buf[..n], b"x");
353
354
task.await.unwrap();
355
}
356
357
async fn blocking_write_and_flush(s: &mut dyn OutputStream, mut bytes: Bytes) -> Result<()> {
358
while !bytes.is_empty() {
359
let permit = s.write_ready().await?;
360
let len = bytes.len().min(permit);
361
let chunk = bytes.split_to(len);
362
s.write(chunk)?;
363
}
364
365
s.flush()?;
366
s.write_ready().await?;
367
Ok(())
368
}
369
}
370
371