Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/hconcat.rs
7889 views
1
use super::*;
2
3
#[allow(clippy::too_many_arguments)]
4
pub(super) fn process_hconcat(
5
proj_pd: &mut ProjectionPushDown,
6
mut inputs: Vec<Node>,
7
schema: SchemaRef,
8
options: HConcatOptions,
9
ctx: ProjectionContext,
10
lp_arena: &mut Arena<IR>,
11
expr_arena: &mut Arena<AExpr>,
12
) -> PolarsResult<IR> {
13
// When applying projection pushdown to horizontal concatenation,
14
// we apply pushdown to all of the inputs using the subset of accumulated projections relevant to each input,
15
// then rebuild the concatenated schema.
16
17
let schema = if ctx.acc_projections.is_empty() {
18
schema
19
} else {
20
let mut remaining_projections: PlHashSet<_> = ctx.acc_projections.into_iter().collect();
21
22
let mut result = Ok(());
23
inputs.retain(|input| {
24
let mut input_pushdown = Vec::new();
25
let input_schema = lp_arena.get(*input).schema(lp_arena);
26
27
for proj in remaining_projections.iter() {
28
if check_input_column_node(*proj, input_schema.as_ref(), expr_arena) {
29
input_pushdown.push(*proj);
30
}
31
}
32
33
if input_pushdown.is_empty() {
34
// we can ignore this input since no columns are needed
35
if options.strict {
36
return false;
37
}
38
// we read a single column (needed to compute the correct height)
39
if let Some((name, _)) = input_schema.get_at_index(0) {
40
let node = expr_arena.add(AExpr::Column(name.clone()));
41
input_pushdown.push(ColumnNode(node));
42
}
43
}
44
45
let mut input_names = PlHashSet::new();
46
for proj in &input_pushdown {
47
remaining_projections.remove(proj);
48
for name in aexpr_to_leaf_names(proj.0, expr_arena) {
49
input_names.insert(name);
50
}
51
}
52
let ctx = ProjectionContext::new(input_pushdown, input_names, ctx.inner);
53
if let Err(e) = proj_pd.pushdown_and_assign(*input, ctx, lp_arena, expr_arena) {
54
result = Err(e);
55
}
56
true
57
});
58
result?;
59
60
let mut schemas = Vec::with_capacity(inputs.len());
61
for input in inputs.iter() {
62
let schema = lp_arena.get(*input).schema(lp_arena).into_owned();
63
schemas.push(schema);
64
}
65
let new_schema = merge_schemas(&schemas)?;
66
Arc::new(new_schema)
67
};
68
69
Ok(IR::HConcat {
70
inputs,
71
schema,
72
options,
73
})
74
}
75
76