Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
8327 views
1
use polars_compute::rolling::QuantileMethod;
2
use strum_macros::IntoStaticStr;
3
4
use super::*;
5
6
// An opaque Python user-defined function together with the flags that tell the
// optimizer what it may do around it. Only compiled with the `python` feature.
//
// NOTE: new commentary on this type uses plain `//` comments on purpose. With the
// `dsl-schema` feature, `///` doc comments become `description`s in the generated
// JSON schema, so the pre-existing `///` field docs are kept exactly as they were.
#[cfg(feature = "python")]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone)]
pub struct OpaquePythonUdf {
    // The wrapped Python function.
    pub function: PythonFunction,
    // NOTE(review): presumably the output schema when it is known up front, and
    // `None` otherwise — confirm against the consumers of this field.
    pub schema: Option<SchemaRef>,
    /// allow predicate pushdown optimizations
    pub predicate_pd: bool,
    /// allow projection pushdown optimizations
    pub projection_pd: bool,
    // NOTE(review): presumably whether the UDF may run in the streaming engine.
    pub streamable: bool,
    // NOTE(review): presumably whether the UDF's output is checked against `schema`.
    pub validate_output: bool,
}
20
21
// Except for Opaque functions, this only has the DSL name of the function.
22
#[derive(Clone, IntoStaticStr)]
23
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
24
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
25
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
26
pub enum DslFunction {
27
RowIndex {
28
name: PlSmallStr,
29
offset: Option<IdxSize>,
30
},
31
// This is both in DSL and IR because we want to be able to serialize it.
32
#[cfg(feature = "python")]
33
OpaquePython(OpaquePythonUdf),
34
Explode {
35
columns: Selector,
36
options: ExplodeOptions,
37
allow_empty: bool,
38
},
39
#[cfg(feature = "pivot")]
40
Unpivot {
41
args: UnpivotArgsDSL,
42
},
43
Rename {
44
existing: Arc<[PlSmallStr]>,
45
new: Arc<[PlSmallStr]>,
46
strict: bool,
47
},
48
Unnest {
49
columns: Selector,
50
separator: Option<PlSmallStr>,
51
},
52
Stats(StatsFunction),
53
/// FillValue
54
FillNan(Expr),
55
// Function that is already converted to IR.
56
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
57
FunctionIR(FunctionIR),
58
Hint(HintIR),
59
}
60
61
#[derive(Clone)]
62
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
63
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
64
pub enum StatsFunction {
65
Var {
66
ddof: u8,
67
},
68
Std {
69
ddof: u8,
70
},
71
Quantile {
72
quantile: Expr,
73
method: QuantileMethod,
74
},
75
Median,
76
Mean,
77
Sum,
78
Min,
79
Max,
80
}
81
82
pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(
83
columns: I,
84
input_schema: &Schema,
85
operation_name: &str,
86
) -> PolarsResult<()> {
87
let columns = columns.into_iter();
88
for c in columns {
89
polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)
90
}
91
Ok(())
92
}
93
94
impl DslFunction {
95
pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {
96
let function = match self {
97
#[cfg(feature = "pivot")]
98
DslFunction::Unpivot { args } => {
99
polars_ensure!(
100
!input_schema.contains("variable"),
101
Duplicate: "duplicate column name variable"
102
);
103
104
polars_ensure!(
105
!input_schema.contains("value"),
106
Duplicate: "duplicate column name value"
107
);
108
109
let on = match args.on {
110
None => None,
111
Some(on) => Some(
112
on.into_columns(input_schema, &Default::default())?
113
.into_iter()
114
.collect::<Vec<_>>(),
115
),
116
};
117
118
let index = args
119
.index
120
.into_columns(input_schema, &Default::default())?
121
.into_vec();
122
123
let args = UnpivotArgsIR::new(
124
input_schema.iter().map(|(name, _)| name.clone()).collect(),
125
on,
126
index,
127
args.value_name,
128
args.variable_name,
129
);
130
131
FunctionIR::Unpivot {
132
args: Arc::new(args),
133
schema: Default::default(),
134
}
135
},
136
DslFunction::FunctionIR(func) => func,
137
DslFunction::RowIndex { name, offset } => {
138
polars_ensure!(
139
!input_schema.contains(&name),
140
Duplicate: "duplicate column name {name}"
141
);
142
143
FunctionIR::RowIndex {
144
name,
145
offset,
146
schema: Default::default(),
147
}
148
},
149
DslFunction::Unnest { columns, separator } => {
150
let columns = columns.into_columns(input_schema, &Default::default())?;
151
let columns: Arc<[PlSmallStr]> = columns.into_iter().collect();
152
for col in columns.iter() {
153
let dtype = input_schema.try_get(col.as_str())?;
154
polars_ensure!(
155
dtype.is_struct(),
156
InvalidOperation: "invalid dtype: expected 'Struct', got '{:?}' for '{}'", dtype, col
157
);
158
}
159
FunctionIR::Unnest { columns, separator }
160
},
161
DslFunction::Hint(h) => FunctionIR::Hint(h),
162
#[cfg(feature = "python")]
163
DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),
164
DslFunction::Stats(_)
165
| DslFunction::FillNan(_)
166
| DslFunction::Rename { .. }
167
| DslFunction::Explode { .. } => {
168
// We should not reach this.
169
panic!("impl error")
170
},
171
};
172
Ok(function)
173
}
174
}
175
176
impl Debug for DslFunction {
177
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
178
write!(f, "{self}")
179
}
180
}
181
182
impl Display for DslFunction {
183
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
184
use DslFunction::*;
185
match self {
186
FunctionIR(inner) => write!(f, "{inner}"),
187
v => {
188
let s: &str = v.into();
189
write!(f, "{s}")
190
},
191
}
192
}
193
}
194
195
impl From<FunctionIR> for DslFunction {
    /// Wraps a function that is already in IR form so it can travel through the DSL.
    fn from(ir: FunctionIR) -> Self {
        Self::FunctionIR(ir)
    }
}
200
201