Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs
7889 views
1
use super::*;
2
3
#[allow(clippy::too_many_arguments)]
4
pub(super) fn process_group_by(
5
proj_pd: &mut ProjectionPushDown,
6
input: Node,
7
keys: Vec<ExprIR>,
8
aggs: Vec<ExprIR>,
9
apply: Option<PlanCallback<DataFrame, DataFrame>>,
10
schema: SchemaRef,
11
maintain_order: bool,
12
options: Arc<GroupbyOptions>,
13
ctx: ProjectionContext,
14
lp_arena: &mut Arena<IR>,
15
expr_arena: &mut Arena<AExpr>,
16
) -> PolarsResult<IR> {
17
use IR::*;
18
19
// the custom function may need all columns so we do the projections here.
20
if let Some(f) = apply {
21
let lp = GroupBy {
22
input,
23
keys,
24
aggs,
25
schema,
26
apply: Some(f),
27
maintain_order,
28
options,
29
};
30
let input = lp_arena.add(lp);
31
32
let builder = IRBuilder::new(input, expr_arena, lp_arena);
33
Ok(proj_pd.finish_node_simple_projection(&ctx.acc_projections, builder))
34
} else {
35
let has_pushed_down = ctx.has_pushed_down();
36
37
// TODO! remove unnecessary vec alloc.
38
let (mut acc_projections, _local_projections, mut names) = split_acc_projections(
39
ctx.acc_projections,
40
lp_arena.get(input).schema(lp_arena).as_ref(),
41
expr_arena,
42
false,
43
);
44
45
// add the columns used in the aggregations to the projection only if they are used upstream
46
let projected_aggs = aggs
47
.into_iter()
48
.filter(|agg| {
49
if has_pushed_down && ctx.inner.projections_seen > 0 {
50
ctx.projected_names.contains(agg.output_name())
51
} else {
52
true
53
}
54
})
55
.collect::<Vec<_>>();
56
57
for agg in &projected_aggs {
58
add_expr_to_accumulated(agg.node(), &mut acc_projections, &mut names, expr_arena);
59
}
60
61
// make sure the keys are projected
62
for key in &*keys {
63
add_expr_to_accumulated(key.node(), &mut acc_projections, &mut names, expr_arena);
64
}
65
66
// make sure that the dynamic key is projected
67
#[cfg(feature = "dynamic_group_by")]
68
if let Some(options) = &options.dynamic {
69
let node = expr_arena.add(AExpr::Column(options.index_column.clone()));
70
add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);
71
}
72
// make sure that the rolling key is projected
73
#[cfg(feature = "dynamic_group_by")]
74
if let Some(options) = &options.rolling {
75
let node = expr_arena.add(AExpr::Column(options.index_column.clone()));
76
add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);
77
}
78
let ctx = ProjectionContext::new(acc_projections, names, ctx.inner);
79
80
proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
81
82
let builder = IRBuilder::new(input, expr_arena, lp_arena).group_by(
83
keys,
84
projected_aggs,
85
apply,
86
maintain_order,
87
options,
88
);
89
Ok(builder.build())
90
}
91
}
92
93