GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/dispatch/misc.rs
use polars_core::error::{PolarsResult, polars_bail, polars_ensure, polars_err};
use polars_core::prelude::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
use polars_core::prelude::*;
use polars_core::scalar::Scalar;
use polars_core::series::ops::NullBehavior;
use polars_core::series::{IsSorted, Series};
use polars_core::utils::try_get_supertype;
#[cfg(feature = "interpolate")]
use polars_ops::series::InterpolationMethod;
#[cfg(feature = "rank")]
use polars_ops::series::RankOptions;
use polars_ops::series::{ArgAgg, NullStrategy, SeriesMethods};
#[cfg(feature = "dtype-array")]
use polars_plan::dsl::ReshapeDimension;
#[cfg(feature = "fused")]
use polars_plan::plans::FusedOperator;
#[cfg(feature = "cov")]
use polars_plan::plans::IRCorrelationMethod;
use polars_plan::plans::RowEncodingVariant;
use polars_row::RowEncodingOptions;
use polars_utils::IdxSize;
use polars_utils::pl_str::PlSmallStr;

#[cfg(feature = "abs")]
pub(super) fn abs(s: &Column) -> PolarsResult<Column> {
    polars_ops::prelude::abs(s.as_materialized_series()).map(Column::from)
}

pub(super) fn reverse(s: &Column) -> PolarsResult<Column> {
    Ok(s.reverse())
}

#[cfg(feature = "approx_unique")]
pub(super) fn approx_n_unique(s: &Column) -> PolarsResult<Column> {
    s.approx_n_unique()
        .map(|v| Column::new_scalar(s.name().clone(), Scalar::new(IDX_DTYPE, v.into()), 1))
}

#[cfg(feature = "diff")]
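/// Computes the discrete difference of `s[0]`, where the shift `n` is read from the
/// single-value column `s[1]`.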
pub(super) fn diff(s: &[Column], null_behavior: NullBehavior) -> PolarsResult<Column> {
    let s1 = s[0].as_materialized_series();
    let n = &s[1];

    polars_ensure!(
        n.len() == 1,
        ComputeError: "n must be a single value."
    );
    let n = n.strict_cast(&DataType::Int64)?;
    match n.i64()?.get(0) {
        Some(n) => polars_ops::prelude::diff(s1, n, null_behavior).map(Column::from),
        None => polars_bail!(ComputeError: "'n' can not be None for diff"),
    }
}

#[cfg(feature = "pct_change")]
pub(super) fn pct_change(s: &[Column]) -> PolarsResult<Column> {
    polars_ops::prelude::pct_change(s[0].as_materialized_series(), s[1].as_materialized_series())
        .map(Column::from)
}

#[cfg(feature = "interpolate")]
pub(super) fn interpolate(s: &Column, method: InterpolationMethod) -> PolarsResult<Column> {
    Ok(polars_ops::prelude::interpolate(s.as_materialized_series(), method).into())
}

#[cfg(feature = "interpolate_by")]
pub(super) fn interpolate_by(s: &[Column]) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    let by = &s[1];
    let by_is_sorted = by.as_materialized_series().is_sorted(Default::default())?;
    polars_ops::prelude::interpolate_by(&s[0], by, by_is_sorted)
}

pub(super) fn to_physical(s: &Column) -> PolarsResult<Column> {
    Ok(s.to_physical_repr())
}

pub(super) fn set_sorted_flag(s: &Column, sorted: IsSorted) -> PolarsResult<Column> {
    let mut s = s.clone();
    s.set_sorted_flag(sorted);
    Ok(s)
}

#[cfg(feature = "timezones")]
pub(super) fn replace_time_zone(
    s: &[Column],
    time_zone: Option<&TimeZone>,
    non_existent: NonExistent,
) -> PolarsResult<Column> {
    let s1 = &s[0];
    let ca = s1.datetime().unwrap();
    let s2 = &s[1].str()?;
    Ok(polars_ops::prelude::replace_time_zone(ca, time_zone, s2, non_existent)?.into_column())
}

#[cfg(feature = "dtype-struct")]
pub(super) fn value_counts(
    s: &Column,
    sort: bool,
    parallel: bool,
    name: PlSmallStr,
    normalize: bool,
) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    s.as_materialized_series()
        .value_counts(sort, parallel, name, normalize)
        .map(|df| df.into_struct(s.name().clone()).into_column())
}

#[cfg(feature = "unique_counts")]
pub(super) fn unique_counts(s: &Column) -> PolarsResult<Column> {
    polars_ops::prelude::unique_counts(s.as_materialized_series()).map(Column::from)
}

#[cfg(feature = "dtype-array")]
pub(super) fn reshape(c: &Column, dimensions: &[ReshapeDimension]) -> PolarsResult<Column> {
    c.reshape_array(dimensions)
}

#[cfg(feature = "repeat_by")]
pub(super) fn repeat_by(s: &[Column]) -> PolarsResult<Column> {
    let by = &s[1];
    let s = &s[0];
    let by = by.strict_cast(&IDX_DTYPE)?;
    polars_ops::chunked_array::repeat_by(s.as_materialized_series(), by.idx()?)
        .map(|ok| ok.into_column())
}

pub(super) fn max_horizontal(s: &mut [Column]) -> PolarsResult<Column> {
    polars_ops::prelude::max_horizontal(s).map(Option::unwrap)
}

pub(super) fn min_horizontal(s: &mut [Column]) -> PolarsResult<Column> {
    polars_ops::prelude::min_horizontal(s).map(Option::unwrap)
}

pub(super) fn sum_horizontal(s: &mut [Column], ignore_nulls: bool) -> PolarsResult<Column> {
    let null_strategy = if ignore_nulls {
        NullStrategy::Ignore
    } else {
        NullStrategy::Propagate
    };
    polars_ops::prelude::sum_horizontal(s, null_strategy).map(Option::unwrap)
}

pub(super) fn mean_horizontal(s: &mut [Column], ignore_nulls: bool) -> PolarsResult<Column> {
    let null_strategy = if ignore_nulls {
        NullStrategy::Ignore
    } else {
        NullStrategy::Propagate
    };
    polars_ops::prelude::mean_horizontal(s, null_strategy).map(Option::unwrap)
}

pub(super) fn drop_nulls(s: &Column) -> PolarsResult<Column> {
    Ok(s.drop_nulls())
}

pub fn rechunk(s: &Column) -> PolarsResult<Column> {
    Ok(s.rechunk())
}

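/// Appends `s[1]` to `s[0]`. With `upcast` set, both sides are first cast to their common
/// supertype.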
pub fn append(s: &[Column], upcast: bool) -> PolarsResult<Column> {
    assert_eq!(s.len(), 2);

    let a = &s[0];
    let b = &s[1];

    if upcast {
        let dtype = try_get_supertype(a.dtype(), b.dtype())?;
        let mut a = a.cast(&dtype)?;
        a.append_owned(b.cast(&dtype)?)?;
        Ok(a)
    } else {
        let mut a = a.clone();
        a.append(b)?;
        Ok(a)
    }
}

#[cfg(feature = "mode")]
pub(super) fn mode(s: &Column, maintain_order: bool) -> PolarsResult<Column> {
    polars_ops::prelude::mode::mode(s.as_materialized_series(), maintain_order).map(Column::from)
}

#[cfg(feature = "moment")]
pub(super) fn skew(s: &Column, bias: bool) -> PolarsResult<Column> {
    // @scalar-opt

    use polars_ops::series::MomentSeries;
    s.as_materialized_series()
        .skew(bias)
        .map(|opt_v| Column::new(s.name().clone(), &[opt_v]))
}

#[cfg(feature = "moment")]
pub(super) fn kurtosis(s: &Column, fisher: bool, bias: bool) -> PolarsResult<Column> {
    // @scalar-opt

    use polars_ops::series::MomentSeries;
    s.as_materialized_series()
        .kurtosis(fisher, bias)
        .map(|opt_v| Column::new(s.name().clone(), &[opt_v]))
}

pub(super) fn arg_unique(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    s.as_materialized_series()
        .arg_unique()
        .map(|ok| ok.into_column())
}

pub(super) fn arg_min(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    Ok(s.as_materialized_series()
        .arg_min()
        .map_or(Scalar::null(IDX_DTYPE), |v| {
            Scalar::from(IdxSize::try_from(v).expect("idxsize"))
        })
        .into_column(s.name().clone()))
}

pub(super) fn arg_max(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    Ok(s.as_materialized_series()
        .arg_max()
        .map_or(Scalar::null(IDX_DTYPE), |v| {
            Scalar::from(IdxSize::try_from(v).expect("idxsize"))
        })
        .into_column(s.name().clone()))
}

pub(super) fn arg_sort(s: &Column, descending: bool, nulls_last: bool) -> PolarsResult<Column> {
    // @scalar-opt
    Ok(s.as_materialized_series()
        .arg_sort(SortOptions {
            descending,
            nulls_last,
            multithreaded: true,
            maintain_order: false,
            limit: None,
        })
        .into_column())
}

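/// Returns the single element of `s[0]` at the position where `s[1]` reaches its minimum,
/// or a one-row null column when `s[1]` has no non-null values.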
pub(super) fn min_by(s: &[Column]) -> PolarsResult<Column> {
    assert!(s.len() == 2);
    let input = &s[0];
    let by = &s[1];
    if input.len() != by.len() {
        polars_bail!(ShapeMismatch: "'by' column in `min_by` operation has incorrect length (got {}, expected {})", by.len(), input.len());
    }
    match by.as_materialized_series().arg_min() {
        Some(idx) => Ok(input.new_from_index(idx, 1)),
        None => Ok(Series::new_null(input.name().clone(), 1).into_column()),
    }
}

pub(super) fn max_by(s: &[Column]) -> PolarsResult<Column> {
    assert!(s.len() == 2);
    let input = &s[0];
    let by = &s[1];
    if input.len() != by.len() {
        polars_bail!(ShapeMismatch: "'by' column in `max_by` operation has incorrect length (got {}, expected {})", by.len(), input.len());
    }
    match by.as_materialized_series().arg_max() {
        Some(idx) => Ok(input.new_from_index(idx, 1)),
        None => Ok(Series::new_null(input.name().clone(), 1).into_column()),
    }
}

pub(super) fn product(s: &Column) -> PolarsResult<Column> {
    // @scalar-opt
    s.as_materialized_series()
        .product()
        .map(|sc| sc.into_column(s.name().clone()))
}

#[cfg(feature = "rank")]
pub(super) fn rank(s: &Column, options: RankOptions, seed: Option<u64>) -> PolarsResult<Column> {
    use polars_ops::series::SeriesRank;

    Ok(s.as_materialized_series().rank(options, seed).into_column())
}

#[cfg(feature = "hist")]
pub(super) fn hist(
    s: &[Column],
    bin_count: Option<usize>,
    include_category: bool,
    include_breakpoint: bool,
) -> PolarsResult<Column> {
    let bins = if s.len() == 2 { Some(&s[1]) } else { None };
    let s = s[0].as_materialized_series();
    polars_ops::prelude::hist_series(
        s,
        bin_count,
        bins.map(|b| b.as_materialized_series().clone()),
        include_category,
        include_breakpoint,
    )
    .map(Column::from)
}

#[cfg(feature = "replace")]
pub(super) fn replace(s: &[Column]) -> PolarsResult<Column> {
    polars_ops::series::replace(s[0].as_materialized_series(), s[1].list()?, s[2].list()?)
        .map(Column::from)
}

#[cfg(feature = "replace")]
pub(super) fn replace_strict(s: &[Column], return_dtype: Option<DataType>) -> PolarsResult<Column> {
    match s.get(3) {
        Some(default) => polars_ops::series::replace_or_default(
            s[0].as_materialized_series(),
            s[1].list()?,
            s[2].list()?,
            default.as_materialized_series(),
            return_dtype,
        ),
        None => polars_ops::series::replace_strict(
            s[0].as_materialized_series(),
            s[1].list()?,
            s[2].list()?,
            return_dtype,
        ),
    }
    .map(Column::from)
}

pub(super) fn fill_null_with_strategy(
    s: &Column,
    strategy: FillNullStrategy,
) -> PolarsResult<Column> {
    s.fill_null(strategy)
}

pub(super) fn gather_every(s: &Column, n: usize, offset: usize) -> PolarsResult<Column> {
    s.gather_every(n, offset)
}

#[cfg(feature = "reinterpret")]
pub(super) fn reinterpret(s: &Column, signed: bool) -> PolarsResult<Column> {
    polars_ops::series::reinterpret(s.as_materialized_series(), signed).map(Column::from)
}

pub(super) fn negate(s: &Column) -> PolarsResult<Column> {
    polars_ops::series::negate(s.as_materialized_series()).map(Column::from)
}

pub(super) fn extend_constant(s: &[Column]) -> PolarsResult<Column> {
    let value = &s[1];
    let n = &s[2];
    polars_ensure!(value.len() == 1 && n.len() == 1, ComputeError: "value and n should have unit length.");
    let n = n.strict_cast(&DataType::UInt64)?;
    let v = value.get(0)?;
    let s = &s[0];
    match n.u64()?.get(0) {
        Some(n) => s.extend_constant(v, n as usize),
        None => {
            polars_bail!(ComputeError: "n can not be None for extend_constant.")
        },
    }
}

#[cfg(feature = "row_hash")]
pub(super) fn row_hash(c: &Column, k0: u64, k1: u64, k2: u64, k3: u64) -> PolarsResult<Column> {
    use std::hash::BuildHasher;

    use polars_utils::aliases::{
        PlFixedStateQuality, PlSeedableRandomStateQuality, SeedableFromU64SeedExt,
    };

    // TODO: don't expose all these seeds.
    let seed = PlFixedStateQuality::default().hash_one((k0, k1, k2, k3));

    // @scalar-opt
    Ok(c.as_materialized_series()
        .hash(PlSeedableRandomStateQuality::seed_from_u64(seed))
        .into_column())
}

#[cfg(feature = "arg_where")]
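/// Collects the indices at which the boolean column `s[0]` is `true` into an index column.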
pub(super) fn arg_where(s: &mut [Column]) -> PolarsResult<Column> {
    use polars_core::utils::arrow::bitmap::utils::SlicesIterator;

    let predicate = s[0].bool()?;

    if predicate.is_empty() {
        Ok(Column::full_null(predicate.name().clone(), 0, &IDX_DTYPE))
    } else {
        use arrow::datatypes::IdxArr;
        use polars_core::prelude::IdxCa;

        let capacity = predicate.sum().unwrap();
        let mut out = Vec::with_capacity(capacity as usize);
        let mut total_offset = 0;

        predicate.downcast_iter().for_each(|arr| {
            let values = match arr.validity() {
                Some(validity) if validity.unset_bits() > 0 => validity & arr.values(),
                _ => arr.values().clone(),
            };

            for (offset, len) in SlicesIterator::new(&values) {
                // law of small numbers optimization
                if len == 1 {
                    out.push((total_offset + offset) as IdxSize)
                } else {
                    let offset = (offset + total_offset) as IdxSize;
                    let len = len as IdxSize;
                    let iter = offset..offset + len;
                    out.extend(iter)
                }
            }

            total_offset += arr.len();
        });
        let ca = IdxCa::with_chunk(predicate.name().clone(), IdxArr::from_vec(out));
        Ok(ca.into_column())
    }
}

#[cfg(feature = "index_of")]
/// Given two columns, find the index of a value (the second column) within the
/// first column. Will use binary search if possible, as an optimization.
pub(super) fn index_of(s: &mut [Column]) -> PolarsResult<Column> {
    use polars_core::series::IsSorted;
    use polars_ops::series::index_of as index_of_op;
    let series = if let Column::Scalar(ref sc) = s[0] {
        // We only care about the first value:
        &sc.as_single_value_series()
    } else {
        s[0].as_materialized_series()
    };

    let needle_s = &s[1];
    polars_ensure!(
        needle_s.len() == 1,
        InvalidOperation: "needle of `index_of` can only contain a single value, found {} values",
        needle_s.len()
    );
    let needle = Scalar::new(
        needle_s.dtype().clone(),
        needle_s.get(0).unwrap().into_static(),
    );

    let is_sorted_flag = series.is_sorted_flag();
    let result = match is_sorted_flag {
        // If the Series is sorted, we can use an optimized binary search to
        // find the value.
        IsSorted::Ascending | IsSorted::Descending if !needle.is_null() => {
            use polars_ops::series::SearchSortedSide;

            polars_ops::series::search_sorted(
                series,
                needle_s.as_materialized_series(),
                SearchSortedSide::Left,
                IsSorted::Descending == is_sorted_flag,
            )?
            .get(0)
            .and_then(|idx| {
                // search_sorted() gives an index even if it's not an exact
                // match! So we want to make sure it actually found the value.
                if series.get(idx as usize).ok()? == needle.as_any_value() {
                    Some(idx as usize)
                } else {
                    None
                }
            })
        },
        _ => index_of_op(series, needle)?,
    };

    let av = match result {
        None => AnyValue::Null,
        Some(idx) => AnyValue::from(idx as IdxSize),
    };
    let scalar = Scalar::new(IDX_DTYPE, av);
    Ok(Column::new_scalar(series.name().clone(), scalar, 1))
}

#[cfg(feature = "search_sorted")]
pub(super) fn search_sorted_impl(
    s: &mut [Column],
    side: polars_ops::series::SearchSortedSide,
    descending: bool,
) -> PolarsResult<Column> {
    let sorted_array = &s[0];
    let search_value = &s[1];

    polars_ops::series::search_sorted(
        sorted_array.as_materialized_series(),
        search_value.as_materialized_series(),
        side,
        descending,
    )
    .map(|ca| ca.into_column())
}

#[cfg(feature = "sign")]
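/// Element-wise sign (-1, 0 or 1) of a numeric column; NaN inputs are passed through, and
/// decimals are handled by a dedicated path that keeps their dtype.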
pub(super) fn sign(s: &Column) -> PolarsResult<Column> {
    use num_traits::{One, Zero};
    use polars_core::prelude::{ChunkedArray, PolarsNumericType};
    use polars_core::with_match_physical_numeric_polars_type;

    fn sign_impl<T>(ca: &ChunkedArray<T>) -> Column
    where
        T: PolarsNumericType,
        ChunkedArray<T>: IntoColumn,
    {
        ca.apply_values(|x| {
            if x < T::Native::zero() {
                T::Native::zero() - T::Native::one()
            } else if x > T::Native::zero() {
                T::Native::one()
            } else {
                // Returning x here ensures we return NaN for NaN input, and
                // maintain the sign for signed zeroes (although we don't really
                // care about the latter).
                x
            }
        })
        .into_column()
    }

    let s = s.as_materialized_series();
    let dtype = s.dtype();
    use polars_core::datatypes::*;
    match dtype {
        _ if dtype.is_primitive_numeric() => with_match_physical_numeric_polars_type!(dtype, |$T| {
            let ca: &ChunkedArray<$T> = s.as_ref().as_ref();
            Ok(sign_impl(ca))
        }),
        DataType::Decimal(_, scale) => {
            use polars_core::prelude::ChunkApply;

            let ca = s.decimal()?;
            let out = ca
                .physical()
                .apply_values(|x| polars_compute::decimal::dec128_sign(x, *scale))
                .into_column();
            unsafe { out.from_physical_unchecked(dtype) }
        },
        _ => polars_bail!(opq = sign, dtype),
    }
}

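/// Fills nulls in `s[0]` with values from `s[1]`, broadcasting whichever side has length 1;
/// string fill values are cast to categorical when the target column is categorical.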
pub(super) fn fill_null(s: &[Column]) -> PolarsResult<Column> {
    match (s[0].len(), s[1].len()) {
        (a, b) if a == b || b == 1 => {
            let series = s[0].clone();

            // Nothing to fill, so return early
            // this is done after casting as the output type must be correct
            if series.null_count() == 0 {
                return Ok(series);
            }

            let fill_value = s[1].clone();

            // default branch
            fn default(series: Column, fill_value: Column) -> PolarsResult<Column> {
                let mask = series.is_not_null();
                series.zip_with_same_type(&mask, &fill_value)
            }

            let fill_value = if series.dtype().is_categorical() && fill_value.dtype().is_string() {
                fill_value.cast(series.dtype()).unwrap()
            } else {
                fill_value
            };
            default(series, fill_value)
        },
        (1, other_len) => {
            if s[0].has_nulls() {
                Ok(s[1].clone())
            } else {
                Ok(s[0].new_from_index(0, other_len))
            }
        },
        (self_len, other_len) => polars_bail!(length_mismatch = "fill_null", self_len, other_len),
    }
}

pub(super) fn coalesce(s: &mut [Column]) -> PolarsResult<Column> {
    polars_ops::series::coalesce_columns(s)
}

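/// Filters NaN values out of float columns while keeping nulls; non-float columns are
/// returned unchanged.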
pub(super) fn drop_nans(s: Column) -> PolarsResult<Column> {
    match s.dtype() {
        #[cfg(feature = "dtype-f16")]
        DataType::Float16 => {
            let ca = s.f16()?;
            let mask = ca.is_not_nan() | ca.is_null();
            ca.filter(&mask).map(|ca| ca.into_column())
        },
        DataType::Float32 => {
            let ca = s.f32()?;
            let mask = ca.is_not_nan() | ca.is_null();
            ca.filter(&mask).map(|ca| ca.into_column())
        },
        DataType::Float64 => {
            let ca = s.f64()?;
            let mask = ca.is_not_nan() | ca.is_null();
            ca.filter(&mask).map(|ca| ca.into_column())
        },
        _ => Ok(s),
    }
}

#[cfg(feature = "round_series")]
pub(super) fn clip(s: &[Column], has_min: bool, has_max: bool) -> PolarsResult<Column> {
    match (has_min, has_max) {
        (true, true) => polars_ops::series::clip(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
            s[2].as_materialized_series(),
        ),
        (true, false) => polars_ops::series::clip_min(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
        ),
        (false, true) => polars_ops::series::clip_max(
            s[0].as_materialized_series(),
            s[1].as_materialized_series(),
        ),
        _ => unreachable!(),
    }
    .map(Column::from)
}

#[cfg(feature = "dtype-struct")]
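/// Packs the given columns into a single struct column named after the first column; the
/// struct length is 0 if any input is empty, otherwise the longest input length.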
pub fn as_struct(cols: &[Column]) -> PolarsResult<Column> {
    use polars_core::prelude::StructChunked;

    let Some(fst) = cols.first() else {
        polars_bail!(nyi = "turning no columns as_struct");
    };

    let mut min_length = usize::MAX;
    let mut max_length = usize::MIN;

    for col in cols {
        let len = col.len();

        min_length = min_length.min(len);
        max_length = max_length.max(len);
    }

    // @NOTE: Any additional errors should be handled by the StructChunked::from_columns
    let length = if min_length == 0 { 0 } else { max_length };

    Ok(StructChunked::from_columns(fst.name().clone(), length, cols)?.into_column())
}

#[cfg(feature = "log")]
pub(super) fn entropy(s: &Column, base: f64, normalize: bool) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    let out = s.as_materialized_series().entropy(base, normalize)?;
    if matches!(s.dtype(), DataType::Float32) {
        let out = out as f32;
        Ok(Column::new(s.name().clone(), [out]))
    } else {
        Ok(Column::new(s.name().clone(), [out]))
    }
}

#[cfg(feature = "log")]
pub(super) fn log(columns: &[Column]) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    assert_eq!(columns.len(), 2);
    Column::apply_broadcasting_binary_elementwise(&columns[0], &columns[1], Series::log)
}

#[cfg(feature = "log")]
pub(super) fn log1p(s: &Column) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    Ok(s.as_materialized_series().log1p().into())
}

#[cfg(feature = "log")]
pub(super) fn exp(s: &Column) -> PolarsResult<Column> {
    use polars_ops::series::LogSeries;

    Ok(s.as_materialized_series().exp().into())
}

pub(super) fn unique(s: &Column, stable: bool) -> PolarsResult<Column> {
    if stable {
        s.unique_stable()
    } else {
        s.unique()
    }
}

#[cfg(feature = "fused")]
pub(super) fn fused(input: &[Column], op: FusedOperator) -> PolarsResult<Column> {
    use polars_plan::plans::FusedOperator;

    let s0 = &input[0];
    let s1 = &input[1];
    let s2 = &input[2];
    match op {
        FusedOperator::MultiplyAdd => Ok(polars_ops::series::fma_columns(s0, s1, s2)),
        FusedOperator::SubMultiply => Ok(polars_ops::series::fsm_columns(s0, s1, s2)),
        FusedOperator::MultiplySub => Ok(polars_ops::series::fms_columns(s0, s1, s2)),
    }
}

pub(super) fn concat_expr(s: &[Column], rechunk: bool) -> PolarsResult<Column> {
    let mut first = s[0].clone();

    for s in &s[1..] {
        first.append(s)?;
    }
    if rechunk {
        first = first.rechunk()
    }
    Ok(first)
}

#[cfg(feature = "cov")]
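/// Computes a pairwise statistic for `s[0]` and `s[1]`: Pearson correlation, Spearman rank
/// correlation or covariance, depending on `method`.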
pub(super) fn corr(s: &[Column], method: IRCorrelationMethod) -> PolarsResult<Column> {
    use polars_plan::plans::IRCorrelationMethod;

    fn covariance(s: &[Column], ddof: u8) -> PolarsResult<Column> {
        let a = &s[0];
        let b = &s[1];
        let name = PlSmallStr::from_static("cov");

        use polars_ops::chunked_array::cov::cov;
        let ret = match a.dtype() {
            #[cfg(feature = "dtype-f16")]
            DataType::Float16 => {
                use num_traits::AsPrimitive;
                use polars_utils::float16::pf16;

                let ret =
                    cov(a.f16().unwrap(), b.f16().unwrap(), ddof).map(AsPrimitive::<pf16>::as_);
                return Ok(Column::new(name, &[ret]));
            },
            DataType::Float32 => {
                let ret = cov(a.f32().unwrap(), b.f32().unwrap(), ddof).map(|v| v as f32);
                return Ok(Column::new(name, &[ret]));
            },
            DataType::Float64 => cov(a.f64().unwrap(), b.f64().unwrap(), ddof),
            DataType::Int32 => cov(a.i32().unwrap(), b.i32().unwrap(), ddof),
            DataType::Int64 => cov(a.i64().unwrap(), b.i64().unwrap(), ddof),
            DataType::UInt32 => cov(a.u32().unwrap(), b.u32().unwrap(), ddof),
            DataType::UInt64 => cov(a.u64().unwrap(), b.u64().unwrap(), ddof),
            _ => {
                let a = a.cast(&DataType::Float64)?;
                let b = b.cast(&DataType::Float64)?;
                cov(a.f64().unwrap(), b.f64().unwrap(), ddof)
            },
        };
        Ok(Column::new(name, &[ret]))
    }

    fn pearson_corr(s: &[Column]) -> PolarsResult<Column> {
        let a = &s[0];
        let b = &s[1];
        let name = PlSmallStr::from_static("pearson_corr");

        use polars_ops::chunked_array::cov::pearson_corr;
        let ret = match a.dtype() {
            #[cfg(feature = "dtype-f16")]
            DataType::Float16 => {
                use num_traits::AsPrimitive;
                use polars_utils::float16::pf16;

                let ret =
                    pearson_corr(a.f16().unwrap(), b.f16().unwrap()).map(AsPrimitive::<pf16>::as_);
                return Ok(Column::new(name, &[ret]));
            },
            DataType::Float32 => {
                let ret = pearson_corr(a.f32().unwrap(), b.f32().unwrap()).map(|v| v as f32);
                return Ok(Column::new(name, &[ret]));
            },
            DataType::Float64 => pearson_corr(a.f64().unwrap(), b.f64().unwrap()),
            DataType::Int32 => pearson_corr(a.i32().unwrap(), b.i32().unwrap()),
            DataType::Int64 => pearson_corr(a.i64().unwrap(), b.i64().unwrap()),
            DataType::UInt32 => pearson_corr(a.u32().unwrap(), b.u32().unwrap()),
            _ => {
                let a = a.cast(&DataType::Float64)?;
                let b = b.cast(&DataType::Float64)?;
                pearson_corr(a.f64().unwrap(), b.f64().unwrap())
            },
        };
        Ok(Column::new(name, &[ret]))
    }

    #[cfg(all(feature = "rank", feature = "propagate_nans"))]
    fn spearman_rank_corr(s: &[Column], propagate_nans: bool) -> PolarsResult<Column> {
        use polars_core::utils::coalesce_nulls_columns;
        use polars_ops::chunked_array::nan_propagating_aggregate::nan_max_s;
        use polars_ops::series::{RankMethod, SeriesRank};
        let a = &s[0];
        let b = &s[1];

        let (a, b) = coalesce_nulls_columns(a, b);

        let name = PlSmallStr::from_static("spearman_rank_correlation");
        if propagate_nans && a.dtype().is_float() {
            for s in [&a, &b] {
                let max = nan_max_s(s.as_materialized_series(), PlSmallStr::EMPTY);
                if max.get(0).is_ok_and(|m| m.is_nan()) {
                    return Ok(Column::new(name, &[f64::NAN]));
                }
            }
        }

        // drop nulls so that they are excluded
        let a = a.drop_nulls();
        let b = b.drop_nulls();

        let a_rank = a.as_materialized_series().rank(
            RankOptions {
                method: RankMethod::Average,
                ..Default::default()
            },
            None,
        );
        let b_rank = b.as_materialized_series().rank(
            RankOptions {
                method: RankMethod::Average,
                ..Default::default()
            },
            None,
        );

        // Because rank results in f64, we may need to restore the dtype
        let a_rank = if a.dtype().is_float() {
            a_rank.cast(a.dtype())?.into()
        } else {
            a_rank.into()
        };
        let b_rank = if b.dtype().is_float() {
            b_rank.cast(b.dtype())?.into()
        } else {
            b_rank.into()
        };

        pearson_corr(&[a_rank, b_rank])
    }

    polars_ensure!(
        s[0].len() == s[1].len() || s[0].len() == 1 || s[1].len() == 1,
        length_mismatch = "corr",
        s[0].len(),
        s[1].len()
    );

    match method {
        IRCorrelationMethod::Pearson => pearson_corr(s),
        #[cfg(all(feature = "rank", feature = "propagate_nans"))]
        IRCorrelationMethod::SpearmanRank(propagate_nans) => spearman_rank_corr(s, propagate_nans),
        IRCorrelationMethod::Covariance(ddof) => covariance(s, ddof),
    }
}

#[cfg(feature = "peaks")]
pub(super) fn peak_min(s: &Column) -> PolarsResult<Column> {
    polars_ops::prelude::peaks::peak_min_max(s, &AnyValue::Int8(0), &AnyValue::Int8(0), false)
        .map(IntoColumn::into_column)
}

#[cfg(feature = "peaks")]
pub(super) fn peak_max(s: &Column) -> PolarsResult<Column> {
    polars_ops::prelude::peaks::peak_min_max(s, &AnyValue::Int8(0), &AnyValue::Int8(0), true)
        .map(IntoColumn::into_column)
}

#[cfg(feature = "cutqcut")]
pub(super) fn cut(
    s: &Column,
    breaks: Vec<f64>,
    labels: Option<Vec<PlSmallStr>>,
    left_closed: bool,
    include_breaks: bool,
) -> PolarsResult<Column> {
    polars_ops::prelude::cut(
        s.as_materialized_series(),
        breaks,
        labels,
        left_closed,
        include_breaks,
    )
    .map(Column::from)
}

#[cfg(feature = "cutqcut")]
pub(super) fn qcut(
    s: &Column,
    probs: Vec<f64>,
    labels: Option<Vec<PlSmallStr>>,
    left_closed: bool,
    allow_duplicates: bool,
    include_breaks: bool,
) -> PolarsResult<Column> {
    polars_ops::prelude::qcut(
        s.as_materialized_series(),
        probs,
        labels,
        left_closed,
        allow_duplicates,
        include_breaks,
    )
    .map(Column::from)
}

#[cfg(feature = "ewma")]
pub(super) fn ewm_mean(
    s: &Column,
    options: polars_ops::series::EWMOptions,
) -> PolarsResult<Column> {
    polars_ops::prelude::ewm_mean(s.as_materialized_series(), options).map(Column::from)
}

#[cfg(feature = "ewma")]
pub(super) fn ewm_std(s: &Column, options: polars_ops::series::EWMOptions) -> PolarsResult<Column> {
    polars_ops::prelude::ewm_std(s.as_materialized_series(), options).map(Column::from)
}

#[cfg(feature = "ewma")]
pub(super) fn ewm_var(s: &Column, options: polars_ops::series::EWMOptions) -> PolarsResult<Column> {
    polars_ops::prelude::ewm_var(s.as_materialized_series(), options).map(Column::from)
}

#[cfg(feature = "ewma_by")]
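/// Exponentially weighted mean of `s[0]` indexed by the time column `s[1]`, decaying with a
/// constant, non-negative `half_life`.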
pub(super) fn ewm_mean_by(s: &[Column], half_life: polars_time::Duration) -> PolarsResult<Column> {
    use polars_ops::series::SeriesMethods;

    let time_zone = match s[1].dtype() {
        DataType::Datetime(_, Some(time_zone)) => Some(time_zone),
        _ => None,
    };
    polars_ensure!(!half_life.negative(), InvalidOperation: "half_life cannot be negative");
    polars_time::prelude::ensure_is_constant_duration(half_life, time_zone, "half_life")?;
    // `half_life` is a constant duration so we can safely use `duration_ns()`.
    let half_life = half_life.duration_ns();
    let values = &s[0];
    let times = &s[1];
    let times_is_sorted = times
        .as_materialized_series()
        .is_sorted(Default::default())?;
    polars_ops::prelude::ewm_mean_by(
        values.as_materialized_series(),
        times.as_materialized_series(),
        half_life,
        times_is_sorted,
    )
    .map(Column::from)
}

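/// Row-encodes the given columns into a single binary column, either unordered or with
/// per-column sort options; column dtypes are checked against `dts` first.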
pub fn row_encode(
    c: &mut [Column],
    dts: Vec<DataType>,
    variant: RowEncodingVariant,
) -> PolarsResult<Column> {
    assert_eq!(c.len(), dts.len());

    // We need to make sure that the output types are correct or we will get wrong results or even
    // segfaults when decoding.
    for (dt, c) in dts.iter().zip(c.iter_mut()) {
        if c.dtype().matches_schema_type(dt)? {
            *c = c.cast(dt)?;
        }
    }

    let name = PlSmallStr::from_static("row_encoded");
    match variant {
        RowEncodingVariant::Unordered => _get_rows_encoded_ca_unordered(name, c),
        RowEncodingVariant::Ordered {
            descending,
            nulls_last,
            broadcast_nulls,
        } => {
            let descending = descending.unwrap_or_else(|| vec![false; c.len()]);
            let nulls_last = nulls_last.unwrap_or_else(|| vec![false; c.len()]);
            let broadcast_nulls = broadcast_nulls.unwrap_or(false);

            assert_eq!(c.len(), descending.len());
            assert_eq!(c.len(), nulls_last.len());

            _get_rows_encoded_ca(name, c, &descending, &nulls_last, broadcast_nulls)
        },
    }
    .map(IntoColumn::into_column)
}

#[cfg(feature = "dtype-struct")]
pub fn row_decode(
    c: &mut [Column],
    fields: Vec<Field>,
    variant: RowEncodingVariant,
) -> PolarsResult<Column> {
    use polars_core::prelude::row_encode::row_encoding_decode;

    assert_eq!(c.len(), 1);
    let ca = c[0].binary_offset()?;

    let mut opts = Vec::with_capacity(fields.len());
    match variant {
        RowEncodingVariant::Unordered => opts.extend(std::iter::repeat_n(
            RowEncodingOptions::new_unsorted(),
            fields.len(),
        )),
        RowEncodingVariant::Ordered {
            descending,
            nulls_last,
            broadcast_nulls,
        } => {
            let descending = descending.unwrap_or_else(|| vec![false; fields.len()]);
            let nulls_last = nulls_last.unwrap_or_else(|| vec![false; fields.len()]);
            if broadcast_nulls.is_some() {
                polars_bail!(InvalidOperation: "broadcast_nulls is not supported for row_decode.");
            }

            assert_eq!(fields.len(), descending.len());
            assert_eq!(fields.len(), nulls_last.len());

            opts.extend(
                descending
                    .into_iter()
                    .zip(nulls_last)
                    .map(|(d, n)| RowEncodingOptions::new_sorted(d, n)),
            )
        },
    }

    row_encoding_decode(ca, &fields, &opts).map(IntoColumn::into_column)
}

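/// Repeats the first value of `args[0]` `n` times, where `n` is read from the integer
/// column `args[1]`.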
pub fn repeat(args: &[Column]) -> PolarsResult<Column> {
    let c = &args[0];
    let n = &args[1];

    polars_ensure!(
        n.dtype().is_integer(),
        SchemaMismatch: "expected expression of dtype 'integer', got '{}'", n.dtype()
    );

    let first_value = n.get(0)?;
    let n = first_value.extract::<usize>().ok_or_else(
        || polars_err!(ComputeError: "could not parse value '{}' as a size.", first_value),
    )?;

    Ok(c.new_from_index(0, n))
}