Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/reduce/mod.rs
6940 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
mod any_all;
3
#[cfg(feature = "bitwise")]
4
mod bitwise;
5
mod convert;
6
mod count;
7
mod first_last;
8
mod len;
9
mod mean;
10
mod min_max;
11
mod sum;
12
mod var_std;
13
14
use std::any::Any;
15
use std::borrow::Cow;
16
use std::marker::PhantomData;
17
18
use arrow::array::{Array, PrimitiveArray, StaticArray};
19
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};
20
pub use convert::into_reduction;
21
pub use min_max::{new_max_reduction, new_min_reduction};
22
use polars_core::prelude::*;
23
24
use crate::EvictIdx;
25
26
/// A reduction with groups.
27
///
28
/// Each group has its own reduction state that values can be aggregated into.
29
pub trait GroupedReduction: Any + Send + Sync {
30
/// Returns a new empty reduction.
31
fn new_empty(&self) -> Box<dyn GroupedReduction>;
32
33
/// Reserves space in this GroupedReduction for an additional number of groups.
34
fn reserve(&mut self, additional: usize);
35
36
/// Resizes this GroupedReduction to the given number of groups.
37
///
38
/// While not an actual member of the trait, the safety preconditions below
39
/// refer to self.num_groups() as given by the last call of this function.
40
fn resize(&mut self, num_groups: IdxSize);
41
42
/// Updates the specified group with the given values.
43
///
44
/// For order-sensitive grouped reductions, seq_id can be used to resolve
45
/// order between calls/multiple reductions.
46
fn update_group(
47
&mut self,
48
values: &Column,
49
group_idx: IdxSize,
50
seq_id: u64,
51
) -> PolarsResult<()>;
52
53
/// Updates this GroupedReduction with new values. values[subset[i]] should
54
/// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
55
/// reductions, seq_id can be used to resolve order between calls/multiple
56
/// reductions.
57
///
58
/// # Safety
59
/// The subset and group_idxs are in-bounds.
60
unsafe fn update_groups_subset(
61
&mut self,
62
values: &Column,
63
subset: &[IdxSize],
64
group_idxs: &[IdxSize],
65
seq_id: u64,
66
) -> PolarsResult<()> {
67
assert!(values.len() < (1 << (IdxSize::BITS - 1)));
68
let evict_group_idxs = core::mem::transmute::<&[IdxSize], &[EvictIdx]>(group_idxs);
69
self.update_groups_while_evicting(values, subset, evict_group_idxs, seq_id)
70
}
71
72
/// Updates this GroupedReduction with new values. values[subset[i]] should
73
/// be added to reduction self[group_idxs[i]]. For order-sensitive grouped
74
/// reductions, seq_id can be used to resolve order between calls/multiple
75
/// reductions. If the group_idxs[i] has its evict bit set the current value
76
/// in the group should be evicted and reset before updating.
77
///
78
/// # Safety
79
/// The subset and group_idxs are in-bounds.
80
unsafe fn update_groups_while_evicting(
81
&mut self,
82
values: &Column,
83
subset: &[IdxSize],
84
group_idxs: &[EvictIdx],
85
seq_id: u64,
86
) -> PolarsResult<()>;
87
88
/// Combines this GroupedReduction with another. Group other[subset[i]]
89
/// should be combined into group self[group_idxs[i]].
90
///
91
/// # Safety
92
/// subset[i] < other.num_groups() for all i.
93
/// group_idxs[i] < self.num_groups() for all i.
94
unsafe fn combine_subset(
95
&mut self,
96
other: &dyn GroupedReduction,
97
subset: &[IdxSize],
98
group_idxs: &[IdxSize],
99
) -> PolarsResult<()>;
100
101
/// Take the accumulated evicted groups.
102
fn take_evictions(&mut self) -> Box<dyn GroupedReduction>;
103
104
/// Returns the finalized value per group as a Series.
105
///
106
/// After this operation the number of groups is reset to 0.
107
fn finalize(&mut self) -> PolarsResult<Series>;
108
109
/// Returns this GroupedReduction as a dyn Any.
110
fn as_any(&self) -> &dyn Any;
111
}
112
113
// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
114
// reduce code duplication.
115
pub trait Reducer: Send + Sync + Clone + 'static {
116
type Dtype: PolarsPhysicalType;
117
type Value: Clone + Send + Sync + 'static;
118
fn init(&self) -> Self::Value;
119
#[inline(always)]
120
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
121
Cow::Borrowed(s)
122
}
123
fn combine(&self, a: &mut Self::Value, b: &Self::Value);
124
fn reduce_one(
125
&self,
126
a: &mut Self::Value,
127
b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
128
seq_id: u64,
129
);
130
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, seq_id: u64);
131
fn finish(
132
&self,
133
v: Vec<Self::Value>,
134
m: Option<Bitmap>,
135
dtype: &DataType,
136
) -> PolarsResult<Series>;
137
}
138
139
pub trait NumericReduction: Send + Sync + 'static {
140
type Dtype: PolarsNumericType;
141
fn init() -> <Self::Dtype as PolarsNumericType>::Native;
142
fn combine(
143
a: <Self::Dtype as PolarsNumericType>::Native,
144
b: <Self::Dtype as PolarsNumericType>::Native,
145
) -> <Self::Dtype as PolarsNumericType>::Native;
146
fn reduce_ca(
147
ca: &ChunkedArray<Self::Dtype>,
148
) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
149
}
150
151
struct NumReducer<R: NumericReduction>(PhantomData<R>);
152
impl<R: NumericReduction> NumReducer<R> {
153
fn new() -> Self {
154
Self(PhantomData)
155
}
156
}
157
impl<R: NumericReduction> Clone for NumReducer<R> {
158
fn clone(&self) -> Self {
159
Self(PhantomData)
160
}
161
}
162
163
impl<R: NumericReduction> Reducer for NumReducer<R> {
164
type Dtype = <R as NumericReduction>::Dtype;
165
type Value = <<R as NumericReduction>::Dtype as PolarsNumericType>::Native;
166
167
#[inline(always)]
168
fn init(&self) -> Self::Value {
169
<R as NumericReduction>::init()
170
}
171
172
#[inline(always)]
173
fn cast_series<'a>(&self, s: &'a Series) -> Cow<'a, Series> {
174
s.to_physical_repr()
175
}
176
177
#[inline(always)]
178
fn combine(&self, a: &mut Self::Value, b: &Self::Value) {
179
*a = <R as NumericReduction>::combine(*a, *b);
180
}
181
182
#[inline(always)]
183
fn reduce_one(
184
&self,
185
a: &mut Self::Value,
186
b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>,
187
_seq_id: u64,
188
) {
189
if let Some(b) = b {
190
*a = <R as NumericReduction>::combine(*a, b);
191
}
192
}
193
194
#[inline(always)]
195
fn reduce_ca(&self, v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>, _seq_id: u64) {
196
if let Some(r) = <R as NumericReduction>::reduce_ca(ca) {
197
*v = <R as NumericReduction>::combine(*v, r);
198
}
199
}
200
201
fn finish(
202
&self,
203
v: Vec<Self::Value>,
204
m: Option<Bitmap>,
205
dtype: &DataType,
206
) -> PolarsResult<Series> {
207
let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
208
Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
209
}
210
}
211
212
pub struct VecGroupedReduction<R: Reducer> {
213
values: Vec<R::Value>,
214
evicted_values: Vec<R::Value>,
215
in_dtype: DataType,
216
reducer: R,
217
}
218
219
impl<R: Reducer> VecGroupedReduction<R> {
220
fn new(in_dtype: DataType, reducer: R) -> Self {
221
Self {
222
values: Vec::new(),
223
evicted_values: Vec::new(),
224
in_dtype,
225
reducer,
226
}
227
}
228
}
229
230
impl<R> GroupedReduction for VecGroupedReduction<R>
231
where
232
R: Reducer,
233
{
234
fn new_empty(&self) -> Box<dyn GroupedReduction> {
235
Box::new(Self {
236
values: Vec::new(),
237
evicted_values: Vec::new(),
238
in_dtype: self.in_dtype.clone(),
239
reducer: self.reducer.clone(),
240
})
241
}
242
243
fn reserve(&mut self, additional: usize) {
244
self.values.reserve(additional);
245
}
246
247
fn resize(&mut self, num_groups: IdxSize) {
248
self.values.resize(num_groups as usize, self.reducer.init());
249
}
250
251
fn update_group(
252
&mut self,
253
values: &Column,
254
group_idx: IdxSize,
255
seq_id: u64,
256
) -> PolarsResult<()> {
257
assert!(values.dtype() == &self.in_dtype);
258
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
259
let values = values.as_materialized_series(); // @scalar-opt
260
let values = self.reducer.cast_series(values);
261
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
262
self.reducer
263
.reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
264
Ok(())
265
}
266
267
unsafe fn update_groups_while_evicting(
268
&mut self,
269
values: &Column,
270
subset: &[IdxSize],
271
group_idxs: &[EvictIdx],
272
seq_id: u64,
273
) -> PolarsResult<()> {
274
assert!(values.dtype() == &self.in_dtype);
275
assert!(subset.len() == group_idxs.len());
276
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
277
let values = values.as_materialized_series(); // @scalar-opt
278
let values = self.reducer.cast_series(values);
279
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
280
let arr = ca.downcast_as_array();
281
unsafe {
282
// SAFETY: indices are in-bounds guaranteed by trait.
283
if values.has_nulls() {
284
for (i, g) in subset.iter().zip(group_idxs) {
285
let ov = arr.get_unchecked(*i as usize);
286
let grp = self.values.get_unchecked_mut(g.idx());
287
if g.should_evict() {
288
let old = core::mem::replace(grp, self.reducer.init());
289
self.evicted_values.push(old);
290
}
291
self.reducer.reduce_one(grp, ov, seq_id);
292
}
293
} else {
294
for (i, g) in subset.iter().zip(group_idxs) {
295
let v = arr.value_unchecked(*i as usize);
296
let grp = self.values.get_unchecked_mut(g.idx());
297
if g.should_evict() {
298
let old = core::mem::replace(grp, self.reducer.init());
299
self.evicted_values.push(old);
300
}
301
self.reducer.reduce_one(grp, Some(v), seq_id);
302
}
303
}
304
}
305
Ok(())
306
}
307
308
unsafe fn combine_subset(
309
&mut self,
310
other: &dyn GroupedReduction,
311
subset: &[IdxSize],
312
group_idxs: &[IdxSize],
313
) -> PolarsResult<()> {
314
let other = other.as_any().downcast_ref::<Self>().unwrap();
315
assert!(self.in_dtype == other.in_dtype);
316
assert!(subset.len() == group_idxs.len());
317
unsafe {
318
// SAFETY: indices are in-bounds guaranteed by trait.
319
for (i, g) in subset.iter().zip(group_idxs) {
320
let v = other.values.get_unchecked(*i as usize);
321
let grp = self.values.get_unchecked_mut(*g as usize);
322
self.reducer.combine(grp, v);
323
}
324
}
325
Ok(())
326
}
327
328
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
329
Box::new(Self {
330
values: core::mem::take(&mut self.evicted_values),
331
evicted_values: Vec::new(),
332
in_dtype: self.in_dtype.clone(),
333
reducer: self.reducer.clone(),
334
})
335
}
336
337
fn finalize(&mut self) -> PolarsResult<Series> {
338
let v = core::mem::take(&mut self.values);
339
self.reducer.finish(v, None, &self.in_dtype)
340
}
341
342
fn as_any(&self) -> &dyn Any {
343
self
344
}
345
}
346
347
pub struct VecMaskGroupedReduction<R: Reducer> {
348
values: Vec<R::Value>,
349
mask: MutableBitmap,
350
evicted_values: Vec<R::Value>,
351
evicted_mask: BitmapBuilder,
352
in_dtype: DataType,
353
reducer: R,
354
}
355
356
impl<R: Reducer> VecMaskGroupedReduction<R> {
357
fn new(in_dtype: DataType, reducer: R) -> Self {
358
Self {
359
values: Vec::new(),
360
mask: MutableBitmap::new(),
361
evicted_values: Vec::new(),
362
evicted_mask: BitmapBuilder::new(),
363
in_dtype,
364
reducer,
365
}
366
}
367
}
368
369
impl<R> GroupedReduction for VecMaskGroupedReduction<R>
370
where
371
R: Reducer,
372
{
373
fn new_empty(&self) -> Box<dyn GroupedReduction> {
374
Box::new(Self::new(self.in_dtype.clone(), self.reducer.clone()))
375
}
376
377
fn reserve(&mut self, additional: usize) {
378
self.values.reserve(additional);
379
self.mask.reserve(additional)
380
}
381
382
fn resize(&mut self, num_groups: IdxSize) {
383
self.values.resize(num_groups as usize, self.reducer.init());
384
self.mask.resize(num_groups as usize, false);
385
}
386
387
fn update_group(
388
&mut self,
389
values: &Column,
390
group_idx: IdxSize,
391
seq_id: u64,
392
) -> PolarsResult<()> {
393
assert!(values.dtype() == &self.in_dtype);
394
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
395
let values = values.as_materialized_series(); // @scalar-opt
396
let values = self.reducer.cast_series(values);
397
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
398
self.reducer
399
.reduce_ca(&mut self.values[group_idx as usize], ca, seq_id);
400
if ca.len() != ca.null_count() {
401
self.mask.set(group_idx as usize, true);
402
}
403
Ok(())
404
}
405
406
unsafe fn update_groups_while_evicting(
407
&mut self,
408
values: &Column,
409
subset: &[IdxSize],
410
group_idxs: &[EvictIdx],
411
seq_id: u64,
412
) -> PolarsResult<()> {
413
assert!(values.dtype() == &self.in_dtype);
414
assert!(subset.len() == group_idxs.len());
415
let seq_id = seq_id + 1; // So we can use 0 for 'none yet'.
416
let values = values.as_materialized_series(); // @scalar-opt
417
let values = self.reducer.cast_series(values);
418
let ca: &ChunkedArray<R::Dtype> = values.as_ref().as_ref().as_ref();
419
let arr = ca.downcast_as_array();
420
unsafe {
421
// SAFETY: indices are in-bounds guaranteed by trait.
422
for (i, g) in subset.iter().zip(group_idxs) {
423
let ov = arr.get_unchecked(*i as usize);
424
let grp = self.values.get_unchecked_mut(g.idx());
425
if g.should_evict() {
426
self.evicted_values
427
.push(core::mem::replace(grp, self.reducer.init()));
428
self.evicted_mask.push(self.mask.get_unchecked(g.idx()));
429
self.mask.set_unchecked(g.idx(), false);
430
}
431
if let Some(v) = ov {
432
self.reducer.reduce_one(grp, Some(v), seq_id);
433
self.mask.set_unchecked(g.idx(), true);
434
}
435
}
436
}
437
Ok(())
438
}
439
440
unsafe fn combine_subset(
441
&mut self,
442
other: &dyn GroupedReduction,
443
subset: &[IdxSize],
444
group_idxs: &[IdxSize],
445
) -> PolarsResult<()> {
446
let other = other.as_any().downcast_ref::<Self>().unwrap();
447
assert!(self.in_dtype == other.in_dtype);
448
assert!(subset.len() == group_idxs.len());
449
unsafe {
450
// SAFETY: indices are in-bounds guaranteed by trait.
451
for (i, g) in subset.iter().zip(group_idxs) {
452
let o = other.mask.get_unchecked(*i as usize);
453
if o {
454
let v = other.values.get_unchecked(*i as usize);
455
let grp = self.values.get_unchecked_mut(*g as usize);
456
self.reducer.combine(grp, v);
457
self.mask.set_unchecked(*g as usize, true);
458
}
459
}
460
}
461
Ok(())
462
}
463
464
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
465
Box::new(Self {
466
values: core::mem::take(&mut self.evicted_values),
467
mask: core::mem::take(&mut self.evicted_mask).into_mut(),
468
evicted_values: Vec::new(),
469
evicted_mask: BitmapBuilder::new(),
470
in_dtype: self.in_dtype.clone(),
471
reducer: self.reducer.clone(),
472
})
473
}
474
475
fn finalize(&mut self) -> PolarsResult<Series> {
476
let v = core::mem::take(&mut self.values);
477
let m = core::mem::take(&mut self.mask);
478
self.reducer.finish(v, Some(m.freeze()), &self.in_dtype)
479
}
480
481
fn as_any(&self) -> &dyn Any {
482
self
483
}
484
}
485
486
struct NullGroupedReduction {
487
num_groups: IdxSize,
488
num_evictions: IdxSize,
489
dtype: DataType,
490
}
491
492
impl NullGroupedReduction {
493
fn new(dtype: DataType) -> Self {
494
Self {
495
num_groups: 0,
496
num_evictions: 0,
497
dtype,
498
}
499
}
500
}
501
502
impl GroupedReduction for NullGroupedReduction {
503
fn new_empty(&self) -> Box<dyn GroupedReduction> {
504
Box::new(Self::new(self.dtype.clone()))
505
}
506
507
fn reserve(&mut self, _additional: usize) {}
508
509
fn resize(&mut self, num_groups: IdxSize) {
510
self.num_groups = num_groups;
511
}
512
513
fn update_group(
514
&mut self,
515
_values: &Column,
516
_group_idx: IdxSize,
517
_seq_id: u64,
518
) -> PolarsResult<()> {
519
Ok(())
520
}
521
522
unsafe fn update_groups_while_evicting(
523
&mut self,
524
_values: &Column,
525
_subset: &[IdxSize],
526
group_idxs: &[EvictIdx],
527
_seq_id: u64,
528
) -> PolarsResult<()> {
529
for g in group_idxs {
530
self.num_evictions += g.should_evict() as IdxSize;
531
}
532
Ok(())
533
}
534
535
unsafe fn combine_subset(
536
&mut self,
537
_other: &dyn GroupedReduction,
538
_subset: &[IdxSize],
539
_group_idxs: &[IdxSize],
540
) -> PolarsResult<()> {
541
Ok(())
542
}
543
544
fn take_evictions(&mut self) -> Box<dyn GroupedReduction> {
545
Box::new(Self {
546
num_groups: core::mem::replace(&mut self.num_evictions, 0),
547
num_evictions: 0,
548
dtype: self.dtype.clone(),
549
})
550
}
551
552
fn finalize(&mut self) -> PolarsResult<Series> {
553
Ok(Series::full_null(
554
PlSmallStr::EMPTY,
555
core::mem::replace(&mut self.num_groups, 0) as usize,
556
&self.dtype,
557
))
558
}
559
560
fn as_any(&self) -> &dyn Any {
561
self
562
}
563
}
564
565