Path: blob/main/cros_async/src/sys/windows/handle_source.rs
5394 views
// Copyright 2020 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::fs::File;5use std::io;6use std::io::Read;7use std::io::Seek;8use std::io::SeekFrom;9use std::io::Write;10use std::mem::ManuallyDrop;11use std::ptr::null_mut;12use std::sync::Arc;13use std::time::Duration;1415use base::error;16use base::warn;17use base::AsRawDescriptor;18use base::Descriptor;19use base::Error as SysUtilError;20use base::FileReadWriteAtVolatile;21use base::FileReadWriteVolatile;22use base::FromRawDescriptor;23use base::PunchHole;24use base::VolatileSlice;25use base::WriteZeroesAt;26use smallvec::SmallVec;27use sync::Mutex;28use thiserror::Error as ThisError;29use winapi::um::ioapiset::CancelIoEx;3031use crate::mem::BackingMemory;32use crate::mem::MemRegion;33use crate::AsyncError;34use crate::AsyncResult;35use crate::CancellableBlockingPool;3637#[derive(ThisError, Debug)]38pub enum Error {39#[error("An error occurred trying to seek: {0}.")]40IoSeekError(io::Error),41#[error("An error occurred trying to read: {0}.")]42IoReadError(io::Error),43#[error("An error occurred trying to write: {0}.")]44IoWriteError(io::Error),45#[error("An error occurred trying to flush: {0}.")]46IoFlushError(io::Error),47#[error("An error occurred trying to punch hole: {0}.")]48IoPunchHoleError(io::Error),49#[error("An error occurred trying to write zeroes: {0}.")]50IoWriteZeroesError(io::Error),51#[error("An error occurred trying to duplicate source handles: {0}.")]52HandleDuplicationFailed(io::Error),53#[error("An error occurred trying to wait on source handles: {0}.")]54HandleWaitFailed(io::Error),55#[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]56BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),57#[error("HandleSource is gone, so no handles are available to fulfill the IO request.")]58NoHandleSource,59#[error("Operation on HandleSource is cancelled.")]60OperationCancelled,61#[error("Operation on HandleSource was aborted (unexpected).")]62OperationAborted,63}6465impl From<Error> for io::Error {66fn from(e: Error) -> Self {67use Error::*;68match e {69IoSeekError(e) => e,70IoReadError(e) => e,71IoWriteError(e) => e,72IoFlushError(e) => e,73IoPunchHoleError(e) => e,74IoWriteZeroesError(e) => e,75HandleDuplicationFailed(e) => e,76HandleWaitFailed(e) => e,77BackingMemoryVolatileSliceFetchFailed(e) => io::Error::other(e),78NoHandleSource => io::Error::other(NoHandleSource),79OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),80OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),81}82}83}8485impl From<Error> for AsyncError {86fn from(e: Error) -> AsyncError {87AsyncError::SysVariants(e.into())88}89}9091pub type Result<T> = std::result::Result<T, Error>;9293/// Used to shutdown IO running on a CancellableBlockingPool.94pub struct HandleWrapper {95handle: Descriptor,96}9798impl HandleWrapper {99pub fn new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>> {100Arc::new(Mutex::new(Self { handle }))101}102103pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {104// There isn't much we can do if cancel fails.105// SAFETY: trivially safe106if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 {107warn!(108"Cancel IO for handle:{:?} failed with {}",109self.handle.as_raw_descriptor(),110SysUtilError::last()111);112}113ret114}115}116117/// Async IO source for Windows, such as a file.118pub struct HandleSource<F: AsRawDescriptor> {119source: F,120source_descriptor: Descriptor,121blocking_pool: CancellableBlockingPool,122}123124impl<F: AsRawDescriptor> HandleSource<F> {125/// Create a new `HandleSource` from the given IO source.126///127/// Each HandleSource uses its own thread pool, with one thread per source supplied. Since these128/// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.129/// Long term, we may migrate away from this approach toward IOCP or overlapped IO.130///131/// WARNING: `source` MUST be a unique file object (e.g. separate handles132/// each created by CreateFile), and point at the same file on disk. This is because IO133/// operations on the HandleSource are randomly distributed to each source.134///135/// # Safety136/// The caller must guarantee that `F`'s handle is compatible with the underlying functions137/// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined138/// by this struct. Note that most winapis will fail with reasonable errors.139pub fn new(source: F) -> Result<Self> {140let source_descriptor = Descriptor(source.as_raw_descriptor());141142Ok(Self {143source,144source_descriptor,145blocking_pool: CancellableBlockingPool::new(146// WARNING: this is a safety requirement! Threads are 1:1 with sources.1471,148Duration::from_secs(10),149),150})151}152153#[inline]154fn get_slices(155mem: &Arc<dyn BackingMemory + Send + Sync>,156mem_offsets: Vec<MemRegion>,157) -> Result<SmallVec<[VolatileSlice<'_>; 16]>> {158mem_offsets159.into_iter()160.map(|region| {161mem.get_volatile_slice(region)162.map_err(Error::BackingMemoryVolatileSliceFetchFailed)163})164.collect::<Result<SmallVec<[VolatileSlice; 16]>>>()165}166}167168fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File> {169// SAFETY: trivially safe170// Safe because all callers must exit *before* these handles will be closed (guaranteed by171// HandleSource's Drop impl.).172unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) }173}174175impl<F: AsRawDescriptor> HandleSource<F> {176/// Reads from the iosource at `file_offset` and fill the given `vec`.177pub async fn read_to_vec(178&self,179file_offset: Option<u64>,180mut vec: Vec<u8>,181) -> AsyncResult<(usize, Vec<u8>)> {182let handles = HandleWrapper::new(self.source_descriptor);183let descriptors = self.source_descriptor;184185Ok(self186.blocking_pool187.spawn(188move || {189let mut file = get_thread_file(descriptors);190if let Some(file_offset) = file_offset {191file.seek(SeekFrom::Start(file_offset))192.map_err(Error::IoSeekError)?;193}194Ok((195file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,196vec,197))198},199move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),200)201.await?)202}203204/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.205pub async fn read_to_mem(206&self,207file_offset: Option<u64>,208mem: Arc<dyn BackingMemory + Send + Sync>,209mem_offsets: impl IntoIterator<Item = MemRegion>,210) -> AsyncResult<usize> {211let mem_offsets = mem_offsets.into_iter().collect();212let handles = HandleWrapper::new(self.source_descriptor);213let descriptors = self.source_descriptor;214215Ok(self216.blocking_pool217.spawn(218move || {219let mut file = get_thread_file(descriptors);220let memory_slices = Self::get_slices(&mem, mem_offsets)?;221222match file_offset {223Some(file_offset) => file224.read_vectored_at_volatile(memory_slices.as_slice(), file_offset)225.map_err(Error::IoReadError),226None => file227.read_vectored_volatile(memory_slices.as_slice())228.map_err(Error::IoReadError),229}230},231move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),232)233.await?)234}235236/// Wait for the handle of `self` to be readable.237pub async fn wait_readable(&self) -> AsyncResult<()> {238unimplemented!()239}240241/// Reads a single u64 from the current offset.242pub async fn read_u64(&self) -> AsyncResult<u64> {243unimplemented!()244}245246/// Writes from the given `vec` to the file starting at `file_offset`.247pub async fn write_from_vec(248&self,249file_offset: Option<u64>,250vec: Vec<u8>,251) -> AsyncResult<(usize, Vec<u8>)> {252let handles = HandleWrapper::new(self.source_descriptor);253let descriptors = self.source_descriptor;254255Ok(self256.blocking_pool257.spawn(258move || {259let mut file = get_thread_file(descriptors);260if let Some(file_offset) = file_offset {261file.seek(SeekFrom::Start(file_offset))262.map_err(Error::IoSeekError)?;263}264Ok((265file.write(vec.as_slice()).map_err(Error::IoWriteError)?,266vec,267))268},269move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),270)271.await?)272}273274/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.275pub async fn write_from_mem(276&self,277file_offset: Option<u64>,278mem: Arc<dyn BackingMemory + Send + Sync>,279mem_offsets: impl IntoIterator<Item = MemRegion>,280) -> AsyncResult<usize> {281let mem_offsets = mem_offsets.into_iter().collect();282let handles = HandleWrapper::new(self.source_descriptor);283let descriptors = self.source_descriptor;284285Ok(self286.blocking_pool287.spawn(288move || {289let mut file = get_thread_file(descriptors);290let memory_slices = Self::get_slices(&mem, mem_offsets)?;291292match file_offset {293Some(file_offset) => file294.write_vectored_at_volatile(memory_slices.as_slice(), file_offset)295.map_err(Error::IoWriteError),296None => file297.write_vectored_volatile(memory_slices.as_slice())298.map_err(Error::IoWriteError),299}300},301move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),302)303.await?)304}305306/// Deallocates the given range of a file.307pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {308let handles = HandleWrapper::new(self.source_descriptor);309let descriptors = self.source_descriptor;310Ok(self311.blocking_pool312.spawn(313move || {314let file = get_thread_file(descriptors);315file.punch_hole(file_offset, len)316.map_err(Error::IoPunchHoleError)?;317Ok(())318},319move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),320)321.await?)322}323324/// Fills the given range with zeroes.325pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {326let handles = HandleWrapper::new(self.source_descriptor);327let descriptors = self.source_descriptor;328Ok(self329.blocking_pool330.spawn(331move || {332let file = get_thread_file(descriptors);333// ZeroRange calls `punch_hole` which doesn't extend the File size if it needs334// to. Will fix if it becomes a problem.335file.write_zeroes_at(file_offset, len as usize)336.map_err(Error::IoWriteZeroesError)?;337Ok(())338},339move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),340)341.await?)342}343344/// Sync all completed write operations to the backing storage.345pub async fn fsync(&self) -> AsyncResult<()> {346let handles = HandleWrapper::new(self.source_descriptor);347let descriptors = self.source_descriptor;348349Ok(self350.blocking_pool351.spawn(352move || {353let mut file = get_thread_file(descriptors);354file.flush().map_err(Error::IoFlushError)355},356move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),357)358.await?)359}360361/// Sync all data of completed write operations to the backing storage. Currently, the362/// implementation is equivalent to fsync.363pub async fn fdatasync(&self) -> AsyncResult<()> {364// TODO(b/282003931): Fall back to regular fsync.365self.fsync().await366}367368/// Yields the underlying IO source.369pub fn into_source(self) -> F {370self.source371}372373/// Provides a mutable ref to the underlying IO source.374pub fn as_source_mut(&mut self) -> &mut F {375&mut self.source376}377378/// Provides a ref to the underlying IO source.379///380/// If sources are not interchangeable, behavior is undefined.381pub fn as_source(&self) -> &F {382&self.source383}384385/// If sources are not interchangeable, behavior is undefined.386pub async fn wait_for_handle(&self) -> AsyncResult<()> {387base::sys::windows::async_wait_for_single_object(&self.source)388.await389.map_err(Error::HandleWaitFailed)?;390Ok(())391}392}393394// NOTE: Prefer adding tests to io_source.rs if not backend specific.395#[cfg(test)]396mod tests {397use std::fs;398399use tempfile::NamedTempFile;400401use super::super::HandleReactor;402use super::*;403use crate::common_executor::RawExecutor;404use crate::ExecutorTrait;405406#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]407#[test]408fn test_punch_holes() {409let mut temp_file = NamedTempFile::new().unwrap();410temp_file.write_all("abcdefghijk".as_bytes()).unwrap();411temp_file.flush().unwrap();412temp_file.seek(SeekFrom::Start(0)).unwrap();413414async fn punch_hole(handle_src: &HandleSource<File>) {415let offset = 1;416let len = 3;417handle_src.punch_hole(offset, len).await.unwrap();418}419420let ex = RawExecutor::<HandleReactor>::new().unwrap();421let f = fs::OpenOptions::new()422.write(true)423.open(temp_file.path())424.unwrap();425let handle_src = HandleSource::new(f).unwrap();426ex.run_until(punch_hole(&handle_src)).unwrap();427428let mut buf = vec![0; 11];429temp_file.read_exact(&mut buf).unwrap();430assert_eq!(431std::str::from_utf8(buf.as_slice()).unwrap(),432"a\0\0\0efghijk"433);434}435436/// Test should fail because punch hole should not be allowed to allocate more memory437#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]438#[test]439fn test_punch_holes_fail_out_of_bounds() {440let mut temp_file = NamedTempFile::new().unwrap();441temp_file.write_all("abcdefghijk".as_bytes()).unwrap();442temp_file.flush().unwrap();443temp_file.seek(SeekFrom::Start(0)).unwrap();444445async fn punch_hole(handle_src: &HandleSource<File>) {446let offset = 9;447let len = 4;448handle_src.punch_hole(offset, len).await.unwrap();449}450451let ex = RawExecutor::<HandleReactor>::new().unwrap();452let f = fs::OpenOptions::new()453.write(true)454.open(temp_file.path())455.unwrap();456let handle_src = HandleSource::new(f).unwrap();457ex.run_until(punch_hole(&handle_src)).unwrap();458459let mut buf = vec![0; 13];460assert!(temp_file.read_exact(&mut buf).is_err());461}462463// TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the464// bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.465// #[test]466// fn test_write_zeroes() {467// let mut temp_file = NamedTempFile::new().unwrap();468// temp_file.write("abcdefghijk".as_bytes()).unwrap();469// temp_file.flush().unwrap();470// temp_file.seek(SeekFrom::Start(0)).unwrap();471472// async fn punch_hole(handle_src: &HandleSource<File>) {473// let offset = 9;474// let len = 4;475// handle_src476// .fallocate(offset, len, AllocateMode::ZeroRange)477// .await478// .unwrap();479// }480481// let ex = RawExecutor::<HandleReactor>::new();482// let f = fs::OpenOptions::new()483// .write(true)484// .open(temp_file.path())485// .unwrap();486// let handle_src = HandleSource::new(f).unwrap();487// ex.run_until(punch_hole(&handle_src)).unwrap();488489// let mut buf = vec![0; 13];490// temp_file.read_exact(&mut buf).unwrap();491// assert_eq!(492// std::str::from_utf8(buf.as_slice()).unwrap(),493// "abcdefghi\0\0\0\0"494// );495// }496}497498499