Path: blob/main/crates/polars-expr/src/expressions/aggregation.rs
use std::borrow::Cow;

use arrow::legacy::utils::CustomIterTools;
use polars_compute::rolling::QuantileMethod;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::{_split_offsets, NoNull};
use polars_ops::prelude::ArgAgg;
#[cfg(feature = "propagate_nans")]
use polars_ops::prelude::nan_propagating_aggregate;
use polars_utils::itertools::Itertools;
use rayon::prelude::*;

use super::*;
use crate::expressions::AggState::AggregatedScalar;
use crate::expressions::{AggState, AggregationContext, PhysicalExpr, UpdateGroups};
use crate::reduce::GroupedReduction;

#[derive(Debug, Clone, Copy)]
pub struct AggregationType {
    pub(crate) groupby: GroupByMethod,
    pub(crate) allow_threading: bool,
}

pub(crate) struct AggregationExpr {
    pub(crate) input: Arc<dyn PhysicalExpr>,
    pub(crate) agg_type: AggregationType,
    pub(crate) output_field: Field,
}

impl AggregationExpr {
    pub fn new(
        expr: Arc<dyn PhysicalExpr>,
        agg_type: AggregationType,
        output_field: Field,
    ) -> Self {
        Self {
            input: expr,
            agg_type,
            output_field,
        }
    }
}

impl PhysicalExpr for AggregationExpr {
    fn as_expression(&self) -> Option<&Expr> {
        None
    }

    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let s = self.input.evaluate(df, state)?;

        let AggregationType {
            groupby,
            allow_threading,
        } = self.agg_type;

        let is_float = s.dtype().is_float();
        let group_by = match groupby {
            GroupByMethod::NanMin if !is_float => GroupByMethod::Min,
            GroupByMethod::NanMax if !is_float => GroupByMethod::Max,
            gb => gb,
        };

