// Path: crates/polars-mem-engine/src/executors/group_by_partitioned.rs
use polars_core::series::IsSorted;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use rayon::prelude::*;

use super::*;

/// Executor for a partitioned group-by: takes an input [`Executor`], key
/// expressions and multiple aggregation expressions, runs the group-by on
/// thread-local partitions first and merges the partial results afterwards.
pub struct PartitionGroupByExec {
    // Produces the input DataFrame this group-by runs on.
    input: Box<dyn Executor>,
    // Physical (executable) key expressions.
    phys_keys: Vec<Arc<dyn PhysicalExpr>>,
    // Physical aggregation expressions; these must support the partitioned
    // aggregator protocol (see `as_partitioned_aggregator` below).
    phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
    maintain_order: bool,
    // Optional (offset, len) slice applied to the final grouped result.
    slice: Option<(i64, usize)>,
    input_schema: SchemaRef,
    output_schema: SchemaRef,
    // Whether the input comes from a partitioned data source; changes the
    // heuristic used in `can_run_partitioned`.
    from_partitioned_ds: bool,
    // Logical expressions kept for introspection/debugging only.
    #[allow(dead_code)]
    keys: Vec<Expr>,
    #[allow(dead_code)]
    aggs: Vec<Expr>,
}

impl PartitionGroupByExec {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        input: Box<dyn Executor>,
        phys_keys: Vec<Arc<dyn PhysicalExpr>>,
        phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
        maintain_order: bool,
        slice: Option<(i64, usize)>,
        input_schema: SchemaRef,
        output_schema: SchemaRef,
        from_partitioned_ds: bool,
        keys: Vec<Expr>,
        aggs: Vec<Expr>,
    ) -> Self {
        Self {
            input,
            phys_keys,
            phys_aggs,
            maintain_order,
            slice,
            input_schema,
            output_schema,
            from_partitioned_ds,
            keys,
            aggs,
        }
    }

    /// Evaluate the key expressions against `df`.
    fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Column>> {
        compute_keys(&self.phys_keys, df, state)
    }
}

/// Evaluate `keys` on `df` and expand literal keys to the frame's height so
/// all key columns have equal length.
fn compute_keys(
    keys: &[Arc<dyn PhysicalExpr>],
    df: &DataFrame,
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    let evaluated = keys
        .iter()
        .map(|s| s.evaluate(df, state))
        .collect::<PolarsResult<_>>()?;
    let df = check_expand_literals(df, keys, evaluated, false, Default::default())?;
    Ok(df.take_columns())
}

