Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/linux/uring_executor.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
// TODO: Move this doc to one of the public APIs, it isn't io_uring specific.
6
7
//! `UringReactor`
8
//!
9
//! ## Read/Write buffer management.
10
//!
11
//! There are two key issues managing asynchronous IO buffers in rust.
12
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
13
//! not have any references to it during that time.
14
//! 2) The memory must remain valid as long as the kernel has a reference to it.
15
//!
16
//! ### The kernel's mutable borrow of the buffer
17
//!
18
//! Because the buffers used for read and write must be passed to the kernel for an unknown
19
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
20
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
21
//! future has a reference to. The buffer can be modified at any point from submission until the
22
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
23
//! and Drop can't be used for safety guarantees. To ensure this never happens, only memory that
24
//! implements `BackingMemory` is accepted. For implementors of `BackingMemory` the mut borrow
25
//! isn't an issue because those are already Ok with external modifications to the memory (Like a
26
//! `VolatileSlice`).
27
//!
28
//! ### Buffer lifetime
29
//!
30
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
31
//! read operation was submitted, then the memory is dropped. To solve this, the executor takes an
32
//! Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being passed to
33
//! the executor. The executor holds the Arc and ensures all operations are complete before
34
//! dropping it, that guarantees the memory is valid for the duration.
35
//!
36
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
37
//! dropped(can't rely on drop running), there is no way to ensure the kernel's buffer remains valid
38
//! until the operation completes unless the executor holds an Arc to the memory on the heap.
39
//!
40
//! ## Using `Vec` for reads/writes.
41
//!
42
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
43
//! ensures that only the kernel is allowed to access the `Vec` and wraps the the `Vec` in an Arc to
44
//! ensure it lives long enough.
45
46
use std::convert::TryInto;
47
use std::ffi::CStr;
48
use std::fs::File;
49
use std::future::Future;
50
use std::io;
51
use std::mem;
52
use std::mem::MaybeUninit;
53
use std::os::unix::io::FromRawFd;
54
use std::os::unix::io::RawFd;
55
use std::pin::Pin;
56
use std::sync::Arc;
57
use std::sync::LazyLock;
58
use std::sync::Weak;
59
use std::task::Context;
60
use std::task::Poll;
61
use std::task::Waker;
62
use std::thread;
63
use std::thread::ThreadId;
64
65
use base::trace;
66
use base::warn;
67
use base::AsRawDescriptor;
68
use base::EventType;
69
use base::IoBufMut;
70
use base::RawDescriptor;
71
use io_uring::URingAllowlist;
72
use io_uring::URingContext;
73
use io_uring::URingOperation;
74
use remain::sorted;
75
use slab::Slab;
76
use sync::Mutex;
77
use thiserror::Error as ThisError;
78
79
use crate::common_executor::RawExecutor;
80
use crate::common_executor::RawTaskHandle;
81
use crate::common_executor::Reactor;
82
use crate::mem::BackingMemory;
83
use crate::waker::WakerToken;
84
use crate::waker::WeakWake;
85
use crate::AsyncError;
86
use crate::AsyncResult;
87
use crate::IoSource;
88
use crate::MemRegion;
89
use crate::TaskHandle;
90
91
#[sorted]
92
#[derive(Debug, ThisError)]
93
pub enum Error {
94
/// Creating a context to wait on FDs failed.
95
#[error("Error creating the fd waiting context: {0}")]
96
CreatingContext(io_uring::Error),
97
/// Failed to discard a block
98
#[error("Failed to discard a block: {0}")]
99
Discard(base::Error),
100
/// Failed to copy the FD for the polling context.
101
#[error("Failed to copy the FD for the polling context: {0}")]
102
DuplicatingFd(base::Error),
103
/// Enabling a context faild.
104
#[error("Error enabling the URing context: {0}")]
105
EnablingContext(io_uring::Error),
106
/// The Executor is gone.
107
#[error("The executor is gone")]
108
ExecutorGone,
109
/// Invalid offset or length given for an iovec in backing memory.
110
#[error("Invalid offset/len for getting an iovec")]
111
InvalidOffset,
112
/// Invalid FD source specified.
113
#[error("Invalid source, FD not registered for use")]
114
InvalidSource,
115
/// Error doing the IO.
116
#[error("Error during IO: {0}")]
117
Io(io::Error),
118
/// Registering operation restrictions to a uring failed.
119
#[error("Error registering restrictions to the URing context: {0}")]
120
RegisteringURingRestriction(io_uring::Error),
121
/// Failed to remove the waker remove the polling context.
122
#[error("Error removing from the URing context: {0}")]
123
RemovingWaker(io_uring::Error),
124
/// Failed to submit the operation to the polling context.
125
#[error("Error adding to the URing context: {0}")]
126
SubmittingOp(io_uring::Error),
127
/// URingContext failure.
128
#[error("URingContext failure: {0}")]
129
URingContextError(io_uring::Error),
130
/// Failed to submit or wait for io_uring events.
131
#[error("URing::enter: {0}")]
132
URingEnter(io_uring::Error),
133
}
134
pub type Result<T> = std::result::Result<T, Error>;
135
136
impl From<Error> for io::Error {
137
fn from(e: Error) -> Self {
138
use Error::*;
139
match e {
140
Discard(e) => e.into(),
141
DuplicatingFd(e) => e.into(),
142
ExecutorGone => io::Error::other(ExecutorGone),
143
InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
144
InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
145
Io(e) => e,
146
CreatingContext(e) => e.into(),
147
RemovingWaker(e) => e.into(),
148
SubmittingOp(e) => e.into(),
149
URingContextError(e) => e.into(),
150
URingEnter(e) => e.into(),
151
EnablingContext(e) => e.into(),
152
RegisteringURingRestriction(e) => e.into(),
153
}
154
}
155
}
156
157
impl From<Error> for AsyncError {
158
fn from(e: Error) -> AsyncError {
159
AsyncError::SysVariants(e.into())
160
}
161
}
162
163
static IS_URING_STABLE: LazyLock<bool> = LazyLock::new(|| {
164
let mut utsname = MaybeUninit::zeroed();
165
166
// SAFETY:
167
// Safe because this will only modify `utsname` and we check the return value.
168
let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
169
if res < 0 {
170
return false;
171
}
172
173
// SAFETY:
174
// Safe because the kernel has initialized `utsname`.
175
let utsname = unsafe { utsname.assume_init() };
176
177
// SAFETY:
178
// Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
179
let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
180
181
let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
182
Ok(c) => c,
183
Err(_) => return false,
184
};
185
186
// Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
187
match (components.next(), components.next()) {
188
(Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
189
// The kernel version is new enough so check if we can actually make a uring context.
190
URingContext::new(8, None).is_ok()
191
}
192
_ => false,
193
}
194
});
195
196
// Checks if the uring executor is stable.
197
// Caches the result so that the check is only run once.
198
// Useful for falling back to the FD executor on pre-uring kernels.
199
pub fn is_uring_stable() -> bool {
200
*IS_URING_STABLE
201
}
202
203
// Checks the uring availability by checking if the uring creation succeeds.
204
// If uring creation succeeds, it returns `Ok(())`. It returns an `URingContextError` otherwise.
205
// It fails if the kernel does not support io_uring, but note that the cause is not limited to it.
206
pub(crate) fn check_uring_availability() -> Result<()> {
207
URingContext::new(8, None)
208
.map(drop)
209
.map_err(Error::URingContextError)
210
}
211
212
pub struct RegisteredSource {
213
tag: usize,
214
ex: Weak<RawExecutor<UringReactor>>,
215
}
216
217
impl RegisteredSource {
218
pub fn start_read_to_mem(
219
&self,
220
file_offset: Option<u64>,
221
mem: Arc<dyn BackingMemory + Send + Sync>,
222
addrs: impl IntoIterator<Item = MemRegion>,
223
) -> Result<PendingOperation> {
224
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
225
let token = ex
226
.reactor
227
.submit_read_to_vectored(self, mem, file_offset, addrs)?;
228
229
Ok(PendingOperation {
230
waker_token: Some(token),
231
ex: self.ex.clone(),
232
submitted: false,
233
})
234
}
235
236
pub fn start_write_from_mem(
237
&self,
238
file_offset: Option<u64>,
239
mem: Arc<dyn BackingMemory + Send + Sync>,
240
addrs: impl IntoIterator<Item = MemRegion>,
241
) -> Result<PendingOperation> {
242
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
243
let token = ex
244
.reactor
245
.submit_write_from_vectored(self, mem, file_offset, addrs)?;
246
247
Ok(PendingOperation {
248
waker_token: Some(token),
249
ex: self.ex.clone(),
250
submitted: false,
251
})
252
}
253
254
pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
255
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
256
let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;
257
258
Ok(PendingOperation {
259
waker_token: Some(token),
260
ex: self.ex.clone(),
261
submitted: false,
262
})
263
}
264
265
pub fn start_fsync(&self) -> Result<PendingOperation> {
266
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
267
let token = ex.reactor.submit_fsync(self)?;
268
269
Ok(PendingOperation {
270
waker_token: Some(token),
271
ex: self.ex.clone(),
272
submitted: false,
273
})
274
}
275
276
pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
277
let events = EventType::Read;
278
279
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
280
let token = ex.reactor.submit_poll(self, events)?;
281
282
Ok(PendingOperation {
283
waker_token: Some(token),
284
ex: self.ex.clone(),
285
submitted: false,
286
})
287
}
288
}
289
290
impl Drop for RegisteredSource {
291
fn drop(&mut self) {
292
if let Some(ex) = self.ex.upgrade() {
293
ex.reactor.deregister_source(self);
294
}
295
}
296
}
297
298
// Number of entries in the ring.
299
const NUM_ENTRIES: usize = 256;
300
301
// An operation that has been submitted to the uring and is potentially being waited on.
302
struct OpData {
303
_file: Arc<File>,
304
_mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
305
waker: Option<Waker>,
306
canceled: bool,
307
}
308
309
// The current status of an operation that's been submitted to the uring.
310
enum OpStatus {
311
Nop,
312
Pending(OpData),
313
Completed(Option<::std::io::Result<u32>>),
314
}
315
316
struct Ring {
317
ops: Slab<OpStatus>,
318
registered_sources: Slab<Arc<File>>,
319
}
320
321
/// `Reactor` that manages async IO work using io_uring.
322
pub struct UringReactor {
323
// The URingContext needs to be first so that it is dropped first, closing the uring fd, and
324
// releasing the resources borrowed by the kernel before we free them.
325
ctx: URingContext,
326
ring: Mutex<Ring>,
327
thread_id: Mutex<Option<ThreadId>>,
328
}
329
330
impl UringReactor {
331
fn new() -> Result<UringReactor> {
332
// Allow operations only that the UringReactor really submits to enhance the security.
333
let mut restrictions = URingAllowlist::new();
334
let ops = [
335
URingOperation::Writev,
336
URingOperation::Readv,
337
URingOperation::Nop,
338
URingOperation::Fsync,
339
URingOperation::Fallocate,
340
URingOperation::PollAdd,
341
URingOperation::PollRemove,
342
URingOperation::AsyncCancel,
343
];
344
for op in ops {
345
restrictions.allow_submit_operation(op);
346
}
347
348
let ctx =
349
URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;
350
351
Ok(UringReactor {
352
ctx,
353
ring: Mutex::new(Ring {
354
ops: Slab::with_capacity(NUM_ENTRIES),
355
registered_sources: Slab::with_capacity(NUM_ENTRIES),
356
}),
357
thread_id: Mutex::new(None),
358
})
359
}
360
361
fn runs_tasks_on_current_thread(&self) -> bool {
362
let executor_thread = self.thread_id.lock();
363
executor_thread
364
.map(|id| id == thread::current().id())
365
.unwrap_or(false)
366
}
367
368
fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
369
let mut ring = self.ring.lock();
370
371
let op = ring
372
.ops
373
.get_mut(token.0)
374
.expect("`get_result` called on unknown operation");
375
match op {
376
OpStatus::Nop => panic!("`get_result` called on nop"),
377
OpStatus::Pending(data) => {
378
if data.canceled {
379
panic!("`get_result` called on canceled operation");
380
}
381
data.waker = Some(cx.waker().clone());
382
None
383
}
384
OpStatus::Completed(res) => {
385
let out = res.take();
386
ring.ops.remove(token.0);
387
Some(out.expect("Missing result in completed operation"))
388
}
389
}
390
}
391
392
// Remove the waker for the given token if it hasn't fired yet.
393
fn cancel_operation(&self, token: WakerToken) {
394
let mut ring = self.ring.lock();
395
let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
396
match op {
397
OpStatus::Nop => panic!("`cancel_operation` called on nop"),
398
OpStatus::Pending(data) => {
399
if data.canceled {
400
panic!("uring operation canceled more than once");
401
}
402
403
if let Some(waker) = data.waker.take() {
404
waker.wake();
405
}
406
// Clear the waker as it is no longer needed.
407
data.waker = None;
408
data.canceled = true;
409
410
// Keep the rest of the op data as the uring might still be accessing either
411
// the source of the backing memory so it needs to live until the kernel
412
// completes the operation.
413
true
414
}
415
OpStatus::Completed(_) => {
416
ring.ops.remove(token.0);
417
false
418
}
419
}
420
} else {
421
false
422
};
423
std::mem::drop(ring);
424
if submit_cancel {
425
let _best_effort = self.submit_cancel_async(token.0);
426
}
427
}
428
429
pub(crate) fn register_source<F: AsRawDescriptor>(
430
&self,
431
raw: &Arc<RawExecutor<UringReactor>>,
432
fd: &F,
433
) -> Result<RegisteredSource> {
434
// SAFETY:
435
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
436
// will only be added to the poll loop.
437
let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };
438
439
Ok(RegisteredSource {
440
tag: self
441
.ring
442
.lock()
443
.registered_sources
444
.insert(Arc::new(duped_fd)),
445
ex: Arc::downgrade(raw),
446
})
447
}
448
449
fn deregister_source(&self, source: &RegisteredSource) {
450
// There isn't any need to pull pending ops out, the all have Arc's to the file and mem they
451
// need.let them complete. deregister with pending ops is not a common path no need to
452
// optimize that case yet.
453
self.ring.lock().registered_sources.remove(source.tag);
454
}
455
456
fn submit_poll(
457
&self,
458
source: &RegisteredSource,
459
events: base::EventType,
460
) -> Result<WakerToken> {
461
let mut ring = self.ring.lock();
462
let src = ring
463
.registered_sources
464
.get(source.tag)
465
.ok_or(Error::InvalidSource)?
466
.clone();
467
let entry = ring.ops.vacant_entry();
468
let next_op_token = entry.key();
469
self.ctx
470
.add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
471
.map_err(Error::SubmittingOp)?;
472
entry.insert(OpStatus::Pending(OpData {
473
_file: src,
474
_mem: None,
475
waker: None,
476
canceled: false,
477
}));
478
479
Ok(WakerToken(next_op_token))
480
}
481
482
fn submit_fallocate(
483
&self,
484
source: &RegisteredSource,
485
offset: u64,
486
len: u64,
487
mode: u32,
488
) -> Result<WakerToken> {
489
let mut ring = self.ring.lock();
490
let src = ring
491
.registered_sources
492
.get(source.tag)
493
.ok_or(Error::InvalidSource)?
494
.clone();
495
let entry = ring.ops.vacant_entry();
496
let next_op_token = entry.key();
497
self.ctx
498
.add_fallocate(
499
src.as_raw_descriptor(),
500
offset,
501
len,
502
mode,
503
usize_to_u64(next_op_token),
504
)
505
.map_err(Error::SubmittingOp)?;
506
507
entry.insert(OpStatus::Pending(OpData {
508
_file: src,
509
_mem: None,
510
waker: None,
511
canceled: false,
512
}));
513
514
Ok(WakerToken(next_op_token))
515
}
516
517
fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
518
let mut ring = self.ring.lock();
519
let entry = ring.ops.vacant_entry();
520
let next_op_token = entry.key();
521
self.ctx
522
.async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
523
.map_err(Error::SubmittingOp)?;
524
525
entry.insert(OpStatus::Nop);
526
527
Ok(WakerToken(next_op_token))
528
}
529
530
fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
531
let mut ring = self.ring.lock();
532
let src = ring
533
.registered_sources
534
.get(source.tag)
535
.ok_or(Error::InvalidSource)?
536
.clone();
537
let entry = ring.ops.vacant_entry();
538
let next_op_token = entry.key();
539
self.ctx
540
.add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
541
.map_err(Error::SubmittingOp)?;
542
entry.insert(OpStatus::Pending(OpData {
543
_file: src,
544
_mem: None,
545
waker: None,
546
canceled: false,
547
}));
548
549
Ok(WakerToken(next_op_token))
550
}
551
552
fn submit_read_to_vectored(
553
&self,
554
source: &RegisteredSource,
555
mem: Arc<dyn BackingMemory + Send + Sync>,
556
offset: Option<u64>,
557
addrs: impl IntoIterator<Item = MemRegion>,
558
) -> Result<WakerToken> {
559
let iovecs = addrs
560
.into_iter()
561
.map(|mem_range| {
562
let vslice = mem
563
.get_volatile_slice(mem_range)
564
.map_err(|_| Error::InvalidOffset)?;
565
// SAFETY:
566
// Safe because we guarantee that the memory pointed to by `iovecs` lives until the
567
// transaction is complete and the completion has been returned from `wait()`.
568
Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
569
})
570
.collect::<Result<Vec<_>>>()?;
571
let iovecs = Pin::from(iovecs.into_boxed_slice());
572
573
let mut ring = self.ring.lock();
574
let src = ring
575
.registered_sources
576
.get(source.tag)
577
.ok_or(Error::InvalidSource)?
578
.clone();
579
580
let entry = ring.ops.vacant_entry();
581
let next_op_token = entry.key();
582
583
// SAFETY:
584
// Safe because all the addresses are within the Memory that an Arc is kept for the
585
// duration to ensure the memory is valid while the kernel accesses it.
586
// Tested by `dont_drop_backing_mem_read` unit test.
587
unsafe {
588
self.ctx
589
.add_readv(
590
iovecs,
591
src.as_raw_descriptor(),
592
offset,
593
usize_to_u64(next_op_token),
594
)
595
.map_err(Error::SubmittingOp)?;
596
}
597
598
entry.insert(OpStatus::Pending(OpData {
599
_file: src,
600
_mem: Some(mem),
601
waker: None,
602
canceled: false,
603
}));
604
605
Ok(WakerToken(next_op_token))
606
}
607
608
fn submit_write_from_vectored(
609
&self,
610
source: &RegisteredSource,
611
mem: Arc<dyn BackingMemory + Send + Sync>,
612
offset: Option<u64>,
613
addrs: impl IntoIterator<Item = MemRegion>,
614
) -> Result<WakerToken> {
615
let iovecs = addrs
616
.into_iter()
617
.map(|mem_range| {
618
let vslice = mem
619
.get_volatile_slice(mem_range)
620
.map_err(|_| Error::InvalidOffset)?;
621
// SAFETY:
622
// Safe because we guarantee that the memory pointed to by `iovecs` lives until the
623
// transaction is complete and the completion has been returned from `wait()`.
624
Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })
625
})
626
.collect::<Result<Vec<_>>>()?;
627
let iovecs = Pin::from(iovecs.into_boxed_slice());
628
629
let mut ring = self.ring.lock();
630
let src = ring
631
.registered_sources
632
.get(source.tag)
633
.ok_or(Error::InvalidSource)?
634
.clone();
635
636
let entry = ring.ops.vacant_entry();
637
let next_op_token = entry.key();
638
639
// SAFETY:
640
// Safe because all the addresses are within the Memory that an Arc is kept for the
641
// duration to ensure the memory is valid while the kernel accesses it.
642
// Tested by `dont_drop_backing_mem_write` unit test.
643
unsafe {
644
self.ctx
645
.add_writev(
646
iovecs,
647
src.as_raw_descriptor(),
648
offset,
649
usize_to_u64(next_op_token),
650
)
651
.map_err(Error::SubmittingOp)?;
652
}
653
654
entry.insert(OpStatus::Pending(OpData {
655
_file: src,
656
_mem: Some(mem),
657
waker: None,
658
canceled: false,
659
}));
660
661
Ok(WakerToken(next_op_token))
662
}
663
}
664
665
impl Reactor for UringReactor {
666
fn new() -> std::io::Result<Self> {
667
Ok(UringReactor::new()?)
668
}
669
670
fn wake(&self) {
671
let mut ring = self.ring.lock();
672
let entry = ring.ops.vacant_entry();
673
let next_op_token = entry.key();
674
if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
675
warn!("Failed to add NOP for waking up executor: {}", e);
676
}
677
entry.insert(OpStatus::Nop);
678
mem::drop(ring);
679
680
match self.ctx.submit() {
681
Ok(()) => {}
682
// If the kernel's submit ring is full then we know we won't block when calling
683
// io_uring_enter, which is all we really care about.
684
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
685
Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
686
}
687
}
688
689
fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
690
// At this point, there are no strong references to the executor (see `on_executor_drop`
691
// docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
692
// work can be submitted.
693
694
// Submit cancellations for all operations
695
#[allow(clippy::needless_collect)]
696
let ops: Vec<_> = self
697
.ring
698
.lock()
699
.ops
700
.iter_mut()
701
.filter_map(|op| match op.1 {
702
OpStatus::Pending(data) if !data.canceled => Some(op.0),
703
_ => None,
704
})
705
.collect();
706
for token in ops {
707
self.cancel_operation(WakerToken(token));
708
}
709
710
// Since the UringReactor is wrapped in an Arc it may end up being dropped from a different
711
// thread than the one that called `run` or `run_until`. Since we know there are no other
712
// references, just clear the thread id so that we don't panic.
713
*self.thread_id.lock() = None;
714
715
// Make sure all pending uring operations are completed as kernel may try to write to
716
// memory that we may drop.
717
//
718
// This future doesn't use the waker, it assumes the future will always be polled after
719
// processing other woken futures.
720
// TODO: Find a more robust solution.
721
Box::pin(futures::future::poll_fn(|_cx| {
722
if self.ring.lock().ops.is_empty() {
723
Poll::Ready(())
724
} else {
725
Poll::Pending
726
}
727
}))
728
}
729
730
fn on_thread_start(&self) {
731
let current_thread = thread::current().id();
732
let mut thread_id = self.thread_id.lock();
733
assert_eq!(
734
*thread_id.get_or_insert(current_thread),
735
current_thread,
736
"`UringReactor::wait_for_work` cannot be called from more than one thread"
737
);
738
}
739
740
fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
741
trace!(
742
"Waiting on events, {} pending ops",
743
self.ring.lock().ops.len()
744
);
745
let events = self.ctx.wait().map_err(Error::URingEnter)?;
746
747
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
748
// writing to the eventfd.
749
set_processing();
750
751
let mut ring = self.ring.lock();
752
for (raw_token, result) in events {
753
// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
754
// something that we originally gave to the kernel and that was created from a
755
// `usize` so we should always be able to convert it back into a `usize`.
756
let token = raw_token
757
.try_into()
758
.expect("`u64` doesn't fit inside a `usize`");
759
760
let op = ring
761
.ops
762
.get_mut(token)
763
.expect("Received completion token for unexpected operation");
764
match mem::replace(op, OpStatus::Completed(Some(result))) {
765
// No one is waiting on a Nop.
766
OpStatus::Nop => mem::drop(ring.ops.remove(token)),
767
OpStatus::Pending(data) => {
768
if data.canceled {
769
// No one is waiting for this operation and the uring is done with
770
// it so it's safe to remove.
771
ring.ops.remove(token);
772
}
773
if let Some(waker) = data.waker {
774
waker.wake();
775
}
776
}
777
OpStatus::Completed(_) => panic!("uring operation completed more than once"),
778
}
779
}
780
781
Ok(())
782
}
783
784
fn new_source<F: AsRawDescriptor>(
785
&self,
786
ex: &Arc<RawExecutor<Self>>,
787
f: F,
788
) -> AsyncResult<IoSource<F>> {
789
Ok(IoSource::Uring(super::UringSource::new(f, ex)?))
790
}
791
792
fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {
793
TaskHandle::Uring(task)
794
}
795
}
796
797
impl AsRawDescriptor for UringReactor {
798
fn as_raw_descriptor(&self) -> RawDescriptor {
799
self.ctx.as_raw_descriptor()
800
}
801
}
802
803
impl WeakWake for UringReactor {
804
fn wake_by_ref(weak_self: &Weak<Self>) {
805
if let Some(arc_self) = weak_self.upgrade() {
806
Reactor::wake(&*arc_self);
807
}
808
}
809
}
810
811
impl Drop for UringReactor {
812
fn drop(&mut self) {
813
// The ring should have been drained when the executor's Drop ran.
814
assert!(self.ring.lock().ops.is_empty());
815
}
816
}
817
818
// SAFETY:
819
// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
820
// waiting in TLS to be added to the main polling context.
821
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
822
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
823
if ret < 0 {
824
Err(Error::DuplicatingFd(base::Error::last()))
825
} else {
826
Ok(ret)
827
}
828
}
829
830
// Converts a `usize` into a `u64` and panics if the conversion fails.
831
#[inline]
832
fn usize_to_u64(val: usize) -> u64 {
833
val.try_into().expect("`usize` doesn't fit inside a `u64`")
834
}
835
836
pub struct PendingOperation {
837
waker_token: Option<WakerToken>,
838
ex: Weak<RawExecutor<UringReactor>>,
839
submitted: bool,
840
}
841
842
impl Future for PendingOperation {
843
type Output = Result<u32>;
844
845
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
846
let token = self
847
.waker_token
848
.as_ref()
849
.expect("PendingOperation polled after returning Poll::Ready");
850
if let Some(ex) = self.ex.upgrade() {
851
if let Some(result) = ex.reactor.get_result(token, cx) {
852
self.waker_token = None;
853
Poll::Ready(result.map_err(Error::Io))
854
} else {
855
// If we haven't submitted the operation yet, and the executor runs on a different
856
// thread then submit it now. Otherwise the executor will submit it automatically
857
// the next time it calls UringContext::wait.
858
if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {
859
match ex.reactor.ctx.submit() {
860
Ok(()) => self.submitted = true,
861
// If the kernel ring is full then wait until some ops are removed from the
862
// completion queue. This op should get submitted the next time the executor
863
// calls UringContext::wait.
864
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
865
Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
866
}
867
}
868
Poll::Pending
869
}
870
} else {
871
Poll::Ready(Err(Error::ExecutorGone))
872
}
873
}
874
}
875
876
impl Drop for PendingOperation {
877
fn drop(&mut self) {
878
if let Some(waker_token) = self.waker_token.take() {
879
if let Some(ex) = self.ex.upgrade() {
880
ex.reactor.cancel_operation(waker_token);
881
}
882
}
883
}
884
}
885
886
#[cfg(test)]
887
mod tests {
888
use std::future::Future;
889
use std::io::Read;
890
use std::io::Write;
891
use std::mem;
892
use std::pin::Pin;
893
use std::rc::Rc;
894
use std::task::Context;
895
use std::task::Poll;
896
897
use futures::executor::block_on;
898
899
use super::*;
900
use crate::mem::BackingMemory;
901
use crate::mem::MemRegion;
902
use crate::mem::VecIoWrapper;
903
use crate::BlockingPool;
904
use crate::ExecutorTrait;
905
906
// A future that returns ready when the uring queue is empty.
907
struct UringQueueEmpty<'a> {
908
ex: &'a Arc<RawExecutor<UringReactor>>,
909
}
910
911
impl Future for UringQueueEmpty<'_> {
912
type Output = ();
913
914
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
915
if self.ex.reactor.ring.lock().ops.is_empty() {
916
Poll::Ready(())
917
} else {
918
Poll::Pending
919
}
920
}
921
}
922
923
#[test]
924
fn dont_drop_backing_mem_read() {
925
if !is_uring_stable() {
926
return;
927
}
928
929
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
930
// op is pending.
931
let bm =
932
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
933
934
// Use pipes to create a future that will block forever.
935
let (rx, mut tx) = base::pipe().unwrap();
936
937
// Set up the TLS for the uring_executor by creating one.
938
let ex = RawExecutor::<UringReactor>::new().unwrap();
939
940
// Register the receive side of the pipe with the executor.
941
let registered_source = ex
942
.reactor
943
.register_source(&ex, &rx)
944
.expect("register source failed");
945
946
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the
947
// duration of the op.
948
let pending_op = registered_source
949
.start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
950
.expect("failed to start read to mem");
951
952
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
953
// reference while the op is active.
954
assert_eq!(Arc::strong_count(&bm), 2);
955
956
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
957
// it.
958
drop(pending_op);
959
assert_eq!(Arc::strong_count(&bm), 2);
960
961
// Finishing the operation should put the Arc count back to 1.
962
// write to the pipe to wake the read pipe and then wait for the uring result in the
963
// executor.
964
tx.write_all(&[0u8; 8]).expect("write failed");
965
ex.run_until(UringQueueEmpty { ex: &ex })
966
.expect("Failed to wait for read pipe ready");
967
assert_eq!(Arc::strong_count(&bm), 1);
968
}
969
970
#[test]
971
fn dont_drop_backing_mem_write() {
972
if !is_uring_stable() {
973
return;
974
}
975
976
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
977
// op is pending.
978
let bm =
979
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
980
981
// Use pipes to create a future that will block forever.
982
let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
983
984
// Set up the TLS for the uring_executor by creating one.
985
let ex = RawExecutor::<UringReactor>::new().unwrap();
986
987
// Register the receive side of the pipe with the executor.
988
let registered_source = ex
989
.reactor
990
.register_source(&ex, &tx)
991
.expect("register source failed");
992
993
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the
994
// duration of the op.
995
let pending_op = registered_source
996
.start_write_from_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
997
.expect("failed to start write to mem");
998
999
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
1000
// reference while the op is active.
1001
assert_eq!(Arc::strong_count(&bm), 2);
1002
1003
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
1004
// it.
1005
drop(pending_op);
1006
assert_eq!(Arc::strong_count(&bm), 2);
1007
1008
// Finishing the operation should put the Arc count back to 1.
1009
// write to the pipe to wake the read pipe and then wait for the uring result in the
1010
// executor.
1011
let mut buf = vec![0u8; base::round_up_to_page_size(1)];
1012
rx.read_exact(&mut buf).expect("read to empty failed");
1013
ex.run_until(UringQueueEmpty { ex: &ex })
1014
.expect("Failed to wait for write pipe ready");
1015
assert_eq!(Arc::strong_count(&bm), 1);
1016
}
1017
1018
#[test]
1019
fn canceled_before_completion() {
1020
if !is_uring_stable() {
1021
return;
1022
}
1023
1024
async fn cancel_io(op: PendingOperation) {
1025
mem::drop(op);
1026
}
1027
1028
async fn check_result(op: PendingOperation, expected: u32) {
1029
let actual = op.await.expect("operation failed to complete");
1030
assert_eq!(expected, actual);
1031
}
1032
1033
let bm =
1034
Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1035
1036
let (rx, tx) = base::pipe().expect("Pipe failed");
1037
1038
let ex = RawExecutor::<UringReactor>::new().unwrap();
1039
1040
let rx_source = ex
1041
.reactor
1042
.register_source(&ex, &rx)
1043
.expect("register source failed");
1044
let tx_source = ex
1045
.reactor
1046
.register_source(&ex, &tx)
1047
.expect("register source failed");
1048
1049
let read_task = rx_source
1050
.start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])
1051
.expect("failed to start read to mem");
1052
1053
ex.spawn_local(cancel_io(read_task)).detach();
1054
1055
// Write to the pipe so that the kernel operation will complete.
1056
let buf =
1057
Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1058
let write_task = tx_source
1059
.start_write_from_mem(None, Arc::clone(&buf), [MemRegion { offset: 0, len: 8 }])
1060
.expect("failed to start write from mem");
1061
1062
ex.run_until(check_result(write_task, 8))
1063
.expect("Failed to run executor");
1064
}
1065
1066
// We will drain all ops on drop and its not guaranteed that operation won't finish
1067
#[ignore]
1068
#[test]
1069
fn drop_before_completion() {
1070
if !is_uring_stable() {
1071
return;
1072
}
1073
1074
const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
1075
1076
async fn check_op(op: PendingOperation) {
1077
let err = op.await.expect_err("Op completed successfully");
1078
match err {
1079
Error::ExecutorGone => {}
1080
e => panic!("Unexpected error from op: {e}"),
1081
}
1082
}
1083
1084
let (mut rx, mut tx) = base::pipe().expect("Pipe failed");
1085
1086
let ex = RawExecutor::<UringReactor>::new().unwrap();
1087
1088
let tx_source = ex
1089
.reactor
1090
.register_source(&ex, &tx)
1091
.expect("Failed to register source");
1092
let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
1093
let op = tx_source
1094
.start_write_from_mem(
1095
None,
1096
bm,
1097
[MemRegion {
1098
offset: 0,
1099
len: mem::size_of::<u64>(),
1100
}],
1101
)
1102
.expect("Failed to start write from mem");
1103
1104
ex.spawn_local(check_op(op)).detach();
1105
1106
// Now drop the executor. It shouldn't run the write operation.
1107
mem::drop(ex);
1108
1109
// Make sure the executor did not complete the uring operation.
1110
let new_val = [0x2e; 8];
1111
tx.write_all(&new_val).unwrap();
1112
1113
let mut buf = 0u64.to_ne_bytes();
1114
rx.read_exact(&mut buf[..])
1115
.expect("Failed to read from pipe");
1116
1117
assert_eq!(buf, new_val);
1118
}
1119
1120
// Dropping a task that owns a BlockingPool shouldn't leak the pool.
1121
#[test]
1122
fn drop_detached_blocking_pool() {
1123
if !is_uring_stable() {
1124
return;
1125
}
1126
1127
struct Cleanup(BlockingPool);
1128
1129
impl Drop for Cleanup {
1130
fn drop(&mut self) {
1131
// Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
1132
self.0
1133
.shutdown(Some(
1134
std::time::Instant::now() + std::time::Duration::from_secs(1),
1135
))
1136
.unwrap();
1137
}
1138
}
1139
1140
let rc = Rc::new(std::cell::Cell::new(0));
1141
{
1142
let ex = RawExecutor::<UringReactor>::new().unwrap();
1143
let rc_clone = rc.clone();
1144
ex.spawn_local(async move {
1145
rc_clone.set(1);
1146
let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
1147
let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
1148
// Spawn a blocking task.
1149
let blocking_task = pool.0.spawn(move || {
1150
// Rendezvous.
1151
assert_eq!(recv.recv(), Ok(()));
1152
// Wait for drop.
1153
assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
1154
});
1155
// Make sure it has actually started (using a "rendezvous channel" send).
1156
//
1157
// Without this step, we'll have a race where we can shutdown the blocking pool
1158
// before the worker thread pops off the task.
1159
send.send(()).unwrap();
1160
// Wait for it to finish
1161
blocking_task.await;
1162
rc_clone.set(2);
1163
})
1164
.detach();
1165
ex.run_until(async {}).unwrap();
1166
// `ex` is dropped here. If everything is working as expected, it should drop all of
1167
// its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
1168
// `Drop` impl will try to join all the worker threads, which should work because send
1169
// half of the channel closed.
1170
}
1171
assert_eq!(rc.get(), 1);
1172
Rc::try_unwrap(rc).expect("Rc had too many refs");
1173
}
1174
1175
#[test]
1176
fn drop_on_different_thread() {
1177
if !is_uring_stable() {
1178
return;
1179
}
1180
1181
let ex = RawExecutor::<UringReactor>::new().unwrap();
1182
1183
let ex2 = ex.clone();
1184
let t = thread::spawn(move || ex2.run_until(async {}));
1185
1186
t.join().unwrap().unwrap();
1187
1188
// Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
1189
// completion.
1190
let (_rx, tx) = base::pipe().expect("Pipe failed");
1191
let tx = ex
1192
.reactor
1193
.register_source(&ex, &tx)
1194
.expect("Failed to register source");
1195
let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
1196
let op = tx
1197
.start_write_from_mem(
1198
None,
1199
bm,
1200
[MemRegion {
1201
offset: 0,
1202
len: mem::size_of::<u64>(),
1203
}],
1204
)
1205
.expect("Failed to start write from mem");
1206
1207
mem::drop(ex);
1208
1209
match block_on(op).expect_err("Pending operation completed after executor was dropped") {
1210
Error::ExecutorGone => {}
1211
e => panic!("Unexpected error after dropping executor: {e}"),
1212
}
1213
}
1214
}
1215
1216