Path: blob/main/cros_async/src/sys/windows/handle_executor.rs
5394 views
// Copyright 2020 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::collections::HashMap;5use std::future::Future;6use std::io;7use std::mem;8use std::pin::Pin;9use std::sync::mpsc;10use std::sync::Arc;11use std::sync::Weak;12use std::task::Waker;1314use base::named_pipes::BoxedOverlapped;15use base::warn;16use base::AsRawDescriptor;17use base::Error as SysError;18use base::RawDescriptor;19use futures::task::Context;20use futures::task::Poll;21use sync::Mutex;22use thiserror::Error as ThisError;23use winapi::um::handleapi::INVALID_HANDLE_VALUE;24use winapi::um::minwinbase::OVERLAPPED;2526use crate::common_executor;27use crate::common_executor::RawExecutor;28use crate::common_executor::RawTaskHandle;29use crate::sys::windows::io_completion_port::CompletionPacket;30use crate::sys::windows::io_completion_port::IoCompletionPort;31use crate::waker::WakerToken;32use crate::waker::WeakWake;33use crate::AsyncError;34use crate::AsyncResult;35use crate::IoSource;36use crate::TaskHandle;3738const DEFAULT_IO_CONCURRENCY: u32 = 1;3940#[derive(Debug, ThisError)]41pub enum Error {42#[error("IO completion port operation failed: {0}")]43IocpOperationFailed(SysError),44#[error("Failed to get future from executor run.")]45FailedToReadFutureFromWakerChannel(mpsc::RecvError),46#[error("executor gone before future was dropped.")]47ExecutorGone,48#[error("tried to remove overlapped operation but it didn't exist.")]49RemoveNonExistentOperation,50}5152impl From<Error> for io::Error {53fn from(e: Error) -> Self {54use Error::*;55match e {56FailedToReadFutureFromWakerChannel(e) => io::Error::other(e),57IocpOperationFailed(e) => io::Error::other(e),58ExecutorGone => io::Error::other(e),59RemoveNonExistentOperation => io::Error::other(e),60}61}62}6364impl From<Error> for AsyncError {65fn from(e: Error) -> Self {66AsyncError::SysVariants(e.into())67}68}6970pub type Result<T> = std::result::Result<T, Error>;7172/// Represents an overlapped operation that running (or has completed but not yet woken).73struct OpData {74waker: Option<Waker>,75}7677/// The current status of a future that is running or has completed on HandleReactor.78enum OpStatus {79Pending(OpData),80Completed(CompletionPacket),81}8283pub struct HandleReactor {84iocp: IoCompletionPort,85overlapped_ops: Mutex<HashMap<WakerToken, OpStatus>>,86}8788impl HandleReactor {89pub fn new_with(concurrency: u32) -> Result<Self> {90let iocp = IoCompletionPort::new(concurrency)?;91Ok(Self {92iocp,93overlapped_ops: Mutex::new(HashMap::with_capacity(64)),94})95}9697pub fn new() -> Result<Self> {98Self::new_with(DEFAULT_IO_CONCURRENCY)99}100101/// All descriptors must be first registered with IOCP before any completion packets can be102/// received for them.103pub(crate) fn register_descriptor(&self, rd: &dyn AsRawDescriptor) -> Result<()> {104self.iocp.register_descriptor(rd)105}106107/// When an overlapped operation is created, it is registered with the executor here. This way,108/// when the executor's run thread picks up the completion events, it can associate them back109/// to the correct overlapped operation. Notice that here, no waker is registered. This is110/// because the await hasn't happened yet, so there is no waker. Once the await is triggered,111/// we'll invoke get_overlapped_op_if_ready which will register the waker.112pub(crate) fn register_overlapped_op(&self, token: &WakerToken) {113let mut ops = self.overlapped_ops.lock();114ops.insert(*token, OpStatus::Pending(OpData { waker: None }));115}116117/// Called to register an overlapped IO source with the executor. From here, the source can118/// register overlapped operations that will be managed by the executor.119#[allow(dead_code)]120pub(crate) fn register_overlapped_source(121&self,122raw: &Arc<RawExecutor<HandleReactor>>,123rd: &dyn AsRawDescriptor,124) -> Result<RegisteredOverlappedSource> {125RegisteredOverlappedSource::new(rd, raw)126}127128/// Every time an `OverlappedOperation` is polled, this method will be called. It's a trick to129/// register the waker so that completion events can trigger it from the executor's main thread.130fn get_overlapped_op_if_ready(131&self,132token: &WakerToken,133cx: &mut Context,134) -> Option<CompletionPacket> {135let mut ops = self.overlapped_ops.lock();136137if let OpStatus::Pending(data) = ops138.get_mut(token)139.expect("`get_overlapped_op_if_ready` called on unknown operation")140{141data.waker = Some(cx.waker().clone());142return None;143}144if let OpStatus::Completed(pkt) = ops.remove(token).unwrap() {145return Some(pkt);146}147unreachable!("OpStatus didn't match any known variant.");148}149150/// When an `OverlappedOperation` is dropped, this is called to so we don't leak registered151/// operations. It's possible the operation was already removed (e.g. via polling), in which152/// case this has no effect.153fn remove_overlapped_op(&self, token: &WakerToken) {154let mut ops = self.overlapped_ops.lock();155if ops.remove(token).is_none() {156warn!("Tried to remove non-existent overlapped operation from HandleReactor.");157}158}159}160161impl common_executor::Reactor for HandleReactor {162fn new() -> std::io::Result<Self> {163Ok(HandleReactor::new()?)164}165166fn wake(&self) {167self.iocp.wake().expect("wakeup failed on HandleReactor.");168}169170fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {171// TODO: Cancel overlapped ops and/or wait for everything to complete like the linux172// reactors?173Box::pin(async {})174}175176fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {177let completion_packets = self.iocp.poll()?;178179set_processing();180181for pkt in completion_packets {182if pkt.completion_key as RawDescriptor == INVALID_HANDLE_VALUE {183// These completion packets aren't from overlapped operations. They're from184// something calling HandleReactor::wake, so they've already enqueued whatever185// they think is runnable into the queue. All the packet does is wake up the186// executor loop.187continue;188}189190let mut overlapped_ops = self.overlapped_ops.lock();191if let Some(op) = overlapped_ops.get_mut(&WakerToken(pkt.overlapped_ptr)) {192let waker = match mem::replace(op, OpStatus::Completed(pkt)) {193OpStatus::Pending(OpData { waker }) => waker,194OpStatus::Completed(_) => panic!("operation completed more than once"),195};196drop(overlapped_ops);197if let Some(waker) = waker {198waker.wake();199} else {200// We shouldn't ever get a completion packet for an IO operation that hasn't201// registered its waker.202warn!(203"got a completion packet for an IO operation that had no waker.\204future may be stalled."205)206}207}208}209Ok(())210}211212fn new_source<F: AsRawDescriptor>(213&self,214_ex: &Arc<RawExecutor<Self>>,215f: F,216) -> AsyncResult<IoSource<F>> {217Ok(IoSource::Handle(super::HandleSource::new(f)?))218}219220fn wrap_task_handle<R>(task: RawTaskHandle<HandleReactor, R>) -> TaskHandle<R> {221TaskHandle::Handle(task)222}223}224225/// Represents a handle that has been registered for overlapped operations with a specific executor.226/// From here, the OverlappedSource can register overlapped operations with the executor.227pub(crate) struct RegisteredOverlappedSource {228ex: Weak<RawExecutor<HandleReactor>>,229}230231impl RegisteredOverlappedSource {232fn new(233rd: &dyn AsRawDescriptor,234ex: &Arc<RawExecutor<HandleReactor>>,235) -> Result<RegisteredOverlappedSource> {236ex.reactor.register_descriptor(rd)?;237Ok(Self {238ex: Arc::downgrade(ex),239})240}241242/// Registers an overlapped IO operation with this executor. Call this function with the243/// overlapped struct that represents the operation **before** making the overlapped IO call.244///245/// NOTE: you MUST pass OverlappedOperation::get_overlapped_ptr() as the overlapped IO pointer246/// in the IO call.247pub fn register_overlapped_operation(248&self,249offset: Option<u64>,250) -> Result<OverlappedOperation> {251OverlappedOperation::new(offset, self.ex.clone())252}253}254255impl WeakWake for HandleReactor {256fn wake_by_ref(weak_self: &Weak<Self>) {257if let Some(arc_self) = weak_self.upgrade() {258common_executor::Reactor::wake(&*arc_self);259}260}261}262263/// Represents a pending overlapped IO operation. This must be used in the following manner or264/// undefined behavior will result:265/// 1. The reactor in use is a HandleReactor.266/// 2. Immediately after the IO syscall, this future MUST be awaited. We rely on the fact that267/// the executor cannot poll the IOCP before this future is polled for the first time to268/// ensure the waker has been registered. (If the executor polls the IOCP before the waker is269/// registered, the future will stall.)270pub(crate) struct OverlappedOperation {271overlapped: BoxedOverlapped,272ex: Weak<RawExecutor<HandleReactor>>,273completed: bool,274}275276impl OverlappedOperation {277fn new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> {278let ret = Self {279overlapped: BoxedOverlapped(Box::new(base::create_overlapped(offset, None))),280ex,281completed: false,282};283ret.register_op()?;284Ok(ret)285}286287fn register_op(&self) -> Result<()> {288self.ex289.upgrade()290.ok_or(Error::ExecutorGone)?291.reactor292.register_overlapped_op(&self.get_token());293Ok(())294}295296/// Returns a pointer to the overlapped struct representing the operation. This MUST be used297/// when making the overlapped IO call or the executor will not be able to wake the right298/// future.299pub fn get_overlapped(&mut self) -> &mut OVERLAPPED {300&mut self.overlapped.0301}302303#[inline]304pub fn get_token(&self) -> WakerToken {305WakerToken((&*self.overlapped.0) as *const _ as usize)306}307}308309impl Future for OverlappedOperation {310type Output = Result<CompletionPacket>;311312fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {313if self.completed {314panic!("OverlappedOperation polled after returning Poll::Ready");315}316if let Some(ex) = self.ex.upgrade() {317if let Some(completion_pkt) =318ex.reactor.get_overlapped_op_if_ready(&self.get_token(), cx)319{320self.completed = true;321Poll::Ready(Ok(completion_pkt))322} else {323Poll::Pending324}325} else {326Poll::Ready(Err(Error::ExecutorGone))327}328}329}330331impl Drop for OverlappedOperation {332fn drop(&mut self) {333if !self.completed {334if let Some(ex) = self.ex.upgrade() {335ex.reactor.remove_overlapped_op(&self.get_token());336}337}338}339}340341#[cfg(test)]342mod test {343use super::*;344const FUT_MSG: i32 = 5;345use std::rc::Rc;346347use futures::channel::mpsc as fut_mpsc;348use futures::SinkExt;349use futures::StreamExt;350351use crate::BlockingPool;352use crate::ExecutorTrait;353354#[test]355fn run_future() {356let (send, recv) = mpsc::channel();357async fn this_test(send: mpsc::Sender<i32>) {358send.send(FUT_MSG).unwrap();359}360361let ex = RawExecutor::<HandleReactor>::new().unwrap();362ex.run_until(this_test(send)).unwrap();363assert_eq!(recv.recv().unwrap(), FUT_MSG);364}365366#[test]367fn spawn_future() {368let (send, recv) = fut_mpsc::channel(1);369let (send_done_signal, receive_done_signal) = mpsc::channel();370371async fn message_sender(mut send: fut_mpsc::Sender<i32>) {372send.send(FUT_MSG).await.unwrap();373}374375async fn message_receiver(mut recv: fut_mpsc::Receiver<i32>, done: mpsc::Sender<bool>) {376assert_eq!(recv.next().await.unwrap(), FUT_MSG);377done.send(true).unwrap();378}379380let ex = RawExecutor::<HandleReactor>::new().unwrap();381ex.spawn(message_sender(send)).detach();382ex.run_until(message_receiver(recv, send_done_signal))383.unwrap();384assert_eq!(receive_done_signal.recv().unwrap(), true);385}386387// Dropping a task that owns a BlockingPool shouldn't leak the pool.388#[test]389fn drop_detached_blocking_pool() {390struct Cleanup(BlockingPool);391392impl Drop for Cleanup {393fn drop(&mut self) {394// Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).395self.0396.shutdown(Some(397std::time::Instant::now() + std::time::Duration::from_secs(1),398))399.unwrap();400}401}402403let rc = Rc::new(std::cell::Cell::new(0));404{405let ex = RawExecutor::<HandleReactor>::new().unwrap();406let rc_clone = rc.clone();407ex.spawn_local(async move {408rc_clone.set(1);409let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));410let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);411// Spawn a blocking task.412let blocking_task = pool.0.spawn(move || {413// Rendezvous.414assert_eq!(recv.recv(), Ok(()));415// Wait for drop.416assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));417});418// Make sure it has actually started (using a "rendezvous channel" send).419//420// Without this step, we'll have a race where we can shutdown the blocking pool421// before the worker thread pops off the task.422send.send(()).unwrap();423// Wait for it to finish424blocking_task.await;425rc_clone.set(2);426})427.detach();428ex.run_until(async {}).unwrap();429// `ex` is dropped here. If everything is working as expected, it should drop all of430// its tasks, including `send` and `pool` (in that order, which is important). `pool`'s431// `Drop` impl will try to join all the worker threads, which should work because send432// half of the channel closed.433}434assert_eq!(rc.get(), 1);435Rc::try_unwrap(rc).expect("Rc had too many refs");436}437}438439440