Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/hash_join/mod.rs
6940 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
pub(super) mod single_keys;
3
mod single_keys_dispatch;
4
mod single_keys_inner;
5
mod single_keys_left;
6
mod single_keys_outer;
7
#[cfg(feature = "semi_anti_join")]
8
mod single_keys_semi_anti;
9
pub(super) mod sort_merge;
10
use arrow::array::ArrayRef;
11
use polars_core::POOL;
12
use polars_core::utils::_set_partition_size;
13
use polars_utils::index::ChunkId;
14
pub(super) use single_keys::*;
15
pub use single_keys_dispatch::SeriesJoin;
16
#[cfg(feature = "asof_join")]
17
pub(super) use single_keys_dispatch::prepare_binary;
18
use single_keys_inner::*;
19
use single_keys_left::*;
20
use single_keys_outer::*;
21
#[cfg(feature = "semi_anti_join")]
22
use single_keys_semi_anti::*;
23
pub(crate) use sort_merge::*;
24
25
pub use super::*;
26
#[cfg(feature = "chunked_ids")]
27
use crate::chunked_array::gather::chunked::TakeChunkedHorPar;
28
29
pub fn default_join_ids() -> ChunkJoinOptIds {
30
#[cfg(feature = "chunked_ids")]
31
{
32
Either::Left(vec![])
33
}
34
#[cfg(not(feature = "chunked_ids"))]
35
{
36
vec![]
37
}
38
}
39
40
/// Orders two relations by length for a hash join.
///
/// Expands to the tuple `(longer, shorter, swapped)`:
/// - if `$self.len() > $other.len()` the inputs keep their order and
///   `swapped` is `false`;
/// - otherwise (including equal lengths) the inputs are exchanged and
///   `swapped` is `true`.
///
/// Both arguments only need a `len()` method and must have the same type,
/// since either one can end up in either tuple position.
macro_rules! det_hash_prone_order {
    ($self:expr, $other:expr) => {{
        // The shortest relation will be used to create a hash table.
        if $self.len() > $other.len() {
            ($self, $other, false)
        } else {
            ($other, $self, true)
        }
    }};
}
50
51
#[cfg(feature = "performant")]
52
use arrow::legacy::conversion::primitive_to_vec;
53
pub(super) use det_hash_prone_order;
54
55
pub trait JoinDispatch: IntoDf {
56
/// # Safety
57
/// Join tuples must be in bounds
58
#[cfg(feature = "chunked_ids")]
59
unsafe fn create_left_df_chunked(
60
&self,
61
chunk_ids: &[ChunkId],
62
left_join: bool,
63
was_sliced: bool,
64
) -> DataFrame {
65
let df_self = self.to_df();
66
67
let left_join_no_duplicate_matches =
68
left_join && !was_sliced && chunk_ids.len() == df_self.height();
69
70
if left_join_no_duplicate_matches {
71
df_self.clone()
72
} else {
73
// left join keys are in ascending order
74
let sorted = if left_join {
75
IsSorted::Ascending
76
} else {
77
IsSorted::Not
78
};
79
df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)
80
}
81
}
82
83
/// # Safety
84
/// Join tuples must be in bounds
85
unsafe fn _create_left_df_from_slice(
86
&self,
87
join_tuples: &[IdxSize],
88
left_join: bool,
89
was_sliced: bool,
90
sorted_tuple_idx: bool,
91
) -> DataFrame {
92
let df_self = self.to_df();
93
94
let left_join_no_duplicate_matches =
95
sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();
96
97
if left_join_no_duplicate_matches {
98
df_self.clone()
99
} else {
100
let sorted = if sorted_tuple_idx {
101
IsSorted::Ascending
102
} else {
103
IsSorted::Not
104
};
105
106
df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)
107
}
108
}
109
110
#[cfg(feature = "semi_anti_join")]
111
/// # Safety
112
/// `idx` must be in bounds
113
unsafe fn _finish_anti_semi_join(
114
&self,
115
mut idx: &[IdxSize],
116
slice: Option<(i64, usize)>,
117
) -> DataFrame {
118
let ca_self = self.to_df();
119
if let Some((offset, len)) = slice {
120
idx = slice_slice(idx, offset, len);
121
}
122
// idx from anti-semi join should always be sorted
123
ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)
124
}
125
126
#[cfg(feature = "semi_anti_join")]
127
fn _semi_anti_join_from_series(
128
&self,
129
s_left: &Series,
130
s_right: &Series,
131
slice: Option<(i64, usize)>,
132
anti: bool,
133
nulls_equal: bool,
134
) -> PolarsResult<DataFrame> {
135
let ca_self = self.to_df();
136
137
let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;
138
// SAFETY:
139
// indices are in bounds
140
Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })
141
}
142
fn _full_join_from_series(
143
&self,
144
other: &DataFrame,
145
s_left: &Series,
146
s_right: &Series,
147
args: JoinArgs,
148
) -> PolarsResult<DataFrame> {
149
let df_self = self.to_df();
150
151
// Get the indexes of the joined relations
152
let (mut join_idx_l, mut join_idx_r) =
153
s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;
154
155
try_raise_keyboard_interrupt();
156
if let Some((offset, len)) = args.slice {
157
let (offset, len) = slice_offsets(offset, len, join_idx_l.len());
158
join_idx_l.slice(offset, len);
159
join_idx_r.slice(offset, len);
160
}
161
let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);
162
let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);
163
164
let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {
165
let mut df = DataFrame::new(vec![
166
idx_ca_l.into_series().into(),
167
idx_ca_r.into_series().into(),
168
])?;
169
170
let options = SortMultipleOptions::new()
171
.with_order_descending(false)
172
.with_maintain_order(true)
173
.with_nulls_last(true);
174
175
let columns = match args.maintain_order {
176
MaintainOrderJoin::Left => vec!["a"],
177
MaintainOrderJoin::LeftRight => vec!["a", "b"],
178
MaintainOrderJoin::Right => vec!["b"],
179
MaintainOrderJoin::RightLeft => vec!["b", "a"],
180
_ => unreachable!(),
181
};
182
183
df.sort_in_place(columns, options)?;
184
185
let join_tuples_left = df.column("a").unwrap().idx().unwrap();
186
let join_tuples_right = df.column("b").unwrap().idx().unwrap();
187
POOL.join(
188
|| unsafe { df_self.take_unchecked(join_tuples_left) },
189
|| unsafe { other.take_unchecked(join_tuples_right) },
190
)
191
} else {
192
POOL.join(
193
|| unsafe { df_self.take_unchecked(&idx_ca_l) },
194
|| unsafe { other.take_unchecked(&idx_ca_r) },
195
)
196
};
197
198
let coalesce = args.coalesce.coalesce(&JoinType::Full);
199
let out = _finish_join(df_left, df_right, args.suffix.clone());
200
if coalesce {
201
Ok(_coalesce_full_join(
202
out?,
203
&[s_left.name().clone()],
204
&[s_right.name().clone()],
205
args.suffix,
206
df_self,
207
))
208
} else {
209
out
210
}
211
}
212
}
213
214
// `DataFrame` overrides nothing: the empty body means every `JoinDispatch`
// method it exposes is the trait's provided default.
impl JoinDispatch for DataFrame {}
215
216