Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/group_by.rs
7889 views
1
use super::*;
2
3
#[allow(clippy::too_many_arguments)]
4
pub(super) fn process_group_by(
5
opt: &mut PredicatePushDown,
6
lp_arena: &mut Arena<IR>,
7
expr_arena: &mut Arena<AExpr>,
8
input: Node,
9
keys: Vec<ExprIR>,
10
aggs: Vec<ExprIR>,
11
schema: SchemaRef,
12
maintain_order: bool,
13
apply: Option<PlanCallback<DataFrame, DataFrame>>,
14
options: Arc<GroupbyOptions>,
15
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
16
) -> PolarsResult<IR> {
17
use IR::*;
18
19
#[cfg(feature = "dynamic_group_by")]
20
let no_push = { options.rolling.is_some() || options.dynamic.is_some() };
21
22
#[cfg(not(feature = "dynamic_group_by"))]
23
let no_push = false;
24
25
// Don't pushdown predicates on these cases.
26
if apply.is_some() || no_push || options.slice.is_some() {
27
let lp = GroupBy {
28
input,
29
keys,
30
aggs,
31
schema,
32
apply,
33
maintain_order,
34
options,
35
};
36
return opt.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
37
}
38
39
// If the predicate only resolves to the keys we can push it down, on the condition
40
// that the key values are not modified from their original values.
41
// When it filters the aggregations, the predicate should be done after aggregation.
42
let mut local_predicates = Vec::with_capacity(acc_predicates.len());
43
let eligible_keys = keys.iter().filter(
44
|&key| matches!(expr_arena.get(key.node()), AExpr::Column(c) if c == key.output_name()),
45
);
46
let key_schema = aexprs_to_schema(
47
eligible_keys,
48
lp_arena.get(input).schema(lp_arena).as_ref(),
49
expr_arena,
50
);
51
52
let mut new_acc_predicates = PlHashMap::with_capacity(acc_predicates.len());
53
54
for (pred_name, predicate) in acc_predicates {
55
// Counts change due to groupby's
56
// TODO! handle aliases, so that the predicate that is pushed down refers to the column before alias.
57
let mut push_down = !has_aexpr(predicate.node(), expr_arena, |ae| matches!(ae, AExpr::Len));
58
59
for name in aexpr_to_leaf_names_iter(predicate.node(), expr_arena) {
60
push_down &= key_schema.contains(name.as_ref());
61
62
if !push_down {
63
break;
64
}
65
}
66
if !push_down {
67
local_predicates.push(predicate)
68
} else {
69
new_acc_predicates.insert(pred_name.clone(), predicate.clone());
70
}
71
}
72
73
opt.pushdown_and_assign(input, new_acc_predicates, lp_arena, expr_arena)?;
74
75
let lp = GroupBy {
76
input,
77
keys,
78
aggs,
79
schema,
80
apply,
81
maintain_order,
82
options,
83
};
84
Ok(opt.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
85
}
86
87