Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/merge_sorted.rs
6939 views
1
use std::collections::VecDeque;
2
3
use polars_core::prelude::ChunkCompareIneq;
4
use polars_ops::frame::_merge_sorted_dfs;
5
6
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
7
use crate::async_primitives::connector::Receiver;
8
use crate::async_primitives::distributor_channel::distributor_channel;
9
use crate::morsel::{SourceToken, get_ideal_morsel_size};
10
use crate::nodes::compute_node_prelude::*;
11
12
/// Performs `merge_sorted` with the last column being regarded as the key column. This key column
13
/// is also popped in the send pipe.
14
pub struct MergeSortedNode {
15
seq: MorselSeq,
16
17
starting_nulls: bool,
18
19
// Not yet merged buffers.
20
left_unmerged: VecDeque<DataFrame>,
21
right_unmerged: VecDeque<DataFrame>,
22
}
23
24
impl MergeSortedNode {
25
pub fn new() -> Self {
26
Self {
27
seq: MorselSeq::default(),
28
29
starting_nulls: false,
30
31
left_unmerged: VecDeque::new(),
32
right_unmerged: VecDeque::new(),
33
}
34
}
35
}
36
37
/// Find a part amongst both unmerged buffers which is mergeable.
38
///
39
/// This returns `None` if there is nothing mergeable at this point.
40
fn find_mergeable(
41
left_unmerged: &mut VecDeque<DataFrame>,
42
right_unmerged: &mut VecDeque<DataFrame>,
43
44
is_first: bool,
45
starting_nulls: &mut bool,
46
) -> PolarsResult<Option<(DataFrame, DataFrame)>> {
47
fn first_non_empty(vd: &mut VecDeque<DataFrame>) -> Option<DataFrame> {
48
let mut df = vd.pop_front()?;
49
while df.height() == 0 {
50
df = vd.pop_front()?;
51
}
52
Some(df)
53
}
54
55
loop {
56
let (mut left, mut right) = match (
57
first_non_empty(left_unmerged),
58
first_non_empty(right_unmerged),
59
) {
60
(Some(l), Some(r)) => (l, r),
61
(Some(l), None) => {
62
left_unmerged.push_front(l);
63
return Ok(None);
64
},
65
(None, Some(r)) => {
66
right_unmerged.push_front(r);
67
return Ok(None);
68
},
69
(None, None) => return Ok(None),
70
};
71
72
let left_key = left.get_columns().last().unwrap();
73
let right_key = right.get_columns().last().unwrap();
74
75
let left_null_count = left_key.null_count();
76
let right_null_count = right_key.null_count();
77
78
let has_nulls = left_null_count > 0 || right_null_count > 0;
79
80
// If we are on the first morsel we need to decide whether we have
81
// nulls first or not.
82
if is_first
83
&& has_nulls
84
&& (left_key.head(Some(1)).has_nulls() || right_key.head(Some(1)).has_nulls())
85
{
86
*starting_nulls = true;
87
}
88
89
// For both left and right, find row index of the minimum of the maxima
90
// of the left and right key columns. We can safely merge until this
91
// point.
92
let mut left_cutoff = left.height();
93
let mut right_cutoff = right.height();
94
95
let left_key_last = left_key.tail(Some(1));
96
let right_key_last = right_key.tail(Some(1));
97
98
// We already made sure we had data to work with.
99
assert!(!left_key_last.is_empty());
100
assert!(!right_key_last.is_empty());
101
102
if has_nulls {
103
if *starting_nulls {
104
// If there are starting nulls do those first, then repeat
105
// without the nulls.
106
left_cutoff = left_null_count;
107
right_cutoff = right_null_count;
108
} else {
109
// If there are ending nulls then first do things without the
110
// nulls and then repeat with only the nulls the nulls.
111
let left_is_all_nulls = left_null_count == left.height();
112
let right_is_all_nulls = right_null_count == right.height();
113
114
match (left_is_all_nulls, right_is_all_nulls) {
115
(false, false) => {
116
let left_nulls;
117
let right_nulls;
118
(left, left_nulls) =
119
left.split_at((left.height() - left_null_count) as i64);
120
(right, right_nulls) =
121
right.split_at((right.height() - right_null_count) as i64);
122
123
left_unmerged.push_front(left_nulls);
124
left_unmerged.push_front(left);
125
right_unmerged.push_front(right_nulls);
126
right_unmerged.push_front(right);
127
continue;
128
},
129
(true, false) => left_cutoff = 0,
130
(false, true) => right_cutoff = 0,
131
(true, true) => {},
132
}
133
}
134
} else if left_key_last.lt(&right_key_last)?.all() {
135
// @TODO: This is essentially search sorted, but that does not
136
// support categoricals at moment.
137
let gt_mask = right_key.gt(&left_key_last)?;
138
right_cutoff = gt_mask.downcast_as_array().values().leading_zeros();
139
} else if left_key_last.gt(&right_key_last)?.all() {
140
// @TODO: This is essentially search sorted, but that does not
141
// support categoricals at moment.
142
let gt_mask = left_key.gt(&right_key_last)?;
143
left_cutoff = gt_mask.downcast_as_array().values().leading_zeros();
144
}
145
146
let left_mergeable: DataFrame;
147
let right_mergeable: DataFrame;
148
(left_mergeable, left) = left.split_at(left_cutoff as i64);
149
(right_mergeable, right) = right.split_at(right_cutoff as i64);
150
151
if !left.is_empty() {
152
left_unmerged.push_front(left);
153
}
154
if !right.is_empty() {
155
right_unmerged.push_front(right);
156
}
157
158
return Ok(Some((left_mergeable, right_mergeable)));
159
}
160
}
161
162
fn remove_key_column(df: &mut DataFrame) {
163
// SAFETY:
164
// - We only pop so height stays same.
165
// - We only pop so no new name collisions.
166
// - We clear schema afterwards.
167
unsafe { df.get_columns_mut().pop().unwrap() };
168
df.clear_schema();
169
}
170
171
impl ComputeNode for MergeSortedNode {
    /// Name of this node: `"merge-sorted"`.
    fn name(&self) -> &str {
        "merge-sorted"
    }

