Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/group_by.rs
7889 views
use super::*;12#[allow(clippy::too_many_arguments)]3pub(super) fn process_group_by(4proj_pd: &mut ProjectionPushDown,5input: Node,6keys: Vec<ExprIR>,7aggs: Vec<ExprIR>,8apply: Option<PlanCallback<DataFrame, DataFrame>>,9schema: SchemaRef,10maintain_order: bool,11options: Arc<GroupbyOptions>,12ctx: ProjectionContext,13lp_arena: &mut Arena<IR>,14expr_arena: &mut Arena<AExpr>,15) -> PolarsResult<IR> {16use IR::*;1718// the custom function may need all columns so we do the projections here.19if let Some(f) = apply {20let lp = GroupBy {21input,22keys,23aggs,24schema,25apply: Some(f),26maintain_order,27options,28};29let input = lp_arena.add(lp);3031let builder = IRBuilder::new(input, expr_arena, lp_arena);32Ok(proj_pd.finish_node_simple_projection(&ctx.acc_projections, builder))33} else {34let has_pushed_down = ctx.has_pushed_down();3536// TODO! remove unnecessary vec alloc.37let (mut acc_projections, _local_projections, mut names) = split_acc_projections(38ctx.acc_projections,39lp_arena.get(input).schema(lp_arena).as_ref(),40expr_arena,41false,42);4344// add the columns used in the aggregations to the projection only if they are used upstream45let projected_aggs = aggs46.into_iter()47.filter(|agg| {48if has_pushed_down && ctx.inner.projections_seen > 0 {49ctx.projected_names.contains(agg.output_name())50} else {51true52}53})54.collect::<Vec<_>>();5556for agg in &projected_aggs {57add_expr_to_accumulated(agg.node(), &mut acc_projections, &mut names, expr_arena);58}5960// make sure the keys are projected61for key in &*keys {62add_expr_to_accumulated(key.node(), &mut acc_projections, &mut names, expr_arena);63}6465// make sure that the dynamic key is projected66#[cfg(feature = "dynamic_group_by")]67if let Some(options) = &options.dynamic {68let node = expr_arena.add(AExpr::Column(options.index_column.clone()));69add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);70}71// make sure that the rolling key is projected72#[cfg(feature = "dynamic_group_by")]73if let Some(options) = &options.rolling {74let node = expr_arena.add(AExpr::Column(options.index_column.clone()));75add_expr_to_accumulated(node, &mut acc_projections, &mut names, expr_arena);76}77let ctx = ProjectionContext::new(acc_projections, names, ctx.inner);7879proj_pd.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;8081let builder = IRBuilder::new(input, expr_arena, lp_arena).group_by(82keys,83projected_aggs,84apply,85maintain_order,86options,87);88Ok(builder.build())89}90}919293