Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/mod.rs
8430 views
1
mod args;
2
#[cfg(feature = "asof_join")]
3
mod asof;
4
mod cross_join;
5
mod dispatch_left_right;
6
mod general;
7
mod hash_join;
8
#[cfg(feature = "iejoin")]
9
mod iejoin;
10
pub mod merge_join;
11
#[cfg(feature = "merge_sorted")]
12
mod merge_sorted;
13
14
use std::borrow::Cow;
15
use std::fmt::{Debug, Display, Formatter};
16
use std::hash::Hash;
17
18
pub use args::*;
19
use arrow::trusted_len::TrustedLen;
20
#[cfg(feature = "asof_join")]
21
pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
22
pub use cross_join::CrossJoin;
23
#[cfg(feature = "chunked_ids")]
24
use either::Either;
25
#[cfg(feature = "chunked_ids")]
26
use general::create_chunked_index_mapping;
27
pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
28
pub use hash_join::*;
29
use hashbrown::hash_map::{Entry, RawEntryMut};
30
#[cfg(feature = "iejoin")]
31
pub use iejoin::{IEJoinOptions, InequalityOperator};
32
#[cfg(feature = "merge_sorted")]
33
pub use merge_sorted::_merge_sorted_dfs;
34
use polars_core::POOL;
35
#[allow(unused_imports)]
36
use polars_core::chunked_array::ops::row_encode::{
37
encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
38
};
39
use polars_core::datatypes::DataType;
40
use polars_core::hashing::_HASHMAP_INIT_SIZE;
41
use polars_core::prelude::*;
42
pub(super) use polars_core::series::IsSorted;
43
use polars_core::utils::slice_offsets;
44
#[allow(unused_imports)]
45
use polars_core::utils::slice_slice;
46
use polars_utils::hashing::BytesHash;
47
use rayon::prelude::*;
48
49
use self::cross_join::fused_cross_filter;
50
use super::IntoDf;
51
52
pub trait DataFrameJoinOps: IntoDf {
53
/// Generic join method. Can be used to join on multiple columns.
54
///
55
/// # Example
56
///
57
/// ```no_run
58
/// # use polars_core::prelude::*;
59
/// # use polars_ops::prelude::*;
60
/// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
61
/// "Phosphorus (mg/100g)" => &[11, 22, 12])?;
62
/// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
63
/// "Potassium (mg/100g)" => &[107, 358, 115])?;
64
///
65
/// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
66
/// None)?;
67
/// assert_eq!(df3.shape(), (3, 3));
68
/// println!("{}", df3);
69
/// # Ok::<(), PolarsError>(())
70
/// ```
71
///
72
/// Output:
73
///
74
/// ```text
75
/// shape: (3, 3)
76
/// +--------+----------------------+---------------------+
77
/// | Fruit | Phosphorus (mg/100g) | Potassium (mg/100g) |
78
/// | --- | --- | --- |
79
/// | str | i32 | i32 |
80
/// +========+======================+=====================+
81
/// | Apple | 11 | 107 |
82
/// +--------+----------------------+---------------------+
83
/// | Banana | 22 | 358 |
84
/// +--------+----------------------+---------------------+
85
/// | Pear | 12 | 115 |
86
/// +--------+----------------------+---------------------+
87
/// ```
88
fn join(
89
&self,
90
other: &DataFrame,
91
left_on: impl IntoIterator<Item = impl AsRef<str>>,
92
right_on: impl IntoIterator<Item = impl AsRef<str>>,
93
args: JoinArgs,
94
options: Option<JoinTypeOptions>,
95
) -> PolarsResult<DataFrame> {
96
let df_left = self.to_df();
97
let selected_left = df_left.select_to_vec(left_on)?;
98
let selected_right = other.select_to_vec(right_on)?;
99
100
let selected_left = selected_left
101
.into_iter()
102
.map(Column::take_materialized_series)
103
.collect::<Vec<_>>();
104
let selected_right = selected_right
105
.into_iter()
106
.map(Column::take_materialized_series)
107
.collect::<Vec<_>>();
108
109
self._join_impl(
110
other,
111
selected_left,
112
selected_right,
113
args,
114
options,
115
true,
116
false,
117
)
118
}
119
120
#[doc(hidden)]
121
#[allow(clippy::too_many_arguments)]
122
#[allow(unused_mut)]
123
fn _join_impl(
124
&self,
125
other: &DataFrame,
126
mut selected_left: Vec<Series>,
127
mut selected_right: Vec<Series>,
128
mut args: JoinArgs,
129
options: Option<JoinTypeOptions>,
130
_check_rechunk: bool,
131
_verbose: bool,
132
) -> PolarsResult<DataFrame> {
133
let left_df = self.to_df();
134
135
#[cfg(feature = "cross_join")]
136
if let JoinType::Cross = args.how {
137
if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
138
assert!(args.slice.is_none());
139
return fused_cross_filter(
140
left_df,
141
other,
142
args.suffix.clone(),
143
cross_options,
144
args.maintain_order,
145
);
146
}
147
return left_df.cross_join(other, args.suffix.clone(), args.slice, args.maintain_order);
148
}
149
150
// Clear literals if a frame is empty. Otherwise we could get an oob
151
fn clear(s: &mut [Series]) {
152
for s in s.iter_mut() {
153
if s.len() == 1 {
154
*s = s.clear()
155
}
156
}
157
}
158
if left_df.height() == 0 {
159
clear(&mut selected_left);
160
}
161
if other.height() == 0 {
162
clear(&mut selected_right);
163
}
164
165
let should_coalesce = args.should_coalesce();
166
assert_eq!(selected_left.len(), selected_right.len());
167
168
#[cfg(feature = "chunked_ids")]
169
{
170
// a left join create chunked-ids
171
// the others not yet.
172
// TODO! change this to other join types once they support chunked-id joins
173
if _check_rechunk
174
&& !(matches!(args.how, JoinType::Left)
175
|| std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
176
{
177
let mut left = Cow::Borrowed(left_df);
178
let mut right = Cow::Borrowed(other);
179
if left_df.should_rechunk() {
180
if _verbose {
181
eprintln!(
182
"{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
183
args.how,
184
left_df.width()
185
);
186
}
187
188
let mut tmp_left = left_df.clone();
189
tmp_left.rechunk_mut_par();
190
left = Cow::Owned(tmp_left);
191
}
192
if other.should_rechunk() {
193
if _verbose {
194
eprintln!(
195
"{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
196
args.how,
197
other.width()
198
);
199
}
200
let mut tmp_right = other.clone();
201
tmp_right.rechunk_mut_par();
202
right = Cow::Owned(tmp_right);
203
}
204
return left._join_impl(
205
&right,
206
selected_left,
207
selected_right,
208
args,
209
options,
210
false,
211
_verbose,
212
);
213
}
214
}
215
216
if let Some((l, r)) = selected_left
217
.iter()
218
.zip(&selected_right)
219
.find(|(l, r)| l.dtype() != r.dtype())
220
{
221
polars_bail!(
222
ComputeError:
223
"datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
224
l.name(), l.dtype().pretty_format(), r.name(), r.dtype().pretty_format()
225
);
226
};
227
228
#[cfg(feature = "iejoin")]
229
if let JoinType::IEJoin = args.how {
230
let Some(JoinTypeOptions::IEJoin(options)) = options else {
231
unreachable!()
232
};
233
let func = if POOL.current_num_threads() > 1
234
&& !left_df.shape_has_zero()
235
&& !other.shape_has_zero()
236
{
237
iejoin::iejoin_par
238
} else {
239
iejoin::iejoin
240
};
241
return func(
242
left_df,
243
other,
244
selected_left,
245
selected_right,
246
&options,
247
args.suffix,
248
args.slice,
249
);
250
}
251
252
// Single keys.
253
if selected_left.len() == 1 {
254
let s_left = &selected_left[0];
255
let s_right = &selected_right[0];
256
let drop_names: Option<Vec<PlSmallStr>> =
257
if should_coalesce { None } else { Some(vec![]) };
258
return match args.how {
259
JoinType::Inner => left_df
260
._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
261
JoinType::Left => dispatch_left_right::left_join_from_series(
262
self.to_df().clone(),
263
other,
264
s_left,
265
s_right,
266
args,
267
_verbose,
268
drop_names,
269
),
270
JoinType::Right => dispatch_left_right::right_join_from_series(
271
self.to_df(),
272
other.clone(),
273
s_left,
274
s_right,
275
args,
276
_verbose,
277
drop_names,
278
),
279
JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
280
#[cfg(feature = "semi_anti_join")]
281
JoinType::Anti => left_df._semi_anti_join_from_series(
282
s_left,
283
s_right,
284
args.slice,
285
true,
286
args.nulls_equal,
287
),
288
#[cfg(feature = "semi_anti_join")]
289
JoinType::Semi => left_df._semi_anti_join_from_series(
290
s_left,
291
s_right,
292
args.slice,
293
false,
294
args.nulls_equal,
295
),
296
#[cfg(feature = "asof_join")]
297
JoinType::AsOf(options) => match (options.left_by, options.right_by) {
298
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
299
other,
300
s_left,
301
s_right,
302
left_by,
303
right_by,
304
options.strategy,
305
options.tolerance.map(|v| v.into_value()),
306
args.suffix.clone(),
307
args.slice,
308
should_coalesce,
309
options.allow_eq,
310
options.check_sortedness,
311
),
312
(None, None) => left_df._join_asof(
313
other,
314
s_left,
315
s_right,
316
options.strategy,
317
options.tolerance.map(|v| v.into_value()),
318
args.suffix,
319
args.slice,
320
should_coalesce,
321
options.allow_eq,
322
options.check_sortedness,
323
),
324
_ => {
325
panic!("expected by arguments on both sides")
326
},
327
},
328
#[cfg(feature = "iejoin")]
329
JoinType::IEJoin => {
330
unreachable!()
331
},
332
JoinType::Cross => {
333
unreachable!()
334
},
335
};
336
}
337
let (lhs_keys, rhs_keys) = if (left_df.height() == 0 || other.height() == 0)
338
&& matches!(&args.how, JoinType::Inner)
339
{
340
// Fast path for empty inner joins.
341
// Return 2 dummies so that we don't row-encode.
342
let a = Series::full_null("".into(), 0, &DataType::Null);
343
(a.clone(), a)
344
} else {
345
// Row encode the keys.
346
(
347
prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
348
prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
349
)
350
};
351
352
let drop_names = if should_coalesce {
353
if args.how == JoinType::Right {
354
selected_left
355
.iter()
356
.map(|s| s.name().clone())
357
.collect::<Vec<_>>()
358
} else {
359
selected_right
360
.iter()
361
.map(|s| s.name().clone())
362
.collect::<Vec<_>>()
363
}
364
} else {
365
vec![]
366
};
367
368
// Multiple keys.
369
match args.how {
370
#[cfg(feature = "asof_join")]
371
JoinType::AsOf(_) => polars_bail!(
372
ComputeError: "asof join not supported for join on multiple keys"
373
),
374
#[cfg(feature = "iejoin")]
375
JoinType::IEJoin => {
376
unreachable!()
377
},
378
JoinType::Cross => {
379
unreachable!()
380
},
381
JoinType::Full => {
382
let names_left = selected_left
383
.iter()
384
.map(|s| s.name().clone())
385
.collect::<Vec<_>>();
386
args.coalesce = JoinCoalesce::KeepColumns;
387
let suffix = args.suffix.clone();
388
let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
389
390
if should_coalesce {
391
Ok(_coalesce_full_join(
392
out?,
393
names_left.as_slice(),
394
drop_names.as_slice(),
395
suffix,
396
left_df,
397
))
398
} else {
399
out
400
}
401
},
402
JoinType::Inner => left_df._inner_join_from_series(
403
other,
404
&lhs_keys,
405
&rhs_keys,
406
args,
407
_verbose,
408
Some(drop_names),
409
),
410
JoinType::Left => dispatch_left_right::left_join_from_series(
411
left_df.clone(),
412
other,
413
&lhs_keys,
414
&rhs_keys,
415
args,
416
_verbose,
417
Some(drop_names),
418
),
419
JoinType::Right => dispatch_left_right::right_join_from_series(
420
left_df,
421
other.clone(),
422
&lhs_keys,
423
&rhs_keys,
424
args,
425
_verbose,
426
Some(drop_names),
427
),
428
#[cfg(feature = "semi_anti_join")]
429
JoinType::Anti | JoinType::Semi => self._join_impl(
430
other,
431
vec![lhs_keys],
432
vec![rhs_keys],
433
args,
434
options,
435
_check_rechunk,
436
_verbose,
437
),
438
}
439
}
440
441
/// Perform an inner join on two DataFrames.
442
///
443
/// # Example
444
///
445
/// ```
446
/// # use polars_core::prelude::*;
447
/// # use polars_ops::prelude::*;
448
/// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
449
/// left.inner_join(right, ["join_column_left"], ["join_column_right"])
450
/// }
451
/// ```
452
fn inner_join(
453
&self,
454
other: &DataFrame,
455
left_on: impl IntoIterator<Item = impl AsRef<str>>,
456
right_on: impl IntoIterator<Item = impl AsRef<str>>,
457
) -> PolarsResult<DataFrame> {
458
self.join(
459
other,
460
left_on,
461
right_on,
462
JoinArgs::new(JoinType::Inner),
463
None,
464
)
465
}
466
467
/// Perform a left outer join on two DataFrames
468
/// # Example
469
///
470
/// ```no_run
471
/// # use polars_core::prelude::*;
472
/// # use polars_ops::prelude::*;
473
/// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
474
/// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
475
/// "Wavelength nm" => &[480.0, 577.0, 650.0])?;
476
///
477
/// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
478
/// println!("{:?}", df3);
479
/// # Ok::<(), PolarsError>(())
480
/// ```
481
///
482
/// Output:
483
///
484
/// ```text
485
/// shape: (5, 2)
486
/// +-----------------+--------+
487
/// | Wavelength (nm) | Color |
488
/// | --- | --- |
489
/// | f64 | str |
490
/// +=================+========+
491
/// | 480 | Blue |
492
/// +-----------------+--------+
493
/// | 650 | Red |
494
/// +-----------------+--------+
495
/// | 577 | Yellow |
496
/// +-----------------+--------+
497
/// | 1201 | null |
498
/// +-----------------+--------+
499
/// | 100 | null |
500
/// +-----------------+--------+
501
/// ```
502
fn left_join(
503
&self,
504
other: &DataFrame,
505
left_on: impl IntoIterator<Item = impl AsRef<str>>,
506
right_on: impl IntoIterator<Item = impl AsRef<str>>,
507
) -> PolarsResult<DataFrame> {
508
self.join(
509
other,
510
left_on,
511
right_on,
512
JoinArgs::new(JoinType::Left),
513
None,
514
)
515
}
516
517
/// Perform a full outer join on two DataFrames
518
/// # Example
519
///
520
/// ```
521
/// # use polars_core::prelude::*;
522
/// # use polars_ops::prelude::*;
523
/// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
524
/// left.full_join(right, ["join_column_left"], ["join_column_right"])
525
/// }
526
/// ```
527
fn full_join(
528
&self,
529
other: &DataFrame,
530
left_on: impl IntoIterator<Item = impl AsRef<str>>,
531
right_on: impl IntoIterator<Item = impl AsRef<str>>,
532
) -> PolarsResult<DataFrame> {
533
self.join(
534
other,
535
left_on,
536
right_on,
537
JoinArgs::new(JoinType::Full),
538
None,
539
)
540
}
541
}
542
543
trait DataFrameJoinOpsPrivate: IntoDf {
544
fn _inner_join_from_series(
545
&self,
546
other: &DataFrame,
547
s_left: &Series,
548
s_right: &Series,
549
args: JoinArgs,
550
verbose: bool,
551
drop_names: Option<Vec<PlSmallStr>>,
552
) -> PolarsResult<DataFrame> {
553
let left_df = self.to_df();
554
let ((join_tuples_left, join_tuples_right), sorted) =
555
_sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
556
557
let mut join_tuples_left = &*join_tuples_left;
558
let mut join_tuples_right = &*join_tuples_right;
559
560
if let Some((offset, len)) = args.slice {
561
join_tuples_left = slice_slice(join_tuples_left, offset, len);
562
join_tuples_right = slice_slice(join_tuples_right, offset, len);
563
}
564
565
let other = if let Some(drop_names) = drop_names {
566
other.drop_many(drop_names)
567
} else {
568
other.drop(s_right.name()).unwrap()
569
};
570
571
let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
572
if sorted {
573
left.set_sorted_flag(IsSorted::Ascending);
574
}
575
let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
576
577
let already_left_sorted = sorted
578
&& matches!(
579
args.maintain_order,
580
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
581
);
582
try_raise_keyboard_interrupt();
583
let (df_left, df_right) =
584
if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
585
let mut df = unsafe {
586
DataFrame::new_unchecked_infer_height(vec![
587
left.into_series().into(),
588
right.into_series().into(),
589
])
590
};
591
592
let columns = match args.maintain_order {
593
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
594
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
595
_ => unreachable!(),
596
};
597
598
let options = SortMultipleOptions::new()
599
.with_order_descending(false)
600
.with_maintain_order(true);
601
602
df.sort_in_place(columns, options)?;
603
604
let [mut a, b]: [Column; 2] = df.into_columns().try_into().unwrap();
605
if matches!(
606
args.maintain_order,
607
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
608
) {
609
a.set_sorted_flag(IsSorted::Ascending);
610
}
611
612
POOL.join(
613
// SAFETY: join indices are known to be in bounds
614
|| unsafe { left_df.take_unchecked(a.idx().unwrap()) },
615
|| unsafe { other.take_unchecked(b.idx().unwrap()) },
616
)
617
} else {
618
POOL.join(
619
// SAFETY: join indices are known to be in bounds
620
|| unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
621
|| unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
622
)
623
};
624
625
_finish_join(df_left, df_right, args.suffix)
626
}
627
}
628
629
// `DataFrame` uses the default method bodies of `DataFrameJoinOps` unchanged.
impl DataFrameJoinOps for DataFrame {}
630
// `DataFrame` uses the default method bodies of `DataFrameJoinOpsPrivate` unchanged.
impl DataFrameJoinOpsPrivate for DataFrame {}
631
632
fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
633
let keys = s
634
.iter()
635
.map(|s| {
636
let phys = s.to_physical_repr();
637
match phys.dtype() {
638
#[cfg(feature = "dtype-f16")]
639
DataType::Float16 => phys.f16().unwrap().to_canonical().into_column(),
640
DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
641
DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
642
_ => phys.into_owned().into_column(),
643
}
644
})
645
.collect::<Vec<_>>();
646
647
if nulls_equal {
648
encode_rows_vertical_par_unordered(&keys)
649
} else {
650
encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
651
}
652
}
653
pub fn private_left_join_multiple_keys(
654
a: &DataFrame,
655
b: &DataFrame,
656
nulls_equal: bool,
657
) -> PolarsResult<LeftJoinIds> {
658
// @scalar-opt
659
let a_cols = a
660
.columns()
661
.iter()
662
.map(|c| c.as_materialized_series().clone())
663
.collect::<Vec<_>>();
664
let b_cols = b
665
.columns()
666
.iter()
667
.map(|c| c.as_materialized_series().clone())
668
.collect::<Vec<_>>();
669
670
let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
671
let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
672
sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
673
}
674
675