Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/column.rs
6940 views
1
use std::borrow::Cow;
2
3
use polars_core::prelude::*;
4
use polars_plan::constants::CSE_REPLACED;
5
6
use super::*;
7
use crate::expressions::{AggregationContext, PartitionedAggregation, PhysicalExpr};
8
9
pub struct ColumnExpr {
10
name: PlSmallStr,
11
expr: Expr,
12
schema: SchemaRef,
13
}
14
15
impl ColumnExpr {
16
pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self {
17
Self { name, expr, schema }
18
}
19
}
20
21
impl ColumnExpr {
22
fn check_external_context(
23
&self,
24
out: PolarsResult<Column>,
25
state: &ExecutionState,
26
) -> PolarsResult<Column> {
27
match out {
28
Ok(col) => Ok(col),
29
Err(e) => {
30
if state.ext_contexts.is_empty() {
31
Err(e)
32
} else {
33
for df in state.ext_contexts.as_ref() {
34
let out = df.column(&self.name);
35
if out.is_ok() {
36
return out.cloned();
37
}
38
}
39
Err(e)
40
}
41
},
42
}
43
}
44
45
fn process_by_idx(
46
&self,
47
out: &Column,
48
_state: &ExecutionState,
49
_schema: &Schema,
50
df: &DataFrame,
51
check_state_schema: bool,
52
) -> PolarsResult<Column> {
53
if out.name() != &*self.name {
54
if check_state_schema {
55
if let Some(schema) = _state.get_schema() {
56
return self.process_from_state_schema(df, _state, &schema);
57
}
58
}
59
60
df.column(&self.name).cloned()
61
} else {
62
Ok(out.clone())
63
}
64
}
65
fn process_by_linear_search(
66
&self,
67
df: &DataFrame,
68
_state: &ExecutionState,
69
_panic_during_test: bool,
70
) -> PolarsResult<Column> {
71
df.column(&self.name).cloned()
72
}
73
74
fn process_from_state_schema(
75
&self,
76
df: &DataFrame,
77
state: &ExecutionState,
78
schema: &Schema,
79
) -> PolarsResult<Column> {
80
match schema.get_full(&self.name) {
81
None => self.process_by_linear_search(df, state, true),
82
Some((idx, _, _)) => match df.get_columns().get(idx) {
83
Some(out) => self.process_by_idx(out, state, schema, df, false),
84
None => self.process_by_linear_search(df, state, true),
85
},
86
}
87
}
88
89
fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult<Column> {
90
// The CSE columns are added on the rhs.
91
let offset = schema.len();
92
let columns = &df.get_columns()[offset..];
93
// Linear search will be relatively cheap as we only search the CSE columns.
94
Ok(columns
95
.iter()
96
.find(|s| s.name() == &self.name)
97
.unwrap()
98
.clone())
99
}
100
}
101
102
impl PhysicalExpr for ColumnExpr {
103
fn as_expression(&self) -> Option<&Expr> {
104
Some(&self.expr)
105
}
106
107
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
108
let out = match self.schema.get_full(&self.name) {
109
Some((idx, _, _)) => {
110
// check if the schema was correct
111
// if not do O(n) search
112
match df.get_columns().get(idx) {
113
Some(out) => self.process_by_idx(out, state, &self.schema, df, true),
114
None => {
115
// partitioned group_by special case
116
if let Some(schema) = state.get_schema() {
117
self.process_from_state_schema(df, state, &schema)
118
} else {
119
self.process_by_linear_search(df, state, true)
120
}
121
},
122
}
123
},
124
// in the future we will throw an error here
125
// now we do a linear search first as the lazy reported schema may still be incorrect
126
// in debug builds we panic so that it can be fixed when occurring
127
None => {
128
if self.name.starts_with(CSE_REPLACED) {
129
return self.process_cse(df, &self.schema);
130
}
131
self.process_by_linear_search(df, state, true)
132
},
133
};
134
self.check_external_context(out, state)
135
}
136
137
#[allow(clippy::ptr_arg)]
138
fn evaluate_on_groups<'a>(
139
&self,
140
df: &DataFrame,
141
groups: &'a GroupPositions,
142
state: &ExecutionState,
143
) -> PolarsResult<AggregationContext<'a>> {
144
let c = self.evaluate(df, state)?;
145
Ok(AggregationContext::new(c, Cow::Borrowed(groups), false))
146
}
147
148
fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {
149
Some(self)
150
}
151
152
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
153
input_schema.get_field(&self.name).ok_or_else(|| {
154
polars_err!(
155
ColumnNotFound: "could not find {:?} in schema: {:?}", self.name, &input_schema
156
)
157
})
158
}
159
fn is_scalar(&self) -> bool {
160
false
161
}
162
}
163
164
impl PartitionedAggregation for ColumnExpr {
165
fn evaluate_partitioned(
166
&self,
167
df: &DataFrame,
168
_groups: &GroupPositions,
169
state: &ExecutionState,
170
) -> PolarsResult<Column> {
171
self.evaluate(df, state)
172
}
173
174
fn finalize(
175
&self,
176
partitioned: Column,
177
_groups: &GroupPositions,
178
_state: &ExecutionState,
179
) -> PolarsResult<Column> {
180
Ok(partitioned)
181
}
182
}
183
184