Path: blob/main/crates/wasi/src/p2/filesystem.rs
1692 views
use crate::TrappableError;1use crate::filesystem::File;2use crate::p2::bindings::filesystem::types;3use crate::p2::{InputStream, OutputStream, Pollable, StreamError, StreamResult};4use crate::runtime::AbortOnDropJoinHandle;5use anyhow::anyhow;6use bytes::{Bytes, BytesMut};7use std::io;8use std::mem;910pub type FsResult<T> = Result<T, FsError>;1112pub type FsError = TrappableError<types::ErrorCode>;1314impl From<crate::filesystem::ErrorCode> for types::ErrorCode {15fn from(error: crate::filesystem::ErrorCode) -> Self {16match error {17crate::filesystem::ErrorCode::Access => Self::Access,18crate::filesystem::ErrorCode::Already => Self::Already,19crate::filesystem::ErrorCode::BadDescriptor => Self::BadDescriptor,20crate::filesystem::ErrorCode::Busy => Self::Busy,21crate::filesystem::ErrorCode::Exist => Self::Exist,22crate::filesystem::ErrorCode::FileTooLarge => Self::FileTooLarge,23crate::filesystem::ErrorCode::IllegalByteSequence => Self::IllegalByteSequence,24crate::filesystem::ErrorCode::InProgress => Self::InProgress,25crate::filesystem::ErrorCode::Interrupted => Self::Interrupted,26crate::filesystem::ErrorCode::Invalid => Self::Invalid,27crate::filesystem::ErrorCode::Io => Self::Io,28crate::filesystem::ErrorCode::IsDirectory => Self::IsDirectory,29crate::filesystem::ErrorCode::Loop => Self::Loop,30crate::filesystem::ErrorCode::TooManyLinks => Self::TooManyLinks,31crate::filesystem::ErrorCode::NameTooLong => Self::NameTooLong,32crate::filesystem::ErrorCode::NoEntry => Self::NoEntry,33crate::filesystem::ErrorCode::InsufficientMemory => Self::InsufficientMemory,34crate::filesystem::ErrorCode::InsufficientSpace => Self::InsufficientSpace,35crate::filesystem::ErrorCode::NotDirectory => Self::NotDirectory,36crate::filesystem::ErrorCode::NotEmpty => Self::NotEmpty,37crate::filesystem::ErrorCode::Unsupported => Self::Unsupported,38crate::filesystem::ErrorCode::Overflow => Self::Overflow,39crate::filesystem::ErrorCode::NotPermitted => Self::NotPermitted,40crate::filesystem::ErrorCode::Pipe => Self::Pipe,41crate::filesystem::ErrorCode::InvalidSeek => Self::InvalidSeek,42}43}44}4546impl From<crate::filesystem::ErrorCode> for FsError {47fn from(error: crate::filesystem::ErrorCode) -> Self {48types::ErrorCode::from(error).into()49}50}5152impl From<wasmtime::component::ResourceTableError> for FsError {53fn from(error: wasmtime::component::ResourceTableError) -> Self {54Self::trap(error)55}56}5758impl From<io::Error> for FsError {59fn from(error: io::Error) -> Self {60types::ErrorCode::from(error).into()61}62}6364pub struct FileInputStream {65file: File,66position: u64,67state: ReadState,68}69enum ReadState {70Idle,71Waiting(AbortOnDropJoinHandle<ReadState>),72DataAvailable(Bytes),73Error(io::Error),74Closed,75}76impl FileInputStream {77pub fn new(file: &File, position: u64) -> Self {78Self {79file: file.clone(),80position,81state: ReadState::Idle,82}83}8485fn blocking_read(file: &cap_std::fs::File, offset: u64, size: usize) -> ReadState {86use system_interface::fs::FileIoExt;8788let mut buf = BytesMut::zeroed(size);89loop {90match file.read_at(&mut buf, offset) {91Ok(0) => return ReadState::Closed,92Ok(n) => {93buf.truncate(n);94return ReadState::DataAvailable(buf.freeze());95}96Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {97// Try again, continue looping98}99Err(e) => return ReadState::Error(e),100}101}102}103104/// Wait for existing background task to finish, without starting any new background reads.105async fn wait_ready(&mut self) {106match &mut self.state {107ReadState::Waiting(task) => {108self.state = task.await;109}110_ => {}111}112}113}114#[async_trait::async_trait]115impl InputStream for FileInputStream {116fn read(&mut self, size: usize) -> StreamResult<Bytes> {117match &mut self.state {118ReadState::Idle => {119if size == 0 {120return Ok(Bytes::new());121}122123let p = self.position;124self.state = ReadState::Waiting(125self.file126.spawn_blocking(move |f| Self::blocking_read(f, p, size)),127);128Ok(Bytes::new())129}130ReadState::DataAvailable(b) => {131let min_len = b.len().min(size);132let chunk = b.split_to(min_len);133if b.len() == 0 {134self.state = ReadState::Idle;135}136self.position += min_len as u64;137Ok(chunk)138}139ReadState::Waiting(_) => Ok(Bytes::new()),140ReadState::Error(_) => match mem::replace(&mut self.state, ReadState::Closed) {141ReadState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),142_ => unreachable!(),143},144ReadState::Closed => Err(StreamError::Closed),145}146}147/// Specialized blocking_* variant to bypass tokio's task spawning & joining148/// overhead on synchronous file I/O.149async fn blocking_read(&mut self, size: usize) -> StreamResult<Bytes> {150self.wait_ready().await;151152// Before we defer to the regular `read`, make sure it has data ready to go:153if let ReadState::Idle = self.state {154let p = self.position;155self.state = self156.file157.run_blocking(move |f| Self::blocking_read(f, p, size))158.await;159}160161self.read(size)162}163async fn cancel(&mut self) {164match mem::replace(&mut self.state, ReadState::Closed) {165ReadState::Waiting(task) => {166// The task was created using `spawn_blocking`, so unless we're167// lucky enough that the task hasn't started yet, the abort168// signal won't have any effect and we're forced to wait for it169// to run to completion.170// From the guest's point of view, `input-stream::drop` then171// appears to block. Certainly less than ideal, but arguably still172// better than letting the guest rack up an unbounded number of173// background tasks. Also, the guest is only blocked if174// the stream was dropped mid-read, which we don't expect to175// occur frequently.176task.cancel().await;177}178_ => {}179}180}181}182#[async_trait::async_trait]183impl Pollable for FileInputStream {184async fn ready(&mut self) {185if let ReadState::Idle = self.state {186// The guest hasn't initiated any read, but is nonetheless waiting187// for data to be available. We'll start a read for them:188189const DEFAULT_READ_SIZE: usize = 4096;190let p = self.position;191self.state = ReadState::Waiting(192self.file193.spawn_blocking(move |f| Self::blocking_read(f, p, DEFAULT_READ_SIZE)),194);195}196197self.wait_ready().await198}199}200201#[derive(Clone, Copy)]202pub(crate) enum FileOutputMode {203Position(u64),204Append,205}206207pub(crate) struct FileOutputStream {208file: File,209mode: FileOutputMode,210state: OutputState,211}212213enum OutputState {214Ready,215/// Allows join future to be awaited in a cancellable manner. Gone variant indicates216/// no task is currently outstanding.217Waiting(AbortOnDropJoinHandle<io::Result<usize>>),218/// The last I/O operation failed with this error.219Error(io::Error),220Closed,221}222223impl FileOutputStream {224pub fn write_at(file: &File, position: u64) -> Self {225Self {226file: file.clone(),227mode: FileOutputMode::Position(position),228state: OutputState::Ready,229}230}231232pub fn append(file: &File) -> Self {233Self {234file: file.clone(),235mode: FileOutputMode::Append,236state: OutputState::Ready,237}238}239240fn blocking_write(241file: &cap_std::fs::File,242mut buf: Bytes,243mode: FileOutputMode,244) -> io::Result<usize> {245use system_interface::fs::FileIoExt;246247match mode {248FileOutputMode::Position(mut p) => {249let mut total = 0;250loop {251let nwritten = file.write_at(buf.as_ref(), p)?;252// afterwards buf contains [nwritten, len):253let _ = buf.split_to(nwritten);254p += nwritten as u64;255total += nwritten;256if buf.is_empty() {257break;258}259}260Ok(total)261}262FileOutputMode::Append => {263let mut total = 0;264loop {265let nwritten = file.append(buf.as_ref())?;266let _ = buf.split_to(nwritten);267total += nwritten;268if buf.is_empty() {269break;270}271}272Ok(total)273}274}275}276}277278// FIXME: configurable? determine from how much space left in file?279const FILE_WRITE_CAPACITY: usize = 1024 * 1024;280281#[async_trait::async_trait]282impl OutputStream for FileOutputStream {283fn write(&mut self, buf: Bytes) -> Result<(), StreamError> {284match self.state {285OutputState::Ready => {}286OutputState::Closed => return Err(StreamError::Closed),287OutputState::Waiting(_) | OutputState::Error(_) => {288// a write is pending - this call was not permitted289return Err(StreamError::Trap(anyhow!(290"write not permitted: check_write not called first"291)));292}293}294295let m = self.mode;296self.state = OutputState::Waiting(297self.file298.spawn_blocking(move |f| Self::blocking_write(f, buf, m)),299);300Ok(())301}302/// Specialized blocking_* variant to bypass tokio's task spawning & joining303/// overhead on synchronous file I/O.304async fn blocking_write_and_flush(&mut self, buf: Bytes) -> StreamResult<()> {305self.ready().await;306307match self.state {308OutputState::Ready => {}309OutputState::Closed => return Err(StreamError::Closed),310OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {311OutputState::Error(e) => return Err(StreamError::LastOperationFailed(e.into())),312_ => unreachable!(),313},314OutputState::Waiting(_) => unreachable!("we've just waited for readiness"),315}316317let m = self.mode;318match self319.file320.run_blocking(move |f| Self::blocking_write(f, buf, m))321.await322{323Ok(nwritten) => {324if let FileOutputMode::Position(p) = &mut self.mode {325*p += nwritten as u64;326}327self.state = OutputState::Ready;328Ok(())329}330Err(e) => {331self.state = OutputState::Closed;332Err(StreamError::LastOperationFailed(e.into()))333}334}335}336fn flush(&mut self) -> Result<(), StreamError> {337match self.state {338// Only userland buffering of file writes is in the blocking task,339// so there's nothing extra that needs to be done to request a340// flush.341OutputState::Ready | OutputState::Waiting(_) => Ok(()),342OutputState::Closed => Err(StreamError::Closed),343OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {344OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),345_ => unreachable!(),346},347}348}349fn check_write(&mut self) -> Result<usize, StreamError> {350match self.state {351OutputState::Ready => Ok(FILE_WRITE_CAPACITY),352OutputState::Closed => Err(StreamError::Closed),353OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) {354OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())),355_ => unreachable!(),356},357OutputState::Waiting(_) => Ok(0),358}359}360async fn cancel(&mut self) {361match mem::replace(&mut self.state, OutputState::Closed) {362OutputState::Waiting(task) => {363// The task was created using `spawn_blocking`, so unless we're364// lucky enough that the task hasn't started yet, the abort365// signal won't have any effect and we're forced to wait for it366// to run to completion.367// From the guest's point of view, `output-stream::drop` then368// appears to block. Certainly less than ideal, but arguably still369// better than letting the guest rack up an unbounded number of370// background tasks. Also, the guest is only blocked if371// the stream was dropped mid-write, which we don't expect to372// occur frequently.373task.cancel().await;374}375_ => {}376}377}378}379380#[async_trait::async_trait]381impl Pollable for FileOutputStream {382async fn ready(&mut self) {383if let OutputState::Waiting(task) = &mut self.state {384self.state = match task.await {385Ok(nwritten) => {386if let FileOutputMode::Position(p) = &mut self.mode {387*p += nwritten as u64;388}389OutputState::Ready390}391Err(e) => OutputState::Error(e),392};393}394}395}396397pub struct ReaddirIterator(398std::sync::Mutex<Box<dyn Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static>>,399);400401impl ReaddirIterator {402pub(crate) fn new(403i: impl Iterator<Item = FsResult<types::DirectoryEntry>> + Send + 'static,404) -> Self {405ReaddirIterator(std::sync::Mutex::new(Box::new(i)))406}407pub(crate) fn next(&self) -> FsResult<Option<types::DirectoryEntry>> {408self.0.lock().unwrap().next().transpose()409}410}411412impl IntoIterator for ReaddirIterator {413type Item = FsResult<types::DirectoryEntry>;414type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send>;415416fn into_iter(self) -> Self::IntoIter {417self.0.into_inner().unwrap()418}419}420421422