Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/series/implementations/decimal.rs
8433 views
1
use polars_compute::rolling::QuantileMethod;
2
3
use super::*;
4
use crate::prelude::*;
5
6
unsafe impl IntoSeries for DecimalChunked {
7
fn into_series(self) -> Series {
8
Series(Arc::new(SeriesWrap(self)))
9
}
10
}
11
12
impl private::PrivateSeriesNumeric for SeriesWrap<DecimalChunked> {
13
fn bit_repr(&self) -> Option<BitRepr> {
14
Some(self.0.physical().to_bit_repr())
15
}
16
}
17
18
impl SeriesWrap<DecimalChunked> {
    /// Run `f` over the physical Int128 data and re-wrap the result as a
    /// decimal Series carrying this array's precision and scale.
    fn apply_physical_to_s<F: Fn(&Int128Chunked) -> Int128Chunked>(&self, f: F) -> Series {
        f(self.0.physical())
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    /// Run `f` over the physical Int128 data and return its result directly
    /// (no decimal re-wrapping).
    fn apply_physical<T, F: Fn(&Int128Chunked) -> T>(&self, f: F) -> T {
        f(self.0.physical())
    }

    /// 10^scale: the ratio between the stored integer representation and the
    /// logical decimal value. Assumes `scale` stays within u128's decimal
    /// digit capacity so the pow cannot overflow — TODO confirm the bound
    /// enforced at construction.
    fn scale_factor(&self) -> u128 {
        10u128.pow(self.0.scale() as u32)
    }

    /// Divide a Float64 scalar (an aggregate computed on the raw physical
    /// integers) by 10^scale so it reflects the logical decimal value.
    /// Null scalars pass through unchanged.
    fn apply_scale(&self, mut scalar: Scalar) -> Scalar {
        if scalar.is_null() {
            return scalar;
        }

        debug_assert_eq!(scalar.dtype(), &DataType::Float64);
        let v = scalar
            .value()
            .try_extract::<f64>()
            .expect("should be f64 scalar");
        scalar.update((v / self.scale_factor() as f64).into());
        scalar
    }

    /// Run an aggregation `f` on the physical Int128 data and restore the
    /// decimal dtype on the result. Handles the two shapes aggregations
    /// produce here: a flat Int128 Series (per-group scalars) and a
    /// List(Int128) Series (e.g. from `agg_list`), whose inner values are
    /// re-tagged as Decimal while keeping the original offsets and validity.
    fn agg_helper<F: Fn(&Int128Chunked) -> Series>(&self, f: F) -> Series {
        let agg_s = f(self.0.physical());
        match agg_s.dtype() {
            DataType::Int128 => {
                let ca = agg_s.i128().unwrap();
                let ca = ca.as_ref().clone();
                let precision = self.0.precision();
                let scale = self.0.scale();
                ca.into_decimal_unchecked(precision, scale).into_series()
            },
            DataType::List(dtype) if matches!(dtype.as_ref(), DataType::Int128) => {
                let dtype = self.0.dtype();
                let ca = agg_s.list().unwrap();
                let arr = ca.downcast_iter().next().unwrap();
                // SAFETY: dtype is passed correctly
                let precision = self.0.precision();
                let scale = self.0.scale();
                // Re-type the flat inner values as this array's decimal dtype.
                let s = unsafe {
                    Series::from_chunks_and_dtype_unchecked(
                        PlSmallStr::EMPTY,
                        vec![arr.values().clone()],
                        dtype,
                    )
                }
                .into_decimal(precision, scale)
                .unwrap();
                // Rebuild the list array around the re-typed inner values,
                // reusing the original offsets and validity bitmap.
                let new_values = s.array_ref(0).clone();
                let dtype = DataType::Int128;
                let arrow_dtype =
                    ListArray::<i64>::default_datatype(dtype.to_arrow(CompatLevel::newest()));
                let new_arr = ListArray::<i64>::new(
                    arrow_dtype,
                    arr.offsets().clone(),
                    new_values,
                    arr.validity().cloned(),
                );
                unsafe {
                    ListChunked::from_chunks_and_dtype_unchecked(
                        agg_s.name().clone(),
                        vec![Box::new(new_arr)],
                        DataType::List(Box::new(DataType::Decimal(precision, scale))),
                    )
                    .into_series()
                }
            },
            // Aggregations on Int128-backed data only produce the two dtypes
            // matched above.
            _ => unreachable!(),
        }
    }
}
96
97
impl private::PrivateSeries for SeriesWrap<DecimalChunked> {
    fn compute_len(&mut self) {
        // Length bookkeeping lives on the physical Int128 array.
        self.0.physical_mut().compute_len()
    }

