Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/cast.rs
6940 views
1
use polars_core::chunked_array::cast::CastOptions;
2
use polars_core::prelude::*;
3
4
use super::*;
5
use crate::expressions::{AggState, AggregationContext, PartitionedAggregation, PhysicalExpr};
6
7
pub struct CastExpr {
8
pub(crate) input: Arc<dyn PhysicalExpr>,
9
pub(crate) dtype: DataType,
10
pub(crate) expr: Expr,
11
pub(crate) options: CastOptions,
12
}
13
14
impl CastExpr {
15
fn finish(&self, input: &Column) -> PolarsResult<Column> {
16
input.cast_with_options(&self.dtype, self.options)
17
}
18
}
19
20
impl PhysicalExpr for CastExpr {
21
fn as_expression(&self) -> Option<&Expr> {
22
Some(&self.expr)
23
}
24
25
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
26
let column = self.input.evaluate(df, state)?;
27
self.finish(&column)
28
}
29
30
#[allow(clippy::ptr_arg)]
31
fn evaluate_on_groups<'a>(
32
&self,
33
df: &DataFrame,
34
groups: &'a GroupPositions,
35
state: &ExecutionState,
36
) -> PolarsResult<AggregationContext<'a>> {
37
let mut ac = self.input.evaluate_on_groups(df, groups, state)?;
38
39
match ac.agg_state() {
40
// this will not explode and potentially increase memory due to overlapping groups
41
AggState::AggregatedList(s) => {
42
let ca = s.list().unwrap();
43
let casted = ca.apply_to_inner(&|s| {
44
self.finish(&s.into_column())
45
.map(|c| c.take_materialized_series())
46
})?;
47
ac.with_values(casted.into_column(), true, None)?;
48
},
49
AggState::AggregatedScalar(s) => {
50
let s = self.finish(&s.clone().into_column())?;
51
if ac.is_literal() {
52
ac.with_literal(s);
53
} else {
54
ac.with_values(s, true, None)?;
55
}
56
},
57
AggState::NotAggregated(_) => {
58
if match self.options {
59
CastOptions::NonStrict | CastOptions::Overflowing => true,
60
CastOptions::Strict => ac.original_len,
61
} {
62
// before we flatten, make sure that groups are updated
63
ac.groups();
64
65
let s = ac.flat_naive();
66
let s = self.finish(&s.as_ref().clone().into_column())?;
67
68
ac.with_values(s, false, None)?;
69
} else {
70
// We need to perform aggregation only for strict mode, since if this is not done,
71
// filtered-out values may incorrectly cause a cast error.
72
let s = ac.aggregated();
73
let ca = s.list().unwrap();
74
let casted = ca.apply_to_inner(&|s| {
75
self.finish(&s.into_column())
76
.map(|c| c.take_materialized_series())
77
})?;
78
ac.with_values(casted.into_column(), true, None)?;
79
}
80
},
81
82
AggState::LiteralScalar(s) => {
83
let s = self.finish(s)?;
84
ac.with_literal(s);
85
},
86
}
87
88
Ok(ac)
89
}
90
91
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
92
self.input.to_field(input_schema).map(|mut fld| {
93
fld.coerce(self.dtype.clone());
94
fld
95
})
96
}
97
98
fn is_scalar(&self) -> bool {
99
self.input.is_scalar()
100
}
101
102
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
103
Some(self)
104
}
105
}
106
107
impl PartitionedAggregation for CastExpr {
108
fn evaluate_partitioned(
109
&self,
110
df: &DataFrame,
111
groups: &GroupPositions,
112
state: &ExecutionState,
113
) -> PolarsResult<Column> {
114
let e = self.input.as_partitioned_aggregator().unwrap();
115
self.finish(&e.evaluate_partitioned(df, groups, state)?)
116
}
117
118
fn finalize(
119
&self,
120
partitioned: Column,
121
groups: &GroupPositions,
122
state: &ExecutionState,
123
) -> PolarsResult<Column> {
124
let agg = self.input.as_partitioned_aggregator().unwrap();
125
agg.finalize(partitioned, groups, state)
126
}
127
}
128
129