Path: blob/main/crates/polars-mem-engine/src/executors/group_by.rs
6940 views
use rayon::prelude::*;12use super::*;34pub(super) fn evaluate_aggs(5df: &DataFrame,6aggs: &[Arc<dyn PhysicalExpr>],7groups: &GroupPositions,8state: &ExecutionState,9) -> PolarsResult<Vec<Column>> {10POOL.install(|| {11aggs.par_iter()12.map(|expr| {13let agg = expr.evaluate_on_groups(df, groups, state)?.finalize();14polars_ensure!(agg.len() == groups.len(), agg_len = agg.len(), groups.len());15Ok(agg)16})17.collect::<PolarsResult<Vec<_>>>()18})19}2021/// Take an input Executor and a multiple expressions22pub struct GroupByExec {23input: Box<dyn Executor>,24keys: Vec<Arc<dyn PhysicalExpr>>,25aggs: Vec<Arc<dyn PhysicalExpr>>,26apply: Option<PlanCallback<DataFrame, DataFrame>>,27maintain_order: bool,28input_schema: SchemaRef,29slice: Option<(i64, usize)>,30}3132impl GroupByExec {33#[allow(clippy::too_many_arguments)]34pub(crate) fn new(35input: Box<dyn Executor>,36keys: Vec<Arc<dyn PhysicalExpr>>,37aggs: Vec<Arc<dyn PhysicalExpr>>,38apply: Option<PlanCallback<DataFrame, DataFrame>>,39maintain_order: bool,40input_schema: SchemaRef,41slice: Option<(i64, usize)>,42) -> Self {43Self {44input,45keys,46aggs,47apply,48maintain_order,49input_schema,50slice,51}52}53}5455#[allow(clippy::too_many_arguments)]56pub(super) fn group_by_helper(57mut df: DataFrame,58keys: Vec<Column>,59aggs: &[Arc<dyn PhysicalExpr>],60apply: Option<PlanCallback<DataFrame, DataFrame>>,61state: &ExecutionState,62maintain_order: bool,63slice: Option<(i64, usize)>,64) -> PolarsResult<DataFrame> {65df.as_single_chunk_par();66let gb = df.group_by_with_series(keys, true, maintain_order)?;6768if let Some(f) = apply {69return gb.sliced(slice).apply(move |df| f.call(df));70}7172let mut groups = gb.get_groups();7374#[allow(unused_assignments)]75// it is unused because we only use it to keep the lifetime of sliced_group valid76let mut sliced_groups = None;7778if let Some((offset, len)) = slice {79sliced_groups = Some(groups.slice(offset, len));80groups = sliced_groups.as_ref().unwrap();81}8283let (mut columns, agg_columns) = POOL.install(|| {84let get_columns = || gb.keys_sliced(slice);8586let get_agg = || evaluate_aggs(&df, aggs, groups, state);8788rayon::join(get_columns, get_agg)89});9091columns.extend(agg_columns?);92DataFrame::new(columns)93}9495impl GroupByExec {96fn execute_impl(&mut self, state: &ExecutionState, df: DataFrame) -> PolarsResult<DataFrame> {97let keys = self98.keys99.iter()100.map(|e| e.evaluate(&df, state))101.collect::<PolarsResult<_>>()?;102group_by_helper(103df,104keys,105&self.aggs,106self.apply.take(),107state,108self.maintain_order,109self.slice,110)111}112}113114impl Executor for GroupByExec {115fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {116state.should_stop()?;117#[cfg(debug_assertions)]118{119if state.verbose() {120eprintln!("run GroupbyExec")121}122}123if state.verbose() {124eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")125}126let df = self.input.execute(state)?;127128let profile_name = if state.has_node_timer() {129let by = self130.keys131.iter()132.map(|s| Ok(s.to_field(&self.input_schema)?.name))133.collect::<PolarsResult<Vec<_>>>()?;134let name = comma_delimited("group_by".to_string(), &by);135Cow::Owned(name)136} else {137Cow::Borrowed("")138};139140if state.has_node_timer() {141let new_state = state.clone();142new_state.record(|| self.execute_impl(state, df), profile_name)143} else {144self.execute_impl(state, df)145}146}147}148149150