// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-plan/src/plans/functions/mod.rs
1
mod count;
2
mod dsl;
3
#[cfg(feature = "python")]
4
mod python_udf;
5
mod schema;
6
7
use std::borrow::Cow;
8
use std::fmt::{Debug, Display, Formatter};
9
use std::hash::{Hash, Hasher};
10
use std::sync::Arc;
11
12
pub use dsl::*;
13
use polars_core::error::feature_gated;
14
use polars_core::prelude::*;
15
use polars_io::cloud::CloudOptions;
16
use polars_utils::pl_str::PlSmallStr;
17
#[cfg(feature = "serde")]
18
use serde::{Deserialize, Serialize};
19
use strum_macros::IntoStaticStr;
20
21
#[cfg(feature = "python")]
22
use crate::dsl::python_dsl::PythonFunction;
23
use crate::plans::ir::ScanSourcesDisplay;
24
use crate::prelude::*;
25
26
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
27
#[derive(Clone, IntoStaticStr)]
28
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
29
pub enum FunctionIR {
30
RowIndex {
31
name: PlSmallStr,
32
offset: Option<IdxSize>,
33
// Might be cached.
34
#[cfg_attr(feature = "ir_serde", serde(skip))]
35
schema: CachedSchema,
36
},
37
#[cfg(feature = "python")]
38
OpaquePython(OpaquePythonUdf),
39
40
FastCount {
41
sources: ScanSources,
42
scan_type: Box<FileScanIR>,
43
cloud_options: Option<CloudOptions>,
44
alias: Option<PlSmallStr>,
45
},
46
47
Unnest {
48
columns: Arc<[PlSmallStr]>,
49
},
50
Rechunk,
51
Explode {
52
columns: Arc<[PlSmallStr]>,
53
#[cfg_attr(feature = "ir_serde", serde(skip))]
54
schema: CachedSchema,
55
},
56
#[cfg(feature = "pivot")]
57
Unpivot {
58
args: Arc<UnpivotArgsIR>,
59
#[cfg_attr(feature = "ir_serde", serde(skip))]
60
schema: CachedSchema,
61
},
62
#[cfg_attr(feature = "ir_serde", serde(skip))]
63
Opaque {
64
function: Arc<dyn DataFrameUdf>,
65
schema: Option<Arc<dyn UdfSchema>>,
66
/// allow predicate pushdown optimizations
67
predicate_pd: bool,
68
/// allow projection pushdown optimizations
69
projection_pd: bool,
70
streamable: bool,
71
// used for formatting
72
fmt_str: PlSmallStr,
73
},
74
}
75
76
impl Eq for FunctionIR {}
77
78
impl PartialEq for FunctionIR {
79
fn eq(&self, other: &Self) -> bool {
80
use FunctionIR::*;
81
match (self, other) {
82
(Rechunk, Rechunk) => true,
83
(
84
FastCount {
85
sources: srcs_l, ..
86
},
87
FastCount {
88
sources: srcs_r, ..
89
},
90
) => srcs_l == srcs_r,
91
(Explode { columns: l, .. }, Explode { columns: r, .. }) => l == r,
92
#[cfg(feature = "pivot")]
93
(Unpivot { args: l, .. }, Unpivot { args: r, .. }) => l == r,
94
(RowIndex { name: l, .. }, RowIndex { name: r, .. }) => l == r,
95
_ => false,
96
}
97
}
98
}
99
100
impl Hash for FunctionIR {
101
fn hash<H: Hasher>(&self, state: &mut H) {
102
std::mem::discriminant(self).hash(state);
103
match self {
104
#[cfg(feature = "python")]
105
FunctionIR::OpaquePython { .. } => {},
106
FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),
107
FunctionIR::FastCount {
108
sources,
109
scan_type,
110
cloud_options,
111
alias,
112
} => {
113
sources.hash(state);
114
scan_type.hash(state);
115
cloud_options.hash(state);
116
alias.hash(state);
117
},
118
FunctionIR::Unnest { columns } => columns.hash(state),
119
FunctionIR::Rechunk => {},
120
FunctionIR::Explode { columns, schema: _ } => columns.hash(state),
121
#[cfg(feature = "pivot")]
122
FunctionIR::Unpivot { args, schema: _ } => args.hash(state),
123
FunctionIR::RowIndex {
124
name,
125
schema: _,
126
offset,
127
} => {
128
name.hash(state);
129
offset.hash(state);
130
},
131
}
132
}
133
}
134
135
impl FunctionIR {
136
/// Whether this function can run on batches of data at a time.
137
pub fn is_streamable(&self) -> bool {
138
use FunctionIR::*;
139
match self {
140
Rechunk => false,
141
FastCount { .. } | Unnest { .. } | Explode { .. } => true,
142
#[cfg(feature = "pivot")]
143
Unpivot { .. } => true,
144
Opaque { streamable, .. } => *streamable,
145
#[cfg(feature = "python")]
146
OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,
147
RowIndex { .. } => false,
148
}
149
}
150
151
/// Whether this function will increase the number of rows
152
pub fn expands_rows(&self) -> bool {
153
use FunctionIR::*;
154
match self {
155
#[cfg(feature = "pivot")]
156
Unpivot { .. } => true,
157
Explode { .. } => true,
158
_ => false,
159
}
160
}
161
162
pub(crate) fn allow_predicate_pd(&self) -> bool {
163
use FunctionIR::*;
164
match self {
165
Opaque { predicate_pd, .. } => *predicate_pd,
166
#[cfg(feature = "python")]
167
OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,
168
#[cfg(feature = "pivot")]
169
Unpivot { .. } => true,
170
Rechunk | Unnest { .. } | Explode { .. } => true,
171
RowIndex { .. } | FastCount { .. } => false,
172
}
173
}
174
175
pub(crate) fn allow_projection_pd(&self) -> bool {
176
use FunctionIR::*;
177
match self {
178
Opaque { projection_pd, .. } => *projection_pd,
179
#[cfg(feature = "python")]
180
OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,
181
Rechunk | FastCount { .. } | Unnest { .. } | Explode { .. } => true,
182
#[cfg(feature = "pivot")]
183
Unpivot { .. } => true,
184
RowIndex { .. } => true,
185
}
186
}
187
188
pub(crate) fn additional_projection_pd_columns(&self) -> Cow<'_, [PlSmallStr]> {
189
use FunctionIR::*;
190
match self {
191
Unnest { columns } => Cow::Borrowed(columns.as_ref()),
192
Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),
193
_ => Cow::Borrowed(&[]),
194
}
195
}
196
197
pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {
198
use FunctionIR::*;
199
match self {
200
Opaque { function, .. } => function.call_udf(df),
201
#[cfg(feature = "python")]
202
OpaquePython(OpaquePythonUdf {
203
function,
204
validate_output,
205
schema,
206
..
207
}) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),
208
FastCount {
209
sources,
210
scan_type,
211
cloud_options,
212
alias,
213
} => count::count_rows(sources, scan_type, cloud_options.as_ref(), alias.clone()),
214
Rechunk => {
215
df.as_single_chunk_par();
216
Ok(df)
217
},
218
Unnest { columns: _columns } => {
219
feature_gated!("dtype-struct", df.unnest(_columns.iter().cloned()))
220
},
221
Explode { columns, .. } => df.explode(columns.iter().cloned()),
222
#[cfg(feature = "pivot")]
223
Unpivot { args, .. } => {
224
use polars_ops::pivot::UnpivotDF;
225
let args = (**args).clone();
226
df.unpivot2(args)
227
},
228
RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),
229
}
230
}
231
}
232
233
impl Debug for FunctionIR {
234
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
235
write!(f, "{self}")
236
}
237
}
238
239
impl Display for FunctionIR {
240
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241
use FunctionIR::*;
242
match self {
243
Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
244
Unnest { columns } => {
245
write!(f, "UNNEST by:")?;
246
let columns = columns.as_ref();
247
fmt_column_delimited(f, columns, "[", "]")
248
},
249
FastCount {
250
sources,
251
scan_type,
252
cloud_options: _,
253
alias,
254
} => {
255
let scan_type: &str = (&(**scan_type)).into();
256
let default_column_name = PlSmallStr::from_static(crate::constants::LEN);
257
let alias = alias.as_ref().unwrap_or(&default_column_name);
258
259
write!(
260
f,
261
"FAST COUNT ({scan_type}) {} as \"{alias}\"",
262
ScanSourcesDisplay(sources)
263
)
264
},
265
v => {
266
let s: &str = v.into();
267
write!(f, "{s}")
268
},
269
}
270
}
271
}
272
273