Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/series/ops/cum_agg.rs
8489 views
1
use std::ops::{AddAssign, Mul};
2
3
use arity::unary_elementwise_values;
4
use arrow::array::{Array, BooleanArray};
5
use arrow::bitmap::{Bitmap, BitmapBuilder};
6
use num_traits::{Bounded, One, Zero};
7
use polars_core::prelude::*;
8
use polars_core::series::IsSorted;
9
use polars_core::utils::{CustomIterTools, NoNull};
10
use polars_core::with_match_physical_numeric_polars_type;
11
use polars_utils::float::IsFloat;
12
use polars_utils::min_max::MinMax;
13
14
fn det_max<T>(state: &mut T, v: Option<T>) -> Option<Option<T>>
15
where
16
T: Copy + MinMax,
17
{
18
match v {
19
Some(v) => {
20
*state = MinMax::max_ignore_nan(*state, v);
21
Some(Some(*state))
22
},
23
None => Some(None),
24
}
25
}
26
27
fn det_min<T>(state: &mut T, v: Option<T>) -> Option<Option<T>>
28
where
29
T: Copy + MinMax,
30
{
31
match v {
32
Some(v) => {
33
*state = MinMax::min_ignore_nan(*state, v);
34
Some(Some(*state))
35
},
36
None => Some(None),
37
}
38
}
39
40
/// `Iterator::scan` step for a running sum.
///
/// Adds a present value to `state` and emits the new total. A missing value
/// emits `None` and does not change the accumulator.
fn det_sum<T>(state: &mut T, v: Option<T>) -> Option<Option<T>>
where
    T: Copy + AddAssign,
{
    if let Some(value) = v {
        *state += value;
        return Some(Some(*state));
    }
    Some(None)
}
52
53
/// `Iterator::scan` step for a running product.
///
/// Multiplies `state` by a present value and emits the new product. A missing
/// value emits `None` and leaves `state` as it was.
fn det_prod<T>(state: &mut T, v: Option<T>) -> Option<Option<T>>
where
    T: Copy + Mul<Output = T>,
{
    Some(v.map(|factor| {
        let next = *state * factor;
        *state = next;
        next
    }))
}
65
66
fn cum_scan_numeric<T, F>(
67
ca: &ChunkedArray<T>,
68
reverse: bool,
69
init: T::Native,
70
update: F,
71
) -> ChunkedArray<T>
72
where
73
T: PolarsNumericType,
74
ChunkedArray<T>: FromIterator<Option<T::Native>>,
75
F: Fn(&mut T::Native, Option<T::Native>) -> Option<Option<T::Native>>,
76
{
77
let out: ChunkedArray<T> = match reverse {
78
false => ca.iter().scan(init, update).collect_trusted(),
79
true => ca.iter().rev().scan(init, update).collect_reversed(),
80
};
81
out.with_name(ca.name().clone())
82
}
83
84
fn cum_max_numeric<T>(
85
ca: &ChunkedArray<T>,
86
reverse: bool,
87
init: Option<T::Native>,
88
) -> ChunkedArray<T>
89
where
90
T: PolarsNumericType,
91
T::Native: MinMax + Bounded,
92
ChunkedArray<T>: FromIterator<Option<T::Native>>,
93
{
94
let init = init.unwrap_or(if T::Native::is_float() {
95
T::Native::nan_value()
96
} else {
97
Bounded::min_value()
98
});
99
cum_scan_numeric(ca, reverse, init, det_max)
100
}
101
102
fn cum_min_numeric<T>(
103
ca: &ChunkedArray<T>,
104
reverse: bool,
105
init: Option<T::Native>,
106
) -> ChunkedArray<T>
107
where
108
T: PolarsNumericType,
109
T::Native: MinMax + Bounded,
110
ChunkedArray<T>: FromIterator<Option<T::Native>>,
111
{
112
let init = init.unwrap_or(if T::Native::is_float() {
113
T::Native::nan_value()
114
} else {
115
Bounded::max_value()
116
});
117
cum_scan_numeric(ca, reverse, init, det_min)
118
}
119
120
fn cum_max_bool(ca: &BooleanChunked, reverse: bool, init: Option<bool>) -> BooleanChunked {
121
if ca.len() == ca.null_count() {
122
return ca.clone();
123
}
124
125
if init == Some(true) {
126
return unsafe {
127
BooleanChunked::from_chunks(
128
ca.name().clone(),
129
ca.downcast_iter()
130
.map(|arr| {
131
arr.with_values(Bitmap::new_with_value(true, arr.len()))
132
.to_boxed()
133
})
134
.collect(),
135
)
136
};
137
}
138
139
let mut out;
140
if !reverse {
141
// TODO: efficient bitscan.
142
let Some(first_true_idx) = ca.iter().position(|x| x == Some(true)) else {
143
return ca.clone();
144
};
145
out = BitmapBuilder::with_capacity(ca.len());
146
out.extend_constant(first_true_idx, false);
147
out.extend_constant(ca.len() - first_true_idx, true);
148
} else {
149
// TODO: efficient bitscan.
150
let Some(last_true_idx) = ca.iter().rposition(|x| x == Some(true)) else {
151
return ca.clone();
152
};
153
out = BitmapBuilder::with_capacity(ca.len());
154
out.extend_constant(last_true_idx + 1, true);
155
out.extend_constant(ca.len() - 1 - last_true_idx, false);
156
}
157
158
let arr: BooleanArray = out.freeze().into();
159
BooleanChunked::with_chunk_like(ca, arr.with_validity(ca.rechunk_validity()))
160
}
161
162
fn cum_min_bool(ca: &BooleanChunked, reverse: bool, init: Option<bool>) -> BooleanChunked {
163
if ca.len() == ca.null_count() {
164
return ca.clone();
165
}
166
167
if init == Some(false) {
168
return unsafe {
169
BooleanChunked::from_chunks(
170
ca.name().clone(),
171
ca.downcast_iter()
172
.map(|arr| {
173
arr.with_values(Bitmap::new_with_value(false, arr.len()))
174
.to_boxed()
175
})
176
.collect(),
177
)
178
};
179
}
180
181
let mut out;
182
if !reverse {
183
// TODO: efficient bitscan.
184
let Some(first_false_idx) = ca.iter().position(|x| x == Some(false)) else {
185
return ca.clone();
186
};
187
out = BitmapBuilder::with_capacity(ca.len());
188
out.extend_constant(first_false_idx, true);
189
out.extend_constant(ca.len() - first_false_idx, false);
190
} else {
191
// TODO: efficient bitscan.
192
let Some(last_false_idx) = ca.iter().rposition(|x| x == Some(false)) else {
193
return ca.clone();
194
};
195
out = BitmapBuilder::with_capacity(ca.len());
196
out.extend_constant(last_false_idx + 1, false);
197
out.extend_constant(ca.len() - 1 - last_false_idx, true);
198
}
199
200
let arr: BooleanArray = out.freeze().into();
201
BooleanChunked::with_chunk_like(ca, arr.with_validity(ca.rechunk_validity()))
202
}
203
204
fn cum_sum_numeric<T>(
205
ca: &ChunkedArray<T>,
206
reverse: bool,
207
init: Option<T::Native>,
208
) -> ChunkedArray<T>
209
where
210
T: PolarsNumericType,
211
ChunkedArray<T>: FromIterator<Option<T::Native>>,
212
{
213
let init = init.unwrap_or(T::Native::zero());
214
cum_scan_numeric(ca, reverse, init, det_sum)
215
}
216
217
/// Running sum over the `i128` physical values of a decimal column.
///
/// The sum starts at `init` (zero if absent). Additions go through `dec128_add`
/// at `DEC128_MAX_PREC`; nulls stay null and do not affect the total.
/// With `reverse`, `out[i]` is the sum of `init` and every valid `ca[j]` with `j >= i`.
///
/// # Errors
/// Returns a `ComputeError` if a partial sum overflows the decimal range.
#[cfg(feature = "dtype-decimal")]
fn cum_sum_decimal(
    ca: &Int128Chunked,
    reverse: bool,
    init: Option<i128>,
) -> PolarsResult<Int128Chunked> {
    use polars_compute::decimal::{DEC128_MAX_PREC, dec128_add};

    let mut value = init.unwrap_or(0);
    // Explicit return type pins the error type for both `?` uses below.
    let update = |opt_v: Option<i128>| -> PolarsResult<Option<i128>> {
        if let Some(v) = opt_v {
            value = dec128_add(value, v, DEC128_MAX_PREC).ok_or_else(
                || polars_err!(ComputeError: "overflow in decimal addition in cum_sum"),
            )?;
            Ok(Some(value))
        } else {
            Ok(None)
        }
    };
    if reverse {
        // The scan runs back-to-front, so the collected array comes out in reverse
        // order; flip it so every output lines up with its input position.
        let out: Int128Chunked = ca.iter().rev().map(update).try_collect_ca_trusted_like(ca)?;
        Ok(out.reverse())
    } else {
        ca.iter().map(update).try_collect_ca_trusted_like(ca)
    }
}
242
243
fn cum_prod_numeric<T>(
244
ca: &ChunkedArray<T>,
245
reverse: bool,
246
init: Option<T::Native>,
247
) -> ChunkedArray<T>
248
where
249
T: PolarsNumericType,
250
ChunkedArray<T>: FromIterator<Option<T::Native>>,
251
{
252
let init = init.unwrap_or(T::Native::one());
253
cum_scan_numeric(ca, reverse, init, det_prod)
254
}
255
256
pub fn cum_prod_with_init(
257
s: &Series,
258
reverse: bool,
259
init: &AnyValue<'static>,
260
) -> PolarsResult<Series> {
261
use DataType::*;
262
let out = match s.dtype() {
263
Boolean | Int8 | UInt8 | Int16 | UInt16 | Int32 | UInt32 => {
264
let s = s.cast(&Int64)?;
265
cum_prod_numeric(s.i64()?, reverse, init.extract()).into_series()
266
},
267
Int64 => cum_prod_numeric(s.i64()?, reverse, init.extract()).into_series(),
268
UInt64 => cum_prod_numeric(s.u64()?, reverse, init.extract()).into_series(),
269
#[cfg(feature = "dtype-i128")]
270
Int128 => cum_prod_numeric(s.i128()?, reverse, init.extract()).into_series(),
271
#[cfg(feature = "dtype-u128")]
272
UInt128 => cum_prod_numeric(s.u128()?, reverse, init.extract()).into_series(),
273
#[cfg(feature = "dtype-f16")]
274
Float16 => cum_prod_numeric(s.f16()?, reverse, init.extract()).into_series(),
275
Float32 => cum_prod_numeric(s.f32()?, reverse, init.extract()).into_series(),
276
Float64 => cum_prod_numeric(s.f64()?, reverse, init.extract()).into_series(),
277
dt => polars_bail!(opq = cum_prod, dt),
278
};
279
Ok(out)
280
}
281
282
/// Get an array with the cumulative product computed at every element.
283
///
284
/// If the [`DataType`] is one of `{Int8, UInt8, Int16, UInt16, Int32, UInt32}` the `Series` is
285
/// first cast to `Int64` to prevent overflow issues.
286
pub fn cum_prod(s: &Series, reverse: bool) -> PolarsResult<Series> {
287
cum_prod_with_init(s, reverse, &AnyValue::Null)
288
}
289
290
pub fn cum_sum_with_init(
291
s: &Series,
292
reverse: bool,
293
init: &AnyValue<'static>,
294
) -> PolarsResult<Series> {
295
use DataType::*;
296
let out = match s.dtype() {
297
Boolean => {
298
let s = s.cast(&UInt32)?;
299
cum_sum_numeric(s.u32()?, reverse, init.extract()).into_series()
300
},
301
Int8 | UInt8 | Int16 | UInt16 => {
302
let s = s.cast(&Int64)?;
303
cum_sum_numeric(s.i64()?, reverse, init.extract()).into_series()
304
},
305
Int32 => cum_sum_numeric(s.i32()?, reverse, init.extract()).into_series(),
306
UInt32 => cum_sum_numeric(s.u32()?, reverse, init.extract()).into_series(),
307
Int64 => cum_sum_numeric(s.i64()?, reverse, init.extract()).into_series(),
308
UInt64 => cum_sum_numeric(s.u64()?, reverse, init.extract()).into_series(),
309
#[cfg(feature = "dtype-u128")]
310
UInt128 => cum_sum_numeric(s.u128()?, reverse, init.extract()).into_series(),
311
#[cfg(feature = "dtype-i128")]
312
Int128 => cum_sum_numeric(s.i128()?, reverse, init.extract()).into_series(),
313
#[cfg(feature = "dtype-f16")]
314
Float16 => cum_sum_numeric(s.f16()?, reverse, init.extract()).into_series(),
315
Float32 => cum_sum_numeric(s.f32()?, reverse, init.extract()).into_series(),
316
Float64 => cum_sum_numeric(s.f64()?, reverse, init.extract()).into_series(),
317
#[cfg(feature = "dtype-decimal")]
318
Decimal(_precision, scale) => {
319
use polars_compute::decimal::DEC128_MAX_PREC;
320
let ca = s.decimal().unwrap().physical();
321
cum_sum_decimal(ca, reverse, init.clone().to_physical().extract())?
322
.into_decimal_unchecked(DEC128_MAX_PREC, *scale)
323
.into_series()
324
},
325
#[cfg(feature = "dtype-duration")]
326
Duration(tu) => {
327
let s = s.to_physical_repr();
328
let ca = s.i64()?;
329
cum_sum_numeric(ca, reverse, init.extract()).cast(&Duration(*tu))?
330
},
331
dt => polars_bail!(opq = cum_sum, dt),
332
};
333
Ok(out)
334
}
335
336
/// Get an array with the cumulative sum computed at every element
337
///
338
/// If the [`DataType`] is one of `{Int8, UInt8, Int16, UInt16}` the `Series` is
339
/// first cast to `Int64` to prevent overflow issues.
340
pub fn cum_sum(s: &Series, reverse: bool) -> PolarsResult<Series> {
341
cum_sum_with_init(s, reverse, &AnyValue::Null)
342
}
343
344
pub fn cum_min_with_init(
345
s: &Series,
346
reverse: bool,
347
init: &AnyValue<'static>,
348
) -> PolarsResult<Series> {
349
match s.dtype() {
350
DataType::Boolean => {
351
Ok(cum_min_bool(s.bool()?, reverse, init.extract_bool()).into_series())
352
},
353
#[cfg(feature = "dtype-decimal")]
354
DataType::Decimal(precision, scale) => {
355
let ca = s.decimal().unwrap().physical();
356
let out = cum_min_numeric(ca, reverse, init.clone().to_physical().extract())
357
.into_decimal_unchecked(*precision, *scale)
358
.into_series();
359
Ok(out)
360
},
361
dt if dt.to_physical().is_primitive_numeric() => {
362
let s = s.to_physical_repr();
363
with_match_physical_numeric_polars_type!(s.dtype(), |$T| {
364
let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
365
let out = cum_min_numeric(ca, reverse, init.extract()).into_series();
366
if dt.is_logical() {
367
out.cast(dt)
368
} else {
369
Ok(out)
370
}
371
})
372
},
373
dt => polars_bail!(opq = cum_min, dt),
374
}
375
}
376
377
/// Get an array with the cumulative min computed at every element.
378
pub fn cum_min(s: &Series, reverse: bool) -> PolarsResult<Series> {
379
cum_min_with_init(s, reverse, &AnyValue::Null)
380
}
381
382
pub fn cum_max_with_init(
383
s: &Series,
384
reverse: bool,
385
init: &AnyValue<'static>,
386
) -> PolarsResult<Series> {
387
match s.dtype() {
388
DataType::Boolean => {
389
Ok(cum_max_bool(s.bool()?, reverse, init.extract_bool()).into_series())
390
},
391
#[cfg(feature = "dtype-decimal")]
392
DataType::Decimal(precision, scale) => {
393
let ca = s.decimal().unwrap().physical();
394
let out = cum_max_numeric(ca, reverse, init.clone().to_physical().extract())
395
.into_decimal_unchecked(*precision, *scale)
396
.into_series();
397
Ok(out)
398
},
399
dt if dt.to_physical().is_primitive_numeric() => {
400
let s = s.to_physical_repr();
401
with_match_physical_numeric_polars_type!(s.dtype(), |$T| {
402
let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
403
let out = cum_max_numeric(ca, reverse, init.extract()).into_series();
404
if dt.is_logical() {
405
out.cast(dt)
406
} else {
407
Ok(out)
408
}
409
})
410
},
411
dt => polars_bail!(opq = cum_max, dt),
412
}
413
}
414
415
/// Get an array with the cumulative max computed at every element.
416
pub fn cum_max(s: &Series, reverse: bool) -> PolarsResult<Series> {
417
cum_max_with_init(s, reverse, &AnyValue::Null)
418
}
419
420
pub fn cum_count(s: &Series, reverse: bool) -> PolarsResult<Series> {
421
cum_count_with_init(s, reverse, 0)
422
}
423
424
pub fn cum_count_with_init(s: &Series, reverse: bool, init: IdxSize) -> PolarsResult<Series> {
425
let mut out = if s.null_count() == 0 {
426
// Fast paths for no nulls
427
cum_count_no_nulls(s.name().clone(), s.len(), reverse, init)
428
} else {
429
let ca = s.is_not_null();
430
let out: IdxCa = if reverse {
431
let mut count = init + (s.len() - s.null_count()) as IdxSize;
432
let mut prev = false;
433
unary_elementwise_values(&ca, |v: bool| {
434
if prev {
435
count -= 1;
436
}
437
prev = v;
438
count
439
})
440
} else {
441
let mut count = init;
442
unary_elementwise_values(&ca, |v: bool| {
443
if v {
444
count += 1;
445
}
446
count
447
})
448
};
449
450
out.into()
451
};
452
453
out.set_sorted_flag([IsSorted::Ascending, IsSorted::Descending][reverse as usize]);
454
455
Ok(out)
456
}
457
458
/// Cumulative count for a column of `len` entries with no nulls.
///
/// Produces `init + 1, ..., init + len` (descending when `reverse`), named `name`.
fn cum_count_no_nulls(name: PlSmallStr, len: usize, reverse: bool, init: IdxSize) -> Series {
    let upper = len as IdxSize + 1;
    let counts = (1 as IdxSize..upper).map(|i| i + init);
    let ca: NoNull<IdxCa> = if reverse {
        counts.rev().collect()
    } else {
        counts.collect()
    };
    ca.into_inner().with_name(name).into_series()
}
470
471