Path: blob/main/crates/polars-expr/src/reduce/first_last_nonnull.rs
7884 views
use polars_core::frame::row::AnyValueBufferTrusted;1use polars_core::with_match_physical_numeric_polars_type;23use super::first_last::{First, Last, replace_opt_bytes};4use super::*;56pub fn new_first_nonnull_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {7new_nonnull_reduction_with_policy(dtype, First)8}910pub fn new_last_nonnull_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {11new_nonnull_reduction_with_policy(dtype, Last)12}1314fn new_nonnull_reduction_with_policy<P: NonNullPolicy + 'static>(15dtype: DataType,16policy: P,17) -> Box<dyn GroupedReduction> {18use DataType::*;19use VecGroupedReduction as VGR;20match dtype {21Boolean => Box::new(VecGroupedReduction::new(22dtype,23BoolFirstLastNonNullReducer(policy),24)),25_ if dtype.is_primitive_numeric()26|| dtype.is_temporal()27|| dtype.is_decimal()28|| dtype.is_categorical() =>29{30with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {31Box::new(VGR::new(dtype, NumFirstLastNonNullReducer::<_, $T>(policy, PhantomData)))32})33},34String | Binary => Box::new(VecGroupedReduction::new(35dtype,36BinaryFirstLastNonNullReducer(policy),37)),38_ => Box::new(GenericFirstLastNonNullGroupedReduction::new(dtype, policy)),39}40}4142enum FirstOrLast {43First,44Last,45}46trait NonNullPolicy: Copy + Send + Sync + 'static {47fn is_first_or_last(self) -> FirstOrLast;48fn index(self, len: usize) -> usize;49fn might_replace(self, new: u64, old: u64, seen: bool) -> bool;50}5152impl NonNullPolicy for First {53fn is_first_or_last(self) -> FirstOrLast {54FirstOrLast::First55}5657fn index(self, _len: usize) -> usize {58059}6061fn might_replace(self, new: u64, old: u64, seen: bool) -> bool {62// Subtracting 1 with wrapping leaves all order unchanged, except it63// makes 0 (no value) the largest possible.64// If an item has not yet been found, we still might replace, even if we are higher idx.65!seen || (new.wrapping_sub(1) < old.wrapping_sub(1))66}67}6869impl NonNullPolicy for Last {70fn is_first_or_last(self) -> FirstOrLast {71FirstOrLast::Last72}7374fn index(self, len: usize) -> usize {75len - 176}7778fn might_replace(self, new: u64, old: u64, seen: bool) -> bool {79// If an item has not yet been found, we still might replace, even if we are lower idx.80!seen || (new >= old)81}82}8384struct NumFirstLastNonNullReducer<P: NonNullPolicy, T>(P, PhantomData<T>);8586#[derive(Clone, Debug, Default)]87struct ValueForNonNull<T: Clone> {88value: Option<T>,89seq: u64,90seen: bool,91}9293impl<P: NonNullPolicy, T> Clone for NumFirstLastNonNullReducer<P, T> {94fn clone(&self) -> Self {95Self(self.0, PhantomData)96}97}9899impl<P, T> Reducer for NumFirstLastNonNullReducer<P, T>100where101P: NonNullPolicy,102T: PolarsNumericType,103{104type Dtype = T;105type Value = ValueForNonNull<T::Native>;106107fn init(&self) -> Self::Value {108ValueForNonNull::default()109}110111fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {112s.to_physical_repr()113}114115fn combine(&self, old: &mut Self::Value, new: &Self::Value) {116if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {117old.value = new.value;118old.seq = new.seq;119old.seen = true;120}121}122123fn reduce_one(&self, old: &mut Self::Value, new: Option<T::Native>, seq_id: u64) {124if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {125old.value = new;126old.seq = seq_id;127old.seen = true;128}129}130131fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {132if ca.is_empty() {133return;134}135136if self.0.might_replace(seq_id, v.seq, v.seen) {137let val = if ca.has_nulls() {138match self.0.is_first_or_last() {139FirstOrLast::First => ca.first_non_null(),140FirstOrLast::Last => ca.last_non_null(),141}142} else {143Some(self.0.index(ca.len()))144}145// SAFETY: idx is vlid.146.and_then(|idx| unsafe { ca.get_unchecked(idx) });147148if val.is_some() {149v.value = val;150v.seq = seq_id;151v.seen = true;152}153}154}155156fn finish(157&self,158v: Vec<Self::Value>,159m: Option<Bitmap>,160dtype: &DataType,161) -> PolarsResult<Series> {162assert!(m.is_none()); // This should only be used with VecGroupedReduction.163let ca: ChunkedArray<T> = v164.into_iter()165.map(|red_val| red_val.value)166.collect_ca(PlSmallStr::EMPTY);167let s = ca.into_series();168unsafe { s.from_physical_unchecked(dtype) }169}170}171172struct BinaryFirstLastNonNullReducer<P: NonNullPolicy>(P);173174impl<P: NonNullPolicy> Clone for BinaryFirstLastNonNullReducer<P> {175fn clone(&self) -> Self {176Self(self.0)177}178}179180impl<P> Reducer for BinaryFirstLastNonNullReducer<P>181where182P: NonNullPolicy,183{184type Dtype = BinaryType;185type Value = ValueForNonNull<Vec<u8>>;186187fn init(&self) -> Self::Value {188ValueForNonNull::default()189}190191fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {192Cow::Owned(s.cast(&DataType::Binary).unwrap())193}194195fn combine(&self, old: &mut Self::Value, new: &Self::Value) {196if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {197old.value.clone_from(&new.value);198old.seq = new.seq;199old.seen = true;200}201}202203fn reduce_one(&self, old: &mut Self::Value, new: Option<&[u8]>, seq_id: u64) {204if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {205replace_opt_bytes(&mut old.value, new);206old.seq = seq_id;207old.seen = true;208}209}210211fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {212if ca.is_empty() {213return;214}215if self.0.might_replace(seq_id, v.seq, v.seen) {216let val = if ca.has_nulls() {217match self.0.is_first_or_last() {218FirstOrLast::First => ca.first_non_null(),219FirstOrLast::Last => ca.last_non_null(),220}221} else {222Some(self.0.index(ca.len()))223}224.and_then(|idx| ca.get(idx));225226if val.is_some() {227replace_opt_bytes(&mut v.value, val);228v.seq = seq_id;229v.seen = true;230}231}232}233234fn finish(235&self,236v: Vec<Self::Value>,237m: Option<Bitmap>,238dtype: &DataType,239) -> PolarsResult<Series> {240assert!(m.is_none()); // This should only be used with VecGroupedReduction.241let ca: BinaryChunked = v242.into_iter()243.map(|ValueForNonNull { value, .. }| value)244.collect_ca(PlSmallStr::EMPTY);245ca.into_series().cast(dtype)246}247}248249#[derive(Clone)]250struct BoolFirstLastNonNullReducer<P: NonNullPolicy>(P);251252impl<P> Reducer for BoolFirstLastNonNullReducer<P>253where254P: NonNullPolicy,255{256type Dtype = BooleanType;257type Value = ValueForNonNull<bool>;258259fn init(&self) -> Self::Value {260ValueForNonNull::default()261}262263fn combine(&self, old: &mut Self::Value, new: &Self::Value) {264if new.value.is_some() && self.0.might_replace(new.seq, old.seq, old.seen) {265old.value = new.value;266old.seq = new.seq;267old.seen = new.seen;268}269}270271fn reduce_one(&self, old: &mut Self::Value, new: Option<bool>, seq_id: u64) {272if new.is_some() && self.0.might_replace(seq_id, old.seq, old.seen) {273old.value = new;274old.seq = seq_id;275old.seen = true;276}277}278279fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {280if ca.is_empty() {281return;282}283if self.0.might_replace(seq_id, v.seq, v.seen) {284let val = if ca.has_nulls() {285match self.0.is_first_or_last() {286FirstOrLast::First => ca.first_non_null(),287FirstOrLast::Last => ca.last_non_null(),288}289} else {290Some(self.0.index(ca.len()))291}292.and_then(|idx| ca.get(idx));293294if val.is_some() {295v.value = val;296v.seq = seq_id;297v.seen = true;298}299}300}301302fn finish(303&self,304v: Vec<Self::Value>,305m: Option<Bitmap>,306_dtype: &DataType,307) -> PolarsResult<Series> {308assert!(m.is_none()); // This should only be used with VecGroupedReduction.309let ca: BooleanChunked = v310.into_iter()311.map(|ValueForNonNull { value, .. }| value)312.collect_ca(PlSmallStr::EMPTY);313Ok(ca.into_series())314}315}316317struct GenericFirstLastNonNullGroupedReduction<P: NonNullPolicy> {318in_dtype: DataType,319policy: P,320values: Vec<AnyValue<'static>>,321seqs: Vec<u64>,322seen: MutableBitmap,323evicted_values: Vec<AnyValue<'static>>,324evicted_seqs: Vec<u64>,325evicted_seen: BitmapBuilder,326}327328impl<P: NonNullPolicy> GenericFirstLastNonNullGroupedReduction<P> {329fn new(in_dtype: DataType, policy: P) -> Self {330Self {331in_dtype,332policy,333values: Vec::new(),334seqs: Vec::new(),335seen: MutableBitmap::new(),336evicted_values: Vec::new(),337evicted_seqs: Vec::new(),338evicted_seen: BitmapBuilder::new(),339}340}341}342343impl<P: NonNullPolicy + 'static> GroupedReduction for GenericFirstLastNonNullGroupedReduction<P> {344fn new_empty(&self) -> Box<dyn GroupedReduction> {345Box::new(Self::new(self.in_dtype.clone(), self.policy))346}347348fn reserve(&mut self, additional: usize) {349self.values.reserve(additional);350self.seqs.reserve(additional);351self.seen.reserve(additional);352}353354fn resize(&mut self, num_groups: IdxSize) {355self.values.resize(num_groups as usize, AnyValue::Null);356self.seqs.resize(num_groups as usize, 0);357self.seen.resize(num_groups as usize, false);358}359360fn update_group(361&mut self,362values: &[&Column],363group_idx: IdxSize,364seq_id: u64,365) -> PolarsResult<()> {366let &[values] = values else { unreachable!() };367assert!(values.dtype() == &self.in_dtype);368if !values.is_empty() {369let seq_id = seq_id + 1; // We use 0 for 'no value'.370if self.policy.might_replace(371seq_id,372self.seqs[group_idx as usize],373self.seen.get(group_idx as usize),374) {375let val = if values.has_nulls() {376match self.policy.is_first_or_last() {377FirstOrLast::First => values378.as_materialized_series_maintain_scalar()379.first_non_null()380.into_value(),381FirstOrLast::Last => values382.as_materialized_series_maintain_scalar()383.last_non_null()384.into_value(),385}386} else {387// SAFETY: index is valid.388unsafe { values.get_unchecked(self.policy.index(values.len())) }389}390.into_static();391392if !val.is_null() {393self.values[group_idx as usize] = val;394self.seqs[group_idx as usize] = seq_id;395self.seen.set(group_idx as usize, true);396}397}398}399Ok(())400}401402unsafe fn update_groups_while_evicting(403&mut self,404values: &[&Column],405subset: &[IdxSize],406group_idxs: &[EvictIdx],407seq_id: u64,408) -> PolarsResult<()> {409let &[values] = values else { unreachable!() };410assert!(values.dtype() == &self.in_dtype);411assert!(subset.len() == group_idxs.len());412let seq_id = seq_id + 1; // We use 0 for 'no value'.413for (i, g) in subset.iter().zip(group_idxs) {414let idx = g.idx();415let grp_val = self.values.get_unchecked_mut(idx);416let grp_seq = self.seqs.get_unchecked_mut(idx);417if g.should_evict() {418self.evicted_values419.push(core::mem::replace(grp_val, AnyValue::Null));420self.evicted_seqs.push(core::mem::replace(grp_seq, 0));421self.evicted_seen.push(self.seen.get_unchecked(idx));422self.seen.set_unchecked(idx, false);423}424if self425.policy426.might_replace(seq_id, *grp_seq, self.seen.get_unchecked(idx))427{428let val = values.get_unchecked(*i as usize).into_static();429if !val.is_null() {430*grp_val = values.get_unchecked(*i as usize).into_static();431*grp_seq = seq_id;432self.seen.set_unchecked(idx, true);433}434}435}436Ok(())437}438439unsafe fn combine_subset(440&mut self,441other: &dyn GroupedReduction,442subset: &[IdxSize],443group_idxs: &[IdxSize],444) -> PolarsResult<()> {445let other = other.as_any().downcast_ref::<Self>().unwrap();446assert!(self.in_dtype == other.in_dtype);447assert!(subset.len() == group_idxs.len());448for (i, g) in group_idxs.iter().enumerate() {449let si = *subset.get_unchecked(i) as usize;450if self.policy.might_replace(451*other.seqs.get_unchecked(si),452*self.seqs.get_unchecked(*g as usize),453self.seen.get_unchecked(*g as usize),454) {455let val = other.values.get_unchecked(si);456if !val.is_null() {457*self.values.get_unchecked_mut(*g as usize) = val.clone();458*self.seqs.get_unchecked_mut(*g as usize) = *other.seqs.get_unchecked(si);459self.seen.set_unchecked(*g as usize, true);460}461}462}463Ok(())464}465466fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {467Box::new(Self {468in_dtype: self.in_dtype.clone(),469policy: self.policy,470values: core::mem::take(&mut self.evicted_values),471seqs: core::mem::take(&mut self.evicted_seqs),472seen: core::mem::take(&mut self.evicted_seen).into_mut(),473evicted_values: Vec::new(),474evicted_seqs: Vec::new(),475evicted_seen: BitmapBuilder::new(),476})477}478479fn finalize(&mut self) -> PolarsResult<Series> {480self.seqs.clear();481let phys_type = self.in_dtype.to_physical();482let mut buf = AnyValueBufferTrusted::new(&phys_type, self.values.len());483for v in core::mem::take(&mut self.values) {484// SAFETY: v is cast to physical.485unsafe { buf.add_unchecked_owned_physical(&v.to_physical()) };486}487// SAFETY: dtype is valid for series.488unsafe { buf.into_series().from_physical_unchecked(&self.in_dtype) }489}490491fn as_any(&self) -> &dyn Any {492self493}494}495496497