    fn _field(&self) -> Cow<'_, Field> {
        Cow::Owned(self.0.field())
    }

    fn _dtype(&self) -> &DataType {
        self.0.dtype()
    }
    // Statistics flags (e.g. sortedness) are stored on the physical array.
    fn _get_flags(&self) -> StatisticsFlags {
        self.0.physical().get_flags()
    }
    fn _set_flags(&mut self, flags: StatisticsFlags) {
        self.0.physical_mut().set_flags(flags)
    }

    #[cfg(feature = "zip_with")]
    fn zip_with_same_type(&self, mask: &BooleanChunked, other: &Series) -> PolarsResult<Series> {
        let other = other.decimal()?;

        // Zip on the physical data, then restore this array's precision/scale.
        Ok(self
            .0
            .physical()
            .zip_with(mask, other.physical())?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }
    fn into_total_eq_inner<'a>(&'a self) -> Box<dyn TotalEqInner + 'a> {
        self.0.physical().into_total_eq_inner()
    }
    fn into_total_ord_inner<'a>(&'a self) -> Box<dyn TotalOrdInner + 'a> {
        self.0.physical().into_total_ord_inner()
    }

    // Hashing operates on the raw Int128 values.
    fn vec_hash(
        &self,
        random_state: PlSeedableRandomStateQuality,
        buf: &mut Vec<u64>,
    ) -> PolarsResult<()> {
        self.0.physical().vec_hash(random_state, buf)?;
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        build_hasher: PlSeedableRandomStateQuality,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.0.physical().vec_hash_combine(build_hasher, hashes)?;
        Ok(())
    }

    // Grouped sum/min/max aggregate the physical values and keep the decimal
    // dtype via agg_helper.
    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_sum(groups))
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_min(groups))
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_max(groups))
    }

    // arg_min/arg_max yield row indices, so no decimal re-wrapping is needed.
    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_arg_min(&self, groups: &GroupsType) -> Series {
        self.0.physical().agg_arg_min(groups)
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_arg_max(&self, groups: &GroupsType) -> Series {
        self.0.physical().agg_arg_max(groups)
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_list(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_list(groups))
    }

    // Variance/std are computed after casting to Float64; the result is not
    // a decimal.
    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_var(&self, groups: &GroupsType, ddof: u8) -> Series {
        self.0
            .cast(&DataType::Float64)
            .unwrap()
            .agg_var(groups, ddof)
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_std(&self, groups: &GroupsType, ddof: u8) -> Series {
        self.0
            .cast(&DataType::Float64)
            .unwrap()
            .agg_std(groups, ddof)
    }

    // Arithmetic requires a decimal rhs and delegates to DecimalChunked's
    // operator impls.
    fn subtract(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) - rhs).map(|ca| ca.into_series())
    }
    fn add_to(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) + rhs).map(|ca| ca.into_series())
    }
    fn multiply(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) * rhs).map(|ca| ca.into_series())
    }
    fn divide(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) / rhs).map(|ca| ca.into_series())
    }
    #[cfg(feature = "algorithm_group_by")]
    fn group_tuples(&self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        self.0.physical().group_tuples(multithreaded, sorted)
    }
    fn arg_sort_multiple(
        &self,
        by: &[Column],
        options: &SortMultipleOptions,
    ) -> PolarsResult<IdxCa> {
        self.0.physical().arg_sort_multiple(by, options)
    }
}
226
227
impl SeriesTrait for SeriesWrap<DecimalChunked> {
    fn rename(&mut self, name: PlSmallStr) {
        self.0.rename(name)
    }

