Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-compute/src/rolling/min_max.rs
6939 views
1
use std::collections::VecDeque;
2
use std::marker::PhantomData;
3
4
use arrow::bitmap::Bitmap;
5
use arrow::types::NativeType;
6
use polars_utils::min_max::MinMaxPolicy;
7
8
use super::RollingFnParams;
9
use super::no_nulls::RollingAggWindowNoNulls;
10
use super::nulls::RollingAggWindowNulls;
11
12
// Algorithm: https://cs.stackexchange.com/questions/120915/interview-question-with-arrays-and-consecutive-subintervals/120936#120936
13
pub struct MinMaxWindow<'a, T, P> {
14
values: &'a [T],
15
validity: Option<&'a Bitmap>,
16
// values[monotonic_idxs[i]] is better than values[monotonic_idxs[i+1]] for
17
// all i, as per the policy.
18
monotonic_idxs: VecDeque<usize>,
19
nonnulls_in_window: usize,
20
last_start: usize,
21
last_end: usize,
22
policy: PhantomData<P>,
23
}
24
25
impl<T: NativeType, P: MinMaxPolicy> MinMaxWindow<'_, T, P> {
    /// Appends the non-null entry at `idx` to the back of the candidate deque and
    /// counts it as a non-null value in the window.
    ///
    /// Trailing candidates that the new value beats under `P` are evicted first:
    /// they are older and worse, so they can never be a window's answer again.
    /// Candidates that tie with the new value are kept.
    ///
    /// # Safety
    /// `idx` must be in bounds for `self.values`. Every index already stored in
    /// `self.monotonic_idxs` must be in bounds as well (upheld because all of them
    /// were pushed through this function).
    unsafe fn insert_nonnull_value(&mut self, idx: usize) {
        // SAFETY: `idx` is in bounds per this function's contract, and every
        // deque entry was pushed here under the same contract.
        unsafe {
            let incoming = self.values.get_unchecked(idx);

            // Scanning from the back, locate the newest candidate that is NOT
            // beaten by `incoming`; everything behind it gets dropped.
            let survivors = self
                .monotonic_idxs
                .iter()
                .rposition(|&cand| !P::is_better(incoming, self.values.get_unchecked(cand)))
                .map_or(0, |pos| pos + 1);
            self.monotonic_idxs.truncate(survivors);

            self.monotonic_idxs.push_back(idx);
        }
        self.nonnulls_in_window += 1;
    }

    /// Evicts candidates from the front whose index lies before `window_start`.
    fn remove_old_values(&mut self, window_start: usize) {
        while self
            .monotonic_idxs
            .front()
            .is_some_and(|&head| head < window_start)
        {
            self.monotonic_idxs.pop_front();
        }
    }
}
56
57
impl<'a, T: NativeType, P: MinMaxPolicy> RollingAggWindowNulls<'a, T> for MinMaxWindow<'a, T, P> {
58
unsafe fn new(
59
slice: &'a [T],
60
validity: &'a Bitmap,
61
start: usize,
62
end: usize,
63
params: Option<RollingFnParams>,
64
_window_size: Option<usize>,
65
) -> Self {
66
assert!(params.is_none());
67
let mut slf = Self {
68
values: slice,
69
validity: Some(validity),
70
monotonic_idxs: VecDeque::new(),
71
nonnulls_in_window: 0,
72
last_start: 0,
73
last_end: 0,
74
policy: PhantomData,
75
};
76
unsafe {
77
RollingAggWindowNulls::update(&mut slf, start, end);
78
}
79
slf
80
}
81
82
unsafe fn update(&mut self, start: usize, end: usize) -> Option<T> {
83
unsafe {
84
let v = self.validity.unwrap_unchecked();
85
self.remove_old_values(start);
86
for i in self.last_start..start.min(self.last_end) {
87
self.nonnulls_in_window -= v.get_bit_unchecked(i) as usize;
88
}
89
for i in start.max(self.last_end)..end {
90
if v.get_bit_unchecked(i) {
91
self.insert_nonnull_value(i);
92
}
93
}
94
95
self.last_start = start;
96
self.last_end = end;
97
self.monotonic_idxs
98
.front()
99
.map(|idx| *self.values.get_unchecked(*idx))
100
}
101
}
102
103
fn is_valid(&self, min_periods: usize) -> bool {
104
self.nonnulls_in_window >= min_periods
105
}
106
}
107
108
impl<'a, T: NativeType, P: MinMaxPolicy> RollingAggWindowNoNulls<'a, T> for MinMaxWindow<'a, T, P> {
109
fn new(
110
slice: &'a [T],
111
start: usize,
112
end: usize,
113
params: Option<RollingFnParams>,
114
_window_size: Option<usize>,
115
) -> Self {
116
assert!(params.is_none());
117
let mut slf = Self {
118
values: slice,
119
validity: None,
120
monotonic_idxs: VecDeque::new(),
121
nonnulls_in_window: 0,
122
last_start: 0,
123
last_end: 0,
124
policy: PhantomData,
125
};
126
unsafe {
127
RollingAggWindowNoNulls::update(&mut slf, start, end);
128
}
129
slf
130
}
131
132
unsafe fn update(&mut self, start: usize, end: usize) -> Option<T> {
133
unsafe {
134
self.remove_old_values(start);
135
for i in start.max(self.last_end)..end {
136
self.insert_nonnull_value(i);
137
}
138
139
self.last_start = start;
140
self.last_end = end;
141
self.monotonic_idxs
142
.front()
143
.map(|idx| *self.values.get_unchecked(*idx))
144
}
145
}
146
}
147
148