Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
6940 views
1
use polars_compute::rolling::QuantileMethod;
2
use strum_macros::IntoStaticStr;
3
4
use super::*;
5
6
// A Python user-defined function that is treated as an opaque node in the plan.
// It exists in both the DSL (`DslFunction::OpaquePython`) and the IR
// (`FunctionIR::OpaquePython`) and is passed from one to the other unchanged.
#[cfg(feature = "python")]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone)]
pub struct OpaquePythonUdf {
    // The wrapped Python callable.
    pub function: PythonFunction,
    // Output schema of the function if one was supplied, `None` otherwise.
    // NOTE(review): how a `None` schema is resolved is not visible here — confirm.
    pub schema: Option<SchemaRef>,
    /// allow predicate pushdown optimizations
    pub predicate_pd: bool,
    /// allow projection pushdown optimizations
    pub projection_pd: bool,
    // NOTE(review): presumably marks the UDF as usable by the streaming engine — confirm.
    pub streamable: bool,
    // NOTE(review): presumably requests a check of the produced output (e.g. against
    // `schema`) — confirm against the executor.
    pub validate_output: bool,
}

// Except for Opaque functions, this only has the DSL name of the function.
22
#[derive(Clone, IntoStaticStr)]
23
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
24
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
25
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
26
pub enum DslFunction {
27
RowIndex {
28
name: PlSmallStr,
29
offset: Option<IdxSize>,
30
},
31
// This is both in DSL and IR because we want to be able to serialize it.
32
#[cfg(feature = "python")]
33
OpaquePython(OpaquePythonUdf),
34
Explode {
35
columns: Selector,
36
allow_empty: bool,
37
},
38
#[cfg(feature = "pivot")]
39
Unpivot {
40
args: UnpivotArgsDSL,
41
},
42
Rename {
43
existing: Arc<[PlSmallStr]>,
44
new: Arc<[PlSmallStr]>,
45
strict: bool,
46
},
47
Unnest(Selector),
48
Stats(StatsFunction),
49
/// FillValue
50
FillNan(Expr),
51
// Function that is already converted to IR.
52
#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
53
FunctionIR(FunctionIR),
54
}
55
56
#[derive(Clone)]
57
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
58
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
59
pub enum StatsFunction {
60
Var {
61
ddof: u8,
62
},
63
Std {
64
ddof: u8,
65
},
66
Quantile {
67
quantile: Expr,
68
method: QuantileMethod,
69
},
70
Median,
71
Mean,
72
Sum,
73
Min,
74
Max,
75
}
76
77
pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(
78
columns: I,
79
input_schema: &Schema,
80
operation_name: &str,
81
) -> PolarsResult<()> {
82
let columns = columns.into_iter();
83
for c in columns {
84
polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)
85
}
86
Ok(())
87
}
88
89
impl DslFunction {
90
pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {
91
let function = match self {
92
#[cfg(feature = "pivot")]
93
DslFunction::Unpivot { args } => {
94
let on = args.on.into_columns(input_schema, &Default::default())?;
95
let index = args.index.into_columns(input_schema, &Default::default())?;
96
97
let args = UnpivotArgsIR {
98
on: on.into_iter().collect(),
99
index: index.into_iter().collect(),
100
variable_name: args.variable_name.clone(),
101
value_name: args.value_name,
102
};
103
104
FunctionIR::Unpivot {
105
args: Arc::new(args),
106
schema: Default::default(),
107
}
108
},
109
DslFunction::FunctionIR(func) => func,
110
DslFunction::RowIndex { name, offset } => FunctionIR::RowIndex {
111
name,
112
offset,
113
schema: Default::default(),
114
},
115
DslFunction::Unnest(selector) => {
116
let columns = selector.into_columns(input_schema, &Default::default())?;
117
let columns = columns.into_iter().collect();
118
FunctionIR::Unnest { columns }
119
},
120
#[cfg(feature = "python")]
121
DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),
122
DslFunction::Stats(_)
123
| DslFunction::FillNan(_)
124
| DslFunction::Rename { .. }
125
| DslFunction::Explode { .. } => {
126
// We should not reach this.
127
panic!("impl error")
128
},
129
};
130
Ok(function)
131
}
132
}
133
134
impl Debug for DslFunction {
135
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136
write!(f, "{self}")
137
}
138
}
139
140
impl Display for DslFunction {
141
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
142
use DslFunction::*;
143
match self {
144
FunctionIR(inner) => write!(f, "{inner}"),
145
v => {
146
let s: &str = v.into();
147
write!(f, "{s}")
148
},
149
}
150
}
151
}
152
153
impl From<FunctionIR> for DslFunction {
154
fn from(value: FunctionIR) -> Self {
155
DslFunction::FunctionIR(value)
156
}
157
}
158
159