    fn chunk_lengths(&self) -> ChunkLenIter<'_> {
        self.0.physical().chunk_lengths()
    }

    fn name(&self) -> &PlSmallStr {
        self.0.name()
    }

    fn chunks(&self) -> &Vec<ArrayRef> {
        self.0.physical().chunks()
    }
    unsafe fn chunks_mut(&mut self) -> &mut Vec<ArrayRef> {
        self.0.physical_mut().chunks_mut()
    }

    // Slice/filter/take and friends operate on the physical data and restore
    // the decimal dtype afterwards.
    fn slice(&self, offset: i64, length: usize) -> Series {
        self.apply_physical_to_s(|ca| ca.slice(offset, length))
    }

    fn split_at(&self, offset: i64) -> (Series, Series) {
        let (a, b) = self.0.split_at(offset);
        (a.into_series(), b.into_series())
    }

    fn append(&mut self, other: &Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), append);
        // Take ownership of `other`'s physical representation and append it
        // to our physical array; `other` is left empty via mem::take.
        let mut other = other.to_physical_repr().into_owned();
        self.0
            .physical_mut()
            .append_owned(std::mem::take(other._get_inner_mut().as_mut()))
    }
    fn append_owned(&mut self, mut other: Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), append);
        // Downcast `other` back to DecimalChunked and move its physical
        // chunks out without cloning.
        self.0.physical_mut().append_owned(std::mem::take(
            &mut other
                ._get_inner_mut()
                .as_any_mut()
                .downcast_mut::<DecimalChunked>()
                .unwrap()
                .phys,
        ))
    }

    fn extend(&mut self, other: &Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), extend);
        // 3 refs
        // ref Cow
        // ref SeriesTrait
        // ref ChunkedArray
        let other = other.to_physical_repr();
        self.0
            .physical_mut()
            .extend(other.as_ref().as_ref().as_ref())?;
        Ok(())
    }

    fn filter(&self, filter: &BooleanChunked) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .filter(filter)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    fn take(&self, indices: &IdxCa) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .take(indices)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    unsafe fn take_unchecked(&self, indices: &IdxCa) -> Series {
        self.0
            .physical()
            .take_unchecked(indices)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn take_slice(&self, indices: &[IdxSize]) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .take(indices)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    unsafe fn take_slice_unchecked(&self, indices: &[IdxSize]) -> Series {
        self.0
            .physical()
            .take_unchecked(indices)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn deposit(&self, validity: &Bitmap) -> Series {
        self.0
            .physical()
            .deposit(validity)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn len(&self) -> usize {
        self.0.len()
    }

    fn rechunk(&self) -> Series {
        let ca = self.0.physical().rechunk().into_owned();
        ca.into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn new_from_index(&self, index: usize, length: usize) -> Series {
        self.0
            .physical()
            .new_from_index(index, length)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn cast(&self, dtype: &DataType, cast_options: CastOptions) -> PolarsResult<Series> {
        self.0.cast_with_options(dtype, cast_options)
    }

    #[inline]
    unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {
        self.0.get_any_value_unchecked(index)
    }

    fn sort_with(&self, options: SortOptions) -> PolarsResult<Series> {
        // Sorting integer representations sorts the decimals, since all
        // values share the same scale.
        Ok(self
            .0
            .physical()
            .sort_with(options)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    fn arg_sort(&self, options: SortOptions) -> IdxCa {
        self.0.physical().arg_sort(options)
    }

    fn null_count(&self) -> usize {
        self.0.null_count()
    }

    fn has_nulls(&self) -> bool {
        self.0.has_nulls()
    }

    #[cfg(feature = "algorithm_group_by")]
    fn unique(&self) -> PolarsResult<Series> {
        Ok(self.apply_physical_to_s(|ca| ca.unique().unwrap()))
    }

    #[cfg(feature = "algorithm_group_by")]
    fn n_unique(&self) -> PolarsResult<usize> {
        self.0.physical().n_unique()
    }

    #[cfg(feature = "algorithm_group_by")]
    fn arg_unique(&self) -> PolarsResult<IdxCa> {
        self.0.physical().arg_unique()
    }

    fn unique_id(&self) -> PolarsResult<(IdxSize, Vec<IdxSize>)> {
        ChunkUnique::unique_id(self.0.physical())
    }

    fn is_null(&self) -> BooleanChunked {
        self.0.is_null()
    }

    fn is_not_null(&self) -> BooleanChunked {
        self.0.is_not_null()
    }

    fn reverse(&self) -> Series {
        self.apply_physical_to_s(|ca| ca.reverse())
    }

    fn shift(&self, periods: i64) -> Series {
        self.apply_physical_to_s(|ca| ca.shift(periods))
    }

    #[cfg(feature = "approx_unique")]
    fn approx_n_unique(&self) -> PolarsResult<IdxSize> {
        Ok(ChunkApproxNUnique::approx_n_unique(self.0.physical()))
    }

    fn clone_inner(&self) -> Arc<dyn SeriesTrait> {
        Arc::new(SeriesWrap(Clone::clone(&self.0)))
    }

    // sum/min/max reductions keep the decimal dtype by re-wrapping the
    // physical i128 aggregate in AnyValue::Decimal.
    fn sum_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let sum = ca.sum();
            let DataType::Decimal(prec, scale) = self.dtype() else {
                unreachable!()
            };
            // NOTE(review): `sum` is unwrapped here while min/max below map
            // None to AnyValue::Null — presumably sum() never returns None
            // (empty/all-null sums to 0); confirm.
            let av = AnyValue::Decimal(sum.unwrap(), *prec, *scale);
            Scalar::new(self.dtype().clone(), av)
        }))
    }

    fn min_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let min = ca.min();
            let DataType::Decimal(prec, scale) = self.dtype() else {
                unreachable!()
            };
            let av = if let Some(min) = min {
                AnyValue::Decimal(min, *prec, *scale)
            } else {
                AnyValue::Null
            };
            Scalar::new(self.dtype().clone(), av)
        }))
    }

    fn max_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let max = ca.max();
            let DataType::Decimal(prec, scale) = self.dtype() else {
                unreachable!()
            };
            let av = if let Some(m) = max {
                AnyValue::Decimal(m, *prec, *scale)
            } else {
                AnyValue::Null
            };
            Scalar::new(self.dtype().clone(), av)
        }))
    }

    // Float aggregates are computed on the physical integers, then divided
    // by 10^scale to recover the logical decimal magnitude.
    fn _sum_as_f64(&self) -> f64 {
        self.0.physical()._sum_as_f64() / self.scale_factor() as f64
    }

    fn mean(&self) -> Option<f64> {
        self.0
            .physical()
            .mean()
            .map(|v| v / self.scale_factor() as f64)
    }
    fn mean_reduce(&self) -> PolarsResult<Scalar> {
        Ok(Scalar::new(DataType::Float64, self.mean().into()))
    }

    fn median(&self) -> Option<f64> {
        self.0
            .physical()
            .median()
            .map(|v| v / self.scale_factor() as f64)
    }

    fn median_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_scale(self.0.physical().median_reduce()))
    }

    // std/var go through a Float64 cast rather than the physical data.
    fn std(&self, ddof: u8) -> Option<f64> {
        self.0.cast(&DataType::Float64).ok()?.std(ddof)
    }

    fn std_reduce(&self, ddof: u8) -> PolarsResult<Scalar> {
        self.0.cast(&DataType::Float64)?.std_reduce(ddof)
    }

    fn var(&self, ddof: u8) -> Option<f64> {
        self.0.cast(&DataType::Float64).ok()?.var(ddof)
    }

    fn var_reduce(&self, ddof: u8) -> PolarsResult<Scalar> {
        self.0.cast(&DataType::Float64)?.var_reduce(ddof)
    }

    fn quantile_reduce(&self, quantile: f64, method: QuantileMethod) -> PolarsResult<Scalar> {
        self.0
            .physical()
            .quantile_reduce(quantile, method)
            .map(|v| self.apply_scale(v))
    }

    fn quantiles_reduce(&self, quantiles: &[f64], method: QuantileMethod) -> PolarsResult<Scalar> {
        let result = self.0.physical().quantiles_reduce(quantiles, method)?;
        if let AnyValue::List(float_s) = result.value() {
            // Scale each physical quantile down by 10^scale.
            let scale_factor = self.scale_factor() as f64;
            let float_ca = float_s.f64().unwrap();
            let scaled_s = float_ca
                .iter()
                .map(|v: Option<f64>| v.map(|f| f / scale_factor))
                .collect::<Float64Chunked>()
                .into_series();
            // NOTE(review): the list values are Float64 but the scalar dtype
            // is declared as List(self.dtype()) i.e. List(Decimal) — confirm
            // this mismatch is intentional.
            Ok(Scalar::new(
                DataType::List(Box::new(self.dtype().clone())),
                AnyValue::List(scaled_s),
            ))
        } else {
            polars_bail!(ComputeError: "expected list scalar from quantiles_reduce")
        }
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        self.0.physical().find_validity_mismatch(other, idxs)
    }

    // Downcasting hooks: logical type, mutable logical type, physical type,
    // and Arc-erased access.
    fn as_any(&self) -> &dyn Any {
        &self.0
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        &mut self.0
    }

    fn as_phys_any(&self) -> &dyn Any {
        self.0.physical()
    }

    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self as _
    }
}
559
560