// Path: blob/main/crates/polars-expr/src/reduce/first_last.rs
// 6940 views
#![allow(unsafe_op_in_unsafe_fn)]1use std::marker::PhantomData;23use polars_core::frame::row::AnyValueBufferTrusted;4use polars_core::with_match_physical_numeric_polars_type;56use super::*;78pub fn new_first_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {9new_reduction_with_policy::<First>(dtype)10}1112pub fn new_last_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {13new_reduction_with_policy::<Last>(dtype)14}1516fn new_reduction_with_policy<P: Policy + 'static>(dtype: DataType) -> Box<dyn GroupedReduction> {17use DataType::*;18use VecGroupedReduction as VGR;19match dtype {20Boolean => Box::new(VecGroupedReduction::new(21dtype,22BoolFirstLastReducer::<P>(PhantomData),23)),24_ if dtype.is_primitive_numeric() || dtype.is_temporal() => {25with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {26Box::new(VGR::new(dtype, NumFirstLastReducer::<P, $T>(PhantomData)))27})28},29String | Binary => Box::new(VecGroupedReduction::new(30dtype,31BinaryFirstLastReducer::<P>(PhantomData),32)),33_ => Box::new(GenericFirstLastGroupedReduction::<P>::new(dtype)),34}35}3637trait Policy: Send + Sync + 'static {38fn index(len: usize) -> usize;39fn should_replace(new: u64, old: u64) -> bool;40}4142struct First;43impl Policy for First {44fn index(_len: usize) -> usize {45046}4748fn should_replace(new: u64, old: u64) -> bool {49// Subtracting 1 with wrapping leaves all order unchanged, except it50// makes 0 (no value) the largest possible.51new.wrapping_sub(1) < old.wrapping_sub(1)52}53}5455struct Last;56impl Policy for Last {57fn index(len: usize) -> usize {58len - 159}6061fn should_replace(new: u64, old: u64) -> bool {62new >= old63}64}6566#[allow(dead_code)]67struct Arbitrary;68impl Policy for Arbitrary {69fn index(_len: usize) -> usize {70071}7273fn should_replace(_new: u64, old: u64) -> bool {74old == 075}76}7778struct NumFirstLastReducer<P, T>(PhantomData<(P, T)>);7980impl<P, T> Clone for NumFirstLastReducer<P, T> {81fn clone(&self) -> Self 
{82Self(PhantomData)83}84}8586impl<P, T> Reducer for NumFirstLastReducer<P, T>87where88P: Policy,89T: PolarsNumericType,90{91type Dtype = T;92type Value = (Option<T::Native>, u64);9394fn init(&self) -> Self::Value {95(None, 0)96}9798fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {99s.to_physical_repr()100}101102fn combine(&self, a: &mut Self::Value, b: &Self::Value) {103if P::should_replace(b.1, a.1) {104*a = *b;105}106}107108fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, seq_id: u64) {109if P::should_replace(seq_id, a.1) {110*a = (b, seq_id);111}112}113114fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {115if !ca.is_empty() && P::should_replace(seq_id, v.1) {116let val = ca.get(P::index(ca.len()));117*v = (val, seq_id);118}119}120121fn finish(122&self,123v: Vec<Self::Value>,124m: Option<Bitmap>,125dtype: &DataType,126) -> PolarsResult<Series> {127assert!(m.is_none()); // This should only be used with VecGroupedReduction.128let ca: ChunkedArray<T> = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);129ca.into_series().cast(dtype)130}131}132133struct BinaryFirstLastReducer<P>(PhantomData<P>);134135impl<P> Clone for BinaryFirstLastReducer<P> {136fn clone(&self) -> Self {137Self(PhantomData)138}139}140141fn replace_opt_bytes(l: &mut Option<Vec<u8>>, r: Option<&[u8]>) {142match (l, r) {143(Some(l), Some(r)) => {144l.clear();145l.extend_from_slice(r);146},147(l, r) => *l = r.map(|s| s.to_owned()),148}149}150151impl<P> Reducer for BinaryFirstLastReducer<P>152where153P: Policy,154{155type Dtype = BinaryType;156type Value = (Option<Vec<u8>>, u64);157158fn init(&self) -> Self::Value {159(None, 0)160}161162fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {163Cow::Owned(s.cast(&DataType::Binary).unwrap())164}165166fn combine(&self, a: &mut Self::Value, b: &Self::Value) {167if P::should_replace(b.1, a.1) {168a.0.clone_from(&b.0);169a.1 = b.1;170}171}172173fn reduce_one(&self, a: &mut 
Self::Value, b: Option<&[u8]>, seq_id: u64) {174if P::should_replace(seq_id, a.1) {175replace_opt_bytes(&mut a.0, b);176a.1 = seq_id;177}178}179180fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {181if !ca.is_empty() && P::should_replace(seq_id, v.1) {182replace_opt_bytes(&mut v.0, ca.get(P::index(ca.len())));183v.1 = seq_id;184}185}186187fn finish(188&self,189v: Vec<Self::Value>,190m: Option<Bitmap>,191dtype: &DataType,192) -> PolarsResult<Series> {193assert!(m.is_none()); // This should only be used with VecGroupedReduction.194let ca: BinaryChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);195ca.into_series().cast(dtype)196}197}198199struct BoolFirstLastReducer<P>(PhantomData<P>);200201impl<P> Clone for BoolFirstLastReducer<P> {202fn clone(&self) -> Self {203Self(PhantomData)204}205}206207impl<P> Reducer for BoolFirstLastReducer<P>208where209P: Policy,210{211type Dtype = BooleanType;212type Value = (Option<bool>, u64);213214fn init(&self) -> Self::Value {215(None, 0)216}217218fn combine(&self, a: &mut Self::Value, b: &Self::Value) {219if P::should_replace(b.1, a.1) {220*a = *b;221}222}223224fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, seq_id: u64) {225if P::should_replace(seq_id, a.1) {226a.0 = b;227a.1 = seq_id;228}229}230231fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {232if !ca.is_empty() && P::should_replace(seq_id, v.1) {233v.0 = ca.get(P::index(ca.len()));234v.1 = seq_id;235}236}237238fn finish(239&self,240v: Vec<Self::Value>,241m: Option<Bitmap>,242_dtype: &DataType,243) -> PolarsResult<Series> {244assert!(m.is_none()); // This should only be used with VecGroupedReduction.245let ca: BooleanChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);246Ok(ca.into_series())247}248}249250pub struct GenericFirstLastGroupedReduction<P> {251in_dtype: DataType,252values: Vec<AnyValue<'static>>,253seqs: Vec<u64>,254evicted_values: 
Vec<AnyValue<'static>>,255evicted_seqs: Vec<u64>,256policy: PhantomData<fn() -> P>,257}258259impl<P> GenericFirstLastGroupedReduction<P> {260fn new(in_dtype: DataType) -> Self {261Self {262in_dtype,263values: Vec::new(),264seqs: Vec::new(),265evicted_values: Vec::new(),266evicted_seqs: Vec::new(),267policy: PhantomData,268}269}270}271272impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<P> {273fn new_empty(&self) -> Box<dyn GroupedReduction> {274Box::new(Self::new(self.in_dtype.clone()))275}276277fn reserve(&mut self, additional: usize) {278self.values.reserve(additional);279self.seqs.reserve(additional);280}281282fn resize(&mut self, num_groups: IdxSize) {283self.values.resize(num_groups as usize, AnyValue::Null);284self.seqs.resize(num_groups as usize, 0);285}286287fn update_group(288&mut self,289values: &Column,290group_idx: IdxSize,291seq_id: u64,292) -> PolarsResult<()> {293if !values.is_empty() {294let seq_id = seq_id + 1; // We use 0 for 'no value'.295if P::should_replace(seq_id, self.seqs[group_idx as usize]) {296self.values[group_idx as usize] = values.get(P::index(values.len()))?.into_static();297self.seqs[group_idx as usize] = seq_id;298}299}300Ok(())301}302303unsafe fn update_groups_while_evicting(304&mut self,305values: &Column,306subset: &[IdxSize],307group_idxs: &[EvictIdx],308seq_id: u64,309) -> PolarsResult<()> {310let seq_id = seq_id + 1; // We use 0 for 'no value'.311for (i, g) in subset.iter().zip(group_idxs) {312let grp_val = self.values.get_unchecked_mut(g.idx());313let grp_seq = self.seqs.get_unchecked_mut(g.idx());314if g.should_evict() {315self.evicted_values316.push(core::mem::replace(grp_val, AnyValue::Null));317self.evicted_seqs.push(core::mem::replace(grp_seq, 0));318}319if P::should_replace(seq_id, *grp_seq) {320*grp_val = values.get_unchecked(*i as usize).into_static();321*grp_seq = seq_id;322}323}324Ok(())325}326327unsafe fn combine_subset(328&mut self,329other: &dyn GroupedReduction,330subset: 
&[IdxSize],331group_idxs: &[IdxSize],332) -> PolarsResult<()> {333let other = other.as_any().downcast_ref::<Self>().unwrap();334for (i, g) in group_idxs.iter().enumerate() {335let si = *subset.get_unchecked(i) as usize;336if P::should_replace(337*other.seqs.get_unchecked(si),338*self.seqs.get_unchecked(*g as usize),339) {340*self.values.get_unchecked_mut(*g as usize) =341other.values.get_unchecked(si).clone();342*self.seqs.get_unchecked_mut(*g as usize) = *other.seqs.get_unchecked(si);343}344}345Ok(())346}347348fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {349Box::new(Self {350in_dtype: self.in_dtype.clone(),351values: core::mem::take(&mut self.evicted_values),352seqs: core::mem::take(&mut self.evicted_seqs),353evicted_values: Vec::new(),354evicted_seqs: Vec::new(),355policy: PhantomData,356})357}358359fn finalize(&mut self) -> PolarsResult<Series> {360self.seqs.clear();361unsafe {362let mut buf = AnyValueBufferTrusted::new(&self.in_dtype, self.values.len());363for v in core::mem::take(&mut self.values) {364buf.add_unchecked_owned_physical(&v);365}366Ok(buf.into_series())367}368}369370fn as_any(&self) -> &dyn Any {371self372}373}374375376