Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/dispatch_left_right.rs
6940 views
1
use super::*;
2
use crate::prelude::*;
3
4
pub(super) fn left_join_from_series(
5
left: DataFrame,
6
right: &DataFrame,
7
s_left: &Series,
8
s_right: &Series,
9
args: JoinArgs,
10
verbose: bool,
11
drop_names: Option<Vec<PlSmallStr>>,
12
) -> PolarsResult<DataFrame> {
13
let (df_left, df_right) = materialize_left_join_from_series(
14
left, right, s_left, s_right, &args, verbose, drop_names,
15
)?;
16
_finish_join(df_left, df_right, args.suffix)
17
}
18
19
pub(super) fn right_join_from_series(
20
left: &DataFrame,
21
right: DataFrame,
22
s_left: &Series,
23
s_right: &Series,
24
mut args: JoinArgs,
25
verbose: bool,
26
drop_names: Option<Vec<PlSmallStr>>,
27
) -> PolarsResult<DataFrame> {
28
// Swap the order of tables to do a right join.
29
args.maintain_order = args.maintain_order.flip();
30
let (df_right, df_left) = materialize_left_join_from_series(
31
right, left, s_right, s_left, &args, verbose, drop_names,
32
)?;
33
_finish_join(df_left, df_right, args.suffix)
34
}
35
36
pub fn materialize_left_join_from_series(
37
mut left: DataFrame,
38
right_: &DataFrame,
39
s_left: &Series,
40
s_right: &Series,
41
args: &JoinArgs,
42
verbose: bool,
43
drop_names: Option<Vec<PlSmallStr>>,
44
) -> PolarsResult<(DataFrame, DataFrame)> {
45
let mut s_left = s_left.clone();
46
// Eagerly limit left if possible.
47
if let Some((offset, len)) = args.slice {
48
if offset == 0 {
49
left = left.slice(0, len);
50
s_left = s_left.slice(0, len);
51
}
52
}
53
54
// Ensure that the chunks are aligned otherwise we go OOB.
55
let mut right = Cow::Borrowed(right_);
56
let mut s_right = s_right.clone();
57
if left.should_rechunk() {
58
left.as_single_chunk_par();
59
s_left = s_left.rechunk();
60
}
61
if right.should_rechunk() {
62
let mut other = right_.clone();
63
other.as_single_chunk_par();
64
right = Cow::Owned(other);
65
s_right = s_right.rechunk();
66
}
67
68
// The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now.
69
let requires_ordering = matches!(
70
args.maintain_order,
71
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
72
);
73
if requires_ordering {
74
// When ordering we rechunk the series so we don't get ChunkIds as output
75
s_left = s_left.rechunk();
76
s_right = s_right.rechunk();
77
}
78
79
let (left_idx, right_idx) = sort_or_hash_left(
80
&s_left,
81
&s_right,
82
verbose,
83
args.validation,
84
args.nulls_equal,
85
)?;
86
87
let right = if let Some(drop_names) = drop_names {
88
right.drop_many(drop_names)
89
} else {
90
right.drop(s_right.name()).unwrap()
91
};
92
try_raise_keyboard_interrupt();
93
94
#[cfg(feature = "chunked_ids")]
95
match (left_idx, right_idx) {
96
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
97
if requires_ordering {
98
Ok(maintain_order_idx(
99
&left,
100
&right,
101
left_idx.as_slice(),
102
right_idx.as_slice(),
103
args,
104
))
105
} else {
106
Ok(POOL.join(
107
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
108
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
109
))
110
}
111
},
112
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
113
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
114
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
115
)),
116
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
117
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
118
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
119
)),
120
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
121
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
122
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
123
)),
124
}
125
126
#[cfg(not(feature = "chunked_ids"))]
127
if requires_ordering {
128
Ok(maintain_order_idx(
129
&left,
130
&right,
131
left_idx.as_slice(),
132
right_idx.as_slice(),
133
args,
134
))
135
} else {
136
Ok(POOL.join(
137
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
138
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
139
))
140
}
141
}
142
143
fn maintain_order_idx(
144
left: &DataFrame,
145
other: &DataFrame,
146
left_idx: &[IdxSize],
147
right_idx: &[NullableIdxSize],
148
args: &JoinArgs,
149
) -> (DataFrame, DataFrame) {
150
let mut df = {
151
// SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices
152
let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
153
let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
154
DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap()
155
};
156
157
let options = SortMultipleOptions::new()
158
.with_order_descending(false)
159
.with_maintain_order(true);
160
161
let columns = match args.maintain_order {
162
// If the left order is preserved then there are no unsorted right rows
163
// So Left and LeftRight are equal
164
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
165
MaintainOrderJoin::Right => vec!["b"],
166
MaintainOrderJoin::RightLeft => vec!["b", "a"],
167
_ => unreachable!(),
168
};
169
170
df.sort_in_place(columns, options).unwrap();
171
df.rechunk_mut();
172
173
let join_tuples_left = df
174
.column("a")
175
.unwrap()
176
.as_materialized_series()
177
.idx()
178
.unwrap()
179
.cont_slice()
180
.unwrap();
181
182
let join_tuples_right = df
183
.column("b")
184
.unwrap()
185
.as_materialized_series()
186
.idx()
187
.unwrap()
188
.cont_slice()
189
.unwrap();
190
191
POOL.join(
192
|| materialize_left_join_idx_left(left, join_tuples_left, args),
193
|| materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
194
)
195
}
196
197
fn materialize_left_join_idx_left(
198
left: &DataFrame,
199
left_idx: &[IdxSize],
200
args: &JoinArgs,
201
) -> DataFrame {
202
let left_idx = if let Some((offset, len)) = args.slice {
203
slice_slice(left_idx, offset, len)
204
} else {
205
left_idx
206
};
207
208
unsafe {
209
left._create_left_df_from_slice(
210
left_idx,
211
true,
212
args.slice.is_some(),
213
matches!(
214
args.maintain_order,
215
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
216
) || args.how == JoinType::Left
217
&& !matches!(
218
args.maintain_order,
219
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft,
220
),
221
)
222
}
223
}
224
225
fn materialize_left_join_idx_right(
226
right: &DataFrame,
227
right_idx: &[NullableIdxSize],
228
args: &JoinArgs,
229
) -> DataFrame {
230
let right_idx = if let Some((offset, len)) = args.slice {
231
slice_slice(right_idx, offset, len)
232
} else {
233
right_idx
234
};
235
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) }
236
}
237
/// Gathers the rows of `left` selected by the chunked ids in `left_idx`.
///
/// If `args.slice` is set, the id slice is narrowed with `slice_slice` first.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let was_sliced = args.slice.is_some();
    let left_idx = match args.slice {
        Some((offset, len)) => slice_slice(left_idx, offset, len),
        None => left_idx,
    };

    // SAFETY: NOTE(review): assumes every chunk id in `left_idx` refers to an existing
    // chunk/row of `left` — nothing is checked here; confirm the contract of
    // `create_left_df_chunked`.
    unsafe { left.create_left_df_chunked(left_idx, true, was_sliced) }
}
250
251
/// Gathers the rows of `right` selected by the chunked ids in `right_idx`.
///
/// If `args.slice` is set, the id slice is narrowed with `slice_slice` first.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
    right: &DataFrame,
    right_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let right_idx = match args.slice {
        Some((offset, len)) => slice_slice(right_idx, offset, len),
        None => right_idx,
    };

    // SAFETY: NOTE(review): assumes every chunk id in `right_idx` refers to an existing
    // chunk/row of `right` — the take is unchecked.
    unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) }
}
264
265