GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/min_max.rs
#![allow(unsafe_op_in_unsafe_fn)]
use std::borrow::Cow;
use std::marker::PhantomData;

use arrow::array::BooleanArray;
use arrow::bitmap::Bitmap;
use num_traits::Bounded;
use polars_core::with_match_physical_integer_polars_type;
#[cfg(feature = "propagate_nans")]
use polars_ops::prelude::nan_propagating_aggregate::ca_nan_agg;
use polars_utils::float::IsFloat;
use polars_utils::min_max::MinMax;

use super::*;

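/// Builds a grouped `min` reduction for the given dtype, dispatching to a
/// dtype-specific reducer. `propagate_nans` selects the NaN-propagating float
/// reducers (behind the `propagate_nans` feature); otherwise NaNs are ignored.
///
/// A hedged usage sketch (illustrative only; how callers drive the returned
/// `GroupedReduction`, and the `num_groups` variable, are assumptions):
///
/// ```ignore
/// let mut red = new_min_reduction(DataType::Float64, false)?;
/// red.resize(num_groups); // one accumulator slot per group
/// // ... feed columns via update_group / update_groups_while_evicting ...
/// let per_group_min = red.finalize()?;
/// ```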
pub fn new_min_reduction(
    dtype: DataType,
    propagate_nans: bool,
) -> PolarsResult<Box<dyn GroupedReduction>> {
    // TODO: Move the error checks up and make this function infallible
    use DataType::*;
    use VecMaskGroupedReduction as VMGR;
    Ok(match &dtype {
        Boolean => Box::new(BoolMinGroupedReduction::default()),
        #[cfg(all(feature = "dtype-f16", feature = "propagate_nans"))]
        Float16 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float16Type>>::new()))
        },
        #[cfg(feature = "propagate_nans")]
        Float32 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float32Type>>::new()))
        },
        #[cfg(feature = "propagate_nans")]
        Float64 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMin<Float64Type>>::new()))
        },
        #[cfg(feature = "dtype-f16")]
        Float16 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float16Type>>::new())),
        Float32 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float32Type>>::new())),
        Float64 => Box::new(VMGR::new(dtype, NumReducer::<Min<Float64Type>>::new())),
        Null => Box::new(NullGroupedReduction::default()),
        String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMinReducer)),
        _ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {
            with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {
                Box::new(VMGR::new(dtype, NumReducer::<Min<$T>>::new()))
            })
        },
        #[cfg(feature = "dtype-decimal")]
        Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Min<Int128Type>>::new())),
        #[cfg(feature = "dtype-categorical")]
        Categorical(cats, map) => with_match_categorical_physical_type!(cats.physical(), |$C| {
            Box::new(VMGR::new(dtype.clone(), CatMinReducer::<$C>(map.clone(), PhantomData)))
        }),
        _ => polars_bail!(InvalidOperation: "`min` operation not supported for dtype `{dtype}`"),
    })
}

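/// Builds a grouped `max` reduction for the given dtype. Mirrors
/// [`new_min_reduction`], substituting the `Max`/`NanMax` reducers.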
pub fn new_max_reduction(
    dtype: DataType,
    propagate_nans: bool,
) -> PolarsResult<Box<dyn GroupedReduction>> {
    // TODO: Move the error checks up and make this function infallible
    use DataType::*;
    use VecMaskGroupedReduction as VMGR;
    Ok(match &dtype {
        Boolean => Box::new(BoolMaxGroupedReduction::default()),
        #[cfg(all(feature = "dtype-f16", feature = "propagate_nans"))]
        Float16 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float16Type>>::new()))
        },
        #[cfg(feature = "propagate_nans")]
        Float32 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float32Type>>::new()))
        },
        #[cfg(feature = "propagate_nans")]
        Float64 if propagate_nans => {
            Box::new(VMGR::new(dtype, NumReducer::<NanMax<Float64Type>>::new()))
        },
        #[cfg(feature = "dtype-f16")]
        Float16 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float16Type>>::new())),
        Float32 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float32Type>>::new())),
        Float64 => Box::new(VMGR::new(dtype, NumReducer::<Max<Float64Type>>::new())),
        Null => Box::new(NullGroupedReduction::default()),
        String | Binary => Box::new(VecGroupedReduction::new(dtype, BinaryMaxReducer)),
        _ if dtype.is_integer() || dtype.is_temporal() || dtype.is_enum() => {
            with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| {
                Box::new(VMGR::new(dtype, NumReducer::<Max<$T>>::new()))
            })
        },
        #[cfg(feature = "dtype-decimal")]
        Decimal(_, _) => Box::new(VMGR::new(dtype, NumReducer::<Max<Int128Type>>::new())),
        #[cfg(feature = "dtype-categorical")]
        Categorical(cats, map) => with_match_categorical_physical_type!(cats.physical(), |$C| {
            Box::new(VMGR::new(dtype.clone(), CatMaxReducer::<$C>(map.clone(), PhantomData)))
        }),
        _ => polars_bail!(InvalidOperation: "`max` operation not supported for dtype `{dtype}`"),
    })
}

// These two variants ignore nans.
struct Min<T>(PhantomData<T>);
struct Max<T>(PhantomData<T>);

// These two variants propagate nans.
#[cfg(feature = "propagate_nans")]
struct NanMin<T>(PhantomData<T>);
#[cfg(feature = "propagate_nans")]
struct NanMax<T>(PhantomData<T>);

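// A `NumericReduction` is a fold: `init()` supplies the identity element,
// `combine` merges two partial states, and `reduce_ca` collapses a whole
// chunked array. For the NaN-ignoring variants the float identity is NaN
// (since `min_ignore_nan(NaN, x) == x`), while the integer identity is the
// type's max bound for `Min` and min bound for `Max`. A minimal sketch of
// the fold, in comment form only:
//
//     let mut acc = <Min<Float64Type> as NumericReduction>::init(); // NaN
//     for v in [3.0, f64::NAN, 1.0] {
//         acc = <Min<Float64Type> as NumericReduction>::combine(acc, v);
//     }
//     // acc == 1.0; the NaN was ignored.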
impl<T> NumericReduction for Min<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    type Dtype = T;

    #[inline(always)]
    fn init() -> T::Native {
        if T::Native::is_float() {
            T::Native::nan_value()
        } else {
            T::Native::max_value()
        }
    }

    #[inline(always)]
    fn combine(a: T::Native, b: T::Native) -> T::Native {
        MinMax::min_ignore_nan(a, b)
    }

    #[inline(always)]
    fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {
        ChunkAgg::min(ca)
    }
}

impl<T> NumericReduction for Max<T>
where
    T: PolarsNumericType,
    ChunkedArray<T>: ChunkAgg<T::Native>,
{
    type Dtype = T;

    #[inline(always)]
    fn init() -> T::Native {
        if T::Native::is_float() {
            T::Native::nan_value()
        } else {
            T::Native::min_value()
        }
    }

    #[inline(always)]
    fn combine(a: T::Native, b: T::Native) -> T::Native {
        MinMax::max_ignore_nan(a, b)
    }

    #[inline(always)]
    fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {
        ChunkAgg::max(ca)
    }
}

#[cfg(feature = "propagate_nans")]
165
impl<T: PolarsFloatType> NumericReduction for NanMin<T> {
166
type Dtype = T;
167
168
#[inline(always)]
169
fn init() -> T::Native {
170
T::Native::max_value()
171
}
172
173
#[inline(always)]
174
fn combine(a: T::Native, b: T::Native) -> T::Native {
175
MinMax::min_propagate_nan(a, b)
176
}
177
178
#[inline(always)]
179
fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {
180
ca_nan_agg(ca, MinMax::min_propagate_nan)
181
}
182
}
183
184
#[cfg(feature = "propagate_nans")]
185
impl<T: PolarsFloatType> NumericReduction for NanMax<T> {
186
type Dtype = T;
187
188
#[inline(always)]
189
fn init() -> T::Native {
190
T::Native::min_value()
191
}
192
193
#[inline(always)]
194
fn combine(a: T::Native, b: T::Native) -> T::Native {
195
MinMax::max_propagate_nan(a, b)
196
}
197
198
#[inline(always)]
199
fn reduce_ca(ca: &ChunkedArray<T>) -> Option<T::Native> {
200
ca_nan_agg(ca, MinMax::max_propagate_nan)
201
}
202
}
203
204
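// String min/max delegates to these binary reducers: the column is cast to
// `Binary` and values are compared bytewise. For UTF-8 this is sound because
// bytewise lexicographic order coincides with code-point order, e.g.:
//
//     assert!("abc".as_bytes() < "abd".as_bytes());
//     assert!("Z".as_bytes() < "a".as_bytes()); // 0x5A < 0x61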
#[derive(Clone)]
struct BinaryMinReducer;
#[derive(Clone)]
struct BinaryMaxReducer;

impl Reducer for BinaryMinReducer {
    type Dtype = BinaryType;
    type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.

    fn init(&self) -> Self::Value {
        None
    }

    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Owned(s.cast(&DataType::Binary).unwrap())
    }

    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        self.reduce_one(a, b.as_deref(), 0)
    }

    fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {
        match (a, b) {
            (_, None) => {},
            (l @ None, Some(r)) => *l = Some(r.to_owned()),
            (Some(l), Some(r)) => {
                if l.as_slice() > r {
                    l.clear();
                    l.extend_from_slice(r);
                }
            },
        }
    }

    fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {
        self.reduce_one(v, ca.min_binary(), 0)
    }

    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        assert!(m.is_none()); // This should only be used with VecGroupedReduction.
        let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);
        ca.into_series().cast(dtype)
    }
}

impl Reducer for BinaryMaxReducer {
    type Dtype = BinaryType;
    type Value = Option<Vec<u8>>; // TODO: evaluate SmallVec<u8>.

    #[inline(always)]
    fn init(&self) -> Self::Value {
        None
    }

    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Owned(s.cast(&DataType::Binary).unwrap())
    }

    #[inline(always)]
    fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
        self.reduce_one(a, b.as_deref(), 0)
    }

    #[inline(always)]
    fn reduce_one(&self, a: &mut Self::Value, b: Option<&[u8]>, _seq_id: u64) {
        match (a, b) {
            (_, None) => {},
            (l @ None, Some(r)) => *l = Some(r.to_owned()),
            (Some(l), Some(r)) => {
                if l.as_slice() < r {
                    l.clear();
                    l.extend_from_slice(r);
                }
            },
        }
    }

    #[inline(always)]
    fn reduce_ca(&self, v: &mut Self::Value, ca: &BinaryChunked, _seq_id: u64) {
        self.reduce_one(v, ca.max_binary(), 0)
    }

    #[inline(always)]
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series> {
        assert!(m.is_none()); // This should only be used with VecGroupedReduction.
        let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY);
        ca.into_series().cast(dtype)
    }
}

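// Grouped boolean min/max reduces to bitwise AND/OR over bitmaps: `values`
// holds the running AND (for min) or OR (for max) per group, while `mask`
// records whether the group has seen any non-null value and becomes the
// validity of the result. Nulls are folded in as the operation's identity
// (true for AND, false for OR), so they leave the accumulator unchanged.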
#[derive(Default)]
pub struct BoolMinGroupedReduction {
    values: MutableBitmap,
    mask: MutableBitmap,
    evicted_values: BitmapBuilder,
    evicted_mask: BitmapBuilder,
}

impl GroupedReduction for BoolMinGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::default())
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, true);
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        // TODO: we should really implement a sum-as-other-type operation instead
        // of doing this materialized cast.
        assert!(values.dtype() == &DataType::Boolean);
        let values = values.as_materialized_series_maintain_scalar();
        let ca: &BooleanChunked = values.as_ref().as_ref();
        if !ca.all() {
            self.values.set(group_idx as usize, false);
        }
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &DataType::Boolean);
        assert!(subset.len() == group_idxs.len());
        let values = values.as_materialized_series(); // @scalar-opt
        let ca: &BooleanChunked = values.as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                if g.should_evict() {
                    self.evicted_values.push(self.values.get_unchecked(g.idx()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.values.set_unchecked(g.idx(), ov.unwrap_or(true));
                    self.mask.set_unchecked(g.idx(), ov.is_some());
                } else {
                    self.values.and_pos_unchecked(g.idx(), ov.unwrap_or(true));
                    self.mask.or_pos_unchecked(g.idx(), ov.is_some());
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                self.values
                    .and_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));
                self.mask
                    .or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values).into_mut(),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: BitmapBuilder::new(),
            evicted_mask: BitmapBuilder::new(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));
        Ok(Series::from_array(PlSmallStr::EMPTY, arr))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

#[derive(Default)]
pub struct BoolMaxGroupedReduction {
    values: MutableBitmap,
    mask: MutableBitmap,
    evicted_values: BitmapBuilder,
    evicted_mask: BitmapBuilder,
}

impl GroupedReduction for BoolMaxGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::default())
    }

    fn reserve(&mut self, additional: usize) {
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, false);
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        // TODO: we should really implement a sum-as-other-type operation instead
        // of doing this materialized cast.
        assert!(values.dtype() == &DataType::Boolean);
        let values = values.as_materialized_series_maintain_scalar();
        let ca: &BooleanChunked = values.as_ref().as_ref();
        if ca.any() {
            self.values.set(group_idx as usize, true);
        }
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &DataType::Boolean);
        assert!(subset.len() == group_idxs.len());
        let values = values.as_materialized_series(); // @scalar-opt
        let ca: &BooleanChunked = values.as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                if g.should_evict() {
                    self.evicted_values.push(self.values.get_unchecked(g.idx()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.values.set_unchecked(g.idx(), ov.unwrap_or(false));
                    self.mask.set_unchecked(g.idx(), ov.is_some());
                } else {
                    self.values.or_pos_unchecked(g.idx(), ov.unwrap_or(false));
                    self.mask.or_pos_unchecked(g.idx(), ov.is_some());
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                self.values
                    .or_pos_unchecked(*g as usize, other.values.get_unchecked(*i as usize));
                self.mask
                    .or_pos_unchecked(*g as usize, other.mask.get_unchecked(*i as usize));
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values).into_mut(),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: BitmapBuilder::new(),
            evicted_mask: BitmapBuilder::new(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        let arr = BooleanArray::from(v.freeze()).with_validity(Some(m.freeze()));
        Ok(Series::from_array(PlSmallStr::EMPTY, arr))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

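// Categorical min/max compares the strings behind the category ids via the
// shared `CategoricalMapping`, not the physical ids themselves (ids carry no
// lexical order). `T::Native::max_value()` serves as the initial state: it
// maps to no valid category, so `cat_to_str` returns `None` and the first
// real value always wins.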
#[cfg(feature = "dtype-categorical")]
539
struct CatMinReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);
540
541
#[cfg(feature = "dtype-categorical")]
542
impl<T> Clone for CatMinReducer<T> {
543
fn clone(&self) -> Self {
544
Self(self.0.clone(), PhantomData)
545
}
546
}
547
548
#[cfg(feature = "dtype-categorical")]
549
impl<T: PolarsCategoricalType> Reducer for CatMinReducer<T> {
550
type Dtype = T::PolarsPhysical;
551
type Value = T::Native;
552
553
fn init(&self) -> Self::Value {
554
T::Native::max_value() // Ensures it's invalid, preferring the other value.
555
}
556
557
#[inline(always)]
558
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
559
s.to_physical_repr()
560
}
561
562
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
563
let Some(b_s) = self.0.cat_to_str(b.as_cat()) else {
564
return;
565
};
566
let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {
567
*a = *b;
568
return;
569
};
570
571
if b_s < a_s {
572
*a = *b;
573
}
574
}
575
576
fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {
577
if let Some(b) = b {
578
self.combine(a, &b);
579
}
580
}
581
582
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {
583
for cat in ca.iter().flatten() {
584
self.combine(v, &cat);
585
}
586
}
587
588
fn finish(
589
&self,
590
v: Vec<Self::Value>,
591
m: Option<Bitmap>,
592
dtype: &DataType,
593
) -> PolarsResult<Series> {
594
let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);
595
let cat_ids = ChunkedArray::from(cat_ids);
596
unsafe {
597
Ok(
598
CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())
599
.into_series(),
600
)
601
}
602
}
603
}
604
605
#[cfg(feature = "dtype-categorical")]
606
struct CatMaxReducer<T>(Arc<CategoricalMapping>, PhantomData<T>);
607
608
#[cfg(feature = "dtype-categorical")]
609
impl<T> Clone for CatMaxReducer<T> {
610
fn clone(&self) -> Self {
611
Self(self.0.clone(), PhantomData)
612
}
613
}
614
615
#[cfg(feature = "dtype-categorical")]
616
impl<T: PolarsCategoricalType> Reducer for CatMaxReducer<T> {
617
type Dtype = T::PolarsPhysical;
618
type Value = T::Native;
619
620
fn init(&self) -> Self::Value {
621
T::Native::max_value() // Ensures it's invalid, preferring the other value.
622
}
623
624
#[inline(always)]
625
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
626
s.to_physical_repr()
627
}
628
629
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
630
let Some(b_s) = self.0.cat_to_str(b.as_cat()) else {
631
return;
632
};
633
let Some(a_s) = self.0.cat_to_str(a.as_cat()) else {
634
*a = *b;
635
return;
636
};
637
638
if b_s > a_s {
639
*a = *b;
640
}
641
}
642
643
fn reduce_one(&self, a: &mut Self::Value, b: Option<Self::Value>, _seq_id: u64) {
644
if let Some(b) = b {
645
self.combine(a, &b);
646
}
647
}
648
649
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<T::PolarsPhysical>, _seq_id: u64) {
650
for cat in ca.iter().flatten() {
651
self.combine(v, &cat);
652
}
653
}
654
655
fn finish(
656
&self,
657
v: Vec<Self::Value>,
658
m: Option<Bitmap>,
659
dtype: &DataType,
660
) -> PolarsResult<Series> {
661
let cat_ids = PrimitiveArray::from_vec(v).with_validity(m);
662
let cat_ids = ChunkedArray::from(cat_ids);
663
unsafe {
664
Ok(
665
CategoricalChunked::<T>::from_cats_and_dtype_unchecked(cat_ids, dtype.clone())
666
.into_series(),
667
)
668
}
669
}
670
}
671
672
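// Null-dtype reductions carry no values; the only state needed is how many
// groups (and evictions) exist so `finalize` can produce an all-null Series
// of the right length.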
#[derive(Default)]
pub struct NullGroupedReduction {
    length: usize,
    num_evictions: usize,
}

impl GroupedReduction for NullGroupedReduction {
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::default())
    }

    fn reserve(&mut self, _additional: usize) {}

    fn resize(&mut self, num_groups: IdxSize) {
        self.length = num_groups as usize;
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        _group_idx: IdxSize,
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &DataType::Null);

        // no-op
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        _seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &DataType::Null);
        assert!(subset.len() == group_idxs.len());

        for g in group_idxs {
            self.num_evictions += g.should_evict() as usize;
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        _other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        assert!(subset.len() == group_idxs.len());

        // no-op
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        let out = Box::new(Self {
            length: self.num_evictions,
            num_evictions: 0,
        });
        self.num_evictions = 0;
        out
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        Ok(Series::full_null(
            PlSmallStr::EMPTY,
            self.length,
            &DataType::Null,
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}