// Book a Demo!
// CoCalc Logo Icon
// Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
// bytecodealliance
// GitHub Repository: bytecodealliance/wasmtime
// Path: blob/main/crates/wasi/src/p3/filesystem/host.rs
// 3088 views
1
use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
2
use crate::p3::bindings::clocks::system_clock;
3
use crate::p3::bindings::filesystem::types::{
4
self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
5
Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
6
};
7
use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8
use crate::p3::{DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer};
9
use crate::{DirPerms, FilePerms};
10
use bytes::BytesMut;
11
use core::pin::Pin;
12
use core::task::{Context, Poll, ready};
13
use core::{iter, mem};
14
use std::io::{self, Cursor};
15
use std::sync::Arc;
16
use system_interface::fs::FileIoExt as _;
17
use tokio::sync::{mpsc, oneshot};
18
use tokio::task::{JoinHandle, spawn_blocking};
19
use wasmtime::StoreContextMut;
20
use wasmtime::component::{
21
Access, Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
22
StreamProducer, StreamReader, StreamResult,
23
};
24
use wasmtime::error::Context as _;
25
26
fn get_descriptor<'a>(
27
table: &'a ResourceTable,
28
fd: &'a Resource<Descriptor>,
29
) -> FilesystemResult<&'a Descriptor> {
30
table
31
.get(fd)
32
.context("failed to get descriptor resource from table")
33
.map_err(FilesystemError::trap)
34
}
35
36
fn get_file<'a>(
37
table: &'a ResourceTable,
38
fd: &'a Resource<Descriptor>,
39
) -> FilesystemResult<&'a File> {
40
let file = get_descriptor(table, fd).map(Descriptor::file)??;
41
Ok(file)
42
}
43
44
fn get_dir<'a>(
45
table: &'a ResourceTable,
46
fd: &'a Resource<Descriptor>,
47
) -> FilesystemResult<&'a Dir> {
48
let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
49
Ok(dir)
50
}
51
52
trait AccessorExt {
53
fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
54
fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
55
fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
56
fn get_dir_pair(
57
&self,
58
a: &Resource<Descriptor>,
59
b: &Resource<Descriptor>,
60
) -> FilesystemResult<(Dir, Dir)>;
61
}
62
63
impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
64
fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
65
self.with(|mut store| {
66
let fd = get_descriptor(store.get().table, fd)?;
67
Ok(fd.clone())
68
})
69
}
70
71
fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
72
self.with(|mut store| {
73
let file = get_file(store.get().table, fd)?;
74
Ok(file.clone())
75
})
76
}
77
78
fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
79
self.with(|mut store| {
80
let dir = get_dir(store.get().table, fd)?;
81
Ok(dir.clone())
82
})
83
}
84
85
fn get_dir_pair(
86
&self,
87
a: &Resource<Descriptor>,
88
b: &Resource<Descriptor>,
89
) -> FilesystemResult<(Dir, Dir)> {
90
self.with(|mut store| {
91
let table = store.get().table;
92
let a = get_dir(table, a)?;
93
let b = get_dir(table, b)?;
94
Ok((a.clone(), b.clone()))
95
})
96
}
97
}
98
99
fn systemtime_from(t: system_clock::Instant) -> Result<std::time::SystemTime, ErrorCode> {
100
if let Ok(seconds) = t.seconds.try_into() {
101
std::time::SystemTime::UNIX_EPOCH
102
.checked_add(core::time::Duration::new(seconds, t.nanoseconds))
103
.ok_or(ErrorCode::Overflow)
104
} else {
105
std::time::SystemTime::UNIX_EPOCH
106
.checked_sub(core::time::Duration::new(
107
t.seconds.unsigned_abs(),
108
t.nanoseconds,
109
))
110
.ok_or(ErrorCode::Overflow)
111
}
112
}
113
114
fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
115
use fs_set_times::SystemTimeSpec;
116
match t {
117
NewTimestamp::NoChange => Ok(None),
118
NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
119
NewTimestamp::Timestamp(st) => {
120
let st = systemtime_from(st)?;
121
Ok(Some(SystemTimeSpec::Absolute(st)))
122
}
123
}
124
}
125
126
struct ReadStreamProducer {
127
file: File,
128
offset: u64,
129
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
130
task: Option<JoinHandle<std::io::Result<BytesMut>>>,
131
}
132
133
impl Drop for ReadStreamProducer {
134
fn drop(&mut self) {
135
self.close(Ok(()))
136
}
137
}
138
139
impl ReadStreamProducer {
140
fn close(&mut self, res: Result<(), ErrorCode>) {
141
if let Some(tx) = self.result.take() {
142
_ = tx.send(res);
143
}
144
}
145
146
/// Update the internal `offset` field after reading `amt` bytes from the file.
147
fn complete_read(&mut self, amt: usize) -> StreamResult {
148
let Ok(amt) = amt.try_into() else {
149
self.close(Err(ErrorCode::Overflow));
150
return StreamResult::Dropped;
151
};
152
let Some(amt) = self.offset.checked_add(amt) else {
153
self.close(Err(ErrorCode::Overflow));
154
return StreamResult::Dropped;
155
};
156
self.offset = amt;
157
StreamResult::Completed
158
}
159
}
160
161
impl<D> StreamProducer<D> for ReadStreamProducer {
162
type Item = u8;
163
type Buffer = Cursor<BytesMut>;
164
165
fn poll_produce<'a>(
166
mut self: Pin<&mut Self>,
167
cx: &mut Context<'_>,
168
store: StoreContextMut<'a, D>,
169
mut dst: Destination<'a, Self::Item, Self::Buffer>,
170
// Intentionally ignore this as in blocking mode everything is always
171
// ready and otherwise spawned blocking work can't be cancelled.
172
_finish: bool,
173
) -> Poll<wasmtime::Result<StreamResult>> {
174
if let Some(file) = self.file.as_blocking_file() {
175
// Once a blocking file, always a blocking file, so assert as such.
176
assert!(self.task.is_none());
177
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
178
let buf = dst.remaining();
179
if buf.is_empty() {
180
return Poll::Ready(Ok(StreamResult::Completed));
181
}
182
return match file.read_at(buf, self.offset) {
183
Ok(0) => {
184
self.close(Ok(()));
185
Poll::Ready(Ok(StreamResult::Dropped))
186
}
187
Ok(n) => {
188
dst.mark_written(n);
189
Poll::Ready(Ok(self.complete_read(n)))
190
}
191
Err(err) => {
192
self.close(Err(err.into()));
193
Poll::Ready(Ok(StreamResult::Dropped))
194
}
195
};
196
}
197
198
// Lazily spawn a read task if one hasn't already been spawned yet.
199
let me = &mut *self;
200
let task = me.task.get_or_insert_with(|| {
201
let mut buf = dst.take_buffer().into_inner();
202
buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
203
let file = Arc::clone(me.file.as_file());
204
let offset = me.offset;
205
spawn_blocking(move || {
206
file.read_at(&mut buf, offset).map(|n| {
207
buf.truncate(n);
208
buf
209
})
210
})
211
});
212
213
// Await the completion of the read task. Note that this is not a
214
// cancellable await point because we can't cancel the other task, so
215
// the `finish` parameter is ignored.
216
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
217
self.task = None;
218
match res {
219
Ok(buf) if buf.is_empty() => {
220
self.close(Ok(()));
221
Poll::Ready(Ok(StreamResult::Dropped))
222
}
223
Ok(buf) => {
224
let n = buf.len();
225
dst.set_buffer(Cursor::new(buf));
226
Poll::Ready(Ok(self.complete_read(n)))
227
}
228
Err(err) => {
229
self.close(Err(err.into()));
230
Poll::Ready(Ok(StreamResult::Dropped))
231
}
232
}
233
}
234
}
235
236
fn map_dir_entry(
237
entry: std::io::Result<cap_std::fs::DirEntry>,
238
) -> Result<Option<DirectoryEntry>, ErrorCode> {
239
match entry {
240
Ok(entry) => {
241
let meta = entry.metadata()?;
242
let Ok(name) = entry.file_name().into_string() else {
243
return Err(ErrorCode::IllegalByteSequence);
244
};
245
Ok(Some(DirectoryEntry {
246
type_: meta.file_type().into(),
247
name,
248
}))
249
}
250
Err(err) => {
251
// On windows, filter out files like `C:\DumpStack.log.tmp` which we
252
// can't get full metadata for.
253
#[cfg(windows)]
254
{
255
use windows_sys::Win32::Foundation::{
256
ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
257
};
258
if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
259
|| err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
260
{
261
return Ok(None);
262
}
263
}
264
Err(err.into())
265
}
266
}
267
}
268
269
struct ReadDirStream {
270
rx: mpsc::Receiver<DirectoryEntry>,
271
task: JoinHandle<Result<(), ErrorCode>>,
272
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
273
}
274
275
impl ReadDirStream {
276
fn new(
277
dir: Arc<cap_std::fs::Dir>,
278
result: oneshot::Sender<Result<(), ErrorCode>>,
279
) -> ReadDirStream {
280
let (tx, rx) = mpsc::channel(1);
281
ReadDirStream {
282
task: spawn_blocking(move || {
283
let entries = dir.entries()?;
284
for entry in entries {
285
if let Some(entry) = map_dir_entry(entry)? {
286
if let Err(_) = tx.blocking_send(entry) {
287
break;
288
}
289
}
290
}
291
Ok(())
292
}),
293
rx,
294
result: Some(result),
295
}
296
}
297
298
fn close(&mut self, res: Result<(), ErrorCode>) {
299
self.rx.close();
300
self.task.abort();
301
let _ = self.result.take().unwrap().send(res);
302
}
303
}
304
305
impl<D> StreamProducer<D> for ReadDirStream {
306
type Item = DirectoryEntry;
307
type Buffer = Option<DirectoryEntry>;
308
309
fn poll_produce<'a>(
310
mut self: Pin<&mut Self>,
311
cx: &mut Context<'_>,
312
mut store: StoreContextMut<'a, D>,
313
mut dst: Destination<'a, Self::Item, Self::Buffer>,
314
finish: bool,
315
) -> Poll<wasmtime::Result<StreamResult>> {
316
// If this is a 0-length read then `mpsc::Receiver` does not expose an
317
// API to wait for an item to be available without taking it out of the
318
// channel. In lieu of that just say that we're complete and ready for a
319
// read.
320
if dst.remaining(&mut store) == Some(0) {
321
return Poll::Ready(Ok(StreamResult::Completed));
322
}
323
324
match self.rx.poll_recv(cx) {
325
// If an item is on the channel then send that along and say that
326
// the read is now complete with one item being yielded.
327
Poll::Ready(Some(item)) => {
328
dst.set_buffer(Some(item));
329
Poll::Ready(Ok(StreamResult::Completed))
330
}
331
332
// If there's nothing left on the channel then that means that an
333
// error occurred or the iterator is done. In both cases an
334
// un-cancellable wait for the spawned task is entered and we await
335
// its completion. Upon completion there our own stream is closed
336
// with the result (sending an error code on our oneshot) and then
337
// the stream is reported as dropped.
338
Poll::Ready(None) => {
339
let result = ready!(Pin::new(&mut self.task).poll(cx))
340
.expect("spawned task should not panic");
341
self.close(result);
342
Poll::Ready(Ok(StreamResult::Dropped))
343
}
344
345
// If an item isn't ready yet then cancel this outstanding request
346
// if `finish` is set, otherwise propagate the `Pending` status.
347
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
348
Poll::Pending => Poll::Pending,
349
}
350
}
351
}
352
353
impl Drop for ReadDirStream {
354
fn drop(&mut self) {
355
if self.result.is_some() {
356
self.close(Ok(()));
357
}
358
}
359
}
360
361
struct WriteStreamConsumer {
362
file: File,
363
location: WriteLocation,
364
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
365
buffer: BytesMut,
366
task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
367
}
368
369
#[derive(Copy, Clone)]
370
enum WriteLocation {
371
End,
372
Offset(u64),
373
}
374
375
impl WriteStreamConsumer {
376
fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
377
Self {
378
file,
379
location: WriteLocation::Offset(offset),
380
result: Some(result),
381
buffer: BytesMut::default(),
382
task: None,
383
}
384
}
385
386
fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
387
Self {
388
file,
389
location: WriteLocation::End,
390
result: Some(result),
391
buffer: BytesMut::default(),
392
task: None,
393
}
394
}
395
396
fn close(&mut self, res: Result<(), ErrorCode>) {
397
_ = self.result.take().unwrap().send(res);
398
}
399
400
/// Update the internal `offset` field after writing `amt` bytes from the file.
401
fn complete_write(&mut self, amt: usize) -> StreamResult {
402
match &mut self.location {
403
WriteLocation::End => StreamResult::Completed,
404
WriteLocation::Offset(offset) => {
405
let Ok(amt) = amt.try_into() else {
406
self.close(Err(ErrorCode::Overflow));
407
return StreamResult::Dropped;
408
};
409
let Some(amt) = offset.checked_add(amt) else {
410
self.close(Err(ErrorCode::Overflow));
411
return StreamResult::Dropped;
412
};
413
*offset = amt;
414
StreamResult::Completed
415
}
416
}
417
}
418
}
419
420
impl WriteLocation {
421
fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
422
match *self {
423
WriteLocation::End => file.append(bytes),
424
WriteLocation::Offset(at) => file.write_at(bytes, at),
425
}
426
}
427
}
428
429
impl<D> StreamConsumer<D> for WriteStreamConsumer {
430
type Item = u8;
431
432
fn poll_consume(
433
mut self: Pin<&mut Self>,
434
cx: &mut Context<'_>,
435
store: StoreContextMut<D>,
436
src: Source<Self::Item>,
437
// Intentionally ignore this as in blocking mode everything is always
438
// ready and otherwise spawned blocking work can't be cancelled.
439
_finish: bool,
440
) -> Poll<wasmtime::Result<StreamResult>> {
441
let mut src = src.as_direct(store);
442
if let Some(file) = self.file.as_blocking_file() {
443
// Once a blocking file, always a blocking file, so assert as such.
444
assert!(self.task.is_none());
445
return match self.location.write(file, src.remaining()) {
446
Ok(n) => {
447
src.mark_read(n);
448
Poll::Ready(Ok(self.complete_write(n)))
449
}
450
Err(err) => {
451
self.close(Err(err.into()));
452
Poll::Ready(Ok(StreamResult::Dropped))
453
}
454
};
455
}
456
let me = &mut *self;
457
let task = me.task.get_or_insert_with(|| {
458
debug_assert!(me.buffer.is_empty());
459
me.buffer.extend_from_slice(src.remaining());
460
let buf = mem::take(&mut me.buffer);
461
let file = Arc::clone(me.file.as_file());
462
let location = me.location;
463
spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
464
});
465
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
466
self.task = None;
467
match res {
468
Ok((buf, n)) => {
469
src.mark_read(n);
470
self.buffer = buf;
471
self.buffer.clear();
472
Poll::Ready(Ok(self.complete_write(n)))
473
}
474
Err(err) => {
475
self.close(Err(err.into()));
476
Poll::Ready(Ok(StreamResult::Dropped))
477
}
478
}
479
}
480
}
481
482
impl Drop for WriteStreamConsumer {
483
fn drop(&mut self) {
484
if self.result.is_some() {
485
self.close(Ok(()))
486
}
487
}
488
}
489
490
impl types::Host for WasiFilesystemCtxView<'_> {
491
fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
492
error.downcast()
493
}
494
}
495
496
impl types::HostDescriptorWithStore for WasiFilesystem {
497
fn read_via_stream<U>(
498
mut store: Access<U, Self>,
499
fd: Resource<Descriptor>,
500
offset: Filesize,
501
) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
502
let file = get_file(store.get().table, &fd)?;
503
if !file.perms.contains(FilePerms::READ) {
504
return Ok((
505
StreamReader::new(&mut store, iter::empty()),
506
FutureReader::new(&mut store, async {
507
wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
508
}),
509
));
510
}
511
512
let file = file.clone();
513
let (result_tx, result_rx) = oneshot::channel();
514
Ok((
515
StreamReader::new(
516
&mut store,
517
ReadStreamProducer {
518
file,
519
offset,
520
result: Some(result_tx),
521
task: None,
522
},
523
),
524
FutureReader::new(&mut store, result_rx),
525
))
526
}
527
528
async fn write_via_stream<U>(
529
store: &Accessor<U, Self>,
530
fd: Resource<Descriptor>,
531
data: StreamReader<u8>,
532
offset: Filesize,
533
) -> FilesystemResult<()> {
534
let (result_tx, result_rx) = oneshot::channel();
535
store.with(|mut store| {
536
let file = get_file(store.get().table, &fd)?;
537
if !file.perms.contains(FilePerms::WRITE) {
538
return Err(ErrorCode::NotPermitted.into());
539
}
540
let file = file.clone();
541
data.pipe(store, WriteStreamConsumer::new_at(file, offset, result_tx));
542
FilesystemResult::Ok(())
543
})?;
544
result_rx
545
.await
546
.context("oneshot sender dropped")
547
.map_err(FilesystemError::trap)??;
548
Ok(())
549
}
550
551
async fn append_via_stream<U>(
552
store: &Accessor<U, Self>,
553
fd: Resource<Descriptor>,
554
data: StreamReader<u8>,
555
) -> FilesystemResult<()> {
556
let (result_tx, result_rx) = oneshot::channel();
557
store.with(|mut store| {
558
let file = get_file(store.get().table, &fd)?;
559
if !file.perms.contains(FilePerms::WRITE) {
560
return Err(ErrorCode::NotPermitted.into());
561
}
562
let file = file.clone();
563
data.pipe(store, WriteStreamConsumer::new_append(file, result_tx));
564
FilesystemResult::Ok(())
565
})?;
566
result_rx
567
.await
568
.context("oneshot sender dropped")
569
.map_err(FilesystemError::trap)??;
570
Ok(())
571
}
572
573
async fn advise<U>(
574
store: &Accessor<U, Self>,
575
fd: Resource<Descriptor>,
576
offset: Filesize,
577
length: Filesize,
578
advice: Advice,
579
) -> FilesystemResult<()> {
580
let file = store.get_file(&fd)?;
581
file.advise(offset, length, advice.into()).await?;
582
Ok(())
583
}
584
585
async fn sync_data<U>(
586
store: &Accessor<U, Self>,
587
fd: Resource<Descriptor>,
588
) -> FilesystemResult<()> {
589
let fd = store.get_descriptor(&fd)?;
590
fd.sync_data().await?;
591
Ok(())
592
}
593
594
async fn get_flags<U>(
595
store: &Accessor<U, Self>,
596
fd: Resource<Descriptor>,
597
) -> FilesystemResult<DescriptorFlags> {
598
let fd = store.get_descriptor(&fd)?;
599
let flags = fd.get_flags().await?;
600
Ok(flags.into())
601
}
602
603
async fn get_type<U>(
604
store: &Accessor<U, Self>,
605
fd: Resource<Descriptor>,
606
) -> FilesystemResult<DescriptorType> {
607
let fd = store.get_descriptor(&fd)?;
608
let ty = fd.get_type().await?;
609
Ok(ty.into())
610
}
611
612
async fn set_size<U>(
613
store: &Accessor<U, Self>,
614
fd: Resource<Descriptor>,
615
size: Filesize,
616
) -> FilesystemResult<()> {
617
let file = store.get_file(&fd)?;
618
file.set_size(size).await?;
619
Ok(())
620
}
621
622
async fn set_times<U>(
623
store: &Accessor<U, Self>,
624
fd: Resource<Descriptor>,
625
data_access_timestamp: NewTimestamp,
626
data_modification_timestamp: NewTimestamp,
627
) -> FilesystemResult<()> {
628
let fd = store.get_descriptor(&fd)?;
629
let atim = systemtimespec_from(data_access_timestamp)?;
630
let mtim = systemtimespec_from(data_modification_timestamp)?;
631
fd.set_times(atim, mtim).await?;
632
Ok(())
633
}
634
635
async fn read_directory<U>(
636
store: &Accessor<U, Self>,
637
fd: Resource<Descriptor>,
638
) -> wasmtime::Result<(
639
StreamReader<DirectoryEntry>,
640
FutureReader<Result<(), ErrorCode>>,
641
)> {
642
store.with(|mut store| {
643
let dir = get_dir(store.get().table, &fd)?;
644
if !dir.perms.contains(DirPerms::READ) {
645
return Ok((
646
StreamReader::new(&mut store, iter::empty()),
647
FutureReader::new(&mut store, async {
648
wasmtime::error::Ok(Err(ErrorCode::NotPermitted))
649
}),
650
));
651
}
652
let allow_blocking_current_thread = dir.allow_blocking_current_thread;
653
let dir = Arc::clone(dir.as_dir());
654
let (result_tx, result_rx) = oneshot::channel();
655
let stream = if allow_blocking_current_thread {
656
match dir.entries() {
657
Ok(readdir) => StreamReader::new(
658
&mut store,
659
FallibleIteratorProducer::new(
660
readdir.filter_map(|e| map_dir_entry(e).transpose()),
661
result_tx,
662
),
663
),
664
Err(e) => {
665
result_tx.send(Err(e.into())).unwrap();
666
StreamReader::new(&mut store, iter::empty())
667
}
668
}
669
} else {
670
StreamReader::new(&mut store, ReadDirStream::new(dir, result_tx))
671
};
672
Ok((stream, FutureReader::new(&mut store, result_rx)))
673
})
674
}
675
676
async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
677
let fd = store.get_descriptor(&fd)?;
678
fd.sync().await?;
679
Ok(())
680
}
681
682
async fn create_directory_at<U>(
683
store: &Accessor<U, Self>,
684
fd: Resource<Descriptor>,
685
path: String,
686
) -> FilesystemResult<()> {
687
let dir = store.get_dir(&fd)?;
688
dir.create_directory_at(path).await?;
689
Ok(())
690
}
691
692
async fn stat<U>(
693
store: &Accessor<U, Self>,
694
fd: Resource<Descriptor>,
695
) -> FilesystemResult<DescriptorStat> {
696
let fd = store.get_descriptor(&fd)?;
697
let stat = fd.stat().await?;
698
Ok(stat.into())
699
}
700
701
async fn stat_at<U>(
702
store: &Accessor<U, Self>,
703
fd: Resource<Descriptor>,
704
path_flags: PathFlags,
705
path: String,
706
) -> FilesystemResult<DescriptorStat> {
707
let dir = store.get_dir(&fd)?;
708
let stat = dir.stat_at(path_flags.into(), path).await?;
709
Ok(stat.into())
710
}
711
712
async fn set_times_at<U>(
713
store: &Accessor<U, Self>,
714
fd: Resource<Descriptor>,
715
path_flags: PathFlags,
716
path: String,
717
data_access_timestamp: NewTimestamp,
718
data_modification_timestamp: NewTimestamp,
719
) -> FilesystemResult<()> {
720
let dir = store.get_dir(&fd)?;
721
let atim = systemtimespec_from(data_access_timestamp)?;
722
let mtim = systemtimespec_from(data_modification_timestamp)?;
723
dir.set_times_at(path_flags.into(), path, atim, mtim)
724
.await?;
725
Ok(())
726
}
727
728
async fn link_at<U>(
729
store: &Accessor<U, Self>,
730
fd: Resource<Descriptor>,
731
old_path_flags: PathFlags,
732
old_path: String,
733
new_fd: Resource<Descriptor>,
734
new_path: String,
735
) -> FilesystemResult<()> {
736
let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
737
old_dir
738
.link_at(old_path_flags.into(), old_path, &new_dir, new_path)
739
.await?;
740
Ok(())
741
}
742
743
async fn open_at<U>(
744
store: &Accessor<U, Self>,
745
fd: Resource<Descriptor>,
746
path_flags: PathFlags,
747
path: String,
748
open_flags: OpenFlags,
749
flags: DescriptorFlags,
750
) -> FilesystemResult<Resource<Descriptor>> {
751
let (allow_blocking_current_thread, dir) = store.with(|mut store| {
752
let store = store.get();
753
let dir = get_dir(&store.table, &fd)?;
754
FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
755
})?;
756
let fd = dir
757
.open_at(
758
path_flags.into(),
759
path,
760
open_flags.into(),
761
flags.into(),
762
allow_blocking_current_thread,
763
)
764
.await?;
765
let fd = store.with(|mut store| store.get().table.push(fd))?;
766
Ok(fd)
767
}
768
769
async fn readlink_at<U>(
770
store: &Accessor<U, Self>,
771
fd: Resource<Descriptor>,
772
path: String,
773
) -> FilesystemResult<String> {
774
let dir = store.get_dir(&fd)?;
775
let path = dir.readlink_at(path).await?;
776
Ok(path)
777
}
778
779
async fn remove_directory_at<U>(
780
store: &Accessor<U, Self>,
781
fd: Resource<Descriptor>,
782
path: String,
783
) -> FilesystemResult<()> {
784
let dir = store.get_dir(&fd)?;
785
dir.remove_directory_at(path).await?;
786
Ok(())
787
}
788
789
async fn rename_at<U>(
790
store: &Accessor<U, Self>,
791
fd: Resource<Descriptor>,
792
old_path: String,
793
new_fd: Resource<Descriptor>,
794
new_path: String,
795
) -> FilesystemResult<()> {
796
let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
797
old_dir.rename_at(old_path, &new_dir, new_path).await?;
798
Ok(())
799
}
800
801
async fn symlink_at<U>(
802
store: &Accessor<U, Self>,
803
fd: Resource<Descriptor>,
804
old_path: String,
805
new_path: String,
806
) -> FilesystemResult<()> {
807
let dir = store.get_dir(&fd)?;
808
dir.symlink_at(old_path, new_path).await?;
809
Ok(())
810
}
811
812
async fn unlink_file_at<U>(
813
store: &Accessor<U, Self>,
814
fd: Resource<Descriptor>,
815
path: String,
816
) -> FilesystemResult<()> {
817
let dir = store.get_dir(&fd)?;
818
dir.unlink_file_at(path).await?;
819
Ok(())
820
}
821
822
async fn is_same_object<U>(
823
store: &Accessor<U, Self>,
824
fd: Resource<Descriptor>,
825
other: Resource<Descriptor>,
826
) -> wasmtime::Result<bool> {
827
let (fd, other) = store.with(|mut store| {
828
let table = store.get().table;
829
let fd = get_descriptor(table, &fd)?.clone();
830
let other = get_descriptor(table, &other)?.clone();
831
wasmtime::error::Ok((fd, other))
832
})?;
833
fd.is_same_object(&other).await
834
}
835
836
async fn metadata_hash<U>(
837
store: &Accessor<U, Self>,
838
fd: Resource<Descriptor>,
839
) -> FilesystemResult<MetadataHashValue> {
840
let fd = store.get_descriptor(&fd)?;
841
let meta = fd.metadata_hash().await?;
842
Ok(meta.into())
843
}
844
845
async fn metadata_hash_at<U>(
846
store: &Accessor<U, Self>,
847
fd: Resource<Descriptor>,
848
path_flags: PathFlags,
849
path: String,
850
) -> FilesystemResult<MetadataHashValue> {
851
let dir = store.get_dir(&fd)?;
852
let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
853
Ok(meta.into())
854
}
855
}
856
857
impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
858
fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
859
self.table
860
.delete(fd)
861
.context("failed to delete descriptor resource from table")?;
862
Ok(())
863
}
864
}
865
866
impl preopens::Host for WasiFilesystemCtxView<'_> {
867
fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
868
self.get_directories()
869
}
870
}
871
872