Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/linux/poll_source.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::io;
6
use std::os::fd::AsRawFd;
7
use std::sync::Arc;
8
9
use base::handle_eintr_errno;
10
use base::sys::fallocate;
11
use base::sys::FallocateMode;
12
use base::AsRawDescriptor;
13
use base::VolatileSlice;
14
use remain::sorted;
15
use thiserror::Error as ThisError;
16
17
use super::fd_executor;
18
use super::fd_executor::EpollReactor;
19
use super::fd_executor::RegisteredSource;
20
use crate::common_executor::RawExecutor;
21
use crate::mem::BackingMemory;
22
use crate::AsyncError;
23
use crate::AsyncResult;
24
use crate::MemRegion;
25
26
#[sorted]
27
#[derive(ThisError, Debug)]
28
pub enum Error {
29
/// An error occurred attempting to register a waker with the executor.
30
#[error("An error occurred attempting to register a waker with the executor: {0}.")]
31
AddingWaker(fd_executor::Error),
32
/// Failed to discard a block
33
#[error("Failed to discard a block: {0}")]
34
Discard(base::Error),
35
/// An executor error occurred.
36
#[error("An executor error occurred: {0}")]
37
Executor(fd_executor::Error),
38
/// An error occurred when executing fallocate synchronously.
39
#[error("An error occurred when executing fallocate synchronously: {0}")]
40
Fallocate(base::Error),
41
/// An error occurred when executing fdatasync synchronously.
42
#[error("An error occurred when executing fdatasync synchronously: {0}")]
43
Fdatasync(base::Error),
44
/// An error occurred when executing fsync synchronously.
45
#[error("An error occurred when executing fsync synchronously: {0}")]
46
Fsync(base::Error),
47
/// An error occurred when reading the FD.
48
#[error("An error occurred when reading the FD: {0}.")]
49
Read(base::Error),
50
/// Can't seek file.
51
#[error("An error occurred when seeking the FD: {0}.")]
52
Seeking(base::Error),
53
/// An error occurred when writing the FD.
54
#[error("An error occurred when writing the FD: {0}.")]
55
Write(base::Error),
56
}
57
pub type Result<T> = std::result::Result<T, Error>;
58
59
impl From<Error> for io::Error {
60
fn from(e: Error) -> Self {
61
use Error::*;
62
match e {
63
AddingWaker(e) => e.into(),
64
Executor(e) => e.into(),
65
Discard(e) => e.into(),
66
Fallocate(e) => e.into(),
67
Fdatasync(e) => e.into(),
68
Fsync(e) => e.into(),
69
Read(e) => e.into(),
70
Seeking(e) => e.into(),
71
Write(e) => e.into(),
72
}
73
}
74
}
75
76
impl From<Error> for AsyncError {
77
fn from(e: Error) -> AsyncError {
78
AsyncError::SysVariants(e.into())
79
}
80
}
81
82
/// Async wrapper for an IO source that uses the FD executor to drive async operations.
83
pub struct PollSource<F> {
84
registered_source: RegisteredSource<F>,
85
}
86
87
impl<F: AsRawDescriptor> PollSource<F> {
88
/// Create a new `PollSource` from the given IO source.
89
pub fn new(f: F, ex: &Arc<RawExecutor<EpollReactor>>) -> Result<Self> {
90
RegisteredSource::new(ex, f)
91
.map({
92
|f| PollSource {
93
registered_source: f,
94
}
95
})
96
.map_err(Error::Executor)
97
}
98
}
99
100
impl<F: AsRawDescriptor> PollSource<F> {
101
/// Reads from the iosource at `file_offset` and fill the given `vec`.
102
pub async fn read_to_vec(
103
&self,
104
file_offset: Option<u64>,
105
mut vec: Vec<u8>,
106
) -> AsyncResult<(usize, Vec<u8>)> {
107
loop {
108
let res = if let Some(offset) = file_offset {
109
// SAFETY:
110
// Safe because this will only modify `vec` and we check the return value.
111
handle_eintr_errno!(unsafe {
112
libc::pread64(
113
self.registered_source.duped_fd.as_raw_fd(),
114
vec.as_mut_ptr() as *mut libc::c_void,
115
vec.len(),
116
offset as libc::off64_t,
117
)
118
})
119
} else {
120
// SAFETY:
121
// Safe because this will only modify `vec` and we check the return value.
122
handle_eintr_errno!(unsafe {
123
libc::read(
124
self.registered_source.duped_fd.as_raw_fd(),
125
vec.as_mut_ptr() as *mut libc::c_void,
126
vec.len(),
127
)
128
})
129
};
130
131
if res >= 0 {
132
return Ok((res as usize, vec));
133
}
134
135
match base::Error::last() {
136
e if e.errno() == libc::EWOULDBLOCK => {
137
let op = self
138
.registered_source
139
.wait_readable()
140
.map_err(Error::AddingWaker)?;
141
op.await.map_err(Error::Executor)?;
142
}
143
e => return Err(Error::Read(e).into()),
144
}
145
}
146
}
147
148
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
149
pub async fn read_to_mem(
150
&self,
151
file_offset: Option<u64>,
152
mem: Arc<dyn BackingMemory + Send + Sync>,
153
mem_offsets: impl IntoIterator<Item = MemRegion>,
154
) -> AsyncResult<usize> {
155
let mut iovecs = mem_offsets
156
.into_iter()
157
.filter_map(|mem_range| mem.get_volatile_slice(mem_range).ok())
158
.collect::<Vec<VolatileSlice>>();
159
160
loop {
161
let res = if let Some(offset) = file_offset {
162
// SAFETY:
163
// Safe because we trust the kernel not to write path the length given and the
164
// length is guaranteed to be valid from the pointer by
165
// io_slice_mut.
166
handle_eintr_errno!(unsafe {
167
libc::preadv64(
168
self.registered_source.duped_fd.as_raw_fd(),
169
iovecs.as_mut_ptr() as *mut _,
170
iovecs.len() as i32,
171
offset as libc::off64_t,
172
)
173
})
174
} else {
175
// SAFETY:
176
// Safe because we trust the kernel not to write path the length given and the
177
// length is guaranteed to be valid from the pointer by
178
// io_slice_mut.
179
handle_eintr_errno!(unsafe {
180
libc::readv(
181
self.registered_source.duped_fd.as_raw_fd(),
182
iovecs.as_mut_ptr() as *mut _,
183
iovecs.len() as i32,
184
)
185
})
186
};
187
188
if res >= 0 {
189
return Ok(res as usize);
190
}
191
192
match base::Error::last() {
193
e if e.errno() == libc::EWOULDBLOCK => {
194
let op = self
195
.registered_source
196
.wait_readable()
197
.map_err(Error::AddingWaker)?;
198
op.await.map_err(Error::Executor)?;
199
}
200
e => return Err(Error::Read(e).into()),
201
}
202
}
203
}
204
205
/// Wait for the FD of `self` to be readable.
206
pub async fn wait_readable(&self) -> AsyncResult<()> {
207
let op = self
208
.registered_source
209
.wait_readable()
210
.map_err(Error::AddingWaker)?;
211
op.await.map_err(Error::Executor)?;
212
Ok(())
213
}
214
215
/// Writes from the given `vec` to the file starting at `file_offset`.
216
pub async fn write_from_vec(
217
&self,
218
file_offset: Option<u64>,
219
vec: Vec<u8>,
220
) -> AsyncResult<(usize, Vec<u8>)> {
221
loop {
222
let res = if let Some(offset) = file_offset {
223
// SAFETY:
224
// Safe because this will not modify any memory and we check the return value.
225
handle_eintr_errno!(unsafe {
226
libc::pwrite64(
227
self.registered_source.duped_fd.as_raw_fd(),
228
vec.as_ptr() as *const libc::c_void,
229
vec.len(),
230
offset as libc::off64_t,
231
)
232
})
233
} else {
234
// SAFETY:
235
// Safe because this will not modify any memory and we check the return value.
236
handle_eintr_errno!(unsafe {
237
libc::write(
238
self.registered_source.duped_fd.as_raw_fd(),
239
vec.as_ptr() as *const libc::c_void,
240
vec.len(),
241
)
242
})
243
};
244
245
if res >= 0 {
246
return Ok((res as usize, vec));
247
}
248
249
match base::Error::last() {
250
e if e.errno() == libc::EWOULDBLOCK => {
251
let op = self
252
.registered_source
253
.wait_writable()
254
.map_err(Error::AddingWaker)?;
255
op.await.map_err(Error::Executor)?;
256
}
257
e => return Err(Error::Write(e).into()),
258
}
259
}
260
}
261
262
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
263
pub async fn write_from_mem(
264
&self,
265
file_offset: Option<u64>,
266
mem: Arc<dyn BackingMemory + Send + Sync>,
267
mem_offsets: impl IntoIterator<Item = MemRegion>,
268
) -> AsyncResult<usize> {
269
let iovecs = mem_offsets
270
.into_iter()
271
.map(|mem_range| mem.get_volatile_slice(mem_range))
272
.filter_map(|r| r.ok())
273
.collect::<Vec<VolatileSlice>>();
274
275
loop {
276
let res = if let Some(offset) = file_offset {
277
// SAFETY:
278
// Safe because we trust the kernel not to write path the length given and the
279
// length is guaranteed to be valid from the pointer by
280
// io_slice_mut.
281
handle_eintr_errno!(unsafe {
282
libc::pwritev64(
283
self.registered_source.duped_fd.as_raw_fd(),
284
iovecs.as_ptr() as *mut _,
285
iovecs.len() as i32,
286
offset as libc::off64_t,
287
)
288
})
289
} else {
290
// SAFETY:
291
// Safe because we trust the kernel not to write path the length given and the
292
// length is guaranteed to be valid from the pointer by
293
// io_slice_mut.
294
handle_eintr_errno!(unsafe {
295
libc::writev(
296
self.registered_source.duped_fd.as_raw_fd(),
297
iovecs.as_ptr() as *mut _,
298
iovecs.len() as i32,
299
)
300
})
301
};
302
303
if res >= 0 {
304
return Ok(res as usize);
305
}
306
307
match base::Error::last() {
308
e if e.errno() == libc::EWOULDBLOCK => {
309
let op = self
310
.registered_source
311
.wait_writable()
312
.map_err(Error::AddingWaker)?;
313
op.await.map_err(Error::Executor)?;
314
}
315
e => return Err(Error::Write(e).into()),
316
}
317
}
318
}
319
320
/// # Safety
321
///
322
/// Sync all completed write operations to the backing storage.
323
pub async fn fsync(&self) -> AsyncResult<()> {
324
// SAFETY: the duped_fd is valid and return value is checked.
325
let ret = handle_eintr_errno!(unsafe {
326
libc::fsync(self.registered_source.duped_fd.as_raw_fd())
327
});
328
if ret == 0 {
329
Ok(())
330
} else {
331
Err(Error::Fsync(base::Error::last()).into())
332
}
333
}
334
335
/// punch_hole
336
pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
337
Ok(fallocate(
338
&self.registered_source.duped_fd,
339
FallocateMode::PunchHole,
340
file_offset,
341
len,
342
)
343
.map_err(Error::Fallocate)?)
344
}
345
346
/// write_zeroes_at
347
pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
348
Ok(fallocate(
349
&self.registered_source.duped_fd,
350
FallocateMode::ZeroRange,
351
file_offset,
352
len,
353
)
354
.map_err(Error::Fallocate)?)
355
}
356
357
/// Sync all data of completed write operations to the backing storage, avoiding updating extra
358
/// metadata.
359
pub async fn fdatasync(&self) -> AsyncResult<()> {
360
// SAFETY: the duped_fd is valid and return value is checked.
361
let ret = handle_eintr_errno!(unsafe {
362
libc::fdatasync(self.registered_source.duped_fd.as_raw_fd())
363
});
364
if ret == 0 {
365
Ok(())
366
} else {
367
Err(Error::Fdatasync(base::Error::last()).into())
368
}
369
}
370
371
/// Yields the underlying IO source.
372
pub fn into_source(self) -> F {
373
self.registered_source.source
374
}
375
376
/// Provides a mutable ref to the underlying IO source.
377
pub fn as_source_mut(&mut self) -> &mut F {
378
&mut self.registered_source.source
379
}
380
381
/// Provides a ref to the underlying IO source.
382
pub fn as_source(&self) -> &F {
383
&self.registered_source.source
384
}
385
}
386
387
// NOTE: Prefer adding tests to io_source.rs if not backend specific.
388
#[cfg(test)]
389
mod tests {
390
use std::fs::File;
391
392
use super::*;
393
use crate::ExecutorTrait;
394
395
#[test]
396
fn memory_leak() {
397
// This test needs to run under ASAN to detect memory leaks.
398
399
async fn owns_poll_source(source: PollSource<File>) {
400
let _ = source.wait_readable().await;
401
}
402
403
let (rx, _tx) = base::pipe().unwrap();
404
let ex = RawExecutor::<EpollReactor>::new().unwrap();
405
let source = PollSource::new(rx, &ex).unwrap();
406
ex.spawn_local(owns_poll_source(source)).detach();
407
408
// Drop `ex` without running. This would cause a memory leak if PollSource owned a strong
409
// reference to the executor because it owns a reference to the future that owns PollSource
410
// (via its Runnable). The strong reference prevents the drop impl from running, which would
411
// otherwise poll the future and have it return with an error.
412
}
413
}
414
415