Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/linux/uring_source.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5
use std::ops::Deref;
6
use std::ops::DerefMut;
7
use std::sync::Arc;
8
9
use base::sys::FallocateMode;
10
use base::AsRawDescriptor;
11
12
use super::uring_executor::RegisteredSource;
13
use super::uring_executor::Result;
14
use super::uring_executor::UringReactor;
15
use crate::common_executor::RawExecutor;
16
use crate::mem::BackingMemory;
17
use crate::mem::MemRegion;
18
use crate::mem::VecIoWrapper;
19
use crate::AsyncResult;
20
21
/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
22
/// registering an IO source with the uring that provides an `IoSource` implementation.
23
pub struct UringSource<F: AsRawDescriptor> {
24
registered_source: RegisteredSource,
25
source: F,
26
}
27
28
impl<F: AsRawDescriptor> UringSource<F> {
29
/// Creates a new `UringSource` that wraps the given `io_source` object.
30
pub fn new(io_source: F, ex: &Arc<RawExecutor<UringReactor>>) -> Result<UringSource<F>> {
31
let r = ex.reactor.register_source(ex, &io_source)?;
32
Ok(UringSource {
33
registered_source: r,
34
source: io_source,
35
})
36
}
37
38
/// Reads from the iosource at `file_offset` and fill the given `vec`.
39
pub async fn read_to_vec(
40
&self,
41
file_offset: Option<u64>,
42
vec: Vec<u8>,
43
) -> AsyncResult<(usize, Vec<u8>)> {
44
let buf = Arc::new(VecIoWrapper::from(vec));
45
let op = self.registered_source.start_read_to_mem(
46
file_offset,
47
buf.clone(),
48
[MemRegion {
49
offset: 0,
50
len: buf.len(),
51
}],
52
)?;
53
let len = op.await?;
54
let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
55
v.into()
56
} else {
57
panic!("too many refs on buf");
58
};
59
60
Ok((len as usize, bytes))
61
}
62
63
/// Wait for the FD of `self` to be readable.
64
pub async fn wait_readable(&self) -> AsyncResult<()> {
65
let op = self.registered_source.poll_fd_readable()?;
66
op.await?;
67
Ok(())
68
}
69
70
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
71
pub async fn read_to_mem(
72
&self,
73
file_offset: Option<u64>,
74
mem: Arc<dyn BackingMemory + Send + Sync>,
75
mem_offsets: impl IntoIterator<Item = MemRegion>,
76
) -> AsyncResult<usize> {
77
let op = self
78
.registered_source
79
.start_read_to_mem(file_offset, mem, mem_offsets)?;
80
let len = op.await?;
81
Ok(len as usize)
82
}
83
84
/// Writes from the given `vec` to the file starting at `file_offset`.
85
pub async fn write_from_vec(
86
&self,
87
file_offset: Option<u64>,
88
vec: Vec<u8>,
89
) -> AsyncResult<(usize, Vec<u8>)> {
90
let buf = Arc::new(VecIoWrapper::from(vec));
91
let op = self.registered_source.start_write_from_mem(
92
file_offset,
93
buf.clone(),
94
[MemRegion {
95
offset: 0,
96
len: buf.len(),
97
}],
98
)?;
99
let len = op.await?;
100
let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
101
v.into()
102
} else {
103
panic!("too many refs on buf");
104
};
105
106
Ok((len as usize, bytes))
107
}
108
109
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
110
pub async fn write_from_mem(
111
&self,
112
file_offset: Option<u64>,
113
mem: Arc<dyn BackingMemory + Send + Sync>,
114
mem_offsets: impl IntoIterator<Item = MemRegion>,
115
) -> AsyncResult<usize> {
116
let op = self
117
.registered_source
118
.start_write_from_mem(file_offset, mem, mem_offsets)?;
119
let len = op.await?;
120
Ok(len as usize)
121
}
122
123
/// Deallocates the given range of a file.
124
pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
125
let op = self.registered_source.start_fallocate(
126
file_offset,
127
len,
128
FallocateMode::PunchHole.into(),
129
)?;
130
let _ = op.await?;
131
Ok(())
132
}
133
134
/// Fills the given range with zeroes.
135
pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
136
let op = self.registered_source.start_fallocate(
137
file_offset,
138
len,
139
FallocateMode::ZeroRange.into(),
140
)?;
141
let _ = op.await?;
142
Ok(())
143
}
144
145
/// Sync all completed write operations to the backing storage.
146
pub async fn fsync(&self) -> AsyncResult<()> {
147
let op = self.registered_source.start_fsync()?;
148
let _ = op.await?;
149
Ok(())
150
}
151
152
/// Sync all data of completed write operations to the backing storage. Currently, the
153
/// implementation is equivalent to fsync.
154
pub async fn fdatasync(&self) -> AsyncResult<()> {
155
// Currently io_uring does not implement fdatasync. Fall back to fsync.
156
// TODO(b/281609112): Implement real fdatasync with io_uring.
157
self.fsync().await
158
}
159
160
/// Yields the underlying IO source.
161
pub fn into_source(self) -> F {
162
self.source
163
}
164
165
/// Provides a mutable ref to the underlying IO source.
166
pub fn as_source(&self) -> &F {
167
&self.source
168
}
169
170
/// Provides a ref to the underlying IO source.
171
pub fn as_source_mut(&mut self) -> &mut F {
172
&mut self.source
173
}
174
}
175
176
/// Lets a `UringSource<F>` be used wherever a `&F` is expected by dereferencing to the
/// wrapped IO source.
impl<F: AsRawDescriptor> Deref for UringSource<F> {
    type Target = F;

