Path: blob/main/cros_async/src/sys/windows/tokio_source.rs
5394 views
// Copyright 2022 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::fs::File;5use std::io;6use std::io::Read;7use std::io::Seek;8use std::io::SeekFrom;9use std::io::Write;10use std::mem::ManuallyDrop;11use std::sync::Arc;1213use base::AsRawDescriptor;14use base::FileReadWriteAtVolatile;15use base::FileReadWriteVolatile;16use base::FromRawDescriptor;17use base::PunchHole;18use base::VolatileSlice;19use base::WriteZeroesAt;20use smallvec::SmallVec;21use sync::Mutex;2223use crate::mem::MemRegion;24use crate::AsyncError;25use crate::AsyncResult;26use crate::BackingMemory;2728#[derive(Debug, thiserror::Error)]29pub enum Error {30#[error("An error occurred trying to seek: {0}.")]31IoSeekError(io::Error),32#[error("An error occurred trying to read: {0}.")]33IoReadError(io::Error),34#[error("An error occurred trying to write: {0}.")]35IoWriteError(io::Error),36#[error("An error occurred trying to flush: {0}.")]37IoFlushError(io::Error),38#[error("An error occurred trying to punch hole: {0}.")]39IoPunchHoleError(io::Error),40#[error("An error occurred trying to write zeroes: {0}.")]41IoWriteZeroesError(io::Error),42#[error("Failed to join task: '{0}'")]43Join(tokio::task::JoinError),44#[error("An error occurred trying to duplicate source handles: {0}.")]45HandleDuplicationFailed(io::Error),46#[error("An error occurred trying to wait on source handles: {0}.")]47HandleWaitFailed(base::Error),48#[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]49BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),50#[error("TokioSource is gone, so no handles are available to fulfill the IO request.")]51NoTokioSource,52#[error("Operation on TokioSource is cancelled.")]53OperationCancelled,54#[error("Operation on TokioSource was aborted (unexpected).")]55OperationAborted,56}5758impl From<Error> for AsyncError {59fn from(e: Error) -> AsyncError {60AsyncError::SysVariants(e.into())61}62}6364impl From<Error> for io::Error {65fn from(e: Error) -> Self {66use Error::*;67match e {68IoSeekError(e) => e,69IoReadError(e) => e,70IoWriteError(e) => e,71IoFlushError(e) => e,72IoPunchHoleError(e) => e,73IoWriteZeroesError(e) => e,74Join(e) => io::Error::new(io::ErrorKind::Other, e),75HandleDuplicationFailed(e) => e,76HandleWaitFailed(e) => e.into(),77BackingMemoryVolatileSliceFetchFailed(e) => io::Error::new(io::ErrorKind::Other, e),78NoTokioSource => io::Error::new(io::ErrorKind::Other, NoTokioSource),79OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),80OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),81}82}83}8485pub type Result<T> = std::result::Result<T, Error>;8687pub struct TokioSource<T: AsRawDescriptor> {88source: Option<T>,89source_file: Arc<Mutex<Option<ManuallyDrop<File>>>>,90runtime: tokio::runtime::Handle,91}9293impl<T: AsRawDescriptor> TokioSource<T> {94pub(crate) fn new(source: T, runtime: tokio::runtime::Handle) -> Result<TokioSource<T>> {95let descriptor = source.as_raw_descriptor();96// SAFETY: The Drop implementation makes sure `source` outlives `source_file`.97let source_file = unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor)) };98Ok(Self {99source: Some(source),100source_file: Arc::new(Mutex::new(Some(source_file))),101runtime,102})103}104#[inline]105fn get_slices(106mem: &Arc<dyn BackingMemory + Send + Sync>,107mem_offsets: Vec<MemRegion>,108) -> Result<SmallVec<[VolatileSlice<'_>; 16]>> {109mem_offsets110.into_iter()111.map(|region| {112mem.get_volatile_slice(region)113.map_err(Error::BackingMemoryVolatileSliceFetchFailed)114})115.collect::<Result<SmallVec<[VolatileSlice; 16]>>>()116}117pub fn as_source(&self) -> &T {118self.source.as_ref().unwrap()119}120pub fn as_source_mut(&mut self) -> &mut T {121self.source.as_mut().unwrap()122}123pub async fn fdatasync(&self) -> AsyncResult<()> {124// TODO(b/282003931): Fall back to regular fsync.125self.fsync().await126}127pub async fn fsync(&self) -> AsyncResult<()> {128let source_file = self.source_file.clone();129Ok(self130.runtime131.spawn_blocking(move || {132source_file133.lock()134.as_mut()135.ok_or(Error::OperationCancelled)?136.flush()137.map_err(Error::IoFlushError)138})139.await140.map_err(Error::Join)??)141}142pub fn into_source(mut self) -> T {143self.source_file.lock().take();144self.source.take().unwrap()145}146pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {147let source_file = self.source_file.clone();148Ok(self149.runtime150.spawn_blocking(move || {151source_file152.lock()153.as_mut()154.ok_or(Error::OperationCancelled)?155.punch_hole(file_offset, len)156.map_err(Error::IoPunchHoleError)157})158.await159.map_err(Error::Join)??)160}161pub async fn read_to_mem(162&self,163file_offset: Option<u64>,164mem: Arc<dyn BackingMemory + Send + Sync>,165mem_offsets: impl IntoIterator<Item = MemRegion>,166) -> AsyncResult<usize> {167let mem_offsets = mem_offsets.into_iter().collect();168let source_file = self.source_file.clone();169Ok(self170.runtime171.spawn_blocking(move || {172let mut file_lock = source_file.lock();173let file = file_lock.as_mut().ok_or(Error::OperationCancelled)?;174let memory_slices = Self::get_slices(&mem, mem_offsets)?;175match file_offset {176Some(file_offset) => file177.read_vectored_at_volatile(memory_slices.as_slice(), file_offset)178.map_err(Error::IoReadError),179None => file180.read_vectored_volatile(memory_slices.as_slice())181.map_err(Error::IoReadError),182}183})184.await185.map_err(Error::Join)??)186}187pub async fn read_to_vec(188&self,189file_offset: Option<u64>,190mut vec: Vec<u8>,191) -> AsyncResult<(usize, Vec<u8>)> {192let source_file = self.source_file.clone();193Ok(self194.runtime195.spawn_blocking(move || {196let mut file_lock = source_file.lock();197let file = file_lock.as_mut().ok_or(Error::OperationCancelled)?;198if let Some(file_offset) = file_offset {199file.seek(SeekFrom::Start(file_offset))200.map_err(Error::IoSeekError)?;201}202Ok::<(usize, Vec<u8>), Error>((203file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,204vec,205))206})207.await208.map_err(Error::Join)??)209}210pub async fn wait_readable(&self) -> AsyncResult<()> {211unimplemented!();212}213pub async fn wait_for_handle(&self) -> AsyncResult<()> {214base::sys::windows::async_wait_for_single_object(self.source.as_ref().unwrap()).await?;215Ok(())216}217pub async fn write_from_mem(218&self,219file_offset: Option<u64>,220mem: Arc<dyn BackingMemory + Send + Sync>,221mem_offsets: impl IntoIterator<Item = MemRegion>,222) -> AsyncResult<usize> {223let mem_offsets = mem_offsets.into_iter().collect();224let source_file = self.source_file.clone();225Ok(self226.runtime227.spawn_blocking(move || {228let mut file_lock = source_file.lock();229let file = file_lock.as_mut().ok_or(Error::OperationCancelled)?;230let memory_slices = Self::get_slices(&mem, mem_offsets)?;231match file_offset {232Some(file_offset) => file233.write_vectored_at_volatile(memory_slices.as_slice(), file_offset)234.map_err(Error::IoWriteError),235None => file236.write_vectored_volatile(memory_slices.as_slice())237.map_err(Error::IoWriteError),238}239})240.await241.map_err(Error::Join)??)242}243pub async fn write_from_vec(244&self,245file_offset: Option<u64>,246vec: Vec<u8>,247) -> AsyncResult<(usize, Vec<u8>)> {248let source_file = self.source_file.clone();249Ok(self250.runtime251.spawn_blocking(move || {252let mut file_lock = source_file.lock();253let file = file_lock.as_mut().ok_or(Error::OperationCancelled)?;254if let Some(file_offset) = file_offset {255file.seek(SeekFrom::Start(file_offset))256.map_err(Error::IoSeekError)?;257}258Ok::<(usize, Vec<u8>), Error>((259file.write(vec.as_slice()).map_err(Error::IoWriteError)?,260vec,261))262})263.await264.map_err(Error::Join)??)265}266pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {267let source_file = self.source_file.clone();268Ok(self269.runtime270.spawn_blocking(move || {271// ZeroRange calls `punch_hole` which doesn't extend the File size if it needs to.272// Will fix if it becomes a problem.273source_file274.lock()275.as_mut()276.ok_or(Error::OperationCancelled)?277.write_zeroes_at(file_offset, len as usize)278.map_err(Error::IoWriteZeroesError)279.map(|_| ())280})281.await282.map_err(Error::Join)??)283}284}285impl<T: AsRawDescriptor> Drop for TokioSource<T> {286fn drop(&mut self) {287let mut source_file = self.source_file.lock();288source_file.take();289}290}291292293