    /// Computes the port states from the incoming port states and the buffered data.
    ///
    /// `recv[0]` is the left input, `recv[1]` the right input and `send[0]` the single output.
    fn update_state(
        &mut self,
        recv: &mut [PortState],
        send: &mut [PortState],
        _state: &StreamingExecutionState,
    ) -> PolarsResult<()> {
        assert_eq!(send.len(), 1);
        assert_eq!(recv.len(), 2);

        // Abstraction: we merge buffer state with port state so we can map
        // to one of three possible 'effective' states:
        // no data now (`*_blocked`); data available; or no data anymore (`*_done`).
        let left_done = recv[0] == PortState::Done && self.left_unmerged.is_empty();
        let right_done = recv[1] == PortState::Done && self.right_unmerged.is_empty();

        // We're done once the output is done, or once both inputs are done and their
        // buffers are drained.
        if send[0] == PortState::Done || (left_done && right_done) {
            recv[0] = PortState::Done;
            recv[1] = PortState::Done;
            send[0] = PortState::Done;
            return Ok(());
        }

        // Each port is ready to proceed unless one of the other ports is effectively
        // blocked. For example (written as [left, right] [send]):
        // - [Blocked with empty buffer, Ready] [Ready] returns [Ready, Blocked] [Blocked]
        // - [Blocked with non-empty buffer, Ready] [Ready] returns [Ready, Ready] [Ready]
        let send_blocked = send[0] == PortState::Blocked;
        let left_blocked = recv[0] == PortState::Blocked && self.left_unmerged.is_empty();
        let right_blocked = recv[1] == PortState::Blocked && self.right_unmerged.is_empty();
        send[0] = if left_blocked || right_blocked {
            PortState::Blocked
        } else {
            PortState::Ready
        };
        recv[0] = if send_blocked || right_blocked {
            PortState::Blocked
        } else {
            PortState::Ready
        };
        recv[1] = if send_blocked || left_blocked {
            PortState::Blocked
        } else {
            PortState::Ready
        };

        Ok(())
    }

