Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
8446 views
1
use polars_compute::rolling::QuantileMethod;
2
use strum_macros::IntoStaticStr;
3
4
use super::*;
5
6
// An opaque Python UDF together with the flags telling the optimizer which
// optimizations are allowed to pass through it.
//
// NOTE: `///` doc comments on this type feed the `dsl-schema` JSON schema, so only
// plain `//` comments are added here.
#[cfg(feature = "python")]
#[derive(Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct OpaquePythonUdf {
    // The wrapped Python function object.
    pub function: PythonFunction,
    // NOTE(review): presumably the UDF's output schema when it is known up front,
    // `None` otherwise — confirm against the IR consumer.
    pub schema: Option<SchemaRef>,
    /// allow predicate pushdown optimizations
    pub predicate_pd: bool,
    /// allow projection pushdown optimizations
    pub projection_pd: bool,
    // NOTE(review): presumably whether the UDF may run in the streaming engine — confirm.
    pub streamable: bool,
    // NOTE(review): presumably whether the UDF's output is checked against `schema` — confirm.
    pub validate_output: bool,
}
// Except for Opaque functions, this only has the DSL name of the function.
22
#[derive(Clone, IntoStaticStr)]
23
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
24
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
25
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
26
pub enum DslFunction {
27
RowIndex {
28
name: PlSmallStr,
29
offset: Option<IdxSize>,
30
},
31
// This is both in DSL and IR because we want to be able to serialize it.
32
#[cfg(feature = "python")]
33
OpaquePython(OpaquePythonUdf),
34
Explode {
35
columns: Selector,
36
options: ExplodeOptions,
37
allow_empty: bool,
38
},
39
#[cfg(feature = "pivot")]
40
Unpivot {
41
args: UnpivotArgsDSL,
42
},
43
Rename {
44
existing: Arc<[PlSmallStr]>,
45
new: Arc<[PlSmallStr]>,
46
strict: bool,
47
},
48
Unnest {
49
columns: Selector,
50
separator: Option<PlSmallStr>,
51
},
52
Stats(StatsFunction),
53
/// FillValue
54
FillNan(Expr),
55
// Function that is already converted to IR.
56
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
57
FunctionIR(FunctionIR),
58
Hint(HintIR),
59
}
60
61
#[derive(Clone)]
62
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
63
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
64
pub enum StatsFunction {
65
Var {
66
ddof: u8,
67
},
68
Std {
69
ddof: u8,
70
},
71
Quantile {
72
quantile: Expr,
73
method: QuantileMethod,
74
},
75
Median,
76
Mean,
77
Sum,
78
Min,
79
Max,
80
}
81
82
pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(
83
columns: I,
84
input_schema: &Schema,
85
operation_name: &str,
86
) -> PolarsResult<()> {
87
let columns = columns.into_iter();
88
for c in columns {
89
polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)
90
}
91
Ok(())
92
}
93
94
impl DslFunction {
95
pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {
96
let function = match self {
97
#[cfg(feature = "pivot")]
98
DslFunction::Unpivot { args } => {
99
let variable_name = args.variable_name.as_deref().unwrap_or("variable");
100
polars_ensure!(
101
!input_schema.contains(variable_name),
102
Duplicate: "duplicate column name '{variable_name}'"
103
);
104
105
let value_name = args.value_name.as_deref().unwrap_or("value");
106
polars_ensure!(
107
!input_schema.contains(value_name),
108
Duplicate: "duplicate column name '{value_name}'"
109
);
110
111
let on = match args.on {
112
None => None,
113
Some(on) => Some(
114
on.into_columns(input_schema, &Default::default())?
115
.into_iter()
116
.collect::<Vec<_>>(),
117
),
118
};
119
120
let index = args
121
.index
122
.into_columns(input_schema, &Default::default())?
123
.into_vec();
124
125
let args = UnpivotArgsIR::new(
126
input_schema.iter().map(|(name, _)| name.clone()).collect(),
127
on,
128
index,
129
args.value_name,
130
args.variable_name,
131
);
132
133
FunctionIR::Unpivot {
134
args: Arc::new(args),
135
schema: Default::default(),
136
}
137
},
138
DslFunction::FunctionIR(func) => func,
139
DslFunction::RowIndex { name, offset } => {
140
polars_ensure!(
141
!input_schema.contains(&name),
142
Duplicate: "duplicate column name {name}"
143
);
144
145
FunctionIR::RowIndex {
146
name,
147
offset,
148
schema: Default::default(),
149
}
150
},
151
DslFunction::Unnest { columns, separator } => {
152
let columns = columns.into_columns(input_schema, &Default::default())?;
153
let columns: Arc<[PlSmallStr]> = columns.into_iter().collect();
154
for col in columns.iter() {
155
let dtype = input_schema.try_get(col.as_str())?;
156
polars_ensure!(
157
dtype.is_struct(),
158
InvalidOperation: "invalid dtype: expected 'Struct', got '{:?}' for '{}'", dtype, col
159
);
160
}
161
FunctionIR::Unnest { columns, separator }
162
},
163
DslFunction::Hint(h) => FunctionIR::Hint(h),
164
#[cfg(feature = "python")]
165
DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),
166
DslFunction::Stats(_)
167
| DslFunction::FillNan(_)
168
| DslFunction::Rename { .. }
169
| DslFunction::Explode { .. } => {
170
// We should not reach this.
171
panic!("impl error")
172
},
173
};
174
Ok(function)
175
}
176
}
177
178
impl Debug for DslFunction {
179
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
180
write!(f, "{self}")
181
}
182
}
183
184
impl Display for DslFunction {
185
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
186
use DslFunction::*;
187
match self {
188
FunctionIR(inner) => write!(f, "{inner}"),
189
v => {
190
let s: &str = v.into();
191
write!(f, "{s}")
192
},
193
}
194
}
195
}
196
197
impl From<FunctionIR> for DslFunction {
198
fn from(value: FunctionIR) -> Self {
199
DslFunction::FunctionIR(value)
200
}
201
}
202
203