//! Safe-ish wrappers over the Linux io_uring interface: ring setup, submission
//! queue management, and completion reaping.
#![allow(clippy::cast_ptr_alignment)]
use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use base::AsRawDescriptor;
use base::EventType;
use base::IoBufMut;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;
use crate::bindings::*;
use crate::syscalls::*;
/// Tag attached to each submitted operation and returned with its completion.
pub type UserData = u64;
/// Errors that can occur while setting up or driving the uring.
#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
#[error("Failed to mmap completion ring: {0}")]
MappingCompleteRing(base::MmapError),
#[error("Failed to mmap submit entries: {0}")]
MappingSubmitEntries(base::MmapError),
#[error("Failed to mmap submit ring: {0}")]
MappingSubmitRing(base::MmapError),
#[error("No space for more ring entries, try increasing the size passed to `new`")]
NoSpace,
#[error("Failed to enter io uring: {0}")]
RingEnter(libc::c_int),
#[error("Failed to register operations for io uring: {0}")]
RingRegister(libc::c_int),
#[error("Failed to set up io uring: {0}")]
Setup(libc::c_int),
}
/// Result type used throughout this module.
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
// Preserve the raw OS errno for variants that carry one.
RingEnter(errno) => io::Error::from_raw_os_error(errno),
Setup(errno) => io::Error::from_raw_os_error(errno),
e => io::Error::other(e),
}
}
}
/// Tracks the submit ring state: entries prepared but not yet handed to the kernel,
/// and entries submitted to the kernel that have not yet completed.
pub struct SubmitQueue {
submit_ring: SubmitQueueState,
submit_queue_entries: SubmitQueueEntries,
submitting: usize, // Number of sqes in the process of being submitted.
pub added: usize, // Number of sqes added to the kernel that haven't completed yet.
num_sqes: usize, // Total number of sqes the ring can hold.
}
// Helpers to set the bindgen-generated union members of io_uring_sqe less verbosely.
impl io_uring_sqe {
pub fn set_addr(&mut self, val: u64) {
self.__bindgen_anon_2.addr = val;
}
pub fn set_off(&mut self, val: u64) {
self.__bindgen_anon_1.off = val;
}
pub fn set_buf_index(&mut self, val: u16) {
self.__bindgen_anon_4.buf_index = val;
}
pub fn set_rw_flags(&mut self, val: libc::c_int) {
self.__bindgen_anon_3.rw_flags = val;
}
pub fn set_poll_events(&mut self, val: u32) {
let val = if cfg!(target_endian = "big") {
// Swap the halves on big-endian platforms to match the ABI of the original
// 16-bit poll_events field.
val.rotate_left(16)
} else {
val
};
self.__bindgen_anon_3.poll32_events = val;
}
}
// Converts an optional file offset to the raw io_uring encoding: an explicit
// offset, or -1 (as off64_t) meaning "use the current file position".
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
const USE_CURRENT_FILE_POS: libc::off64_t = -1;
offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}
impl SubmitQueue {
// Calls `f` with the next available sqe, or returns NoSpace if the ring is full.
fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
where
F: FnMut(&mut io_uring_sqe),
{
if self.added == self.num_sqes {
return Err(Error::NoSpace);
}
// Find the next free submission slot. The Acquire load of the head synchronizes
// with the kernel's updates as it consumes entries.
let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
let next_tail = tail.wrapping_add(1);
if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
return Err(Error::NoSpace);
}
// `tail`, masked with the ring mask, indexes the next sqe to use.
let index = (tail & self.submit_ring.ring_mask) as usize;
let sqe = self.submit_queue_entries.get_mut(index).unwrap();
f(sqe);
// Tell the kernel which sqe index to process for this ring slot.
self.submit_ring.set_array_entry(index, index as u32);
// set_tail stores with Release ordering so the sqe writes above are visible to
// the kernel before it observes the new tail.
self.submit_ring.pointers.set_tail(next_tail);
self.added += 1;
Ok(())
}
// Returns the number of entries added since the last call to `prepare_submit`,
// marking them as in-flight for the next io_uring_enter.
fn prepare_submit(&mut self) -> usize {
let out = self.added - self.submitting;
self.submitting = self.added;
out
}
// Considers the entries pending from `prepare_submit` to have failed submission.
fn fail_submit(&mut self, count: usize) {
debug_assert!(count <= self.submitting);
self.submitting -= count;
}
// Considers the entries pending from `prepare_submit` to have been submitted.
fn complete_submit(&mut self, count: usize) {
debug_assert!(count <= self.submitting);
self.submitting -= count;
self.added -= count;
}
}
/// Operations that can be allowlisted for submission via `URingAllowlist`.
#[repr(u32)]
pub enum URingOperation {
Nop = io_uring_op_IORING_OP_NOP,
Readv = io_uring_op_IORING_OP_READV,
Writev = io_uring_op_IORING_OP_WRITEV,
Fsync = io_uring_op_IORING_OP_FSYNC,
ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
PollAdd = io_uring_op_IORING_OP_POLL_ADD,
PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
Sendmsg = io_uring_op_IORING_OP_SENDMSG,
Recvmsg = io_uring_op_IORING_OP_RECVMSG,
Timeout = io_uring_op_IORING_OP_TIMEOUT,
TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
Accept = io_uring_op_IORING_OP_ACCEPT,
AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
Connect = io_uring_op_IORING_OP_CONNECT,
Fallocate = io_uring_op_IORING_OP_FALLOCATE,
Openat = io_uring_op_IORING_OP_OPENAT,
Close = io_uring_op_IORING_OP_CLOSE,
FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
Statx = io_uring_op_IORING_OP_STATX,
Read = io_uring_op_IORING_OP_READ,
Write = io_uring_op_IORING_OP_WRITE,
Fadvise = io_uring_op_IORING_OP_FADVISE,
Madvise = io_uring_op_IORING_OP_MADVISE,
Send = io_uring_op_IORING_OP_SEND,
Recv = io_uring_op_IORING_OP_RECV,
Openat2 = io_uring_op_IORING_OP_OPENAT2,
EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
Splice = io_uring_op_IORING_OP_SPLICE,
ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
Tee = io_uring_op_IORING_OP_TEE,
Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
Renameat = io_uring_op_IORING_OP_RENAMEAT,
Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
Linkat = io_uring_op_IORING_OP_LINKAT,
}
/// An allowlist of restrictions to register on a uring before it is enabled.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);
impl URingAllowlist {
pub fn new() -> Self {
URingAllowlist::default()
}
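/// Allows `operation` to be submitted to this uring's submit queue.
///
/// A minimal sketch of building an allowlist; the crate path `io_uring` is an
/// assumption here, and which operations to permit is entirely use-case dependent:
///
/// ```no_run
/// use io_uring::{URingAllowlist, URingOperation};
///
/// let mut allowlist = URingAllowlist::new();
/// allowlist
///     .allow_submit_operation(URingOperation::Readv)
///     .allow_submit_operation(URingOperation::Writev);
/// ```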
pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
self.0.push(io_uring_restriction {
opcode: io_uring_register_restriction_op_IORING_RESTRICTION_SQE_OP as u16,
__bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
sqe_op: operation as u8,
},
..Default::default()
});
self
}
}
/// Unsafe wrapper for the kernel's io_uring interface. Allows queueing multiple
/// operations in one syscall and polling for their completions.
pub struct URingContext {
ring_file: File, // Owns the fd returned by io_uring_setup.
pub submit_ring: Mutex<SubmitQueue>,
pub complete_ring: CompleteQueueState,
}
impl URingContext {
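/// Creates a `URingContext` whose uring has room for `num_entries` simultaneous
/// operations. If `allowlist` is given, the ring starts disabled, the restrictions
/// are registered, and the ring is then enabled, so only allowlisted opcodes can
/// ever be submitted.
///
/// A minimal sketch (assumes a kernel with io_uring support and that this crate's
/// path is `io_uring`):
///
/// ```no_run
/// use io_uring::URingContext;
///
/// let uring = URingContext::new(16, None).expect("failed to create uring");
/// ```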
pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
let mut ring_params = io_uring_params::default();
if allowlist.is_some() {
// To register restrictions, the uring must be created in a disabled state.
ring_params.flags |= IORING_SETUP_R_DISABLED;
}
// SAFETY: the unsafe block isolates the creation of the URingContext. Using the
// uring fd for the mappings, and the offsets returned by the kernel for their base
// addresses, maintains safety assuming the kernel's API guarantees hold.
unsafe {
// SAFETY: the kernel is trusted to only modify `ring_params`, and `File` takes
// sole ownership of the returned fd.
let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
let ring_file = File::from_raw_fd(fd);
// Register the restrictions, then enable the ring; a ring created with
// IORING_SETUP_R_DISABLED stays inert until IORING_REGISTER_ENABLE_RINGS.
if let Some(restrictions) = allowlist {
// SAFETY: io_uring_register does not modify our memory, and `restrictions`
// supplies a valid pointer/length pair.
io_uring_register(
fd,
io_uring_register_op_IORING_REGISTER_RESTRICTIONS,
restrictions.0.as_ptr() as *const c_void,
restrictions.0.len() as u32,
)
.map_err(Error::RingRegister)?;
// SAFETY: enabling the rings passes no pointers to the kernel.
io_uring_register(
fd,
io_uring_register_op_IORING_REGISTER_ENABLE_RINGS,
null::<c_void>(),
0,
)
.map_err(Error::RingRegister)?;
}
// Mmap the submit ring: the shared header plus the sq_entries-long index array.
let submit_ring = SubmitQueueState::new(
MemoryMappingBuilder::new(
ring_params.sq_off.array as usize
+ ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_SQ_RING))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingSubmitRing)?,
&ring_params,
);
let num_sqe = ring_params.sq_entries as usize;
// Mmap the array of submit queue entries (sqes) itself.
let submit_queue_entries = SubmitQueueEntries {
mmap: MemoryMappingBuilder::new(
ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_SQES))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingSubmitEntries)?,
len: num_sqe,
};
// Mmap the completion ring: the shared header plus the cq_entries-long cqe array.
let complete_ring = CompleteQueueState::new(
MemoryMappingBuilder::new(
ring_params.cq_off.cqes as usize
+ ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
)
.from_file(&ring_file)
.offset(u64::from(IORING_OFF_CQ_RING))
.protection(Protection::read_write())
.populate()
.build()
.map_err(Error::MappingCompleteRing)?,
&ring_params,
);
Ok(URingContext {
ring_file,
submit_ring: Mutex::new(SubmitQueue {
submit_ring,
submit_queue_entries,
submitting: 0,
added: 0,
num_sqes: ring_params.sq_entries as usize,
}),
complete_ring,
})
}
}
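/// Asynchronously writes to `fd` from the addresses given in `iovecs`.
///
/// # Safety
/// The caller must guarantee that the memory behind `iovecs` stays valid, and is not
/// otherwise mutably referenced, until the operation's completion has been returned
/// from `wait`.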
pub unsafe fn add_writev_iter<I>(
&self,
iovecs: I,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()>
where
I: Iterator<Item = libc::iovec>,
{
self.add_writev(
Pin::from(
iovecs
.map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
.collect::<Vec<_>>()
.into_boxed_slice(),
),
fd,
offset,
user_data,
)
}
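/// Asynchronously writes to `fd` from the pinned `iovecs`, which are kept alive
/// internally until the operation completes.
///
/// # Safety
/// See `add_writev_iter`: the buffers must not be mutably aliased until the
/// completion has been returned from `wait`.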
pub unsafe fn add_writev(
&self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
sqe.len = iovecs.len() as u32;
sqe.set_off(file_offset_to_raw_offset(offset));
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})?;
// Keep the iovecs alive until the op completes; the kernel reads from these
// addresses while the op is in flight.
self.complete_ring.add_op_data(user_data, iovecs);
Ok(())
}
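/// Asynchronously reads from `fd` into the addresses given in `iovecs`.
///
/// # Safety
/// `add_readv_iter` will write to the addresses given by `iovecs`. The caller must
/// guarantee that the memory stays valid, and is not otherwise referenced, until the
/// operation's completion has been returned from `wait`.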
pub unsafe fn add_readv_iter<I>(
&self,
iovecs: I,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()>
where
I: Iterator<Item = libc::iovec>,
{
self.add_readv(
Pin::from(
iovecs
.map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
.collect::<Vec<_>>()
.into_boxed_slice(),
),
fd,
offset,
user_data,
)
}
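/// Asynchronously reads from `fd` into the pinned `iovecs`, which are kept alive
/// internally until the operation completes.
///
/// # Safety
/// See `add_readv_iter`: the buffers must not be otherwise referenced until the
/// completion has been returned from `wait`.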
pub unsafe fn add_readv(
&self,
iovecs: Pin<Box<[IoBufMut<'static>]>>,
fd: RawFd,
offset: Option<u64>,
user_data: UserData,
) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_READV as u8;
sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
sqe.len = iovecs.len() as u32;
sqe.set_off(file_offset_to_raw_offset(offset));
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.user_data = user_data;
sqe.flags = 0;
sqe.fd = fd;
})?;
// Keep the iovecs alive until the op completes; the kernel writes to these
// addresses while the op is in flight.
self.complete_ring.add_op_data(user_data, iovecs);
Ok(())
}
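/// Adds a no-op operation that performs no IO. Useful for measuring ring overhead
/// and for waking up a thread blocked in `wait`.
///
/// A round-trip sketch (assumes the crate path `io_uring`; error handling elided):
///
/// ```no_run
/// use io_uring::URingContext;
///
/// let uring = URingContext::new(4, None).unwrap();
/// uring.add_nop(42).unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 42);
/// assert_eq!(res.unwrap(), 0);
/// ```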
pub fn add_nop(&self, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
sqe.fd = -1;
sqe.user_data = user_data;
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
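/// Asynchronously fsyncs `fd`, flushing its data and metadata to the backing storage.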
pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
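/// Asynchronously performs an fallocate(2) of `len` bytes at `offset` on `fd`. Note
/// the io_uring ABI's non-obvious encoding: the length travels in the sqe's `addr`
/// field and the fallocate mode in its `len` field.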
pub fn add_fallocate(
&self,
fd: RawFd,
offset: u64,
len: u64,
mode: u32,
user_data: UserData,
) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;
sqe.fd = fd;
sqe.set_addr(len);
sqe.len = mode;
sqe.set_off(offset);
sqe.user_data = user_data;
sqe.set_buf_index(0);
sqe.set_rw_flags(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
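/// Adds a one-shot poll of `events` on `fd`; the completion's result is the bitmask
/// of events that fired.
///
/// Sketch of polling for readability (assumes the crate path `io_uring`; error
/// handling elided):
///
/// ```no_run
/// use std::os::unix::io::AsRawFd;
/// use base::EventType;
/// use io_uring::URingContext;
///
/// let uring = URingContext::new(4, None).unwrap();
/// let file = std::fs::File::open("/dev/zero").unwrap();
/// uring.add_poll_fd(file.as_raw_fd(), EventType::Read, 7).unwrap();
/// let (user_data, _res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 7);
/// ```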
pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_poll_events(events.into());
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
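/// Removes a poll previously added with `add_poll_fd`.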
pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
sqe.fd = fd;
sqe.user_data = user_data;
sqe.set_poll_events(events.into());
sqe.set_addr(0);
sqe.len = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
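/// Attempts to cancel an already issued operation. `addr` must contain the
/// `user_data` of the operation to cancel; the cancel attempt itself completes under
/// this call's own `user_data`.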
pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
self.submit_ring.lock().prep_next_sqe(|sqe| {
sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
sqe.user_data = user_data;
sqe.set_addr(addr);
sqe.len = 0;
sqe.fd = 0;
sqe.set_off(0);
sqe.set_buf_index(0);
sqe.ioprio = 0;
sqe.flags = 0;
})
}
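// Submits any prepared-but-unsubmitted sqes and, if `wait_nr` is nonzero, blocks
// until at least that many completions are available.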
fn enter(&self, wait_nr: u64) -> Result<()> {
let added = self.submit_ring.lock().prepare_submit();
if added == 0 && wait_nr == 0 {
return Ok(());
}
let flags = if wait_nr > 0 {
IORING_ENTER_GETEVENTS
} else {
0
};
// SAFETY: the only memory the kernel modifies here is the completion queue.
let res =
unsafe { io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags) };
// An EINTR still means the entries were successfully submitted.
if res.is_ok() || res == Err(libc::EINTR) {
self.submit_ring.lock().complete_submit(added);
} else {
self.submit_ring.lock().fail_submit(added);
}
match res {
Ok(()) => Ok(()),
// EBUSY means completions must be reaped before more entries can be processed,
// and EINTR means the wait was interrupted; in both cases the entries were
// submitted, so retry the wait without resubmitting.
Err(libc::EBUSY) | Err(libc::EINTR) if wait_nr != 0 => {
loop {
// SAFETY: as above, only the completion queue is modified by the kernel.
let res =
unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
if res != Err(libc::EINTR) {
return res.map_err(Error::RingEnter);
}
}
}
Err(e) => Err(Error::RingEnter(e)),
}
}
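/// Sends operations added with the `add_*` functions to the kernel without waiting
/// for completions.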
pub fn submit(&self) -> Result<()> {
self.enter(0)
}
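/// Sends operations added with the `add_*` functions to the kernel and returns an
/// iterator over ready completions, blocking for one only if none are ready.
///
/// Sketch of a drain loop (assumes the crate path `io_uring`; error handling elided):
///
/// ```no_run
/// use io_uring::URingContext;
///
/// let uring = URingContext::new(4, None).unwrap();
/// uring.add_nop(1).unwrap();
/// uring.add_nop(2).unwrap();
/// for (user_data, result) in uring.wait().unwrap() {
///     println!("op {} finished: {:?}", user_data, result);
/// }
/// ```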
pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
// Only block in the kernel if nothing is already waiting to be processed.
let wait_nr = if self.complete_ring.num_ready() > 0 {
0
} else {
1
};
match self.enter(wait_nr) {
Ok(()) => Ok(&self.complete_ring),
// EBUSY means the completion queue is full of events that are ready to be
// processed, so hand those back to the caller.
Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
Err(e) => Err(e),
}
}
}
impl AsRawFd for URingContext {
fn as_raw_fd(&self) -> RawFd {
self.ring_file.as_raw_fd()
}
}
impl AsRawDescriptor for URingContext {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.ring_file.as_raw_descriptor()
}
}
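// The mmap'd array of sqes, mapped from the uring fd at IORING_OFF_SQES.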
struct SubmitQueueEntries {
mmap: MemoryMapping,
len: usize,
}
impl SubmitQueueEntries {
fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
if index >= self.len {
return None;
}
// SAFETY: the mmap covers `len` sqes and the mutable borrow of `self` guarantees
// exclusive access to them.
let mut_ref = unsafe { &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index) };
// Clear any state left over from a previous use of this slot.
*mut_ref = io_uring_sqe::default();
Some(mut_ref)
}
}
// The submit ring header shared with the kernel: head/tail counters, the ring mask,
// and a pointer to the sqe index array.
struct SubmitQueueState {
_mmap: MemoryMapping,
pointers: QueuePointers,
ring_mask: u32,
array: AtomicPtr<u32>,
}
impl SubmitQueueState {
// # Safety
// Only safe if `mmap` was mapped from a uring fd at IORING_OFF_SQ_RING and is large
// enough to cover every offset in `params.sq_off`.
unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
let ptr = mmap.as_ptr();
// Casting to AtomicU32 is sound because u32 accesses are atomic on all supported
// platforms, and the pointers stay valid until `mmap` is dropped with the struct.
let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
SubmitQueueState {
_mmap: mmap,
pointers: QueuePointers { head, tail },
ring_mask,
array,
}
}
fn set_array_entry(&self, index: usize, value: u32) {
// SAFETY: `array` points into the ring mmap and is valid for ring_mask + 1
// entries; the kernel may read it concurrently, hence the volatile write.
unsafe {
std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value);
}
}
}
#[derive(Default)]
struct CompleteQueueData {
// Keeps the buffers of in-flight ops alive until the kernel is done with them.
pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}
/// Tracks the completion ring shared with the kernel and the per-op buffers that
/// must outlive their operations.
pub struct CompleteQueueState {
mmap: MemoryMapping,
pointers: QueuePointers,
ring_mask: u32,
cqes_offset: u32,
// Guarded by a mutex so completions can be popped from multiple threads.
data: Mutex<CompleteQueueData>,
}
impl CompleteQueueState {
// # Safety
// Only safe if `mmap` was mapped from a uring fd at IORING_OFF_CQ_RING and is large
// enough to cover every offset in `params.cq_off`.
unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
let ptr = mmap.as_ptr();
let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
CompleteQueueState {
mmap,
pointers: QueuePointers { head, tail },
ring_mask,
cqes_offset: params.cq_off.cqes,
data: Default::default(),
}
}
fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
self.data.lock().pending_op_addrs.insert(user_data, addrs);
}
fn get_cqe(&self, head: u32) -> &io_uring_cqe {
// SAFETY: the pointer arithmetic stays inside the cqe array of the mmap because
// the kernel-provided ring mask bounds the index.
unsafe {
let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
as *const io_uring_cqe;
let index = head & self.ring_mask;
&*cqes.add(index as usize)
}
}
/// Returns the number of completed operations ready to be popped.
pub fn num_ready(&self) -> u32 {
let tail = self.pointers.tail(Ordering::Acquire);
let head = self.pointers.head(Ordering::Relaxed);
tail.saturating_sub(head)
}
fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
// Take the data lock first so two threads can't pop the same completion.
let mut data = self.data.lock();
let head = self.pointers.head(Ordering::Relaxed);
// The Acquire load of the tail synchronizes with the kernel's writes of new cqes.
if head == self.pointers.tail(Ordering::Acquire) {
return None;
}
let cqe = self.get_cqe(head);
let user_data = cqe.user_data;
let res = cqe.res;
// Drop the buffers that were kept alive for this op.
let _ = data.pending_op_addrs.remove(&user_data);
// Release the cqe back to the kernel; set_head stores with Release ordering so
// the reads above happen before the kernel reuses the slot.
let new_head = head.wrapping_add(1);
self.pointers.set_head(new_head);
let io_res = match res {
r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
r => Ok(r as u32),
};
Some((user_data, io_res))
}
}
// Allow iterating over ready completions directly from a &CompleteQueueState.
impl Iterator for &CompleteQueueState {
type Item = (UserData, std::io::Result<u32>);
fn next(&mut self) -> Option<Self::Item> {
self.pop_front()
}
}
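// Raw pointers to the head and tail counters that a ring shares with the kernel.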
struct QueuePointers {
head: *const AtomicU32,
tail: *const AtomicU32,
}
// SAFETY: the pointers are only ever used for atomic u32 loads and stores into a
// shared mmap that outlives all users, so cross-thread access is sound.
unsafe impl Send for QueuePointers {}
unsafe impl Sync for QueuePointers {}
impl QueuePointers {
// Loads the tail counter with the given ordering.
fn tail(&self, ordering: Ordering) -> u32 {
// SAFETY: self.tail points into the ring mmap, which outlives self.
unsafe { (*self.tail).load(ordering) }
}
// Publishes a new tail with Release ordering so all prior sqe writes are visible
// to the kernel before it observes the updated tail.
fn set_tail(&self, next_tail: u32) {
// SAFETY: self.tail points into the ring mmap, which outlives self.
unsafe { (*self.tail).store(next_tail, Ordering::Release) }
}
// Loads the head counter with the given ordering.
fn head(&self, ordering: Ordering) -> u32 {
// SAFETY: self.head points into the ring mmap, which outlives self.
unsafe { (*self.head).load(ordering) }
}
// Publishes a new head with Release ordering, returning consumed cqes to the kernel.
fn set_head(&self, next_head: u32) {
// SAFETY: self.head points into the ring mmap, which outlives self.
unsafe { (*self.head).store(next_head, Ordering::Release) }
}
}