Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/first_last.rs
6940 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
use std::marker::PhantomData;
3
4
use polars_core::frame::row::AnyValueBufferTrusted;
5
use polars_core::with_match_physical_numeric_polars_type;
6
7
use super::*;
8
9
/// Creates a grouped reduction that keeps the first value observed per group.
pub fn new_first_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
    new_reduction_with_policy::<First>(dtype)
}
12
13
/// Creates a grouped reduction that keeps the last value observed per group.
pub fn new_last_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
    new_reduction_with_policy::<Last>(dtype)
}
16
17
fn new_reduction_with_policy<P: Policy + 'static>(dtype: DataType) -> Box<dyn GroupedReduction> {
18
use DataType::*;
19
use VecGroupedReduction as VGR;
20
match dtype {
21
Boolean => Box::new(VecGroupedReduction::new(
22
dtype,
23
BoolFirstLastReducer::<P>(PhantomData),
24
)),
25
_ if dtype.is_primitive_numeric() || dtype.is_temporal() => {
26
with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {
27
Box::new(VGR::new(dtype, NumFirstLastReducer::<P, $T>(PhantomData)))
28
})
29
},
30
String | Binary => Box::new(VecGroupedReduction::new(
31
dtype,
32
BinaryFirstLastReducer::<P>(PhantomData),
33
)),
34
_ => Box::new(GenericFirstLastGroupedReduction::<P>::new(dtype)),
35
}
36
}
37
38
/// Strategy shared by the first/last reducers.
///
/// Sequence id 0 is reserved for "no value yet"; real observations carry
/// ids >= 1 (callers add 1 before comparing).
trait Policy: Send + Sync + 'static {
    /// Position inside a chunk of `len` values that this policy selects.
    fn index(len: usize) -> usize;
    /// Whether a value tagged `new` should replace the one tagged `old`.
    fn should_replace(new: u64, old: u64) -> bool;
}

/// Keep the earliest observed value.
struct First;
impl Policy for First {
    fn index(_len: usize) -> usize {
        0
    }

    fn should_replace(new: u64, old: u64) -> bool {
        // A real id always beats an empty slot; between two real ids the
        // smaller (earlier) one wins. Equivalent to comparing
        // `new.wrapping_sub(1) < old.wrapping_sub(1)`, which maps the
        // "no value" id 0 to u64::MAX.
        new != 0 && (old == 0 || new < old)
    }
}

/// Keep the latest observed value.
struct Last;
impl Policy for Last {
    fn index(len: usize) -> usize {
        len - 1
    }

    fn should_replace(new: u64, old: u64) -> bool {
        // Later (or equal) ids win, so repeated chunks at the same id
        // keep the most recent write.
        new >= old
    }
}

/// Keep whichever value happens to arrive first; only fills empty slots.
#[allow(dead_code)]
struct Arbitrary;
impl Policy for Arbitrary {
    fn index(_len: usize) -> usize {
        0
    }

    fn should_replace(_new: u64, old: u64) -> bool {
        old == 0
    }
}
78
79
// First/last reducer for numeric (physical) dtypes. Carries no data; the
// policy `P` and element type `T` exist only at the type level.
struct NumFirstLastReducer<P, T>(PhantomData<(P, T)>);

// Manual impl: deriving `Clone` would needlessly require `P: Clone, T: Clone`.
impl<P, T> Clone for NumFirstLastReducer<P, T> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
86
87
impl<P, T> Reducer for NumFirstLastReducer<P, T>
88
where
89
P: Policy,
90
T: PolarsNumericType,
91
{
92
type Dtype = T;
93
type Value = (Option<T::Native>, u64);
94
95
fn init(&self) -> Self::Value {
96
(None, 0)
97
}
98
99
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
100
s.to_physical_repr()
101
}
102
103
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
104
if P::should_replace(b.1, a.1) {
105
*a = *b;
106
}
107
}
108
109
fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, seq_id: u64) {
110
if P::should_replace(seq_id, a.1) {
111
*a = (b, seq_id);
112
}
113
}
114
115
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
116
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
117
let val = ca.get(P::index(ca.len()));
118
*v = (val, seq_id);
119
}
120
}
121
122
fn finish(
123
&self,
124
v: Vec<Self::Value>,
125
m: Option<Bitmap>,
126
dtype: &DataType,
127
) -> PolarsResult<Series> {
128
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
129
let ca: ChunkedArray<T> = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
130
ca.into_series().cast(dtype)
131
}
132
}
133
134
// First/last reducer for string/binary dtypes (both reduced as raw bytes).
struct BinaryFirstLastReducer<P>(PhantomData<P>);

