Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/functions/mod.rs
7896 views
1
#[cfg(feature = "pivot")]
2
mod unpivot;
3
4
#[cfg(feature = "pivot")]
5
use unpivot::process_unpivot;
6
7
use super::*;
8
9
#[allow(clippy::too_many_arguments)]
10
pub(super) fn process_functions(
11
proj_pd: &mut ProjectionPushDown,
12
input: Node,
13
function: FunctionIR,
14
mut ctx: ProjectionContext,
15
lp_arena: &mut Arena<IR>,
16
expr_arena: &mut Arena<AExpr>,
17
) -> PolarsResult<IR> {
18
use FunctionIR::*;
19
match function {
20
Explode {
21
columns, options, ..
22
} => {
23
columns
24
.iter()
25
.for_each(|name| add_str_to_accumulated(name.clone(), &mut ctx, expr_arena));
26
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
27
Ok(IRBuilder::new(input, expr_arena, lp_arena)
28
.explode(columns, options)
29
.build())
30
},
31
#[cfg(feature = "pivot")]
32
Unpivot { ref args, .. } => {
33
process_unpivot(proj_pd, args, input, ctx, lp_arena, expr_arena)
34
},
35
Hint(hint) => {
36
let hint = hint.project(&ctx.projected_names);
37
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
38
Ok(match hint {
39
None => lp_arena.get(input).clone(),
40
Some(hint) => IRBuilder::new(input, expr_arena, lp_arena)
41
.hint(hint)
42
.build(),
43
})
44
},
45
_ => {
46
if function.allow_projection_pd() && ctx.has_pushed_down() {
47
let original_acc_projection_len = ctx.acc_projections.len();
48
49
// add columns needed for the function.
50
for name in function.additional_projection_pd_columns().as_ref() {
51
let node = expr_arena.add(AExpr::Column(name.clone()));
52
add_expr_to_accumulated(
53
node,
54
&mut ctx.acc_projections,
55
&mut ctx.projected_names,
56
expr_arena,
57
)
58
}
59
let expands_schema = matches!(function, FunctionIR::Unnest { .. });
60
61
let local_projections = proj_pd.pushdown_and_assign_check_schema(
62
input,
63
ctx,
64
lp_arena,
65
expr_arena,
66
expands_schema,
67
)?;
68
69
// Remove the cached schema
70
function.clear_cached_schema();
71
let lp = IR::MapFunction {
72
input,
73
function: function.clone(),
74
};
75
76
if local_projections.is_empty() {
77
Ok(lp)
78
} else {
79
// if we would project, we would remove pushed down predicates
80
if local_projections.len() < original_acc_projection_len {
81
Ok(IRBuilder::from_lp(lp, expr_arena, lp_arena)
82
.with_columns_simple(local_projections, Default::default())
83
.build())
84
// all projections are local
85
} else {
86
Ok(IRBuilder::from_lp(lp, expr_arena, lp_arena)
87
.project_simple_nodes(local_projections)
88
.unwrap()
89
.build())
90
}
91
}
92
} else {
93
let lp = IR::MapFunction {
94
input,
95
function: function.clone(),
96
};
97
// restart projection pushdown
98
proj_pd.no_pushdown_restart_opt(lp, ctx, lp_arena, expr_arena)
99
}
100
},
101
}
102
}
103
104