GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/series/implementations/decimal.rs
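//! `Series` implementation for the Decimal logical type.
//!
//! `SeriesWrap<DecimalChunked>` mostly delegates to the physical `Int128Chunked`
//! and re-attaches the original precision/scale to the results.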
use polars_compute::rolling::QuantileMethod;

use super::*;
use crate::prelude::*;

unsafe impl IntoSeries for DecimalChunked {
    fn into_series(self) -> Series {
        Series(Arc::new(SeriesWrap(self)))
    }
}

impl private::PrivateSeriesNumeric for SeriesWrap<DecimalChunked> {
    fn bit_repr(&self) -> Option<BitRepr> {
        Some(self.0.physical().to_bit_repr())
    }
}

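// Private helpers: most operations run on the physical `Int128Chunked` and are
// then re-tagged with this series' precision and scale.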
impl SeriesWrap<DecimalChunked> {
    fn apply_physical_to_s<F: Fn(&Int128Chunked) -> Int128Chunked>(&self, f: F) -> Series {
        f(self.0.physical())
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn apply_physical<T, F: Fn(&Int128Chunked) -> T>(&self, f: F) -> T {
        f(self.0.physical())
    }

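    /// 10^scale, i.e. the factor between the physical integer representation and
    /// the decimal value it encodes.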
    fn scale_factor(&self) -> u128 {
        10u128.pow(self.0.scale() as u32)
    }

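    /// Rescales a `Float64` scalar computed on the physical values by dividing it
    /// by `10^scale`; null scalars are passed through unchanged.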
    fn apply_scale(&self, mut scalar: Scalar) -> Scalar {
        if scalar.is_null() {
            return scalar;
        }

        debug_assert_eq!(scalar.dtype(), &DataType::Float64);
        let v = scalar
            .value()
            .try_extract::<f64>()
            .expect("should be f64 scalar");
        scalar.update((v / self.scale_factor() as f64).into());
        scalar
    }

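    /// Runs an aggregation on the physical values and converts the resulting
    /// `Int128` (or `List(Int128)`) output back into a Decimal series with the
    /// original precision and scale.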
    fn agg_helper<F: Fn(&Int128Chunked) -> Series>(&self, f: F) -> Series {
        let agg_s = f(self.0.physical());
        match agg_s.dtype() {
            DataType::Int128 => {
                let ca = agg_s.i128().unwrap();
                let ca = ca.as_ref().clone();
                let precision = self.0.precision();
                let scale = self.0.scale();
                ca.into_decimal_unchecked(precision, scale).into_series()
            },
            DataType::List(dtype) if matches!(dtype.as_ref(), DataType::Int128) => {
                let dtype = self.0.dtype();
                let ca = agg_s.list().unwrap();
                let arr = ca.downcast_iter().next().unwrap();
                // SAFETY: dtype is passed correctly
                let precision = self.0.precision();
                let scale = self.0.scale();
                let s = unsafe {
                    Series::from_chunks_and_dtype_unchecked(
                        PlSmallStr::EMPTY,
                        vec![arr.values().clone()],
                        dtype,
                    )
                }
                .into_decimal(precision, scale)
                .unwrap();
                let new_values = s.array_ref(0).clone();
                let dtype = DataType::Int128;
                let arrow_dtype =
                    ListArray::<i64>::default_datatype(dtype.to_arrow(CompatLevel::newest()));
                let new_arr = ListArray::<i64>::new(
                    arrow_dtype,
                    arr.offsets().clone(),
                    new_values,
                    arr.validity().cloned(),
                );
                unsafe {
                    ListChunked::from_chunks_and_dtype_unchecked(
                        agg_s.name().clone(),
                        vec![Box::new(new_arr)],
                        DataType::List(Box::new(DataType::Decimal(precision, Some(scale)))),
                    )
                    .into_series()
                }
            },
            _ => unreachable!(),
        }
    }
}

impl private::PrivateSeries for SeriesWrap<DecimalChunked> {
    fn compute_len(&mut self) {
        self.0.physical_mut().compute_len()
    }

    fn _field(&self) -> Cow<'_, Field> {
        Cow::Owned(self.0.field())
    }

    fn _dtype(&self) -> &DataType {
        self.0.dtype()
    }
    fn _get_flags(&self) -> StatisticsFlags {
        self.0.physical().get_flags()
    }
    fn _set_flags(&mut self, flags: StatisticsFlags) {
        self.0.physical_mut().set_flags(flags)
    }

    #[cfg(feature = "zip_with")]
    fn zip_with_same_type(&self, mask: &BooleanChunked, other: &Series) -> PolarsResult<Series> {
        let other = other.decimal()?;

        Ok(self
            .0
            .physical()
            .zip_with(mask, other.physical())?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }
    fn into_total_eq_inner<'a>(&'a self) -> Box<dyn TotalEqInner + 'a> {
        self.0.physical().into_total_eq_inner()
    }
    fn into_total_ord_inner<'a>(&'a self) -> Box<dyn TotalOrdInner + 'a> {
        self.0.physical().into_total_ord_inner()
    }

    fn vec_hash(
        &self,
        random_state: PlSeedableRandomStateQuality,
        buf: &mut Vec<u64>,
    ) -> PolarsResult<()> {
        self.0.physical().vec_hash(random_state, buf)?;
        Ok(())
    }

    fn vec_hash_combine(
        &self,
        build_hasher: PlSeedableRandomStateQuality,
        hashes: &mut [u64],
    ) -> PolarsResult<()> {
        self.0.physical().vec_hash_combine(build_hasher, hashes)?;
        Ok(())
    }

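    // Grouped aggregations are computed on the physical i128 values; `agg_helper`
    // restores the Decimal dtype on the result.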
    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_sum(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_sum(groups))
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_min(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_min(groups))
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_max(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_max(groups))
    }

    #[cfg(feature = "algorithm_group_by")]
    unsafe fn agg_list(&self, groups: &GroupsType) -> Series {
        self.agg_helper(|ca| ca.agg_list(groups))
    }

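    // Arithmetic requires the right-hand side to be a Decimal series and is
    // delegated to the `DecimalChunked` operator implementations.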
    fn subtract(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) - rhs).map(|ca| ca.into_series())
    }
    fn add_to(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) + rhs).map(|ca| ca.into_series())
    }
    fn multiply(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) * rhs).map(|ca| ca.into_series())
    }
    fn divide(&self, rhs: &Series) -> PolarsResult<Series> {
        let rhs = rhs.decimal()?;
        ((&self.0) / rhs).map(|ca| ca.into_series())
    }
    #[cfg(feature = "algorithm_group_by")]
    fn group_tuples(&self, multithreaded: bool, sorted: bool) -> PolarsResult<GroupsType> {
        self.0.physical().group_tuples(multithreaded, sorted)
    }
    fn arg_sort_multiple(
        &self,
        by: &[Column],
        options: &SortMultipleOptions,
    ) -> PolarsResult<IdxCa> {
        self.0.physical().arg_sort_multiple(by, options)
    }
}

impl SeriesTrait for SeriesWrap<DecimalChunked> {
    fn rename(&mut self, name: PlSmallStr) {
        self.0.rename(name)
    }

    fn chunk_lengths(&self) -> ChunkLenIter<'_> {
        self.0.physical().chunk_lengths()
    }

    fn name(&self) -> &PlSmallStr {
        self.0.name()
    }

    fn chunks(&self) -> &Vec<ArrayRef> {
        self.0.physical().chunks()
    }
    unsafe fn chunks_mut(&mut self) -> &mut Vec<ArrayRef> {
        self.0.physical_mut().chunks_mut()
    }

    fn slice(&self, offset: i64, length: usize) -> Series {
        self.apply_physical_to_s(|ca| ca.slice(offset, length))
    }

    fn split_at(&self, offset: i64) -> (Series, Series) {
        let (a, b) = self.0.split_at(offset);
        (a.into_series(), b.into_series())
    }

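    // Appending/extending requires an identical dtype (same precision and scale);
    // the data is then appended at the physical level.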
    fn append(&mut self, other: &Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), append);
        let mut other = other.to_physical_repr().into_owned();
        self.0
            .physical_mut()
            .append_owned(std::mem::take(other._get_inner_mut().as_mut()))
    }
    fn append_owned(&mut self, mut other: Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), append);
        self.0.physical_mut().append_owned(std::mem::take(
            &mut other
                ._get_inner_mut()
                .as_any_mut()
                .downcast_mut::<DecimalChunked>()
                .unwrap()
                .phys,
        ))
    }

    fn extend(&mut self, other: &Series) -> PolarsResult<()> {
        polars_ensure!(self.0.dtype() == other.dtype(), extend);
        // 3 refs
        // ref Cow
        // ref SeriesTrait
        // ref ChunkedArray
        let other = other.to_physical_repr();
        self.0
            .physical_mut()
            .extend(other.as_ref().as_ref().as_ref())?;
        Ok(())
    }

    fn filter(&self, filter: &BooleanChunked) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .filter(filter)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    fn take(&self, indices: &IdxCa) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .take(indices)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    unsafe fn take_unchecked(&self, indices: &IdxCa) -> Series {
        self.0
            .physical()
            .take_unchecked(indices)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn take_slice(&self, indices: &[IdxSize]) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .take(indices)?
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    unsafe fn take_slice_unchecked(&self, indices: &[IdxSize]) -> Series {
        self.0
            .physical()
            .take_unchecked(indices)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn len(&self) -> usize {
        self.0.len()
    }

    fn rechunk(&self) -> Series {
        let ca = self.0.physical().rechunk().into_owned();
        ca.into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn new_from_index(&self, index: usize, length: usize) -> Series {
        self.0
            .physical()
            .new_from_index(index, length)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series()
    }

    fn cast(&self, dtype: &DataType, cast_options: CastOptions) -> PolarsResult<Series> {
        self.0.cast_with_options(dtype, cast_options)
    }

    #[inline]
    unsafe fn get_unchecked(&self, index: usize) -> AnyValue<'_> {
        self.0.get_any_value_unchecked(index)
    }

    fn sort_with(&self, options: SortOptions) -> PolarsResult<Series> {
        Ok(self
            .0
            .physical()
            .sort_with(options)
            .into_decimal_unchecked(self.0.precision(), self.0.scale())
            .into_series())
    }

    fn arg_sort(&self, options: SortOptions) -> IdxCa {
        self.0.physical().arg_sort(options)
    }

    fn null_count(&self) -> usize {
        self.0.null_count()
    }

    fn has_nulls(&self) -> bool {
        self.0.has_nulls()
    }

    #[cfg(feature = "algorithm_group_by")]
    fn unique(&self) -> PolarsResult<Series> {
        Ok(self.apply_physical_to_s(|ca| ca.unique().unwrap()))
    }

    #[cfg(feature = "algorithm_group_by")]
    fn n_unique(&self) -> PolarsResult<usize> {
        self.0.physical().n_unique()
    }

    #[cfg(feature = "algorithm_group_by")]
    fn arg_unique(&self) -> PolarsResult<IdxCa> {
        self.0.physical().arg_unique()
    }

    fn is_null(&self) -> BooleanChunked {
        self.0.is_null()
    }

    fn is_not_null(&self) -> BooleanChunked {
        self.0.is_not_null()
    }

    fn reverse(&self) -> Series {
        self.apply_physical_to_s(|ca| ca.reverse())
    }

    fn shift(&self, periods: i64) -> Series {
        self.apply_physical_to_s(|ca| ca.shift(periods))
    }

    fn clone_inner(&self) -> Arc<dyn SeriesTrait> {
        Arc::new(SeriesWrap(Clone::clone(&self.0)))
    }

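    // Scalar reductions keep the Decimal dtype: the physical i128 result is wrapped
    // in `AnyValue::Decimal` together with this series' scale.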
    fn sum_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let sum = ca.sum();
            let DataType::Decimal(_, Some(scale)) = self.dtype() else {
                unreachable!()
            };
            let av = AnyValue::Decimal(sum.unwrap(), *scale);
            Scalar::new(self.dtype().clone(), av)
        }))
    }
    fn min_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let min = ca.min();
            let DataType::Decimal(_, Some(scale)) = self.dtype() else {
                unreachable!()
            };
            let av = if let Some(min) = min {
                AnyValue::Decimal(min, *scale)
            } else {
                AnyValue::Null
            };
            Scalar::new(self.dtype().clone(), av)
        }))
    }
    fn max_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_physical(|ca| {
            let max = ca.max();
            let DataType::Decimal(_, Some(scale)) = self.dtype() else {
                unreachable!()
            };
            let av = if let Some(m) = max {
                AnyValue::Decimal(m, *scale)
            } else {
                AnyValue::Null
            };
            Scalar::new(self.dtype().clone(), av)
        }))
    }

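    // Float statistics (sum/mean/median/std/quantile) are computed on the physical
    // values and then divided by `10^scale` via `scale_factor`/`apply_scale`.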
    fn _sum_as_f64(&self) -> f64 {
        self.0.physical()._sum_as_f64() / self.scale_factor() as f64
    }

    fn mean(&self) -> Option<f64> {
        self.0
            .physical()
            .mean()
            .map(|v| v / self.scale_factor() as f64)
    }

    fn median(&self) -> Option<f64> {
        self.0
            .physical()
            .median()
            .map(|v| v / self.scale_factor() as f64)
    }
    fn median_reduce(&self) -> PolarsResult<Scalar> {
        Ok(self.apply_scale(self.0.physical().median_reduce()))
    }

    fn std(&self, ddof: u8) -> Option<f64> {
        self.0
            .physical()
            .std(ddof)
            .map(|v| v / self.scale_factor() as f64)
    }
    fn std_reduce(&self, ddof: u8) -> PolarsResult<Scalar> {
        Ok(self.apply_scale(self.0.physical().std_reduce(ddof)))
    }

    fn quantile_reduce(&self, quantile: f64, method: QuantileMethod) -> PolarsResult<Scalar> {
        self.0
            .physical()
            .quantile_reduce(quantile, method)
            .map(|v| self.apply_scale(v))
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        self.0.physical().find_validity_mismatch(other, idxs)
    }

    fn as_any(&self) -> &dyn Any {
        &self.0
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        &mut self.0
    }

    fn as_phys_any(&self) -> &dyn Any {
        self.0.physical()
    }

    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self as _
    }
}