        match group_by {
            GroupByMethod::Min => match s.is_sorted_flag() {
                IsSorted::Ascending | IsSorted::Descending => {
                    s.min_reduce().map(|sc| sc.into_column(s.name().clone()))
                },
                IsSorted::Not => parallel_op_columns(
                    |s| s.min_reduce().map(|sc| sc.into_column(s.name().clone())),
                    s,
                    allow_threading,
                ),
            },
            #[cfg(feature = "propagate_nans")]
            GroupByMethod::NanMin => parallel_op_columns(
                |s| {
                    Ok(polars_ops::prelude::nan_propagating_aggregate::nan_min_s(
                        s.as_materialized_series(),
                        s.name().clone(),
                    )
                    .into_column())
                },
                s,
                allow_threading,
            ),
            #[cfg(not(feature = "propagate_nans"))]
            GroupByMethod::NanMin => {
                panic!("activate 'propagate_nans' feature")
            },
            GroupByMethod::Max => match s.is_sorted_flag() {
                IsSorted::Ascending | IsSorted::Descending => {
                    s.max_reduce().map(|sc| sc.into_column(s.name().clone()))
                },
                IsSorted::Not => parallel_op_columns(
                    |s| s.max_reduce().map(|sc| sc.into_column(s.name().clone())),
                    s,
                    allow_threading,
                ),
            },
            #[cfg(feature = "propagate_nans")]
            GroupByMethod::NanMax => parallel_op_columns(
                |s| {
                    Ok(polars_ops::prelude::nan_propagating_aggregate::nan_max_s(
                        s.as_materialized_series(),
                        s.name().clone(),
                    )
                    .into_column())
                },
                s,
                allow_threading,
            ),
            #[cfg(not(feature = "propagate_nans"))]
            GroupByMethod::NanMax => {
                panic!("activate 'propagate_nans' feature")
            },
            GroupByMethod::Median => s.median_reduce().map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::Mean => s.mean_reduce().map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::First => Ok(if s.is_empty() {
                Column::full_null(s.name().clone(), 1, s.dtype())
            } else {
                s.head(Some(1))
            }),
            GroupByMethod::FirstNonNull => Ok(s
                .as_materialized_series_maintain_scalar()
                .first_non_null()
                .into_column(s.name().clone())),
            GroupByMethod::Last => Ok(if s.is_empty() {
                Column::full_null(s.name().clone(), 1, s.dtype())
            } else {
                s.tail(Some(1))
            }),
            GroupByMethod::LastNonNull => Ok(s
                .as_materialized_series_maintain_scalar()
                .last_non_null()
                .into_column(s.name().clone())),
            GroupByMethod::Item { allow_empty } => Ok(match s.len() {
                0 if allow_empty => Column::full_null(s.name().clone(), 1, s.dtype()),
                1 => s,
                n => polars_bail!(item_agg_count_not_one = n, allow_empty = allow_empty),
            }),
            GroupByMethod::Sum => parallel_op_columns(
                |s| s.sum_reduce().map(|sc| sc.into_column(s.name().clone())),
                s,
                allow_threading,
            ),
            GroupByMethod::Groups => unreachable!(),
            GroupByMethod::NUnique => s.n_unique().map(|count| {
                IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column()
            }),
            GroupByMethod::Count { include_nulls } => {
                let count = s.len() - s.null_count() * !include_nulls as usize;

                Ok(IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column())
            },
            GroupByMethod::Implode => s.implode().map(|ca| ca.into_column()),
            GroupByMethod::Std(ddof) => s
                .std_reduce(ddof)
                .map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::Var(ddof) => s
                .var_reduce(ddof)
                .map(|sc| sc.into_column(s.name().clone())),
            GroupByMethod::Quantile(_, _) => unimplemented!(),
            GroupByMethod::ArgMin => {
                let opt = s.as_materialized_series().arg_min();
                Ok(opt.map_or_else(
                    || Column::full_null(s.name().clone(), 1, &IDX_DTYPE),
                    |idx| {
                        Column::new_scalar(
                            s.name().clone(),
                            Scalar::new_idxsize(idx.try_into().unwrap()),
                            1,
                        )
                    },
                ))
            },
            GroupByMethod::ArgMax => {
                let opt = s.as_materialized_series().arg_max();
                Ok(opt.map_or_else(
                    || Column::full_null(s.name().clone(), 1, &IDX_DTYPE),
                    |idx| {
                        Column::new_scalar(
                            s.name().clone(),
                            Scalar::new_idxsize(idx.try_into().unwrap()),
                            1,
                        )
                    },
                ))
            },
        }
    }

    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut ac = self.input.evaluate_on_groups(df, groups, state)?;

        // don't change names by aggregations as is done in polars-core
        let keep_name = ac.get_values().name().clone();

        if let AggState::LiteralScalar(c) = &mut ac.state {
            *c = self.evaluate(df, state)?;
            return Ok(ac);
        }

        // AggregatedScalar has no defined group structure. We fix it up here, so that we can
        // reliably call `agg_*` functions with the groups.
        ac.set_groups_for_undefined_agg_states();

        // SAFETY:
        // groups must always be in bounds.
        let out = unsafe {
            match self.agg_type.groupby {
                GroupByMethod::Min => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_min(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Max => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_max(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::ArgMin => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_arg_min(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::ArgMax => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_arg_max(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Median => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_median(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Mean => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_mean(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Sum => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_sum(&groups);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Count { include_nulls } => {
                    if include_nulls || ac.get_values().null_count() == 0 {
                        // a few fast paths that prevent materializing new groups
                        match ac.update_groups {
                            UpdateGroups::WithSeriesLen => {
                                let list = ac
                                    .get_values()
                                    .list()
                                    .expect("impl error, should be a list at this point");

                                let mut s = match list.chunks().len() {
                                    1 => {
                                        let arr = list.downcast_iter().next().unwrap();
                                        let offsets = arr.offsets().as_slice();

                                        let mut previous = 0i64;
                                        let counts: NoNull<IdxCa> = offsets[1..]
                                            .iter()
                                            .map(|&o| {
                                                let len = (o - previous) as IdxSize;
                                                previous = o;
                                                len
                                            })
                                            .collect_trusted();
                                        counts.into_inner()
                                    },
                                    _ => {
                                        let counts: NoNull<IdxCa> = list
                                            .amortized_iter()
                                            .map(|s| {
                                                if let Some(s) = s {
                                                    s.as_ref().len() as IdxSize
                                                } else {
                                                    1
                                                }
                                            })
                                            .collect_trusted();
                                        counts.into_inner()
                                    },
                                };
                                s.rename(keep_name);
                                AggregatedScalar(s.into_column())
                            },
                            UpdateGroups::WithGroupsLen => {
                                // no need to update the groups
                                // we can just get the attribute, because we only need the length,
                                // not the correct order
                                let mut ca = ac.groups.group_count();
                                ca.rename(keep_name);
                                AggregatedScalar(ca.into_column())
                            },
                            // materialize groups
                            _ => {
                                let mut ca = ac.groups().group_count();
                                ca.rename(keep_name);
                                AggregatedScalar(ca.into_column())
                            },
                        }
                    } else {
                        // TODO: optimize this/and write somewhere else.
                        match ac.agg_state() {
                            AggState::LiteralScalar(_) => unreachable!(),
                            AggState::AggregatedScalar(c) => AggregatedScalar(
                                c.is_not_null().cast(&IDX_DTYPE).unwrap().into_column(),
                            ),
                            AggState::AggregatedList(s) => {
                                let ca = s.list()?;
                                let out: IdxCa = ca
                                    .into_iter()
                                    .map(|opt_s| {
                                        opt_s
                                            .map(|s| s.len() as IdxSize - s.null_count() as IdxSize)
                                    })
                                    .collect();
                                AggregatedScalar(out.into_column().with_name(keep_name))
                            },
                            AggState::NotAggregated(s) => {
                                let s = s.clone();
                                let groups = ac.groups();
                                let out: IdxCa = if matches!(s.dtype(), &DataType::Null) {
                                    IdxCa::full(s.name().clone(), 0, groups.len())
                                } else {
                                    match groups.as_ref().as_ref() {
                                        GroupsType::Idx(idx) => {
                                            let s = s.rechunk();
                                            // @scalar-opt
                                            // @partition-opt
                                            let array = &s.as_materialized_series().chunks()[0];
                                            let validity = array.validity().unwrap();
                                            idx.iter()
                                                .map(|(_, g)| {
                                                    let mut count = 0 as IdxSize;
                                                    // Count valid values
                                                    g.iter().for_each(|i| {
                                                        count += validity
                                                            .get_bit_unchecked(*i as usize)
                                                            as IdxSize;
                                                    });
                                                    count
                                                })
                                                .collect_ca_trusted_with_dtype(keep_name, IDX_DTYPE)
                                        },
                                        GroupsType::Slice { groups, .. } => {
                                            // Slice and use computed null count
                                            groups
                                                .iter()
                                                .map(|g| {
                                                    let start = g[0];
                                                    let len = g[1];
                                                    len - s
                                                        .slice(start as i64, len as usize)
                                                        .null_count()
                                                        as IdxSize
                                                })
                                                .collect_ca_trusted_with_dtype(keep_name, IDX_DTYPE)
                                        },
                                    }
                                };
                                AggregatedScalar(out.into_column())
                            },
                        }
                    }
                },
                GroupByMethod::First => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_first(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::FirstNonNull => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_first_non_null(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::Last => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_last(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::LastNonNull => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_last_non_null(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::Item { allow_empty } => {
                    let (s, groups) = ac.get_final_aggregation();
                    for gc in groups.group_count().iter() {
                        match gc {
                            Some(0) if allow_empty => continue,
                            None | Some(1) => continue,
                            Some(n) => {
                                polars_bail!(item_agg_count_not_one = n, allow_empty = allow_empty);
                            },
                        }
                    }
                    let agg_s = s.agg_first(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::NUnique => {
                    let (s, groups) = ac.get_final_aggregation();
                    let agg_s = s.agg_n_unique(&groups);
                    AggregatedScalar(agg_s.with_name(keep_name))
                },
                GroupByMethod::Implode => AggregatedScalar(match ac.agg_state() {
                    AggState::LiteralScalar(_) => unreachable!(), // handled above
                    AggState::AggregatedScalar(c) => c.as_list().into_column(),
                    AggState::NotAggregated(_) | AggState::AggregatedList(_) => ac.aggregated(),
                }),
                GroupByMethod::Groups => {
                    let mut column: ListChunked = ac.groups().as_list_chunked();
                    column.rename(keep_name);
                    AggregatedScalar(column.into_column())
                },
                GroupByMethod::Std(ddof) => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_std(&groups, ddof);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Var(ddof) => {
                    let (c, groups) = ac.get_final_aggregation();
                    let agg_c = c.agg_var(&groups, ddof);
                    AggregatedScalar(agg_c.with_name(keep_name))
                },
                GroupByMethod::Quantile(_, _) => {
                    // implemented explicitly in AggQuantile struct
                    unimplemented!()
                },
                GroupByMethod::NanMin => {
                    #[cfg(feature = "propagate_nans")]
                    {
                        let (c, groups) = ac.get_final_aggregation();
                        let agg_c = if c.dtype().is_float() {
                            nan_propagating_aggregate::group_agg_nan_min_s(
                                c.as_materialized_series(),
                                &groups,
                            )
                            .into_column()
                        } else {
                            c.agg_min(&groups)
                        };
                        AggregatedScalar(agg_c.with_name(keep_name))
                    }
                    #[cfg(not(feature = "propagate_nans"))]
                    {
                        panic!("activate 'propagate_nans' feature")
                    }
                },
                GroupByMethod::NanMax => {
                    #[cfg(feature = "propagate_nans")]
                    {
                        let (c, groups) = ac.get_final_aggregation();
                        let agg_c = if c.dtype().is_float() {
                            nan_propagating_aggregate::group_agg_nan_max_s(
                                c.as_materialized_series(),
                                &groups,
                            )
                            .into_column()
                        } else {
                            c.agg_max(&groups)
                        };
                        AggregatedScalar(agg_c.with_name(keep_name))
                    }
                    #[cfg(not(feature = "propagate_nans"))]
                    {
                        panic!("activate 'propagate_nans' feature")
                    }
                },
            }
        };

        Ok(AggregationContext::from_agg_state(
            out,
            Cow::Borrowed(groups),
        ))
    }

    fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
        Ok(self.output_field.clone())
    }

    fn is_scalar(&self) -> bool {
        true
    }
}

pub struct AggQuantileExpr {
    input: Arc<dyn PhysicalExpr>,
    quantile: Arc<dyn PhysicalExpr>,
    method: QuantileMethod,
}

impl AggQuantileExpr {
    pub fn new(
        input: Arc<dyn PhysicalExpr>,
        quantile: Arc<dyn PhysicalExpr>,
        method: QuantileMethod,
    ) -> Self {
        Self {
            input,
            quantile,
            method,
        }
    }
}

impl PhysicalExpr for AggQuantileExpr {
    fn as_expression(&self) -> Option<&Expr> {
        None
    }

    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let input = self.input.evaluate(df, state)?;

        let quantile = self.quantile.evaluate(df, state)?;

        polars_ensure!(quantile.len() <= 1, ComputeError:
            "polars does not support varying quantiles yet, \
            make sure the 'quantile' expression input produces a single quantile or a list of quantiles"
        );

        let s = quantile.as_materialized_series();

        match s.dtype() {
            DataType::List(_) => {
                let list = s.list()?;
                let inner_s = list.get_as_series(0).unwrap();
                if inner_s.has_nulls() {
                    polars_bail!(ComputeError: "quantile expression contains null values");
                }

                let v: Vec<f64> = inner_s
                    .cast(&DataType::Float64)?
                    .f64()?
                    .into_no_null_iter()
                    .collect();

                input
                    .quantiles_reduce(&v, self.method)
                    .map(|sc| sc.into_column(input.name().clone()))
            },
            _ => {
                let q: f64 = quantile.get(0).unwrap().try_extract()?;
                input
                    .quantile_reduce(q, self.method)
                    .map(|sc| sc.into_column(input.name().clone()))
            },
        }
    }

    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut ac = self.input.evaluate_on_groups(df, groups, state)?;

        // AggregatedScalar has no defined group structure. We fix it up here, so that we can
        // reliably call `agg_quantile` functions with the groups.
        ac.set_groups_for_undefined_agg_states();

        // don't change names by aggregations as is done in polars-core
        let keep_name = ac.get_values().name().clone();

        let quantile_column = self.quantile.evaluate(df, state)?;
        polars_ensure!(quantile_column.len() <= 1, ComputeError:
            "polars only supports computing a single quantile in a groupby aggregation context"
        );
        let quantile: f64 = quantile_column.get(0).unwrap().try_extract()?;

        if let AggState::LiteralScalar(c) = &mut ac.state {
            *c = c
                .quantile_reduce(quantile, self.method)?
                .into_column(keep_name);
            return Ok(ac);
        }

        // SAFETY:
        // groups are in bounds
        let mut agg = unsafe {
            ac.flat_naive()
                .into_owned()
                .agg_quantile(ac.groups(), quantile, self.method)
        };
        agg.rename(keep_name);
        Ok(AggregationContext::from_agg_state(
            AggregatedScalar(agg),
            Cow::Borrowed(groups),
        ))
    }

    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        // If the quantile expression is a literal that yields a list of floats,
        // the aggregation returns a list of quantiles (one list per row/group).
        // In that case, report `List(Float64)` as the output field.
        let input_field = self.input.to_field(input_schema)?;
        match self.quantile.to_field(input_schema) {
            Ok(qf) => match qf.dtype() {
                DataType::List(inner) => {
                    if inner.is_float() {
                        Ok(Field::new(
                            input_field.name().clone(),
                            DataType::List(Box::new(DataType::Float64)),
                        ))
                    } else {
                        // fallback to input field
                        Ok(input_field)
                    }
                },
                _ => Ok(input_field),
            },
            Err(_) => Ok(input_field),
        }
    }

    fn is_scalar(&self) -> bool {
        true
    }
}

pub struct AggMinMaxByExpr {
    input: Arc<dyn PhysicalExpr>,
    by: Arc<dyn PhysicalExpr>,
    is_max_by: bool,
}

impl AggMinMaxByExpr {
    pub fn new_min_by(input: Arc<dyn PhysicalExpr>, by: Arc<dyn PhysicalExpr>) -> Self {
        Self {
            input,
            by,
            is_max_by: false,
        }
    }

    pub fn new_max_by(input: Arc<dyn PhysicalExpr>, by: Arc<dyn PhysicalExpr>) -> Self {
        Self {
            input,
            by,
            is_max_by: true,
        }
    }
}

impl PhysicalExpr for AggMinMaxByExpr {
    fn as_expression(&self) -> Option<&Expr> {
        None
    }

    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let input = self.input.evaluate(df, state)?;
        let by = self.by.evaluate(df, state)?;
        let name = if self.is_max_by { "max_by" } else { "min_by" };
        polars_ensure!(
            input.len() == by.len(),
            ShapeMismatch: "'by' column in {} expression has incorrect length: expected {}, got {}",
            name, input.len(), by.len()
        );
        let arg_extremum = if self.is_max_by {
            by.as_materialized_series_maintain_scalar().arg_max()
        } else {
            by.as_materialized_series_maintain_scalar().arg_min()
        };
        let out = if let Some(idx) = arg_extremum {
            input.slice(idx as i64, 1)
        } else {
            let dtype = input.dtype().clone();
            Column::new_scalar(input.name().clone(), Scalar::null(dtype), 1)
        };
        Ok(out)
    }

    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let ac = self.input.evaluate_on_groups(df, groups, state)?;
        let ac_by = self.by.evaluate_on_groups(df, groups, state)?;
        assert!(ac.groups.len() == ac_by.groups.len());

        // Don't change names by aggregations as is done in polars-core
        let keep_name = ac.get_values().name().clone();

        let (input_col, input_groups) = ac.get_final_aggregation();
        let (by_col, by_groups) = ac_by.get_final_aggregation();
        GroupsType::check_lengths(&input_groups, &by_groups)?;

        // Dispatch to arg_min/arg_max and then gather
        // SAFETY: Groups are correct.
        let idxs_in_groups = if self.is_max_by {
            unsafe { by_col.agg_arg_max(&by_groups) }
        } else {
            unsafe { by_col.agg_arg_min(&by_groups) }
        };
        let idxs_in_groups: &IdxCa = idxs_in_groups.as_materialized_series().as_ref().as_ref();
        let flat_gather_idxs = match input_groups.as_ref().as_ref() {
            GroupsType::Idx(g) => idxs_in_groups
                .into_no_null_iter()
                .enumerate()
                .map(|(group_idx, idx_in_group)| g.all()[group_idx][idx_in_group as usize])
                .collect_vec(),
            GroupsType::Slice { groups, .. } => idxs_in_groups
                .into_no_null_iter()
                .enumerate()
                .map(|(group_idx, idx_in_group)| groups[group_idx][0] + idx_in_group)
                .collect_vec(),
        };

        // SAFETY: All indices are within input_col's groups.
        let gathered = unsafe { input_col.take_slice_unchecked(&flat_gather_idxs) };
        let agg_state = AggregatedScalar(gathered.with_name(keep_name));
        Ok(AggregationContext::from_agg_state(
            agg_state,
            Cow::Borrowed(groups),
        ))
    }

    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
        self.input.to_field(input_schema)
    }

    fn is_scalar(&self) -> bool {
        true
    }
}

pub(crate) struct AnonymousAggregationExpr {
    pub(crate) inputs: Vec<Arc<dyn PhysicalExpr>>,
    pub(crate) grouped_reduction: Box<dyn GroupedReduction>,
    pub(crate) output_field: Field,
}

impl AnonymousAggregationExpr {
    pub fn new(
        inputs: Vec<Arc<dyn PhysicalExpr>>,
        grouped_reduction: Box<dyn GroupedReduction>,
        output_field: Field,
    ) -> Self {
        Self {
            inputs,
            grouped_reduction,
            output_field,
        }
    }
}

impl PhysicalExpr for AnonymousAggregationExpr {
    fn as_expression(&self) -> Option<&Expr> {
        None
    }

    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        polars_ensure!(
            self.inputs.len() == 1,
            ComputeError: "AnonymousAggregationExpr with more than one input is not supported"
        );

        let col = self.inputs[0].evaluate(df, state)?;
        let mut gr = self.grouped_reduction.new_empty();
        gr.resize(1);
        gr.update_group(&[&col], 0, 0)?;
        let out_series = gr.finalize()?;
        Ok(Column::new(col.name().clone(), out_series))
    }

    #[allow(clippy::ptr_arg)]
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        polars_ensure!(
            self.inputs.len() == 1,
            ComputeError: "AnonymousAggregationExpr with more than one input is not supported"
        );

        let input = &self.inputs[0];
        let mut ac = input.evaluate_on_groups(df, groups, state)?;

        // don't change names by aggregations as is done in polars-core
        let input_column_name = ac.get_values().name().clone();

        if let AggState::LiteralScalar(input_column) = &mut ac.state {
            *input_column = self.evaluate(df, state)?;
            return Ok(ac);
        }

        let (input_column, resolved_groups) = ac.get_final_aggregation();

        let mut gr = self.grouped_reduction.new_empty();
        gr.resize(groups.len() as IdxSize);

        assert!(
            !resolved_groups.is_overlapping(),
            "Aggregating with overlapping groups is a logic error"
        );

        let subset = (0..input_column.len() as IdxSize).collect::<Vec<IdxSize>>();

        let mut group_idxs = Vec::with_capacity(input_column.len());
        match &**resolved_groups {
            GroupsType::Idx(group_indices) => {
                group_idxs.resize(input_column.len(), 0);
                for (group_idx, indices_in_group) in group_indices.all().iter().enumerate() {
                    for pos in indices_in_group.iter() {
                        group_idxs[*pos as usize] = group_idx as IdxSize;
                    }
                }
            },
            GroupsType::Slice { groups, .. } => {
                for (group_idx, [_start, len]) in groups.iter().enumerate() {
                    group_idxs.extend(std::iter::repeat_n(group_idx as IdxSize, *len as usize));
                }
            },
        };
        assert_eq!(group_idxs.len(), input_column.len());

        // `update_groups_subset` needs a single chunk.
        let input_column_rechunked = input_column.rechunk();

        // Single call so no need to resolve ordering.
        let seq_id = 0;

        // SAFETY:
        // - `subset` is in-bounds because it is 0..N
        // - `group_idxs` is in-bounds because we checked that it matches `input_column.len()` *and*
        //   is filled with values <= `input_column.len()` since they are derived from it via
        //   `enumerate`.
        unsafe {
            gr.update_groups_subset(&[&input_column_rechunked], &subset, &group_idxs, seq_id)?;
        }

        let out_series = gr.finalize()?;
        let out = AggregatedScalar(Column::new(input_column_name, out_series));

        Ok(AggregationContext::from_agg_state(out, resolved_groups))
    }

    fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
        Ok(self.output_field.clone())
    }

    fn is_scalar(&self) -> bool {
        true
    }
}

