Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/utils.rs
8420 views
1
use std::fmt::Formatter;
2
use std::iter::FlatMap;
3
4
use polars_core::prelude::*;
5
use polars_utils::format_pl_smallstr;
6
7
use self::visitor::{AexprNode, RewritingVisitor, TreeWalker};
8
use crate::constants::get_len_name;
9
use crate::prelude::*;
10
11
/// Append a parenthesized, comma-separated rendering of `items` to `s`.
///
/// The result is `s` followed by `(`, the items joined with `", "`, and `)`.
/// For example `s = "f"` with `items = ["a", "b"]` yields `"f(a, b)"`.
///
/// An empty `items` slice yields `s` followed by `"()"`; the prefix already
/// present in `s` is never modified.
pub fn comma_delimited<S>(mut s: String, items: &[S]) -> String
where
    S: AsRef<str>,
{
    s.push('(');
    let mut remaining = items.iter();
    // Write the separator *before* every item except the first, so nothing has
    // to be trimmed afterwards (trimming would eat into `s` when `items` is empty).
    if let Some(first) = remaining.next() {
        s.push_str(first.as_ref());
        for item in remaining {
            s.push_str(", ");
            s.push_str(item.as_ref());
        }
    }
    s.push(')');
    s
}
26
27
/// Write `items` to `f` as a comma-separated list wrapped in a container.
///
/// Emits `container_start`, then every item separated by `", "`, then
/// `container_end`. With no items only the two container strings are written.
pub(crate) fn fmt_column_delimited<S: AsRef<str>>(
    f: &mut Formatter<'_>,
    items: &[S],
    container_start: &str,
    container_end: &str,
) -> std::fmt::Result {
    f.write_str(container_start)?;
    let mut remaining = items.iter();
    if let Some(first) = remaining.next() {
        f.write_str(first.as_ref())?;
        // Every following item is preceded by the separator.
        for item in remaining {
            f.write_str(", ")?;
            f.write_str(item.as_ref())?;
        }
    }
    f.write_str(container_end)
}
43
44
pub(crate) fn is_scan(plan: &IR) -> bool {
45
matches!(plan, IR::Scan { .. } | IR::DataFrameScan { .. })
46
}
47
48
/// A projection that only takes a column or a column + alias.
///
/// Concretely: every node yielded by `arena.iter(current_node)` must be an
/// `AExpr::Column`.
#[cfg(feature = "meta")]
pub(crate) fn aexpr_is_simple_projection(current_node: Node, arena: &Arena<AExpr>) -> bool {
    // "All nodes are columns" expressed as "no node is a non-column".
    !arena
        .iter(current_node)
        .any(|(_, ae)| !matches!(ae, AExpr::Column(_)))
}
55
56
pub fn has_aexpr<F>(current_node: Node, arena: &Arena<AExpr>, matches: F) -> bool
57
where
58
F: Fn(&AExpr) -> bool,
59
{
60
arena.iter(current_node).any(|(_node, e)| matches(e))
61
}
62
63
pub fn has_aexpr_window(current_node: Node, arena: &Arena<AExpr>) -> bool {
64
has_aexpr(current_node, arena, |e| {
65
#[cfg(feature = "dynamic_group_by")]
66
if matches!(e, AExpr::Rolling { .. }) {
67
return true;
68
}
69
matches!(e, AExpr::Over { .. })
70
})
71
}
72
73
pub fn has_aexpr_literal(current_node: Node, arena: &Arena<AExpr>) -> bool {
74
has_aexpr(current_node, arena, |e| matches!(e, AExpr::Literal(_)))
75
}
76
77
/// Can check if an expression tree has a matching_expr. This
78
/// requires a dummy expression to be created that will be used to pattern match against.
79
pub fn has_expr<F>(current_expr: &Expr, matches: F) -> bool
80
where
81
F: Fn(&Expr) -> bool,
82
{
83
current_expr.into_iter().any(matches)
84
}
85
86
/// Check if expression is independent from any column.
87
pub(crate) fn is_column_independent_aexpr(expr: Node, arena: &Arena<AExpr>) -> bool {
88
!has_aexpr(expr, arena, |e| match e {
89
AExpr::Column(_) | AExpr::Len => true,
90
#[cfg(feature = "dtype-struct")]
91
AExpr::Function {
92
input: _,
93
function: IRFunctionExpr::StructExpr(IRStructFunction::FieldByName(_)),
94
options: _,
95
} => true,
96
_ => false,
97
})
98
}
99
100
pub fn has_null(current_expr: &Expr) -> bool {
101
has_expr(
102
current_expr,
103
|e| matches!(e, Expr::Literal(LiteralValue::Scalar(sc)) if sc.is_null()),
104
)
105
}
106
107
/// output name of expr
108
pub fn expr_output_name(expr: &Expr) -> PolarsResult<PlSmallStr> {
109
for e in expr {
110
match e {
111
// don't follow the partition by branch
112
#[cfg(feature = "dynamic_group_by")]
113
Expr::Rolling { function, .. } => return expr_output_name(function),
114
Expr::Over { function, .. } => return expr_output_name(function),
115
116
Expr::Column(name) => return Ok(name.clone()),
117
Expr::Alias(_, name) => return Ok(name.clone()),
118
Expr::KeepName(_) => polars_bail!(nyi = "`name.keep` is not allowed here"),
119
Expr::RenameAlias { expr, function } => return function.call(&expr_output_name(expr)?),
120
Expr::Len => return Ok(get_len_name()),
121
Expr::Literal(val) => return Ok(val.output_column_name()),
122
123
#[cfg(feature = "dtype-struct")]
124
Expr::Function {
125
input: _,
126
function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
127
} => return Ok(name.clone()),
128
129
// Selector with single by_name is fine.
130
Expr::Selector(Selector::ByName { names, .. }) if names.len() == 1 => {
131
return Ok(names[0].clone());
132
},
133
134
#[cfg(feature = "dtype-struct")]
135
Expr::Function {
136
function:
137
FunctionExpr::StructExpr(StructFunction::SelectFields(Selector::ByName {
138
names,
139
..
140
})),
141
..
142
} if names.len() == 1 => return Ok(names[0].clone()),
143
144
// Other selectors aren't possible right now.
145
Expr::Selector(_) => break,
146
147
#[cfg(feature = "dtype-struct")]
148
Expr::Function {
149
function: FunctionExpr::StructExpr(StructFunction::SelectFields(_)),
150
..
151
} => break,
152
153
_ => {},
154
}
155
}
156
polars_bail!(
157
ComputeError:
158
"unable to find root column name for expr '{expr:?}' when calling 'output_name'",
159
);
160
}
161
162
#[allow(clippy::type_complexity)]
163
pub fn expr_to_leaf_column_names_iter(expr: &Expr) -> impl Iterator<Item = PlSmallStr> + '_ {
164
expr_to_leaf_column_exprs_iter(expr).flat_map(|e| expr_to_leaf_column_name(e).ok())
165
}
166
167
/// This should gradually replace expr_to_root_column as this will get all names in the tree.
168
pub fn expr_to_leaf_column_names(expr: &Expr) -> Vec<PlSmallStr> {
169
expr_to_leaf_column_names_iter(expr).collect()
170
}
171
172
/// unpack alias(col) to name of the root column name
173
pub fn expr_to_leaf_column_name(expr: &Expr) -> PolarsResult<PlSmallStr> {
174
let mut leaves = expr_to_leaf_column_exprs_iter(expr).collect::<Vec<_>>();
175
polars_ensure!(leaves.len() <= 1, ComputeError: "found more than one root column name");
176
match leaves.pop() {
177
Some(Expr::Column(name)) => Ok(name.clone()),
178
Some(Expr::Selector(_)) => polars_bail!(
179
ComputeError: "selector has no root column name",
180
),
181
Some(_) => unreachable!(),
182
None => polars_bail!(
183
ComputeError: "no root column name found",
184
),
185
}
186
}
187
188
#[allow(clippy::type_complexity)]
189
pub(crate) fn aexpr_to_column_nodes_iter<'a>(
190
root: Node,
191
arena: &'a Arena<AExpr>,
192
) -> FlatMap<AExprIter<'a>, Option<ColumnNode>, fn((Node, &'a AExpr)) -> Option<ColumnNode>> {
193
arena.iter(root).flat_map(|(node, ae)| {
194
if matches!(ae, AExpr::Column(_)) {
195
Some(ColumnNode(node))
196
} else {
197
None
198
}
199
})
200
}
201
202
pub fn column_node_to_name(node: ColumnNode, arena: &Arena<AExpr>) -> &PlSmallStr {
203
if let AExpr::Column(name) = arena.get(node.0) {
204
name
205
} else {
206
unreachable!()
207
}
208
}
209
210
/// Get all leaf column expressions in the expression tree.
211
pub(crate) fn expr_to_leaf_column_exprs_iter(expr: &Expr) -> impl Iterator<Item = &Expr> {
212
expr.into_iter().flat_map(|e| match e {
213
Expr::Column(_) => Some(e),
214
_ => None,
215
})
216
}
217
218
/// Take a list of expressions and a schema and determine the output schema.
219
pub fn expressions_to_schema<E>(
220
expr: &[Expr],
221
schema: &Schema,
222
duplicate_err_msg_func: E,
223
) -> PolarsResult<Schema>
224
where
225
E: Fn(&str) -> String,
226
{
227
let mut expr_arena = Arena::with_capacity(4 * expr.len());
228
229
Schema::try_from_iter_check_duplicates(
230
expr.iter().map(|expr| {
231
let mut field = expr.to_field_amortized(schema, &mut expr_arena)?;
232
field.dtype = field.dtype.materialize_unknown(true)?;
233
Ok(field)
234
}),
235
|duplicate_name: &str| {
236
polars_err!(
237
Duplicate:
238
"{}. It's possible that multiple expressions are returning the same default column name. \
239
If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
240
duplicate column names.",
241
duplicate_err_msg_func(duplicate_name)
242
)
243
},
244
)
245
}
246
247
pub fn aexpr_to_leaf_names_iter(
248
node: Node,
249
arena: &'_ Arena<AExpr>,
250
) -> impl Iterator<Item = &'_ PlSmallStr> + '_ {
251
aexpr_to_column_nodes_iter(node, arena).map(|node| match arena.get(node.0) {
252
AExpr::Column(name) => name,
253
_ => unreachable!(),
254
})
255
}
256
257
pub fn aexpr_to_leaf_names(node: Node, arena: &Arena<AExpr>) -> Vec<PlSmallStr> {
258
aexpr_to_leaf_names_iter(node, arena).cloned().collect()
259
}
260
261
/// check if a selection/projection can be done on the downwards schema
262
pub(crate) fn check_input_node(
263
node: Node,
264
input_schema: &Schema,
265
expr_arena: &Arena<AExpr>,
266
) -> bool {
267
aexpr_to_leaf_names_iter(node, expr_arena).all(|name| input_schema.contains(name.as_ref()))
268
}
269
270
pub(crate) fn check_input_column_node(
271
node: ColumnNode,
272
input_schema: &Schema,
273
expr_arena: &Arena<AExpr>,
274
) -> bool {
275
match expr_arena.get(node.0) {
276
AExpr::Column(name) => input_schema.contains(name.as_ref()),
277
// Invariant of `ColumnNode`
278
_ => unreachable!(),
279
}
280
}
281
282
pub(crate) fn aexprs_to_schema<I: IntoIterator<Item = K>, K: Into<Node>>(
283
expr: I,
284
schema: &Schema,
285
arena: &Arena<AExpr>,
286
) -> Schema {
287
expr.into_iter()
288
.map(|node| {
289
arena
290
.get(node.into())
291
.to_field(&ToFieldContext::new(arena, schema))
292
.unwrap()
293
})
294
.collect()
295
}
296
297
pub(crate) fn expr_irs_to_schema<I: IntoIterator<Item = K>, K: AsRef<ExprIR>>(
298
expr: I,
299
schema: &Schema,
300
arena: &Arena<AExpr>,
301
) -> PolarsResult<Schema> {
302
expr.into_iter()
303
.map(|e| {
304
let e = e.as_ref();
305
let field = e.field(schema, arena).map(move |mut field| {
306
// TODO! (can this be removed?)
307
if let Some(name) = e.get_alias() {
308
field.name = name.clone()
309
}
310
field.dtype = field.dtype.materialize_unknown(true).unwrap();
311
field
312
})?;
313
Ok(field)
314
})
315
.collect()
316
}
317
318
/// Concatenate multiple schemas into one, disallowing duplicate field names
319
pub fn merge_schemas(schemas: &[SchemaRef]) -> PolarsResult<Schema> {
320
let schema_size = schemas.iter().map(|schema| schema.len()).sum();
321
let mut merged_schema = Schema::with_capacity(schema_size);
322
323
for schema in schemas {
324
schema.iter().try_for_each(|(name, dtype)| {
325
if merged_schema.with_column(name.clone(), dtype.clone()).is_none() {
326
Ok(())
327
} else {
328
Err(polars_err!(Duplicate: "Column with name '{}' has more than one occurrence", name))
329
}
330
})?;
331
}
332
333
Ok(merged_schema)
334
}
335
336
/// Rename all reference to the column in `map` with their corresponding new name.
337
pub fn rename_columns(
338
node: Node,
339
expr_arena: &mut Arena<AExpr>,
340
map: &PlIndexMap<PlSmallStr, PlSmallStr>,
341
) -> Node {
342
struct RenameColumns<'a>(&'a PlIndexMap<PlSmallStr, PlSmallStr>);
343
impl RewritingVisitor for RenameColumns<'_> {
344
type Node = AexprNode;
345
type Arena = Arena<AExpr>;
346
347
fn mutate(
348
&mut self,
349
node: Self::Node,
350
arena: &mut Self::Arena,
351
) -> PolarsResult<Self::Node> {
352
if let AExpr::Column(name) = arena.get(node.node()) {
353
if let Some(new_name) = self.0.get(name) {
354
return Ok(AexprNode::new(arena.add(AExpr::Column(new_name.clone()))));
355
}
356
}
357
358
Ok(node)
359
}
360
}
361
362
AexprNode::new(node)
363
.rewrite(&mut RenameColumns(map), expr_arena)
364
.unwrap()
365
.node()
366
}
367
368
/// Rename any `StructField(x)` to its corresponding `Column(prefix_x)` using the provided prefix.
///
/// The new column name is `prefix` immediately followed by the field name, with
/// no separator inserted. Returns the root node of the rewritten expression.
#[cfg(feature = "dtype-struct")]
pub fn structfield_to_column(
    node: Node,
    expr_arena: &mut Arena<AExpr>,
    prefix: &PlSmallStr,
) -> Node {
    struct StructFieldMapper<'a> {
        prefix: &'a PlSmallStr,
    }

    impl RewritingVisitor for StructFieldMapper<'_> {
        type Node = AexprNode;
        type Arena = Arena<AExpr>;

        fn mutate(
            &mut self,
            node: Self::Node,
            arena: &mut Self::Arena,
        ) -> PolarsResult<Self::Node> {
            // Build the prefixed name first so the arena is no longer borrowed
            // when the new node is added.
            let column_name = match arena.get(node.node()) {
                AExpr::StructField(name) => {
                    Some(format_pl_smallstr!("{}{}", self.prefix, name))
                },
                _ => None,
            };

            Ok(match column_name {
                Some(new_name) => AexprNode::new(arena.add(AExpr::Column(new_name))),
                None => node,
            })
        }
    }

    let mut mapper = StructFieldMapper { prefix };
    AexprNode::new(node)
        .rewrite(&mut mapper, expr_arena)
        .unwrap()
        .node()
}
399
400