Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/group_by.rs
7889 views
use super::*;12#[allow(clippy::too_many_arguments)]3pub(super) fn process_group_by(4opt: &mut PredicatePushDown,5lp_arena: &mut Arena<IR>,6expr_arena: &mut Arena<AExpr>,7input: Node,8keys: Vec<ExprIR>,9aggs: Vec<ExprIR>,10schema: SchemaRef,11maintain_order: bool,12apply: Option<PlanCallback<DataFrame, DataFrame>>,13options: Arc<GroupbyOptions>,14acc_predicates: PlHashMap<PlSmallStr, ExprIR>,15) -> PolarsResult<IR> {16use IR::*;1718#[cfg(feature = "dynamic_group_by")]19let no_push = { options.rolling.is_some() || options.dynamic.is_some() };2021#[cfg(not(feature = "dynamic_group_by"))]22let no_push = false;2324// Don't pushdown predicates on these cases.25if apply.is_some() || no_push || options.slice.is_some() {26let lp = GroupBy {27input,28keys,29aggs,30schema,31apply,32maintain_order,33options,34};35return opt.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);36}3738// If the predicate only resolves to the keys we can push it down, on the condition39// that the key values are not modified from their original values.40// When it filters the aggregations, the predicate should be done after aggregation.41let mut local_predicates = Vec::with_capacity(acc_predicates.len());42let eligible_keys = keys.iter().filter(43|&key| matches!(expr_arena.get(key.node()), AExpr::Column(c) if c == key.output_name()),44);45let key_schema = aexprs_to_schema(46eligible_keys,47lp_arena.get(input).schema(lp_arena).as_ref(),48expr_arena,49);5051let mut new_acc_predicates = PlHashMap::with_capacity(acc_predicates.len());5253for (pred_name, predicate) in acc_predicates {54// Counts change due to groupby's55// TODO! handle aliases, so that the predicate that is pushed down refers to the column before alias.56let mut push_down = !has_aexpr(predicate.node(), expr_arena, |ae| matches!(ae, AExpr::Len));5758for name in aexpr_to_leaf_names_iter(predicate.node(), expr_arena) {59push_down &= key_schema.contains(name.as_ref());6061if !push_down {62break;63}64}65if !push_down {66local_predicates.push(predicate)67} else {68new_acc_predicates.insert(pred_name.clone(), predicate.clone());69}70}7172opt.pushdown_and_assign(input, new_acc_predicates, lp_arena, expr_arena)?;7374let lp = GroupBy {75input,76keys,77aggs,78schema,79apply,80maintain_order,81options,82};83Ok(opt.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))84}858687