Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/windows/handle_source.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::fs::File;
6
use std::io;
7
use std::io::Read;
8
use std::io::Seek;
9
use std::io::SeekFrom;
10
use std::io::Write;
11
use std::mem::ManuallyDrop;
12
use std::ptr::null_mut;
13
use std::sync::Arc;
14
use std::time::Duration;
15
16
use base::error;
17
use base::warn;
18
use base::AsRawDescriptor;
19
use base::Descriptor;
20
use base::Error as SysUtilError;
21
use base::FileReadWriteAtVolatile;
22
use base::FileReadWriteVolatile;
23
use base::FromRawDescriptor;
24
use base::PunchHole;
25
use base::VolatileSlice;
26
use base::WriteZeroesAt;
27
use smallvec::SmallVec;
28
use sync::Mutex;
29
use thiserror::Error as ThisError;
30
use winapi::um::ioapiset::CancelIoEx;
31
32
use crate::mem::BackingMemory;
33
use crate::mem::MemRegion;
34
use crate::AsyncError;
35
use crate::AsyncResult;
36
use crate::CancellableBlockingPool;
37
38
#[derive(ThisError, Debug)]
39
pub enum Error {
40
#[error("An error occurred trying to seek: {0}.")]
41
IoSeekError(io::Error),
42
#[error("An error occurred trying to read: {0}.")]
43
IoReadError(io::Error),
44
#[error("An error occurred trying to write: {0}.")]
45
IoWriteError(io::Error),
46
#[error("An error occurred trying to flush: {0}.")]
47
IoFlushError(io::Error),
48
#[error("An error occurred trying to punch hole: {0}.")]
49
IoPunchHoleError(io::Error),
50
#[error("An error occurred trying to write zeroes: {0}.")]
51
IoWriteZeroesError(io::Error),
52
#[error("An error occurred trying to duplicate source handles: {0}.")]
53
HandleDuplicationFailed(io::Error),
54
#[error("An error occurred trying to wait on source handles: {0}.")]
55
HandleWaitFailed(io::Error),
56
#[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]
57
BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),
58
#[error("HandleSource is gone, so no handles are available to fulfill the IO request.")]
59
NoHandleSource,
60
#[error("Operation on HandleSource is cancelled.")]
61
OperationCancelled,
62
#[error("Operation on HandleSource was aborted (unexpected).")]
63
OperationAborted,
64
}
65
66
impl From<Error> for io::Error {
67
fn from(e: Error) -> Self {
68
use Error::*;
69
match e {
70
IoSeekError(e) => e,
71
IoReadError(e) => e,
72
IoWriteError(e) => e,
73
IoFlushError(e) => e,
74
IoPunchHoleError(e) => e,
75
IoWriteZeroesError(e) => e,
76
HandleDuplicationFailed(e) => e,
77
HandleWaitFailed(e) => e,
78
BackingMemoryVolatileSliceFetchFailed(e) => io::Error::other(e),
79
NoHandleSource => io::Error::other(NoHandleSource),
80
OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),
81
OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),
82
}
83
}
84
}
85
86
impl From<Error> for AsyncError {
87
fn from(e: Error) -> AsyncError {
88
AsyncError::SysVariants(e.into())
89
}
90
}
91
92
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
93
94
/// Used to shutdown IO running on a CancellableBlockingPool.
95
pub struct HandleWrapper {
96
handle: Descriptor,
97
}
98
99
impl HandleWrapper {
100
pub fn new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>> {
101
Arc::new(Mutex::new(Self { handle }))
102
}
103
104
pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
105
// There isn't much we can do if cancel fails.
106
// SAFETY: trivially safe
107
if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 {
108
warn!(
109
"Cancel IO for handle:{:?} failed with {}",
110
self.handle.as_raw_descriptor(),
111
SysUtilError::last()
112
);
113
}
114
ret
115
}
116
}
117
118
/// Async IO source for Windows, such as a file.
119
pub struct HandleSource<F: AsRawDescriptor> {
120
source: F,
121
source_descriptor: Descriptor,
122
blocking_pool: CancellableBlockingPool,
123
}
124
125
impl<F: AsRawDescriptor> HandleSource<F> {
126
/// Create a new `HandleSource` from the given IO source.
127
///
128
/// Each HandleSource uses its own thread pool, with one thread per source supplied. Since these
129
/// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
130
/// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
131
///
132
/// WARNING: `source` MUST be a unique file object (e.g. separate handles
133
/// each created by CreateFile), and point at the same file on disk. This is because IO
134
/// operations on the HandleSource are randomly distributed to each source.
135
///
136
/// # Safety
137
/// The caller must guarantee that `F`'s handle is compatible with the underlying functions
138
/// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
139
/// by this struct. Note that most winapis will fail with reasonable errors.
140
pub fn new(source: F) -> Result<Self> {
141
let source_descriptor = Descriptor(source.as_raw_descriptor());
142
143
Ok(Self {
144
source,
145
source_descriptor,
146
blocking_pool: CancellableBlockingPool::new(
147
// WARNING: this is a safety requirement! Threads are 1:1 with sources.
148
1,
149
Duration::from_secs(10),
150
),
151
})
152
}
153
154
#[inline]
155
fn get_slices(
156
mem: &Arc<dyn BackingMemory + Send + Sync>,
157
mem_offsets: Vec<MemRegion>,
158
) -> Result<SmallVec<[VolatileSlice<'_>; 16]>> {
159
mem_offsets
160
.into_iter()
161
.map(|region| {
162
mem.get_volatile_slice(region)
163
.map_err(Error::BackingMemoryVolatileSliceFetchFailed)
164
})
165
.collect::<Result<SmallVec<[VolatileSlice; 16]>>>()
166
}
167
}
168
169
fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File> {
170
// SAFETY: trivially safe
171
// Safe because all callers must exit *before* these handles will be closed (guaranteed by
172
// HandleSource's Drop impl.).
173
unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) }
174
}
175
176
impl<F: AsRawDescriptor> HandleSource<F> {
177
/// Reads from the iosource at `file_offset` and fill the given `vec`.
178
pub async fn read_to_vec(
179
&self,
180
file_offset: Option<u64>,
181
mut vec: Vec<u8>,
182
) -> AsyncResult<(usize, Vec<u8>)> {
183
let handles = HandleWrapper::new(self.source_descriptor);
184
let descriptors = self.source_descriptor;
185
186
Ok(self
187
.blocking_pool
188
.spawn(
189
move || {
190
let mut file = get_thread_file(descriptors);
191
if let Some(file_offset) = file_offset {
192
file.seek(SeekFrom::Start(file_offset))
193
.map_err(Error::IoSeekError)?;
194
}
195
Ok((
196
file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,
197
vec,
198
))
199
},
200
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
201
)
202
.await?)
203
}
204
205
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
206
pub async fn read_to_mem(
207
&self,
208
file_offset: Option<u64>,
209
mem: Arc<dyn BackingMemory + Send + Sync>,
210
mem_offsets: impl IntoIterator<Item = MemRegion>,
211
) -> AsyncResult<usize> {
212
let mem_offsets = mem_offsets.into_iter().collect();
213
let handles = HandleWrapper::new(self.source_descriptor);
214
let descriptors = self.source_descriptor;
215
216
Ok(self
217
.blocking_pool
218
.spawn(
219
move || {
220
let mut file = get_thread_file(descriptors);
221
let memory_slices = Self::get_slices(&mem, mem_offsets)?;
222
223
match file_offset {
224
Some(file_offset) => file
225
.read_vectored_at_volatile(memory_slices.as_slice(), file_offset)
226
.map_err(Error::IoReadError),
227
None => file
228
.read_vectored_volatile(memory_slices.as_slice())
229
.map_err(Error::IoReadError),
230
}
231
},
232
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
233
)
234
.await?)
235
}
236
237
/// Wait for the handle of `self` to be readable.
238
pub async fn wait_readable(&self) -> AsyncResult<()> {
239
unimplemented!()
240
}
241
242
/// Reads a single u64 from the current offset.
243
pub async fn read_u64(&self) -> AsyncResult<u64> {
244
unimplemented!()
245
}
246
247
/// Writes from the given `vec` to the file starting at `file_offset`.
248
pub async fn write_from_vec(
249
&self,
250
file_offset: Option<u64>,
251
vec: Vec<u8>,
252
) -> AsyncResult<(usize, Vec<u8>)> {
253
let handles = HandleWrapper::new(self.source_descriptor);
254
let descriptors = self.source_descriptor;
255
256
Ok(self
257
.blocking_pool
258
.spawn(
259
move || {
260
let mut file = get_thread_file(descriptors);
261
if let Some(file_offset) = file_offset {
262
file.seek(SeekFrom::Start(file_offset))
263
.map_err(Error::IoSeekError)?;
264
}
265
Ok((
266
file.write(vec.as_slice()).map_err(Error::IoWriteError)?,
267
vec,
268
))
269
},
270
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
271
)
272
.await?)
273
}
274
275
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
276
pub async fn write_from_mem(
277
&self,
278
file_offset: Option<u64>,
279
mem: Arc<dyn BackingMemory + Send + Sync>,
280
mem_offsets: impl IntoIterator<Item = MemRegion>,
281
) -> AsyncResult<usize> {
282
let mem_offsets = mem_offsets.into_iter().collect();
283
let handles = HandleWrapper::new(self.source_descriptor);
284
let descriptors = self.source_descriptor;
285
286
Ok(self
287
.blocking_pool
288
.spawn(
289
move || {
290
let mut file = get_thread_file(descriptors);
291
let memory_slices = Self::get_slices(&mem, mem_offsets)?;
292
293
match file_offset {
294
Some(file_offset) => file
295
.write_vectored_at_volatile(memory_slices.as_slice(), file_offset)
296
.map_err(Error::IoWriteError),
297
None => file
298
.write_vectored_volatile(memory_slices.as_slice())
299
.map_err(Error::IoWriteError),
300
}
301
},
302
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
303
)
304
.await?)
305
}
306
307
/// Deallocates the given range of a file.
308
pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
309
let handles = HandleWrapper::new(self.source_descriptor);
310
let descriptors = self.source_descriptor;
311
Ok(self
312
.blocking_pool
313
.spawn(
314
move || {
315
let file = get_thread_file(descriptors);
316
file.punch_hole(file_offset, len)
317
.map_err(Error::IoPunchHoleError)?;
318
Ok(())
319
},
320
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
321
)
322
.await?)
323
}
324
325
/// Fills the given range with zeroes.
326
pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
327
let handles = HandleWrapper::new(self.source_descriptor);
328
let descriptors = self.source_descriptor;
329
Ok(self
330
.blocking_pool
331
.spawn(
332
move || {
333
let file = get_thread_file(descriptors);
334
// ZeroRange calls `punch_hole` which doesn't extend the File size if it needs
335
// to. Will fix if it becomes a problem.
336
file.write_zeroes_at(file_offset, len as usize)
337
.map_err(Error::IoWriteZeroesError)?;
338
Ok(())
339
},
340
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
341
)
342
.await?)
343
}
344
345
/// Sync all completed write operations to the backing storage.
346
pub async fn fsync(&self) -> AsyncResult<()> {
347
let handles = HandleWrapper::new(self.source_descriptor);
348
let descriptors = self.source_descriptor;
349
350
Ok(self
351
.blocking_pool
352
.spawn(
353
move || {
354
let mut file = get_thread_file(descriptors);
355
file.flush().map_err(Error::IoFlushError)
356
},
357
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
358
)
359
.await?)
360
}
361
362
/// Sync all data of completed write operations to the backing storage. Currently, the
363
/// implementation is equivalent to fsync.
364
pub async fn fdatasync(&self) -> AsyncResult<()> {
365
// TODO(b/282003931): Fall back to regular fsync.
366
self.fsync().await
367
}
368
369
/// Yields the underlying IO source.
370
pub fn into_source(self) -> F {
371
self.source
372
}
373
374
/// Provides a mutable ref to the underlying IO source.
375
pub fn as_source_mut(&mut self) -> &mut F {
376
&mut self.source
377
}
378
379
/// Provides a ref to the underlying IO source.
380
///
381
/// If sources are not interchangeable, behavior is undefined.
382
pub fn as_source(&self) -> &F {
383
&self.source
384
}
385
386
/// If sources are not interchangeable, behavior is undefined.
387
pub async fn wait_for_handle(&self) -> AsyncResult<()> {
388
base::sys::windows::async_wait_for_single_object(&self.source)
389
.await
390
.map_err(Error::HandleWaitFailed)?;
391
Ok(())
392
}
393
}
394
395
// NOTE: Prefer adding tests to io_source.rs if not backend specific.
396
#[cfg(test)]
397
mod tests {
398
use std::fs;
399
400
use tempfile::NamedTempFile;
401
402
use super::super::HandleReactor;
403
use super::*;
404
use crate::common_executor::RawExecutor;
405
use crate::ExecutorTrait;
406
407
/// Punching a 3-byte hole at offset 1 zeroes exactly those bytes and leaves the rest intact.
#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
#[test]
fn test_punch_holes() {
    const CONTENTS: &str = "abcdefghijk";

    // Seed the file, then rewind so the final read starts from the beginning.
    let mut temp_file = NamedTempFile::new().unwrap();
    temp_file.write_all(CONTENTS.as_bytes()).unwrap();
    temp_file.flush().unwrap();
    temp_file.seek(SeekFrom::Start(0)).unwrap();

    // Open a second, write-only handle to the same path for the HandleSource.
    let writer = fs::OpenOptions::new()
        .write(true)
        .open(temp_file.path())
        .unwrap();
    let handle_src = HandleSource::new(writer).unwrap();

    let ex = RawExecutor::<HandleReactor>::new().unwrap();
    ex.run_until(async {
        handle_src.punch_hole(1, 3).await.unwrap();
    })
    .unwrap();

    let mut contents_after = vec![0; 11];
    temp_file.read_exact(&mut contents_after).unwrap();
    let as_text = std::str::from_utf8(contents_after.as_slice()).unwrap();
    assert_eq!(as_text, "a\0\0\0efghijk");
}
436
437
/// Punching a hole that runs past the end of the file must not extend it: after punching
/// 4 bytes at offset 9 of an 11-byte file, reading 13 bytes still fails.
#[cfg_attr(all(target_os = "windows", target_env = "gnu"), ignore)]
#[test]
fn test_punch_holes_fail_out_of_bounds() {
    const CONTENTS: &str = "abcdefghijk";

    // Seed the file, then rewind so the final read starts from the beginning.
    let mut temp_file = NamedTempFile::new().unwrap();
    temp_file.write_all(CONTENTS.as_bytes()).unwrap();
    temp_file.flush().unwrap();
    temp_file.seek(SeekFrom::Start(0)).unwrap();

    // Open a second, write-only handle to the same path for the HandleSource.
    let writer = fs::OpenOptions::new()
        .write(true)
        .open(temp_file.path())
        .unwrap();
    let handle_src = HandleSource::new(writer).unwrap();

    let ex = RawExecutor::<HandleReactor>::new().unwrap();
    ex.run_until(async {
        handle_src.punch_hole(9, 4).await.unwrap();
    })
    .unwrap();

    // 13 = offset 9 + len 4; the read can only succeed if the file grew.
    let mut oversized = vec![0; 13];
    let read_outcome = temp_file.read_exact(&mut oversized);
    assert!(read_outcome.is_err());
}
463
464
// TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the
// bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.
// #[test]
// fn test_write_zeroes() {
//     let mut temp_file = NamedTempFile::new().unwrap();
//     temp_file.write("abcdefghijk".as_bytes()).unwrap();
//     temp_file.flush().unwrap();
//     temp_file.seek(SeekFrom::Start(0)).unwrap();

//     async fn punch_hole(handle_src: &HandleSource<File>) {
//         let offset = 9;
//         let len = 4;
//         handle_src
//             .fallocate(offset, len, AllocateMode::ZeroRange)
//             .await
//             .unwrap();
//     }

//     let ex = RawExecutor::<HandleReactor>::new();
//     let f = fs::OpenOptions::new()
//         .write(true)
//         .open(temp_file.path())
//         .unwrap();
//     let handle_src = HandleSource::new(f).unwrap();
//     ex.run_until(punch_hole(&handle_src)).unwrap();

//     let mut buf = vec![0; 13];
//     temp_file.read_exact(&mut buf).unwrap();
//     assert_eq!(
//         std::str::from_utf8(buf.as_slice()).unwrap(),
//         "abcdefghi\0\0\0\0"
//     );
// }
497
}
498
499