// Path: crates/polars-expr/src/reduce/min_max.rs
#![allow(unsafe_op_in_unsafe_fn)]1use std::borrow::Cow;2use std::marker::PhantomData;34use arrow::array::BooleanArray;5use arrow::bitmap::Bitmap;6use num_traits::Bounded;7use polars_core::with_match_physical_integer_polars_type;8#[cfg(feature = "propagate_nans")]9use polars_ops::prelude::nan_propagating_aggregate::ca_nan_agg;10use polars_utils::float::IsFloat;11use polars_utils::min_max::MinMax;1213use super::*;1415pub fn new_min_reduction(16dtype: DataType,17propagate_nans: bool,18) -> PolarsResult<Box<dyn GroupedReduction>> {19// TODO: Move the error checks up and make this function infallible20use DataType::*;21use VecMaskGroupedReduction as VMGR;22Ok(match &dtype {23Boolean => Box::new(BoolMinGroupedReduction::default()),24#[cfg(all(feature = "dtype-f16", feature = "propagate_nans"))]25Float16 if propagate_nans => {26Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float16Type>>::new()))27},28#[cfg(feature = "propagate_nans")]29Float32 if propagate_nans => {30Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float32Type>>::new()))31},32#[cfg(feature = "propagate_nans")]33Float64 if propagate_nans => {34Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float64Type>>::new()))35},36#[cfg(feature = "dtype-f16")]37Float16 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float16Type>>::new())),38Float32 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float32Type>>::new())),39Float64 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float64Type>>::new())),40Null => Box::new(NullGroupedReduction::default()),41String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMinReducer)),42_ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {43with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {44Box::new(VMGR::new(dtype, NumReducer::<Min<$T>>::new()))45})46},47#[cfg(feature = "dtype-decimal")]48Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Min<Int128Type>>::new())),49#[cfg(feature = "dtype-categorical")]50Categorical(cats, map) => 
with_match_categorical_physical_type!(cats.physical(), |$C| {51Box::new(VMGR::new(dtype.clone(), CatMinReducer::<$C>(map.clone(), PhantomData)))52}),53_ => polars_bail!(InvalidOperation: "`min` operation not supported for dtype `{dtype}`"),54})55}5657pub fn new_max_reduction(58dtype: DataType,59propagate_nans: bool,60) -> PolarsResult<Box<dyn GroupedReduction>> {61// TODO: Move the error checks up and make this function infallible62use DataType::*;63use VecMaskGroupedReduction as VMGR;64Ok(match &dtype {65Boolean => Box::new(BoolMaxGroupedReduction::default()),66#[cfg(all(feature = "dtype-f16", feature = "propagate_nans"))]67Float16 if propagate_nans => {68Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float16Type>>::new()))69},70#[cfg(feature = "propagate_nans")]71Float32 if propagate_nans => {72Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float32Type>>::new()))73},74#[cfg(feature = "propagate_nans")]75Float64 if propagate_nans => {76Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float64Type>>::new()))77},78#[cfg(feature = "dtype-f16")]79Float16 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float16Type>>::new())),80Float32 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float32Type>>::new())),81Float64 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float64Type>>::new())),82Null => Box::new(NullGroupedReduction::default()),83String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMaxReducer)),84_ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {85with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {86Box::new(VMGR::new(dtype, NumReducer::<Max<$T>>::new()))87})88},89#[cfg(feature = "dtype-decimal")]90Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Max<Int128Type>>::new())),91#[cfg(feature = "dtype-categorical")]92Categorical(cats, map) => with_match_categorical_physical_type!(cats.physical(), |$C| {93Box::new(VMGR::new(dtype.clone(), CatMaxReducer::<$C>(map.clone(), PhantomData)))94}),95_ => 
polars_bail!(InvalidOperation: "`max` operation not supported for dtype `{dtype}`"),96})97}9899// These two variants ignore nans.100struct Min<T>(PhantomData<T>);101struct Max<T>(PhantomData<T>);102103// These two variants propagate nans.104#[cfg(feature = "propagate_nans")]105struct NanMin<T>(PhantomData<T>);106#[cfg(feature = "propagate_nans")]107struct NanMax<T>(PhantomData<T>);108109impl<T> NumericReduction for Min<T>110where111T: PolarsNumericType,112ChunkedArray<T>: ChunkAgg<T::Native>,113{114type Dtype = T;115116#[inline(always)]117fn init() -> T::Native {118if T::Native::is_float() {119T::Native::nan_value()120} else {121T::Native::max_value()122}123}124125#[inline(always)]126fn combine(a: T::Native, b: T::Native) -> T::Native {127MinMax::min_ignore_nan(a, b)128}129130#[inline(always)]131fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {132ChunkAgg::min(ca)133}134}135136impl<T> NumericReduction for Max<T>137where138T: PolarsNumericType,139ChunkedArray<T>: ChunkAgg<T::Native>,140{141type Dtype = T;142143#[inline(always)]144fn init() -> T::Native {145if T::Native::is_float() {146T::Native::nan_value()147} else {148T::Native::min_value()149}150}151152#[inline(always)]153fn combine(a: T::Native, b: T::Native) -> T::Native {154MinMax::max_ignore_nan(a, b)155}156157#[inline(always)]158fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {159ChunkAgg::max(ca)160}161}162163#[cfg(feature = "propagate_nans")]164impl<T: PolarsFloatType> NumericReduction for NanMin<T> {165type Dtype = T;166167#[inline(always)]168fn init() -> T::Native {169T::Native::max_value()170}171172#[inline(always)]173fn combine(a: T::Native, b: T::Native) -> T::Native {174MinMax::min_propagate_nan(a, b)175}176177#[inline(always)]178fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {179ca_nan_agg(ca, MinMax::min_propagate_nan)180}181}182183#[cfg(feature = "propagate_nans")]184impl<T: PolarsFloatType> NumericReduction for NanMax<T> {185type Dtype = T;186187#[inline(always)]188fn 
init() -> T::Native {189T::Native::min_value()190}191192#[inline(always)]193fn combine(a: T::Native, b: T::Native) -> T::Native {194MinMax::max_propagate_nan(a, b)195}196197#[inline(always)]198fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {199ca_nan_agg(ca, MinMax::max_propagate_nan)200}201}202203#[derive(Clone)]204struct BinaryMinReducer;205#[derive(Clone)]206struct BinaryMaxReducer;207208impl Reducer for BinaryMinReducer {209type Dtype = BinaryType;210type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.211212fn init(&self) -> Self::Value {213None214}215216#[inline(always)]217fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {218Cow::Owned(s.cast(&DataType::Binary).unwrap())219}220221fn combine(&self, a: &mut Self::Value, b: &Self::Value) {222self.reduce_one(a, b.as_deref(), 0)223}224225fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {226match (a, b) {227(_, None) => {},228(l @ None, Some(r)) => *l = Some(r.to_owned()),229(Some(l), Some(r)) => {230if l.as_slice() > r {231l.clear();232l.extend_from_slice(r);233}234},235}236}237238fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {239self.reduce_one(v, ca.min_binary(), 0)240}241242fn finish(243&self,244v: Vec<Self::Value>,245m: Option<Bitmap>,246dtype: &DataType,247) -> PolarsResult<Series> {248assert!(m.is_none()); // This should only be used with VecGroupedReduction.249let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);250ca.into_series().cast(dtype)251}252}253254impl Reducer for BinaryMaxReducer {255type Dtype = BinaryType;256type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.257258#[inline(always)]259fn init(&self) -> Self::Value {260None261}262263#[inline(always)]264fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {265Cow::Owned(s.cast(&DataType::Binary).unwrap())266}267268#[inline(always)]269fn combine(&self, a: &mut Self::Value, b: &Self::Value) {270self.reduce_one(a, b.as_deref(), 
0)271}272273#[inline(always)]274fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {275match (a, b) {276(_, None) => {},277(l @ None, Some(r)) => *l = Some(r.to_owned()),278(Some(l), Some(r)) => {279if l.as_slice() < r {280l.clear();281l.extend_from_slice(r);282}283},284}285}286287#[inline(always)]288fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {289self.reduce_one(v, ca.max_binary(), 0)290}291292#[inline(always)]293fn finish(294&self,295v: Vec<Self::Value>,296m: Option<Bitmap>,297dtype: &DataType,298) -> PolarsResult<Series> {299assert!(m.is_none()); // This should only be used with VecGroupedReduction.300let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);301ca.into_series().cast(dtype)302}303}304305#[derive(Default)]306pub struct BoolMinGroupedReduction {307values: MutableBitmap,308mask: MutableBitmap,309evicted_values: BitmapBuilder,310evicted_mask: BitmapBuilder,311}312313impl GroupedReduction for BoolMinGroupedReduction {314fn new_empty(&self) -> Box<dyn GroupedReduction> {315Box::new(Self::default())316}317318fn reserve(&mut self, additional: usize) {319self.values.reserve(additional);320self.mask.reserve(additional)321}322323fn resize(&mut self, num_groups: IdxSize) {324self.values.resize(num_groups as usize, true);325self.mask.resize(num_groups as usize, false);326}327328fn update_group(329&mut self,330values: &[&Column],331group_idx: IdxSize,332_seq_id: u64,333) -> PolarsResult<()> {334let &[values] = values else { unreachable!() };335// TODO: we should really implement a sum-as-other-type operation instead336// of doing this materialized cast.337assert!(values.dtype() == &DataType::Boolean);338let values = values.as_materialized_series_maintain_scalar();339let ca: &BooleanChunked = values.as_ref().as_ref();340if !ca.all() {341self.values.set(group_idx as usize, false);342}343if ca.len() != ca.null_count() {344self.mask.set(group_idx as usize, true);345}346Ok(())347}348349unsafe fn 
update_groups_while_evicting(350&mut self,351values: &[&Column],352subset: &[IdxSize],353group_idxs: &[EvictIdx],354_seq_id: u64,355) -> PolarsResult<()> {356let &[values] = values else { unreachable!() };357assert!(values.dtype() == &DataType::Boolean);358assert!(subset.len() == group_idxs.len());359let values = values.as_materialized_series(); // @scalar-opt360let ca: &BooleanChunked = values.as_ref().as_ref();361let arr = ca.downcast_as_array();362unsafe {363// SAFETY: indices are in-bounds guaranteed by trait.364for (i, g) in subset.iter().zip(group_idxs) {365let ov = arr.get_unchecked(*i as usize);366if g.should_evict() {367self.evicted_values.push(self.values.get_unchecked(g.idx()));368self.evicted_mask.push(self.mask.get_unchecked(g.idx()));369self.values.set_unchecked(g.idx(), ov.unwrap_or(true));370self.mask.set_unchecked(g.idx(), ov.is_some());371} else {372self.values.and_pos_unchecked(g.idx(), ov.unwrap_or(true));373self.mask.or_pos_unchecked(g.idx(), ov.is_some());374}375}376}377Ok(())378}379380unsafe fn combine_subset(381&mut self,382other: &dyn GroupedReduction,383subset: &[IdxSize],384group_idxs: &[IdxSize],385) -> PolarsResult<()> {386let other = other.as_any().downcast_ref::<Self>().unwrap();387assert!(subset.len() == group_idxs.len());388unsafe {389// SAFETY: indices are in-bounds guaranteed by trait.390for (i, g) in subset.iter().zip(group_idxs) {391self.values392.and_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));393self.mask394.or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));395}396}397Ok(())398}399400fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {401Box::new(Self {402values: core::mem::take(&mut self.evicted_values).into_mut(),403mask: core::mem::take(&mut self.evicted_mask).into_mut(),404evicted_values: BitmapBuilder::new(),405evicted_mask: BitmapBuilder::new(),406})407}408409fn finalize(&mut self) -> PolarsResult<Series> {410let v = core::mem::take(&mut self.values);411let m = 
core::mem::take(&mut self.mask);412let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));413Ok(Series::from_array(PlSmallStr::EMPTY, arr))414}415416fn as_any(&self) -> &dyn Any {417self418}419}420421#[derive(Default)]422pub struct BoolMaxGroupedReduction {423values: MutableBitmap,424mask: MutableBitmap,425evicted_values: BitmapBuilder,426evicted_mask: BitmapBuilder,427}428429impl GroupedReduction for BoolMaxGroupedReduction {430fn new_empty(&self) -> Box<dyn GroupedReduction> {431Box::new(Self::default())432}433434fn reserve(&mut self, additional: usize) {435self.values.reserve(additional);436self.mask.reserve(additional)437}438439fn resize(&mut self, num_groups: IdxSize) {440self.values.resize(num_groups as usize, false);441self.mask.resize(num_groups as usize, false);442}443444fn update_group(445&mut self,446values: &[&Column],447group_idx: IdxSize,448_seq_id: u64,449) -> PolarsResult<()> {450let &[values] = values else { unreachable!() };451// TODO: we should really implement a sum-as-other-type operation instead452// of doing this materialized cast.453assert!(values.dtype() == &DataType::Boolean);454let values = values.as_materialized_series_maintain_scalar();455let ca: &BooleanChunked = values.as_ref().as_ref();456if ca.any() {457self.values.set(group_idx as usize, true);458}459if ca.len() != ca.null_count() {460self.mask.set(group_idx as usize, true);461}462Ok(())463}464465unsafe fn update_groups_while_evicting(466&mut self,467values: &[&Column],468subset: &[IdxSize],469group_idxs: &[EvictIdx],470_seq_id: u64,471) -> PolarsResult<()> {472let &[values] = values else { unreachable!() };473assert!(values.dtype() == &DataType::Boolean);474assert!(subset.len() == group_idxs.len());475let values = values.as_materialized_series(); // @scalar-opt476let ca: &BooleanChunked = values.as_ref().as_ref();477let arr = ca.downcast_as_array();478unsafe {479// SAFETY: indices are in-bounds guaranteed by trait.480for (i, g) in subset.iter().zip(group_idxs) 
{481let ov = arr.get_unchecked(*i as usize);482if g.should_evict() {483self.evicted_values.push(self.values.get_unchecked(g.idx()));484self.evicted_mask.push(self.mask.get_unchecked(g.idx()));485self.values.set_unchecked(g.idx(), ov.unwrap_or(false));486self.mask.set_unchecked(g.idx(), ov.is_some());487} else {488self.values.or_pos_unchecked(g.idx(), ov.unwrap_or(false));489self.mask.or_pos_unchecked(g.idx(), ov.is_some());490}491}492}493Ok(())494}495496unsafe fn combine_subset(497&mut self,498other: &dyn GroupedReduction,499subset: &[IdxSize],500group_idxs: &[IdxSize],501) -> PolarsResult<()> {502let other = other.as_any().downcast_ref::<Self>().unwrap();503assert!(subset.len() == group_idxs.len());504unsafe {505// SAFETY: indices are in-bounds guaranteed by trait.506for (i, g) in subset.iter().zip(group_idxs) {507self.values508.or_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));509self.mask510.or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));511}512}513Ok(())514}515516fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {517Box::new(Self {518values: core::mem::take(&mut self.evicted_values).into_mut(),519mask: core::mem::take(&mut self.evicted_mask).into_mut(),520evicted_values: BitmapBuilder::new(),521evicted_mask: BitmapBuilder::new(),522})523}524525fn finalize(&mut self) -> PolarsResult<Series> {526let v = core::mem::take(&mut self.values);527let m = core::mem::take(&mut self.mask);528let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));529Ok(Series::from_array(PlSmallStr::EMPTY, arr))530}531532fn as_any(&self) -> &dyn Any {533self534}535}536537#[cfg(feature = "dtype-categorical")]538struct CatMinReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);539540#[cfg(feature = "dtype-categorical")]541impl<T> Clone for CatMinReducer<T> {542fn clone(&self) -> Self {543Self(self.0.clone(), PhantomData)544}545}546547#[cfg(feature = "dtype-categorical")]548impl<T: PolarsCategoricalType> Reducer for 
CatMinReducer<T> {549type Dtype = T::PolarsPhysical;550type Value = T::Native;551552fn init(&self) -> Self::Value {553T::Native::max_value() // Ensures it's invalid, preferring the other value.554}555556#[inline(always)]557fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {558s.to_physical_repr()559}560561fn combine(&self, a: &mut Self::Value, b: &Self::Value) {562let Some(b_s) = self.0.cat_to_str(b.as_cat()) else {563return;564};565let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {566*a = *b;567return;568};569570if b_s < a_s {571*a = *b;572}573}574575fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {576if let Some(b) = b {577self.combine(a, &b);578}579}580581fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {582for cat in ca.iter().flatten() {583self.combine(v, &cat);584}585}586587fn finish(588&self,589v: Vec<Self::Value>,590m: Option<Bitmap>,591dtype: &DataType,592) -> PolarsResult<Series> {593let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);594let cat_ids = ChunkedArray::from(cat_ids);595unsafe {596Ok(597CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())598.into_series(),599)600}601}602}603604#[cfg(feature = "dtype-categorical")]605struct CatMaxReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);606607#[cfg(feature = "dtype-categorical")]608impl<T> Clone for CatMaxReducer<T> {609fn clone(&self) -> Self {610Self(self.0.clone(), PhantomData)611}612}613614#[cfg(feature = "dtype-categorical")]615impl<T: PolarsCategoricalType> Reducer for CatMaxReducer<T> {616type Dtype = T::PolarsPhysical;617type Value = T::Native;618619fn init(&self) -> Self::Value {620T::Native::max_value() // Ensures it's invalid, preferring the other value.621}622623#[inline(always)]624fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {625s.to_physical_repr()626}627628fn combine(&self, a: &mut Self::Value, b: &Self::Value) {629let Some(b_s) = 
self.0.cat_to_str(b.as_cat()) else {630return;631};632let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {633*a = *b;634return;635};636637if b_s > a_s {638*a = *b;639}640}641642fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {643if let Some(b) = b {644self.combine(a, &b);645}646}647648fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {649for cat in ca.iter().flatten() {650self.combine(v, &cat);651}652}653654fn finish(655&self,656v: Vec<Self::Value>,657m: Option<Bitmap>,658dtype: &DataType,659) -> PolarsResult<Series> {660let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);661let cat_ids = ChunkedArray::from(cat_ids);662unsafe {663Ok(664CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())665.into_series(),666)667}668}669}670671#[derive(Default)]672pub struct NullGroupedReduction {673length: usize,674num_evictions: usize,675}676677impl GroupedReduction for NullGroupedReduction {678fn new_empty(&self) -> Box<dyn GroupedReduction> {679Box::new(Self::default())680}681682fn reserve(&mut self, _additional: usize) {}683684fn resize(&mut self, num_groups: IdxSize) {685self.length = num_groups as usize;686}687688fn update_group(689&mut self,690values: &[&Column],691_group_idx: IdxSize,692_seq_id: u64,693) -> PolarsResult<()> {694let &[values] = values else { unreachable!() };695assert!(values.dtype() == &DataType::Null);696697// no-op698Ok(())699}700701unsafe fn update_groups_while_evicting(702&mut self,703values: &[&Column],704subset: &[IdxSize],705group_idxs: &[EvictIdx],706_seq_id: u64,707) -> PolarsResult<()> {708let &[values] = values else { unreachable!() };709assert!(values.dtype() == &DataType::Null);710assert!(subset.len() == group_idxs.len());711712for g in group_idxs {713self.num_evictions += g.should_evict() as usize;714}715Ok(())716}717718unsafe fn combine_subset(719&mut self,720_other: &dyn GroupedReduction,721subset: &[IdxSize],722group_idxs: 
&[IdxSize],723) -> PolarsResult<()> {724assert!(subset.len() == group_idxs.len());725726// no-op727Ok(())728}729730fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {731let out = Box::new(Self {732length: self.num_evictions,733num_evictions: 0,734});735self.num_evictions = 0;736out737}738739fn finalize(&mut self) -> PolarsResult<Series> {740Ok(Series::full_null(741PlSmallStr::EMPTY,742self.length,743&DataType::Null,744))745}746747fn as_any(&self) -> &dyn Any {748self749}750}751752753