Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/dispatch/misc.rs
7884 views
1
use polars_core::error::{PolarsResult, polars_bail, polars_ensure, polars_err};
2
use polars_core::prelude::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
3
use polars_core::prelude::*;
4
use polars_core::scalar::Scalar;
5
use polars_core::series::ops::NullBehavior;
6
use polars_core::series::{IsSorted, Series};
7
use polars_core::utils::try_get_supertype;
8
#[cfg(feature = "interpolate")]
9
use polars_ops::series::InterpolationMethod;
10
#[cfg(feature = "rank")]
11
use polars_ops::series::RankOptions;
12
use polars_ops::series::{ArgAgg, NullStrategy, SeriesMethods};
13
#[cfg(feature = "dtype-array")]
14
use polars_plan::dsl::ReshapeDimension;
15
#[cfg(feature = "fused")]
16
use polars_plan::plans::FusedOperator;
17
#[cfg(feature = "cov")]
18
use polars_plan::plans::IRCorrelationMethod;
19
use polars_plan::plans::RowEncodingVariant;
20
use polars_row::RowEncodingOptions;
21
use polars_utils::IdxSize;
22
use polars_utils::pl_str::PlSmallStr;
23
24
#[cfg(feature = "abs")]
/// Applies `polars_ops::prelude::abs` to the materialized series behind `s`
/// and wraps the result back into a `Column`.
pub(super) fn abs(s: &Column) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::prelude::abs(series)?;
    Ok(Column::from(out))
}
28
29
/// Returns the result of `Column::reverse` on `s`; this wrapper never fails.
pub(super) fn reverse(s: &Column) -> PolarsResult<Column> {
    let reversed = s.reverse();
    Ok(reversed)
}
32
33
#[cfg(feature = "approx_unique")]
/// Wraps the value returned by `Column::approx_n_unique` in a length-1 scalar
/// column of dtype `IDX_DTYPE` that keeps the input's name.
pub(super) fn approx_n_unique(s: &Column) -> PolarsResult<Column> {
    let estimate = s.approx_n_unique()?;
    let scalar = Scalar::new(IDX_DTYPE, estimate.into());
    Ok(Column::new_scalar(s.name().clone(), scalar, 1))
}
38
39
#[cfg(feature = "diff")]
/// Calls `polars_ops::prelude::diff` on `s[0]` with the shift `n` taken from `s[1]`.
///
/// # Errors
/// Fails when `s[1]` does not hold exactly one value, when it cannot be
/// strictly cast to `Int64`, or when its single value is null.
pub(super) fn diff(s: &[Column], null_behavior: NullBehavior) -> PolarsResult<Column> {
    let values = s[0].as_materialized_series();
    let periods = &s[1];

    polars_ensure!(
        periods.len() == 1,
        ComputeError: "n must be a single value."
    );

    let periods = periods.strict_cast(&DataType::Int64)?;
    let Some(n) = periods.i64()?.get(0) else {
        polars_bail!(ComputeError: "'n' can not be None for diff");
    };

    polars_ops::prelude::diff(values, n, null_behavior).map(Column::from)
}
54
55
#[cfg(feature = "pct_change")]
/// Forwards `s[0]` and `s[1]` (both materialized) to
/// `polars_ops::prelude::pct_change` and wraps the result in a `Column`.
pub(super) fn pct_change(s: &[Column]) -> PolarsResult<Column> {
    let values = s[0].as_materialized_series();
    let periods = s[1].as_materialized_series();
    let out = polars_ops::prelude::pct_change(values, periods)?;
    Ok(Column::from(out))
}
60
61
#[cfg(feature = "interpolate")]
/// Runs `polars_ops::prelude::interpolate` on `s` with the given `method`.
pub(super) fn interpolate(s: &Column, method: InterpolationMethod) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let interpolated = polars_ops::prelude::interpolate(series, method);
    Ok(interpolated.into())
}
65
66
#[cfg(feature = "interpolate_by")]
/// Interpolates `s[0]` with respect to `s[1]` via `polars_ops::prelude::interpolate_by`.
///
/// Whether `s[1]` is sorted (under the default sort options) is determined
/// here and passed along to the underlying operation.
pub(super) fn interpolate_by(s: &[Column]) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    let values = &s[0];
    let by = &s[1];
    let by_is_sorted = by.as_materialized_series().is_sorted(Default::default())?;

    polars_ops::prelude::interpolate_by(values, by, by_is_sorted)
}
74
75
/// Returns the result of `Column::to_physical_repr` on `s`; this wrapper never fails.
pub(super) fn to_physical(s: &Column) -> PolarsResult<Column> {
    let physical = s.to_physical_repr();
    Ok(physical)
}
78
79
/// Returns a clone of `s` whose sorted flag has been set to `sorted`.
///
/// The input column itself is left untouched.
pub(super) fn set_sorted_flag(s: &Column, sorted: IsSorted) -> PolarsResult<Column> {
    let mut flagged = s.clone();
    flagged.set_sorted_flag(sorted);
    Ok(flagged)
}
84
85
#[cfg(feature = "timezones")]
/// Calls `polars_ops::prelude::replace_time_zone` on the datetime column `s[0]`.
///
/// `s[1]` must be a string column; it is forwarded as the third argument of
/// the underlying operation (presumably the per-row `ambiguous` strategy —
/// confirm against `polars_ops`).
///
/// # Errors
/// Returns an error (rather than panicking) when `s[0]` is not a datetime
/// column or `s[1]` is not a string column, and propagates any error from the
/// underlying operation.
pub(super) fn replace_time_zone(
    s: &[Column],
    time_zone: Option<&TimeZone>,
    non_existent: NonExistent,
) -> PolarsResult<Column> {
    // Propagate a dtype mismatch as an error, consistent with the `.str()?` below.
    let ca = s[0].datetime()?;
    let ambiguous = s[1].str()?;

    let out = polars_ops::prelude::replace_time_zone(ca, time_zone, ambiguous, non_existent)?;
    Ok(out.into_column())
}
96
97
#[cfg(feature = "dtype-struct")]
/// Computes `value_counts` on `s` and packs the resulting frame into a struct
/// column that carries the name of `s`.
///
/// `sort`, `parallel`, `name` and `normalize` are forwarded unchanged to
/// `SeriesMethods::value_counts`.
pub(super) fn value_counts(
    s: &Column,
    sort: bool,
    parallel: bool,
    name: PlSmallStr,
    normalize: bool,
) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    let counts = s
        .as_materialized_series()
        .value_counts(sort, parallel, name, normalize)?;
    let as_struct = counts.into_struct(s.name().clone());
    Ok(as_struct.into_column())
}
111
112
#[cfg(feature = "unique_counts")]
/// Runs `polars_ops::prelude::unique_counts` on `s` and wraps the result in a `Column`.
pub(super) fn unique_counts(s: &Column) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let counts = polars_ops::prelude::unique_counts(series)?;
    Ok(Column::from(counts))
}
116
117
#[cfg(feature = "dtype-array")]
/// Reshapes `c` via `Column::reshape_array` using the given `dimensions`.
pub(super) fn reshape(c: &Column, dimensions: &[ReshapeDimension]) -> PolarsResult<Column> {
    let reshaped = c.reshape_array(dimensions)?;
    Ok(reshaped)
}
121
122
#[cfg(feature = "repeat_by")]
/// Calls `polars_ops::chunked_array::repeat_by` on `s[0]`, using `s[1]` cast
/// to `IDX_DTYPE` as the second argument.
pub(super) fn repeat_by(s: &[Column]) -> PolarsResult<Column> {
    let counts = s[1].cast(&IDX_DTYPE)?;
    let values = s[0].as_materialized_series();

    let repeated = polars_ops::chunked_array::repeat_by(values, counts.idx()?)?;
    Ok(repeated.into_column())
}
130
131
pub(super) fn max_horizontal(s: &mut [Column]) -> PolarsResult<Column> {
132
polars_ops::prelude::max_horizontal(s).map(Option::unwrap)
133
}
134
135
pub(super) fn min_horizontal(s: &mut [Column]) -> PolarsResult<Column> {
136
polars_ops::prelude::min_horizontal(s).map(Option::unwrap)
137
}
138
139
pub(super) fn sum_horizontal(s: &mut [Column], ignore_nulls: bool) -> PolarsResult<Column> {
140
let null_strategy = if ignore_nulls {
141
NullStrategy::Ignore
142
} else {
143
NullStrategy::Propagate
144
};
145
polars_ops::prelude::sum_horizontal(s, null_strategy).map(Option::unwrap)
146
}
147
148
pub(super) fn mean_horizontal(s: &mut [Column], ignore_nulls: bool) -> PolarsResult<Column> {
149
let null_strategy = if ignore_nulls {
150
NullStrategy::Ignore
151
} else {
152
NullStrategy::Propagate
153
};
154
polars_ops::prelude::mean_horizontal(s, null_strategy).map(Option::unwrap)
155
}
156
157
/// Returns the result of `Column::drop_nulls` on `s`; this wrapper never fails.
pub(super) fn drop_nulls(s: &Column) -> PolarsResult<Column> {
    let without_nulls = s.drop_nulls();
    Ok(without_nulls)
}
160
161
/// Returns the result of `Column::rechunk` on `s`; this wrapper never fails.
pub fn rechunk(s: &Column) -> PolarsResult<Column> {
    let rechunked = s.rechunk();
    Ok(rechunked)
}
164
165
pub fn append(s: &[Column], upcast: bool) -> PolarsResult<Column> {
166
assert_eq!(s.len(), 2);
167
168
let a = &s[0];
169
let b = &s[1];
170
171
if upcast {
172
let dtype = try_get_supertype(a.dtype(), b.dtype())?;
173
let mut a = a.cast(&dtype)?;
174
a.append_owned(b.cast(&dtype)?)?;
175
Ok(a)
176
} else {
177
let mut a = a.clone();
178
a.append(b)?;
179
Ok(a)
180
}
181
}
182
183
#[cfg(feature = "mode")]
/// Runs `polars_ops::prelude::mode::mode` on `s`, forwarding `maintain_order`.
pub(super) fn mode(s: &Column, maintain_order: bool) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::prelude::mode::mode(series, maintain_order)?;
    Ok(Column::from(out))
}
187
188
#[cfg(feature = "moment")]
/// Computes `MomentSeries::skew` for `s` and returns it as a single-element
/// column named after `s`.
pub(super) fn skew(s: &Column, bias: bool) -> PolarsResult<Column> {
    // @scalar-opt

    use polars_ops::series::MomentSeries;

    let skewness = s.as_materialized_series().skew(bias)?;
    Ok(Column::new(s.name().clone(), &[skewness]))
}
197
198
#[cfg(feature = "moment")]
/// Computes `MomentSeries::kurtosis` for `s` and returns it as a
/// single-element column named after `s`.
pub(super) fn kurtosis(s: &Column, fisher: bool, bias: bool) -> PolarsResult<Column> {
    // @scalar-opt

    use polars_ops::series::MomentSeries;

    let kurt = s.as_materialized_series().kurtosis(fisher, bias)?;
    Ok(Column::new(s.name().clone(), &[kurt]))
}
207
208
/// Returns the result of `arg_unique` on the materialized series of `s` as a column.
pub(super) fn arg_unique(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    let indices = s.as_materialized_series().arg_unique()?;
    Ok(indices.into_column())
}
214
215
pub(super) fn arg_min(s: &Column) -> PolarsResult<Column> {
216
// @scalar-opt
217
Ok(s.as_materialized_series()
218
.arg_min()
219
.map_or(Scalar::null(IDX_DTYPE), |v| {
220
Scalar::from(IdxSize::try_from(v).expect("idxsize"))
221
})
222
.into_column(s.name().clone()))
223
}
224
225
pub(super) fn arg_max(s: &Column) -> PolarsResult<Column> {
226
// @scalar-opt
227
Ok(s.as_materialized_series()
228
.arg_max()
229
.map_or(Scalar::null(IDX_DTYPE), |v| {
230
Scalar::from(IdxSize::try_from(v).expect("idxsize"))
231
})
232
.into_column(s.name().clone()))
233
}
234
235
pub(super) fn arg_sort(s: &Column, descending: bool, nulls_last: bool) -> PolarsResult<Column> {
236
// @scalar-opt
237
Ok(s.as_materialized_series()
238
.arg_sort(SortOptions {
239
descending,
240
nulls_last,
241
multithreaded: true,
242
maintain_order: false,
243
limit: None,
244
})
245
.into_column())
246
}
247
248
/// Returns the scalar produced by `product` on `s` as a column named after `s`.
pub(super) fn product(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    let scalar = s.as_materialized_series().product()?;
    Ok(scalar.into_column(s.name().clone()))
}
254
255
#[cfg(feature = "rank")]
/// Ranks `s` with `SeriesRank::rank`, forwarding `options` and `seed`.
pub(super) fn rank(s: &Column, options: RankOptions, seed: Option<u64>) -> PolarsResult<Column> {
    use polars_ops::series::SeriesRank;

    let ranked = s.as_materialized_series().rank(options, seed);
    Ok(ranked.into_column())
}
261
262
#[cfg(feature = "hist")]
/// Runs `polars_ops::prelude::hist_series` on `s[0]`.
///
/// Explicit bins are taken from `s[1]`, but only when `s` holds exactly two
/// columns; otherwise no bins are passed. The remaining arguments are
/// forwarded unchanged.
pub(super) fn hist(
    s: &[Column],
    bin_count: Option<usize>,
    include_category: bool,
    include_breakpoint: bool,
) -> PolarsResult<Column> {
    let bins = match s {
        [_, bins] => Some(bins.as_materialized_series().clone()),
        _ => None,
    };
    let values = s[0].as_materialized_series();

    let out = polars_ops::prelude::hist_series(
        values,
        bin_count,
        bins,
        include_category,
        include_breakpoint,
    )?;
    Ok(Column::from(out))
}
280
281
#[cfg(feature = "replace")]
/// Calls `polars_ops::series::replace` on `s[0]`, passing `s[1]` and `s[2]`
/// (both required to be list columns) as the second and third arguments.
pub(super) fn replace(s: &[Column]) -> PolarsResult<Column> {
    let values = s[0].as_materialized_series();
    let second = s[1].list()?;
    let third = s[2].list()?;

    let out = polars_ops::series::replace(values, second, third)?;
    Ok(Column::from(out))
}
286
287
#[cfg(feature = "replace")]
/// Dispatches to `replace_or_default` when a fourth input column is present
/// and to `replace_strict` otherwise.
///
/// `s[1]` and `s[2]` must be list columns; `return_dtype` is forwarded
/// unchanged to whichever operation is selected.
pub(super) fn replace_strict(s: &[Column], return_dtype: Option<DataType>) -> PolarsResult<Column> {
    let out = if let Some(default) = s.get(3) {
        polars_ops::series::replace_or_default(
            s[0].as_materialized_series(),
            s[1].list()?,
            s[2].list()?,
            default.as_materialized_series(),
            return_dtype,
        )
    } else {
        polars_ops::series::replace_strict(
            s[0].as_materialized_series(),
            s[1].list()?,
            s[2].list()?,
            return_dtype,
        )
    };

    out.map(Column::from)
}
306
307
pub(super) fn fill_null_with_strategy(
308
s: &Column,
309
strategy: FillNullStrategy,
310
) -> PolarsResult<Column> {
311
s.fill_null(strategy)
312
}
313
314
/// Forwards to `Column::gather_every` with the given `n` and `offset`.
pub(super) fn gather_every(s: &Column, n: usize, offset: usize) -> PolarsResult<Column> {
    let gathered = s.gather_every(n, offset)?;
    Ok(gathered)
}
317
318
#[cfg(feature = "reinterpret")]
/// Runs `polars_ops::series::reinterpret` on `s`, forwarding the `signed` flag.
pub(super) fn reinterpret(s: &Column, signed: bool) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::series::reinterpret(series, signed)?;
    Ok(Column::from(out))
}
322
323
pub(super) fn negate(s: &Column) -> PolarsResult<Column> {
324
polars_ops::series::negate(s.as_materialized_series()).map(Column::from)
325
}
326
327
pub(super) fn extend_constant(s: &[Column]) -> PolarsResult<Column> {
328
let value = &s[1];
329
let n = &s[2];
330
polars_ensure!(value.len() == 1 && n.len() == 1, ComputeError: "value and n should have unit length.");
331
let n = n.strict_cast(&DataType::UInt64)?;
332
let v = value.get(0)?;
333
let s = &s[0];
334
match n.u64()?.get(0) {
335
Some(n) => s.extend_constant(v, n as usize),
336
None => {
337
polars_bail!(ComputeError: "n can not be None for extend_constant.")
338
},
339
}
340
}
341
342
#[cfg(feature = "row_hash")]
/// Hashes the values of `c` and returns the hashes as a column.
///
/// The four seed parts `k0..k3` are first folded into a single `u64` with a
/// default `PlFixedStateQuality` hasher; that value seeds the random state
/// used for the actual hashing.
pub(super) fn row_hash(c: &Column, k0: u64, k1: u64, k2: u64, k3: u64) -> PolarsResult<Column> {
    use std::hash::BuildHasher;

    use polars_utils::aliases::{
        PlFixedStateQuality, PlSeedableRandomStateQuality, SeedableFromU64SeedExt,
    };

    // TODO: don't expose all these seeds.
    let seed = PlFixedStateQuality::default().hash_one((k0, k1, k2, k3));
    let random_state = PlSeedableRandomStateQuality::seed_from_u64(seed);

    // @scalar-opt
    let hashes = c.as_materialized_series().hash(random_state);
    Ok(hashes.into_column())
}
358
359
#[cfg(feature = "arg_where")]
/// Collects the positions at which the boolean column `s[0]` is set.
///
/// Positions are counted across all chunks. Within a chunk that has unset
/// validity bits, the values are AND-ed with the validity bitmap before the
/// set runs are enumerated. An empty predicate yields an empty `IDX_DTYPE`
/// column.
///
/// # Panics
/// Panics if `sum()` on a non-empty predicate returns `None`.
pub(super) fn arg_where(s: &mut [Column]) -> PolarsResult<Column> {
    use arrow::datatypes::IdxArr;
    use polars_core::prelude::IdxCa;
    use polars_core::utils::arrow::bitmap::utils::SlicesIterator;

    let predicate = s[0].bool()?;

    if predicate.is_empty() {
        return Ok(Column::full_null(predicate.name().clone(), 0, &IDX_DTYPE));
    }

    // The sum of the predicate is used as the pre-allocation size.
    let capacity = predicate.sum().unwrap();
    let mut indices = Vec::with_capacity(capacity as usize);
    // Number of elements in the chunks that were already processed.
    let mut chunk_start = 0;

    for arr in predicate.downcast_iter() {
        let mask = match arr.validity() {
            Some(validity) if validity.unset_bits() > 0 => validity & arr.values(),
            _ => arr.values().clone(),
        };

        for (offset, len) in SlicesIterator::new(&mask) {
            // law of small numbers optimization
            if len == 1 {
                indices.push((chunk_start + offset) as IdxSize)
            } else {
                let first = (offset + chunk_start) as IdxSize;
                let len = len as IdxSize;
                indices.extend(first..first + len)
            }
        }

        chunk_start += arr.len();
    }

    let ca = IdxCa::with_chunk(predicate.name().clone(), IdxArr::from_vec(indices));
    Ok(ca.into_column())
}
399
400
#[cfg(feature = "index_of")]
/// Given two columns, find the index of a value (the second column) within the
/// first column. Will use binary search if possible, as an optimization.
///
/// The result is a length-1 scalar column of `IDX_DTYPE`, null when the value
/// was not found.
///
/// # Errors
/// Fails when the needle column does not contain exactly one value.
pub(super) fn index_of(s: &mut [Column]) -> PolarsResult<Column> {
    use polars_core::series::IsSorted;
    use polars_ops::series::index_of as index_of_op;

    let series = if let Column::Scalar(ref sc) = s[0] {
        // We only care about the first value:
        &sc.as_single_value_series()
    } else {
        s[0].as_materialized_series()
    };

    let needle_s = &s[1];
    polars_ensure!(
        needle_s.len() == 1,
        InvalidOperation: "needle of `index_of` can only contain a single value, found {} values",
        needle_s.len()
    );
    let needle = Scalar::new(
        needle_s.dtype().clone(),
        needle_s.get(0).unwrap().into_static(),
    );

    let sorted_flag = series.is_sorted_flag();
    let found = match sorted_flag {
        // A sorted haystack allows a binary search, provided the needle is not null.
        IsSorted::Ascending | IsSorted::Descending if !needle.is_null() => {
            use polars_ops::series::SearchSortedSide;

            let descending = IsSorted::Descending == sorted_flag;
            let candidates = polars_ops::series::search_sorted(
                series,
                needle_s.as_materialized_series(),
                SearchSortedSide::Left,
                descending,
            )?;

            candidates.get(0).and_then(|idx| {
                // search_sorted() gives an index even if it's not an exact
                // match! So we want to make sure it actually found the value.
                let is_exact = series.get(idx as usize).ok()? == needle.as_any_value();
                is_exact.then_some(idx as usize)
            })
        },
        _ => index_of_op(series, needle)?,
    };

    let av = found.map_or(AnyValue::Null, |idx| AnyValue::from(idx as IdxSize));
    let scalar = Scalar::new(IDX_DTYPE, av);
    Ok(Column::new_scalar(series.name().clone(), scalar, 1))
}
458
459
#[cfg(feature = "search_sorted")]
/// Calls `polars_ops::series::search_sorted` with `s[0]` as the sorted
/// haystack and `s[1]` as the values to look up, forwarding `side` and
/// `descending`.
pub(super) fn search_sorted_impl(
    s: &mut [Column],
    side: polars_ops::series::SearchSortedSide,
    descending: bool,
) -> PolarsResult<Column> {
    let haystack = s[0].as_materialized_series();
    let needles = s[1].as_materialized_series();

    let positions = polars_ops::series::search_sorted(haystack, needles, side, descending)?;
    Ok(positions.into_column())
}
476
477
#[cfg(feature = "sign")]
/// Computes the element-wise sign of a numeric column.
///
/// Primitive numeric dtypes map every value to `-1`, `1`, or the value itself
/// when it is neither below nor above zero. Decimals are handled through
/// `polars_compute::decimal::dec128_sign` on the physical values.
///
/// # Errors
/// Fails for any other dtype.
pub(super) fn sign(s: &Column) -> PolarsResult<Column> {
    use num_traits::{One, Zero};
    use polars_core::prelude::{ChunkedArray, PolarsNumericType};
    use polars_core::with_match_physical_numeric_polars_type;

    /// Sign kernel shared by all primitive numeric types.
    fn sign_impl<T>(ca: &ChunkedArray<T>) -> Column
    where
        T: PolarsNumericType,
        ChunkedArray<T>: IntoColumn,
    {
        ca.apply_values(|x| {
            if x > T::Native::zero() {
                T::Native::one()
            } else if x < T::Native::zero() {
                T::Native::zero() - T::Native::one()
            } else {
                // Returning x here ensures we return NaN for NaN input, and
                // maintain the sign for signed zeroes (although we don't really
                // care about the latter).
                x
            }
        })
        .into_column()
    }

    let s = s.as_materialized_series();
    let dtype = s.dtype();
    use polars_core::datatypes::*;
    match dtype {
        _ if dtype.is_primitive_numeric() => with_match_physical_numeric_polars_type!(dtype, |$T| {
            let ca: &ChunkedArray<$T> = s.as_ref().as_ref();
            Ok(sign_impl(ca))
        }),
        DataType::Decimal(_, scale) => {
            use polars_core::prelude::ChunkApply;

            let physical_signs = s
                .decimal()?
                .physical()
                .apply_values(|x| polars_compute::decimal::dec128_sign(x, *scale))
                .into_column();
            // NOTE(review): presumably sound because the physical values are
            // re-wrapped with the dtype they were taken from — confirm.
            unsafe { physical_signs.from_physical_unchecked(dtype) }
        },
        _ => polars_bail!(opq = sign, dtype),
    }
}
524
525
pub(super) fn fill_null(s: &[Column]) -> PolarsResult<Column> {
526
match (s[0].len(), s[1].len()) {
527
(a, b) if a == b || b == 1 => {
528
let series = s[0].clone();
529
530
// Nothing to fill, so return early
531
// this is done after casting as the output type must be correct
532
if series.null_count() == 0 {
533
return Ok(series);
534
}
535
536
let fill_value = s[1].clone();
537
538
// default branch
539
fn default(series: Column, fill_value: Column) -> PolarsResult<Column> {
540
let mask = series.is_not_null();
541
series.zip_with_same_type(&mask, &fill_value)
542
}
543
544
let fill_value = if series.dtype().is_categorical() && fill_value.dtype().is_string() {
545
fill_value.cast(series.dtype()).unwrap()
546
} else {
547
fill_value
548
};
549
default(series, fill_value)
550
},
551
(1, other_len) => {
552
if s[0].has_nulls() {
553
Ok(s[1].clone())
554
} else {
555
Ok(s[0].new_from_index(0, other_len))
556
}
557
},
558
(self_len, other_len) => polars_bail!(length_mismatch = "fill_null", self_len, other_len),
559
}
560
}
561
562
pub(super) fn coalesce(s: &mut [Column]) -> PolarsResult<Column> {
563
polars_ops::series::coalesce_columns(s)
564
}
565
566
pub(super) fn drop_nans(s: Column) -> PolarsResult<Column> {
567
match s.dtype() {
568
#[cfg(feature = "dtype-f16")]
569
DataType::Float16 => {
570
let ca = s.f16()?;
571
let mask = ca.is_not_nan() | ca.is_null();
572
ca.filter(&mask).map(|ca| ca.into_column())
573
},
574
DataType::Float32 => {
575
let ca = s.f32()?;
576
let mask = ca.is_not_nan() | ca.is_null();
577
ca.filter(&mask).map(|ca| ca.into_column())
578
},
579
DataType::Float64 => {
580
let ca = s.f64()?;
581
let mask = ca.is_not_nan() | ca.is_null();
582
ca.filter(&mask).map(|ca| ca.into_column())
583
},
584
_ => Ok(s),
585
}
586
}
587
588
#[cfg(feature = "round_series")]
/// Clips `s[0]` using the bound columns that follow it.
///
/// With both bounds present `s[1]` and `s[2]` are passed to `clip`; with only
/// one bound present `s[1]` is passed to `clip_min` or `clip_max` respectively.
///
/// # Panics
/// Panics when neither `has_min` nor `has_max` is set.
pub(super) fn clip(s: &[Column], has_min: bool, has_max: bool) -> PolarsResult<Column> {
    let clipped = match (has_min, has_max) {
        (true, true) => polars_ops::series::clip(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
            s[2].as_materialized_series(),
        ),
        (true, false) => polars_ops::series::clip_min(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
        ),
        (false, true) => polars_ops::series::clip_max(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
        ),
        _ => unreachable!(),
    };

    clipped.map(Column::from)
}
608
609
#[cfg(feature = "dtype-struct")]
/// Builds a struct column from `cols`, named after the first column.
///
/// The struct length is `0` if any input column is empty, otherwise the
/// length of the longest input column.
///
/// # Errors
/// Fails when `cols` is empty, or when `StructChunked::from_columns` rejects
/// the inputs.
pub fn as_struct(cols: &[Column]) -> PolarsResult<Column> {
    use polars_core::prelude::StructChunked;

    let Some(fst) = cols.first() else {
        polars_bail!(nyi = "turning no columns as_struct");
    };

    let (min_length, max_length) = cols
        .iter()
        .map(|col| col.len())
        .fold((usize::MAX, usize::MIN), |(lo, hi), len| {
            (lo.min(len), hi.max(len))
        });

    // @NOTE: Any additional errors should be handled by the StructChunked::from_columns
    let length = if min_length == 0 { 0 } else { max_length };

    let ca = StructChunked::from_columns(fst.name().clone(), length, cols)?;
    Ok(ca.into_column())
}
632
633
#[cfg(feature = "log")]
/// Computes `LogSeries::entropy` for `s` and returns it as a single-element
/// column named after `s`.
///
/// For a `Float32` input the result is narrowed to `f32`; for every other
/// dtype it is stored as returned.
pub(super) fn entropy(s: &Column, base: f64, normalize: bool) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    let out = s.as_materialized_series().entropy(base, normalize)?;
    let name = s.name().clone();

    match s.dtype() {
        DataType::Float32 => Ok(Column::new(name, [out as f32])),
        _ => Ok(Column::new(name, [out])),
    }
}
645
646
#[cfg(feature = "log")]
/// Applies `Series::log` element-wise to the two input columns, with
/// broadcasting handled by `Column::apply_broadcasting_binary_elementwise`.
///
/// # Panics
/// Panics if `columns` does not contain exactly two columns.
pub(super) fn log(columns: &[Column]) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    assert_eq!(columns.len(), 2);

    let (lhs, rhs) = (&columns[0], &columns[1]);
    Column::apply_broadcasting_binary_elementwise(lhs, rhs, Series::log)
}
653
654
#[cfg(feature = "log")]
/// Applies `LogSeries::log1p` to `s` and wraps the result in a `Column`.
pub(super) fn log1p(s: &Column) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    let out = s.as_materialized_series().log1p();
    Ok(out.into())
}
660
661
#[cfg(feature = "log")]
/// Applies `LogSeries::exp` to `s` and wraps the result in a `Column`.
pub(super) fn exp(s: &Column) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    let out = s.as_materialized_series().exp();
    Ok(out.into())
}
667
668
pub(super) fn unique(s: &Column, stable: bool) -> PolarsResult<Column> {
669
if stable {
670
s.unique_stable()
671
} else {
672
s.unique()
673
}
674
}
675
676
#[cfg(feature = "fused")]
/// Evaluates a fused three-operand operation on `input[0..3]`.
///
/// `op` selects between `fma_columns`, `fsm_columns` and `fms_columns` from
/// `polars_ops::series`.
pub(super) fn fused(input: &[Column], op: FusedOperator) -> PolarsResult<Column> {
    use polars_plan::plans::FusedOperator;

    let (a, b, c) = (&input[0], &input[1], &input[2]);

    let out = match op {
        FusedOperator::MultiplyAdd => polars_ops::series::fma_columns(a, b, c),
        FusedOperator::SubMultiply => polars_ops::series::fsm_columns(a, b, c),
        FusedOperator::MultiplySub => polars_ops::series::fms_columns(a, b, c),
    };
    Ok(out)
}
689
690
pub(super) fn concat_expr(s: &[Column], rechunk: bool) -> PolarsResult<Column> {
691
let mut first = s[0].clone();
692
693
for s in &s[1..] {
694
first.append(s)?;
695
}
696
if rechunk {
697
first = first.rechunk()
698
}
699
Ok(first)
700
}
701
702
#[cfg(feature = "cov")]
/// Computes a correlation-style statistic between `s[0]` and `s[1]`.
///
/// `method` selects Pearson correlation, Spearman rank correlation (only when
/// the `rank` and `propagate_nans` features are enabled) or covariance. The
/// result is a single-element column.
///
/// # Errors
/// Fails when the two inputs differ in length and neither has length 1.
pub(super) fn corr(s: &[Column], method: IRCorrelationMethod) -> PolarsResult<Column> {
    use polars_plan::plans::IRCorrelationMethod;

    /// Covariance of the first two columns with `ddof` delta degrees of freedom.
    ///
    /// The kernel is chosen by the dtype of the first column; dtypes without a
    /// dedicated arm are cast to `Float64` first.
    fn covariance(s: &[Column], ddof: u8) -> PolarsResult<Column> {
        use polars_ops::chunked_array::cov::cov;

        let a = &s[0];
        let b = &s[1];
        let name = PlSmallStr::from_static("cov");

        let value = match a.dtype() {
            #[cfg(feature = "dtype-f16")]
            DataType::Float16 => {
                use num_traits::AsPrimitive;
                use polars_utils::float16::pf16;

                let value =
                    cov(a.f16().unwrap(), b.f16().unwrap(), ddof).map(AsPrimitive::<pf16>::as_);
                return Ok(Column::new(name, &[value]));
            },
            DataType::Float32 => {
                // Narrow the result so the output dtype matches the input.
                let value = cov(a.f32().unwrap(), b.f32().unwrap(), ddof).map(|v| v as f32);
                return Ok(Column::new(name, &[value]));
            },
            DataType::Float64 => cov(a.f64().unwrap(), b.f64().unwrap(), ddof),
            DataType::Int32 => cov(a.i32().unwrap(), b.i32().unwrap(), ddof),
            DataType::Int64 => cov(a.i64().unwrap(), b.i64().unwrap(), ddof),
            DataType::UInt32 => cov(a.u32().unwrap(), b.u32().unwrap(), ddof),
            DataType::UInt64 => cov(a.u64().unwrap(), b.u64().unwrap(), ddof),
            _ => {
                let a = a.cast(&DataType::Float64)?;
                let b = b.cast(&DataType::Float64)?;
                cov(a.f64().unwrap(), b.f64().unwrap(), ddof)
            },
        };
        Ok(Column::new(name, &[value]))
    }

    /// Pearson correlation of the first two columns.
    ///
    /// The kernel is chosen by the dtype of the first column; dtypes without a
    /// dedicated arm are cast to `Float64` first.
    fn pearson_corr(s: &[Column]) -> PolarsResult<Column> {
        use polars_ops::chunked_array::cov::pearson_corr;

        let a = &s[0];
        let b = &s[1];
        let name = PlSmallStr::from_static("pearson_corr");

        let value = match a.dtype() {
            #[cfg(feature = "dtype-f16")]
            DataType::Float16 => {
                use num_traits::AsPrimitive;
                use polars_utils::float16::pf16;

                let value =
                    pearson_corr(a.f16().unwrap(), b.f16().unwrap()).map(AsPrimitive::<pf16>::as_);
                return Ok(Column::new(name, &[value]));
            },
            DataType::Float32 => {
                // Narrow the result so the output dtype matches the input.
                let value = pearson_corr(a.f32().unwrap(), b.f32().unwrap()).map(|v| v as f32);
                return Ok(Column::new(name, &[value]));
            },
            DataType::Float64 => pearson_corr(a.f64().unwrap(), b.f64().unwrap()),
            DataType::Int32 => pearson_corr(a.i32().unwrap(), b.i32().unwrap()),
            DataType::Int64 => pearson_corr(a.i64().unwrap(), b.i64().unwrap()),
            DataType::UInt32 => pearson_corr(a.u32().unwrap(), b.u32().unwrap()),
            _ => {
                let a = a.cast(&DataType::Float64)?;
                let b = b.cast(&DataType::Float64)?;
                pearson_corr(a.f64().unwrap(), b.f64().unwrap())
            },
        };
        Ok(Column::new(name, &[value]))
    }

    /// Spearman rank correlation: Pearson correlation of the average ranks.
    ///
    /// With `propagate_nans` set and a float first column, a NaN maximum in
    /// either input short-circuits to a NaN result.
    #[cfg(all(feature = "rank", feature = "propagate_nans"))]
    fn spearman_rank_corr(s: &[Column], propagate_nans: bool) -> PolarsResult<Column> {
        use polars_core::utils::coalesce_nulls_columns;
        use polars_ops::chunked_array::nan_propagating_aggregate::nan_max_s;
        use polars_ops::series::{RankMethod, SeriesRank};

        let (a, b) = coalesce_nulls_columns(&s[0], &s[1]);

        let name = PlSmallStr::from_static("spearman_rank_correlation");
        if propagate_nans && a.dtype().is_float() {
            for col in [&a, &b] {
                let max = nan_max_s(col.as_materialized_series(), PlSmallStr::EMPTY);
                if max.get(0).is_ok_and(|m| m.is_nan()) {
                    return Ok(Column::new(name, &[f64::NAN]));
                }
            }
        }

        // drop nulls so that they are excluded
        let a = a.drop_nulls();
        let b = b.drop_nulls();

        let average_rank = |col: &Column| -> Column {
            col.as_materialized_series()
                .rank(
                    RankOptions {
                        method: RankMethod::Average,
                        ..Default::default()
                    },
                    None,
                )
                .into()
        };
        let a_rank = average_rank(&a);
        let b_rank = average_rank(&b);

        pearson_corr(&[a_rank, b_rank])
    }

    polars_ensure!(
        s[0].len() == s[1].len() || s[0].len() == 1 || s[1].len() == 1,
        length_mismatch = "corr",
        s[0].len(),
        s[1].len()
    );

    match method {
        IRCorrelationMethod::Pearson => pearson_corr(s),
        #[cfg(all(feature = "rank", feature = "propagate_nans"))]
        IRCorrelationMethod::SpearmanRank(propagate_nans) => spearman_rank_corr(s, propagate_nans),
        IRCorrelationMethod::Covariance(ddof) => covariance(s, ddof),
    }
}
835
836
#[cfg(feature = "peaks")]
/// Calls `peaks::peak_min_max` on `s` with two `Int8(0)` arguments and the
/// final flag set to `false`.
pub(super) fn peak_min(s: &Column) -> PolarsResult<Column> {
    let zero = AnyValue::Int8(0);
    let peaks = polars_ops::prelude::peaks::peak_min_max(s, &zero, &zero, false)?;
    Ok(peaks.into_column())
}
841
842
#[cfg(feature = "peaks")]
/// Calls `peaks::peak_min_max` on `s` with two `Int8(0)` arguments and the
/// final flag set to `true`.
pub(super) fn peak_max(s: &Column) -> PolarsResult<Column> {
    let zero = AnyValue::Int8(0);
    let peaks = polars_ops::prelude::peaks::peak_min_max(s, &zero, &zero, true)?;
    Ok(peaks.into_column())
}
847
848
#[cfg(feature = "cutqcut")]
/// Runs `polars_ops::prelude::cut` on `s`; all remaining arguments are
/// forwarded unchanged.
pub(super) fn cut(
    s: &Column,
    breaks: Vec<f64>,
    labels: Option<Vec<PlSmallStr>>,
    left_closed: bool,
    include_breaks: bool,
) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let binned = polars_ops::prelude::cut(series, breaks, labels, left_closed, include_breaks)?;
    Ok(Column::from(binned))
}
865
866
#[cfg(feature = "cutqcut")]
/// Runs `polars_ops::prelude::qcut` on `s`; all remaining arguments are
/// forwarded unchanged.
pub(super) fn qcut(
    s: &Column,
    probs: Vec<f64>,
    labels: Option<Vec<PlSmallStr>>,
    left_closed: bool,
    allow_duplicates: bool,
    include_breaks: bool,
) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let binned = polars_ops::prelude::qcut(
        series,
        probs,
        labels,
        left_closed,
        allow_duplicates,
        include_breaks,
    )?;
    Ok(Column::from(binned))
}
885
886
#[cfg(feature = "ewma")]
/// Runs `polars_ops::prelude::ewm_mean` on `s` with the given `options`.
pub(super) fn ewm_mean(
    s: &Column,
    options: polars_ops::series::EWMOptions,
) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::prelude::ewm_mean(series, options)?;
    Ok(Column::from(out))
}
893
894
#[cfg(feature = "ewma")]
/// Runs `polars_ops::prelude::ewm_std` on `s` with the given `options`.
pub(super) fn ewm_std(s: &Column, options: polars_ops::series::EWMOptions) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::prelude::ewm_std(series, options)?;
    Ok(Column::from(out))
}
898
899
#[cfg(feature = "ewma")]
/// Runs `polars_ops::prelude::ewm_var` on `s` with the given `options`.
pub(super) fn ewm_var(s: &Column, options: polars_ops::series::EWMOptions) -> PolarsResult<Column> {
    let series = s.as_materialized_series();
    let out = polars_ops::prelude::ewm_var(series, options)?;
    Ok(Column::from(out))
}
903
904
#[cfg(feature = "ewma_by")]
/// Runs `polars_ops::prelude::ewm_mean_by` on the values in `s[0]`, with
/// `s[1]` as the accompanying time column.
///
/// Whether the time column is sorted (under the default sort options) is
/// determined here and passed along to the underlying operation.
///
/// # Errors
/// Fails when `half_life` is negative or is not a constant duration for the
/// time zone of `s[1]`.
pub(super) fn ewm_mean_by(s: &[Column], half_life: polars_time::Duration) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    // Only a datetime column with an explicit zone contributes a time zone.
    let time_zone = match s[1].dtype() {
        DataType::Datetime(_, Some(time_zone)) => Some(time_zone),
        _ => None,
    };
    polars_ensure!(!half_life.negative(), InvalidOperation: "half_life cannot be negative");
    polars_time::prelude::ensure_is_constant_duration(half_life, time_zone, "half_life")?;
    // `half_life` is a constant duration so we can safely use `duration_ns()`.
    let half_life = half_life.duration_ns();

    let values = s[0].as_materialized_series();
    let times = s[1].as_materialized_series();
    let times_is_sorted = times.is_sorted(Default::default())?;

    let out = polars_ops::prelude::ewm_mean_by(values, times, half_life, times_is_sorted)?;
    Ok(Column::from(out))
}
929
930
pub fn row_encode(
931
c: &mut [Column],
932
dts: Vec<DataType>,
933
variant: RowEncodingVariant,
934
) -> PolarsResult<Column> {
935
assert_eq!(c.len(), dts.len());
936
937
// We need to make sure that the output types are correct or we will get wrong results or even
938
// segfaults when decoding.
939
for (dt, c) in dts.iter().zip(c.iter_mut()) {
940
if c.dtype().matches_schema_type(dt)? {
941
*c = c.cast(dt)?;
942
}
943
}
944
945
let name = PlSmallStr::from_static("row_encoded");
946
match variant {
947
RowEncodingVariant::Unordered => _get_rows_encoded_ca_unordered(name, c),
948
RowEncodingVariant::Ordered {
949
descending,
950
nulls_last,
951
} => {
952
let descending = descending.unwrap_or_else(|| vec![false; c.len()]);
953
let nulls_last = nulls_last.unwrap_or_else(|| vec![false; c.len()]);
954
955
assert_eq!(c.len(), descending.len());
956
assert_eq!(c.len(), nulls_last.len());
957
958
_get_rows_encoded_ca(name, c, &descending, &nulls_last)
959
},
960
}
961
.map(IntoColumn::into_column)
962
}
963
964
#[cfg(feature = "dtype-struct")]
/// Decodes the single binary-offset column in `c` back into `fields` using
/// `row_encoding_decode`.
///
/// One `RowEncodingOptions` value is built per field: unsorted options for the
/// unordered variant, sorted options otherwise (with missing `descending` or
/// `nulls_last` defaulting to `false` for every field).
///
/// # Panics
/// Panics if `c` does not hold exactly one column, or if `descending` or
/// `nulls_last` do not have one entry per field.
pub fn row_decode(
    c: &mut [Column],
    fields: Vec<Field>,
    variant: RowEncodingVariant,
) -> PolarsResult<Column> {
    use polars_core::prelude::row_encode::row_encoding_decode;

    assert_eq!(c.len(), 1);
    let ca = c[0].binary_offset()?;

    let n_fields = fields.len();
    let opts: Vec<_> = match variant {
        RowEncodingVariant::Unordered => {
            std::iter::repeat_n(RowEncodingOptions::new_unsorted(), n_fields).collect()
        },
        RowEncodingVariant::Ordered {
            descending,
            nulls_last,
        } => {
            let descending = descending.unwrap_or_else(|| vec![false; n_fields]);
            let nulls_last = nulls_last.unwrap_or_else(|| vec![false; n_fields]);

            assert_eq!(n_fields, descending.len());
            assert_eq!(n_fields, nulls_last.len());

            descending
                .into_iter()
                .zip(nulls_last)
                .map(|(d, n)| RowEncodingOptions::new_sorted(d, n))
                .collect()
        },
    };

    let decoded = row_encoding_decode(ca, &fields, &opts)?;
    Ok(decoded.into_column())
}
1002
1003
pub fn repeat(args: &[Column]) -> PolarsResult<Column> {
1004
let c = &args[0];
1005
let n = &args[1];
1006
1007
polars_ensure!(
1008
n.dtype().is_integer(),
1009
SchemaMismatch: "expected expression of dtype 'integer', got '{}'", n.dtype()
1010
);
1011
1012
let first_value = n.get(0)?;
1013
let n = first_value.extract::<usize>().ok_or_else(
1014
|| polars_err!(ComputeError: "could not parse value '{}' as a size.", first_value),
1015
)?;
1016
1017
Ok(c.new_from_index(0, n))
1018
}
1019
1020