Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/pipe.rs
6939 views
1
use std::cmp::Reverse;
2
use std::sync::Arc;
3
4
use polars_error::PolarsResult;
5
use polars_utils::priority::Priority;
6
use polars_utils::relaxed_cell::RelaxedCell;
7
8
use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
9
use crate::async_primitives::connector::{Receiver, Sender, connector};
10
use crate::async_primitives::distributor_channel::distributor_channel;
11
use crate::async_primitives::linearizer::Linearizer;
12
use crate::async_primitives::wait_group::WaitGroup;
13
use crate::morsel::{Morsel, MorselSeq};
14
use crate::{DEFAULT_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_LINEARIZER_BUFFER_SIZE};
15
16
pub struct PhysicalPipe {
17
state: State,
18
seq_offset: Arc<RelaxedCell<u64>>,
19
}
20
21
enum State {
22
Invalid,
23
Uninit {
24
num_pipelines: usize,
25
},
26
SerialReceiver {
27
num_pipelines: usize,
28
send: Sender<Morsel>,
29
maintain_order: bool,
30
},
31
ParallelReceiver {
32
senders: Vec<Sender<Morsel>>,
33
},
34
NeedsLinearizer {
35
receivers: Vec<Receiver<Morsel>>,
36
send: Sender<Morsel>,
37
maintain_order: bool,
38
},
39
NeedsDistributor {
40
recv: Receiver<Morsel>,
41
senders: Vec<Sender<Morsel>>,
42
},
43
NeedsOffset {
44
senders: Vec<Sender<Morsel>>,
45
receivers: Vec<Receiver<Morsel>>,
46
},
47
Initialized,
48
}
49
50
/// Handle through which the sending side of a [`PhysicalPipe`] is requested as
/// either serial or parallel.
pub struct SendPort<'a>(&'a mut PhysicalPipe);
51
/// Handle through which the receiving side of a [`PhysicalPipe`] is requested
/// as either serial or parallel.
pub struct RecvPort<'a>(&'a mut PhysicalPipe);
52
53
impl RecvPort<'_> {
54
pub fn serial(self) -> Receiver<Morsel> {
55
self.serial_with_maintain_order(true)
56
}
57
58
pub fn serial_with_maintain_order(self, maintain_order: bool) -> Receiver<Morsel> {
59
let State::Uninit { num_pipelines } = self.0.state else {
60
unreachable!()
61
};
62
let (send, recv) = connector();
63
self.0.state = State::SerialReceiver {
64
num_pipelines,
65
send,
66
maintain_order,
67
};
68
recv
69
}
70
71
pub fn parallel(self) -> Vec<Receiver<Morsel>> {
72
let State::Uninit { num_pipelines } = self.0.state else {
73
unreachable!()
74
};
75
let (senders, receivers): (Vec<Sender<Morsel>>, Vec<Receiver<Morsel>>) =
76
(0..num_pipelines).map(|_| connector()).unzip();
77
self.0.state = State::ParallelReceiver { senders };
78
receivers
79
}
80
}
81
82
impl SendPort<'_> {
83
#[allow(unused)]
84
pub fn is_receiver_serial(&self) -> bool {
85
matches!(self.0.state, State::SerialReceiver { .. })
86
}
87
88
pub fn serial(self) -> Sender<Morsel> {
89
match core::mem::replace(&mut self.0.state, State::Invalid) {
90
State::SerialReceiver { send, .. } => {
91
if self.0.seq_offset.load() == 0 {
92
self.0.state = State::Initialized;
93
send
94
} else {
95
let (offset_send, offset_recv) = connector();
96
self.0.state = State::NeedsOffset {
97
senders: vec![send],
98
receivers: vec![offset_recv],
99
};
100
offset_send
101
}
102
},
103
State::ParallelReceiver { senders } => {
104
let (send, recv) = connector();
105
self.0.state = State::NeedsDistributor { recv, senders };
106
send
107
},
108
_ => unreachable!(),
109
}
110
}
111
112
pub fn parallel(self) -> Vec<Sender<Morsel>> {
113
match core::mem::replace(&mut self.0.state, State::Invalid) {
114
State::SerialReceiver {
115
num_pipelines,
116
send,
117
maintain_order,
118
} => {
119
let (senders, receivers): (Vec<Sender<Morsel>>, Vec<Receiver<Morsel>>) =
120
(0..num_pipelines).map(|_| connector()).unzip();
121
self.0.state = State::NeedsLinearizer {
122
receivers,
123
send,
124
maintain_order,
125
};
126
senders
127
},
128
State::ParallelReceiver { senders } => {
129
if self.0.seq_offset.load() == 0 {
130
self.0.state = State::Initialized;
131
senders
132
} else {
133
let (offset_senders, offset_receivers): (
134
Vec<Sender<Morsel>>,
135
Vec<Receiver<Morsel>>,
136
) = senders.iter().map(|_| connector()).unzip();
137
self.0.state = State::NeedsOffset {
138
senders,
139
receivers: offset_receivers,
140
};
141
offset_senders
142
}
143
},
144
_ => unreachable!(),
145
}
146
}
147
}
148
149
impl PhysicalPipe {
150
pub fn new(num_pipelines: usize, seq_offset: Arc<RelaxedCell<u64>>) -> Self {
151
Self {
152
state: State::Uninit { num_pipelines },
153
seq_offset,
154
}
155
}
156
157
pub fn recv_port(&mut self) -> RecvPort<'_> {
158
assert!(
159
matches!(self.state, State::Uninit { .. }),
160
"PhysicalPipe::recv_port can only be called on an uninitialized pipe"
161
);
162
RecvPort(self)
163
}
164
165
pub fn send_port(&mut self) -> SendPort<'_> {
166
assert!(
167
matches!(
168
self.state,
169
State::SerialReceiver { .. } | State::ParallelReceiver { .. }
170
),
171
"PhysicalPipe::send_port must be called on a pipe which only has its receive port initialized"
172
);
173
SendPort(self)
174
}
175
176
pub fn spawn<'env, 's>(
177
&'env mut self,
178
scope: &'s TaskScope<'s, 'env>,
179
handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
180
) {
181
match core::mem::replace(&mut self.state, State::Initialized) {
182
State::Invalid
183
| State::Uninit { .. }
184
| State::SerialReceiver { .. }
185
| State::ParallelReceiver { .. } => {
186
panic!("PhysicalPipe::spawn called on (partially) initialized pipe");
187
},
188
189
State::Initialized => {},
190
191
State::NeedsLinearizer {
192
receivers,
193
mut send,
194
maintain_order,
195
} => {
196
let num_pipelines = receivers.len();
197
let (mut linearizer, inserters) =
198
Linearizer::<Priority<Reverse<MorselSeq>, Morsel>>::new_with_maintain_order(
199
num_pipelines,
200
*DEFAULT_LINEARIZER_BUFFER_SIZE,
201
maintain_order,
202
);
203
204
let seq_offset = self.seq_offset.load();
205
handles.push(scope.spawn_task(TaskPriority::High, async move {
206
while let Some(Priority(_, mut morsel)) = linearizer.get().await {
207
morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));
208
if send.send(morsel).await.is_err() {
209
break;
210
}
211
}
212
213
Ok(())
214
}));
215
216
for (mut recv, mut inserter) in receivers.into_iter().zip(inserters) {
217
handles.push(scope.spawn_task(TaskPriority::High, async move {
218
while let Ok(mut morsel) = recv.recv().await {
219
// Drop the consume token, but only after the send has succeeded. This
220
// ensures we have backpressure, but only once the channel fills up.
221
let consume_token = morsel.take_consume_token();
222
if inserter
223
.insert(Priority(Reverse(morsel.seq()), morsel))
224
.await
225
.is_err()
226
{
227
break;
228
}
229
drop(consume_token);
230
}
231
232
Ok(())
233
}));
234
}
235
},
236
237
State::NeedsDistributor { mut recv, senders } => {
238
let num_pipelines = senders.len();
239
let (mut distributor, distr_receivers) =
240
distributor_channel(num_pipelines, *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
241
242
let arc_seq_offset = self.seq_offset.clone();
243
handles.push(scope.spawn_task(TaskPriority::High, async move {
244
let mut seq_offset = arc_seq_offset.load();
245
let mut prev_orig_seq = None;
246
247
while let Ok(mut morsel) = recv.recv().await {
248
// We have to relabel sequence ids to be unique before distributing.
249
// Normally within a single pipeline consecutive ids may repeat but
250
// when distributing this would destroy the order.
251
if Some(morsel.seq()) == prev_orig_seq {
252
seq_offset += 1;
253
}
254
prev_orig_seq = Some(morsel.seq());
255
morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));
256
257
// Important: we have to drop the consume token before
258
// going into the buffered distributor.
259
drop(morsel.take_consume_token());
260
if distributor.send(morsel).await.is_err() {
261
break;
262
}
263
}
264
265
arc_seq_offset.store(seq_offset);
266
267
Ok(())
268
}));
269
270
for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {
271
handles.push(scope.spawn_task(TaskPriority::High, async move {
272
let wait_group = WaitGroup::default();
273
while let Ok(mut morsel) = recv.recv().await {
274
morsel.set_consume_token(wait_group.token());
275
if send.send(morsel).await.is_err() {
276
break;
277
}
278
wait_group.wait().await;
279
}
280
281
Ok(())
282
}));
283
}
284
},
285
286
State::NeedsOffset { senders, receivers } => {
287
let seq_offset = self.seq_offset.load();
288
for (mut send, mut recv) in senders.into_iter().zip(receivers) {
289
handles.push(scope.spawn_task(TaskPriority::High, async move {
290
while let Ok(mut morsel) = recv.recv().await {
291
morsel.set_seq(morsel.seq().offset_by_u64(seq_offset));
292
if send.send(morsel).await.is_err() {
293
break;
294
}
295
}
296
Ok(())
297
}));
298
}
299
},
300
}
301
}
302
}
303
304