Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/mod.rs
8420 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
mod any_all;
3
#[cfg(feature = "approx_unique")]
4
mod approx_n_unique;
5
#[cfg(feature = "bitwise")]
6
mod bitwise;
7
mod convert;
8
mod count;
9
mod first_last;
10
mod first_last_nonnull;
11
mod len;
12
mod mean;
13
mod min_max;
14
mod min_max_by;
15
mod sum;
16
mod var_std;
17
18
use std::any::Any;
19
use std::borrow::Cow;
20
use std::marker::PhantomData;
21
22
use arrow::array::{Array, PrimitiveArray, StaticArray};
23
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
24
pub use convert::into_reduction;
25
pub use min_max::{new_max_reduction, new_min_reduction};
26
use polars_core::prelude::*;
27
28
use crate::EvictIdx;
29
30
/// A reduction with groups.
///
/// Each group has its own reduction state that values can be aggregated into.
pub trait GroupedReduction: Any + Send + Sync {
    /// Returns a new empty reduction.
    fn new_empty(&self) -> Box<dyn GroupedReduction>;

    /// Reserves space in this GroupedReduction for an additional number of groups.
    fn reserve(&mut self, additional: usize);

    /// Resizes this GroupedReduction to the given number of groups.
    ///
    /// While not an actual member of the trait, the safety preconditions below
    /// refer to self.num_groups() as given by the last call of this function.
    fn resize(&mut self, num_groups: IdxSize);

    /// Updates the specified group with the given values.
    ///
    /// For order-sensitive grouped reductions, seq_id can be used to resolve
    /// order between calls/multiple reductions.
    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Updates this GroupedReduction with new values. values[subset[i]] should
    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
    /// reductions, seq_id can be used to resolve order between calls/multiple
    /// reductions.
    ///
    /// The column MUST consist of single chunk.
    ///
    /// Default implementation: reinterprets the plain indices as `EvictIdx`
    /// (whose evict bits are all unset) and delegates to
    /// `update_groups_while_evicting`, so no evictions occur.
    ///
    /// # Safety
    /// The subset and group_idxs are in-bounds.
    unsafe fn update_groups_subset(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
        seq_id: u64,
    ) -> PolarsResult<()> {
        // NOTE(review): this asserts on the number of columns, not on the
        // group indices. Presumably the intent is to guarantee that no index
        // has the top (evict) bit set when reinterpreted as EvictIdx below —
        // TODO confirm the intended operand of this assertion.
        assert!(values.len() < (1 << (IdxSize::BITS - 1)));
        // SAFETY: EvictIdx is a wrapper for IdxSize and has same alignment.
        let evict_group_idxs = EvictIdx::cast_slice(group_idxs);
        self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
    }

    /// Updates this GroupedReduction with new values. values[subset[i]] should
    /// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
    /// reductions, seq_id can be used to resolve order between calls/multiple
    /// reductions. If the group_idxs[i] has its evict bit set the current value
    /// in the group should be evicted and reset before updating.
    ///
    /// The column MUST consist of single chunk.
    ///
    /// # Safety
    /// The subset and group_idxs are in-bounds.
    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()>;

    /// Combines this GroupedReduction with another. Group other[subset[i]]
    /// should be combined into group self[group_idxs[i]].
    ///
    /// # Safety
    /// subset[i] < other.num_groups() for all i.
    /// group_idxs[i] < self.num_groups() for all i.
    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()>;

    /// Take the accumulated evicted groups.
    ///
    /// The returned reduction has one group per eviction, in eviction order.
    fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;

    /// Returns the finalized value per group as a Series.
    ///
    /// After this operation the number of groups is reset to 0.
    fn finalize(&mut self) -> PolarsResult<Series>;

