Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/p3/filesystem/host.rs
1692 views
1
use crate::filesystem::{Descriptor, Dir, File, WasiFilesystem, WasiFilesystemCtxView};
2
use crate::p3::bindings::clocks::wall_clock;
3
use crate::p3::bindings::filesystem::types::{
4
self, Advice, DescriptorFlags, DescriptorStat, DescriptorType, DirectoryEntry, ErrorCode,
5
Filesize, MetadataHashValue, NewTimestamp, OpenFlags, PathFlags,
6
};
7
use crate::p3::filesystem::{FilesystemError, FilesystemResult, preopens};
8
use crate::p3::{
9
DEFAULT_BUFFER_CAPACITY, FallibleIteratorProducer, FutureOneshotProducer, FutureReadyProducer,
10
StreamEmptyProducer,
11
};
12
use crate::{DirPerms, FilePerms};
13
use anyhow::Context as _;
14
use bytes::BytesMut;
15
use core::mem;
16
use core::pin::Pin;
17
use core::task::{Context, Poll, ready};
18
use std::io::{self, Cursor};
19
use std::sync::Arc;
20
use system_interface::fs::FileIoExt as _;
21
use tokio::sync::{mpsc, oneshot};
22
use tokio::task::{JoinHandle, spawn_blocking};
23
use wasmtime::StoreContextMut;
24
use wasmtime::component::{
25
Accessor, Destination, FutureReader, Resource, ResourceTable, Source, StreamConsumer,
26
StreamProducer, StreamReader, StreamResult,
27
};
28
29
fn get_descriptor<'a>(
30
table: &'a ResourceTable,
31
fd: &'a Resource<Descriptor>,
32
) -> FilesystemResult<&'a Descriptor> {
33
table
34
.get(fd)
35
.context("failed to get descriptor resource from table")
36
.map_err(FilesystemError::trap)
37
}
38
39
fn get_file<'a>(
40
table: &'a ResourceTable,
41
fd: &'a Resource<Descriptor>,
42
) -> FilesystemResult<&'a File> {
43
let file = get_descriptor(table, fd).map(Descriptor::file)??;
44
Ok(file)
45
}
46
47
fn get_dir<'a>(
48
table: &'a ResourceTable,
49
fd: &'a Resource<Descriptor>,
50
) -> FilesystemResult<&'a Dir> {
51
let dir = get_descriptor(table, fd).map(Descriptor::dir)??;
52
Ok(dir)
53
}
54
55
trait AccessorExt {
56
fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor>;
57
fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File>;
58
fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir>;
59
fn get_dir_pair(
60
&self,
61
a: &Resource<Descriptor>,
62
b: &Resource<Descriptor>,
63
) -> FilesystemResult<(Dir, Dir)>;
64
}
65
66
impl<T> AccessorExt for Accessor<T, WasiFilesystem> {
67
fn get_descriptor(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Descriptor> {
68
self.with(|mut store| {
69
let fd = get_descriptor(store.get().table, fd)?;
70
Ok(fd.clone())
71
})
72
}
73
74
fn get_file(&self, fd: &Resource<Descriptor>) -> FilesystemResult<File> {
75
self.with(|mut store| {
76
let file = get_file(store.get().table, fd)?;
77
Ok(file.clone())
78
})
79
}
80
81
fn get_dir(&self, fd: &Resource<Descriptor>) -> FilesystemResult<Dir> {
82
self.with(|mut store| {
83
let dir = get_dir(store.get().table, fd)?;
84
Ok(dir.clone())
85
})
86
}
87
88
fn get_dir_pair(
89
&self,
90
a: &Resource<Descriptor>,
91
b: &Resource<Descriptor>,
92
) -> FilesystemResult<(Dir, Dir)> {
93
self.with(|mut store| {
94
let table = store.get().table;
95
let a = get_dir(table, a)?;
96
let b = get_dir(table, b)?;
97
Ok((a.clone(), b.clone()))
98
})
99
}
100
}
101
102
fn systemtime_from(t: wall_clock::Datetime) -> Result<std::time::SystemTime, ErrorCode> {
103
std::time::SystemTime::UNIX_EPOCH
104
.checked_add(core::time::Duration::new(t.seconds, t.nanoseconds))
105
.ok_or(ErrorCode::Overflow)
106
}
107
108
fn systemtimespec_from(t: NewTimestamp) -> Result<Option<fs_set_times::SystemTimeSpec>, ErrorCode> {
109
use fs_set_times::SystemTimeSpec;
110
match t {
111
NewTimestamp::NoChange => Ok(None),
112
NewTimestamp::Now => Ok(Some(SystemTimeSpec::SymbolicNow)),
113
NewTimestamp::Timestamp(st) => {
114
let st = systemtime_from(st)?;
115
Ok(Some(SystemTimeSpec::Absolute(st)))
116
}
117
}
118
}
119
120
struct ReadStreamProducer {
121
file: File,
122
offset: u64,
123
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
124
task: Option<JoinHandle<std::io::Result<BytesMut>>>,
125
}
126
127
impl Drop for ReadStreamProducer {
128
fn drop(&mut self) {
129
self.close(Ok(()))
130
}
131
}
132
133
impl ReadStreamProducer {
134
fn close(&mut self, res: Result<(), ErrorCode>) {
135
if let Some(tx) = self.result.take() {
136
_ = tx.send(res);
137
}
138
}
139
140
/// Update the internal `offset` field after reading `amt` bytes from the file.
141
fn complete_read(&mut self, amt: usize) -> StreamResult {
142
let Ok(amt) = amt.try_into() else {
143
self.close(Err(ErrorCode::Overflow));
144
return StreamResult::Dropped;
145
};
146
let Some(amt) = self.offset.checked_add(amt) else {
147
self.close(Err(ErrorCode::Overflow));
148
return StreamResult::Dropped;
149
};
150
self.offset = amt;
151
StreamResult::Completed
152
}
153
}
154
155
impl<D> StreamProducer<D> for ReadStreamProducer {
156
type Item = u8;
157
type Buffer = Cursor<BytesMut>;
158
159
fn poll_produce<'a>(
160
mut self: Pin<&mut Self>,
161
cx: &mut Context<'_>,
162
store: StoreContextMut<'a, D>,
163
mut dst: Destination<'a, Self::Item, Self::Buffer>,
164
// Intentionally ignore this as in blocking mode everything is always
165
// ready and otherwise spawned blocking work can't be cancelled.
166
_finish: bool,
167
) -> Poll<wasmtime::Result<StreamResult>> {
168
if let Some(file) = self.file.as_blocking_file() {
169
// Once a blocking file, always a blocking file, so assert as such.
170
assert!(self.task.is_none());
171
let mut dst = dst.as_direct(store, DEFAULT_BUFFER_CAPACITY);
172
let buf = dst.remaining();
173
if buf.is_empty() {
174
return Poll::Ready(Ok(StreamResult::Completed));
175
}
176
return match file.read_at(buf, self.offset) {
177
Ok(0) => {
178
self.close(Ok(()));
179
Poll::Ready(Ok(StreamResult::Dropped))
180
}
181
Ok(n) => {
182
dst.mark_written(n);
183
Poll::Ready(Ok(self.complete_read(n)))
184
}
185
Err(err) => {
186
self.close(Err(err.into()));
187
Poll::Ready(Ok(StreamResult::Dropped))
188
}
189
};
190
}
191
192
// Lazily spawn a read task if one hasn't already been spawned yet.
193
let me = &mut *self;
194
let task = me.task.get_or_insert_with(|| {
195
let mut buf = dst.take_buffer().into_inner();
196
buf.resize(DEFAULT_BUFFER_CAPACITY, 0);
197
let file = Arc::clone(me.file.as_file());
198
let offset = me.offset;
199
spawn_blocking(move || {
200
file.read_at(&mut buf, offset).map(|n| {
201
buf.truncate(n);
202
buf
203
})
204
})
205
});
206
207
// Await the completion of the read task. Note that this is not a
208
// cancellable await point because we can't cancel the other task, so
209
// the `finish` parameter is ignored.
210
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
211
self.task = None;
212
match res {
213
Ok(buf) if buf.is_empty() => {
214
self.close(Ok(()));
215
Poll::Ready(Ok(StreamResult::Dropped))
216
}
217
Ok(buf) => {
218
let n = buf.len();
219
dst.set_buffer(Cursor::new(buf));
220
Poll::Ready(Ok(self.complete_read(n)))
221
}
222
Err(err) => {
223
self.close(Err(err.into()));
224
Poll::Ready(Ok(StreamResult::Dropped))
225
}
226
}
227
}
228
}
229
230
fn map_dir_entry(
231
entry: std::io::Result<cap_std::fs::DirEntry>,
232
) -> Result<Option<DirectoryEntry>, ErrorCode> {
233
match entry {
234
Ok(entry) => {
235
let meta = entry.metadata()?;
236
let Ok(name) = entry.file_name().into_string() else {
237
return Err(ErrorCode::IllegalByteSequence);
238
};
239
Ok(Some(DirectoryEntry {
240
type_: meta.file_type().into(),
241
name,
242
}))
243
}
244
Err(err) => {
245
// On windows, filter out files like `C:\DumpStack.log.tmp` which we
246
// can't get full metadata for.
247
#[cfg(windows)]
248
{
249
use windows_sys::Win32::Foundation::{
250
ERROR_ACCESS_DENIED, ERROR_SHARING_VIOLATION,
251
};
252
if err.raw_os_error() == Some(ERROR_SHARING_VIOLATION as i32)
253
|| err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
254
{
255
return Ok(None);
256
}
257
}
258
Err(err.into())
259
}
260
}
261
}
262
263
struct ReadDirStream {
264
rx: mpsc::Receiver<DirectoryEntry>,
265
task: JoinHandle<Result<(), ErrorCode>>,
266
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
267
}
268
269
impl ReadDirStream {
270
fn new(
271
dir: Arc<cap_std::fs::Dir>,
272
result: oneshot::Sender<Result<(), ErrorCode>>,
273
) -> ReadDirStream {
274
let (tx, rx) = mpsc::channel(1);
275
ReadDirStream {
276
task: spawn_blocking(move || {
277
let entries = dir.entries()?;
278
for entry in entries {
279
if let Some(entry) = map_dir_entry(entry)? {
280
if let Err(_) = tx.blocking_send(entry) {
281
break;
282
}
283
}
284
}
285
Ok(())
286
}),
287
rx,
288
result: Some(result),
289
}
290
}
291
292
fn close(&mut self, res: Result<(), ErrorCode>) {
293
self.rx.close();
294
self.task.abort();
295
let _ = self.result.take().unwrap().send(res);
296
}
297
}
298
299
impl<D> StreamProducer<D> for ReadDirStream {
300
type Item = DirectoryEntry;
301
type Buffer = Option<DirectoryEntry>;
302
303
fn poll_produce<'a>(
304
mut self: Pin<&mut Self>,
305
cx: &mut Context<'_>,
306
mut store: StoreContextMut<'a, D>,
307
mut dst: Destination<'a, Self::Item, Self::Buffer>,
308
finish: bool,
309
) -> Poll<wasmtime::Result<StreamResult>> {
310
// If this is a 0-length read then `mpsc::Receiver` does not expose an
311
// API to wait for an item to be available without taking it out of the
312
// channel. In lieu of that just say that we're complete and ready for a
313
// read.
314
if dst.remaining(&mut store) == Some(0) {
315
return Poll::Ready(Ok(StreamResult::Completed));
316
}
317
318
match self.rx.poll_recv(cx) {
319
// If an item is on the channel then send that along and say that
320
// the read is now complete with one item being yielded.
321
Poll::Ready(Some(item)) => {
322
dst.set_buffer(Some(item));
323
Poll::Ready(Ok(StreamResult::Completed))
324
}
325
326
// If there's nothing left on the channel then that means that an
327
// error occurred or the iterator is done. In both cases an
328
// un-cancellable wait for the spawned task is entered and we await
329
// its completion. Upon completion there our own stream is closed
330
// with the result (sending an error code on our oneshot) and then
331
// the stream is reported as dropped.
332
Poll::Ready(None) => {
333
let result = ready!(Pin::new(&mut self.task).poll(cx))
334
.expect("spawned task should not panic");
335
self.close(result);
336
Poll::Ready(Ok(StreamResult::Dropped))
337
}
338
339
// If an item isn't ready yet then cancel this outstanding request
340
// if `finish` is set, otherwise propagate the `Pending` status.
341
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
342
Poll::Pending => Poll::Pending,
343
}
344
}
345
}
346
347
impl Drop for ReadDirStream {
348
fn drop(&mut self) {
349
if self.result.is_some() {
350
self.close(Ok(()));
351
}
352
}
353
}
354
355
struct WriteStreamConsumer {
356
file: File,
357
location: WriteLocation,
358
result: Option<oneshot::Sender<Result<(), ErrorCode>>>,
359
buffer: BytesMut,
360
task: Option<JoinHandle<std::io::Result<(BytesMut, usize)>>>,
361
}
362
363
/// Target position of a write performed by `WriteStreamConsumer`.
#[derive(Copy, Clone)]
enum WriteLocation {
    /// Append the bytes to the file.
    End,
    /// Write the bytes at this absolute offset.
    Offset(u64),
}
368
369
impl WriteStreamConsumer {
370
fn new_at(file: File, offset: u64, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
371
Self {
372
file,
373
location: WriteLocation::Offset(offset),
374
result: Some(result),
375
buffer: BytesMut::default(),
376
task: None,
377
}
378
}
379
380
fn new_append(file: File, result: oneshot::Sender<Result<(), ErrorCode>>) -> Self {
381
Self {
382
file,
383
location: WriteLocation::End,
384
result: Some(result),
385
buffer: BytesMut::default(),
386
task: None,
387
}
388
}
389
390
fn close(&mut self, res: Result<(), ErrorCode>) {
391
_ = self.result.take().unwrap().send(res);
392
}
393
394
/// Update the internal `offset` field after writing `amt` bytes from the file.
395
fn complete_write(&mut self, amt: usize) -> StreamResult {
396
match &mut self.location {
397
WriteLocation::End => StreamResult::Completed,
398
WriteLocation::Offset(offset) => {
399
let Ok(amt) = amt.try_into() else {
400
self.close(Err(ErrorCode::Overflow));
401
return StreamResult::Dropped;
402
};
403
let Some(amt) = offset.checked_add(amt) else {
404
self.close(Err(ErrorCode::Overflow));
405
return StreamResult::Dropped;
406
};
407
*offset = amt;
408
StreamResult::Completed
409
}
410
}
411
}
412
}
413
414
impl WriteLocation {
415
fn write(&self, file: &cap_std::fs::File, bytes: &[u8]) -> io::Result<usize> {
416
match *self {
417
WriteLocation::End => file.append(bytes),
418
WriteLocation::Offset(at) => file.write_at(bytes, at),
419
}
420
}
421
}
422
423
impl<D> StreamConsumer<D> for WriteStreamConsumer {
424
type Item = u8;
425
426
fn poll_consume(
427
mut self: Pin<&mut Self>,
428
cx: &mut Context<'_>,
429
store: StoreContextMut<D>,
430
src: Source<Self::Item>,
431
// Intentionally ignore this as in blocking mode everything is always
432
// ready and otherwise spawned blocking work can't be cancelled.
433
_finish: bool,
434
) -> Poll<wasmtime::Result<StreamResult>> {
435
let mut src = src.as_direct(store);
436
if let Some(file) = self.file.as_blocking_file() {
437
// Once a blocking file, always a blocking file, so assert as such.
438
assert!(self.task.is_none());
439
return match self.location.write(file, src.remaining()) {
440
Ok(n) => {
441
src.mark_read(n);
442
Poll::Ready(Ok(self.complete_write(n)))
443
}
444
Err(err) => {
445
self.close(Err(err.into()));
446
Poll::Ready(Ok(StreamResult::Dropped))
447
}
448
};
449
}
450
let me = &mut *self;
451
let task = me.task.get_or_insert_with(|| {
452
debug_assert!(me.buffer.is_empty());
453
me.buffer.extend_from_slice(src.remaining());
454
let buf = mem::take(&mut me.buffer);
455
let file = Arc::clone(me.file.as_file());
456
let location = me.location;
457
spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
458
});
459
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
460
self.task = None;
461
match res {
462
Ok((buf, n)) => {
463
src.mark_read(n);
464
self.buffer = buf;
465
self.buffer.clear();
466
Poll::Ready(Ok(self.complete_write(n)))
467
}
468
Err(err) => {
469
self.close(Err(err.into()));
470
Poll::Ready(Ok(StreamResult::Dropped))
471
}
472
}
473
}
474
}
475
476
impl Drop for WriteStreamConsumer {
477
fn drop(&mut self) {
478
if self.result.is_some() {
479
self.close(Ok(()))
480
}
481
}
482
}
483
484
impl types::Host for WasiFilesystemCtxView<'_> {
485
fn convert_error_code(&mut self, error: FilesystemError) -> wasmtime::Result<ErrorCode> {
486
error.downcast()
487
}
488
}
489
490
impl types::HostDescriptorWithStore for WasiFilesystem {
491
async fn read_via_stream<U>(
492
store: &Accessor<U, Self>,
493
fd: Resource<Descriptor>,
494
offset: Filesize,
495
) -> wasmtime::Result<(StreamReader<u8>, FutureReader<Result<(), ErrorCode>>)> {
496
let instance = store.instance();
497
store.with(|mut store| {
498
let file = get_file(store.get().table, &fd)?;
499
if !file.perms.contains(FilePerms::READ) {
500
return Ok((
501
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
502
FutureReader::new(
503
instance,
504
&mut store,
505
FutureReadyProducer(Err(ErrorCode::NotPermitted)),
506
),
507
));
508
}
509
510
let file = file.clone();
511
let (result_tx, result_rx) = oneshot::channel();
512
Ok((
513
StreamReader::new(
514
instance,
515
&mut store,
516
ReadStreamProducer {
517
file,
518
offset,
519
result: Some(result_tx),
520
task: None,
521
},
522
),
523
FutureReader::new(instance, &mut store, FutureOneshotProducer(result_rx)),
524
))
525
})
526
}
527
528
async fn write_via_stream<U>(
529
store: &Accessor<U, Self>,
530
fd: Resource<Descriptor>,
531
data: StreamReader<u8>,
532
offset: Filesize,
533
) -> FilesystemResult<()> {
534
let (result_tx, result_rx) = oneshot::channel();
535
store.with(|mut store| {
536
let file = get_file(store.get().table, &fd)?;
537
if !file.perms.contains(FilePerms::WRITE) {
538
return Err(ErrorCode::NotPermitted.into());
539
}
540
let file = file.clone();
541
data.pipe(store, WriteStreamConsumer::new_at(file, offset, result_tx));
542
FilesystemResult::Ok(())
543
})?;
544
result_rx
545
.await
546
.context("oneshot sender dropped")
547
.map_err(FilesystemError::trap)??;
548
Ok(())
549
}
550
551
async fn append_via_stream<U>(
552
store: &Accessor<U, Self>,
553
fd: Resource<Descriptor>,
554
data: StreamReader<u8>,
555
) -> FilesystemResult<()> {
556
let (result_tx, result_rx) = oneshot::channel();
557
store.with(|mut store| {
558
let file = get_file(store.get().table, &fd)?;
559
if !file.perms.contains(FilePerms::WRITE) {
560
return Err(ErrorCode::NotPermitted.into());
561
}
562
let file = file.clone();
563
data.pipe(store, WriteStreamConsumer::new_append(file, result_tx));
564
FilesystemResult::Ok(())
565
})?;
566
result_rx
567
.await
568
.context("oneshot sender dropped")
569
.map_err(FilesystemError::trap)??;
570
Ok(())
571
}
572
573
async fn advise<U>(
574
store: &Accessor<U, Self>,
575
fd: Resource<Descriptor>,
576
offset: Filesize,
577
length: Filesize,
578
advice: Advice,
579
) -> FilesystemResult<()> {
580
let file = store.get_file(&fd)?;
581
file.advise(offset, length, advice.into()).await?;
582
Ok(())
583
}
584
585
async fn sync_data<U>(
586
store: &Accessor<U, Self>,
587
fd: Resource<Descriptor>,
588
) -> FilesystemResult<()> {
589
let fd = store.get_descriptor(&fd)?;
590
fd.sync_data().await?;
591
Ok(())
592
}
593
594
async fn get_flags<U>(
595
store: &Accessor<U, Self>,
596
fd: Resource<Descriptor>,
597
) -> FilesystemResult<DescriptorFlags> {
598
let fd = store.get_descriptor(&fd)?;
599
let flags = fd.get_flags().await?;
600
Ok(flags.into())
601
}
602
603
async fn get_type<U>(
604
store: &Accessor<U, Self>,
605
fd: Resource<Descriptor>,
606
) -> FilesystemResult<DescriptorType> {
607
let fd = store.get_descriptor(&fd)?;
608
let ty = fd.get_type().await?;
609
Ok(ty.into())
610
}
611
612
async fn set_size<U>(
613
store: &Accessor<U, Self>,
614
fd: Resource<Descriptor>,
615
size: Filesize,
616
) -> FilesystemResult<()> {
617
let file = store.get_file(&fd)?;
618
file.set_size(size).await?;
619
Ok(())
620
}
621
622
async fn set_times<U>(
623
store: &Accessor<U, Self>,
624
fd: Resource<Descriptor>,
625
data_access_timestamp: NewTimestamp,
626
data_modification_timestamp: NewTimestamp,
627
) -> FilesystemResult<()> {
628
let fd = store.get_descriptor(&fd)?;
629
let atim = systemtimespec_from(data_access_timestamp)?;
630
let mtim = systemtimespec_from(data_modification_timestamp)?;
631
fd.set_times(atim, mtim).await?;
632
Ok(())
633
}
634
635
async fn read_directory<U>(
636
store: &Accessor<U, Self>,
637
fd: Resource<Descriptor>,
638
) -> wasmtime::Result<(
639
StreamReader<DirectoryEntry>,
640
FutureReader<Result<(), ErrorCode>>,
641
)> {
642
let instance = store.instance();
643
store.with(|mut store| {
644
let dir = get_dir(store.get().table, &fd)?;
645
if !dir.perms.contains(DirPerms::READ) {
646
return Ok((
647
StreamReader::new(instance, &mut store, StreamEmptyProducer::default()),
648
FutureReader::new(
649
instance,
650
&mut store,
651
FutureReadyProducer(Err(ErrorCode::NotPermitted)),
652
),
653
));
654
}
655
let allow_blocking_current_thread = dir.allow_blocking_current_thread;
656
let dir = Arc::clone(dir.as_dir());
657
let (result_tx, result_rx) = oneshot::channel();
658
let stream = if allow_blocking_current_thread {
659
match dir.entries() {
660
Ok(readdir) => StreamReader::new(
661
instance,
662
&mut store,
663
FallibleIteratorProducer::new(
664
readdir.filter_map(|e| map_dir_entry(e).transpose()),
665
result_tx,
666
),
667
),
668
Err(e) => {
669
result_tx.send(Err(e.into())).unwrap();
670
StreamReader::new(instance, &mut store, StreamEmptyProducer::default())
671
}
672
}
673
} else {
674
StreamReader::new(instance, &mut store, ReadDirStream::new(dir, result_tx))
675
};
676
Ok((
677
stream,
678
FutureReader::new(instance, &mut store, FutureOneshotProducer(result_rx)),
679
))
680
})
681
}
682
683
async fn sync<U>(store: &Accessor<U, Self>, fd: Resource<Descriptor>) -> FilesystemResult<()> {
684
let fd = store.get_descriptor(&fd)?;
685
fd.sync().await?;
686
Ok(())
687
}
688
689
async fn create_directory_at<U>(
690
store: &Accessor<U, Self>,
691
fd: Resource<Descriptor>,
692
path: String,
693
) -> FilesystemResult<()> {
694
let dir = store.get_dir(&fd)?;
695
dir.create_directory_at(path).await?;
696
Ok(())
697
}
698
699
async fn stat<U>(
700
store: &Accessor<U, Self>,
701
fd: Resource<Descriptor>,
702
) -> FilesystemResult<DescriptorStat> {
703
let fd = store.get_descriptor(&fd)?;
704
let stat = fd.stat().await?;
705
Ok(stat.into())
706
}
707
708
async fn stat_at<U>(
709
store: &Accessor<U, Self>,
710
fd: Resource<Descriptor>,
711
path_flags: PathFlags,
712
path: String,
713
) -> FilesystemResult<DescriptorStat> {
714
let dir = store.get_dir(&fd)?;
715
let stat = dir.stat_at(path_flags.into(), path).await?;
716
Ok(stat.into())
717
}
718
719
async fn set_times_at<U>(
720
store: &Accessor<U, Self>,
721
fd: Resource<Descriptor>,
722
path_flags: PathFlags,
723
path: String,
724
data_access_timestamp: NewTimestamp,
725
data_modification_timestamp: NewTimestamp,
726
) -> FilesystemResult<()> {
727
let dir = store.get_dir(&fd)?;
728
let atim = systemtimespec_from(data_access_timestamp)?;
729
let mtim = systemtimespec_from(data_modification_timestamp)?;
730
dir.set_times_at(path_flags.into(), path, atim, mtim)
731
.await?;
732
Ok(())
733
}
734
735
async fn link_at<U>(
736
store: &Accessor<U, Self>,
737
fd: Resource<Descriptor>,
738
old_path_flags: PathFlags,
739
old_path: String,
740
new_fd: Resource<Descriptor>,
741
new_path: String,
742
) -> FilesystemResult<()> {
743
let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
744
old_dir
745
.link_at(old_path_flags.into(), old_path, &new_dir, new_path)
746
.await?;
747
Ok(())
748
}
749
750
async fn open_at<U>(
751
store: &Accessor<U, Self>,
752
fd: Resource<Descriptor>,
753
path_flags: PathFlags,
754
path: String,
755
open_flags: OpenFlags,
756
flags: DescriptorFlags,
757
) -> FilesystemResult<Resource<Descriptor>> {
758
let (allow_blocking_current_thread, dir) = store.with(|mut store| {
759
let store = store.get();
760
let dir = get_dir(&store.table, &fd)?;
761
FilesystemResult::Ok((store.ctx.allow_blocking_current_thread, dir.clone()))
762
})?;
763
let fd = dir
764
.open_at(
765
path_flags.into(),
766
path,
767
open_flags.into(),
768
flags.into(),
769
allow_blocking_current_thread,
770
)
771
.await?;
772
let fd = store.with(|mut store| store.get().table.push(fd))?;
773
Ok(fd)
774
}
775
776
async fn readlink_at<U>(
777
store: &Accessor<U, Self>,
778
fd: Resource<Descriptor>,
779
path: String,
780
) -> FilesystemResult<String> {
781
let dir = store.get_dir(&fd)?;
782
let path = dir.readlink_at(path).await?;
783
Ok(path)
784
}
785
786
async fn remove_directory_at<U>(
787
store: &Accessor<U, Self>,
788
fd: Resource<Descriptor>,
789
path: String,
790
) -> FilesystemResult<()> {
791
let dir = store.get_dir(&fd)?;
792
dir.remove_directory_at(path).await?;
793
Ok(())
794
}
795
796
async fn rename_at<U>(
797
store: &Accessor<U, Self>,
798
fd: Resource<Descriptor>,
799
old_path: String,
800
new_fd: Resource<Descriptor>,
801
new_path: String,
802
) -> FilesystemResult<()> {
803
let (old_dir, new_dir) = store.get_dir_pair(&fd, &new_fd)?;
804
old_dir.rename_at(old_path, &new_dir, new_path).await?;
805
Ok(())
806
}
807
808
async fn symlink_at<U>(
809
store: &Accessor<U, Self>,
810
fd: Resource<Descriptor>,
811
old_path: String,
812
new_path: String,
813
) -> FilesystemResult<()> {
814
let dir = store.get_dir(&fd)?;
815
dir.symlink_at(old_path, new_path).await?;
816
Ok(())
817
}
818
819
async fn unlink_file_at<U>(
820
store: &Accessor<U, Self>,
821
fd: Resource<Descriptor>,
822
path: String,
823
) -> FilesystemResult<()> {
824
let dir = store.get_dir(&fd)?;
825
dir.unlink_file_at(path).await?;
826
Ok(())
827
}
828
829
async fn is_same_object<U>(
830
store: &Accessor<U, Self>,
831
fd: Resource<Descriptor>,
832
other: Resource<Descriptor>,
833
) -> wasmtime::Result<bool> {
834
let (fd, other) = store.with(|mut store| {
835
let table = store.get().table;
836
let fd = get_descriptor(table, &fd)?.clone();
837
let other = get_descriptor(table, &other)?.clone();
838
anyhow::Ok((fd, other))
839
})?;
840
fd.is_same_object(&other).await
841
}
842
843
async fn metadata_hash<U>(
844
store: &Accessor<U, Self>,
845
fd: Resource<Descriptor>,
846
) -> FilesystemResult<MetadataHashValue> {
847
let fd = store.get_descriptor(&fd)?;
848
let meta = fd.metadata_hash().await?;
849
Ok(meta.into())
850
}
851
852
async fn metadata_hash_at<U>(
853
store: &Accessor<U, Self>,
854
fd: Resource<Descriptor>,
855
path_flags: PathFlags,
856
path: String,
857
) -> FilesystemResult<MetadataHashValue> {
858
let dir = store.get_dir(&fd)?;
859
let meta = dir.metadata_hash_at(path_flags.into(), path).await?;
860
Ok(meta.into())
861
}
862
}
863
864
impl types::HostDescriptor for WasiFilesystemCtxView<'_> {
865
fn drop(&mut self, fd: Resource<Descriptor>) -> wasmtime::Result<()> {
866
self.table
867
.delete(fd)
868
.context("failed to delete descriptor resource from table")?;
869
Ok(())
870
}
871
}
872
873
impl preopens::Host for WasiFilesystemCtxView<'_> {
874
fn get_directories(&mut self) -> wasmtime::Result<Vec<(Resource<Descriptor>, String)>> {
875
self.get_directories()
876
}
877
}
878
879