    /// Spawns the tasks for one execution phase.
    ///
    /// Two cases are handled:
    /// - Only one input port is present and nothing is buffered. The morsels of that port are
    ///   passed through in parallel with the key column removed.
    /// - Otherwise, a serial task splits the input into mergeable pairs using `find_mergeable`.
    ///   Those pairs are distributed to parallel tasks that perform the actual merge.
    fn spawn<'env, 's>(
        &'env mut self,
        scope: &'s TaskScope<'s, 'env>,
        recv_ports: &mut [Option<RecvPort<'_>>],
        send_ports: &mut [Option<SendPort<'_>>],
        _state: &'s StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        assert_eq!(recv_ports.len(), 2);
        assert_eq!(send_ports.len(), 1);

        let send = send_ports[0].take().unwrap().parallel();

        let seq = &mut self.seq;
        let starting_nulls = &mut self.starting_nulls;
        let left_unmerged = &mut self.left_unmerged;
        let right_unmerged = &mut self.right_unmerged;

        match (recv_ports[0].take(), recv_ports[1].take()) {
            // If we do not need to merge or flush anymore, just start passing the port in
            // parallel.
            (Some(port), None) | (None, Some(port))
                if left_unmerged.is_empty() && right_unmerged.is_empty() =>
            {
                let recv = port.parallel();
                let inner_handles = recv
                    .into_iter()
                    .zip(send)
                    .map(|(mut recv, mut send)| {
                        // Incoming sequence ids are offset by our current sequence id.
                        let morsel_offset = *seq;
                        scope.spawn_task(TaskPriority::High, async move {
                            let mut max_seq = morsel_offset;
                            while let Ok(mut morsel) = recv.recv().await {
                                // Ensure the morsel sequence id stream is monotone non-decreasing.
                                let seq = morsel.seq().offset_by(morsel_offset);
                                max_seq = max_seq.max(seq);

                                remove_key_column(morsel.df_mut());

                                morsel.set_seq(seq);
                                if send.send(morsel).await.is_err() {
                                    break;
                                }
                            }
                            max_seq
                        })
                    })
                    .collect::<Vec<_>>();

                join_handles.push(scope.spawn_task(TaskPriority::High, async move {
                    // Update our global maximum.
                    for handle in inner_handles {
                        *seq = (*seq).max(handle.await);
                    }
                    Ok(())
                }));
            },

            // This is the base case. Either:
            // - Both streams are still open and we still need to merge.
            // - One or both streams are closed and we still have some buffered
            //   data.
            (left, right) => {
                async fn buffer_unmerged(
                    port: &mut Receiver<Morsel>,
                    unmerged: &mut VecDeque<DataFrame>,
                ) {
                    // If a stop was requested, we need to buffer the remaining
                    // morsels and trigger a phase transition.
                    let Ok(morsel) = port.recv().await else {
                        return;
                    };

                    // Request the port stop producing morsels.
                    morsel.source_token().stop();

                    // Buffer all the morsels that were already produced.
                    unmerged.push_back(morsel.into_df());
                    while let Ok(morsel) = port.recv().await {
                        unmerged.push_back(morsel.into_df());
                    }
                }

                let (mut distributor, dist_recv) =
                    distributor_channel(send.len(), *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);

                let mut left = left.map(|p| p.serial());
                let mut right = right.map(|p| p.serial());

                join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
                    let source_token = SourceToken::new();

                    // While we can still load data for the empty side.
                    while (left.is_some() || right.is_some())
                        && !(left.is_none() && left_unmerged.is_empty())
                        && !(right.is_none() && right_unmerged.is_empty())
                    {
                        // If we have morsels from both input ports, find until where we can merge
                        // them and send that on to be merged. A sequence id of 0 means nothing
                        // was emitted yet, which is when null placement gets detected.
                        while let Some((left_mergeable, right_mergeable)) = find_mergeable(
                            left_unmerged,
                            right_unmerged,
                            seq.to_u64() == 0,
                            starting_nulls,
                        )? {
                            let left_mergeable =
                                Morsel::new(left_mergeable, *seq, source_token.clone());
                            *seq = seq.successor();

                            if distributor
                                .send((left_mergeable, right_mergeable))
                                .await
                                .is_err()
                            {
                                return Ok(());
                            };
                        }

                        if source_token.stop_requested() {
                            // Request that a port stops producing morsels and buffers all the
                            // remaining morsels.
                            if let Some(p) = &mut left {
                                buffer_unmerged(p, left_unmerged).await;
                            }
                            if let Some(p) = &mut right {
                                buffer_unmerged(p, right_unmerged).await;
                            }
                            break;
                        }

                        // `find_mergeable` returned `None`, so at least one side has no
                        // buffered data left.
                        assert!(left_unmerged.is_empty() || right_unmerged.is_empty());
                        let (empty_port, empty_unmerged) = match (
                            left_unmerged.is_empty(),
                            right_unmerged.is_empty(),
                            left.as_mut(),
                            right.as_mut(),
                        ) {
                            (true, _, Some(left), _) => (left, &mut *left_unmerged),
                            (_, true, _, Some(right)) => (right, &mut *right_unmerged),

                            // If the port that is empty is closed, we don't need to merge anymore.
                            _ => break,
                        };

                        // Try to get a new morsel from the empty side.
                        let Ok(m) = empty_port.recv().await else {
                            if let Some(p) = &mut left {
                                buffer_unmerged(p, left_unmerged).await;
                            }
                            if let Some(p) = &mut right {
                                buffer_unmerged(p, right_unmerged).await;
                            }
                            break;
                        };
                        empty_unmerged.push_back(m.into_df());
                    }

                    // Clear out buffers until we cannot anymore. This allows us to go to the
                    // parallel case faster.
                    while let Some((left_mergeable, right_mergeable)) = find_mergeable(
                        left_unmerged,
                        right_unmerged,
                        seq.to_u64() == 0,
                        starting_nulls,
                    )? {
                        let left_mergeable =
                            Morsel::new(left_mergeable, *seq, source_token.clone());
                        *seq = seq.successor();

                        if distributor
                            .send((left_mergeable, right_mergeable))
                            .await
                            .is_err()
                        {
                            return Ok(());
                        };
                    }

                    // If one of the ports is done and does not have buffered data anymore, we
                    // flush the data on the other side. After this point, this node just pipes
                    // data through.
                    let pass = if left.is_none() && left_unmerged.is_empty() {
                        Some((right.as_mut(), &mut *right_unmerged))
                    } else if right.is_none() && right_unmerged.is_empty() {
                        Some((left.as_mut(), &mut *left_unmerged))
                    } else {
                        None
                    };
                    if let Some((pass_port, pass_unmerged)) = pass {
                        // An empty right frame tells the merge tasks to pass the left morsel on.
                        for df in std::mem::take(pass_unmerged) {
                            let m = Morsel::new(df, *seq, source_token.clone());
                            *seq = seq.successor();
                            if distributor.send((m, DataFrame::empty())).await.is_err() {
                                return Ok(());
                            }
                        }

                        // Start passing on the port that is still open.
                        if let Some(pass_port) = pass_port {
                            let Ok(mut m) = pass_port.recv().await else {
                                return Ok(());
                            };
                            // Forward a stop that was requested on our own token to the
                            // upstream token carried by the passed-through morsel.
                            if source_token.stop_requested() {
                                m.source_token().stop();
                            }
                            m.set_seq(*seq);
                            *seq = seq.successor();
                            if distributor.send((m, DataFrame::empty())).await.is_err() {
                                return Ok(());
                            }

                            while let Ok(mut m) = pass_port.recv().await {
                                m.set_seq(*seq);
                                *seq = seq.successor();
                                if distributor.send((m, DataFrame::empty())).await.is_err() {
                                    return Ok(());
                                }
                            }
                        }
                    }

                    Ok(())
                }));

                // Task that actually merges the two dataframes. Since this merge might be very
                // expensive, this is split over several tasks.
                join_handles.extend(dist_recv.into_iter().zip(send).map(|(mut recv, mut send)| {
                    let ideal_morsel_size = get_ideal_morsel_size();
                    scope.spawn_task(TaskPriority::High, async move {
                        while let Ok((mut left, mut right)) = recv.recv().await {
                            // When we are flushing the buffer, we will just send one morsel from
                            // the input. We don't want to mess with the source token or wait group
                            // and just pass it on.
                            // NOTE(review): this presumably also triggers for a zero-row right
                            // part produced by `find_mergeable`. Merging with an empty right
                            // side yields `left` unchanged; confirm `DataFrame::is_empty`
                            // semantics.
                            if right.is_empty() {
                                remove_key_column(left.df_mut());

                                if send.send(left).await.is_err() {
                                    return Ok(());
                                }
                                continue;
                            }

                            let (mut left, seq, source_token, wg) = left.into_inner();
                            assert!(wg.is_none());

                            let left_s = left
                                .get_columns()
                                .last()
                                .unwrap()
                                .as_materialized_series()
                                .clone();
                            let right_s = right
                                .get_columns()
                                .last()
                                .unwrap()
                                .as_materialized_series()
                                .clone();

                            remove_key_column(&mut left);
                            remove_key_column(&mut right);

                            let merged =
                                _merge_sorted_dfs(&left, &right, &left_s, &right_s, false)?;

                            if ideal_morsel_size > 1 && merged.height() > ideal_morsel_size {
                                // The merged dataframe will have at most doubled in size from the
                                // input so we can divide by half.
                                let (m1, m2) = merged.split_at((merged.height() / 2) as i64);

                                // MorselSeq has to be monotonically non-decreasing, so we can
                                // pass the same sequence id twice.
                                let morsel = Morsel::new(m1, seq, source_token.clone());
                                if send.send(morsel).await.is_err() {
                                    break;
                                }
                                let morsel = Morsel::new(m2, seq, source_token.clone());
                                if send.send(morsel).await.is_err() {
                                    break;
                                }
                            } else {
                                let morsel = Morsel::new(merged, seq, source_token.clone());
                                if send.send(morsel).await.is_err() {
                                    break;
                                }
                            }
                        }

                        Ok(())
                    })
                }));
            },
        }
    }
}
519
520