Path: blob/main/cros_async/src/sys/windows/overlapped_source.rs
5394 views
// Copyright 2023 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34//! EXPERIMENTAL overlapped IO based async IO wrapper. Do not use in production.56use std::fs::File;7use std::io;8use std::io::Write;9use std::mem::ManuallyDrop;10use std::sync::Arc;1112use base::error;13use base::AsRawDescriptor;14use base::Descriptor;15use base::FromRawDescriptor;16use base::PunchHole;17use base::RawDescriptor;18use base::WriteZeroesAt;19use thiserror::Error as ThisError;20use winapi::um::minwinbase::OVERLAPPED;2122use crate::common_executor::RawExecutor;23use crate::mem::BackingMemory;24use crate::mem::MemRegion;25use crate::sys::windows::handle_executor::HandleReactor;26use crate::sys::windows::handle_executor::RegisteredOverlappedSource;27use crate::AsyncError;28use crate::AsyncResult;29use crate::BlockingPool;3031#[derive(ThisError, Debug)]32pub enum Error {33#[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]34BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),35#[error("An error occurred trying to seek: {0}.")]36IoSeekError(io::Error),37#[error("An error occurred trying to read: {0}.")]38IoReadError(base::Error),39#[error("An error occurred trying to write: {0}.")]40IoWriteError(base::Error),41#[error("An error occurred trying to flush: {0}.")]42IoFlushError(io::Error),43#[error("An error occurred trying to punch hole: {0}.")]44IoPunchHoleError(io::Error),45#[error("An error occurred trying to write zeroes: {0}.")]46IoWriteZeroesError(io::Error),47#[error("An error occurred trying to duplicate source handles: {0}.")]48HandleDuplicationFailed(io::Error),49#[error("A IO error occurred trying to read: {0}.")]50StdIoReadError(io::Error),51#[error("An IO error occurred trying to write: {0}.")]52StdIoWriteError(io::Error),53}5455impl From<Error> for io::Error {56fn from(e: Error) -> Self {57use Error::*;58match e {59BackingMemoryVolatileSliceFetchFailed(e) => io::Error::other(e),60IoSeekError(e) => e,61IoReadError(e) => e.into(),62IoWriteError(e) => e.into(),63IoFlushError(e) => e,64IoPunchHoleError(e) => e,65IoWriteZeroesError(e) => e,66HandleDuplicationFailed(e) => e,67StdIoReadError(e) => e,68StdIoWriteError(e) => e,69}70}71}7273impl From<Error> for AsyncError {74fn from(e: Error) -> AsyncError {75AsyncError::SysVariants(e.into())76}77}7879pub type Result<T> = std::result::Result<T, Error>;8081/// Async IO source for Windows that uses a multi-threaded, multi-handle approach to provide fast IO82/// operations. It demuxes IO requests across a set of handles that refer to the same underlying IO83/// source, such as a file, and executes those requests across multiple threads. Benchmarks show84/// that this is the fastest method to perform IO on Windows, especially for file reads.85pub struct OverlappedSource<F: AsRawDescriptor> {86blocking_pool: BlockingPool,87reg_source: RegisteredOverlappedSource,88source: F,89seek_forbidden: bool,90}9192impl<F: AsRawDescriptor> OverlappedSource<F> {93/// Create a new `OverlappedSource` from the given IO source. The source MUST be opened in94/// overlapped mode or undefined behavior will result.95///96/// seek_forbidden should be set for non seekable types like named pipes.97pub fn new(98source: F,99ex: &Arc<RawExecutor<HandleReactor>>,100seek_forbidden: bool,101) -> AsyncResult<Self> {102Ok(Self {103blocking_pool: BlockingPool::default(),104reg_source: ex.reactor.register_overlapped_source(ex, &source)?,105source,106seek_forbidden,107})108}109}110111/// SAFETY:112/// Safety requirements:113/// Same as base::windows::read_file.114unsafe fn read(115file: RawDescriptor,116buf: *mut u8,117buf_len: usize,118overlapped: &mut OVERLAPPED,119) -> AsyncResult<()> {120Ok(121base::windows::read_file(&Descriptor(file), buf, buf_len, Some(overlapped))122.map(|_len| ())123.map_err(Error::StdIoReadError)?,124)125}126127/// SAFETY:128/// Safety requirements:129/// Same as base::windows::write_file.130unsafe fn write(131file: RawDescriptor,132buf: *const u8,133buf_len: usize,134overlapped: &mut OVERLAPPED,135) -> AsyncResult<()> {136Ok(137base::windows::write_file(&Descriptor(file), buf, buf_len, Some(overlapped))138.map(|_len| ())139.map_err(Error::StdIoWriteError)?,140)141}142143impl<F: AsRawDescriptor> OverlappedSource<F> {144/// Reads from the iosource at `file_offset` and fill the given `vec`.145pub async fn read_to_vec(146&self,147file_offset: Option<u64>,148mut vec: Vec<u8>,149) -> AsyncResult<(usize, Vec<u8>)> {150if self.seek_forbidden && file_offset.is_some() {151return Err(Error::IoSeekError(io::Error::new(152io::ErrorKind::InvalidInput,153"seek on non-seekable handle",154))155.into());156}157let mut overlapped_op = self.reg_source.register_overlapped_operation(file_offset)?;158159// SAFETY:160// Safe because we pass a pointer to a valid vec and that same vector's length.161unsafe {162read(163self.source.as_raw_descriptor(),164vec.as_mut_ptr(),165vec.len(),166overlapped_op.get_overlapped(),167)?168};169let overlapped_result = overlapped_op.await?;170let bytes_read = overlapped_result.result.map_err(Error::IoReadError)?;171Ok((bytes_read, vec))172}173174/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.175pub async fn read_to_mem(176&self,177file_offset: Option<u64>,178mem: Arc<dyn BackingMemory + Send + Sync>,179mem_offsets: impl IntoIterator<Item = MemRegion>,180) -> AsyncResult<usize> {181let mut total_bytes_read = 0;182let mut offset = match file_offset {183Some(offset) if !self.seek_forbidden => Some(offset),184None if self.seek_forbidden => None,185// For devices that are seekable (files), we have to track the offset otherwise186// subsequent read calls will just read the same bytes into each of the memory regions.187None => Some(0),188_ => {189return Err(Error::IoSeekError(io::Error::new(190io::ErrorKind::InvalidInput,191"seek on non-seekable handle",192))193.into())194}195};196197for region in mem_offsets.into_iter() {198let mut overlapped_op = self.reg_source.register_overlapped_operation(offset)?;199200let slice = mem201.get_volatile_slice(region)202.map_err(Error::BackingMemoryVolatileSliceFetchFailed)?;203204// SAFETY:205// Safe because we're passing a volatile slice (valid ptr), and the size of the memory206// region it refers to.207unsafe {208read(209self.source.as_raw_descriptor(),210slice.as_mut_ptr(),211slice.size(),212overlapped_op.get_overlapped(),213)?214};215let overlapped_result = overlapped_op.await?;216let bytes_read = overlapped_result.result.map_err(Error::IoReadError)?;217offset = offset.map(|offset| offset + bytes_read as u64);218total_bytes_read += bytes_read;219}220Ok(total_bytes_read)221}222223/// Wait for the handle of `self` to be readable.224pub async fn wait_readable(&self) -> AsyncResult<()> {225unimplemented!()226}227228/// Reads a single u64 from the current offset.229pub async fn read_u64(&self) -> AsyncResult<u64> {230unimplemented!()231}232233/// Writes from the given `vec` to the file starting at `file_offset`.234pub async fn write_from_vec(235&self,236file_offset: Option<u64>,237vec: Vec<u8>,238) -> AsyncResult<(usize, Vec<u8>)> {239if self.seek_forbidden && file_offset.is_some() {240return Err(Error::IoSeekError(io::Error::new(241io::ErrorKind::InvalidInput,242"seek on non-seekable handle",243))244.into());245}246let mut overlapped_op = self.reg_source.register_overlapped_operation(file_offset)?;247248// SAFETY:249// Safe because we pass a pointer to a valid vec and that same vector's length.250unsafe {251write(252self.source.as_raw_descriptor(),253vec.as_ptr(),254vec.len(),255overlapped_op.get_overlapped(),256)?257};258259let bytes_written = overlapped_op.await?.result.map_err(Error::IoWriteError)?;260Ok((bytes_written, vec))261}262263/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.264pub async fn write_from_mem(265&self,266file_offset: Option<u64>,267mem: Arc<dyn BackingMemory + Send + Sync>,268mem_offsets: impl IntoIterator<Item = MemRegion>,269) -> AsyncResult<usize> {270let mut total_bytes_written = 0;271let mut offset = match file_offset {272Some(offset) if !self.seek_forbidden => Some(offset),273None if self.seek_forbidden => None,274// For devices that are seekable (files), we have to track the offset otherwise275// subsequent read calls will just read the same bytes into each of the memory regions.276None => Some(0),277_ => {278return Err(Error::IoSeekError(io::Error::new(279io::ErrorKind::InvalidInput,280"seek on non-seekable handle",281))282.into())283}284};285286for region in mem_offsets.into_iter() {287let mut overlapped_op = self.reg_source.register_overlapped_operation(offset)?;288289let slice = mem290.get_volatile_slice(region)291.map_err(Error::BackingMemoryVolatileSliceFetchFailed)?;292293// SAFETY:294// Safe because we're passing a volatile slice (valid ptr), and the size of the memory295// region it refers to.296unsafe {297write(298self.source.as_raw_descriptor(),299slice.as_ptr(),300slice.size(),301overlapped_op.get_overlapped(),302)?303};304let bytes_written = overlapped_op.await?.result.map_err(Error::IoReadError)?;305offset = offset.map(|offset| offset + bytes_written as u64);306total_bytes_written += bytes_written;307}308Ok(total_bytes_written)309}310311/// Deallocates the given range of a file.312///313/// TODO(nkgold): currently this is sync on the executor, which is bad / very hacky. With a314/// little wrapper work, we can make overlapped DeviceIoControl calls instead.315pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {316if self.seek_forbidden {317return Err(Error::IoSeekError(io::Error::new(318io::ErrorKind::InvalidInput,319"fallocate cannot be called on a non-seekable handle",320))321.into());322}323// SAFETY:324// Safe because self.source lives as long as file.325let file = ManuallyDrop::new(unsafe {326File::from_raw_descriptor(self.source.as_raw_descriptor())327});328file.punch_hole(file_offset, len)329.map_err(Error::IoPunchHoleError)?;330Ok(())331}332333/// Fills the given range with zeroes.334///335/// TODO(nkgold): currently this is sync on the executor, which is bad / very hacky. With a336/// little wrapper work, we can make overlapped DeviceIoControl calls instead.337pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {338if self.seek_forbidden {339return Err(Error::IoSeekError(io::Error::new(340io::ErrorKind::InvalidInput,341"write_zeroes_at cannot be called on a non-seekable handle",342))343.into());344}345// SAFETY:346// Safe because self.source lives as long as file.347let file = ManuallyDrop::new(unsafe {348File::from_raw_descriptor(self.source.as_raw_descriptor())349});350// ZeroRange calls `punch_hole` which doesn't extend the File size if it needs to.351// Will fix if it becomes a problem.352file.write_zeroes_at(file_offset, len as usize)353.map_err(Error::IoWriteZeroesError)?;354Ok(())355}356357/// Sync all completed write operations to the backing storage.358pub async fn fsync(&self) -> AsyncResult<()> {359// SAFETY:360// Safe because self.source lives at least as long as the blocking pool thread. Note that361// if the blocking pool stalls and shutdown fails, the thread could outlive the file;362// however, this would mean things are already badly broken and we have a similar risk in363// HandleSource.364let mut file = unsafe {365ManuallyDrop::new(File::from_raw_descriptor(self.source.as_raw_descriptor()))366.try_clone()367.map_err(Error::HandleDuplicationFailed)?368};369370Ok(self371.blocking_pool372.spawn(move || file.flush().map_err(Error::IoFlushError))373.await?)374}375376/// Sync all data of completed write operations to the backing storage. Currently, the377/// implementation is equivalent to fsync.378pub async fn fdatasync(&self) -> AsyncResult<()> {379// TODO(b/282003931): Fall back to regular fsync.380self.fsync().await381}382383/// Yields the underlying IO source.384pub fn into_source(self) -> F {385self.source386}387388/// Provides a mutable ref to the underlying IO source.389pub fn as_source_mut(&mut self) -> &mut F {390&mut self.source391}392393/// Provides a ref to the underlying IO source.394///395/// In the multi-source case, the 0th source will be returned. If sources are not396/// interchangeable, behavior is undefined.397pub fn as_source(&self) -> &F {398&self.source399}400401pub async fn wait_for_handle(&self) -> AsyncResult<()> {402base::sys::windows::async_wait_for_single_object(&self.source)403.await404.map_err(super::handle_source::Error::HandleWaitFailed)?;405Ok(())406}407}408409// NOTE: Prefer adding tests to io_source.rs if not backend specific.410#[cfg(test)]411mod tests {412use std::fs::OpenOptions;413use std::io::Read;414use std::os::windows::fs::OpenOptionsExt;415use std::path::PathBuf;416417use tempfile::TempDir;418use winapi::um::winbase::FILE_FLAG_OVERLAPPED;419420use super::*;421use crate::mem::VecIoWrapper;422use crate::ExecutorTrait;423424fn tempfile_path() -> (PathBuf, TempDir) {425let dir = tempfile::TempDir::new().unwrap();426let mut file_path = PathBuf::from(dir.path());427file_path.push("test");428(file_path, dir)429}430431fn open_overlapped(path: &PathBuf) -> File {432OpenOptions::new()433.read(true)434.write(true)435.custom_flags(FILE_FLAG_OVERLAPPED)436.open(path)437.unwrap()438}439440fn create_overlapped(path: &PathBuf) -> File {441OpenOptions::new()442.create_new(true)443.read(true)444.write(true)445.custom_flags(FILE_FLAG_OVERLAPPED)446.open(path)447.unwrap()448}449450#[test]451fn test_read_vec() {452let (file_path, _tmpdir) = tempfile_path();453std::fs::write(&file_path, "data").unwrap();454455async fn read_data(src: &OverlappedSource<File>) {456let buf: Vec<u8> = vec![0; 4];457let (bytes_read, buf) = src.read_to_vec(Some(0), buf).await.unwrap();458assert_eq!(bytes_read, 4);459assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data");460}461462let ex = RawExecutor::<HandleReactor>::new().unwrap();463let src = OverlappedSource::new(open_overlapped(&file_path), &ex, false).unwrap();464ex.run_until(read_data(&src)).unwrap();465}466467#[test]468fn test_read_mem() {469let (file_path, _tmpdir) = tempfile_path();470std::fs::write(&file_path, "data").unwrap();471472async fn read_data(src: &OverlappedSource<File>) {473let mem = Arc::new(VecIoWrapper::from(vec![0; 4]));474let bytes_read = src475.read_to_mem(476Some(0),477Arc::<VecIoWrapper>::clone(&mem),478[479MemRegion { offset: 0, len: 2 },480MemRegion { offset: 2, len: 2 },481],482)483.await484.unwrap();485assert_eq!(bytes_read, 4);486let vec: Vec<u8> = match Arc::try_unwrap(mem) {487Ok(v) => v.into(),488Err(_) => panic!("Too many vec refs"),489};490assert_eq!(std::str::from_utf8(vec.as_slice()).unwrap(), "data");491}492493let ex = RawExecutor::<HandleReactor>::new().unwrap();494let src = OverlappedSource::new(open_overlapped(&file_path), &ex, false).unwrap();495ex.run_until(read_data(&src)).unwrap();496}497498#[test]499fn test_write_vec() {500let (file_path, _tmpdir) = tempfile_path();501502async fn write_data(src: &OverlappedSource<File>) {503let mut buf: Vec<u8> = Vec::new();504buf.extend_from_slice("data".as_bytes());505506let (bytes_written, _) = src.write_from_vec(Some(0), buf).await.unwrap();507assert_eq!(bytes_written, 4);508}509510let ex = RawExecutor::<HandleReactor>::new().unwrap();511let f = create_overlapped(&file_path);512let src = OverlappedSource::new(f, &ex, false).unwrap();513ex.run_until(write_data(&src)).unwrap();514drop(src);515516let buf = std::fs::read(&file_path).unwrap();517assert_eq!(buf, b"data");518}519520#[test]521fn test_write_mem() {522let (file_path, _tmpdir) = tempfile_path();523524async fn write_data(src: &OverlappedSource<File>) {525let mut buf: Vec<u8> = Vec::new();526buf.extend_from_slice("data".as_bytes());527let mem = Arc::new(VecIoWrapper::from(buf));528let bytes_written = src529.write_from_mem(530Some(0),531Arc::<VecIoWrapper>::clone(&mem),532[533MemRegion { offset: 0, len: 2 },534MemRegion { offset: 2, len: 2 },535],536)537.await538.unwrap();539assert_eq!(bytes_written, 4);540match Arc::try_unwrap(mem) {541Ok(_) => (),542Err(_) => panic!("Too many vec refs"),543};544}545546let ex = RawExecutor::<HandleReactor>::new().unwrap();547let f = create_overlapped(&file_path);548let src = OverlappedSource::new(f, &ex, false).unwrap();549ex.run_until(write_data(&src)).unwrap();550drop(src);551552let buf = std::fs::read(&file_path).unwrap();553assert_eq!(buf, b"data");554}555556#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]557#[test]558fn test_punch_holes() {559let (file_path, _tmpdir) = tempfile_path();560std::fs::write(&file_path, "abcdefghijk").unwrap();561562async fn punch_hole(src: &OverlappedSource<File>) {563let offset = 1;564let len = 3;565src.punch_hole(offset, len).await.unwrap();566}567568let ex = RawExecutor::<HandleReactor>::new().unwrap();569let f = open_overlapped(&file_path);570let src = OverlappedSource::new(f, &ex, false).unwrap();571ex.run_until(punch_hole(&src)).unwrap();572drop(src);573574let buf = std::fs::read(&file_path).unwrap();575assert_eq!(buf, b"a\0\0\0efghijk");576}577578/// Test should fail because punch hole should not be allowed to allocate more memory579#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]580#[test]581fn test_punch_holes_fail_out_of_bounds() {582let (file_path, _tmpdir) = tempfile_path();583std::fs::write(&file_path, "abcdefghijk").unwrap();584585async fn punch_hole(src: &OverlappedSource<File>) {586let offset = 9;587let len = 4;588src.punch_hole(offset, len).await.unwrap();589}590591let ex = RawExecutor::<HandleReactor>::new().unwrap();592let f = open_overlapped(&file_path);593let src = OverlappedSource::new(f, &ex, false).unwrap();594ex.run_until(punch_hole(&src)).unwrap();595drop(src);596597let mut buf = vec![0; 13];598let mut f = OpenOptions::new()599.read(true)600.write(true)601.open(&file_path)602.unwrap();603assert!(f.read_exact(&mut buf).is_err());604}605606// TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the607// bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.608// use tempfile::NamedTempFile;609// #[test]610// fn test_write_zeroes() {611// let mut temp_file = NamedTempFile::new().unwrap();612// temp_file.write("abcdefghijk".as_bytes()).unwrap();613// temp_file.flush().unwrap();614// temp_file.seek(SeekFrom::Start(0)).unwrap();615616// async fn punch_hole(src: &OverlappedSource<File>) {617// let offset = 9;618// let len = 4;619// src620// .fallocate(offset, len, AllocateMode::ZeroRange)621// .await622// .unwrap();623// }624625// let ex = RawExecutor::<HandleReactor>::new();626// let f = fs::OpenOptions::new()627// .write(true)628// .open(temp_file.path())629// .unwrap();630// let src = OverlappedSource::new(vec![f].into_boxed_slice()).unwrap();631// ex.run_until(punch_hole(&src)).unwrap();632633// let mut buf = vec![0; 13];634// temp_file.read_exact(&mut buf).unwrap();635// assert_eq!(636// std::str::from_utf8(buf.as_slice()).unwrap(),637// "abcdefghi\0\0\0\0"638// );639// }640}641642643