// Path: crates/polars-expr/src/reduce/min_max.rs
#![allow(unsafe_op_in_unsafe_fn)]1use std::borrow::Cow;2use std::marker::PhantomData;34use arrow::array::BooleanArray;5use arrow::bitmap::Bitmap;6use num_traits::Bounded;7use polars_core::with_match_physical_integer_polars_type;8#[cfg(feature = "propagate_nans")]9use polars_ops::prelude::nan_propagating_aggregate::ca_nan_agg;10use polars_utils::float::IsFloat;11use polars_utils::min_max::MinMax;1213use super::*;1415pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box<dyn GroupedReduction> {16use DataType::*;17use VecMaskGroupedReduction as VMGR;18match &dtype {19Boolean => Box::new(BoolMinGroupedReduction::default()),20#[cfg(feature = "propagate_nans")]21Float32 if propagate_nans => {22Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float32Type>>::new()))23},24#[cfg(feature = "propagate_nans")]25Float64 if propagate_nans => {26Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float64Type>>::new()))27},28Float32 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float32Type>>::new())),29Float64 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float64Type>>::new())),30Null => Box::new(NullGroupedReduction::default()),31String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMinReducer)),32_ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {33with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {34Box::new(VMGR::new(dtype, NumReducer::<Min<$T>>::new()))35})36},37#[cfg(feature = "dtype-decimal")]38Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Min<Int128Type>>::new())),39#[cfg(feature = "dtype-categorical")]40Categorical(cats, map) => with_match_categorical_physical_type!(cats.physical(), |$C| {41Box::new(VMGR::new(dtype.clone(), CatMinReducer::<$C>(map.clone(), PhantomData)))42}),43_ => unimplemented!(),44}45}4647pub fn new_max_reduction(dtype: DataType, propagate_nans: bool) -> Box<dyn GroupedReduction> {48use DataType::*;49use VecMaskGroupedReduction as VMGR;50match &dtype {51Boolean => 
Box::new(BoolMaxGroupedReduction::default()),52#[cfg(feature = "propagate_nans")]53Float32 if propagate_nans => {54Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float32Type>>::new()))55},56#[cfg(feature = "propagate_nans")]57Float64 if propagate_nans => {58Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float64Type>>::new()))59},60Float32 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float32Type>>::new())),61Float64 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float64Type>>::new())),62Null => Box::new(NullGroupedReduction::default()),63String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMaxReducer)),64_ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {65with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {66Box::new(VMGR::new(dtype, NumReducer::<Max<$T>>::new()))67})68},69#[cfg(feature = "dtype-decimal")]70Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Max<Int128Type>>::new())),71#[cfg(feature = "dtype-categorical")]72Categorical(cats, map) => with_match_categorical_physical_type!(cats.physical(), |$C| {73Box::new(VMGR::new(dtype.clone(), CatMaxReducer::<$C>(map.clone(), PhantomData)))74}),75_ => unimplemented!(),76}77}7879// These two variants ignore nans.80struct Min<T>(PhantomData<T>);81struct Max<T>(PhantomData<T>);8283// These two variants propagate nans.84#[cfg(feature = "propagate_nans")]85struct NanMin<T>(PhantomData<T>);86#[cfg(feature = "propagate_nans")]87struct NanMax<T>(PhantomData<T>);8889impl<T> NumericReduction for Min<T>90where91T: PolarsNumericType,92ChunkedArray<T>: ChunkAgg<T::Native>,93{94type Dtype = T;9596#[inline(always)]97fn init() -> T::Native {98if T::Native::is_float() {99T::Native::nan_value()100} else {101T::Native::max_value()102}103}104105#[inline(always)]106fn combine(a: T::Native, b: T::Native) -> T::Native {107MinMax::min_ignore_nan(a, b)108}109110#[inline(always)]111fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {112ChunkAgg::min(ca)113}114}115116impl<T> 
NumericReduction for Max<T>117where118T: PolarsNumericType,119ChunkedArray<T>: ChunkAgg<T::Native>,120{121type Dtype = T;122123#[inline(always)]124fn init() -> T::Native {125if T::Native::is_float() {126T::Native::nan_value()127} else {128T::Native::min_value()129}130}131132#[inline(always)]133fn combine(a: T::Native, b: T::Native) -> T::Native {134MinMax::max_ignore_nan(a, b)135}136137#[inline(always)]138fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {139ChunkAgg::max(ca)140}141}142143#[cfg(feature = "propagate_nans")]144impl<T: PolarsFloatType> NumericReduction for NanMin<T> {145type Dtype = T;146147#[inline(always)]148fn init() -> T::Native {149T::Native::max_value()150}151152#[inline(always)]153fn combine(a: T::Native, b: T::Native) -> T::Native {154MinMax::min_propagate_nan(a, b)155}156157#[inline(always)]158fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {159ca_nan_agg(ca, MinMax::min_propagate_nan)160}161}162163#[cfg(feature = "propagate_nans")]164impl<T: PolarsFloatType> NumericReduction for NanMax<T> {165type Dtype = T;166167#[inline(always)]168fn init() -> T::Native {169T::Native::min_value()170}171172#[inline(always)]173fn combine(a: T::Native, b: T::Native) -> T::Native {174MinMax::max_propagate_nan(a, b)175}176177#[inline(always)]178fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {179ca_nan_agg(ca, MinMax::max_propagate_nan)180}181}182183#[derive(Clone)]184struct BinaryMinReducer;185#[derive(Clone)]186struct BinaryMaxReducer;187188impl Reducer for BinaryMinReducer {189type Dtype = BinaryType;190type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.191192fn init(&self) -> Self::Value {193None194}195196#[inline(always)]197fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {198Cow::Owned(s.cast(&DataType::Binary).unwrap())199}200201fn combine(&self, a: &mut Self::Value, b: &Self::Value) {202self.reduce_one(a, b.as_deref(), 0)203}204205fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) 
{206match (a, b) {207(_, None) => {},208(l @ None, Some(r)) => *l = Some(r.to_owned()),209(Some(l), Some(r)) => {210if l.as_slice() > r {211l.clear();212l.extend_from_slice(r);213}214},215}216}217218fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {219self.reduce_one(v, ca.min_binary(), 0)220}221222fn finish(223&self,224v: Vec<Self::Value>,225m: Option<Bitmap>,226dtype: &DataType,227) -> PolarsResult<Series> {228assert!(m.is_none()); // This should only be used with VecGroupedReduction.229let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);230ca.into_series().cast(dtype)231}232}233234impl Reducer for BinaryMaxReducer {235type Dtype = BinaryType;236type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.237238#[inline(always)]239fn init(&self) -> Self::Value {240None241}242243#[inline(always)]244fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {245Cow::Owned(s.cast(&DataType::Binary).unwrap())246}247248#[inline(always)]249fn combine(&self, a: &mut Self::Value, b: &Self::Value) {250self.reduce_one(a, b.as_deref(), 0)251}252253#[inline(always)]254fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {255match (a, b) {256(_, None) => {},257(l @ None, Some(r)) => *l = Some(r.to_owned()),258(Some(l), Some(r)) => {259if l.as_slice() < r {260l.clear();261l.extend_from_slice(r);262}263},264}265}266267#[inline(always)]268fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {269self.reduce_one(v, ca.max_binary(), 0)270}271272#[inline(always)]273fn finish(274&self,275v: Vec<Self::Value>,276m: Option<Bitmap>,277dtype: &DataType,278) -> PolarsResult<Series> {279assert!(m.is_none()); // This should only be used with VecGroupedReduction.280let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);281ca.into_series().cast(dtype)282}283}284285#[derive(Default)]286pub struct BoolMinGroupedReduction {287values: MutableBitmap,288mask: MutableBitmap,289evicted_values: 
BitmapBuilder,290evicted_mask: BitmapBuilder,291}292293impl GroupedReduction for BoolMinGroupedReduction {294fn new_empty(&self) -> Box<dyn GroupedReduction> {295Box::new(Self::default())296}297298fn reserve(&mut self, additional: usize) {299self.values.reserve(additional);300self.mask.reserve(additional)301}302303fn resize(&mut self, num_groups: IdxSize) {304self.values.resize(num_groups as usize, true);305self.mask.resize(num_groups as usize, false);306}307308fn update_group(309&mut self,310values: &Column,311group_idx: IdxSize,312_seq_id: u64,313) -> PolarsResult<()> {314// TODO: we should really implement a sum-as-other-type operation instead315// of doing this materialized cast.316assert!(values.dtype() == &DataType::Boolean);317let values = values.as_materialized_series_maintain_scalar();318let ca: &BooleanChunked = values.as_ref().as_ref();319if !ca.all() {320self.values.set(group_idx as usize, false);321}322if ca.len() != ca.null_count() {323self.mask.set(group_idx as usize, true);324}325Ok(())326}327328unsafe fn update_groups_while_evicting(329&mut self,330values: &Column,331subset: &[IdxSize],332group_idxs: &[EvictIdx],333_seq_id: u64,334) -> PolarsResult<()> {335assert!(values.dtype() == &DataType::Boolean);336assert!(subset.len() == group_idxs.len());337let values = values.as_materialized_series(); // @scalar-opt338let ca: &BooleanChunked = values.as_ref().as_ref();339let arr = ca.downcast_as_array();340unsafe {341// SAFETY: indices are in-bounds guaranteed by trait.342for (i, g) in subset.iter().zip(group_idxs) {343let ov = arr.get_unchecked(*i as usize);344if g.should_evict() {345self.evicted_values.push(self.values.get_unchecked(g.idx()));346self.evicted_mask.push(self.mask.get_unchecked(g.idx()));347self.values.set_unchecked(g.idx(), ov.unwrap_or(true));348self.mask.set_unchecked(g.idx(), ov.is_some());349} else {350self.values.and_pos_unchecked(g.idx(), ov.unwrap_or(true));351self.mask.or_pos_unchecked(g.idx(), 
ov.is_some());352}353}354}355Ok(())356}357358unsafe fn combine_subset(359&mut self,360other: &dyn GroupedReduction,361subset: &[IdxSize],362group_idxs: &[IdxSize],363) -> PolarsResult<()> {364let other = other.as_any().downcast_ref::<Self>().unwrap();365assert!(subset.len() == group_idxs.len());366unsafe {367// SAFETY: indices are in-bounds guaranteed by trait.368for (i, g) in subset.iter().zip(group_idxs) {369self.values370.and_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));371self.mask372.or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));373}374}375Ok(())376}377378fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {379Box::new(Self {380values: core::mem::take(&mut self.evicted_values).into_mut(),381mask: core::mem::take(&mut self.evicted_mask).into_mut(),382evicted_values: BitmapBuilder::new(),383evicted_mask: BitmapBuilder::new(),384})385}386387fn finalize(&mut self) -> PolarsResult<Series> {388let v = core::mem::take(&mut self.values);389let m = core::mem::take(&mut self.mask);390let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));391Ok(Series::from_array(PlSmallStr::EMPTY, arr))392}393394fn as_any(&self) -> &dyn Any {395self396}397}398399#[derive(Default)]400pub struct BoolMaxGroupedReduction {401values: MutableBitmap,402mask: MutableBitmap,403evicted_values: BitmapBuilder,404evicted_mask: BitmapBuilder,405}406407impl GroupedReduction for BoolMaxGroupedReduction {408fn new_empty(&self) -> Box<dyn GroupedReduction> {409Box::new(Self::default())410}411412fn reserve(&mut self, additional: usize) {413self.values.reserve(additional);414self.mask.reserve(additional)415}416417fn resize(&mut self, num_groups: IdxSize) {418self.values.resize(num_groups as usize, false);419self.mask.resize(num_groups as usize, false);420}421422fn update_group(423&mut self,424values: &Column,425group_idx: IdxSize,426_seq_id: u64,427) -> PolarsResult<()> {428// TODO: we should really implement a sum-as-other-type 
operation instead429// of doing this materialized cast.430assert!(values.dtype() == &DataType::Boolean);431let values = values.as_materialized_series_maintain_scalar();432let ca: &BooleanChunked = values.as_ref().as_ref();433if ca.any() {434self.values.set(group_idx as usize, true);435}436if ca.len() != ca.null_count() {437self.mask.set(group_idx as usize, true);438}439Ok(())440}441442unsafe fn update_groups_while_evicting(443&mut self,444values: &Column,445subset: &[IdxSize],446group_idxs: &[EvictIdx],447_seq_id: u64,448) -> PolarsResult<()> {449assert!(values.dtype() == &DataType::Boolean);450assert!(subset.len() == group_idxs.len());451let values = values.as_materialized_series(); // @scalar-opt452let ca: &BooleanChunked = values.as_ref().as_ref();453let arr = ca.downcast_as_array();454unsafe {455// SAFETY: indices are in-bounds guaranteed by trait.456for (i, g) in subset.iter().zip(group_idxs) {457let ov = arr.get_unchecked(*i as usize);458if g.should_evict() {459self.evicted_values.push(self.values.get_unchecked(g.idx()));460self.evicted_mask.push(self.mask.get_unchecked(g.idx()));461self.values.set_unchecked(g.idx(), ov.unwrap_or(false));462self.mask.set_unchecked(g.idx(), ov.is_some());463} else {464self.values.or_pos_unchecked(g.idx(), ov.unwrap_or(false));465self.mask.or_pos_unchecked(g.idx(), ov.is_some());466}467}468}469Ok(())470}471472unsafe fn combine_subset(473&mut self,474other: &dyn GroupedReduction,475subset: &[IdxSize],476group_idxs: &[IdxSize],477) -> PolarsResult<()> {478let other = other.as_any().downcast_ref::<Self>().unwrap();479assert!(subset.len() == group_idxs.len());480unsafe {481// SAFETY: indices are in-bounds guaranteed by trait.482for (i, g) in subset.iter().zip(group_idxs) {483self.values484.or_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));485self.mask486.or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));487}488}489Ok(())490}491492fn take_evictions(&mut self) -> Box<dyn GroupedReduction> 
{493Box::new(Self {494values: core::mem::take(&mut self.evicted_values).into_mut(),495mask: core::mem::take(&mut self.evicted_mask).into_mut(),496evicted_values: BitmapBuilder::new(),497evicted_mask: BitmapBuilder::new(),498})499}500501fn finalize(&mut self) -> PolarsResult<Series> {502let v = core::mem::take(&mut self.values);503let m = core::mem::take(&mut self.mask);504let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));505Ok(Series::from_array(PlSmallStr::EMPTY, arr))506}507508fn as_any(&self) -> &dyn Any {509self510}511}512513#[cfg(feature = "dtype-categorical")]514struct CatMinReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);515516#[cfg(feature = "dtype-categorical")]517impl<T> Clone for CatMinReducer<T> {518fn clone(&self) -> Self {519Self(self.0.clone(), PhantomData)520}521}522523#[cfg(feature = "dtype-categorical")]524impl<T: PolarsCategoricalType> Reducer for CatMinReducer<T> {525type Dtype = T::PolarsPhysical;526type Value = T::Native;527528fn init(&self) -> Self::Value {529T::Native::max_value() // Ensures it's invalid, preferring the other value.530}531532#[inline(always)]533fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {534s.to_physical_repr()535}536537fn combine(&self, a: &mut Self::Value, b: &Self::Value) {538let Some(b_s) = self.0.cat_to_str(b.as_cat()) else {539return;540};541let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {542*a = *b;543return;544};545546if b_s < a_s {547*a = *b;548}549}550551fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {552if let Some(b) = b {553self.combine(a, &b);554}555}556557fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {558for cat in ca.iter().flatten() {559self.combine(v, &cat);560}561}562563fn finish(564&self,565v: Vec<Self::Value>,566m: Option<Bitmap>,567dtype: &DataType,568) -> PolarsResult<Series> {569let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);570let cat_ids = 
ChunkedArray::from(cat_ids);571unsafe {572Ok(573CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())574.into_series(),575)576}577}578}579580#[cfg(feature = "dtype-categorical")]581struct CatMaxReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);582583#[cfg(feature = "dtype-categorical")]584impl<T> Clone for CatMaxReducer<T> {585fn clone(&self) -> Self {586Self(self.0.clone(), PhantomData)587}588}589590#[cfg(feature = "dtype-categorical")]591impl<T: PolarsCategoricalType> Reducer for CatMaxReducer<T> {592type Dtype = T::PolarsPhysical;593type Value = T::Native;594595fn init(&self) -> Self::Value {596T::Native::max_value() // Ensures it's invalid, preferring the other value.597}598599#[inline(always)]600fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {601s.to_physical_repr()602}603604fn combine(&self, a: &mut Self::Value, b: &Self::Value) {605let Some(b_s) = self.0.cat_to_str(b.as_cat()) else {606return;607};608let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {609*a = *b;610return;611};612613if b_s > a_s {614*a = *b;615}616}617618fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {619if let Some(b) = b {620self.combine(a, &b);621}622}623624fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {625for cat in ca.iter().flatten() {626self.combine(v, &cat);627}628}629630fn finish(631&self,632v: Vec<Self::Value>,633m: Option<Bitmap>,634dtype: &DataType,635) -> PolarsResult<Series> {636let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);637let cat_ids = ChunkedArray::from(cat_ids);638unsafe {639Ok(640CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())641.into_series(),642)643}644}645}646647#[derive(Default)]648pub struct NullGroupedReduction {649length: usize,650num_evictions: usize,651}652653impl GroupedReduction for NullGroupedReduction {654fn new_empty(&self) -> Box<dyn GroupedReduction> {655Box::new(Self::default())656}657658fn 
reserve(&mut self, _additional: usize) {}659660fn resize(&mut self, num_groups: IdxSize) {661self.length = num_groups as usize;662}663664fn update_group(665&mut self,666values: &Column,667_group_idx: IdxSize,668_seq_id: u64,669) -> PolarsResult<()> {670assert!(values.dtype() == &DataType::Null);671672// no-op673Ok(())674}675676unsafe fn update_groups_while_evicting(677&mut self,678values: &Column,679subset: &[IdxSize],680group_idxs: &[EvictIdx],681_seq_id: u64,682) -> PolarsResult<()> {683assert!(values.dtype() == &DataType::Null);684assert!(subset.len() == group_idxs.len());685686for g in group_idxs {687self.num_evictions += g.should_evict() as usize;688}689Ok(())690}691692unsafe fn combine_subset(693&mut self,694_other: &dyn GroupedReduction,695subset: &[IdxSize],696group_idxs: &[IdxSize],697) -> PolarsResult<()> {698assert!(subset.len() == group_idxs.len());699700// no-op701Ok(())702}703704fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {705let out = Box::new(Self {706length: self.num_evictions,707num_evictions: 0,708});709self.num_evictions = 0;710out711}712713fn finalize(&mut self) -> PolarsResult<Series> {714Ok(Series::full_null(715PlSmallStr::EMPTY,716self.length,717&DataType::Null,718))719}720721fn as_any(&self) -> &dyn Any {722self723}724}725726727