Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/pipe.rs
8420 views
1
use std::cmp::Reverse;
2
use std::sync::Arc;
3
4
use parking_lot::Mutex;
5
use polars_error::PolarsResult;
6
use polars_utils::priority::Priority;
7
use polars_utils::relaxed_cell::RelaxedCell;
8
9
use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
10
use crate::async_primitives::connector::{ReceiverExt, SenderExt, connector_with};
11
use crate::async_primitives::distributor_channel::distributor_channel;
12
use crate::async_primitives::linearizer::Linearizer;
13
use crate::async_primitives::wait_group::WaitGroup;
14
use crate::graph::LogicalPipeKey;
15
use crate::metrics::GraphMetrics;
16
use crate::morsel::{Morsel, MorselSeq};
17
use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};
18
19
pub fn port_channel(metrics: Option<Arc<PipeMetrics>>) -> (PortSender, PortReceiver) {
20
let (send, recv) = connector_with(metrics);
21
(PortSender(send), PortReceiver(recv))
22
}
23
24
/// Sending half of a port channel, created by [`port_channel`].
///
/// Wraps the connector sender. The optional [`PipeMetrics`] is carried as the
/// connector's shared state and is updated after every successful
/// [`PortSender::send`].
pub struct PortSender(SenderExt<Morsel, Option<Arc<PipeMetrics>>>);

/// Receiving half of a port channel, created by [`port_channel`].
///
/// Wraps the connector receiver. The optional [`PipeMetrics`] is carried as the
/// connector's shared state and is updated after every successful
/// [`PortReceiver::recv`].
pub struct PortReceiver(ReceiverExt<Morsel, Option<Arc<PipeMetrics>>>);
26
27
impl PortSender {
28
#[inline]
29
pub async fn send(&mut self, morsel: Morsel) -> Result<(), Morsel> {
30
let rows = morsel.df().height() as u64;
31
self.0.send(morsel).await?;
32
if let Some(metrics) = self.0.shared() {
33
metrics.morsels_sent.fetch_add(1);
34
metrics.rows_sent.fetch_add(rows);
35
metrics.largest_morsel_sent.fetch_max(rows);
36
}
37
Ok(())
38
}
39
}
40
41
impl PortReceiver {
42
#[inline]
43
pub async fn recv(&mut self) -> Result<Morsel, ()> {
44
let morsel = self.0.recv().await?;
45
let rows = morsel.df().height() as u64;
46
if let Some(metrics) = self.0.shared() {
47
metrics.morsels_received.fetch_add(1);
48
metrics.rows_received.fetch_add(rows);
49
metrics.largest_morsel_received.fetch_max(rows);
50
}
51
Ok(morsel)
52
}
53
}
54
55
#[derive(Default)]
56
#[repr(align(128))]
57
pub struct PipeMetrics {
58
pub morsels_sent: RelaxedCell<u64>,
59
pub rows_sent: RelaxedCell<u64>,
60
pub largest_morsel_sent: RelaxedCell<u64>,
61
pub morsels_received: RelaxedCell<u64>,
62
pub rows_received: RelaxedCell<u64>,
63
pub largest_morsel_received: RelaxedCell<u64>,
64
}
65
66
pub struct PhysicalPipe {
67
state: State,
68
seq_offset: Arc<RelaxedCell<u64>>,
69
metrics: Option<Arc<Mutex<GraphMetrics>>>,
70
key: LogicalPipeKey,
71
}
72
73
impl PhysicalPipe {
74
fn make_channel(&self) -> (PortSender, PortReceiver) {
75
let metrics = self.metrics.as_ref().map(|m| {
76
let pipe_metrics = Arc::<PipeMetrics>::default();
77
m.lock().add_pipe(self.key, pipe_metrics.clone());
78
pipe_metrics
79
});
80
port_channel(metrics)
81
}
82
}
83
84
/// Connection progress of a [`PhysicalPipe`].
///
/// The receive side is connected first (`Uninit` -> `SerialReceiver` /
/// `ParallelReceiver`). The send side then moves the pipe either directly to
/// `Initialized` or to one of the `Needs*` states, which `PhysicalPipe::spawn`
/// resolves by starting bridging tasks.
enum State {
    /// Placeholder left behind while a port method has moved the previous
    /// state out with `mem::replace`. It is overwritten before that method
    /// returns.
    Invalid,
    /// No port has been connected yet.
    Uninit {
        /// Number of channels created for a parallel connection.
        num_pipelines: usize,
    },
    /// The receive side took a single serial receiver. `send` is the matching
    /// sender.
    SerialReceiver {
        /// Retained in case the send side connects in parallel.
        num_pipelines: usize,
        send: PortSender,
        /// Passed to the linearizer if one is needed.
        maintain_order: bool,
    },
    /// The receive side took one receiver per pipeline. These are the matching
    /// senders.
    ParallelReceiver {
        senders: Vec<PortSender>,
    },
    /// Parallel send side feeding a serial receive side. `spawn` merges
    /// `receivers` into `send` through a linearizer.
    NeedsLinearizer {
        receivers: Vec<PortReceiver>,
        send: PortSender,
        maintain_order: bool,
    },
    /// Serial send side feeding a parallel receive side. `spawn` fans `recv`
    /// out to `senders` through a distributor.
    NeedsDistributor {
        recv: PortReceiver,
        senders: Vec<PortSender>,
    },
    /// Both sides have the same shape, but the sequence offset is non-zero.
    /// `spawn` forwards `receivers[i]` to `senders[i]`, shifting sequence ids.
    NeedsOffset {
        senders: Vec<PortSender>,
        receivers: Vec<PortReceiver>,
    },
    /// Fully wired. The ports were either connected directly or the bridging
    /// tasks have been spawned.
    Initialized,
}
112
113
/// Handle for connecting the send side of a pipe.
///
/// Obtained from `PhysicalPipe::send_port` and consumed by `serial` or
/// `parallel`.
pub struct SendPort<'a>(&'a mut PhysicalPipe);

