// Path: blob/main/crates/polars-compute/src/rolling/min_max.rs
// 6939 views
use std::collections::VecDeque;1use std::marker::PhantomData;23use arrow::bitmap::Bitmap;4use arrow::types::NativeType;5use polars_utils::min_max::MinMaxPolicy;67use super::RollingFnParams;8use super::no_nulls::RollingAggWindowNoNulls;9use super::nulls::RollingAggWindowNulls;1011// Algorithm: https://cs.stackexchange.com/questions/120915/interview-question-with-arrays-and-consecutive-subintervals/120936#12093612pub struct MinMaxWindow<'a, T, P> {13values: &'a [T],14validity: Option<&'a Bitmap>,15// values[monotonic_idxs[i]] is better than values[monotonic_idxs[i+1]] for16// all i, as per the policy.17monotonic_idxs: VecDeque<usize>,18nonnulls_in_window: usize,19last_start: usize,20last_end: usize,21policy: PhantomData<P>,22}2324impl<T: NativeType, P: MinMaxPolicy> MinMaxWindow<'_, T, P> {25/// # Safety26/// The index must be in-bounds.27unsafe fn insert_nonnull_value(&mut self, idx: usize) {28unsafe {29let value = self.values.get_unchecked(idx);3031// Remove values which are older and worse.32while let Some(tail_idx) = self.monotonic_idxs.back() {33let tail_value = self.values.get_unchecked(*tail_idx);34if !P::is_better(value, tail_value) {35break;36}37self.monotonic_idxs.pop_back();38}3940self.monotonic_idxs.push_back(idx);41self.nonnulls_in_window += 1;42}43}4445fn remove_old_values(&mut self, window_start: usize) {46// Remove values which have fallen outside the window start.47while let Some(head_idx) = self.monotonic_idxs.front() {48if *head_idx >= window_start {49break;50}51self.monotonic_idxs.pop_front();52}53}54}5556impl<'a, T: NativeType, P: MinMaxPolicy> RollingAggWindowNulls<'a, T> for MinMaxWindow<'a, T, P> {57unsafe fn new(58slice: &'a [T],59validity: &'a Bitmap,60start: usize,61end: usize,62params: Option<RollingFnParams>,63_window_size: Option<usize>,64) -> Self {65assert!(params.is_none());66let mut slf = Self {67values: slice,68validity: Some(validity),69monotonic_idxs: VecDeque::new(),70nonnulls_in_window: 0,71last_start: 0,72last_end: 0,73policy: 
PhantomData,74};75unsafe {76RollingAggWindowNulls::update(&mut slf, start, end);77}78slf79}8081unsafe fn update(&mut self, start: usize, end: usize) -> Option<T> {82unsafe {83let v = self.validity.unwrap_unchecked();84self.remove_old_values(start);85for i in self.last_start..start.min(self.last_end) {86self.nonnulls_in_window -= v.get_bit_unchecked(i) as usize;87}88for i in start.max(self.last_end)..end {89if v.get_bit_unchecked(i) {90self.insert_nonnull_value(i);91}92}9394self.last_start = start;95self.last_end = end;96self.monotonic_idxs97.front()98.map(|idx| *self.values.get_unchecked(*idx))99}100}101102fn is_valid(&self, min_periods: usize) -> bool {103self.nonnulls_in_window >= min_periods104}105}106107impl<'a, T: NativeType, P: MinMaxPolicy> RollingAggWindowNoNulls<'a, T> for MinMaxWindow<'a, T, P> {108fn new(109slice: &'a [T],110start: usize,111end: usize,112params: Option<RollingFnParams>,113_window_size: Option<usize>,114) -> Self {115assert!(params.is_none());116let mut slf = Self {117values: slice,118validity: None,119monotonic_idxs: VecDeque::new(),120nonnulls_in_window: 0,121last_start: 0,122last_end: 0,123policy: PhantomData,124};125unsafe {126RollingAggWindowNoNulls::update(&mut slf, start, end);127}128slf129}130131unsafe fn update(&mut self, start: usize, end: usize) -> Option<T> {132unsafe {133self.remove_old_values(start);134for i in start.max(self.last_end)..end {135self.insert_nonnull_value(i);136}137138self.last_start = start;139self.last_end = end;140self.monotonic_idxs141.front()142.map(|idx| *self.values.get_unchecked(*idx))143}144}145}146147148