Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/iejoin/mod.rs
6940 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
mod filtered_bit_array;
3
mod l1_l2;
4
5
use std::cmp::min;
6
7
use filtered_bit_array::FilteredBitArray;
8
use l1_l2::*;
9
use polars_core::chunked_array::ChunkedArray;
10
use polars_core::datatypes::{IdxCa, NumericNative, PolarsNumericType};
11
use polars_core::frame::DataFrame;
12
use polars_core::prelude::*;
13
use polars_core::series::IsSorted;
14
use polars_core::utils::{_set_partition_size, split};
15
use polars_core::{POOL, with_match_physical_numeric_polars_type};
16
use polars_error::{PolarsResult, polars_err};
17
use polars_utils::IdxSize;
18
use polars_utils::binary_search::ExponentialSearch;
19
use polars_utils::itertools::Itertools;
20
use polars_utils::total_ord::{TotalEq, TotalOrd};
21
use rayon::prelude::*;
22
#[cfg(feature = "serde")]
23
use serde::{Deserialize, Serialize};
24
25
use crate::frame::_finish_join;
26
27
/// Comparison applied between a left-hand value and a right-hand value in an
/// inequality join, read as `left <op> right`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum InequalityOperator {
    /// `left < right` (the default operator).
    #[default]
    Lt,
    /// `left <= right`.
    LtEq,
    /// `left > right`.
    Gt,
    /// `left >= right`.
    GtEq,
}

impl InequalityOperator {
    /// Returns `true` for the operators that exclude equality (`<` and `>`).
    fn is_strict(&self) -> bool {
        match self {
            Self::Lt | Self::Gt => true,
            Self::LtEq | Self::GtEq => false,
        }
    }
}