/// Run the first (partial) aggregation phase on `n_threads` row-chunks of `df`
/// in parallel. Returns, per partition, the partially aggregated DataFrame and
/// the key columns of that partition (used again in the merge phase).
fn run_partitions(
    df: &mut DataFrame,
    exec: &PartitionGroupByExec,
    state: &ExecutionState,
    n_threads: usize,
    maintain_order: bool,
) -> PolarsResult<(Vec<DataFrame>, Vec<Vec<Column>>)> {
    // We do a partitioned group_by.
    // Meaning that we first do the group_by operation arbitrarily
    // split on several threads. Then on the final result we apply the same
    // group_by again to merge the partial aggregates.
    let dfs = split_df(df, n_threads, true);

    let phys_aggs = &exec.phys_aggs;
    let keys = &exec.phys_keys;

    // Keys are computed once on the full frame and split the same way as the
    // data, so chunk i of `splitted_keys` lines up with chunk i of `dfs`.
    let mut keys = DataFrame::from_iter(compute_keys(keys, df, state)?);
    let splitted_keys = split_df(&mut keys, n_threads, true);

    POOL.install(|| {
        dfs.into_par_iter()
            .zip(splitted_keys)
            .map(|(df, keys)| {
                let gb = df.group_by_with_series(keys.into(), false, maintain_order)?;
                let groups = gb.get_groups();

                let mut columns = gb.keys();
                // don't naively call par_iter here, it will segfault in rayon
                // if you do, throw it on the POOL threadpool.
                let agg_columns = phys_aggs
                    .iter()
                    .map(|expr| {
                        let agg_expr = expr.as_partitioned_aggregator().unwrap();
                        let agg = agg_expr.evaluate_partitioned(&df, groups, state)?;
                        // A length-1 aggregate (e.g. a literal) is broadcast to
                        // the number of groups; any other length mismatch is an
                        // error.
                        Ok(if agg.len() != groups.len() {
                            polars_ensure!(agg.len() == 1, agg_len = agg.len(), groups.len());
                            match groups.len() {
                                0 => agg.clear(),
                                len => agg.new_from_index(0, len),
                            }
                        } else {
                            agg
                        }
                        .into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                columns.extend_from_slice(&agg_columns);

                let df = DataFrame::new(columns)?;
                Ok((df, gb.keys()))
            })
            .collect()
    })
}

/// Estimate the number of unique key combinations by grouping a sample of the
/// data, extrapolated with the estimator from the link below.
fn estimate_unique_count(keys: &[Column], mut sample_size: usize) -> PolarsResult<usize> {
    // https://stats.stackexchange.com/a/19090/147321
    // estimated unique size
    // u + ui / m (s - m)
    // s: set_size
    // m: sample_size
    // u: total unique groups counted in sample
    // ui: groups with single unique value counted in sample
    let set_size = keys[0].len();
    if set_size < sample_size {
        sample_size = set_size;
    }

    let finish = |groups: &GroupsType| {
        let u = groups.len() as f64;
        let ui = if groups.len() == sample_size {
            // every sampled row is unique -> every group has a single value
            u
        } else {
            groups.iter().filter(|g| g.len() == 1).count() as f64
        };

        (u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
    };

    if keys.len() == 1 {
        // we sample as that will work also with sorted data.
        // note that sampling without replacement is *very* expensive. don't do that.
        let s = keys[0].sample_n(sample_size, true, false, None).unwrap();
        // fast multi-threaded way to get unique.
        let groups = s.as_materialized_series().group_tuples(true, false)?;
        Ok(finish(&groups))
    } else {
        // Multiple keys: take a contiguous slice from the middle of the frame
        // instead of sampling rows. NOTE(review): a contiguous slice is a
        // biased sample for sorted/clustered data — presumably accepted as a
        // cheap trade-off here.
        let offset = (keys[0].len() / 2) as i64;
        let df = unsafe { DataFrame::new_no_checks_height_from_first(keys.to_vec()) };
        let df = df.slice(offset, sample_size);
        let names = df.get_column_names().into_iter().cloned();
        let gb = df.group_by(names).unwrap();
        Ok(finish(gb.get_groups()))
    }
}

// Lower this at debug builds so that we hit this in the test suite.
#[cfg(debug_assertions)]
const PARTITION_LIMIT: usize = 15;
#[cfg(not(debug_assertions))]
const PARTITION_LIMIT: usize = 1000;

// Checks if we should run normal or default aggregation
// by sampling data. Returns Ok(true) when the partitioned path should run.
// Overridable via the POLARS_NO_PARTITION / POLARS_FORCE_PARTITION env vars.
fn can_run_partitioned(
    keys: &[Column],
    original_df: &DataFrame,
    state: &ExecutionState,
    from_partitioned_ds: bool,
) -> PolarsResult<bool> {
    // Only the first key's sorted flag is inspected (`take(1)`); a sorted key
    // makes the default hash aggregation the better choice.
    if !keys
        .iter()
        .take(1)
        .all(|s| matches!(s.is_sorted_flag(), IsSorted::Not))
    {
        if state.verbose() {
            eprintln!("FOUND SORTED KEY: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_NO_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
        }
        Ok(true)
    } else if original_df.height() < PARTITION_LIMIT && !cfg!(test) {
        if state.verbose() {
            eprintln!("DATAFRAME < {PARTITION_LIMIT} rows: running default HASH AGGREGATION")
        }
        Ok(false)
    } else {
        // below this boundary we assume the partitioned group_by will be faster
        let unique_count_boundary = std::env::var("POLARS_PARTITION_UNIQUE_COUNT")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or(1000);

        let (unique_estimate, sampled_method) = match (keys.len(), keys[0].dtype()) {
            // For a single categorical/enum key the category count is an exact
            // upper bound on the number of groups — no sampling needed.
            #[cfg(feature = "dtype-categorical")]
            (1, DataType::Categorical(_, mapping) | DataType::Enum(_, mapping)) => {
                (mapping.num_cats_upper_bound(), "known")
            },
            _ => {
                // sqrt(N) is a good sample size as it remains low on large numbers
                // it is better than taking a fraction as it saturates
                let sample_size = (original_df.height() as f64).powf(0.5) as usize;

                // we never sample less than 100 data points.
                let sample_size = std::cmp::max(100, sample_size);
                (estimate_unique_count(keys, sample_size)?, "estimated")
            },
        };
        if state.verbose() {
            eprintln!("{sampled_method} unique values: {unique_estimate}");
        }

        if from_partitioned_ds {
            // Partitioned data sources use a relative cardinality threshold
            // instead of the absolute unique-count boundary.
            let estimated_cardinality = unique_estimate as f32 / original_df.height() as f32;
            if estimated_cardinality < 0.4 {
                if state.verbose() {
                    eprintln!("PARTITIONED DS");
                }
                Ok(true)
            } else {
                if state.verbose() {
                    eprintln!(
                        "PARTITIONED DS: estimated cardinality: {estimated_cardinality} exceeded the boundary: 0.4, running default HASH AGGREGATION"
                    );
                }
                Ok(false)
            }
        } else if unique_estimate > unique_count_boundary {
            if state.verbose() {
                eprintln!(
                    "estimated unique count: {unique_estimate} exceeded the boundary: {unique_count_boundary}, running default HASH AGGREGATION"
                )
            }
            Ok(false)
        } else {
            Ok(true)
        }
    }
}

impl PartitionGroupByExec {
    /// Decide between the default and partitioned group-by, run the partial
    /// phase if chosen, then merge the partitions and finalize the aggregates.
    fn execute_impl(
        &mut self,
        state: &mut ExecutionState,
        mut original_df: DataFrame,
    ) -> PolarsResult<DataFrame> {
        let (splitted_dfs, splitted_keys) = {
            // already get the keys. This is the very last minute decision which group_by method we choose.
            // If the column is a categorical, we know the number of groups we have and can decide to continue
            // partitioned or go for the standard group_by. The partitioned is likely to be faster on a small number
            // of groups.
            let keys = self.keys(&original_df, state)?;

            if !can_run_partitioned(&keys, &original_df, state, self.from_partitioned_ds)? {
                // Fall back to the single-pass hash aggregation.
                return group_by_helper(
                    original_df,
                    keys,
                    &self.phys_aggs,
                    None,
                    state,
                    self.maintain_order,
                    self.slice,
                );
            }

            if state.verbose() {
                eprintln!("run PARTITIONED HASH AGGREGATION")
            }

            // Run the partitioned aggregations
            let n_threads = POOL.current_num_threads();

            run_partitions(
                &mut original_df,
                self,
                state,
                n_threads,
                self.maintain_order,
            )?
        };

        // MERGE phase

        // Stack all partial results vertically and concatenate the per-partition
        // key columns pairwise (column i of each partition appended together).
        let df = accumulate_dataframes_vertical(splitted_dfs)?;
        let keys = splitted_keys
            .into_iter()
            .reduce(|mut acc, e| {
                acc.iter_mut().zip(e).for_each(|(acc, e)| {
                    let _ = acc.append(&e);
                });
                acc
            })
            .unwrap();

        // the partitioned group_by has added columns so we must update the schema.
        state.set_schema(self.output_schema.clone());

        // merge and hash aggregate again

        // first get mutable access and optionally sort
        let gb = df.group_by_with_series(keys, true, self.maintain_order)?;
        let mut groups = gb.get_groups();

        #[allow(unused_assignments)]
        // it is unused because we only use it to keep the lifetime of sliced_group valid
        let mut sliced_groups = None;

        if let Some((offset, len)) = self.slice {
            sliced_groups = Some(groups.slice(offset, len));
            groups = sliced_groups.as_ref().unwrap();
        }

        let get_columns = || gb.keys_sliced(self.slice);
        let get_agg = || {
            let out: PolarsResult<Vec<_>> = self
                .phys_aggs
                .par_iter()
                // we slice the keys off and finalize every aggregation
                .zip(&df.get_columns()[self.phys_keys.len()..])
                .map(|(expr, partitioned_s)| {
                    let agg_expr = expr.as_partitioned_aggregator().unwrap();
                    agg_expr.finalize(partitioned_s.clone(), groups, state)
                })
                .collect();

            out
        };
        // Compute keys and finalized aggregates concurrently on the pool.
        let (mut columns, agg_columns): (Vec<_>, _) = POOL.join(get_columns, get_agg);

        columns.extend(agg_columns?);
        state.clear_schema_cache();

        Ok(DataFrame::new(columns).unwrap())
    }
}

impl Executor for PartitionGroupByExec {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run PartitionGroupbyExec")
            }
        }
        let original_df = self.input.execute(state)?;

        // Build a profile label ("group_by_partitioned" + key names) only when
        // the node timer is active.
        let profile_name = if state.has_node_timer() {
            let by = self
                .phys_keys
                .iter()
                .map(|s| Ok(s.to_field(&self.input_schema)?.name))
                .collect::<PolarsResult<Vec<_>>>()?;
            let name = comma_delimited("group_by_partitioned".to_string(), &by);
            Cow::Owned(name)
        } else {
            Cow::Borrowed("")
        };
        if state.has_node_timer() {
            let new_state = state.clone();
            new_state.record(|| self.execute_impl(state, original_df), profile_name)
        } else {
            self.execute_impl(state, original_df)
        }
    }
}