// Manual impl: deriving `Clone` would needlessly require `P: Clone`.
impl<P> Clone for BinaryFirstLastReducer<P> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
141
142
/// Overwrites `l` with the bytes in `r`, reusing `l`'s existing allocation
/// when both sides hold a value.
fn replace_opt_bytes(l: &mut Option<Vec<u8>>, r: Option<&[u8]>) {
    match r {
        Some(bytes) => match l {
            // Reuse the buffer instead of allocating a fresh Vec.
            Some(buf) => {
                buf.clear();
                buf.extend_from_slice(bytes);
            },
            None => *l = Some(bytes.to_vec()),
        },
        None => *l = None,
    }
}
151
152
impl<P> Reducer for BinaryFirstLastReducer<P>
153
where
154
P: Policy,
155
{
156
type Dtype = BinaryType;
157
type Value = (Option<Vec<u8>>, u64);
158
159
fn init(&self) -> Self::Value {
160
(None, 0)
161
}
162
163
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
164
Cow::Owned(s.cast(&DataType::Binary).unwrap())
165
}
166
167
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
168
if P::should_replace(b.1, a.1) {
169
a.0.clone_from(&b.0);
170
a.1 = b.1;
171
}
172
}
173
174
fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, seq_id: u64) {
175
if P::should_replace(seq_id, a.1) {
176
replace_opt_bytes(&mut a.0, b);
177
a.1 = seq_id;
178
}
179
}
180
181
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
182
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
183
replace_opt_bytes(&mut v.0, ca.get(P::index(ca.len())));
184
v.1 = seq_id;
185
}
186
}
187
188
fn finish(
189
&self,
190
v: Vec<Self::Value>,
191
m: Option<Bitmap>,
192
dtype: &DataType,
193
) -> PolarsResult<Series> {
194
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
195
let ca: BinaryChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
196
ca.into_series().cast(dtype)
197
}
198
}
199
200
// First/last reducer for the Boolean dtype.
struct BoolFirstLastReducer<P>(PhantomData<P>);

// Manual impl: deriving `Clone` would needlessly require `P: Clone`.
impl<P> Clone for BoolFirstLastReducer<P> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
207
208
impl<P> Reducer for BoolFirstLastReducer<P>
209
where
210
P: Policy,
211
{
212
type Dtype = BooleanType;
213
type Value = (Option<bool>, u64);
214
215
fn init(&self) -> Self::Value {
216
(None, 0)
217
}
218
219
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
220
if P::should_replace(b.1, a.1) {
221
*a = *b;
222
}
223
}
224
225
fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, seq_id: u64) {
226
if P::should_replace(seq_id, a.1) {
227
a.0 = b;
228
a.1 = seq_id;
229
}
230
}
231
232
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
233
if !ca.is_empty() && P::should_replace(seq_id, v.1) {
234
v.0 = ca.get(P::index(ca.len()));
235
v.1 = seq_id;
236
}
237
}
238
239
fn finish(
240
&self,
241
v: Vec<Self::Value>,
242
m: Option<Bitmap>,
243
_dtype: &DataType,
244
) -> PolarsResult<Series> {
245
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
246
let ca: BooleanChunked = v.into_iter().map(|(x, _s)| x).collect_ca(PlSmallStr::EMPTY);
247
Ok(ca.into_series())
248
}
249
}
250
251
/// Fallback first/last reduction for dtypes without a specialized reducer.
///
/// Values are stored as owned `AnyValue`s; `seqs[i] == 0` marks group `i` as
/// still empty. Evicted groups are parked in the `evicted_*` vectors until
/// `take_evictions` collects them.
pub struct GenericFirstLastGroupedReduction<P> {
    in_dtype: DataType,
    values: Vec<AnyValue<'static>>,
    seqs: Vec<u64>,
    evicted_values: Vec<AnyValue<'static>>,
    evicted_seqs: Vec<u64>,
    // `fn() -> P` marker: the policy is used only at the type level
    // (fn pointers are Send + Sync regardless of `P`).
    policy: PhantomData<fn() -> P>,
}

