Path: blob/main/cros_async/src/sys/linux/uring_executor.rs
5394 views
// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// TODO: Move this doc to one of the public APIs, it isn't io_uring specific.

//! `UringReactor`
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues in managing asynchronous IO buffers in Rust.
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//! not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to. The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and Drop can't be used for safety guarantees. To ensure this never happens, only memory that
//! implements `BackingMemory` is accepted. For implementors of `BackingMemory` the mut borrow
//! isn't an issue because those are already OK with external modifications to the memory (like a
//! `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
//! read operation was submitted, then the memory is dropped. To solve this, the executor takes an
//! Arc to the backing memory. Vecs being read into are also wrapped in an Arc before being passed
//! to the executor. The executor holds the Arc and ensures all operations are complete before
//! dropping it, which guarantees the memory is valid for the duration.
//!
The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is36//! dropped(can't rely on drop running), there is no way to ensure the kernel's buffer remains valid37//! until the operation completes unless the executor holds an Arc to the memory on the heap.38//!39//! ## Using `Vec` for reads/writes.40//!41//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type42//! ensures that only the kernel is allowed to access the `Vec` and wraps the the `Vec` in an Arc to43//! ensure it lives long enough.4445use std::convert::TryInto;46use std::ffi::CStr;47use std::fs::File;48use std::future::Future;49use std::io;50use std::mem;51use std::mem::MaybeUninit;52use std::os::unix::io::FromRawFd;53use std::os::unix::io::RawFd;54use std::pin::Pin;55use std::sync::Arc;56use std::sync::LazyLock;57use std::sync::Weak;58use std::task::Context;59use std::task::Poll;60use std::task::Waker;61use std::thread;62use std::thread::ThreadId;6364use base::trace;65use base::warn;66use base::AsRawDescriptor;67use base::EventType;68use base::IoBufMut;69use base::RawDescriptor;70use io_uring::URingAllowlist;71use io_uring::URingContext;72use io_uring::URingOperation;73use remain::sorted;74use slab::Slab;75use sync::Mutex;76use thiserror::Error as ThisError;7778use crate::common_executor::RawExecutor;79use crate::common_executor::RawTaskHandle;80use crate::common_executor::Reactor;81use crate::mem::BackingMemory;82use crate::waker::WakerToken;83use crate::waker::WeakWake;84use crate::AsyncError;85use crate::AsyncResult;86use crate::IoSource;87use crate::MemRegion;88use crate::TaskHandle;8990#[sorted]91#[derive(Debug, ThisError)]92pub enum Error {93/// Creating a context to wait on FDs failed.94#[error("Error creating the fd waiting context: {0}")]95CreatingContext(io_uring::Error),96/// Failed to discard a block97#[error("Failed to discard a block: {0}")]98Discard(base::Error),99/// Failed to copy the FD for the polling 
context.100#[error("Failed to copy the FD for the polling context: {0}")]101DuplicatingFd(base::Error),102/// Enabling a context faild.103#[error("Error enabling the URing context: {0}")]104EnablingContext(io_uring::Error),105/// The Executor is gone.106#[error("The executor is gone")]107ExecutorGone,108/// Invalid offset or length given for an iovec in backing memory.109#[error("Invalid offset/len for getting an iovec")]110InvalidOffset,111/// Invalid FD source specified.112#[error("Invalid source, FD not registered for use")]113InvalidSource,114/// Error doing the IO.115#[error("Error during IO: {0}")]116Io(io::Error),117/// Registering operation restrictions to a uring failed.118#[error("Error registering restrictions to the URing context: {0}")]119RegisteringURingRestriction(io_uring::Error),120/// Failed to remove the waker remove the polling context.121#[error("Error removing from the URing context: {0}")]122RemovingWaker(io_uring::Error),123/// Failed to submit the operation to the polling context.124#[error("Error adding to the URing context: {0}")]125SubmittingOp(io_uring::Error),126/// URingContext failure.127#[error("URingContext failure: {0}")]128URingContextError(io_uring::Error),129/// Failed to submit or wait for io_uring events.130#[error("URing::enter: {0}")]131URingEnter(io_uring::Error),132}133pub type Result<T> = std::result::Result<T, Error>;134135impl From<Error> for io::Error {136fn from(e: Error) -> Self {137use Error::*;138match e {139Discard(e) => e.into(),140DuplicatingFd(e) => e.into(),141ExecutorGone => io::Error::other(ExecutorGone),142InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),143InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),144Io(e) => e,145CreatingContext(e) => e.into(),146RemovingWaker(e) => e.into(),147SubmittingOp(e) => e.into(),148URingContextError(e) => e.into(),149URingEnter(e) => e.into(),150EnablingContext(e) => e.into(),151RegisteringURingRestriction(e) => 
e.into(),152}153}154}155156impl From<Error> for AsyncError {157fn from(e: Error) -> AsyncError {158AsyncError::SysVariants(e.into())159}160}161162static IS_URING_STABLE: LazyLock<bool> = LazyLock::new(|| {163let mut utsname = MaybeUninit::zeroed();164165// SAFETY:166// Safe because this will only modify `utsname` and we check the return value.167let res = unsafe { libc::uname(utsname.as_mut_ptr()) };168if res < 0 {169return false;170}171172// SAFETY:173// Safe because the kernel has initialized `utsname`.174let utsname = unsafe { utsname.assume_init() };175176// SAFETY:177// Safe because the pointer is valid and the kernel guarantees that this is a valid C string.178let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };179180let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {181Ok(c) => c,182Err(_) => return false,183};184185// Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.186match (components.next(), components.next()) {187(Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {188// The kernel version is new enough so check if we can actually make a uring context.189URingContext::new(8, None).is_ok()190}191_ => false,192}193});194195// Checks if the uring executor is stable.196// Caches the result so that the check is only run once.197// Useful for falling back to the FD executor on pre-uring kernels.198pub fn is_uring_stable() -> bool {199*IS_URING_STABLE200}201202// Checks the uring availability by checking if the uring creation succeeds.203// If uring creation succeeds, it returns `Ok(())`. 
It returns an `URingContextError` otherwise.204// It fails if the kernel does not support io_uring, but note that the cause is not limited to it.205pub(crate) fn check_uring_availability() -> Result<()> {206URingContext::new(8, None)207.map(drop)208.map_err(Error::URingContextError)209}210211pub struct RegisteredSource {212tag: usize,213ex: Weak<RawExecutor<UringReactor>>,214}215216impl RegisteredSource {217pub fn start_read_to_mem(218&self,219file_offset: Option<u64>,220mem: Arc<dyn BackingMemory + Send + Sync>,221addrs: impl IntoIterator<Item = MemRegion>,222) -> Result<PendingOperation> {223let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;224let token = ex225.reactor226.submit_read_to_vectored(self, mem, file_offset, addrs)?;227228Ok(PendingOperation {229waker_token: Some(token),230ex: self.ex.clone(),231submitted: false,232})233}234235pub fn start_write_from_mem(236&self,237file_offset: Option<u64>,238mem: Arc<dyn BackingMemory + Send + Sync>,239addrs: impl IntoIterator<Item = MemRegion>,240) -> Result<PendingOperation> {241let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;242let token = ex243.reactor244.submit_write_from_vectored(self, mem, file_offset, addrs)?;245246Ok(PendingOperation {247waker_token: Some(token),248ex: self.ex.clone(),249submitted: false,250})251}252253pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {254let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;255let token = ex.reactor.submit_fallocate(self, offset, len, mode)?;256257Ok(PendingOperation {258waker_token: Some(token),259ex: self.ex.clone(),260submitted: false,261})262}263264pub fn start_fsync(&self) -> Result<PendingOperation> {265let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;266let token = ex.reactor.submit_fsync(self)?;267268Ok(PendingOperation {269waker_token: Some(token),270ex: self.ex.clone(),271submitted: false,272})273}274275pub fn poll_fd_readable(&self) -> Result<PendingOperation> {276let events = 
EventType::Read;277278let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;279let token = ex.reactor.submit_poll(self, events)?;280281Ok(PendingOperation {282waker_token: Some(token),283ex: self.ex.clone(),284submitted: false,285})286}287}288289impl Drop for RegisteredSource {290fn drop(&mut self) {291if let Some(ex) = self.ex.upgrade() {292ex.reactor.deregister_source(self);293}294}295}296297// Number of entries in the ring.298const NUM_ENTRIES: usize = 256;299300// An operation that has been submitted to the uring and is potentially being waited on.301struct OpData {302_file: Arc<File>,303_mem: Option<Arc<dyn BackingMemory + Send + Sync>>,304waker: Option<Waker>,305canceled: bool,306}307308// The current status of an operation that's been submitted to the uring.309enum OpStatus {310Nop,311Pending(OpData),312Completed(Option<::std::io::Result<u32>>),313}314315struct Ring {316ops: Slab<OpStatus>,317registered_sources: Slab<Arc<File>>,318}319320/// `Reactor` that manages async IO work using io_uring.321pub struct UringReactor {322// The URingContext needs to be first so that it is dropped first, closing the uring fd, and323// releasing the resources borrowed by the kernel before we free them.324ctx: URingContext,325ring: Mutex<Ring>,326thread_id: Mutex<Option<ThreadId>>,327}328329impl UringReactor {330fn new() -> Result<UringReactor> {331// Allow operations only that the UringReactor really submits to enhance the security.332let mut restrictions = URingAllowlist::new();333let ops = [334URingOperation::Writev,335URingOperation::Readv,336URingOperation::Nop,337URingOperation::Fsync,338URingOperation::Fallocate,339URingOperation::PollAdd,340URingOperation::PollRemove,341URingOperation::AsyncCancel,342];343for op in ops {344restrictions.allow_submit_operation(op);345}346347let ctx =348URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;349350Ok(UringReactor {351ctx,352ring: Mutex::new(Ring {353ops: 
Slab::with_capacity(NUM_ENTRIES),354registered_sources: Slab::with_capacity(NUM_ENTRIES),355}),356thread_id: Mutex::new(None),357})358}359360fn runs_tasks_on_current_thread(&self) -> bool {361let executor_thread = self.thread_id.lock();362executor_thread363.map(|id| id == thread::current().id())364.unwrap_or(false)365}366367fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {368let mut ring = self.ring.lock();369370let op = ring371.ops372.get_mut(token.0)373.expect("`get_result` called on unknown operation");374match op {375OpStatus::Nop => panic!("`get_result` called on nop"),376OpStatus::Pending(data) => {377if data.canceled {378panic!("`get_result` called on canceled operation");379}380data.waker = Some(cx.waker().clone());381None382}383OpStatus::Completed(res) => {384let out = res.take();385ring.ops.remove(token.0);386Some(out.expect("Missing result in completed operation"))387}388}389}390391// Remove the waker for the given token if it hasn't fired yet.392fn cancel_operation(&self, token: WakerToken) {393let mut ring = self.ring.lock();394let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {395match op {396OpStatus::Nop => panic!("`cancel_operation` called on nop"),397OpStatus::Pending(data) => {398if data.canceled {399panic!("uring operation canceled more than once");400}401402if let Some(waker) = data.waker.take() {403waker.wake();404}405// Clear the waker as it is no longer needed.406data.waker = None;407data.canceled = true;408409// Keep the rest of the op data as the uring might still be accessing either410// the source of the backing memory so it needs to live until the kernel411// completes the operation.412true413}414OpStatus::Completed(_) => {415ring.ops.remove(token.0);416false417}418}419} else {420false421};422std::mem::drop(ring);423if submit_cancel {424let _best_effort = self.submit_cancel_async(token.0);425}426}427428pub(crate) fn register_source<F: AsRawDescriptor>(429&self,430raw: 
&Arc<RawExecutor<UringReactor>>,431fd: &F,432) -> Result<RegisteredSource> {433// SAFETY:434// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD435// will only be added to the poll loop.436let duped_fd = unsafe { File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?) };437438Ok(RegisteredSource {439tag: self440.ring441.lock()442.registered_sources443.insert(Arc::new(duped_fd)),444ex: Arc::downgrade(raw),445})446}447448fn deregister_source(&self, source: &RegisteredSource) {449// There isn't any need to pull pending ops out, the all have Arc's to the file and mem they450// need.let them complete. deregister with pending ops is not a common path no need to451// optimize that case yet.452self.ring.lock().registered_sources.remove(source.tag);453}454455fn submit_poll(456&self,457source: &RegisteredSource,458events: base::EventType,459) -> Result<WakerToken> {460let mut ring = self.ring.lock();461let src = ring462.registered_sources463.get(source.tag)464.ok_or(Error::InvalidSource)?465.clone();466let entry = ring.ops.vacant_entry();467let next_op_token = entry.key();468self.ctx469.add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))470.map_err(Error::SubmittingOp)?;471entry.insert(OpStatus::Pending(OpData {472_file: src,473_mem: None,474waker: None,475canceled: false,476}));477478Ok(WakerToken(next_op_token))479}480481fn submit_fallocate(482&self,483source: &RegisteredSource,484offset: u64,485len: u64,486mode: u32,487) -> Result<WakerToken> {488let mut ring = self.ring.lock();489let src = ring490.registered_sources491.get(source.tag)492.ok_or(Error::InvalidSource)?493.clone();494let entry = ring.ops.vacant_entry();495let next_op_token = entry.key();496self.ctx497.add_fallocate(498src.as_raw_descriptor(),499offset,500len,501mode,502usize_to_u64(next_op_token),503)504.map_err(Error::SubmittingOp)?;505506entry.insert(OpStatus::Pending(OpData {507_file: src,508_mem: None,509waker: None,510canceled: 
false,511}));512513Ok(WakerToken(next_op_token))514}515516fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {517let mut ring = self.ring.lock();518let entry = ring.ops.vacant_entry();519let next_op_token = entry.key();520self.ctx521.async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))522.map_err(Error::SubmittingOp)?;523524entry.insert(OpStatus::Nop);525526Ok(WakerToken(next_op_token))527}528529fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {530let mut ring = self.ring.lock();531let src = ring532.registered_sources533.get(source.tag)534.ok_or(Error::InvalidSource)?535.clone();536let entry = ring.ops.vacant_entry();537let next_op_token = entry.key();538self.ctx539.add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))540.map_err(Error::SubmittingOp)?;541entry.insert(OpStatus::Pending(OpData {542_file: src,543_mem: None,544waker: None,545canceled: false,546}));547548Ok(WakerToken(next_op_token))549}550551fn submit_read_to_vectored(552&self,553source: &RegisteredSource,554mem: Arc<dyn BackingMemory + Send + Sync>,555offset: Option<u64>,556addrs: impl IntoIterator<Item = MemRegion>,557) -> Result<WakerToken> {558let iovecs = addrs559.into_iter()560.map(|mem_range| {561let vslice = mem562.get_volatile_slice(mem_range)563.map_err(|_| Error::InvalidOffset)?;564// SAFETY:565// Safe because we guarantee that the memory pointed to by `iovecs` lives until the566// transaction is complete and the completion has been returned from `wait()`.567Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })568})569.collect::<Result<Vec<_>>>()?;570let iovecs = Pin::from(iovecs.into_boxed_slice());571572let mut ring = self.ring.lock();573let src = ring574.registered_sources575.get(source.tag)576.ok_or(Error::InvalidSource)?577.clone();578579let entry = ring.ops.vacant_entry();580let next_op_token = entry.key();581582// SAFETY:583// Safe because all the addresses are within the Memory that an Arc is kept for 
the584// duration to ensure the memory is valid while the kernel accesses it.585// Tested by `dont_drop_backing_mem_read` unit test.586unsafe {587self.ctx588.add_readv(589iovecs,590src.as_raw_descriptor(),591offset,592usize_to_u64(next_op_token),593)594.map_err(Error::SubmittingOp)?;595}596597entry.insert(OpStatus::Pending(OpData {598_file: src,599_mem: Some(mem),600waker: None,601canceled: false,602}));603604Ok(WakerToken(next_op_token))605}606607fn submit_write_from_vectored(608&self,609source: &RegisteredSource,610mem: Arc<dyn BackingMemory + Send + Sync>,611offset: Option<u64>,612addrs: impl IntoIterator<Item = MemRegion>,613) -> Result<WakerToken> {614let iovecs = addrs615.into_iter()616.map(|mem_range| {617let vslice = mem618.get_volatile_slice(mem_range)619.map_err(|_| Error::InvalidOffset)?;620// SAFETY:621// Safe because we guarantee that the memory pointed to by `iovecs` lives until the622// transaction is complete and the completion has been returned from `wait()`.623Ok(unsafe { IoBufMut::from_raw_parts(vslice.as_mut_ptr(), vslice.size()) })624})625.collect::<Result<Vec<_>>>()?;626let iovecs = Pin::from(iovecs.into_boxed_slice());627628let mut ring = self.ring.lock();629let src = ring630.registered_sources631.get(source.tag)632.ok_or(Error::InvalidSource)?633.clone();634635let entry = ring.ops.vacant_entry();636let next_op_token = entry.key();637638// SAFETY:639// Safe because all the addresses are within the Memory that an Arc is kept for the640// duration to ensure the memory is valid while the kernel accesses it.641// Tested by `dont_drop_backing_mem_write` unit test.642unsafe {643self.ctx644.add_writev(645iovecs,646src.as_raw_descriptor(),647offset,648usize_to_u64(next_op_token),649)650.map_err(Error::SubmittingOp)?;651}652653entry.insert(OpStatus::Pending(OpData {654_file: src,655_mem: Some(mem),656waker: None,657canceled: false,658}));659660Ok(WakerToken(next_op_token))661}662}663664impl Reactor for UringReactor {665fn new() -> 
std::io::Result<Self> {666Ok(UringReactor::new()?)667}668669fn wake(&self) {670let mut ring = self.ring.lock();671let entry = ring.ops.vacant_entry();672let next_op_token = entry.key();673if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {674warn!("Failed to add NOP for waking up executor: {}", e);675}676entry.insert(OpStatus::Nop);677mem::drop(ring);678679match self.ctx.submit() {680Ok(()) => {}681// If the kernel's submit ring is full then we know we won't block when calling682// io_uring_enter, which is all we really care about.683Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}684Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),685}686}687688fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {689// At this point, there are no strong references to the executor (see `on_executor_drop`690// docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO691// work can be submitted.692693// Submit cancellations for all operations694#[allow(clippy::needless_collect)]695let ops: Vec<_> = self696.ring697.lock()698.ops699.iter_mut()700.filter_map(|op| match op.1 {701OpStatus::Pending(data) if !data.canceled => Some(op.0),702_ => None,703})704.collect();705for token in ops {706self.cancel_operation(WakerToken(token));707}708709// Since the UringReactor is wrapped in an Arc it may end up being dropped from a different710// thread than the one that called `run` or `run_until`. 
Since we know there are no other711// references, just clear the thread id so that we don't panic.712*self.thread_id.lock() = None;713714// Make sure all pending uring operations are completed as kernel may try to write to715// memory that we may drop.716//717// This future doesn't use the waker, it assumes the future will always be polled after718// processing other woken futures.719// TODO: Find a more robust solution.720Box::pin(futures::future::poll_fn(|_cx| {721if self.ring.lock().ops.is_empty() {722Poll::Ready(())723} else {724Poll::Pending725}726}))727}728729fn on_thread_start(&self) {730let current_thread = thread::current().id();731let mut thread_id = self.thread_id.lock();732assert_eq!(733*thread_id.get_or_insert(current_thread),734current_thread,735"`UringReactor::wait_for_work` cannot be called from more than one thread"736);737}738739fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {740trace!(741"Waiting on events, {} pending ops",742self.ring.lock().ops.len()743);744let events = self.ctx.wait().map_err(Error::URingEnter)?;745746// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from747// writing to the eventfd.748set_processing();749750let mut ring = self.ring.lock();751for (raw_token, result) in events {752// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was753// something that we originally gave to the kernel and that was created from a754// `usize` so we should always be able to convert it back into a `usize`.755let token = raw_token756.try_into()757.expect("`u64` doesn't fit inside a `usize`");758759let op = ring760.ops761.get_mut(token)762.expect("Received completion token for unexpected operation");763match mem::replace(op, OpStatus::Completed(Some(result))) {764// No one is waiting on a Nop.765OpStatus::Nop => mem::drop(ring.ops.remove(token)),766OpStatus::Pending(data) => {767if data.canceled {768// No one is waiting for this operation and the uring is done 
with769// it so it's safe to remove.770ring.ops.remove(token);771}772if let Some(waker) = data.waker {773waker.wake();774}775}776OpStatus::Completed(_) => panic!("uring operation completed more than once"),777}778}779780Ok(())781}782783fn new_source<F: AsRawDescriptor>(784&self,785ex: &Arc<RawExecutor<Self>>,786f: F,787) -> AsyncResult<IoSource<F>> {788Ok(IoSource::Uring(super::UringSource::new(f, ex)?))789}790791fn wrap_task_handle<R>(task: RawTaskHandle<UringReactor, R>) -> TaskHandle<R> {792TaskHandle::Uring(task)793}794}795796impl AsRawDescriptor for UringReactor {797fn as_raw_descriptor(&self) -> RawDescriptor {798self.ctx.as_raw_descriptor()799}800}801802impl WeakWake for UringReactor {803fn wake_by_ref(weak_self: &Weak<Self>) {804if let Some(arc_self) = weak_self.upgrade() {805Reactor::wake(&*arc_self);806}807}808}809810impl Drop for UringReactor {811fn drop(&mut self) {812// The ring should have been drained when the executor's Drop ran.813assert!(self.ring.lock().ops.is_empty());814}815}816817// SAFETY:818// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while819// waiting in TLS to be added to the main polling context.820unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {821let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);822if ret < 0 {823Err(Error::DuplicatingFd(base::Error::last()))824} else {825Ok(ret)826}827}828829// Converts a `usize` into a `u64` and panics if the conversion fails.830#[inline]831fn usize_to_u64(val: usize) -> u64 {832val.try_into().expect("`usize` doesn't fit inside a `u64`")833}834835pub struct PendingOperation {836waker_token: Option<WakerToken>,837ex: Weak<RawExecutor<UringReactor>>,838submitted: bool,839}840841impl Future for PendingOperation {842type Output = Result<u32>;843844fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {845let token = self846.waker_token847.as_ref()848.expect("PendingOperation polled after returning Poll::Ready");849if let Some(ex) = 
self.ex.upgrade() {850if let Some(result) = ex.reactor.get_result(token, cx) {851self.waker_token = None;852Poll::Ready(result.map_err(Error::Io))853} else {854// If we haven't submitted the operation yet, and the executor runs on a different855// thread then submit it now. Otherwise the executor will submit it automatically856// the next time it calls UringContext::wait.857if !self.submitted && !ex.reactor.runs_tasks_on_current_thread() {858match ex.reactor.ctx.submit() {859Ok(()) => self.submitted = true,860// If the kernel ring is full then wait until some ops are removed from the861// completion queue. This op should get submitted the next time the executor862// calls UringContext::wait.863Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}864Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),865}866}867Poll::Pending868}869} else {870Poll::Ready(Err(Error::ExecutorGone))871}872}873}874875impl Drop for PendingOperation {876fn drop(&mut self) {877if let Some(waker_token) = self.waker_token.take() {878if let Some(ex) = self.ex.upgrade() {879ex.reactor.cancel_operation(waker_token);880}881}882}883}884885#[cfg(test)]886mod tests {887use std::future::Future;888use std::io::Read;889use std::io::Write;890use std::mem;891use std::pin::Pin;892use std::rc::Rc;893use std::task::Context;894use std::task::Poll;895896use futures::executor::block_on;897898use super::*;899use crate::mem::BackingMemory;900use crate::mem::MemRegion;901use crate::mem::VecIoWrapper;902use crate::BlockingPool;903use crate::ExecutorTrait;904905// A future that returns ready when the uring queue is empty.906struct UringQueueEmpty<'a> {907ex: &'a Arc<RawExecutor<UringReactor>>,908}909910impl Future for UringQueueEmpty<'_> {911type Output = ();912913fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {914if self.ex.reactor.ring.lock().ops.is_empty() {915Poll::Ready(())916} else {917Poll::Pending918}919}920}921922#[test]923fn dont_drop_backing_mem_read() {924if !is_uring_stable() 
{925return;926}927928// Create a backing memory wrapped in an Arc and check that the drop isn't called while the929// op is pending.930let bm =931Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;932933// Use pipes to create a future that will block forever.934let (rx, mut tx) = base::pipe().unwrap();935936// Set up the TLS for the uring_executor by creating one.937let ex = RawExecutor::<UringReactor>::new().unwrap();938939// Register the receive side of the pipe with the executor.940let registered_source = ex941.reactor942.register_source(&ex, &rx)943.expect("register source failed");944945// Submit the op to the kernel. Next, test that the source keeps its Arc open for the946// duration of the op.947let pending_op = registered_source948.start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])949.expect("failed to start read to mem");950951// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a952// reference while the op is active.953assert_eq!(Arc::strong_count(&bm), 2);954955// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using956// it.957drop(pending_op);958assert_eq!(Arc::strong_count(&bm), 2);959960// Finishing the operation should put the Arc count back to 1.961// write to the pipe to wake the read pipe and then wait for the uring result in the962// executor.963tx.write_all(&[0u8; 8]).expect("write failed");964ex.run_until(UringQueueEmpty { ex: &ex })965.expect("Failed to wait for read pipe ready");966assert_eq!(Arc::strong_count(&bm), 1);967}968969#[test]970fn dont_drop_backing_mem_write() {971if !is_uring_stable() {972return;973}974975// Create a backing memory wrapped in an Arc and check that the drop isn't called while the976// op is pending.977let bm =978Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;979980// Use pipes to create a future that will block forever.981let (mut rx, tx) = 
base::new_pipe_full().expect("Pipe failed");982983// Set up the TLS for the uring_executor by creating one.984let ex = RawExecutor::<UringReactor>::new().unwrap();985986// Register the receive side of the pipe with the executor.987let registered_source = ex988.reactor989.register_source(&ex, &tx)990.expect("register source failed");991992// Submit the op to the kernel. Next, test that the source keeps its Arc open for the993// duration of the op.994let pending_op = registered_source995.start_write_from_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])996.expect("failed to start write to mem");997998// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a999// reference while the op is active.1000assert_eq!(Arc::strong_count(&bm), 2);10011002// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using1003// it.1004drop(pending_op);1005assert_eq!(Arc::strong_count(&bm), 2);10061007// Finishing the operation should put the Arc count back to 1.1008// write to the pipe to wake the read pipe and then wait for the uring result in the1009// executor.1010let mut buf = vec![0u8; base::round_up_to_page_size(1)];1011rx.read_exact(&mut buf).expect("read to empty failed");1012ex.run_until(UringQueueEmpty { ex: &ex })1013.expect("Failed to wait for write pipe ready");1014assert_eq!(Arc::strong_count(&bm), 1);1015}10161017#[test]1018fn canceled_before_completion() {1019if !is_uring_stable() {1020return;1021}10221023async fn cancel_io(op: PendingOperation) {1024mem::drop(op);1025}10261027async fn check_result(op: PendingOperation, expected: u32) {1028let actual = op.await.expect("operation failed to complete");1029assert_eq!(expected, actual);1030}10311032let bm =1033Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;10341035let (rx, tx) = base::pipe().expect("Pipe failed");10361037let ex = RawExecutor::<UringReactor>::new().unwrap();10381039let rx_source = 
ex1040.reactor1041.register_source(&ex, &rx)1042.expect("register source failed");1043let tx_source = ex1044.reactor1045.register_source(&ex, &tx)1046.expect("register source failed");10471048let read_task = rx_source1049.start_read_to_mem(None, Arc::clone(&bm), [MemRegion { offset: 0, len: 8 }])1050.expect("failed to start read to mem");10511052ex.spawn_local(cancel_io(read_task)).detach();10531054// Write to the pipe so that the kernel operation will complete.1055let buf =1056Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;1057let write_task = tx_source1058.start_write_from_mem(None, Arc::clone(&buf), [MemRegion { offset: 0, len: 8 }])1059.expect("failed to start write from mem");10601061ex.run_until(check_result(write_task, 8))1062.expect("Failed to run executor");1063}10641065// We will drain all ops on drop and its not guaranteed that operation won't finish1066#[ignore]1067#[test]1068fn drop_before_completion() {1069if !is_uring_stable() {1070return;1071}10721073const VALUE: u64 = 0xef6c_a8df_b842_eb9c;10741075async fn check_op(op: PendingOperation) {1076let err = op.await.expect_err("Op completed successfully");1077match err {1078Error::ExecutorGone => {}1079e => panic!("Unexpected error from op: {e}"),1080}1081}10821083let (mut rx, mut tx) = base::pipe().expect("Pipe failed");10841085let ex = RawExecutor::<UringReactor>::new().unwrap();10861087let tx_source = ex1088.reactor1089.register_source(&ex, &tx)1090.expect("Failed to register source");1091let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));1092let op = tx_source1093.start_write_from_mem(1094None,1095bm,1096[MemRegion {1097offset: 0,1098len: mem::size_of::<u64>(),1099}],1100)1101.expect("Failed to start write from mem");11021103ex.spawn_local(check_op(op)).detach();11041105// Now drop the executor. 
It shouldn't run the write operation.1106mem::drop(ex);11071108// Make sure the executor did not complete the uring operation.1109let new_val = [0x2e; 8];1110tx.write_all(&new_val).unwrap();11111112let mut buf = 0u64.to_ne_bytes();1113rx.read_exact(&mut buf[..])1114.expect("Failed to read from pipe");11151116assert_eq!(buf, new_val);1117}11181119// Dropping a task that owns a BlockingPool shouldn't leak the pool.1120#[test]1121fn drop_detached_blocking_pool() {1122if !is_uring_stable() {1123return;1124}11251126struct Cleanup(BlockingPool);11271128impl Drop for Cleanup {1129fn drop(&mut self) {1130// Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).1131self.01132.shutdown(Some(1133std::time::Instant::now() + std::time::Duration::from_secs(1),1134))1135.unwrap();1136}1137}11381139let rc = Rc::new(std::cell::Cell::new(0));1140{1141let ex = RawExecutor::<UringReactor>::new().unwrap();1142let rc_clone = rc.clone();1143ex.spawn_local(async move {1144rc_clone.set(1);1145let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));1146let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);1147// Spawn a blocking task.1148let blocking_task = pool.0.spawn(move || {1149// Rendezvous.1150assert_eq!(recv.recv(), Ok(()));1151// Wait for drop.1152assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));1153});1154// Make sure it has actually started (using a "rendezvous channel" send).1155//1156// Without this step, we'll have a race where we can shutdown the blocking pool1157// before the worker thread pops off the task.1158send.send(()).unwrap();1159// Wait for it to finish1160blocking_task.await;1161rc_clone.set(2);1162})1163.detach();1164ex.run_until(async {}).unwrap();1165// `ex` is dropped here. If everything is working as expected, it should drop all of1166// its tasks, including `send` and `pool` (in that order, which is important). 
`pool`'s1167// `Drop` impl will try to join all the worker threads, which should work because send1168// half of the channel closed.1169}1170assert_eq!(rc.get(), 1);1171Rc::try_unwrap(rc).expect("Rc had too many refs");1172}11731174#[test]1175fn drop_on_different_thread() {1176if !is_uring_stable() {1177return;1178}11791180let ex = RawExecutor::<UringReactor>::new().unwrap();11811182let ex2 = ex.clone();1183let t = thread::spawn(move || ex2.run_until(async {}));11841185t.join().unwrap().unwrap();11861187// Leave an uncompleted operation in the queue so that the drop impl will try to drive it to1188// completion.1189let (_rx, tx) = base::pipe().expect("Pipe failed");1190let tx = ex1191.reactor1192.register_source(&ex, &tx)1193.expect("Failed to register source");1194let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));1195let op = tx1196.start_write_from_mem(1197None,1198bm,1199[MemRegion {1200offset: 0,1201len: mem::size_of::<u64>(),1202}],1203)1204.expect("Failed to start write from mem");12051206mem::drop(ex);12071208match block_on(op).expect_err("Pending operation completed after executor was dropped") {1209Error::ExecutorGone => {}1210e => panic!("Unexpected error after dropping executor: {e}"),1211}1212}1213}121412151216