Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/column.rs
8416 views
1
use std::borrow::Cow;
2
3
use polars_core::prelude::*;
4
use polars_plan::constants::CSE_REPLACED;
5
6
use super::*;
7
use crate::expressions::{AggregationContext, PhysicalExpr};
8
9
#[derive(Debug)]
10
pub struct ColumnExpr {
11
name: PlSmallStr,
12
expr: Expr,
13
schema: SchemaRef,
14
}
15
16
impl ColumnExpr {
17
pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self {
18
Self { name, expr, schema }
19
}
20
}
21
22
impl ColumnExpr {
23
fn check_external_context(
24
&self,
25
out: PolarsResult<Column>,
26
state: &ExecutionState,
27
) -> PolarsResult<Column> {
28
match out {
29
Ok(col) => Ok(col),
30
Err(e) => {
31
if state.ext_contexts.is_empty() {
32
Err(e)
33
} else {
34
for df in state.ext_contexts.as_ref() {
35
let out = df.column(&self.name);
36
if out.is_ok() {
37
return out.cloned();
38
}
39
}
40
Err(e)
41
}
42
},
43
}
44
}
45
46
fn process_by_idx(
47
&self,
48
out: &Column,
49
_state: &ExecutionState,
50
_schema: &Schema,
51
df: &DataFrame,
52
check_state_schema: bool,
53
) -> PolarsResult<Column> {
54
if out.name() != &*self.name {
55
if check_state_schema {
56
if let Some(schema) = _state.get_schema() {
57
return self.process_from_state_schema(df, _state, &schema);
58
}
59
}
60
61
df.column(&self.name).cloned()
62
} else {
63
Ok(out.clone())
64
}
65
}
66
fn process_by_linear_search(
67
&self,
68
df: &DataFrame,
69
_state: &ExecutionState,
70
_panic_during_test: bool,
71
) -> PolarsResult<Column> {
72
df.column(&self.name).cloned()
73
}
74
75
fn process_from_state_schema(
76
&self,
77
df: &DataFrame,
78
state: &ExecutionState,
79
schema: &Schema,
80
) -> PolarsResult<Column> {
81
match schema.get_full(&self.name) {
82
None => self.process_by_linear_search(df, state, true),
83
Some((idx, _, _)) => match df.columns().get(idx) {
84
Some(out) => self.process_by_idx(out, state, schema, df, false),
85
None => self.process_by_linear_search(df, state, true),
86
},
87
}
88
}
89
90
fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult<Column> {
91
// The CSE columns are added on the rhs.
92
let offset = schema.len();
93
let columns = &df.columns()[offset..];
94
// Linear search will be relatively cheap as we only search the CSE columns.
95
Ok(columns
96
.iter()
97
.find(|s| s.name() == &self.name)
98
.unwrap()
99
.clone())
100
}
101
}
102
103
impl PhysicalExpr for ColumnExpr {
104
/// Returns the logical expression this physical expression was built from.
/// Always `Some` for a column expression.
fn as_expression(&self) -> Option<&Expr> {
    let logical = &self.expr;
    Some(logical)
}
107
108
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
109
let out = match self.schema.get_full(&self.name) {
110
Some((idx, _, _)) => {
111
// check if the schema was correct
112
// if not do O(n) search
113
match df.columns().get(idx) {
114
Some(out) => self.process_by_idx(out, state, &self.schema, df, true),
115
None => {
116
// partitioned group_by special case
117
if let Some(schema) = state.get_schema() {
118
self.process_from_state_schema(df, state, &schema)
119
} else {
120
self.process_by_linear_search(df, state, true)
121
}
122
},
123
}
124
},
125
// in the future we will throw an error here
126
// now we do a linear search first as the lazy reported schema may still be incorrect
127
// in debug builds we panic so that it can be fixed when occurring
128
None => {
129
if self.name.starts_with(CSE_REPLACED) {
130
return self.process_cse(df, &self.schema);
131
}
132
self.process_by_linear_search(df, state, true)
133
},
134
};
135
self.check_external_context(out, state)
136
}
137
138
#[allow(clippy::ptr_arg)]
139
fn evaluate_on_groups<'a>(
140
&self,
141
df: &DataFrame,
142
groups: &'a GroupPositions,
143
state: &ExecutionState,
144
) -> PolarsResult<AggregationContext<'a>> {
145
let c = self.evaluate(df, state)?;
146
Ok(AggregationContext::new(c, Cow::Borrowed(groups), false))
147
}
148
149
fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
150
input_schema.get_field(&self.name).ok_or_else(|| {
151
polars_err!(
152
ColumnNotFound: "could not find {:?} in schema: {:?}", self.name, &input_schema
153
)
154
})
155
}
156
fn is_scalar(&self) -> bool {
157
false
158
}
159
160
/// Returns a clone of the column name this expression refers to.
/// Always `Some` for a column expression.
fn as_column(&self) -> Option<PlSmallStr> {
    let column_name = self.name.clone();
    Some(column_name)
}
163
}
164
165