impl<P> GenericFirstLastGroupedReduction<P> {
    /// Creates an empty reduction for input values of type `in_dtype`.
    fn new(in_dtype: DataType) -> Self {
        Self {
            in_dtype,
            values: Vec::new(),
            seqs: Vec::new(),
            evicted_values: Vec::new(),
            evicted_seqs: Vec::new(),
            policy: PhantomData,
        }
    }
}
272
273
impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<P> {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        // Fresh reduction of the same dtype with no accumulated state.
        Box::new(Self::new(self.in_dtype.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.seqs.reserve(additional);
    }

    fn resize(&mut self, num_groups: IdxSize) {
        // New groups start empty: Null value, sequence id 0 ('no value').
        self.values.resize(num_groups as usize, AnyValue::Null);
        self.seqs.resize(num_groups as usize, 0);
    }

    fn update_group(
        &mut self,
        values: &Column,
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        if !values.is_empty() {
            let seq_id = seq_id + 1; // We use 0 for 'no value'.
            if P::should_replace(seq_id, self.seqs[group_idx as usize]) {
                // Take the first/last element of this column per the policy.
                self.values[group_idx as usize] = values.get(P::index(values.len()))?.into_static();
                self.seqs[group_idx as usize] = seq_id;
            }
        }
        Ok(())
    }

    // NOTE(review): relies on the GroupedReduction contract that every index
    // in `subset` is in bounds for `values` and every `group_idxs` entry is in
    // bounds for `self.values`/`self.seqs` — `get_unchecked` is UB otherwise.
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &Column,
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let seq_id = seq_id + 1; // We use 0 for 'no value'.
        for (i, g) in subset.iter().zip(group_idxs) {
            let grp_val = self.values.get_unchecked_mut(g.idx());
            let grp_seq = self.seqs.get_unchecked_mut(g.idx());
            if g.should_evict() {
                // Move the group's current state into the eviction buffers and
                // reset the slot to empty (Null, seq 0) before updating it.
                self.evicted_values
                    .push(core::mem::replace(grp_val, AnyValue::Null));
                self.evicted_seqs.push(core::mem::replace(grp_seq, 0));
            }
            if P::should_replace(seq_id, *grp_seq) {
                *grp_val = values.get_unchecked(*i as usize).into_static();
                *grp_seq = seq_id;
            }
        }
        Ok(())
    }

    // NOTE(review): assumes `other` is the same concrete type (downcast
    // unwraps) and that `subset`/`group_idxs` indices are in bounds per the
    // trait contract.
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        for (i, g) in group_idxs.iter().enumerate() {
            let si = *subset.get_unchecked(i) as usize;
            // Merge by sequence id: other's entry wins per the policy.
            if P::should_replace(
                *other.seqs.get_unchecked(si),
                *self.seqs.get_unchecked(*g as usize),
            ) {
                *self.values.get_unchecked_mut(*g as usize) =
                    other.values.get_unchecked(si).clone();
                *self.seqs.get_unchecked_mut(*g as usize) = *other.seqs.get_unchecked(si);
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        // Hand the evicted groups off as a standalone reduction; this
        // reduction's eviction buffers are left empty.
        Box::new(Self {
            in_dtype: self.in_dtype.clone(),
            values: core::mem::take(&mut self.evicted_values),
            seqs: core::mem::take(&mut self.evicted_seqs),
            evicted_values: Vec::new(),
            evicted_seqs: Vec::new(),
            policy: PhantomData,
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        // Sequence ids are bookkeeping only; drop them and emit the values.
        self.seqs.clear();
        unsafe {
            // SAFETY-NOTE(review): `add_unchecked_owned_physical` presumably
            // requires each AnyValue to match `in_dtype`'s physical type; the
            // values were produced from columns of that dtype — confirm
            // against AnyValueBufferTrusted's contract.
            let mut buf = AnyValueBufferTrusted::new(&self.in_dtype, self.values.len());
            for v in core::mem::take(&mut self.values) {
                buf.add_unchecked_owned_physical(&v);
            }
            Ok(buf.into_series())
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
375
376