Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs
7889 views
1
use std::sync::Arc;
2
3
use polars_core::prelude::InitHashMaps;
4
use polars_error::PolarsResult;
5
use polars_ops::frame::{JoinCoalesce, JoinType};
6
use polars_utils::arena::Arena;
7
use polars_utils::format_pl_smallstr;
8
use polars_utils::pl_str::PlSmallStr;
9
10
use crate::plans::{
11
AExpr, ColumnNode, ExprIR, ExprOrigin, IR, IRBuilder, OutputName, PlHashSet, det_join_schema,
12
};
13
use crate::prelude::optimizer::projection_pushdown::ProjectionContext;
14
use crate::prelude::{ProjectionOptions, ProjectionPushDown};
15
use crate::utils::{aexpr_to_leaf_names_iter, column_node_to_name};
16
17
/// # Panics
18
/// Panics if `join_ir` is not `IR::Join`.
19
pub(super) fn process_join(
20
mut join_ir: IR,
21
proj_cx: ProjectionContext,
22
proj_pd: &mut ProjectionPushDown,
23
ir_arena: &mut Arena<IR>,
24
expr_arena: &mut Arena<AExpr>,
25
) -> PolarsResult<IR> {
26
let IR::Join {
27
input_left,
28
input_right,
29
schema: join_output_schema,
30
left_on,
31
right_on,
32
options,
33
} = &mut join_ir
34
else {
35
panic!()
36
};
37
38
let is_projected =
39
|name: &str| proj_cx.projected_names.contains(name) || !proj_cx.has_pushed_down();
40
41
let input_schema_left = ir_arena.get(*input_left).schema(ir_arena).into_owned();
42
let input_schema_right = ir_arena.get(*input_right).schema(ir_arena).into_owned();
43
44
let mut project_left = PlHashSet::with_capacity(input_schema_left.len());
45
let mut project_right = PlHashSet::with_capacity(input_schema_right.len());
46
47
let mut coalesced_to_right: PlHashSet<PlSmallStr> = Default::default();
48
if options.args.should_coalesce()
49
&& let JoinType::Right = &options.args.how
50
{
51
coalesced_to_right = left_on
52
.iter()
53
.map(|expr| {
54
let node = match expr_arena.get(expr.node()) {
55
AExpr::Cast {
56
expr,
57
dtype: _,
58
options: _,
59
} => *expr,
60
61
_ => expr.node(),
62
};
63
64
let AExpr::Column(name) = expr_arena.get(node) else {
65
// All keys should be columns when coalesce=True
66
unreachable!()
67
};
68
69
name.clone()
70
})
71
.collect()
72
}
73
74
// Add accumulated projections
75
for output_name in join_output_schema.iter_names() {
76
if !is_projected(output_name) {
77
continue;
78
}
79
80
match ExprOrigin::get_column_origin(
81
output_name,
82
&input_schema_left,
83
&input_schema_right,
84
options.args.suffix(),
85
Some(&|name| coalesced_to_right.contains(name)),
86
)? {
87
ExprOrigin::None => {},
88
ExprOrigin::Left => {
89
project_left.insert(output_name.clone());
90
},
91
ExprOrigin::Right => {
92
let name = if !input_schema_right.contains(output_name.as_str()) {
93
PlSmallStr::from_str(
94
output_name
95
.strip_suffix(options.args.suffix().as_str())
96
.unwrap(),
97
)
98
} else {
99
output_name.clone()
100
};
101
102
debug_assert!(input_schema_right.contains(name.as_str()));
103
104
project_right.insert(name);
105
},
106
ExprOrigin::Both => unreachable!(),
107
}
108
}
109
110
// Add projections required by the join itself
111
for expr_ir in left_on.as_slice() {
112
for name in aexpr_to_leaf_names_iter(expr_ir.node(), expr_arena).cloned() {
113
project_left.insert(name);
114
}
115
}
116
117
for expr_ir in right_on.as_slice() {
118
for name in aexpr_to_leaf_names_iter(expr_ir.node(), expr_arena).cloned() {
119
project_right.insert(name);
120
}
121
}
122
123
#[cfg(feature = "asof_join")]
124
if let JoinType::AsOf(asof_options) = &options.args.how {
125
if let Some(left_by) = asof_options.left_by.as_deref() {
126
for name in left_by {
127
project_left.insert(name.clone());
128
}
129
}
130
131
if let Some(right_by) = asof_options.right_by.as_deref() {
132
for name in right_by {
133
project_right.insert(name.clone());
134
}
135
}
136
}
137
138
// Turn on coalesce if non-coalesced keys are not included in projection. Reduces materialization.
139
if !options.args.should_coalesce()
140
&& matches!(options.args.how, JoinType::Inner | JoinType::Left)
141
&& left_on
142
.iter()
143
.all(|e| matches!(expr_arena.get(e.node()), AExpr::Column(_)))
144
&& right_on.iter().all(|e| {
145
let AExpr::Column(name) = expr_arena.get(e.node()) else {
146
return false;
147
};
148
149
let projected = if input_schema_left.contains(name.as_str()) {
150
let name = format_pl_smallstr!("{}{}", name, options.args.suffix());
151
is_projected(&name)
152
} else {
153
is_projected(name)
154
};
155
156
!projected
157
})
158
{
159
Arc::make_mut(options).args.coalesce = JoinCoalesce::CoalesceColumns;
160
}
161
162
// Pushdown left/right projections.
163
{
164
let input = *input_left;
165
let acc_projections = input_schema_left
166
.iter_names()
167
.filter(|x| project_left.contains(*x))
168
.map(|name| ColumnNode(expr_arena.add(AExpr::Column(name.clone()))))
169
.collect();
170
let projected_names = project_left;
171
172
proj_pd.pushdown_and_assign(
173
input,
174
ProjectionContext::new(acc_projections, projected_names, proj_cx.inner),
175
ir_arena,
176
expr_arena,
177
)?;
178
}
179
180
{
181
let input = *input_right;
182
let acc_projections = input_schema_right
183
.iter_names()
184
.filter(|x| project_right.contains(*x))
185
.map(|name| ColumnNode(expr_arena.add(AExpr::Column(name.clone()))))
186
.collect();
187
let projected_names = project_right;
188
189
proj_pd.pushdown_and_assign(
190
input,
191
ProjectionContext::new(acc_projections, projected_names, proj_cx.inner),
192
ir_arena,
193
expr_arena,
194
)?;
195
}
196
197
// Resolve new schemas after pushdown to left/right.
198
let input_schema_left = ir_arena.get(*input_left).schema(ir_arena).into_owned();
199
let input_schema_right = ir_arena.get(*input_right).schema(ir_arena).into_owned();
200
let new_join_output_schema = det_join_schema(
201
&input_schema_left,
202
&input_schema_right,
203
left_on,
204
right_on,
205
options,
206
expr_arena,
207
)
208
.unwrap();
209
210
let post_project: Option<Vec<ExprIR>> = if proj_cx.has_pushed_down() {
211
let mut needs_post_project = proj_cx.acc_projections.len() != new_join_output_schema.len();
212
213
// Build post-projection to re-order the columns and add suffixes if necessary.
214
let post_project: Vec<ExprIR> = proj_cx
215
.acc_projections
216
.iter()
217
.enumerate()
218
.map(|(i, col_node)| {
219
let original_projected_name = column_node_to_name(*col_node, expr_arena);
220
221
if new_join_output_schema.index_of(original_projected_name.as_str()) != Some(i) {
222
needs_post_project = true;
223
}
224
225
if !new_join_output_schema.contains(original_projected_name.as_str()) {
226
// This name is no longer suffixed in the new output schema, we restore it with an
227
// alias here.
228
let new_output_name = PlSmallStr::from_str(
229
original_projected_name
230
.strip_suffix(options.args.suffix().as_str())
231
.unwrap(),
232
);
233
234
debug_assert!(new_join_output_schema.contains(new_output_name.as_str()));
235
let original_projected_name = original_projected_name.clone();
236
237
ExprIR::new(
238
expr_arena.add(AExpr::Column(new_output_name)),
239
OutputName::Alias(original_projected_name),
240
)
241
} else {
242
ExprIR::from_node(col_node.0, expr_arena)
243
}
244
})
245
.collect();
246
247
needs_post_project.then_some(post_project)
248
} else {
249
None
250
};
251
252
*join_output_schema = new_join_output_schema;
253
254
let out: IR = if let Some(post_project) = post_project {
255
IRBuilder::from_lp(join_ir, expr_arena, ir_arena)
256
.project(
257
post_project,
258
ProjectionOptions {
259
run_parallel: false,
260
duplicate_check: false,
261
should_broadcast: false,
262
},
263
)
264
.build()
265
} else {
266
join_ir
267
};
268
269
Ok(out)
270
}
271
272