// Source: GitHub repository pola-rs/polars (branch: main)
// Path: crates/polars-stream/src/async_executor/park_group.rs
1
use std::sync::Arc;
2
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
3
4
use parking_lot::{Condvar, Mutex};
5
6
/// A group of workers that can park / unpark each other.
7
///
8
/// There is at most one worker at a time which is considered a 'recruiter'.
9
/// A recruiter hasn't yet found work and will either park again or recruit the
10
/// next worker when it finds work.
11
///
12
/// Calls to park/unpark participate in a global SeqCst order.
13
#[derive(Default)]
14
pub struct ParkGroup {
15
inner: Arc<ParkGroupInner>,
16
}
17
18
#[derive(Default)]
19
struct ParkGroupInner {
20
// The condvar we park with.
21
condvar: Condvar,
22
23
// Contains the number of notifications and whether or not the next unparked
24
// worker should become a recruiter.
25
notifications: Mutex<(u32, bool)>,
26
27
// Bits 0..32: number of idle workers.
28
// Bit 32: set if there is an active recruiter.
29
// Bit 33: set if a worker is preparing to park.
30
// Bits 34..64: version that is incremented to cancel a park request.
31
state: AtomicU64,
32
33
num_workers: AtomicU32,
34
}
35
/// One idle worker in the idle counter (bits 0..32 of the state word).
const IDLE_UNIT: u64 = 1;
/// Bit 32 of the state word: set if there is an active recruiter.
const ACTIVE_RECRUITER_BIT: u64 = 1 << 32;
/// Bit 33 of the state word: set if a worker is preparing to park.
const PREPARING_TO_PARK_BIT: u64 = 1 << 33;
/// One step of the version counter stored in bits 34..64 of the state word.
const VERSION_UNIT: u64 = 1 << 34;

/// Extracts the number of idle workers (bits 0..32) from a packed state word.
fn state_num_idle(state: u64) -> u32 {
    // The truncating cast is intentional: it keeps exactly the low 32 bits and
    // drops the recruiter / preparing-to-park flags and the version.
    state as u32
}

/// Extracts the version counter (bits 34..64) from a packed state word.
fn state_version(state: u64) -> u32 {
    // After shifting out the low 34 bits at most 30 bits remain, so the cast
    // to `u32` never loses information.
    (state >> 34) as u32
}

