Path: blob/main/cros_async/src/blocking/cancellable_pool.rs
5394 views
// Copyright 2022 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34//! Provides an async blocking pool whose tasks can be cancelled.56use std::collections::HashMap;7use std::future::Future;8use std::sync::Arc;9use std::sync::LazyLock;10use std::time::Duration;11use std::time::Instant;1213use sync::Condvar;14use sync::Mutex;15use thiserror::Error as ThisError;1617use crate::BlockingPool;1819/// Global executor.20///21/// This is convenient, though not preferred. Pros/cons:22/// + It avoids passing executor all the way to each call sites.23/// + The call site can assume that executor will never shutdown.24/// + Provides similar functionality as async_task with a few improvements around ability to cancel.25/// - Globals are harder to reason about.26static EXECUTOR: LazyLock<CancellableBlockingPool> =27LazyLock::new(|| CancellableBlockingPool::new(256, Duration::from_secs(10)));2829const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);3031#[derive(PartialEq, Eq, PartialOrd, Default)]32enum WindDownStates {33#[default]34Armed,35Disarmed,36ShuttingDown,37ShutDown,38}3940#[derive(Default)]41struct State {42wind_down: WindDownStates,4344/// Helps to generate unique id to associate `cancel` with task.45current_cancellable_id: u64,4647/// A map of all the `cancel` routines of queued/in-flight tasks.48cancellables: HashMap<u64, Box<dyn Fn() + Send + 'static>>,49}5051#[derive(Debug, Clone, Copy)]52pub enum TimeoutAction {53/// Do nothing on timeout.54None,55/// Panic the thread on timeout.56Panic,57}5859#[derive(ThisError, Debug, PartialEq, Eq)]60pub enum Error {61#[error("Timeout occurred while trying to join threads")]62Timedout,63#[error("Shutdown is in progress")]64ShutdownInProgress,65#[error("Already shut down")]66AlreadyShutdown,67}6869struct Inner {70blocking_pool: BlockingPool,71state: Mutex<State>,7273/// This condvar gets notified when `cancellables` is empty after removing an74/// entry.75cancellables_cv: Condvar,76}7778impl Inner {79pub fn spawn<F, R>(self: &Arc<Self>, f: F) -> impl Future<Output = R>80where81F: FnOnce() -> R + Send + 'static,82R: Send + 'static,83{84self.blocking_pool.spawn(f)85}8687/// Adds cancel to a cancellables and returns an `id` with which `cancel` can be88/// accessed/removed.89fn add_cancellable(&self, cancel: Box<dyn Fn() + Send + 'static>) -> u64 {90let mut state = self.state.lock();91let id = state.current_cancellable_id;92state.current_cancellable_id += 1;93state.cancellables.insert(id, cancel);94id95}96}9798/// A thread pool for running work that may block.99///100/// This is a wrapper around `BlockingPool` with an ability to cancel queued tasks.101/// See [BlockingPool] for more info.102///103/// # Examples104///105/// Spawn a task to run in the `CancellableBlockingPool` and await on its result.106///107/// ```edition2018108/// use cros_async::CancellableBlockingPool;109///110/// # async fn do_it() {111/// let pool = CancellableBlockingPool::default();112/// let CANCELLED = 0;113///114/// let res = pool.spawn(move || {115/// // Do some CPU-intensive or blocking work here.116///117/// 42118/// }, move || CANCELLED).await;119///120/// assert_eq!(res, 42);121/// # }122/// # futures::executor::block_on(do_it());123/// ```124#[derive(Clone)]125pub struct CancellableBlockingPool {126inner: Arc<Inner>,127}128129impl CancellableBlockingPool {130const RETRY_COUNT: usize = 10;131const SLEEP_DURATION: Duration = Duration::from_millis(100);132133/// Create a new `CancellableBlockingPool`.134///135/// When we try to shutdown or drop `CancellableBlockingPool`, it may happen that a hung thread136/// might prevent `CancellableBlockingPool` pool from getting dropped. On failure to shutdown in137/// `watchdog_opts.timeout` duration, `CancellableBlockingPool` can take an action specified by138/// `watchdog_opts.action`.139///140/// See also: [BlockingPool::new()](BlockingPool::new)141pub fn new(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {142CancellableBlockingPool {143inner: Arc::new(Inner {144blocking_pool: BlockingPool::new(max_threads, keepalive),145state: Default::default(),146cancellables_cv: Condvar::new(),147}),148}149}150151/// Like [Self::new] but with pre-allocating capacity for up to `max_threads`.152pub fn with_capacity(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {153CancellableBlockingPool {154inner: Arc::new(Inner {155blocking_pool: BlockingPool::with_capacity(max_threads, keepalive),156state: Mutex::new(State::default()),157cancellables_cv: Condvar::new(),158}),159}160}161162/// Spawn a task to run in the `CancellableBlockingPool`.163///164/// Callers may `await` the returned `Task` to be notified when the work is completed.165/// Dropping the future will not cancel the task.166///167/// `cancel` helps to cancel a queued or in-flight operation `f`.168/// `cancel` may be called more than once if `f` doesn't respond to `cancel`.169/// `cancel` is not called if `f` completes successfully. For example,170/// # Examples171///172/// ```edition2018173/// use {cros_async::CancellableBlockingPool, std::sync::{Arc, Mutex, Condvar}};174///175/// # async fn cancel_it() {176/// let pool = CancellableBlockingPool::default();177/// let cancelled: i32 = 1;178/// let success: i32 = 2;179///180/// let shared = Arc::new((Mutex::new(0), Condvar::new()));181/// let shared2 = shared.clone();182/// let shared3 = shared.clone();183///184/// let res = pool185/// .spawn(186/// move || {187/// let guard = shared.0.lock().unwrap();188/// let mut guard = shared.1.wait_while(guard, |state| *state == 0).unwrap();189/// if *guard != cancelled {190/// *guard = success;191/// }192/// },193/// move || {194/// *shared2.0.lock().unwrap() = cancelled;195/// shared2.1.notify_all();196/// },197/// )198/// .await;199/// pool.shutdown();200///201/// assert_eq!(*shared3.0.lock().unwrap(), cancelled);202/// # }203/// ```204pub fn spawn<F, R, G>(&self, f: F, cancel: G) -> impl Future<Output = R>205where206F: FnOnce() -> R + Send + 'static,207R: Send + 'static,208G: Fn() -> R + Send + 'static,209{210let inner = self.inner.clone();211let cancelled = Arc::new(Mutex::new(None));212let cancelled_spawn = cancelled.clone();213let id = inner.add_cancellable(Box::new(move || {214let mut c = cancelled.lock();215*c = Some(cancel());216}));217218self.inner.spawn(move || {219if let Some(res) = cancelled_spawn.lock().take() {220return res;221}222let ret = f();223let mut state = inner.state.lock();224state.cancellables.remove(&id);225if state.cancellables.is_empty() {226inner.cancellables_cv.notify_one();227}228ret229})230}231232/// Iterates over all the queued tasks and marks them as cancelled.233fn drain_cancellables(&self) {234let mut state = self.inner.state.lock();235// Iterate a few times to try cancelling all the tasks.236for _ in 0..Self::RETRY_COUNT {237// Nothing left to do.238if state.cancellables.is_empty() {239return;240}241242// We only cancel the task and do not remove it from the cancellables. It is runner's243// job to remove from state.cancellables.244for cancel in state.cancellables.values() {245cancel();246}247// Hold the state lock in a block before sleeping so that woken up threads can get to248// hold the lock.249// Wait for a while so that the threads get a chance complete task in flight.250let (state1, _cv_timeout) = self251.inner252.cancellables_cv253.wait_timeout(state, Self::SLEEP_DURATION);254state = state1;255}256}257258/// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing259/// will be cancelled.260/// Does not wait for all the tasks to get cancelled.261pub fn disarm(&self) {262{263let mut state = self.inner.state.lock();264265if state.wind_down >= WindDownStates::Disarmed {266return;267}268269// At this point any new incoming request will be cancelled when run.270state.wind_down = WindDownStates::Disarmed;271}272self.drain_cancellables();273}274275/// Shut down the `CancellableBlockingPool`.276///277/// This will block until all work that has been started by the worker threads is finished. Any278/// work that was added to the `CancellableBlockingPool` but not yet picked up by a worker279/// thread will not complete and `await`ing on the `Task` for that work will panic.280pub fn shutdown(&self) -> Result<(), Error> {281self.shutdown_with_timeout(DEFAULT_SHUTDOWN_TIMEOUT)282}283284fn shutdown_with_timeout(&self, timeout: Duration) -> Result<(), Error> {285self.disarm();286{287let mut state = self.inner.state.lock();288if state.wind_down == WindDownStates::ShuttingDown {289return Err(Error::ShutdownInProgress);290}291if state.wind_down == WindDownStates::ShutDown {292return Err(Error::AlreadyShutdown);293}294state.wind_down = WindDownStates::ShuttingDown;295}296297let res = self298.inner299.blocking_pool300.shutdown(/* deadline: */ Some(Instant::now() + timeout));301302self.inner.state.lock().wind_down = WindDownStates::ShutDown;303match res {304Ok(_) => Ok(()),305Err(_) => Err(Error::Timedout),306}307}308}309310impl Default for CancellableBlockingPool {311fn default() -> CancellableBlockingPool {312CancellableBlockingPool::new(256, Duration::from_secs(10))313}314}315316impl Drop for CancellableBlockingPool {317fn drop(&mut self) {318if let Err(e) = self.shutdown() {319base::error!("CancellableBlockingPool::shutdown failed: {}", e);320}321}322}323324/// Spawn a task to run in the `CancellableBlockingPool` static executor.325///326/// `cancel` in-flight operation. cancel is called on operation during `disarm` or during327/// `shutdown`. Cancel may be called multiple times if running task doesn't get cancelled on first328/// attempt.329///330/// Callers may `await` the returned `Task` to be notified when the work is completed.331///332/// See also: `spawn`.333pub fn unblock<F, R, G>(f: F, cancel: G) -> impl Future<Output = R>334where335F: FnOnce() -> R + Send + 'static,336R: Send + 'static,337G: Fn() -> R + Send + 'static,338{339EXECUTOR.spawn(f, cancel)340}341342/// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing343/// will be cancelled.344/// Doesn't not wait for all the tasks to get cancelled.345pub fn unblock_disarm() {346EXECUTOR.disarm()347}348349#[cfg(test)]350mod test {351use std::sync::Arc;352use std::sync::Barrier;353use std::thread;354use std::time::Duration;355356use futures::executor::block_on;357use sync::Condvar;358use sync::Mutex;359360use crate::blocking::Error;361use crate::CancellableBlockingPool;362363#[test]364fn disarm_with_pending_work() {365// Create a pool with only one thread.366let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));367368let mu = Arc::new(Mutex::new(false));369let cv = Arc::new(Condvar::new());370let blocker_is_running = Arc::new(Barrier::new(2));371372// First spawn a thread that blocks the pool.373let task_mu = mu.clone();374let task_cv = cv.clone();375let task_blocker_is_running = blocker_is_running.clone();376let _blocking_task = pool.spawn(377move || {378task_blocker_is_running.wait();379let mut ready = task_mu.lock();380while !*ready {381ready = task_cv.wait(ready);382}383},384move || {},385);386387// Wait for the worker to start running the blocking thread.388blocker_is_running.wait();389390// This task will never finish because we will disarm the pool first.391let unfinished = pool.spawn(|| 5, || 0);392393// Disarming should cancel the task.394pool.disarm();395396// Shutdown the blocking thread. This will allow a worker to pick up the task that has397// to be cancelled.398*mu.lock() = true;399cv.notify_all();400401// We expect the cancelled value to be returned.402assert_eq!(block_on(unfinished), 0);403404// Now the pool is empty and can be shutdown without blocking.405pool.shutdown().unwrap();406}407408#[test]409fn shutdown_with_blocked_work_should_timeout() {410let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));411412let running = Arc::new((Mutex::new(false), Condvar::new()));413let running1 = running.clone();414let _blocking_task = pool.spawn(415move || {416*running1.0.lock() = true;417running1.1.notify_one();418thread::sleep(Duration::from_secs(10000));419},420move || {},421);422423let mut is_running = running.0.lock();424while !*is_running {425is_running = running.1.wait(is_running);426}427428// This shutdown will wait for the full timeout period, so use a short timeout.429assert_eq!(430pool.shutdown_with_timeout(Duration::from_millis(1)),431Err(Error::Timedout)432);433}434435#[test]436fn multiple_shutdown_returns_error() {437let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));438let _ = pool.shutdown();439assert_eq!(pool.shutdown(), Err(Error::AlreadyShutdown));440}441442#[test]443fn shutdown_in_progress() {444let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));445446let running = Arc::new((Mutex::new(false), Condvar::new()));447let running1 = running.clone();448let _blocking_task = pool.spawn(449move || {450*running1.0.lock() = true;451running1.1.notify_one();452thread::sleep(Duration::from_secs(10000));453},454move || {},455);456457let mut is_running = running.0.lock();458while !*is_running {459is_running = running.1.wait(is_running);460}461462let pool_clone = pool.clone();463thread::spawn(move || {464while !pool_clone.inner.blocking_pool.shutting_down() {}465assert_eq!(pool_clone.shutdown(), Err(Error::ShutdownInProgress));466});467468// This shutdown will wait for the full timeout period, so use a short timeout.469// However, it also needs to wait long enough for the thread spawned above to observe the470// shutting_down state, so don't make it too short.471assert_eq!(472pool.shutdown_with_timeout(Duration::from_millis(200)),473Err(Error::Timedout)474);475}476}477478479