Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/dispatch_left_right.rs
8424 views
1
use polars_core::utils::Container;
2
3
use super::*;
4
use crate::prelude::*;
5
6
pub(super) fn left_join_from_series(
7
left: DataFrame,
8
right: &DataFrame,
9
s_left: &Series,
10
s_right: &Series,
11
args: JoinArgs,
12
verbose: bool,
13
drop_names: Option<Vec<PlSmallStr>>,
14
) -> PolarsResult<DataFrame> {
15
let (df_left, df_right) = materialize_left_join_from_series(
16
left, right, s_left, s_right, &args, verbose, drop_names,
17
)?;
18
_finish_join(df_left, df_right, args.suffix)
19
}
20
21
pub(super) fn right_join_from_series(
22
left: &DataFrame,
23
right: DataFrame,
24
s_left: &Series,
25
s_right: &Series,
26
mut args: JoinArgs,
27
verbose: bool,
28
drop_names: Option<Vec<PlSmallStr>>,
29
) -> PolarsResult<DataFrame> {
30
// Swap the order of tables to do a right join.
31
args.maintain_order = args.maintain_order.flip();
32
let (df_right, df_left) = materialize_left_join_from_series(
33
right, left, s_right, s_left, &args, verbose, drop_names,
34
)?;
35
_finish_join(df_left, df_right, args.suffix)
36
}
37
38
pub fn materialize_left_join_from_series(
39
mut left: DataFrame,
40
right_: &DataFrame,
41
s_left: &Series,
42
s_right: &Series,
43
args: &JoinArgs,
44
verbose: bool,
45
drop_names: Option<Vec<PlSmallStr>>,
46
) -> PolarsResult<(DataFrame, DataFrame)> {
47
let mut s_left = s_left.clone();
48
// Eagerly limit left if possible.
49
if let Some((offset, len)) = args.slice {
50
if offset == 0 {
51
left = left.slice(0, len);
52
s_left = s_left.slice(0, len);
53
}
54
}
55
56
// Ensure that the chunks are aligned otherwise we go OOB.
57
let requires_ordering = matches!(
58
args.maintain_order,
59
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
60
);
61
62
let mut right = Cow::Borrowed(right_);
63
let mut s_right = s_right.clone();
64
if left.should_rechunk() || requires_ordering || left.n_chunks() != s_left.n_chunks() {
65
left.rechunk_mut_par();
66
s_left = s_left.rechunk();
67
}
68
if right.should_rechunk() || requires_ordering || right.n_chunks() != s_right.n_chunks() {
69
let mut other = right_.clone();
70
other.rechunk_mut_par();
71
right = Cow::Owned(other);
72
s_right = s_right.rechunk();
73
}
74
75
let (left_idx, right_idx) = sort_or_hash_left(
76
&s_left,
77
&s_right,
78
verbose,
79
args.validation,
80
args.nulls_equal,
81
)?;
82
83
let right = if let Some(drop_names) = drop_names {
84
right.drop_many(drop_names)
85
} else {
86
right.drop(s_right.name()).unwrap()
87
};
88
try_raise_keyboard_interrupt();
89
90
#[cfg(feature = "chunked_ids")]
91
match (left_idx, right_idx) {
92
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
93
if requires_ordering {
94
Ok(maintain_order_idx(
95
&left,
96
&right,
97
left_idx.as_slice(),
98
right_idx.as_slice(),
99
args,
100
))
101
} else {
102
Ok(POOL.join(
103
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
104
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
105
))
106
}
107
},
108
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
109
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
110
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
111
)),
112
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
113
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
114
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
115
)),
116
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
117
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
118
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
119
)),
120
}
121
122
#[cfg(not(feature = "chunked_ids"))]
123
if requires_ordering {
124
Ok(maintain_order_idx(
125
&left,
126
&right,
127
left_idx.as_slice(),
128
right_idx.as_slice(),
129
args,
130
))
131
} else {
132
Ok(POOL.join(
133
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
134
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
135
))
136
}
137
}
138
139
fn maintain_order_idx(
140
left: &DataFrame,
141
other: &DataFrame,
142
left_idx: &[IdxSize],
143
right_idx: &[NullableIdxSize],
144
args: &JoinArgs,
145
) -> (DataFrame, DataFrame) {
146
let mut df = {
147
// SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices
148
let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
149
let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
150
unsafe {
151
DataFrame::new_unchecked(
152
left_idx.len(),
153
vec![left.into_series().into(), right.into_series().into()],
154
)
155
}
156
};
157
158
let options = SortMultipleOptions::new()
159
.with_order_descending(false)
160
.with_maintain_order(true);
161
162
let columns = match args.maintain_order {
163
// If the left order is preserved then there are no unsorted right rows
164
// So Left and LeftRight are equal
165
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
166
MaintainOrderJoin::Right => vec!["b"],
167
MaintainOrderJoin::RightLeft => vec!["b", "a"],
168
_ => unreachable!(),
169
};
170
171
df.sort_in_place(columns, options).unwrap();
172
df.rechunk_mut();
173
174
let join_tuples_left = df
175
.column("a")
176
.unwrap()
177
.as_materialized_series()
178
.idx()
179
.unwrap()
180
.cont_slice()
181
.unwrap();
182
183
let join_tuples_right = df
184
.column("b")
185
.unwrap()
186
.as_materialized_series()
187
.idx()
188
.unwrap()
189
.cont_slice()
190
.unwrap();
191
192
POOL.join(
193
|| materialize_left_join_idx_left(left, join_tuples_left, args),
194
|| materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
195
)
196
}
197
198
fn materialize_left_join_idx_left(
199
left: &DataFrame,
200
left_idx: &[IdxSize],
201
args: &JoinArgs,
202
) -> DataFrame {
203
let left_idx = if let Some((offset, len)) = args.slice {
204
slice_slice(left_idx, offset, len)
205
} else {
206
left_idx
207
};
208
209
unsafe {
210
left._create_left_df_from_slice(
211
left_idx,
212
true,
213
args.slice.is_some(),
214
matches!(
215
args.maintain_order,
216
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
217
) || args.how == JoinType::Left
218
&& !matches!(
219
args.maintain_order,
220
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft,
221
),
222
)
223
}
224
}
225
226
fn materialize_left_join_idx_right(
227
right: &DataFrame,
228
right_idx: &[NullableIdxSize],
229
args: &JoinArgs,
230
) -> DataFrame {
231
let right_idx = if let Some((offset, len)) = args.slice {
232
slice_slice(right_idx, offset, len)
233
} else {
234
right_idx
235
};
236
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) }
237
}
238
/// Builds the left half of the join result from chunked row ids.
///
/// `left_idx` is cut to `args.slice` (when set) before being passed to
/// `create_left_df_chunked`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let left_idx = args
        .slice
        .map_or(left_idx, |(offset, len)| slice_slice(left_idx, offset, len));
    // SAFETY: NOTE(review): assumes every chunk id in `left_idx` is valid for `left` —
    // confirm against the producer of the join indices.
    unsafe { left.create_left_df_chunked(left_idx, true, args.slice.is_some()) }
}
251
252
/// Builds the right half of the join result from chunked row ids.
///
/// `right_idx` is cut to `args.slice` (when set) before being passed to
/// `_take_opt_chunked_unchecked_hor_par`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
    right: &DataFrame,
    right_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let right_idx = args
        .slice
        .map_or(right_idx, |(offset, len)| slice_slice(right_idx, offset, len));
    // SAFETY: NOTE(review): assumes every chunk id in `right_idx` is valid for `right` —
    // confirm against the producer of the join indices.
    unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) }
}
265
266