49
pub struct ParkGroupWorker {
50
inner: Arc<ParkGroupInner>,
51
recruiter: bool,
52
version: u32,
53
}
54
55
impl ParkGroup {
56
pub fn new() -> Self {
57
Self::default()
58
}
59
60
/// Creates a new worker.
61
///
62
/// # Panics
63
/// Panics if you try to create more than 2^32 - 1 workers.
64
pub fn new_worker(&self) -> ParkGroupWorker {
65
self.inner
66
.num_workers
67
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |w| w.checked_add(1))
68
.expect("can't have more than 2^32 - 1 workers");
69
70
ParkGroupWorker {
71
version: 0,
72
inner: Arc::clone(&self.inner),
73
recruiter: false,
74
}
75
}
76
77
/// Unparks an idle worker if there is no recruiter.
78
///
79
/// Also cancels in-progress park attempts.
80
pub fn unpark_one(&self) {
81
self.inner.unpark_one();
82
}
83
}
84
85
impl ParkGroupWorker {
86
/// Prepares to park this worker.
87
pub fn prepare_park(&mut self) -> ParkAttempt<'_> {
88
let mut state = self.inner.state.load(Ordering::SeqCst);
89
self.version = state_version(state);
90
91
// If the version changes or someone else has set the
92
// PREPARING_TO_PARK_BIT, stop trying to update the state.
93
while state & PREPARING_TO_PARK_BIT == 0 && state_version(state) == self.version {
94
// Notify that we're preparing to park, and while we're at it might as
95
// well try to become a recruiter to avoid expensive unparks.
96
let new_state = state | PREPARING_TO_PARK_BIT | ACTIVE_RECRUITER_BIT;
97
match self.inner.state.compare_exchange_weak(
98
state,
99
new_state,
100
Ordering::Relaxed,
101
Ordering::SeqCst,
102
) {
103
Ok(s) => {
104
if s & ACTIVE_RECRUITER_BIT == 0 {
105
self.recruiter = true;
106
}
107
break;
108
},
109
110
Err(s) => state = s,
111
}
112
}
113
114
ParkAttempt { worker: self }
115
}
116
117
/// You should call this function after finding work to recruit the next
118
/// worker if this worker was a recruiter.
119
pub fn recruit_next(&mut self) {
120
if !self.recruiter {
121
return;
122
}
123
124
// Recruit the next idle worker or mark that there is no recruiter anymore.
125
let mut recruit_next = false;
126
let _ = self
127
.inner
128
.state
129
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |state| {
130
debug_assert!(state & ACTIVE_RECRUITER_BIT != 0);
131
132
recruit_next = state_num_idle(state) > 0;
133
let bit = if recruit_next {
134
IDLE_UNIT
135
} else {
136
ACTIVE_RECRUITER_BIT
137
};
138
Some(state - bit)
139
});
140
141
if recruit_next {
142
self.inner.unpark_one_slow_as_recruiter();
143
}
144
self.recruiter = false;
145
}
146
}
147
148
pub struct ParkAttempt<'a> {
149
worker: &'a mut ParkGroupWorker,
150
}
151
152
impl ParkAttempt<'_> {
153
/// Actually park this worker.
154
///
155
/// If there were calls to unpark between calling prepare_park() and park(),
156
/// this park attempt is cancelled and immediately returns.
157
pub fn park(mut self) {
158
let state = &self.worker.inner.state;
159
let update = state.fetch_update(Ordering::Relaxed, Ordering::SeqCst, |state| {
160
if state_version(state) != self.worker.version {
161
// We got notified of new work, cancel park.
162
None
163
} else if self.worker.recruiter {
164
Some(state + IDLE_UNIT - ACTIVE_RECRUITER_BIT)
165
} else {
166
Some(state + IDLE_UNIT)
167
}
168
});
169
170
if update.is_ok() {
171
self.park_slow()
172
}
173
}
174
175
#[cold]
176
fn park_slow(&mut self) {
177
let condvar = &self.worker.inner.condvar;
178
let mut notifications = self.worker.inner.notifications.lock();
179
condvar.wait_while(&mut notifications, |n| n.0 == 0);
180
181
// Possibly become a recruiter and consume the notification.
182
self.worker.recruiter = notifications.1;
183
notifications.0 -= 1;
184
notifications.1 = false;
185
}
186
}
187
188
impl ParkGroupInner {
189
fn unpark_one(&self) {
190
let mut should_unpark = false;
191
let _ = self
192
.state
193
.fetch_update(Ordering::Release, Ordering::SeqCst, |state| {
194
should_unpark = state_num_idle(state) > 0 && state & ACTIVE_RECRUITER_BIT == 0;
195
if should_unpark {
196
Some(state - IDLE_UNIT + ACTIVE_RECRUITER_BIT)
197
} else if state & PREPARING_TO_PARK_BIT == PREPARING_TO_PARK_BIT {
198
Some(state.wrapping_add(VERSION_UNIT) & !PREPARING_TO_PARK_BIT)
199
} else {
200
None
201
}
202
});
203
204
if should_unpark {
205
self.unpark_one_slow_as_recruiter();
206
}
207
}
208
209
#[cold]
210
fn unpark_one_slow_as_recruiter(&self) {
211
let mut notifications = self.notifications.lock();
212
notifications.0 += 1;
213
notifications.1 = true;
214
self.condvar.notify_one();
215
}
216
}
217
218