Path: blob/main/crates/polars-stream/src/async_executor/park_group.rs
6939 views
use std::sync::Arc;1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};23use parking_lot::{Condvar, Mutex};45/// A group of workers that can park / unpark each other.6///7/// There is at most one worker at a time which is considered a 'recruiter'.8/// A recruiter hasn't yet found work and will either park again or recruit the9/// next worker when it finds work.10///11/// Calls to park/unpark participate in a global SeqCst order.12#[derive(Default)]13pub struct ParkGroup {14inner: Arc<ParkGroupInner>,15}1617#[derive(Default)]18struct ParkGroupInner {19// The condvar we park with.20condvar: Condvar,2122// Contains the number of notifications and whether or not the next unparked23// worker should become a recruiter.24notifications: Mutex<(u32, bool)>,2526// Bits 0..32: number of idle workers.27// Bit 32: set if there is an active recruiter.28// Bit 33: set if a worker is preparing to park.29// Bits 34..64: version that is incremented to cancel a park request.30state: AtomicU64,3132num_workers: AtomicU32,33}3435const IDLE_UNIT: u64 = 1;36const ACTIVE_RECRUITER_BIT: u64 = 1 << 32;37const PREPARING_TO_PARK_BIT: u64 = 1 << 33;38const VERSION_UNIT: u64 = 1 << 34;3940fn state_num_idle(state: u64) -> u32 {41state as u3242}4344fn state_version(state: u64) -> u32 {45(state >> 34) as u3246}4748pub struct ParkGroupWorker {49inner: Arc<ParkGroupInner>,50recruiter: bool,51version: u32,52}5354impl ParkGroup {55pub fn new() -> Self {56Self::default()57}5859/// Creates a new worker.60///61/// # Panics62/// Panics if you try to create more than 2^32 - 1 workers.63pub fn new_worker(&self) -> ParkGroupWorker {64self.inner65.num_workers66.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |w| w.checked_add(1))67.expect("can't have more than 2^32 - 1 workers");6869ParkGroupWorker {70version: 0,71inner: Arc::clone(&self.inner),72recruiter: false,73}74}7576/// Unparks an idle worker if there is no recruiter.77///78/// Also cancels in-progress park attempts.79pub fn unpark_one(&self) {80self.inner.unpark_one();81}82}8384impl ParkGroupWorker {85/// Prepares to park this worker.86pub fn prepare_park(&mut self) -> ParkAttempt<'_> {87let mut state = self.inner.state.load(Ordering::SeqCst);88self.version = state_version(state);8990// If the version changes or someone else has set the91// PREPARING_TO_PARK_BIT, stop trying to update the state.92while state & PREPARING_TO_PARK_BIT == 0 && state_version(state) == self.version {93// Notify that we're preparing to park, and while we're at it might as94// well try to become a recruiter to avoid expensive unparks.95let new_state = state | PREPARING_TO_PARK_BIT | ACTIVE_RECRUITER_BIT;96match self.inner.state.compare_exchange_weak(97state,98new_state,99Ordering::Relaxed,100Ordering::SeqCst,101) {102Ok(s) => {103if s & ACTIVE_RECRUITER_BIT == 0 {104self.recruiter = true;105}106break;107},108109Err(s) => state = s,110}111}112113ParkAttempt { worker: self }114}115116/// You should call this function after finding work to recruit the next117/// worker if this worker was a recruiter.118pub fn recruit_next(&mut self) {119if !self.recruiter {120return;121}122123// Recruit the next idle worker or mark that there is no recruiter anymore.124let mut recruit_next = false;125let _ = self126.inner127.state128.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |state| {129debug_assert!(state & ACTIVE_RECRUITER_BIT != 0);130131recruit_next = state_num_idle(state) > 0;132let bit = if recruit_next {133IDLE_UNIT134} else {135ACTIVE_RECRUITER_BIT136};137Some(state - bit)138});139140if recruit_next {141self.inner.unpark_one_slow_as_recruiter();142}143self.recruiter = false;144}145}146147pub struct ParkAttempt<'a> {148worker: &'a mut ParkGroupWorker,149}150151impl ParkAttempt<'_> {152/// Actually park this worker.153///154/// If there were calls to unpark between calling prepare_park() and park(),155/// this park attempt is cancelled and immediately returns.156pub fn park(mut self) {157let state = &self.worker.inner.state;158let update = state.fetch_update(Ordering::Relaxed, Ordering::SeqCst, |state| {159if state_version(state) != self.worker.version {160// We got notified of new work, cancel park.161None162} else if self.worker.recruiter {163Some(state + IDLE_UNIT - ACTIVE_RECRUITER_BIT)164} else {165Some(state + IDLE_UNIT)166}167});168169if update.is_ok() {170self.park_slow()171}172}173174#[cold]175fn park_slow(&mut self) {176let condvar = &self.worker.inner.condvar;177let mut notifications = self.worker.inner.notifications.lock();178condvar.wait_while(&mut notifications, |n| n.0 == 0);179180// Possibly become a recruiter and consume the notification.181self.worker.recruiter = notifications.1;182notifications.0 -= 1;183notifications.1 = false;184}185}186187impl ParkGroupInner {188fn unpark_one(&self) {189let mut should_unpark = false;190let _ = self191.state192.fetch_update(Ordering::Release, Ordering::SeqCst, |state| {193should_unpark = state_num_idle(state) > 0 && state & ACTIVE_RECRUITER_BIT == 0;194if should_unpark {195Some(state - IDLE_UNIT + ACTIVE_RECRUITER_BIT)196} else if state & PREPARING_TO_PARK_BIT == PREPARING_TO_PARK_BIT {197Some(state.wrapping_add(VERSION_UNIT) & !PREPARING_TO_PARK_BIT)198} else {199None200}201});202203if should_unpark {204self.unpark_one_slow_as_recruiter();205}206}207208#[cold]209fn unpark_one_slow_as_recruiter(&self) {210let mut notifications = self.notifications.lock();211notifications.0 += 1;212notifications.1 = true;213self.condvar.notify_one();214}215}216217218