Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/first_last.rs
8420 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
use std::fmt::Debug;
3
use std::marker::PhantomData;
4
5
use polars_core::frame::row::AnyValueBufferTrusted;
6
use polars_core::with_match_physical_numeric_polars_type;
7
8
use super::*;
9
10
pub fn new_first_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
11
new_reduction_with_policy(dtype, First)
12
}
13
14
pub fn new_last_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
15
new_reduction_with_policy(dtype, Last)
16
}
17
18
pub fn new_item_reduction(dtype: DataType, allow_empty: bool) -> Box<dyn GroupedReduction> {
19
new_reduction_with_policy(dtype, Item { allow_empty })
20
}
21
22
fn new_reduction_with_policy<P: Policy + 'static>(
23
dtype: DataType,
24
policy: P,
25
) -> Box<dyn GroupedReduction> {
26
use DataType::*;
27
use VecGroupedReduction as VGR;
28
match dtype {
29
Boolean => Box::new(VecGroupedReduction::new(
30
dtype,
31
BoolFirstLastReducer(policy),
32
)),
33
_ if dtype.is_primitive_numeric()
34
|| dtype.is_temporal()
35
|| dtype.is_decimal()
36
|| dtype.is_categorical() =>
37
{
38
with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| {
39
Box::new(VGR::new(dtype, NumFirstLastReducer::<_, $T>(policy, PhantomData)))
40
})
41
},
42
String | Binary => Box::new(VecGroupedReduction::new(
43
dtype,
44
BinaryFirstLastReducer(policy),
45
)),
46
_ => Box::new(GenericFirstLastGroupedReduction::new(dtype, policy)),
47
}
48
}
49
50
trait Policy: Copy + Send + Sync + 'static {
51
type Count: Default + Clone + Copy + Send + Sync + 'static;
52
53
fn add_count(_a: &mut Self::Count, _b: usize) {}
54
fn combine_count(_a: &mut Self::Count, _b: &Self::Count) {}
55
fn check_count(_count: Self::Count, _allow_empty: bool) -> PolarsResult<()> {
56
Ok(())
57
}
58
59
fn index(self, len: usize) -> usize;
60
fn should_replace(self, new: u64, old: u64) -> bool;
61
62
#[inline(always)]
63
fn item_policy(self) -> Option<bool> {
64
None
65
}
66
}
67
68
#[derive(Clone, Copy)]
69
pub struct First;
70
impl Policy for First {
71
type Count = ();
72
73
fn index(self, _len: usize) -> usize {
74
0
75
}
76
77
fn should_replace(self, new: u64, old: u64) -> bool {
78
// Subtracting 1 with wrapping leaves all order unchanged, except it
79
// makes 0 (no value) the largest possible.
80
new.wrapping_sub(1) < old.wrapping_sub(1)
81
}
82
}
83
84
#[derive(Clone, Copy)]
85
pub struct Last;
86
impl Policy for Last {
87
type Count = ();
88
89
fn index(self, len: usize) -> usize {
90
len - 1
91
}
92
93
fn should_replace(self, new: u64, old: u64) -> bool {
94
new >= old
95
}
96
}
97
98
#[derive(Clone, Copy)]
99
struct Item {
100
allow_empty: bool,
101
}
102
impl Policy for Item {
103
type Count = u8;
104
105
fn add_count(a: &mut Self::Count, b: usize) {
106
*a = a.saturating_add(b.min(255) as u8);
107
}
108
109
fn combine_count(a: &mut Self::Count, b: &Self::Count) {
110
*a = a.saturating_add(*b);
111
}
112
113
fn index(self, _len: usize) -> usize {
114
0
115
}
116
117
fn should_replace(self, _new: u64, old: u64) -> bool {
118
old == 0
119
}
120
121
fn item_policy(self) -> Option<bool> {
122
Some(self.allow_empty)
123
}
124
125
fn check_count(count: Self::Count, allow_empty: bool) -> PolarsResult<()> {
126
polars_ensure!(
127
(allow_empty && count == 0) || count == 1,
128
item_agg_count_not_one = count,
129
allow_empty = allow_empty
130
);
131
Ok(())
132
}
133
}
134
135
/// Reducer for numeric physical types: holds the policy `P`; `T` is carried
/// only at the type level through `PhantomData`.
struct NumFirstLastReducer<P, T>(P, PhantomData<T>);
136
137
/// Per-group state shared by the reducers in this file.
#[derive(Clone, Debug, Default)]
struct Value<T, C> {
    /// The value currently kept for the group, if any.
    value: Option<T>,
    /// Sequence id of `value`; defaults to 0, which the policies treat as
    /// "no value".
    seq: u64,
    /// Policy-specific counter.
    count: C,
}
143
144
impl<P: Policy, T> Clone for NumFirstLastReducer<P, T> {
145
fn clone(&self) -> Self {
146
Self(self.0, PhantomData)
147
}
148
}
149
150
impl<P, T> Reducer for NumFirstLastReducer<P, T>
151
where
152
P: Policy,
153
T: PolarsNumericType,
154
{
155
type Dtype = T;
156
type Value = Value<T::Native, P::Count>;
157
158
fn init(&self) -> Self::Value {
159
Value::default()
160
}
161
162
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
163
s.to_physical_repr()
164
}
165
166
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
167
if self.0.should_replace(b.seq, a.seq) {
168
a.value = b.value;
169
a.seq = b.seq;
170
}
171
P::combine_count(&mut a.count, &b.count);
172
}
173
174
fn reduce_one(&self, a: &mut Self::Value, b: Option<T::Native>, seq_id: u64) {
175
if self.0.should_replace(seq_id, a.seq) {
176
a.value = b;
177
a.seq = seq_id;
178
}
179
P::add_count(&mut a.count, b.is_some() as usize);
180
}
181
182
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
183
if !ca.is_empty() && self.0.should_replace(seq_id, v.seq) {
184
let val = ca.get(self.0.index(ca.len()));
185
v.value = val;
186
v.seq = seq_id;
187
}
188
P::add_count(&mut v.count, ca.len());
189
}
190
191
fn finish(
192
&self,
193
v: Vec<Self::Value>,
194
m: Option<Bitmap>,
195
dtype: &DataType,
196
) -> PolarsResult<Series> {
197
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
198
if let Some(allow_empty) = self.0.item_policy() {
199
check_item_count_is_one::<_, P>(&v, allow_empty)?;
200
}
201
let ca: ChunkedArray<T> = v
202
.into_iter()
203
.map(|red_val| red_val.value)
204
.collect_ca(PlSmallStr::EMPTY);
205
let s = ca.into_series();
206
unsafe { s.from_physical_unchecked(dtype) }
207
}
208
}
209
210
struct BinaryFirstLastReducer<P>(P);
211
212
impl<P: Policy> Clone for BinaryFirstLastReducer<P> {
213
fn clone(&self) -> Self {
214
Self(self.0)
215
}
216
}
217
218
/// Overwrites `l` with a copy of `r`.
///
/// When both sides hold data, the existing buffer in `l` is cleared and
/// refilled so its allocation can be reused. Otherwise `l` becomes a freshly
/// allocated copy of `r`, or `None` when `r` is `None`.
pub fn replace_opt_bytes(l: &mut Option<Vec<u8>>, r: Option<&[u8]>) {
    match (l, r) {
        (Some(dst), Some(src)) => {
            dst.clear();
            dst.extend_from_slice(src);
        },
        (dst, src) => *dst = src.map(|bytes| bytes.to_vec()),
    }
}
227
228
impl<P> Reducer for BinaryFirstLastReducer<P>
229
where
230
P: Policy,
231
{
232
type Dtype = BinaryType;
233
type Value = Value<Vec<u8>, P::Count>;
234
235
fn init(&self) -> Self::Value {
236
Value::default()
237
}
238
239
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
240
Cow::Owned(s.cast(&DataType::Binary).unwrap())
241
}
242
243
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
244
if self.0.should_replace(b.seq, a.seq) {
245
a.value.clone_from(&b.value);
246
a.seq = b.seq;
247
}
248
P::combine_count(&mut a.count, &b.count);
249
}
250
251
fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, seq_id: u64) {
252
if self.0.should_replace(seq_id, a.seq) {
253
replace_opt_bytes(&mut a.value, b);
254
a.seq = seq_id;
255
}
256
P::add_count(&mut a.count, b.is_some() as usize);
257
}
258
259
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
260
if !ca.is_empty() && self.0.should_replace(seq_id, v.seq) {
261
replace_opt_bytes(&mut v.value, ca.get(self.0.index(ca.len())));
262
v.seq = seq_id;
263
}
264
P::add_count(&mut v.count, ca.len());
265
}
266
267
fn finish(
268
&self,
269
v: Vec<Self::Value>,
270
m: Option<Bitmap>,
271
dtype: &DataType,
272
) -> PolarsResult<Series> {
273
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
274
if let Some(allow_empty) = self.0.item_policy() {
275
check_item_count_is_one::<_, P>(&v, allow_empty)?;
276
}
277
let ca: BinaryChunked = v
278
.into_iter()
279
.map(|Value { value, .. }| value)
280
.collect_ca(PlSmallStr::EMPTY);
281
ca.into_series().cast(dtype)
282
}
283
}
284
285
#[derive(Clone)]
286
struct BoolFirstLastReducer<P: Policy>(P);
287
288
impl<P> Reducer for BoolFirstLastReducer<P>
289
where
290
P: Policy,
291
{
292
type Dtype = BooleanType;
293
type Value = Value<bool, P::Count>;
294
295
fn init(&self) -> Self::Value {
296
Value::default()
297
}
298
299
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
300
if self.0.should_replace(b.seq, a.seq) {
301
a.value = b.value;
302
a.seq = b.seq;
303
}
304
P::combine_count(&mut a.count, &b.count);
305
}
306
307
fn reduce_one(&self, a: &mut Self::Value, b: Option<bool>, seq_id: u64) {
308
if self.0.should_replace(seq_id, a.seq) {
309
a.value = b;
310
a.seq = seq_id;
311
}
312
P::add_count(&mut a.count, b.is_some() as usize);
313
}
314
315
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64) {
316
if !ca.is_empty() && self.0.should_replace(seq_id, v.seq) {
317
v.value = ca.get(self.0.index(ca.len()));
318
v.seq = seq_id;
319
}
320
P::add_count(&mut v.count, ca.len());
321
}
322
323
fn finish(
324
&self,
325
v: Vec<Self::Value>,
326
m: Option<Bitmap>,
327
_dtype: &DataType,
328
) -> PolarsResult<Series> {
329
assert!(m.is_none()); // This should only be used with VecGroupedReduction.
330
if let Some(allow_empty) = self.0.item_policy() {
331
check_item_count_is_one::<_, P>(&v, allow_empty)?;
332
}
333
let ca: BooleanChunked = v
334
.into_iter()
335
.map(|Value { value, .. }| value)
336
.collect_ca(PlSmallStr::EMPTY);
337
Ok(ca.into_series())
338
}
339
}
340
341
struct GenericFirstLastGroupedReduction<P: Policy> {
342
in_dtype: DataType,
343
policy: P,
344
values: Vec<AnyValue<'static>>,
345
seqs: Vec<u64>,
346
counts: Vec<P::Count>,
347
evicted_values: Vec<AnyValue<'static>>,
348
evicted_seqs: Vec<u64>,
349
evicted_counts: Vec<P::Count>,
350
}
351
352
impl<P: Policy> GenericFirstLastGroupedReduction<P> {
353
fn new(in_dtype: DataType, policy: P) -> Self {
354
Self {
355
in_dtype,
356
policy,
357
values: Vec::new(),
358
seqs: Vec::new(),
359
counts: Vec::new(),
360
evicted_values: Vec::new(),
361
evicted_seqs: Vec::new(),
362
evicted_counts: Vec::new(),
363
}
364
}
365
}
366
367
impl<P: Policy + 'static> GroupedReduction for GenericFirstLastGroupedReduction<P> {
368
fn new_empty(&self) -> Box<dyn GroupedReduction> {
369
Box::new(Self::new(self.in_dtype.clone(), self.policy))
370
}
371
372
fn reserve(&mut self, additional: usize) {
373
self.values.reserve(additional);
374
self.seqs.reserve(additional);
375
self.counts.reserve(additional);
376
}
377
378
fn resize(&mut self, num_groups: IdxSize) {
379
self.values.resize(num_groups as usize, AnyValue::Null);
380
self.seqs.resize(num_groups as usize, 0);
381
self.counts.resize(num_groups as usize, P::Count::default());
382
}
383
384
fn update_group(
385
&mut self,
386
values: &[&Column],
387
group_idx: IdxSize,
388
seq_id: u64,
389
) -> PolarsResult<()> {
390
let &[values] = values else { unreachable!() };
391
assert!(values.dtype() == &self.in_dtype);
392
if !values.is_empty() {
393
let seq_id = seq_id + 1; // We use 0 for 'no value'.
394
if self
395
.policy
396
.should_replace(seq_id, self.seqs[group_idx as usize])
397
{
398
self.values[group_idx as usize] =
399
values.get(self.policy.index(values.len()))?.into_static();
400
self.seqs[group_idx as usize] = seq_id;
401
}
402
P::add_count(&mut self.counts[group_idx as usize], values.len());
403
}
404
Ok(())
405
}
406
407
unsafe fn update_groups_while_evicting(
408
&mut self,
409
values: &[&Column],
410
subset: &[IdxSize],
411
group_idxs: &[EvictIdx],
412
seq_id: u64,
413
) -> PolarsResult<()> {
414
let &[values] = values else { unreachable!() };
415
assert!(values.dtype() == &self.in_dtype);
416
assert!(subset.len() == group_idxs.len());
417
let seq_id = seq_id + 1; // We use 0 for 'no value'.
418
for (i, g) in subset.iter().zip(group_idxs) {
419
let grp_val = self.values.get_unchecked_mut(g.idx());
420
let grp_seq = self.seqs.get_unchecked_mut(g.idx());
421
let grp_count = self.counts.get_unchecked_mut(g.idx());
422
if g.should_evict() {
423
self.evicted_values
424
.push(core::mem::replace(grp_val, AnyValue::Null));
425
self.evicted_seqs.push(core::mem::replace(grp_seq, 0));
426
self.evicted_counts.push(core::mem::take(grp_count));
427
}
428
if self.policy.should_replace(seq_id, *grp_seq) {
429
*grp_val = values.get_unchecked(*i as usize).into_static();
430
*grp_seq = seq_id;
431
}
432
P::add_count(self.counts.get_unchecked_mut(g.idx()), 1);
433
}
434
Ok(())
435
}
436
437
unsafe fn combine_subset(
438
&mut self,
439
other: &dyn GroupedReduction,
440
subset: &[IdxSize],
441
group_idxs: &[IdxSize],
442
) -> PolarsResult<()> {
443
let other = other.as_any().downcast_ref::<Self>().unwrap();
444
assert!(self.in_dtype == other.in_dtype);
445
assert!(subset.len() == group_idxs.len());
446
for (i, g) in group_idxs.iter().enumerate() {
447
let si = *subset.get_unchecked(i) as usize;
448
if self.policy.should_replace(
449
*other.seqs.get_unchecked(si),
450
*self.seqs.get_unchecked(*g as usize),
451
) {
452
*self.values.get_unchecked_mut(*g as usize) =
453
other.values.get_unchecked(si).clone();
454
*self.seqs.get_unchecked_mut(*g as usize) = *other.seqs.get_unchecked(si);
455
}
456
P::combine_count(
457
self.counts.get_unchecked_mut(*g as usize),
458
other.counts.get_unchecked(si),
459
);
460
}
461
Ok(())
462
}
463
464
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
465
Box::new(Self {
466
in_dtype: self.in_dtype.clone(),
467
policy: self.policy,
468
values: core::mem::take(&mut self.evicted_values),
469
seqs: core::mem::take(&mut self.evicted_seqs),
470
counts: core::mem::take(&mut self.evicted_counts),
471
evicted_values: Vec::new(),
472
evicted_seqs: Vec::new(),
473
evicted_counts: Vec::new(),
474
})
475
}
476
477
fn finalize(&mut self) -> PolarsResult<Series> {
478
self.seqs.clear();
479
if let Some(allow_empty) = self.policy.item_policy() {
480
for count in self.counts.iter() {
481
P::check_count(*count, allow_empty)?;
482
}
483
}
484
let phys_type = self.in_dtype.to_physical();
485
let mut buf = AnyValueBufferTrusted::new(&phys_type, self.values.len());
486
for v in core::mem::take(&mut self.values) {
487
// SAFETY: v is cast to physical.
488
unsafe { buf.add_unchecked_owned_physical(&v.to_physical()) };
489
}
490
// SAFETY: dtype is valid for series.
491
unsafe { buf.into_series().from_physical_unchecked(&self.in_dtype) }
492
}
493
494
fn as_any(&self) -> &dyn Any {
495
self
496
}
497
}
498
499
fn check_item_count_is_one<T, P: Policy>(
500
values: &[Value<T, P::Count>],
501
allow_empty: bool,
502
) -> PolarsResult<()> {
503
for v in values {
504
P::check_count(v.count, allow_empty)?;
505
}
506
Ok(())
507
}
508
509