/// Options for an inequality join.
///
/// `operator1` compares the first pair of join expressions. When `operator2` is `Some`, a
/// second pair is compared as well and the IEJoin algorithm is used. When it is `None`, only
/// one inequality is applied and a piecewise merge join is used instead.
#[derive(Clone, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IEJoinOptions {
    /// Operator for the first pair of join expressions.
    pub operator1: InequalityOperator,
    /// Operator for the second pair of join expressions, if there is one.
    pub operator2: Option<InequalityOperator>,
}
48
49
#[allow(clippy::too_many_arguments)]
50
fn ie_join_impl_t<T: PolarsNumericType>(
51
slice: Option<(i64, usize)>,
52
l1_order: IdxCa,
53
l2_order: &[IdxSize],
54
op1: InequalityOperator,
55
op2: InequalityOperator,
56
x: Series,
57
y_ordered_by_x: Series,
58
left_height: usize,
59
) -> PolarsResult<(Vec<IdxSize>, Vec<IdxSize>)> {
60
// Create a bit array with order corresponding to L1,
61
// denoting which entries have been visited while traversing L2.
62
let mut bit_array = FilteredBitArray::from_len_zeroed(l1_order.len());
63
64
let mut left_row_idx: Vec<IdxSize> = vec![];
65
let mut right_row_idx: Vec<IdxSize> = vec![];
66
67
let slice_end = slice_end_index(slice);
68
let mut match_count = 0;
69
70
let ca: &ChunkedArray<T> = x.as_ref().as_ref();
71
let l1_array = build_l1_array(ca, &l1_order, left_height as IdxSize)?;
72
73
if op2.is_strict() {
74
// For strict inequalities, we rely on using a stable sort of l2 so that
75
// p values only increase as we traverse a run of equal y values.
76
// To handle inclusive comparisons in x and duplicate x values we also need the
77
// sort of l1 to be stable, so that the left hand side entries come before the right
78
// hand side entries (as we mark visited entries from the right hand side).
79
for &p in l2_order {
80
match_count += unsafe {
81
l1_array.process_entry(
82
p as usize,
83
&mut bit_array,
84
op1,
85
&mut left_row_idx,
86
&mut right_row_idx,
87
)
88
};
89
90
if slice_end.is_some_and(|end| match_count >= end) {
91
break;
92
}
93
}
94
} else {
95
let l2_array = build_l2_array(&y_ordered_by_x, l2_order)?;
96
97
// For non-strict inequalities in l2, we need to track runs of equal y values and only
98
// check for matches after we reach the end of the run and have marked all rhs entries
99
// in the run as visited.
100
let mut run_start = 0;
101
102
for i in 0..l2_array.len() {
103
// Elide bound checks
104
unsafe {
105
let item = l2_array.get_unchecked(i);
106
let p = item.l1_index;
107
l1_array.mark_visited(p as usize, &mut bit_array);
108
109
if item.run_end {
110
for l2_item in l2_array.get_unchecked(run_start..i + 1) {
111
let p = l2_item.l1_index;
112
match_count += l1_array.process_lhs_entry(
113
p as usize,
114
&bit_array,
115
op1,
116
&mut left_row_idx,
117
&mut right_row_idx,
118
);
119
}
120
121
run_start = i + 1;
122
123
if slice_end.is_some_and(|end| match_count >= end) {
124
break;
125
}
126
}
127
}
128
}
129
}
130
Ok((left_row_idx, right_row_idx))
131
}
132
133
fn piecewise_merge_join_impl_t<T, P>(
134
slice: Option<(i64, usize)>,
135
left_order: Option<&[IdxSize]>,
136
right_order: Option<&[IdxSize]>,
137
left_ordered: Series,
138
right_ordered: Series,
139
mut pred: P,
140
) -> PolarsResult<(Vec<IdxSize>, Vec<IdxSize>)>
141
where
142
T: PolarsNumericType,
143
P: FnMut(&T::Native, &T::Native) -> bool,
144
{
145
let slice_end = slice_end_index(slice);
146
147
let mut left_row_idx: Vec<IdxSize> = vec![];
148
let mut right_row_idx: Vec<IdxSize> = vec![];
149
150
let left_ca: &ChunkedArray<T> = left_ordered.as_ref().as_ref();
151
let right_ca: &ChunkedArray<T> = right_ordered.as_ref().as_ref();
152
153
debug_assert!(left_order.is_none_or(|order| order.len() == left_ca.len()));
154
debug_assert!(right_order.is_none_or(|order| order.len() == right_ca.len()));
155
156
let mut left_idx = 0;
157
let mut right_idx = 0;
158
let mut match_count = 0;
159
160
while left_idx < left_ca.len() {
161
debug_assert!(left_ca.get(left_idx).is_some());
162
let left_val = unsafe { left_ca.value_unchecked(left_idx) };
163
while right_idx < right_ca.len() {
164
debug_assert!(right_ca.get(right_idx).is_some());
165
let right_val = unsafe { right_ca.value_unchecked(right_idx) };
166
if pred(&left_val, &right_val) {
167
// If the predicate is true, then it will also be true for all
168
// remaining rows from the right side.
169
let left_row = match left_order {
170
None => left_idx as IdxSize,
171
Some(order) => order[left_idx],
172
};
173
let right_end_idx = match slice_end {
174
None => right_ca.len(),
175
Some(end) => min(right_ca.len(), (end as usize) - match_count + right_idx),
176
};
177
for included_right_row_idx in right_idx..right_end_idx {
178
let right_row = match right_order {
179
None => included_right_row_idx as IdxSize,
180
Some(order) => order[included_right_row_idx],
181
};
182
left_row_idx.push(left_row);
183
right_row_idx.push(right_row);
184
}
185
match_count += right_end_idx - right_idx;
186
break;
187
} else {
188
right_idx += 1;
189
}
190
}
191
if right_idx == right_ca.len() {
192
// We've reached the end of the right side
193
// so there can be no more matches for LHS rows
194
break;
195
}
196
if slice_end.is_some_and(|end| match_count >= end as usize) {
197
break;
198
}
199
left_idx += 1;
200
}
201
202
Ok((left_row_idx, right_row_idx))
203
}
204
205
pub(super) fn iejoin_par(
206
left: &DataFrame,
207
right: &DataFrame,
208
selected_left: Vec<Series>,
209
selected_right: Vec<Series>,
210
options: &IEJoinOptions,
211
suffix: Option<PlSmallStr>,
212
slice: Option<(i64, usize)>,
213
) -> PolarsResult<DataFrame> {
214
let l1_descending = matches!(
215
options.operator1,
216
InequalityOperator::Gt | InequalityOperator::GtEq
217
);
218
219
let l1_sort_options = SortOptions::default()
220
.with_maintain_order(true)
221
.with_nulls_last(false)
222
.with_order_descending(l1_descending);
223
224
let sl = &selected_left[0];
225
let l1_s_l = sl
226
.arg_sort(l1_sort_options)
227
.slice(sl.null_count() as i64, sl.len() - sl.null_count());
228
229
let sr = &selected_right[0];
230
let l1_s_r = sr
231
.arg_sort(l1_sort_options)
232
.slice(sr.null_count() as i64, sr.len() - sr.null_count());
233
234
// Because we do a cartesian product, the number of partitions is squared.
235
// We take the sqrt, but we don't expect every partition to produce results and work can be
236
// imbalanced, so we multiply the number of partitions by 2, which leads to 2^2= 4
237
let n_partitions = (_set_partition_size() as f32).sqrt() as usize * 2;
238
let splitted_a = split(&l1_s_l, n_partitions);
239
let splitted_b = split(&l1_s_r, n_partitions);
240
241
let cartesian_prod = splitted_a
242
.iter()
243
.flat_map(|l| splitted_b.iter().map(move |r| (l, r)))
244
.collect::<Vec<_>>();
245
246
let iter = cartesian_prod.par_iter().map(|(l_l1_idx, r_l1_idx)| {
247
if l_l1_idx.is_empty() || r_l1_idx.is_empty() {
248
return Ok(None);
249
}
250
fn get_extrema<'a>(
251
l1_idx: &'a IdxCa,
252
s: &'a Series,
253
) -> Option<(AnyValue<'a>, AnyValue<'a>)> {
254
let first = l1_idx.first()?;
255
let last = l1_idx.last()?;
256
257
let start = s.get(first as usize).unwrap();
258
let end = s.get(last as usize).unwrap();
259
260
Some(if start < end {
261
(start, end)
262
} else {
263
(end, start)
264
})
265
}
266
let Some((min_l, max_l)) = get_extrema(l_l1_idx, sl) else {
267
return Ok(None);
268
};
269
let Some((min_r, max_r)) = get_extrema(r_l1_idx, sr) else {
270
return Ok(None);
271
};
272
273
let include_block = match options.operator1 {
274
InequalityOperator::Lt => min_l < max_r,
275
InequalityOperator::LtEq => min_l <= max_r,
276
InequalityOperator::Gt => max_l > min_r,
277
InequalityOperator::GtEq => max_l >= min_r,
278
};
279
280
if include_block {
281
let (mut l, mut r) = unsafe {
282
(
283
selected_left
284
.iter()
285
.map(|s| s.take_unchecked(l_l1_idx))
286
.collect_vec(),
287
selected_right
288
.iter()
289
.map(|s| s.take_unchecked(r_l1_idx))
290
.collect_vec(),
291
)
292
};
293
let sorted_flag = if l1_descending {
294
IsSorted::Descending
295
} else {
296
IsSorted::Ascending
297
};
298
// We sorted using the first series
299
l[0].set_sorted_flag(sorted_flag);
300
r[0].set_sorted_flag(sorted_flag);
301
302
// Compute the row indexes
303
let (idx_l, idx_r) = if options.operator2.is_some() {
304
iejoin_tuples(l, r, options, None)
305
} else {
306
piecewise_merge_join_tuples(l, r, options, None)
307
}?;
308
309
if idx_l.is_empty() {
310
return Ok(None);
311
}
312
313
// These are row indexes in the slices we have given, so we use those to gather in the
314
// original l1 offset arrays. This gives us indexes in the original tables.
315
unsafe {
316
Ok(Some((
317
l_l1_idx.take_unchecked(&idx_l),
318
r_l1_idx.take_unchecked(&idx_r),
319
)))
320
}
321
} else {
322
Ok(None)
323
}
324
});
325
326
let row_indices = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
327
328
let mut left_idx = IdxCa::default();
329
let mut right_idx = IdxCa::default();
330
for (l, r) in row_indices.into_iter().flatten() {
331
left_idx.append(&l)?;
332
right_idx.append(&r)?;
333
}
334
if let Some((offset, end)) = slice {
335
left_idx = left_idx.slice(offset, end);
336
right_idx = right_idx.slice(offset, end);
337
}
338
339
unsafe { materialize_join(left, right, &left_idx, &right_idx, suffix) }
340
}
341
342
pub(super) fn iejoin(
343
left: &DataFrame,
344
right: &DataFrame,
345
selected_left: Vec<Series>,
346
selected_right: Vec<Series>,
347
options: &IEJoinOptions,
348
suffix: Option<PlSmallStr>,
349
slice: Option<(i64, usize)>,
350
) -> PolarsResult<DataFrame> {
351
let (left_row_idx, right_row_idx) = if options.operator2.is_some() {
352
iejoin_tuples(selected_left, selected_right, options, slice)
353
} else {
354
piecewise_merge_join_tuples(selected_left, selected_right, options, slice)
355
}?;
356
unsafe { materialize_join(left, right, &left_row_idx, &right_row_idx, suffix) }
357
}
358
359
unsafe fn materialize_join(
360
left: &DataFrame,
361
right: &DataFrame,
362
left_row_idx: &IdxCa,
363
right_row_idx: &IdxCa,
364
suffix: Option<PlSmallStr>,
365
) -> PolarsResult<DataFrame> {
366
try_raise_keyboard_interrupt();
367
let (join_left, join_right) = {
368
POOL.join(
369
|| left.take_unchecked(left_row_idx),
370
|| right.take_unchecked(right_row_idx),
371
)
372
};
373
374
_finish_join(join_left, join_right, suffix)
375
}
376
377
/// Inequality join. Matches rows between two DataFrames using two inequality operators
378
/// (one of [<, <=, >, >=]).
379
/// Based on Khayyat et al. 2015, "Lightning Fast and Space Efficient Inequality Joins"
380
/// and extended to work with duplicate values.
381
fn iejoin_tuples(
382
selected_left: Vec<Series>,
383
selected_right: Vec<Series>,
384
options: &IEJoinOptions,
385
slice: Option<(i64, usize)>,
386
) -> PolarsResult<(IdxCa, IdxCa)> {
387
if selected_left.len() != 2 {
388
return Err(
389
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the left DataFrame"),
390
);
391
};
392
if selected_right.len() != 2 {
393
return Err(
394
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the right DataFrame"),
395
);
396
};
397
398
let op1 = options.operator1;
399
let op2 = match options.operator2 {
400
None => {
401
return Err(polars_err!(ComputeError: "IEJoin requires two inequality operators"));
402
},
403
Some(op2) => op2,
404
};
405
406
// Determine the sort order based on the comparison operators used.
407
// We want to sort L1 so that "x[i] op1 x[j]" is true for j > i,
408
// and L2 so that "y[i] op2 y[j]" is true for j < i
409
// (except in the case of duplicates and strict inequalities).
410
// Note that the algorithms published in Khayyat et al. have incorrect logic for
411
// determining whether to sort descending.
412
let l1_descending = matches!(op1, InequalityOperator::Gt | InequalityOperator::GtEq);
413
let l2_descending = matches!(op2, InequalityOperator::Lt | InequalityOperator::LtEq);
414
415
let mut x = selected_left[0].to_physical_repr().into_owned();
416
let left_height = x.len();
417
418
x.extend(&selected_right[0].to_physical_repr())?;
419
// Rechunk because we will gather.
420
let x = x.rechunk();
421
422
let mut y = selected_left[1].to_physical_repr().into_owned();
423
y.extend(&selected_right[1].to_physical_repr())?;
424
// Rechunk because we will gather.
425
let y = y.rechunk();
426
427
let l1_sort_options = SortOptions::default()
428
.with_maintain_order(true)
429
.with_nulls_last(false)
430
.with_order_descending(l1_descending);
431
// Get ordering of x, skipping any null entries as these cannot be matches
432
let l1_order = x
433
.arg_sort(l1_sort_options)
434
.slice(x.null_count() as i64, x.len() - x.null_count());
435
436
let y_ordered_by_x = unsafe { y.take_unchecked(&l1_order) };
437
let l2_sort_options = SortOptions::default()
438
.with_maintain_order(true)
439
.with_nulls_last(false)
440
.with_order_descending(l2_descending);
441
// Get the indexes into l1, ordered by y values.
442
// l2_order is the same as "p" from Khayyat et al.
443
let l2_order = y_ordered_by_x.arg_sort(l2_sort_options).slice(
444
y_ordered_by_x.null_count() as i64,
445
y_ordered_by_x.len() - y_ordered_by_x.null_count(),
446
);
447
let l2_order = l2_order.rechunk();
448
let l2_order = l2_order.downcast_as_array().values().as_slice();
449
450
let (left_row_idx, right_row_idx) = with_match_physical_numeric_polars_type!(x.dtype(), |$T| {
451
ie_join_impl_t::<$T>(
452
slice,
453
l1_order,
454
l2_order,
455
op1,
456
op2,
457
x,
458
y_ordered_by_x,
459
left_height
460
)
461
})?;
462
463
debug_assert_eq!(left_row_idx.len(), right_row_idx.len());
464
let left_row_idx = IdxCa::from_vec("".into(), left_row_idx);
465
let right_row_idx = IdxCa::from_vec("".into(), right_row_idx);
466
let (left_row_idx, right_row_idx) = match slice {
467
None => (left_row_idx, right_row_idx),
468
Some((offset, len)) => (
469
left_row_idx.slice(offset, len),
470
right_row_idx.slice(offset, len),
471
),
472
};
473
Ok((left_row_idx, right_row_idx))
474
}
475
476
/// Piecewise merge join, for joins with only a single inequality.
477
fn piecewise_merge_join_tuples(
478
selected_left: Vec<Series>,
479
selected_right: Vec<Series>,
480
options: &IEJoinOptions,
481
slice: Option<(i64, usize)>,
482
) -> PolarsResult<(IdxCa, IdxCa)> {
483
if selected_left.len() != 1 {
484
return Err(
485
polars_err!(ComputeError: "Piecewise merge join requires exactly one expression from the left DataFrame"),
486
);
487
};
488
if selected_right.len() != 1 {
489
return Err(
490
polars_err!(ComputeError: "Piecewise merge join requires exactly one expression from the right DataFrame"),
491
);
492
};
493
if options.operator2.is_some() {
494
return Err(
495
polars_err!(ComputeError: "Piecewise merge join expects only one inequality operator"),
496
);
497
}
498
499
let op = options.operator1;
500
// The left side is sorted such that if the condition is false, it will also
501
// be false for the same RHS row and all following LHS rows.
502
// The right side is sorted such that if the condition is true then it is also
503
// true for the same LHS row and all following RHS rows.
504
// The desired sort order should match the l1 order used in iejoin_par
505
// so we don't need to re-sort slices when doing a parallel join.
506
let descending = matches!(op, InequalityOperator::Gt | InequalityOperator::GtEq);
507
508
let left = selected_left[0].to_physical_repr().into_owned();
509
let mut right = selected_right[0].to_physical_repr().into_owned();
510
let must_cast = right.dtype().matches_schema_type(left.dtype())?;
511
if must_cast {
512
right = right.cast(left.dtype())?;
513
}
514
515
fn get_sorted(series: Series, descending: bool) -> (Series, Option<IdxCa>) {
516
let expected_flag = if descending {
517
IsSorted::Descending
518
} else {
519
IsSorted::Ascending
520
};
521
if (series.is_sorted_flag() == expected_flag || series.len() <= 1) && !series.has_nulls() {
522
// Fast path, no need to re-sort
523
(series, None)
524
} else {
525
let sort_options = SortOptions::default()
526
.with_nulls_last(false)
527
.with_order_descending(descending);
528
529
// Get order and slice to ignore any null values, which cannot be match results
530
let mut order = series.arg_sort(sort_options).slice(
531
series.null_count() as i64,
532
series.len() - series.null_count(),
533
);
534
order.rechunk_mut();
535
let ordered = unsafe { series.take_unchecked(&order) };
536
(ordered, Some(order))
537
}
538
}
539
540
let (left_ordered, left_order) = get_sorted(left, descending);
541
debug_assert!(
542
left_order
543
.as_ref()
544
.is_none_or(|order| order.chunks().len() == 1)
545
);
546
let left_order = left_order
547
.as_ref()
548
.map(|order| order.downcast_get(0).unwrap().values().as_slice());
549
550
let (right_ordered, right_order) = get_sorted(right, descending);
551
debug_assert!(
552
right_order
553
.as_ref()
554
.is_none_or(|order| order.chunks().len() == 1)
555
);
556
let right_order = right_order
557
.as_ref()
558
.map(|order| order.downcast_get(0).unwrap().values().as_slice());
559
560
let (left_row_idx, right_row_idx) = with_match_physical_numeric_polars_type!(left_ordered.dtype(), |$T| {
561
match op {
562
InequalityOperator::Lt => piecewise_merge_join_impl_t::<$T, _>(
563
slice,
564
left_order,
565
right_order,
566
left_ordered,
567
right_ordered,
568
|l, r| l.tot_lt(r),
569
),
570
InequalityOperator::LtEq => piecewise_merge_join_impl_t::<$T, _>(
571
slice,
572
left_order,
573
right_order,
574
left_ordered,
575
right_ordered,
576
|l, r| l.tot_le(r),
577
),
578
InequalityOperator::Gt => piecewise_merge_join_impl_t::<$T, _>(
579
slice,
580
left_order,
581
right_order,
582
left_ordered,
583
right_ordered,
584
|l, r| l.tot_gt(r),
585
),
586
InequalityOperator::GtEq => piecewise_merge_join_impl_t::<$T, _>(
587
slice,
588
left_order,
589
right_order,
590
left_ordered,
591
right_ordered,
592
|l, r| l.tot_ge(r),
593
),
594
}
595
})?;
596
597
debug_assert_eq!(left_row_idx.len(), right_row_idx.len());
598
let left_row_idx = IdxCa::from_vec("".into(), left_row_idx);
599
let right_row_idx = IdxCa::from_vec("".into(), right_row_idx);
600
let (left_row_idx, right_row_idx) = match slice {
601
None => (left_row_idx, right_row_idx),
602
Some((offset, len)) => (
603
left_row_idx.slice(offset, len),
604
right_row_idx.slice(offset, len),
605
),
606
};
607
Ok((left_row_idx, right_row_idx))
608
}
609
610
/// Exclusive end position `offset + len` of a slice, saturating at `i64::MAX`.
///
/// Returns `None` when there is no slice or when its offset is negative. In those cases the
/// join kernels cannot stop generating matches early.
fn slice_end_index(slice: Option<(i64, usize)>) -> Option<i64> {
    let (offset, len) = slice?;
    if offset < 0 {
        return None;
    }
    Some(offset.saturating_add_unsigned(len as u64))
}
616
617