Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/functions/mod.rs
8446 views
1
mod count;
2
mod dsl;
3
mod hint;
4
#[cfg(feature = "python")]
5
mod python_udf;
6
mod schema;
7
8
use std::borrow::Cow;
9
use std::fmt::{Debug, Display, Formatter};
10
use std::hash::{Hash, Hasher};
11
use std::sync::Arc;
12
13
pub use dsl::*;
14
pub use hint::*;
15
use polars_core::error::feature_gated;
16
use polars_core::prelude::*;
17
use polars_core::series::IsSorted;
18
use polars_utils::pl_str::PlSmallStr;
19
#[cfg(feature = "serde")]
20
use serde::{Deserialize, Serialize};
21
use strum_macros::IntoStaticStr;
22
23
#[cfg(feature = "python")]
24
use crate::dsl::python_dsl::PythonFunction;
25
use crate::plans::ir::ScanSourcesDisplay;
26
use crate::prelude::*;
27
28
#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
29
#[derive(Clone, IntoStaticStr)]
30
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
31
pub enum FunctionIR {
32
RowIndex {
33
name: PlSmallStr,
34
offset: Option<IdxSize>,
35
// Might be cached.
36
#[cfg_attr(feature = "ir_serde", serde(skip))]
37
schema: CachedSchema,
38
},
39
#[cfg(feature = "python")]
40
OpaquePython(OpaquePythonUdf),
41
42
FastCount {
43
sources: ScanSources,
44
scan_type: Box<FileScanIR>,
45
alias: Option<PlSmallStr>,
46
},
47
48
Unnest {
49
columns: Arc<[PlSmallStr]>,
50
separator: Option<PlSmallStr>,
51
},
52
Rechunk,
53
Explode {
54
columns: Arc<[PlSmallStr]>,
55
options: ExplodeOptions,
56
#[cfg_attr(feature = "ir_serde", serde(skip))]
57
schema: CachedSchema,
58
},
59
#[cfg(feature = "pivot")]
60
Unpivot {
61
args: Arc<UnpivotArgsIR>,
62
#[cfg_attr(feature = "ir_serde", serde(skip))]
63
schema: CachedSchema,
64
},
65
#[cfg_attr(feature = "ir_serde", serde(skip))]
66
Opaque {
67
function: Arc<dyn DataFrameUdf>,
68
schema: Option<Arc<dyn UdfSchema>>,
69
/// allow predicate pushdown optimizations
70
predicate_pd: bool,
71
/// allow projection pushdown optimizations
72
projection_pd: bool,
73
streamable: bool,
74
// used for formatting
75
fmt_str: PlSmallStr,
76
},
77
Hint(HintIR),
78
}
79
80
impl Hash for FunctionIR {
81
fn hash<H: Hasher>(&self, state: &mut H) {
82
std::mem::discriminant(self).hash(state);
83
match self {
84
#[cfg(feature = "python")]
85
FunctionIR::OpaquePython { .. } => {},
86
FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),
87
FunctionIR::FastCount {
88
sources,
89
scan_type,
90
alias,
91
} => {
92
sources.hash(state);
93
scan_type.hash(state);
94
alias.hash(state);
95
},
96
FunctionIR::Unnest { columns, separator } => {
97
columns.hash(state);
98
separator.hash(state);
99
},
100
FunctionIR::Rechunk => {},
101
FunctionIR::Explode {
102
columns,
103
options,
104
schema: _,
105
} => {
106
columns.hash(state);
107
options.hash(state);
108
},
109
#[cfg(feature = "pivot")]
110
FunctionIR::Unpivot { args, schema: _ } => args.hash(state),
111
FunctionIR::RowIndex {
112
name,
113
schema: _,
114
offset,
115
} => {
116
name.hash(state);
117
offset.hash(state);
118
},
119
FunctionIR::Hint(hint) => hint.hash(state),
120
}
121
}
122
}
123
124
impl FunctionIR {
125
/// Whether this function can run on batches of data at a time.
126
pub fn is_streamable(&self) -> bool {
127
use FunctionIR::*;
128
match self {
129
Rechunk => false,
130
FastCount { .. } | Unnest { .. } | Explode { .. } => true,
131
#[cfg(feature = "pivot")]
132
Unpivot { .. } => true,
133
Opaque { streamable, .. } => *streamable,
134
#[cfg(feature = "python")]
135
OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,
136
RowIndex { .. } => false,
137
Hint(_) => true,
138
}
139
}
140
141
/// Whether this function will increase the number of rows
142
pub fn expands_rows(&self) -> bool {
143
use FunctionIR::*;
144
match self {
145
#[cfg(feature = "pivot")]
146
Unpivot { .. } => true,
147
Explode { .. } => true,
148
_ => false,
149
}
150
}
151
152
pub(crate) fn allow_predicate_pd(&self) -> bool {
153
use FunctionIR::*;
154
match self {
155
Opaque { predicate_pd, .. } => *predicate_pd,
156
#[cfg(feature = "python")]
157
OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,
158
#[cfg(feature = "pivot")]
159
Unpivot { .. } => true,
160
Rechunk | Unnest { .. } | Explode { .. } | Hint(_) => true,
161
RowIndex { .. } | FastCount { .. } => false,
162
}
163
}
164
165
pub(crate) fn allow_projection_pd(&self) -> bool {
166
use FunctionIR::*;
167
match self {
168
Opaque { projection_pd, .. } => *projection_pd,
169
#[cfg(feature = "python")]
170
OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,
171
Rechunk | FastCount { .. } | Unnest { .. } | Explode { .. } | Hint(_) => true,
172
#[cfg(feature = "pivot")]
173
Unpivot { .. } => true,
174
RowIndex { .. } => true,
175
}
176
}
177
178
pub(crate) fn additional_projection_pd_columns(&self) -> Cow<'_, [PlSmallStr]> {
179
use FunctionIR::*;
180
match self {
181
Unnest { columns, .. } => Cow::Borrowed(columns.as_ref()),
182
Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),
183
_ => Cow::Borrowed(&[]),
184
}
185
}
186
187
pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {
188
use FunctionIR::*;
189
match self {
190
Opaque { function, .. } => function.call_udf(df),
191
#[cfg(feature = "python")]
192
OpaquePython(OpaquePythonUdf {
193
function,
194
validate_output,
195
schema,
196
..
197
}) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),
198
FastCount {
199
sources,
200
scan_type,
201
alias,
202
} => count::count_rows(sources, scan_type, alias.clone()),
203
Rechunk => {
204
df.rechunk_mut_par();
205
Ok(df)
206
},
207
Unnest { columns, separator } => {
208
feature_gated!(
209
"dtype-struct",
210
df.unnest(columns.iter().cloned(), separator.as_deref())
211
)
212
},
213
Explode {
214
columns, options, ..
215
} => df.explode(columns.iter().cloned(), *options),
216
#[cfg(feature = "pivot")]
217
Unpivot { args, .. } => {
218
use polars_ops::unpivot::UnpivotDF;
219
let args = (**args).clone();
220
df.unpivot2(args)
221
},
222
RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),
223
Hint(hint) => {
224
#[expect(irrefutable_let_patterns)]
225
if let HintIR::Sorted(s) = &hint
226
&& let Some(s) = s.first()
227
{
228
let idx = df.try_get_column_index(&s.column)?;
229
let col = &mut unsafe { df.columns_mut_retain_schema() }[idx];
230
if let Some(d) = s.descending {
231
let flag = if d {
232
IsSorted::Descending
233
} else {
234
IsSorted::Ascending
235
};
236
col.set_sorted_flag(flag);
237
}
238
}
239
240
Ok(df)
241
},
242
}
243
}
244
245
pub fn is_order_producing(&self, is_input_ordered: bool) -> bool {
246
match self {
247
FunctionIR::RowIndex { .. } => true,
248
FunctionIR::FastCount { .. } => false,
249
FunctionIR::Unnest { .. } => is_input_ordered,
250
FunctionIR::Rechunk => is_input_ordered,
251
#[cfg(feature = "python")]
252
FunctionIR::OpaquePython(..) => true,
253
FunctionIR::Explode { .. } => true,
254
#[cfg(feature = "pivot")]
255
FunctionIR::Unpivot { .. } => true,
256
FunctionIR::Opaque { .. } => true,
257
FunctionIR::Hint(_) => is_input_ordered,
258
}
259
}
260
261
pub fn is_elementwise(&self) -> bool {
262
match self {
263
Self::Unnest { .. } | Self::Hint(_) => true,
264
#[cfg(feature = "python")]
265
Self::OpaquePython(..) => false,
266
#[cfg(feature = "pivot")]
267
Self::Unpivot { .. } => false,
268
Self::RowIndex { .. }
269
| Self::FastCount { .. }
270
| Self::Rechunk
271
| Self::Explode { .. }
272
| Self::Opaque { .. } => false,
273
}
274
}
275
276
pub fn observes_input_order(&self) -> bool {
277
true
278
}
279
280
/// Is the input ordering always the same as the output ordering.
281
pub fn has_equal_order(&self) -> bool {
282
match self {
283
Self::Unnest { .. } | Self::Rechunk | Self::Hint(_) => true,
284
#[cfg(feature = "python")]
285
Self::OpaquePython(..) => false,
286
#[cfg(feature = "pivot")]
287
Self::Unpivot { .. } => false,
288
Self::RowIndex { .. }
289
| Self::FastCount { .. }
290
| Self::Explode { .. }
291
| Self::Opaque { .. } => false,
292
}
293
}
294
}
295
296
impl Debug for FunctionIR {
297
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
298
write!(f, "{self}")
299
}
300
}
301
302
impl Display for FunctionIR {
303
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
304
use FunctionIR::*;
305
match self {
306
Hint(hint) => {
307
write!(f, "hint.{hint}")
308
},
309
Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
310
Unnest { columns, separator } => {
311
write!(f, "UNNEST by:")?;
312
let columns = columns.as_ref();
313
fmt_column_delimited(f, columns, "[", "]")?;
314
if let Some(separator) = separator {
315
write!(f, ", separator: {separator}")?;
316
}
317
Ok(())
318
},
319
FastCount {
320
sources,
321
scan_type,
322
alias,
323
} => {
324
let scan_type: &str = (&(**scan_type)).into();
325
let default_column_name = PlSmallStr::from_static(crate::constants::LEN);
326
let alias = alias.as_ref().unwrap_or(&default_column_name);
327
328
write!(
329
f,
330
"FAST COUNT ({scan_type}) {} as \"{alias}\"",
331
ScanSourcesDisplay(sources)
332
)
333
},
334
RowIndex {
335
name,
336
offset,
337
schema: _,
338
} => {
339
write!(f, "ROW INDEX name: {name}")?;
340
if let Some(offset) = offset {
341
write!(f, ", offset: {offset}")?;
342
}
343
344
Ok(())
345
},
346
Explode {
347
columns,
348
options,
349
schema: _,
350
} => {
351
f.write_str("EXPLODE ")?;
352
fmt_column_delimited(f, columns, "[", "]")?;
353
if !options.empty_as_null {
354
f.write_str(", empty_as_null: false")?;
355
}
356
if !options.keep_nulls {
357
f.write_str(", keep_nulls: false")?;
358
}
359
Ok(())
360
},
361
#[cfg(feature = "pivot")]
362
Unpivot { args, schema: _ } => {
363
let UnpivotArgsIR {
364
on,
365
index,
366
variable_name,
367
value_name,
368
} = args.as_ref();
369
370
f.write_str("UNPIVOT on: ")?;
371
fmt_column_delimited(f, on, "[", "]")?;
372
fmt_column_delimited(f, index, "[", "]")?;
373
write!(f, ", variable_name: {variable_name}")?;
374
write!(f, ", value_name: {value_name}")?;
375
Ok(())
376
},
377
#[cfg(feature = "python")]
378
OpaquePython(_) => f.write_str(<&'static str>::from(self)),
379
Rechunk => f.write_str(<&'static str>::from(self)),
380
}
381
}
382
}
383
384