GitHub Repository: google/crosvm
Path: blob/main/io_uring/src/uring.rs

// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring {0}")]
    Setup(libc::c_int),
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::other(e),
        }
    }
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize, // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize, // The total number of sqes allocated in shared memory.
}

// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }
    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events
            // was 16 bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}

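// Worked example (illustrative, not from upstream): `file_offset_to_raw_offset(Some(4096))`
// returns 4096 unchanged, while `file_offset_to_raw_offset(None)` returns `-1i64 as u64`,
// i.e. `u64::MAX`, which the kernel reinterprets as the off64_t value -1 ("use the current
// file position").
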
impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe);

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

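    // Worked example (illustrative, not from upstream): with `num_sqes == 16` the kernel reports
    // `ring_mask == 15`, so the unmasked `tail` keeps growing while the sqe index wraps:
    // tail == 17 gives index == 17 & 15 == 1. The free-space check above compares the *unmasked*
    // head and tail, which is why `wrapping_add` is used rather than reducing the counters
    // modulo the ring size.
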
    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }
}

/// Enum to represent all io_uring operations
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Creates a new `URingAllowlist` which allows no operations.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: io_uring_register_restriction_op_IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}

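// Usage sketch (illustrative, not from upstream): restricting a ring to just the poll ops.
// Any sqe submitted with a different opcode will then fail.
//
// ```no_run
// # use io_uring::{URingAllowlist, URingContext, URingOperation};
// let mut allowlist = URingAllowlist::new();
// allowlist
//     .allow_submit_operation(URingOperation::PollAdd)
//     .allow_submit_operation(URingOperation::PollRemove);
// let uring = URingContext::new(16, Some(&allowlist)).unwrap();
// ```
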
/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```no_run
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // SAFETY:
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if an allowlist was provided.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
                // `restrictions` contains a valid pointer and length.
                io_uring_register(
                    fd,
                    io_uring_register_op_IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enable the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify the memory.
                io_uring_register(
                    fd,
                    io_uring_register_op_IORING_REGISTER_ENABLE_RINGS,
                    null::<c_void>(),
                    0,
                )
                .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any
            // error is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
            })
        }
    }

    /// # Safety
    /// See `add_writev`; this variant accepts an iterator of iovecs rather than a vector, which
    /// avoids building an intermediate vector when the caller doesn't already have one.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the
    /// caller guarantees that the memory lives until the transaction is complete and that
    /// completion has been returned from the `wait` function. In addition there must not be any
    /// mutable references to the data pointed to by `iovecs` until the operation completes.
    /// Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

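    // Usage sketch (illustrative, not from upstream; the file path and user_data are arbitrary):
    // writing one buffer at offset 0. The caller must keep `buf` (and the fd) alive and unaliased
    // until the completion is reaped via `wait`.
    //
    // ```no_run
    // # use std::os::unix::io::AsRawFd;
    // # use io_uring::URingContext;
    // let file = std::fs::OpenOptions::new().write(true).open("/tmp/out").unwrap();
    // let uring = URingContext::new(16, None).unwrap();
    // let buf = [0u8; 4096];
    // let iov = libc::iovec {
    //     iov_base: buf.as_ptr() as *mut libc::c_void,
    //     iov_len: buf.len(),
    // };
    // // SAFETY: `buf` outlives the operation and is not mutated until completion.
    // unsafe {
    //     uring
    //         .add_writev_iter(std::iter::once(iov), file.as_raw_fd(), Some(0), 77)
    //         .unwrap();
    // }
    // let (user_data, res) = uring.wait().unwrap().next().unwrap();
    // assert_eq!(user_data, 77);
    // ```
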
    /// # Safety
    /// See `add_readv`; this variant accepts an iterator of iovecs rather than a vector, which
    /// avoids building an intermediate vector when the caller doesn't already have one.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until
    /// the transaction is complete and that completion has been returned from the `wait`
    /// function. In addition there must not be any references to the data pointed to by `iovecs`
    /// until the operation completes. Ensure that the fd remains open until the op completes as
    /// well. The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of
    /// the io_uring itself and for waking up a thread that's blocked inside a wait() call.
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

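    // Usage sketch (illustrative, not from upstream): a nop can wake a thread parked in `wait`,
    // since submitting it posts a completion and `wait` returns.
    //
    // ```no_run
    // # use io_uring::URingContext;
    // let uring = URingContext::new(16, None).unwrap();
    // uring.add_nop(0xdead).unwrap();
    // uring.submit().unwrap();
    // let (user_data, res) = uring.wait().unwrap().next().unwrap();
    // assert_eq!(user_data, 0xdead);
    // assert_eq!(res.unwrap(), 0); // A nop completes with a result of 0.
    // ```
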
    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses
        // the len field.
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

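    // Usage sketch (illustrative, not from upstream; the path and user_data are arbitrary):
    // preallocating 1 MiB at the start of a file, mirroring `fallocate(fd, 0, 0, 1 << 20)`.
    // A mode of 0 allocates the byte range, extending the file if needed.
    //
    // ```no_run
    // # use std::os::unix::io::AsRawFd;
    // # use io_uring::URingContext;
    // let file = std::fs::OpenOptions::new().write(true).open("/tmp/out").unwrap();
    // let uring = URingContext::new(16, None).unwrap();
    // uring
    //     .add_fallocate(file.as_raw_fd(), 0, 1 << 20, 0, 3)
    //     .unwrap();
    // let (user_data, res) = uring.wait().unwrap().next().unwrap();
    // assert_eq!(user_data, 3);
    // res.unwrap();
    // ```
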
    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempts to cancel an already issued request. `addr` must contain the `user_data` field of
    /// the request that should be cancelled. The cancellation request will complete with one of
    /// the following result codes. If found, the res field of the cqe will contain 0. If not
    /// found, res will contain -ENOENT. If found and cancellation was attempted, the res field
    /// will contain -EALREADY; in this case, the request may or may not terminate. In general,
    /// requests that are interruptible (like socket IO) will get cancelled, while disk IO
    /// requests cannot be cancelled if already started.
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

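    // Usage sketch (illustrative, not from upstream; user_data values are arbitrary): cancelling
    // a pending poll by its user_data. The cancel op completes under its own user_data, and its
    // result encodes the outcome described above (0, -ENOENT, or -EALREADY).
    //
    // ```no_run
    // # use std::os::unix::io::AsRawFd;
    // # use base::EventType;
    // # use io_uring::URingContext;
    // let f = std::fs::File::open("/dev/null").unwrap();
    // let uring = URingContext::new(16, None).unwrap();
    // uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 10).unwrap();
    // uring.async_cancel(10, 11).unwrap();
    // for (user_data, res) in uring.wait().unwrap() {
    //     println!("op {} finished with {:?}", user_data, res);
    // }
    // ```
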
    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res =
            // SAFETY:
            // Safe because the only memory modified is in the completion queue.
            unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };

        // An EINTR means we did successfully submit the events.
        if res.is_ok() || res == Err(libc::EINTR) {
            self.submit_ring.lock().complete_submit(added);
        } else {
            self.submit_ring.lock().fail_submit(added);
        }

        match res {
            Ok(()) => Ok(()),
            // EBUSY means that some completed events need to be processed before more can
            // be submitted, so wait for some sqes to finish without submitting new ones.
            // EINTR means we were interrupted while waiting, so start waiting again.
            Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
                loop {
                    let res =
                        // SAFETY:
                        // Safe because the only memory modified is in the completion queue.
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
            Err(e) => Err(Error::RingEnter(e)),
        }
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator
    /// over any completed operations. `wait` blocks until at least one completion is ready. If
    /// called without any new events added, this simply waits for any existing events to
    /// complete and returns as soon as one or more are ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull entries out of the
            // completion ring, so just return the completion ring. This can only happen when
            // `wait_nr` is 0, so we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

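// Putting it together (illustrative, not from upstream): a typical batch loop queues several
// ops, submits them with one `io_uring_enter` call, then drains completions as they arrive.
//
// ```no_run
// # use io_uring::URingContext;
// let uring = URingContext::new(16, None).unwrap();
// for i in 0..4 {
//     uring.add_nop(i).unwrap();
// }
// // One syscall submits all four sqes.
// uring.submit().unwrap();
// let mut seen = 0;
// while seen < 4 {
//     for (user_data, res) in uring.wait().unwrap() {
//         res.unwrap();
//         println!("nop {} done", user_data);
//         seen += 1;
//     }
// }
// ```
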
impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        // SAFETY:
        // Safe because the mut borrow of self restricts us to one mutable reference at a time and
        // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
        let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written to.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        // SAFETY:
        // Safe because we trust that the kernel has returned enough memory in io_uring_setup
        // and mmap, and the index is kept in range by the ring_mask.
        unsafe {
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that two threads don't try to pop the same
        // completed op from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl Iterator for &CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// SAFETY:
// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
// SAFETY: See safety comments for impl Send
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // entries up to the given head pointer have been consumed, so their ring slots may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // SAFETY:
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}