    fn deref(&self) -> &Self::Target {
        &self.source
    }
}
183
184
/// Mutable counterpart of the `Deref` impl: yields `&mut F` for the wrapped IO source.
impl<F: AsRawDescriptor> DerefMut for UringSource<F> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.source
    }
}
189
190
// NOTE: Prefer adding tests to io_source.rs if not backend specific.
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::Waker;

    use sync::Mutex;

    use super::super::uring_executor::is_uring_stable;
    use super::super::UringSource;
    use super::*;
    use crate::sys::linux::ExecutorKindSys;
    use crate::Executor;
    use crate::ExecutorTrait;
    use crate::IoSource;

    /// Reads `size_of::<u64>()` bytes from `source` and decodes them as a native-endian `u64`.
    ///
    /// Panics if the read fails or returns a different number of bytes.
    async fn read_u64<T: AsRawDescriptor>(source: &UringSource<T>) -> u64 {
        // Init a vec that translates to u64::max;
        let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
        let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
        assert_eq!(ret, std::mem::size_of::<u64>());
        let mut val = 0u64.to_ne_bytes();
        val.copy_from_slice(&u64_mem);
        u64::from_ne_bytes(val)
    }

    /// Two tasks hand values back and forth through a pair of events: one writes 55, 66 and
    /// 77, the other reads each value and signals before the next one is written.
    #[test]
    fn event() {
        if !is_uring_stable() {
            return;
        }

        use base::Event;
        use base::EventExt;

        // Writes each count to `ev`, then waits on `wait` before writing the next one.
        async fn write_event(ev: Event, wait: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let wait = UringSource::new(wait, ex).unwrap();
            ev.write_count(55).unwrap();
            read_u64(&wait).await;
            ev.write_count(66).unwrap();
            read_u64(&wait).await;
            ev.write_count(77).unwrap();
            read_u64(&wait).await;
        }

        // Reads each expected count from `ev` and signals `signal` after every read.
        async fn read_events(ev: Event, signal: Event, ex: &Arc<RawExecutor<UringReactor>>) {
            let source = UringSource::new(ev, ex).unwrap();
            assert_eq!(read_u64(&source).await, 55);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 66);
            signal.signal().unwrap();
            assert_eq!(read_u64(&source).await, 77);
            signal.signal().unwrap();
        }

        let event = Event::new().unwrap();
        let signal_wait = Event::new().unwrap();
        let ex = RawExecutor::<UringReactor>::new().unwrap();
        let write_task = write_event(
            event.try_clone().unwrap(),
            signal_wait.try_clone().unwrap(),
            &ex,
        );
        let read_task = read_events(event, signal_wait, &ex);
        ex.run_until(futures::future::join(read_task, write_task))
            .unwrap();
    }

    /// Starts a read on a pipe, selects it against a ready future, and then drops the
    /// still-pending read.
    #[test]
    fn pend_on_pipe() {
        if !is_uring_stable() {
            return;
        }

        use std::io::Write;

        use futures::future::Either;

        async fn do_test(ex: &Arc<RawExecutor<UringReactor>>) {
            let (read_source, mut w) = base::pipe().unwrap();
            let source = UringSource::new(read_source, ex).unwrap();
            let done = Box::pin(async { 5usize });
            let pending = Box::pin(read_u64(&source));
            // The test expects the ready `done` future to win; any other outcome panics.
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => {
                    // Write to the pipe so that the kernel will release the memory associated with
                    // the uring read operation.
                    w.write_all(&[0]).expect("failed to write to pipe");
                    ::std::mem::drop(pending);
                }
                _ => panic!("unexpected select result"),
            };
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();
    }

    /// A memory region that runs past the end of the backing buffer must be rejected.
    #[test]
    fn range_error() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            let v = vec![0x55u8; 64];
            let vw = Arc::new(VecIoWrapper::from(v));
            // offset 32 + len 33 = 65, one byte past the 64-byte buffer.
            let ret = source
                .read_to_mem(
                    None,
                    Arc::<VecIoWrapper>::clone(&vw),
                    [MemRegion {
                        offset: 32,
                        len: 33,
                    }],
                )
                .await;
            assert!(ret.is_err());
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    /// `wait_readable` on `/dev/zero` must complete successfully.
    #[test]
    fn wait_read() {
        if !is_uring_stable() {
            return;
        }

        async fn go(ex: &Arc<RawExecutor<UringReactor>>) {
            let f = File::open("/dev/zero").unwrap();
            let source = UringSource::new(f, ex).unwrap();
            source.wait_readable().await.unwrap();
        }

        let ex = RawExecutor::<UringReactor>::new().unwrap();
        ex.run_until(go(&ex)).unwrap();
    }

    /// Shared flag plus the waker of the `Quit` future waiting on it.
    struct State {
        should_quit: bool,
        waker: Option<Waker>,
    }

    impl State {
        /// Sets the quit flag and wakes the stored waker, if there is one.
        fn wake(&mut self) {
            self.should_quit = true;
            let waker = self.waker.take();

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    /// Future that stays pending until `State::should_quit` is set.
    struct Quit {
        state: Arc<Mutex<State>>,
    }

    impl Future for Quit {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
            let mut state = self.state.lock();
            if state.should_quit {
                return Poll::Ready(());
            }

            // Remember the latest waker so `State::wake` can reschedule this future.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_uring_from_poll() {
        if !is_uring_stable() {
            return;
        }
        // Start a uring operation and then await the result from an FdExecutor.
        async fn go(source: IoSource<File>) {
            let v = vec![0xa4u8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = uring_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        // Keep the uring executor running on its own thread until `quit` resolves.
        let handle = std::thread::spawn(move || uring_ex.run_until(quit));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        poll_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[test]
    fn await_poll_from_uring() {
        if !is_uring_stable() {
            return;
        }
        // Start a poll operation and then await the result from a uring executor.
        async fn go(source: IoSource<File>) {
            let v = vec![0x2cu8; 16];
            let (len, vec) = source.read_to_vec(None, v).await.unwrap();
            assert_eq!(len, 16);
            assert!(vec.iter().all(|&b| b == 0));
        }

        let state = Arc::new(Mutex::new(State {
            should_quit: false,
            waker: None,
        }));

        let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
        let f = File::open("/dev/zero").unwrap();
        let source = poll_ex.async_from(f).unwrap();

        let quit = Quit {
            state: state.clone(),
        };
        // Keep the poll executor running on its own thread until `quit` resolves.
        let handle = std::thread::spawn(move || poll_ex.run_until(quit));

        let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
        uring_ex.run_until(go(source)).unwrap();

        state.lock().wake();
        handle.join().unwrap().unwrap();
    }
}
440
441