/// Simple wrapper to parallelize functions whose work can be divided over threads,
/// aggregated per chunk, and finally aggregated in the main thread. This can be done
/// for sum, min, max, etc.
fn parallel_op_columns<F>(f: F, s: Column, allow_threading: bool) -> PolarsResult<Column>
where
    F: Fn(Column) -> PolarsResult<Column> + Send + Sync,
{
    // Set the boundary low in debug builds so that we mimic production-size data behavior.
    #[cfg(debug_assertions)]
    let thread_boundary = 0;

    #[cfg(not(debug_assertions))]
    let thread_boundary = 100_000;

    // Threading overhead / splitting / work stealing is costly.
    if !allow_threading
        || s.len() < thread_boundary
        || POOL.current_thread_has_pending_tasks().unwrap_or(false)
    {
        return f(s);
    }
    let n_threads = POOL.current_num_threads();
    let splits = _split_offsets(s.len(), n_threads);

    let chunks = POOL.install(|| {
        splits
            .into_par_iter()
            .map(|(offset, len)| {
                let s = s.slice(offset as i64, len);
                f(s)
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let mut iter = chunks.into_iter();
    let first = iter.next().unwrap();
    let dtype = first.dtype();
    let out = iter.fold(first.to_physical_repr(), |mut acc, s| {
        acc.append(&s.to_physical_repr()).unwrap();
        acc
    });

    unsafe { f(out.from_physical_unchecked(dtype).unwrap()) }
}