// Source: GitHub repository pola-rs/polars, path: blob/main/crates/polars-ops/src/frame/join/mod.rs
1
mod args;
2
#[cfg(feature = "asof_join")]
3
mod asof;
4
mod cross_join;
5
mod dispatch_left_right;
6
mod general;
7
mod hash_join;
8
#[cfg(feature = "iejoin")]
9
mod iejoin;
10
#[cfg(feature = "merge_sorted")]
11
mod merge_sorted;
12
13
use std::borrow::Cow;
14
use std::fmt::{Debug, Display, Formatter};
15
use std::hash::Hash;
16
17
pub use args::*;
18
use arrow::trusted_len::TrustedLen;
19
#[cfg(feature = "asof_join")]
20
pub use asof::{AsOfOptions, AsofJoin, AsofJoinBy, AsofStrategy};
21
pub use cross_join::CrossJoin;
22
#[cfg(feature = "chunked_ids")]
23
use either::Either;
24
#[cfg(feature = "chunked_ids")]
25
use general::create_chunked_index_mapping;
26
pub use general::{_coalesce_full_join, _finish_join, _join_suffix_name};
27
pub use hash_join::*;
28
use hashbrown::hash_map::{Entry, RawEntryMut};
29
#[cfg(feature = "iejoin")]
30
pub use iejoin::{IEJoinOptions, InequalityOperator};
31
#[cfg(feature = "merge_sorted")]
32
pub use merge_sorted::_merge_sorted_dfs;
33
use polars_core::POOL;
34
#[allow(unused_imports)]
35
use polars_core::chunked_array::ops::row_encode::{
36
encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
37
};
38
use polars_core::hashing::_HASHMAP_INIT_SIZE;
39
use polars_core::prelude::*;
40
pub(super) use polars_core::series::IsSorted;
41
use polars_core::utils::slice_offsets;
42
#[allow(unused_imports)]
43
use polars_core::utils::slice_slice;
44
use polars_utils::hashing::BytesHash;
45
use rayon::prelude::*;
46
47
use self::cross_join::fused_cross_filter;
48
use super::IntoDf;
49
50
pub trait DataFrameJoinOps: IntoDf {
51
/// Generic join method. Can be used to join on multiple columns.
52
///
53
/// # Example
54
///
55
/// ```no_run
56
/// # use polars_core::prelude::*;
57
/// # use polars_ops::prelude::*;
58
/// let df1: DataFrame = df!("Fruit" => &["Apple", "Banana", "Pear"],
59
/// "Phosphorus (mg/100g)" => &[11, 22, 12])?;
60
/// let df2: DataFrame = df!("Name" => &["Apple", "Banana", "Pear"],
61
/// "Potassium (mg/100g)" => &[107, 358, 115])?;
62
///
63
/// let df3: DataFrame = df1.join(&df2, ["Fruit"], ["Name"], JoinArgs::new(JoinType::Inner),
64
/// None)?;
65
/// assert_eq!(df3.shape(), (3, 3));
66
/// println!("{}", df3);
67
/// # Ok::<(), PolarsError>(())
68
/// ```
69
///
70
/// Output:
71
///
72
/// ```text
73
/// shape: (3, 3)
74
/// +--------+----------------------+---------------------+
75
/// | Fruit | Phosphorus (mg/100g) | Potassium (mg/100g) |
76
/// | --- | --- | --- |
77
/// | str | i32 | i32 |
78
/// +========+======================+=====================+
79
/// | Apple | 11 | 107 |
80
/// +--------+----------------------+---------------------+
81
/// | Banana | 22 | 358 |
82
/// +--------+----------------------+---------------------+
83
/// | Pear | 12 | 115 |
84
/// +--------+----------------------+---------------------+
85
/// ```
86
fn join(
87
&self,
88
other: &DataFrame,
89
left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
90
right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
91
args: JoinArgs,
92
options: Option<JoinTypeOptions>,
93
) -> PolarsResult<DataFrame> {
94
let df_left = self.to_df();
95
let selected_left = df_left.select_columns(left_on)?;
96
let selected_right = other.select_columns(right_on)?;
97
98
let selected_left = selected_left
99
.into_iter()
100
.map(Column::take_materialized_series)
101
.collect::<Vec<_>>();
102
let selected_right = selected_right
103
.into_iter()
104
.map(Column::take_materialized_series)
105
.collect::<Vec<_>>();
106
107
self._join_impl(
108
other,
109
selected_left,
110
selected_right,
111
args,
112
options,
113
true,
114
false,
115
)
116
}
117
118
#[doc(hidden)]
119
#[allow(clippy::too_many_arguments)]
120
#[allow(unused_mut)]
121
fn _join_impl(
122
&self,
123
other: &DataFrame,
124
mut selected_left: Vec<Series>,
125
mut selected_right: Vec<Series>,
126
mut args: JoinArgs,
127
options: Option<JoinTypeOptions>,
128
_check_rechunk: bool,
129
_verbose: bool,
130
) -> PolarsResult<DataFrame> {
131
let left_df = self.to_df();
132
133
#[cfg(feature = "cross_join")]
134
if let JoinType::Cross = args.how {
135
if let Some(JoinTypeOptions::Cross(cross_options)) = &options {
136
assert!(args.slice.is_none());
137
return fused_cross_filter(left_df, other, args.suffix.clone(), cross_options);
138
}
139
return left_df.cross_join(other, args.suffix.clone(), args.slice);
140
}
141
142
// Clear literals if a frame is empty. Otherwise we could get an oob
143
fn clear(s: &mut [Series]) {
144
for s in s.iter_mut() {
145
if s.len() == 1 {
146
*s = s.clear()
147
}
148
}
149
}
150
if left_df.is_empty() {
151
clear(&mut selected_left);
152
}
153
if other.is_empty() {
154
clear(&mut selected_right);
155
}
156
157
let should_coalesce = args.should_coalesce();
158
assert_eq!(selected_left.len(), selected_right.len());
159
160
#[cfg(feature = "chunked_ids")]
161
{
162
// a left join create chunked-ids
163
// the others not yet.
164
// TODO! change this to other join types once they support chunked-id joins
165
if _check_rechunk
166
&& !(matches!(args.how, JoinType::Left)
167
|| std::env::var("POLARS_NO_CHUNKED_JOIN").is_ok())
168
{
169
let mut left = Cow::Borrowed(left_df);
170
let mut right = Cow::Borrowed(other);
171
if left_df.should_rechunk() {
172
if _verbose {
173
eprintln!(
174
"{:?} join triggered a rechunk of the left DataFrame: {} columns are affected",
175
args.how,
176
left_df.width()
177
);
178
}
179
180
let mut tmp_left = left_df.clone();
181
tmp_left.as_single_chunk_par();
182
left = Cow::Owned(tmp_left);
183
}
184
if other.should_rechunk() {
185
if _verbose {
186
eprintln!(
187
"{:?} join triggered a rechunk of the right DataFrame: {} columns are affected",
188
args.how,
189
other.width()
190
);
191
}
192
let mut tmp_right = other.clone();
193
tmp_right.as_single_chunk_par();
194
right = Cow::Owned(tmp_right);
195
}
196
return left._join_impl(
197
&right,
198
selected_left,
199
selected_right,
200
args,
201
options,
202
false,
203
_verbose,
204
);
205
}
206
}
207
208
if let Some((l, r)) = selected_left
209
.iter()
210
.zip(&selected_right)
211
.find(|(l, r)| l.dtype() != r.dtype())
212
{
213
polars_bail!(
214
ComputeError:
215
format!(
216
"datatypes of join keys don't match - `{}`: {} on left does not match `{}`: {} on right",
217
l.name(), l.dtype(), r.name(), r.dtype()
218
)
219
);
220
};
221
222
#[cfg(feature = "iejoin")]
223
if let JoinType::IEJoin = args.how {
224
let Some(JoinTypeOptions::IEJoin(options)) = options else {
225
unreachable!()
226
};
227
let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
228
{
229
iejoin::iejoin_par
230
} else {
231
iejoin::iejoin
232
};
233
return func(
234
left_df,
235
other,
236
selected_left,
237
selected_right,
238
&options,
239
args.suffix,
240
args.slice,
241
);
242
}
243
244
// Single keys.
245
if selected_left.len() == 1 {
246
let s_left = &selected_left[0];
247
let s_right = &selected_right[0];
248
let drop_names: Option<Vec<PlSmallStr>> =
249
if should_coalesce { None } else { Some(vec![]) };
250
return match args.how {
251
JoinType::Inner => left_df
252
._inner_join_from_series(other, s_left, s_right, args, _verbose, drop_names),
253
JoinType::Left => dispatch_left_right::left_join_from_series(
254
self.to_df().clone(),
255
other,
256
s_left,
257
s_right,
258
args,
259
_verbose,
260
drop_names,
261
),
262
JoinType::Right => dispatch_left_right::right_join_from_series(
263
self.to_df(),
264
other.clone(),
265
s_left,
266
s_right,
267
args,
268
_verbose,
269
drop_names,
270
),
271
JoinType::Full => left_df._full_join_from_series(other, s_left, s_right, args),
272
#[cfg(feature = "semi_anti_join")]
273
JoinType::Anti => left_df._semi_anti_join_from_series(
274
s_left,
275
s_right,
276
args.slice,
277
true,
278
args.nulls_equal,
279
),
280
#[cfg(feature = "semi_anti_join")]
281
JoinType::Semi => left_df._semi_anti_join_from_series(
282
s_left,
283
s_right,
284
args.slice,
285
false,
286
args.nulls_equal,
287
),
288
#[cfg(feature = "asof_join")]
289
JoinType::AsOf(options) => match (options.left_by, options.right_by) {
290
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
291
other,
292
s_left,
293
s_right,
294
left_by,
295
right_by,
296
options.strategy,
297
options.tolerance.map(|v| v.into_value()),
298
args.suffix.clone(),
299
args.slice,
300
should_coalesce,
301
options.allow_eq,
302
options.check_sortedness,
303
),
304
(None, None) => left_df._join_asof(
305
other,
306
s_left,
307
s_right,
308
options.strategy,
309
options.tolerance.map(|v| v.into_value()),
310
args.suffix,
311
args.slice,
312
should_coalesce,
313
options.allow_eq,
314
options.check_sortedness,
315
),
316
_ => {
317
panic!("expected by arguments on both sides")
318
},
319
},
320
#[cfg(feature = "iejoin")]
321
JoinType::IEJoin => {
322
unreachable!()
323
},
324
JoinType::Cross => {
325
unreachable!()
326
},
327
};
328
}
329
let (lhs_keys, rhs_keys) =
330
if (left_df.is_empty() || other.is_empty()) && matches!(&args.how, JoinType::Inner) {
331
// Fast path for empty inner joins.
332
// Return 2 dummies so that we don't row-encode.
333
let a = Series::full_null("".into(), 0, &DataType::Null);
334
(a.clone(), a)
335
} else {
336
// Row encode the keys.
337
(
338
prepare_keys_multiple(&selected_left, args.nulls_equal)?.into_series(),
339
prepare_keys_multiple(&selected_right, args.nulls_equal)?.into_series(),
340
)
341
};
342
343
let drop_names = if should_coalesce {
344
if args.how == JoinType::Right {
345
selected_left
346
.iter()
347
.map(|s| s.name().clone())
348
.collect::<Vec<_>>()
349
} else {
350
selected_right
351
.iter()
352
.map(|s| s.name().clone())
353
.collect::<Vec<_>>()
354
}
355
} else {
356
vec![]
357
};
358
359
// Multiple keys.
360
match args.how {
361
#[cfg(feature = "asof_join")]
362
JoinType::AsOf(_) => polars_bail!(
363
ComputeError: "asof join not supported for join on multiple keys"
364
),
365
#[cfg(feature = "iejoin")]
366
JoinType::IEJoin => {
367
unreachable!()
368
},
369
JoinType::Cross => {
370
unreachable!()
371
},
372
JoinType::Full => {
373
let names_left = selected_left
374
.iter()
375
.map(|s| s.name().clone())
376
.collect::<Vec<_>>();
377
args.coalesce = JoinCoalesce::KeepColumns;
378
let suffix = args.suffix.clone();
379
let out = left_df._full_join_from_series(other, &lhs_keys, &rhs_keys, args);
380
381
if should_coalesce {
382
Ok(_coalesce_full_join(
383
out?,
384
names_left.as_slice(),
385
drop_names.as_slice(),
386
suffix,
387
left_df,
388
))
389
} else {
390
out
391
}
392
},
393
JoinType::Inner => left_df._inner_join_from_series(
394
other,
395
&lhs_keys,
396
&rhs_keys,
397
args,
398
_verbose,
399
Some(drop_names),
400
),
401
JoinType::Left => dispatch_left_right::left_join_from_series(
402
left_df.clone(),
403
other,
404
&lhs_keys,
405
&rhs_keys,
406
args,
407
_verbose,
408
Some(drop_names),
409
),
410
JoinType::Right => dispatch_left_right::right_join_from_series(
411
left_df,
412
other.clone(),
413
&lhs_keys,
414
&rhs_keys,
415
args,
416
_verbose,
417
Some(drop_names),
418
),
419
#[cfg(feature = "semi_anti_join")]
420
JoinType::Anti | JoinType::Semi => self._join_impl(
421
other,
422
vec![lhs_keys],
423
vec![rhs_keys],
424
args,
425
options,
426
_check_rechunk,
427
_verbose,
428
),
429
}
430
}
431
432
/// Perform an inner join on two DataFrames.
433
///
434
/// # Example
435
///
436
/// ```
437
/// # use polars_core::prelude::*;
438
/// # use polars_ops::prelude::*;
439
/// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
440
/// left.inner_join(right, ["join_column_left"], ["join_column_right"])
441
/// }
442
/// ```
443
fn inner_join(
444
&self,
445
other: &DataFrame,
446
left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
447
right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
448
) -> PolarsResult<DataFrame> {
449
self.join(
450
other,
451
left_on,
452
right_on,
453
JoinArgs::new(JoinType::Inner),
454
None,
455
)
456
}
457
458
/// Perform a left outer join on two DataFrames
459
/// # Example
460
///
461
/// ```no_run
462
/// # use polars_core::prelude::*;
463
/// # use polars_ops::prelude::*;
464
/// let df1: DataFrame = df!("Wavelength (nm)" => &[480.0, 650.0, 577.0, 1201.0, 100.0])?;
465
/// let df2: DataFrame = df!("Color" => &["Blue", "Yellow", "Red"],
466
/// "Wavelength nm" => &[480.0, 577.0, 650.0])?;
467
///
468
/// let df3: DataFrame = df1.left_join(&df2, ["Wavelength (nm)"], ["Wavelength nm"])?;
469
/// println!("{:?}", df3);
470
/// # Ok::<(), PolarsError>(())
471
/// ```
472
///
473
/// Output:
474
///
475
/// ```text
476
/// shape: (5, 2)
477
/// +-----------------+--------+
478
/// | Wavelength (nm) | Color |
479
/// | --- | --- |
480
/// | f64 | str |
481
/// +=================+========+
482
/// | 480 | Blue |
483
/// +-----------------+--------+
484
/// | 650 | Red |
485
/// +-----------------+--------+
486
/// | 577 | Yellow |
487
/// +-----------------+--------+
488
/// | 1201 | null |
489
/// +-----------------+--------+
490
/// | 100 | null |
491
/// +-----------------+--------+
492
/// ```
493
fn left_join(
494
&self,
495
other: &DataFrame,
496
left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
497
right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
498
) -> PolarsResult<DataFrame> {
499
self.join(
500
other,
501
left_on,
502
right_on,
503
JoinArgs::new(JoinType::Left),
504
None,
505
)
506
}
507
508
/// Perform a full outer join on two DataFrames
509
/// # Example
510
///
511
/// ```
512
/// # use polars_core::prelude::*;
513
/// # use polars_ops::prelude::*;
514
/// fn join_dfs(left: &DataFrame, right: &DataFrame) -> PolarsResult<DataFrame> {
515
/// left.full_join(right, ["join_column_left"], ["join_column_right"])
516
/// }
517
/// ```
518
fn full_join(
519
&self,
520
other: &DataFrame,
521
left_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
522
right_on: impl IntoIterator<Item = impl Into<PlSmallStr>>,
523
) -> PolarsResult<DataFrame> {
524
self.join(
525
other,
526
left_on,
527
right_on,
528
JoinArgs::new(JoinType::Full),
529
None,
530
)
531
}
532
}
533
534
trait DataFrameJoinOpsPrivate: IntoDf {
535
fn _inner_join_from_series(
536
&self,
537
other: &DataFrame,
538
s_left: &Series,
539
s_right: &Series,
540
args: JoinArgs,
541
verbose: bool,
542
drop_names: Option<Vec<PlSmallStr>>,
543
) -> PolarsResult<DataFrame> {
544
let left_df = self.to_df();
545
let ((join_tuples_left, join_tuples_right), sorted) =
546
_sort_or_hash_inner(s_left, s_right, verbose, args.validation, args.nulls_equal)?;
547
548
let mut join_tuples_left = &*join_tuples_left;
549
let mut join_tuples_right = &*join_tuples_right;
550
551
if let Some((offset, len)) = args.slice {
552
join_tuples_left = slice_slice(join_tuples_left, offset, len);
553
join_tuples_right = slice_slice(join_tuples_right, offset, len);
554
}
555
556
let other = if let Some(drop_names) = drop_names {
557
other.drop_many(drop_names)
558
} else {
559
other.drop(s_right.name()).unwrap()
560
};
561
562
let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) };
563
if sorted {
564
left.set_sorted_flag(IsSorted::Ascending);
565
}
566
let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) };
567
568
let already_left_sorted = sorted
569
&& matches!(
570
args.maintain_order,
571
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
572
);
573
try_raise_keyboard_interrupt();
574
let (df_left, df_right) =
575
if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted {
576
let mut df =
577
DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?;
578
579
let columns = match args.maintain_order {
580
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
581
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"],
582
_ => unreachable!(),
583
};
584
585
let options = SortMultipleOptions::new()
586
.with_order_descending(false)
587
.with_maintain_order(true);
588
589
df.sort_in_place(columns, options)?;
590
591
let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap();
592
if matches!(
593
args.maintain_order,
594
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
595
) {
596
a.set_sorted_flag(IsSorted::Ascending);
597
}
598
599
POOL.join(
600
// SAFETY: join indices are known to be in bounds
601
|| unsafe { left_df.take_unchecked(a.idx().unwrap()) },
602
|| unsafe { other.take_unchecked(b.idx().unwrap()) },
603
)
604
} else {
605
POOL.join(
606
// SAFETY: join indices are known to be in bounds
607
|| unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) },
608
|| unsafe { other.take_unchecked(right.into_series().idx().unwrap()) },
609
)
610
};
611
612
_finish_join(df_left, df_right, args.suffix)
613
}
614
}
615
616
// `DataFrameJoinOps` provides default bodies for all of its methods, so the impl is empty.
impl DataFrameJoinOps for DataFrame {}
617
// `DataFrameJoinOpsPrivate` provides a default body for its method, so the impl is empty.
impl DataFrameJoinOpsPrivate for DataFrame {}
618
619
fn prepare_keys_multiple(s: &[Series], nulls_equal: bool) -> PolarsResult<BinaryOffsetChunked> {
620
let keys = s
621
.iter()
622
.map(|s| {
623
let phys = s.to_physical_repr();
624
match phys.dtype() {
625
DataType::Float32 => phys.f32().unwrap().to_canonical().into_column(),
626
DataType::Float64 => phys.f64().unwrap().to_canonical().into_column(),
627
_ => phys.into_owned().into_column(),
628
}
629
})
630
.collect::<Vec<_>>();
631
632
if nulls_equal {
633
encode_rows_vertical_par_unordered(&keys)
634
} else {
635
encode_rows_vertical_par_unordered_broadcast_nulls(&keys)
636
}
637
}
638
pub fn private_left_join_multiple_keys(
639
a: &DataFrame,
640
b: &DataFrame,
641
nulls_equal: bool,
642
) -> PolarsResult<LeftJoinIds> {
643
// @scalar-opt
644
let a_cols = a
645
.get_columns()
646
.iter()
647
.map(|c| c.as_materialized_series().clone())
648
.collect::<Vec<_>>();
649
let b_cols = b
650
.get_columns()
651
.iter()
652
.map(|c| c.as_materialized_series().clone())
653
.collect::<Vec<_>>();
654
655
let a = prepare_keys_multiple(&a_cols, nulls_equal)?.into_series();
656
let b = prepare_keys_multiple(&b_cols, nulls_equal)?.into_series();
657
sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, nulls_equal)
658
}
659
660