/// Handle for connecting the receive side of a pipe.
///
/// Obtained from `PhysicalPipe::recv_port` and consumed by `serial`,
/// `serial_with_maintain_order` or `parallel`.
pub struct RecvPort<'a>(&'a mut PhysicalPipe);
115
116
impl RecvPort<'_> {
117
pub fn serial(self) -> PortReceiver {
118
self.serial_with_maintain_order(true)
119
}
120
121
pub fn serial_with_maintain_order(self, maintain_order: bool) -> PortReceiver {
122
let State::Uninit { num_pipelines } = self.0.state else {
123
unreachable!()
124
};
125
let (send, recv) = self.0.make_channel();
126
self.0.state = State::SerialReceiver {
127
num_pipelines,
128
send,
129
maintain_order,
130
};
131
recv
132
}
133
134
pub fn parallel(self) -> Vec<PortReceiver> {
135
let State::Uninit { num_pipelines } = self.0.state else {
136
unreachable!()
137
};
138
let (senders, receivers): (Vec<PortSender>, Vec<PortReceiver>) =
139
(0..num_pipelines).map(|_| self.0.make_channel()).unzip();
140
self.0.state = State::ParallelReceiver { senders };
141
receivers
142
}
143
}
144
145
impl SendPort<'_> {
146
#[allow(unused)]
147
pub fn is_receiver_serial(&self) -> bool {
148
matches!(self.0.state, State::SerialReceiver { .. })
149
}
150
151
pub fn serial(self) -> PortSender {
152
match core::mem::replace(&mut self.0.state, State::Invalid) {
153
State::SerialReceiver { send, .. } => {
154
if self.0.seq_offset.load() == 0 {
155
self.0.state = State::Initialized;
156
send
157
} else {
158
let (offset_send, offset_recv) = self.0.make_channel();
159
self.0.state = State::NeedsOffset {
160
senders: vec![send],
161
receivers: vec![offset_recv],
162
};
163
offset_send
164
}
165
},
166
State::ParallelReceiver { senders } => {
167
let (send, recv) = self.0.make_channel();
168
self.0.state = State::NeedsDistributor { recv, senders };
169
send
170
},
171
_ => unreachable!(),
172
}
173
}
174
175
pub fn parallel(self) -> Vec<PortSender> {
176
match core::mem::replace(&mut self.0.state, State::Invalid) {
177
State::SerialReceiver {
178
num_pipelines,
179
send,
180
maintain_order,
181
} => {
182
let (senders, receivers): (Vec<PortSender>, Vec<PortReceiver>) =
183
(0..num_pipelines).map(|_| self.0.make_channel()).unzip();
184
self.0.state = State::NeedsLinearizer {
185
receivers,
186
send,
187
maintain_order,
188
};
189
senders
190
},
191
State::ParallelReceiver { senders } => {
192
if self.0.seq_offset.load() == 0 {
193
self.0.state = State::Initialized;
194
senders
195
} else {
196
let (offset_senders, offset_receivers): (Vec<PortSender>, Vec<PortReceiver>) =
197
senders.iter().map(|_| self.0.make_channel()).unzip();
198
self.0.state = State::NeedsOffset {
199
senders,
200
receivers: offset_receivers,
201
};
202
offset_senders
203
}
204
},
205
_ => unreachable!(),
206
}
207
}
208
}
209
210
impl PhysicalPipe {
    /// Creates a pipe in the `Uninit` state.
    ///
    /// * `num_pipelines` - number of channels created when a port is
    ///   connected in parallel.
    /// * `key` - key under which each channel's metrics are registered.
    /// * `seq_offset` - shared offset added to morsel sequence ids. It is read
    ///   when ports connect and when tasks spawn, and the distributor task
    ///   writes it back.
    /// * `metrics` - optional graph metrics sink. `None` disables per-channel
    ///   metrics.
    pub fn new(
        num_pipelines: usize,
        key: LogicalPipeKey,
        seq_offset: Arc<RelaxedCell<u64>>,
        metrics: Option<Arc<Mutex<GraphMetrics>>>,
    ) -> Self {
        Self {
            state: State::Uninit { num_pipelines },
            key,
            seq_offset,
            metrics,
        }
    }