    /// Returns this GroupedReduction as a dyn Any.
    fn as_any(&self) -> &dyn Any;
}
121
122
// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
// reduce code duplication.
pub trait Reducer: Send + Sync + Clone + 'static {
    /// The physical Polars type this reducer consumes.
    type Dtype: PolarsPhysicalType;
    /// The per-group accumulator state.
    type Value: Clone + Send + Sync + 'static;

    /// Returns the initial accumulator value for a fresh (empty) group.
    fn init(&self) -> Self::Value;

    /// Optionally converts the input series before reduction. The default is
    /// a free borrow; NumReducer overrides this to cast to the physical
    /// representation.
    #[inline(always)]
    fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
        Cow::Borrowed(s)
    }

    /// Merges accumulator `b` into accumulator `a`.
    fn combine(&self, a: &mut Self::Value, b: &Self::Value);

    /// Folds a single optional physical value `b` into accumulator `a`.
    /// seq_id orders calls for order-sensitive reductions.
    fn reduce_one(
        &self,
        a: &mut Self::Value,
        b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
        seq_id: u64,
    );

    /// Folds an entire chunked array into accumulator `v`.
    fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);

    /// Builds the output Series from the final per-group accumulators `v`, an
    /// optional validity mask `m` (None means all-valid) and the logical
    /// output dtype.
    fn finish(
        &self,
        v: Vec<Self::Value>,
        m: Option<Bitmap>,
        dtype: &DataType,
    ) -> PolarsResult<Series>;
}
147
148
/// A stateless numeric reduction expressed through associated functions only;
/// adapted into a `Reducer` by `NumReducer`.
pub trait NumericReduction: Send + Sync + 'static {
    /// The numeric Polars type being reduced.
    type Dtype: PolarsNumericType;
    /// The initial native accumulator value for an empty group.
    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
    /// Combines two accumulated native values into one.
    fn combine(
        a: <Self::Dtype as PolarsNumericType>::Native,
        b: <Self::Dtype as PolarsNumericType>::Native,
    ) -> <Self::Dtype as PolarsNumericType>::Native;
    /// Reduces a whole chunked array, returning None when there is nothing to
    /// reduce (NumReducer then leaves the accumulator untouched).
    fn reduce_ca(
        ca: &ChunkedArray<Self::Dtype>,
    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
}
159
160
/// Zero-sized adapter turning a static `NumericReduction` into a `Reducer`.
struct NumReducer<R: NumericReduction>(PhantomData<R>);
impl<R: NumericReduction> NumReducer<R> {
    fn new() -> Self {
        Self(PhantomData)
    }
}
// Manual impl: `#[derive(Clone)]` would add an `R: Clone` bound, which
// `NumericReduction` implementors are not required to satisfy.
impl<R: NumericReduction> Clone for NumReducer<R> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
171
172
impl<R: NumericReduction> Reducer for NumReducer<R> {
173
type Dtype = <R as NumericReduction>::Dtype;
174
type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
175
176
#[inline(always)]
177
fn init(&self) -> Self::Value {
178
<R as NumericReduction>::init()
179
}
180
181
#[inline(always)]
182
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
183
s.to_physical_repr()
184
}
185
186
#[inline(always)]
187
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
188
*a = <R as NumericReduction>::combine(*a, *b);
189
}
190
191
#[inline(always)]
192
fn reduce_one(
193
&self,
194
a: &mut Self::Value,
195
b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
196
_seq_id: u64,
197
) {
198
if let Some(b) = b {
199
*a = <R as NumericReduction>::combine(*a, b);
200
}
201
}
202
203
#[inline(always)]
204
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
205
if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
206
*v = <R as NumericReduction>::combine(*v, r);
207
}
208
}
209
210
fn finish(
211
&self,
212
v: Vec<Self::Value>,
213
m: Option<Bitmap>,
214
dtype: &DataType,
215
) -> PolarsResult<Series> {
216
let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
217
Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
218
}
219
}
220
221
/// A `GroupedReduction` storing one plain `R::Value` accumulator per group.
///
/// No validity is tracked; see `VecMaskGroupedReduction` for the masked
/// variant.
pub struct VecGroupedReduction<R: Reducer> {
    // One accumulator per group, indexed by group index.
    values: Vec<R::Value>,
    // Accumulators evicted by update_groups_while_evicting, in eviction order.
    evicted_values: Vec<R::Value>,
    // Logical dtype of the input; also passed to the reducer's finish().
    in_dtype: DataType,
    reducer: R,
}
227
228
impl<R: Reducer> VecGroupedReduction<R> {
229
pub fn new(in_dtype: DataType, reducer: R) -> Self {
230
Self {
231
values: Vec::new(),
232
evicted_values: Vec::new(),
233
in_dtype,
234
reducer,
235
}
236
}
237
}
238
239
impl<R> GroupedReduction for VecGroupedReduction<R>
240
where
241
R: Reducer,
242
{
243
fn new_empty(&self) -> Box<dyn GroupedReduction> {
244
Box::new(Self {
245
values: Vec::new(),
246
evicted_values: Vec::new(),
247
in_dtype: self.in_dtype.clone(),
248
reducer: self.reducer.clone(),
249
})
250
}
251
252
fn reserve(&mut self, additional: usize) {
253
self.values.reserve(additional);
254
}
255
256
fn resize(&mut self, num_groups: IdxSize) {
257
self.values.resize(num_groups as usize, self.reducer.init());
258
}
259
260
fn update_group(
261
&mut self,
262
values: &[&Column],
263
group_idx: IdxSize,
264
seq_id: u64,
265
) -> PolarsResult<()> {
266
assert!(values.len() == 1);
267
let values = values[0];
268
assert!(values.dtype() == &self.in_dtype);
269
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
270
let values = values.as_materialized_series(); // @scalar-opt
271
let values = self.reducer.cast_series(values);
272
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
273
self.reducer
274
.reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
275
Ok(())
276
}
277
278
unsafe fn update_groups_while_evicting(
279
&mut self,
280
values: &[&Column],
281
subset: &[IdxSize],
282
group_idxs: &[EvictIdx],
283
seq_id: u64,
284
) -> PolarsResult<()> {
285
let &[values] = values else { unreachable!() };
286
assert!(values.dtype() == &self.in_dtype);
287
assert!(subset.len() == group_idxs.len());
288
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
289
let values = values.as_materialized_series(); // @scalar-opt
290
let values = self.reducer.cast_series(values);
291
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
292
let arr = ca.downcast_as_array();
293
unsafe {
294
// SAFETY: indices are in-bounds guaranteed by trait.
295
if values.has_nulls() {
296
for (i, g) in subset.iter().zip(group_idxs) {
297
let ov = arr.get_unchecked(*i as usize);
298
let grp = self.values.get_unchecked_mut(g.idx());
299
if g.should_evict() {
300
let old = core::mem::replace(grp, self.reducer.init());
301
self.evicted_values.push(old);
302
}
303
self.reducer.reduce_one(grp, ov, seq_id);
304
}
305
} else {
306
for (i, g) in subset.iter().zip(group_idxs) {
307
let v = arr.value_unchecked(*i as usize);
308
let grp = self.values.get_unchecked_mut(g.idx());
309
if g.should_evict() {
310
let old = core::mem::replace(grp, self.reducer.init());
311
self.evicted_values.push(old);
312
}
313
self.reducer.reduce_one(grp, Some(v), seq_id);
314
}
315
}
316
}
317
Ok(())
318
}
319
320
unsafe fn combine_subset(
321
&mut self,
322
other: &dyn GroupedReduction,
323
subset: &[IdxSize],
324
group_idxs: &[IdxSize],
325
) -> PolarsResult<()> {
326
let other = other.as_any().downcast_ref::<Self>().unwrap();
327
assert!(self.in_dtype == other.in_dtype);
328
assert!(subset.len() == group_idxs.len());
329
unsafe {
330
// SAFETY: indices are in-bounds guaranteed by trait.
331
for (i, g) in subset.iter().zip(group_idxs) {
332
let v = other.values.get_unchecked(*i as usize);
333
let grp = self.values.get_unchecked_mut(*g as usize);
334
self.reducer.combine(grp, v);
335
}
336
}
337
Ok(())
338
}
339
340
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
341
Box::new(Self {
342
values: core::mem::take(&mut self.evicted_values),
343
evicted_values: Vec::new(),
344
in_dtype: self.in_dtype.clone(),
345
reducer: self.reducer.clone(),
346
})
347
}
348
349
fn finalize(&mut self) -> PolarsResult<Series> {
350
let v = core::mem::take(&mut self.values);
351
self.reducer.finish(v, None, &self.in_dtype)
352
}
353
354
fn as_any(&self) -> &dyn Any {
355
self
356
}
357
}
358
359
/// A `GroupedReduction` storing a `R::Value` accumulator plus a validity bit
/// per group; a group whose bit is never set finalizes to null.
pub struct VecMaskGroupedReduction<R: Reducer> {
    // One accumulator per group, indexed by group index.
    values: Vec<R::Value>,
    // values[i] has seen at least one non-null value iff mask[i] is set.
    mask: MutableBitmap,
    // Accumulators evicted by update_groups_while_evicting, in eviction order.
    evicted_values: Vec<R::Value>,
    // Validity bits matching evicted_values, pushed in the same order.
    evicted_mask: BitmapBuilder,
    // Logical dtype of the input; also passed to the reducer's finish().
    in_dtype: DataType,
    reducer: R,
}
367
368
impl<R: Reducer> VecMaskGroupedReduction<R> {
369
fn new(in_dtype: DataType, reducer: R) -> Self {
370
Self {
371
values: Vec::new(),
372
mask: MutableBitmap::new(),
373
evicted_values: Vec::new(),
374
evicted_mask: BitmapBuilder::new(),
375
in_dtype,
376
reducer,
377
}
378
}
379
}
380
381
impl<R> GroupedReduction for VecMaskGroupedReduction<R>
where
    R: Reducer,
{
    fn new_empty(&self) -> Box<dyn GroupedReduction> {
        Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
    }

    fn reserve(&mut self, additional: usize) {
        // values and mask are kept the same length.
        self.values.reserve(additional);
        self.mask.reserve(additional)
    }

    fn resize(&mut self, num_groups: IdxSize) {
        self.values.resize(num_groups as usize, self.reducer.init());
        // New groups start invalid (null) until a non-null value arrives.
        self.mask.resize(num_groups as usize, false);
    }

    fn update_group(
        &mut self,
        values: &[&Column],
        group_idx: IdxSize,
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        self.reducer
            .reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
        // The group becomes valid once it has seen at least one non-null value.
        if ca.len() != ca.null_count() {
            self.mask.set(group_idx as usize, true);
        }
        Ok(())
    }

    unsafe fn update_groups_while_evicting(
        &mut self,
        values: &[&Column],
        subset: &[IdxSize],
        group_idxs: &[EvictIdx],
        seq_id: u64,
    ) -> PolarsResult<()> {
        let &[values] = values else { unreachable!() };
        assert!(values.dtype() == &self.in_dtype);
        assert!(subset.len() == group_idxs.len());
        let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
        let values = values.as_materialized_series(); // @scalar-opt
        let values = self.reducer.cast_series(values);
        let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
        let arr = ca.downcast_as_array();
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let ov = arr.get_unchecked(*i as usize);
                let grp = self.values.get_unchecked_mut(g.idx());
                if g.should_evict() {
                    // Move the accumulator and its validity bit out, then
                    // reset the group to the empty (invalid) state before
                    // reducing this element into it.
                    self.evicted_values
                        .push(core::mem::replace(grp, self.reducer.init()));
                    self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
                    self.mask.set_unchecked(g.idx(), false);
                }
                if let Some(v) = ov {
                    self.reducer.reduce_one(grp, Some(v), seq_id);
                    self.mask.set_unchecked(g.idx(), true);
                }
            }
        }
        Ok(())
    }

    unsafe fn combine_subset(
        &mut self,
        other: &dyn GroupedReduction,
        subset: &[IdxSize],
        group_idxs: &[IdxSize],
    ) -> PolarsResult<()> {
        let other = other.as_any().downcast_ref::<Self>().unwrap();
        assert!(self.in_dtype == other.in_dtype);
        assert!(subset.len() == group_idxs.len());
        unsafe {
            // SAFETY: indices are in-bounds guaranteed by trait.
            for (i, g) in subset.iter().zip(group_idxs) {
                let o = other.mask.get_unchecked(*i as usize);
                // Only combine groups that are valid in `other`; an invalid
                // (all-null) group contributes nothing.
                if o {
                    let v = other.values.get_unchecked(*i as usize);
                    let grp = self.values.get_unchecked_mut(*g as usize);
                    self.reducer.combine(grp, v);
                    self.mask.set_unchecked(*g as usize, true);
                }
            }
        }
        Ok(())
    }

    fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
        // The evicted accumulators (with their validity bits) become the
        // groups of the new reduction.
        Box::new(Self {
            values: core::mem::take(&mut self.evicted_values),
            mask: core::mem::take(&mut self.evicted_mask).into_mut(),
            evicted_values: Vec::new(),
            evicted_mask: BitmapBuilder::new(),
            in_dtype: self.in_dtype.clone(),
            reducer: self.reducer.clone(),
        })
    }

    fn finalize(&mut self) -> PolarsResult<Series> {
        let v = core::mem::take(&mut self.values);
        let m = core::mem::take(&mut self.mask);
        self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
499
500
/// A `GroupedReduction` for null-dtype input: no per-group data is stored,
/// only counts, and every group finalizes to the same precomputed scalar.
#[derive(Clone)]
pub struct NullGroupedReduction {
    // Number of groups as set by the last resize(); determines output length.
    num_groups: IdxSize,
    // Number of groups evicted so far (evictions carry no data here).
    num_evictions: IdxSize,
    // The scalar every group finalizes to.
    output: Scalar,
}
506
507
impl NullGroupedReduction {
508
pub fn new(output: Scalar) -> Self {
509
Self {
510
num_groups: 0,
511
num_evictions: 0,
512
output,
513
}
514
}
515
}
516
517
impl GroupedReduction for NullGroupedReduction {
518
fn new_empty(&self) -> Box<dyn GroupedReduction> {
519
Box::new(Self::new(self.output.clone()))
520
}
521
522
fn reserve(&mut self, _additional: usize) {}
523
524
fn resize(&mut self, num_groups: IdxSize) {
525
self.num_groups = num_groups;
526
}
527
528
fn update_group(
529
&mut self,
530
values: &[&Column],
531
_group_idx: IdxSize,
532
_seq_id: u64,
533
) -> PolarsResult<()> {
534
let &[values] = values else { unreachable!() };
535
assert!(values.dtype().is_null());
536
Ok(())
537
}
538
539
unsafe fn update_groups_while_evicting(
540
&mut self,
541
values: &[&Column],
542
subset: &[IdxSize],
543
group_idxs: &[EvictIdx],
544
_seq_id: u64,
545
) -> PolarsResult<()> {
546
let &[values] = values else { unreachable!() };
547
assert!(values.dtype().is_null());
548
assert!(subset.len() == group_idxs.len());
549
for g in group_idxs {
550
self.num_evictions += g.should_evict() as IdxSize;
551
}
552
Ok(())
553
}
554
555
unsafe fn combine_subset(
556
&mut self,
557
other: &dyn GroupedReduction,
558
subset: &[IdxSize],
559
group_idxs: &[IdxSize],
560
) -> PolarsResult<()> {
561
let _other = other.as_any().downcast_ref::<Self>().unwrap();
562
assert!(subset.len() == group_idxs.len());
563
Ok(())
564
}
565
566
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
567
Box::new(Self {
568
num_groups: core::mem::replace(&mut self.num_evictions, 0),
569
num_evictions: 0,
570
output: self.output.clone(),
571
})
572
}
573
574
fn finalize(&mut self) -> PolarsResult<Series> {
575
let length = core::mem::replace(&mut self.num_groups, 0) as usize;
576
let s = self.output.clone().into_series(PlSmallStr::EMPTY);
577
Ok(s.new_from_index(0, length))
578
}
579
580
fn as_any(&self) -> &dyn Any {
581
self
582
}
583
}
584
585