Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/p2/filesystem.rs
1692 views
1
use crate::TrappableError;
2
use crate::filesystem::File;
3
use crate::p2::bindings::filesystem::types;
4
use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};
5
use crate::runtime::AbortOnDropJoinHandle;
6
use anyhow::anyhow;
7
use bytes::{Bytes, BytesMut};
8
use std::io;
9
use std::mem;
10
11
/// Shorthand for a `Result` whose error type is [`FsError`].
pub type FsResult<T> = Result<T, FsError>;
12
13
/// [`TrappableError`] specialized to the `ErrorCode` of the `p2` filesystem bindings.
pub type FsError = TrappableError<types::ErrorCode>;
14
15
impl From<crate::filesystem::ErrorCode> for types::ErrorCode {
16
fn from(error: crate::filesystem::ErrorCode) -> Self {
17
match error {
18
crate::filesystem::ErrorCode::Access => Self::Access,
19
crate::filesystem::ErrorCode::Already => Self::Already,
20
crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,
21
crate::filesystem::ErrorCode::Busy => Self::Busy,
22
crate::filesystem::ErrorCode::Exist => Self::Exist,
23
crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,
24
crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,
25
crate::filesystem::ErrorCode::InProgress => Self::InProgress,
26
crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,
27
crate::filesystem::ErrorCode::Invalid => Self::Invalid,
28
crate::filesystem::ErrorCode::Io => Self::Io,
29
crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,
30
crate::filesystem::ErrorCode::Loop => Self::Loop,
31
crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,
32
crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,
33
crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,
34
crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,
35
crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,
36
crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,
37
crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,
38
crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,
39
crate::filesystem::ErrorCode::Overflow => Self::Overflow,
40
crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,
41
crate::filesystem::ErrorCode::Pipe => Self::Pipe,
42
crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,
43
}
44
}
45
}
46
47
impl From<crate::filesystem::ErrorCode> for FsError {
48
fn from(error: crate::filesystem::ErrorCode) -> Self {
49
types::ErrorCode::from(error).into()
50
}
51
}
52
53
impl From<wasmtime::component::ResourceTableError> for FsError {
54
fn from(error: wasmtime::component::ResourceTableError) -> Self {
55
Self::trap(error)
56
}
57
}
58
59
impl From<io::Error> for FsError {
60
fn from(error: io::Error) -> Self {
61
types::ErrorCode::from(error).into()
62
}
63
}
64
65
pub struct FileInputStream {
66
file: File,
67
position: u64,
68
state: ReadState,
69
}
70
enum ReadState {
71
Idle,
72
Waiting(AbortOnDropJoinHandle<ReadState>),
73
DataAvailable(Bytes),
74
Error(io::Error),
75
Closed,
76
}
77
impl FileInputStream {
78
pub fn new(file: &File, position: u64) -> Self {
79
Self {
80
file: file.clone(),
81
position,
82
state: ReadState::Idle,
83
}
84
}
85
86
fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {
87
use system_interface::fs::FileIoExt;
88
89
let mut buf = BytesMut::zeroed(size);
90
loop {
91
match file.read_at(&mut buf, offset) {
92
Ok(0) => return ReadState::Closed,
93
Ok(n) => {
94
buf.truncate(n);
95
return ReadState::DataAvailable(buf.freeze());
96
}
97
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
98
// Try again, continue looping
99
}
100
Err(e) => return ReadState::Error(e),
101
}
102
}
103
}
104
105
/// Wait for existing background task to finish, without starting any new background reads.
106
async fn wait_ready(&mut self) {
107
match &mut self.state {
108
ReadState::Waiting(task) => {
109
self.state = task.await;
110
}
111
_ => {}
112
}
113
}
114
}
115
#[async_trait::async_trait]
116
impl InputStream for FileInputStream {
117
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
118
match &mut self.state {
119
ReadState::Idle => {
120
if size == 0 {
121
return Ok(Bytes::new());
122
}
123
124
let p = self.position;
125
self.state = ReadState::Waiting(
126
self.file
127
.spawn_blocking(move |f| Self::blocking_read(f, p, size)),
128
);
129
Ok(Bytes::new())
130
}
131
ReadState::DataAvailable(b) => {
132
let min_len = b.len().min(size);
133
let chunk = b.split_to(min_len);
134
if b.len() == 0 {
135
self.state = ReadState::Idle;
136
}
137
self.position += min_len as u64;
138
Ok(chunk)
139
}
140
ReadState::Waiting(_) => Ok(Bytes::new()),
141
ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {
142
ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
143
_ => unreachable!(),
144
},
145
ReadState::Closed => Err(StreamError::Closed),
146
}
147
}
148
/// Specialized blocking_* variant to bypass tokio's task spawning & joining
149
/// overhead on synchronous file I/O.
150
async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {
151
self.wait_ready().await;
152
153
// Before we defer to the regular `read`, make sure it has data ready to go:
154
if let ReadState::Idle = self.state {
155
let p = self.position;
156
self.state = self
157
.file
158
.run_blocking(move |f| Self::blocking_read(f, p, size))
159
.await;
160
}
161
162
self.read(size)
163
}
164
async fn cancel(&mut self) {
165
match mem::replace(&mut self.state, ReadState::Closed) {
166
ReadState::Waiting(task) => {
167
// The task was created using `spawn_blocking`, so unless we're
168
// lucky enough that the task hasn't started yet, the abort
169
// signal won't have any effect and we're forced to wait for it
170
// to run to completion.
171
// From the guest's point of view, `input-stream::drop` then
172
// appears to block. Certainly less than ideal, but arguably still
173
// better than letting the guest rack up an unbounded number of
174
// background tasks. Also, the guest is only blocked if
175
// the stream was dropped mid-read, which we don't expect to
176
// occur frequently.
177
task.cancel().await;
178
}
179
_ => {}
180
}
181
}
182
}
183
#[async_trait::async_trait]
184
impl Pollable for FileInputStream {
185
async fn ready(&mut self) {
186
if let ReadState::Idle = self.state {
187
// The guest hasn't initiated any read, but is nonetheless waiting
188
// for data to be available. We'll start a read for them:
189
190
const DEFAULT_READ_SIZE: usize = 4096;
191
let p = self.position;
192
self.state = ReadState::Waiting(
193
self.file
194
.spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),
195
);
196
}
197
198
self.wait_ready().await
199
}
200
}
201
202
/// Where a [`FileOutputStream`] places the bytes it writes.
#[derive(Clone, Copy)]
pub(crate) enum FileOutputMode {
    /// Write at this byte offset; the stream advances it after each completed write.
    Position(u64),
    /// Append every write to the file.
    Append,
}
207
208
pub(crate) struct FileOutputStream {
209
file: File,
210
mode: FileOutputMode,
211
state: OutputState,
212
}
213
214
enum OutputState {
215
Ready,
216
/// Allows join future to be awaited in a cancellable manner. Gone variant indicates
217
/// no task is currently outstanding.
218
Waiting(AbortOnDropJoinHandle<io::Result<usize>>),
219
/// The last I/O operation failed with this error.
220
Error(io::Error),
221
Closed,
222
}
223
224
impl FileOutputStream {
225
pub fn write_at(file: &File, position: u64) -> Self {
226
Self {
227
file: file.clone(),
228
mode: FileOutputMode::Position(position),
229
state: OutputState::Ready,
230
}
231
}
232
233
pub fn append(file: &File) -> Self {
234
Self {
235
file: file.clone(),
236
mode: FileOutputMode::Append,
237
state: OutputState::Ready,
238
}
239
}
240
241
fn blocking_write(
242
file: &cap_std::fs::File,
243
mut buf: Bytes,
244
mode: FileOutputMode,
245
) -> io::Result<usize> {
246
use system_interface::fs::FileIoExt;
247
248
match mode {
249
FileOutputMode::Position(mut p) => {
250
let mut total = 0;
251
loop {
252
let nwritten = file.write_at(buf.as_ref(), p)?;
253
// afterwards buf contains [nwritten, len):
254
let _ = buf.split_to(nwritten);
255
p += nwritten as u64;
256
total += nwritten;
257
if buf.is_empty() {
258
break;
259
}
260
}
261
Ok(total)
262
}
263
FileOutputMode::Append => {
264
let mut total = 0;
265
loop {
266
let nwritten = file.append(buf.as_ref())?;
267
let _ = buf.split_to(nwritten);
268
total += nwritten;
269
if buf.is_empty() {
270
break;
271
}
272
}
273
Ok(total)
274
}
275
}
276
}
277
}
278
279
// FIXME: configurable? determine from how much space left in file?
280
/// Number of bytes (1 MiB) that `check_write` reports as writable while the stream is ready.
const FILE_WRITE_CAPACITY: usize = 1024 * 1024;
281
282
#[async_trait::async_trait]
283
impl OutputStream for FileOutputStream {
284
fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {
285
match self.state {
286
OutputState::Ready => {}
287
OutputState::Closed => return Err(StreamError::Closed),
288
OutputState::Waiting(_) | OutputState::Error(_) => {
289
// a write is pending - this call was not permitted
290
return Err(StreamError::Trap(anyhow!(
291
"write not permitted: check_write not called first"
292
)));
293
}
294
}
295
296
let m = self.mode;
297
self.state = OutputState::Waiting(
298
self.file
299
.spawn_blocking(move |f| Self::blocking_write(f, buf, m)),
300
);
301
Ok(())
302
}
303
/// Specialized blocking_* variant to bypass tokio's task spawning & joining
304
/// overhead on synchronous file I/O.
305
async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {
306
self.ready().await;
307
308
match self.state {
309
OutputState::Ready => {}
310
OutputState::Closed => return Err(StreamError::Closed),
311
OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
312
OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),
313
_ => unreachable!(),
314
},
315
OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),
316
}
317
318
let m = self.mode;
319
match self
320
.file
321
.run_blocking(move |f| Self::blocking_write(f, buf, m))
322
.await
323
{
324
Ok(nwritten) => {
325
if let FileOutputMode::Position(p) = &mut self.mode {
326
*p += nwritten as u64;
327
}
328
self.state = OutputState::Ready;
329
Ok(())
330
}
331
Err(e) => {
332
self.state = OutputState::Closed;
333
Err(StreamError::LastOperationFailed(e.into()))
334
}
335
}
336
}
337
fn flush(&mut self) -> Result<(), StreamError> {
338
match self.state {
339
// Only userland buffering of file writes is in the blocking task,
340
// so there's nothing extra that needs to be done to request a
341
// flush.
342
OutputState::Ready | OutputState::Waiting(_) => Ok(()),
343
OutputState::Closed => Err(StreamError::Closed),
344
OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
345
OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
346
_ => unreachable!(),
347
},
348
}
349
}
350
fn check_write(&mut self) -> Result<usize, StreamError> {
351
match self.state {
352
OutputState::Ready => Ok(FILE_WRITE_CAPACITY),
353
OutputState::Closed => Err(StreamError::Closed),
354
OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {
355
OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),
356
_ => unreachable!(),
357
},
358
OutputState::Waiting(_) => Ok(0),
359
}
360
}
361
async fn cancel(&mut self) {
362
match mem::replace(&mut self.state, OutputState::Closed) {
363
OutputState::Waiting(task) => {
364
// The task was created using `spawn_blocking`, so unless we're
365
// lucky enough that the task hasn't started yet, the abort
366
// signal won't have any effect and we're forced to wait for it
367
// to run to completion.
368
// From the guest's point of view, `output-stream::drop` then
369
// appears to block. Certainly less than ideal, but arguably still
370
// better than letting the guest rack up an unbounded number of
371
// background tasks. Also, the guest is only blocked if
372
// the stream was dropped mid-write, which we don't expect to
373
// occur frequently.
374
task.cancel().await;
375
}
376
_ => {}
377
}
378
}
379
}
380
381
#[async_trait::async_trait]
382
impl Pollable for FileOutputStream {
383
async fn ready(&mut self) {
384
if let OutputState::Waiting(task) = &mut self.state {
385
self.state = match task.await {
386
Ok(nwritten) => {
387
if let FileOutputMode::Position(p) = &mut self.mode {
388
*p += nwritten as u64;
389
}
390
OutputState::Ready
391
}
392
Err(e) => OutputState::Error(e),
393
};
394
}
395
}
396
}
397
398
pub struct ReaddirIterator(
399
std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,
400
);
401
402
impl ReaddirIterator {
403
pub(crate) fn new(
404
i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,
405
) -> Self {
406
ReaddirIterator(std::sync::Mutex::new(Box::new(i)))
407
}
408
pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {
409
self.0.lock().unwrap().next().transpose()
410
}
411
}
412
413
impl IntoIterator for ReaddirIterator {
414
type Item = FsResult<types::DirectoryEntry>;
415
type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;
416
417
fn into_iter(self) -> Self::IntoIter {
418
self.0.into_inner().unwrap()
419
}
420
}
421
422