Path: blob/main/crates/polars-mem-engine/src/executors/group_by.rs
8421 views
use rayon::prelude::*;12use super::*;34pub(super) fn evaluate_aggs(5df: &DataFrame,6aggs: &[Arc<dyn PhysicalExpr>],7groups: &GroupPositions,8state: &ExecutionState,9) -> PolarsResult<Vec<Column>> {10POOL.install(|| {11aggs.par_iter()12.map(|expr| {13let agg = expr.evaluate_on_groups(df, groups, state)?.finalize();14polars_ensure!(agg.len() == groups.len(), agg_len = agg.len(), groups.len());15Ok(agg)16})17.collect::<PolarsResult<Vec<_>>>()18})19}2021/// Take an input Executor and a multiple expressions22pub struct GroupByExec {23input: Box<dyn Executor>,24keys: Vec<Arc<dyn PhysicalExpr>>,25aggs: Vec<Arc<dyn PhysicalExpr>>,26apply: Option<PlanCallback<DataFrame, DataFrame>>,27maintain_order: bool,28input_schema: SchemaRef,29output_schema: SchemaRef,30slice: Option<(i64, usize)>,31}3233impl GroupByExec {34#[allow(clippy::too_many_arguments)]35pub(crate) fn new(36input: Box<dyn Executor>,37keys: Vec<Arc<dyn PhysicalExpr>>,38aggs: Vec<Arc<dyn PhysicalExpr>>,39apply: Option<PlanCallback<DataFrame, DataFrame>>,40maintain_order: bool,41input_schema: SchemaRef,42output_schema: SchemaRef,43slice: Option<(i64, usize)>,44) -> Self {45Self {46input,47keys,48aggs,49apply,50maintain_order,51input_schema,52output_schema,53slice,54}55}56}5758#[allow(clippy::too_many_arguments)]59pub(super) fn group_by_helper(60mut df: DataFrame,61keys: Vec<Column>,62aggs: &[Arc<dyn PhysicalExpr>],63apply: Option<PlanCallback<DataFrame, DataFrame>>,64state: &ExecutionState,65maintain_order: bool,66output_schema: &SchemaRef,67slice: Option<(i64, usize)>,68) -> PolarsResult<DataFrame> {69df.rechunk_mut_par();70let gb = df.group_by_with_series(keys, true, maintain_order)?;7172if let Some(f) = apply {73return gb.apply_sliced(slice, move |df| f.call(df), Some(output_schema));74}7576let mut groups = gb.get_groups();7778#[allow(unused_assignments)]79// it is unused because we only use it to keep the lifetime of sliced_group valid80let mut sliced_groups = None;8182if let Some((offset, len)) = slice {83sliced_groups = Some(groups.slice(offset, len));84groups = sliced_groups.as_ref().unwrap();85}8687let (mut columns, agg_columns) = POOL.install(|| {88let get_columns = || gb.keys_sliced(slice);8990let get_agg = || evaluate_aggs(&df, aggs, groups, state);9192rayon::join(get_columns, get_agg)93});9495columns.extend(agg_columns?);96DataFrame::new_infer_height(columns)97}9899impl GroupByExec {100fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {101let keys = self102.keys103.iter()104.map(|e| e.evaluate(&df, state))105.collect::<PolarsResult<_>>()?;106group_by_helper(107df,108keys,109&self.aggs,110self.apply.take(),111state,112self.maintain_order,113&self.output_schema,114self.slice,115)116}117}118119impl Executor for GroupByExec {120fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {121state.should_stop()?;122#[cfg(debug_assertions)]123{124if state.verbose() {125eprintln!("run GroupbyExec")126}127}128if state.verbose() {129eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")130}131let df = self.input.execute(state)?;132133let profile_name = if state.has_node_timer() {134let by = self135.keys136.iter()137.map(|s| Ok(s.to_field(&self.input_schema)?.name))138.collect::<PolarsResult<Vec<_>>>()?;139let name = comma_delimited("group_by".to_string(), &by);140Cow::Owned(name)141} else {142Cow::Borrowed("")143};144145if state.has_node_timer() {146let new_state = state.clone();147new_state.record(|| self.execute_impl(state, df), profile_name)148} else {149self.execute_impl(state, df)150}151}152}153154155