Path: blob/main/crates/polars-mem-engine/src/executors/group_by_streaming.rs
use std::borrow::Cow;
use std::sync::Arc;

use polars_core::frame::DataFrame;
#[cfg(feature = "dtype-categorical")]
use polars_core::prelude::DataType;
use polars_core::prelude::{Column, GroupsType};
use polars_core::schema::Schema;
use polars_core::series::IsSorted;
use polars_error::PolarsResult;
use polars_expr::prelude::PhysicalExpr;
use polars_expr::state::ExecutionState;
use polars_plan::plans::{AExpr, IR, IRPlan};
use polars_utils::arena::{Arena, Node};

use super::{Executor, check_expand_literals, group_by_helper};
use crate::StreamingExecutorBuilder;

pub struct GroupByStreamingExec {
    input_exec: Box<dyn Executor>,
    input_scan_node: Node,
    plan: IRPlan,
    builder: StreamingExecutorBuilder,

    phys_keys: Vec<Arc<dyn PhysicalExpr>>,
    phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
    maintain_order: bool,
    slice: Option<(i64, usize)>,
    from_partitioned_ds: bool,
}

impl GroupByStreamingExec {
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Box<dyn Executor>,
        builder: StreamingExecutorBuilder,
        root: Node,
        lp_arena: &mut Arena<IR>,
        expr_arena: &Arena<AExpr>,

        phys_keys: Vec<Arc<dyn PhysicalExpr>>,
        phys_aggs: Vec<Arc<dyn PhysicalExpr>>,
        maintain_order: bool,
        slice: Option<(i64, usize)>,
        from_partitioned_ds: bool,
    ) -> Self {
        // Create an empty DataFrame scan node; the input result is injected
        // into it at execution time.
        let scan = lp_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty()),
            schema: Arc::new(Schema::default()),
            output_schema: None,
        });

        let IR::GroupBy {
            input: gb_input, ..
        } = lp_arena.get_mut(root)
        else {
            unreachable!();
        };

        // Set the scan as the group by input.
        *gb_input = scan;

        // Prune the subplan into separate arenas.
        let mut new_ir_arena = Arena::new();
        let mut new_expr_arena = Arena::new();
        let [new_root, new_scan] = polars_plan::plans::prune::prune(
            &[root, scan],
            lp_arena,
            expr_arena,
            &mut new_ir_arena,
            &mut new_expr_arena,
        )
        .try_into()
        .unwrap();

        let plan = IRPlan {
            lp_top: new_root,
            lp_arena: new_ir_arena,
            expr_arena: new_expr_arena,
        };

        Self {
            input_exec: input,
            input_scan_node: new_scan,
            plan,
            builder,
            phys_keys,
            phys_aggs,
            maintain_order,
            slice,
            from_partitioned_ds,
        }
    }

    fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Column>> {
        compute_keys(&self.phys_keys, df, state)
    }
}

fn compute_keys(
    keys: &[Arc<dyn PhysicalExpr>],
    df: &DataFrame,
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    let evaluated = keys
        .iter()
        .map(|s| s.evaluate(df, state))
        .collect::<PolarsResult<_>>()?;
    let df = check_expand_literals(df, keys, evaluated, false, Default::default())?;
    Ok(df.take_columns())
}

fn estimate_unique_count(keys: &[Column], mut sample_size: usize) -> PolarsResult<usize> {
    // https://stats.stackexchange.com/a/19090/147321
    // Estimated unique count:
    //   u + (ui / m) * (s - m)
    // s:  set_size
    // m:  sample_size
    // u:  unique groups counted in the sample
    // ui: groups that occur exactly once in the sample
    let set_size = keys[0].len();
    if set_size < sample_size {
        sample_size = set_size;
    }

    let finish = |groups: &GroupsType| {
        let u = groups.len() as f64;
        let ui = if groups.len() == sample_size {
            u
        } else {
            groups.iter().filter(|g| g.len() == 1).count() as f64
        };

        (u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
    };

    if keys.len() == 1 {
        // We sample with replacement because that also works on sorted data.
        // Note that sampling without replacement is *very* expensive; don't do that.
        let s = keys[0].sample_n(sample_size, true, false, None).unwrap();
        // Fast multi-threaded way to get the unique groups.
        let groups = s.as_materialized_series().group_tuples(true, false)?;
        Ok(finish(&groups))
    } else {
        let offset = (keys[0].len() / 2) as i64;
        let df = unsafe { DataFrame::new_no_checks_height_from_first(keys.to_vec()) };
        let df = df.slice(offset, sample_size);
        let names = df.get_column_names().into_iter().cloned();
        let gb = df.group_by(names).unwrap();
        Ok(finish(gb.get_groups()))
    }
}
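// A minimal worked example of the estimator above, under hypothetical
// numbers: a set of 10_000 rows and a 100-row sample containing 80 distinct
// groups, 60 of which occur exactly once. The `estimate` helper is
// illustrative only; it simply mirrors the arithmetic of the `finish`
// closure in `estimate_unique_count`.
#[cfg(test)]
mod unique_count_estimate_sketch {
    /// estimate = u + (ui / m) * (s - m)
    fn estimate(set_size: usize, sample_size: usize, u: f64, ui: f64) -> usize {
        (u + (ui / sample_size as f64) * (set_size - sample_size) as f64) as usize
    }

    #[test]
    fn worked_example() {
        // 80 uniques and 60 singletons in a 100-row sample, extrapolated over
        // the remaining 9_900 unsampled rows: 80 + 0.6 * 9_900 = 6_020.
        assert_eq!(estimate(10_000, 100, 80.0, 60.0), 6_020);
    }
}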
// Lower this in debug builds so that we hit this path in the test suite.
#[cfg(debug_assertions)]
const PARTITION_LIMIT: usize = 15;
#[cfg(not(debug_assertions))]
const PARTITION_LIMIT: usize = 1000;

// Checks whether we should run the partitioned or the default aggregation,
// by sampling the data.
fn can_run_partitioned(
    keys: &[Column],
    original_df: &DataFrame,
    state: &ExecutionState,
    from_partitioned_ds: bool,
) -> PolarsResult<bool> {
    if !keys
        .iter()
        .take(1)
        .all(|s| matches!(s.is_sorted_flag(), IsSorted::Not))
    {
        if state.verbose() {
            eprintln!("FOUND SORTED KEY: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_NO_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
        }
        Ok(false)
    } else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
        if state.verbose() {
            eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
        }
        Ok(true)
    } else if original_df.height() < PARTITION_LIMIT && !cfg!(test) {
        if state.verbose() {
            eprintln!("DATAFRAME < {PARTITION_LIMIT} rows: running default HASH AGGREGATION")
        }
        Ok(false)
    } else {
        // Below this boundary we assume the partitioned group_by will be faster.
        let unique_count_boundary = std::env::var("POLARS_PARTITION_UNIQUE_COUNT")
            .map(|s| s.parse::<usize>().unwrap())
            .unwrap_or(1000);

        let (unique_estimate, sampled_method) = match (keys.len(), keys[0].dtype()) {
            #[cfg(feature = "dtype-categorical")]
            (1, DataType::Categorical(_, mapping) | DataType::Enum(_, mapping)) => {
                (mapping.num_cats_upper_bound(), "known")
            },
            _ => {
                // sqrt(N) is a good sample size: it stays small on large inputs;
                // it is better than taking a fixed fraction because it saturates.
                let sample_size = (original_df.height() as f64).powf(0.5) as usize;

                // We never sample fewer than 100 data points.
                let sample_size = std::cmp::max(100, sample_size);
                (estimate_unique_count(keys, sample_size)?, "estimated")
            },
        };
        if state.verbose() {
            eprintln!("{sampled_method} unique values: {unique_estimate}");
        }

        if from_partitioned_ds {
            let estimated_cardinality = unique_estimate as f32 / original_df.height() as f32;
            if estimated_cardinality < 0.4 {
                if state.verbose() {
                    eprintln!("PARTITIONED DS");
                }
                Ok(true)
            } else {
                if state.verbose() {
                    eprintln!(
                        "PARTITIONED DS: estimated cardinality: {estimated_cardinality} exceeded the boundary: 0.4, running default HASH AGGREGATION"
                    );
                }
                Ok(false)
            }
        } else if unique_estimate > unique_count_boundary {
            if state.verbose() {
                eprintln!(
                    "estimated unique count: {unique_estimate} exceeded the boundary: {unique_count_boundary}, running default HASH AGGREGATION"
                )
            }
            Ok(false)
        } else {
            Ok(true)
        }
    }
}

impl Executor for GroupByStreamingExec {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        let name = "streaming_group_by";
        state.should_stop()?;
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run {name}")
            }
        }
        let input_df = self.input_exec.execute(state)?;

        let profile_name = if state.has_node_timer() {
            Cow::Owned(format!(".{name}()"))
        } else {
            Cow::Borrowed("")
        };

        let keys = self.keys(&input_df, state)?;

        if !can_run_partitioned(&keys, &input_df, state, self.from_partitioned_ds)? {
            return group_by_helper(
                input_df,
                keys,
                &self.phys_aggs,
                None,
                state,
                self.maintain_order,
                self.slice,
            );
        }

        // Insert the input DataFrame into our DataFrame scan node.
        if let IR::DataFrameScan { df, schema, .. } =
            self.plan.lp_arena.get_mut(self.input_scan_node)
        {
            *schema = input_df.schema().clone();
            *df = Arc::new(input_df);
        } else {
            unreachable!();
        }

        let mut streaming_exec = (self.builder)(
            self.plan.lp_top,
            &mut self.plan.lp_arena,
            &mut self.plan.expr_arena,
        )?;

        state
            .clone()
            .record(|| streaming_exec.execute(state), profile_name)
    }
}