    /// Returns a handle for connecting the receive side of this pipe.
    ///
    /// This must happen before `send_port` is used.
    ///
    /// # Panics
    /// Panics if the pipe is not in the `Uninit` state.
    pub fn recv_port(&mut self) -> RecvPort<'_> {
        assert!(
            matches!(self.state, State::Uninit { .. }),
            "PhysicalPipe::recv_port can only be called on an uninitialized pipe"
        );
        RecvPort(self)
    }

    /// Returns a handle for connecting the send side of this pipe.
    ///
    /// # Panics
    /// Panics unless the receive side, and only the receive side, has been
    /// connected (state `SerialReceiver` or `ParallelReceiver`).
    pub fn send_port(&mut self) -> SendPort<'_> {
        assert!(
            matches!(
                self.state,
                State::SerialReceiver { .. } | State::ParallelReceiver { .. }
            ),
            "PhysicalPipe::send_port must be called on a pipe which only has its receive port initialized"
        );
        SendPort(self)
    }

    /// Spawns the tasks needed to bridge the two connected ports onto `scope`.
    ///
    /// Each task's join handle is pushed onto `handles`, and the pipe is left
    /// in `State::Initialized`. Nothing is spawned when the ports were wired
    /// directly.
    ///
    /// NOTE(review): the bridging tasks use the raw connector halves
    /// (`send.0` / `recv.0`) instead of `PortSender::send` /
    /// `PortReceiver::recv`, so they do not update [`PipeMetrics`].
    /// Presumably this counts traffic only once, at the node-facing ends.
    /// Confirm.
    ///
    /// # Panics
    /// Panics if either port is still unconnected, or if the state is
    /// `Invalid`.
    pub fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        match core::mem::replace(&mut self.state, State::Initialized) {
            State::Invalid
            | State::Uninit { .. }
            | State::SerialReceiver { .. }
            | State::ParallelReceiver { .. } => {
                panic!("PhysicalPipe::spawn called on (partially) initialized pipe");
            },

            // Ports were connected directly; nothing to bridge.
            State::Initialized => {},

            // Parallel senders -> serial receiver: merge all pipelines into
            // `send` through a linearizer ordered by `Reverse(seq)` priority.
            State::NeedsLinearizer {
                receivers,
                mut send,
                maintain_order,
            } => {
                let num_pipelines = receivers.len();
                let (mut linearizer, inserters) =
                    Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new_with_maintain_order(
                        num_pipelines,
                        *DEFAULT_LINEARIZER_BUFFER_SIZE,
                        maintain_order,
                    );

                // The offset is read once, at spawn time.
                let seq_offset = self.seq_offset.load();
                // Output task: drain the linearizer, shift each sequence id,
                // and forward it. Stops if the receiver side goes away.
                handles.push(scope.spawn_task(TaskPriority::High, async move {
                    while let Some(Priority(_, mut morsel)) = linearizer.get().await {
                        morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));
                        if send.0.send(morsel).await.is_err() {
                            break;
                        }
                    }

                    Ok(())
                }));

                // One input task per pipeline, each feeding its own inserter.
                for (mut recv, mut inserter) in receivers.into_iter().zip(inserters) {
                    handles.push(scope.spawn_task(TaskPriority::High, async move {
                        while let Ok(mut morsel) = recv.0.recv().await {
                            // Drop the consume token, but only after the send (here: the
                            // insert into the linearizer) has succeeded. This ensures we
                            // have backpressure, but only once the channel fills up.
                            let consume_token = morsel.take_consume_token();
                            if inserter
                                .insert(Priority(Reverse(morsel.seq()), morsel))
                                .await
                                .is_err()
                            {
                                break;
                            }
                            drop(consume_token);
                        }

                        Ok(())
                    }));
                }
            },

            // Serial sender -> parallel receivers: fan `recv` out through a
            // distributor, with one forwarding task per output.
            State::NeedsDistributor { mut recv, senders } => {
                let num_pipelines = senders.len();
                let (mut distributor, distr_receivers) =
                    distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);

                // The Arc is cloned (rather than only loading the value) so the
                // task can store the final offset back when it finishes.
                let arc_seq_offset = self.seq_offset.clone();
                handles.push(scope.spawn_task(TaskPriority::High, async move {
                    let mut seq_offset = arc_seq_offset.load();
                    let mut prev_orig_seq = None;

                    while let Ok(mut morsel) = recv.0.recv().await {
                        // We have to relabel sequence ids to be unique before distributing.
                        // Normally within a single pipeline consecutive ids may repeat but
                        // when distributing this would destroy the order.
                        if Some(morsel.seq()) == prev_orig_seq {
                            seq_offset += 1;
                        }
                        prev_orig_seq = Some(morsel.seq());
                        morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));

                        // Important: we have to drop the consume token before
                        // going into the buffered distributor.
                        drop(morsel.take_consume_token());
                        if distributor.send(morsel).await.is_err() {
                            break;
                        }
                    }

                    // Write back the (possibly increased) offset to the shared
                    // cell. This also runs when the loop exits via `break`.
                    arc_seq_offset.store(seq_offset);

                    Ok(())
                }));

                for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {
                    handles.push(scope.spawn_task(TaskPriority::High, async move {
                        let wait_group = WaitGroup::default();
                        while let Ok(mut morsel) = recv.recv().await {
                            // Attach a token from this task's wait group. After
                            // sending, wait on the group before pulling the next
                            // morsel. Presumably the wait resolves once the
                            // downstream consumer drops the token, which limits
                            // each output to one outstanding morsel. Confirm
                            // against `WaitGroup`.
                            morsel.set_consume_token(wait_group.token());
                            if send.0.send(morsel).await.is_err() {
                                break;
                            }
                            wait_group.wait().await;
                        }

                        Ok(())
                    }));
                }
            },

            // Same shape on both sides but a non-zero offset: forward each
            // stream 1:1, shifting sequence ids by the offset read at spawn time.
            State::NeedsOffset { senders, receivers } => {
                let seq_offset = self.seq_offset.load();
                for (mut send, mut recv) in senders.into_iter().zip(receivers) {
                    handles.push(scope.spawn_task(TaskPriority::High, async move {
                        while let Ok(mut morsel) = recv.0.recv().await {
                            morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));
                            if send.0.send(morsel).await.is_err() {
                                break;
                            }
                        }
                        Ok(())
                    }));
                }
            },
        }
    }
}
371
372