Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/utils.rs
6939 views
1
use std::fmt::Formatter;
2
use std::iter::FlatMap;
3
4
use polars_core::prelude::*;
5
6
use self::visitor::{AexprNode, RewritingVisitor, TreeWalker};
7
use crate::constants::get_len_name;
8
use crate::prelude::*;
9
10
/// Appends `items` to `s` as a parenthesised, comma-separated list and returns the result.
///
/// With `s = "sum"` and `items = ["a", "b"]` the result is `"sum(a, b)"`.
/// An empty `items` slice produces an empty pair of parentheses (`"sum()"`) and leaves the
/// text already present in `s` untouched.
pub fn comma_delimited<S>(mut s: String, items: &[S]) -> String
where
    S: AsRef<str>,
{
    s.push('(');
    let mut remaining = items.iter();
    if let Some(first) = remaining.next() {
        s.push_str(first.as_ref());
        // Emit the separator *before* every subsequent item so there is never a trailing
        // ", " to trim; trimming unconditionally would eat into `s` when `items` is empty.
        for item in remaining {
            s.push_str(", ");
            s.push_str(item.as_ref());
        }
    }
    s.push(')');
    s
}
25
26
/// Writes `items` to `f` as a `", "`-separated list wrapped in `container_start` and
/// `container_end`.
///
/// An empty `items` slice writes only the two container strings.
pub(crate) fn fmt_column_delimited<S: AsRef<str>>(
    f: &mut Formatter<'_>,
    items: &[S],
    container_start: &str,
    container_end: &str,
) -> std::fmt::Result {
    f.write_str(container_start)?;
    let mut remaining = items.iter();
    if let Some(first) = remaining.next() {
        f.write_str(first.as_ref())?;
        // The separator goes in front of every item after the first.
        for item in remaining {
            f.write_str(", ")?;
            f.write_str(item.as_ref())?;
        }
    }
    f.write_str(container_end)
}
42
43
pub(crate) fn is_scan(plan: &IR) -> bool {
44
matches!(plan, IR::Scan { .. } | IR::DataFrameScan { .. })
45
}
46
47
/// Returns `true` when every node visited from `current_node` is an [`AExpr::Column`],
/// i.e. the expression is a plain column projection.
#[cfg(feature = "meta")]
pub(crate) fn aexpr_is_simple_projection(current_node: Node, arena: &Arena<AExpr>) -> bool {
    // "All nodes are columns" is the same as "no node is a non-column".
    !has_aexpr(current_node, arena, |e| !matches!(e, AExpr::Column(_)))
}
54
55
pub fn has_aexpr<F>(current_node: Node, arena: &Arena<AExpr>, matches: F) -> bool
56
where
57
F: Fn(&AExpr) -> bool,
58
{
59
arena.iter(current_node).any(|(_node, e)| matches(e))
60
}
61
62
pub fn has_aexpr_window(current_node: Node, arena: &Arena<AExpr>) -> bool {
63
has_aexpr(current_node, arena, |e| matches!(e, AExpr::Window { .. }))
64
}
65
66
pub fn has_aexpr_literal(current_node: Node, arena: &Arena<AExpr>) -> bool {
67
has_aexpr(current_node, arena, |e| matches!(e, AExpr::Literal(_)))
68
}
69
70
/// Can check if an expression tree has a matching_expr. This
71
/// requires a dummy expression to be created that will be used to pattern match against.
72
pub fn has_expr<F>(current_expr: &Expr, matches: F) -> bool
73
where
74
F: Fn(&Expr) -> bool,
75
{
76
current_expr.into_iter().any(matches)
77
}
78
79
/// Check if expression is independent from any column.
80
pub(crate) fn is_column_independent_aexpr(expr: Node, arena: &Arena<AExpr>) -> bool {
81
!has_aexpr(expr, arena, |e| match e {
82
AExpr::Column(_) | AExpr::Len => true,
83
#[cfg(feature = "dtype-struct")]
84
AExpr::Function {
85
input: _,
86
function: IRFunctionExpr::StructExpr(IRStructFunction::FieldByName(_)),
87
options: _,
88
} => true,
89
_ => false,
90
})
91
}
92
93
pub fn has_null(current_expr: &Expr) -> bool {
94
has_expr(
95
current_expr,
96
|e| matches!(e, Expr::Literal(LiteralValue::Scalar(sc)) if sc.is_null()),
97
)
98
}
99
100
pub fn aexpr_output_name(node: Node, arena: &Arena<AExpr>) -> PolarsResult<PlSmallStr> {
101
for (_, ae) in arena.iter(node) {
102
match ae {
103
// don't follow the partition by branch
104
AExpr::Window { function, .. } => return aexpr_output_name(*function, arena),
105
AExpr::Column(name) => return Ok(name.clone()),
106
AExpr::Len => return Ok(get_len_name()),
107
AExpr::Literal(val) => return Ok(val.output_column_name().clone()),
108
AExpr::Ternary { truthy, .. } => return aexpr_output_name(*truthy, arena),
109
_ => {},
110
}
111
}
112
let expr = node_to_expr(node, arena);
113
polars_bail!(
114
ComputeError:
115
"unable to find root column name for expr '{expr:?}' when calling 'output_name'",
116
);
117
}
118
119
/// output name of expr
120
pub fn expr_output_name(expr: &Expr) -> PolarsResult<PlSmallStr> {
121
for e in expr {
122
match e {
123
// don't follow the partition by branch
124
Expr::Window { function, .. } => return expr_output_name(function),
125
Expr::Column(name) => return Ok(name.clone()),
126
Expr::Alias(_, name) => return Ok(name.clone()),
127
Expr::KeepName(_) => polars_bail!(nyi = "`name.keep` is not allowed here"),
128
Expr::RenameAlias { expr, function } => return function.call(&expr_output_name(expr)?),
129
Expr::Len => return Ok(get_len_name()),
130
Expr::Literal(val) => return Ok(val.output_column_name().clone()),
131
132
#[cfg(feature = "dtype-struct")]
133
Expr::Function {
134
input: _,
135
function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
136
} => return Ok(name.clone()),
137
138
// Selector with single by_name is fine.
139
Expr::Selector(Selector::ByName { names, .. }) if names.len() == 1 => {
140
return Ok(names[0].clone());
141
},
142
143
#[cfg(feature = "dtype-struct")]
144
Expr::Function {
145
function:
146
FunctionExpr::StructExpr(StructFunction::SelectFields(Selector::ByName {
147
names,
148
..
149
})),
150
..
151
} if names.len() == 1 => return Ok(names[0].clone()),
152
153
// Other selectors aren't possible right now.
154
Expr::Selector(_) => break,
155
156
#[cfg(feature = "dtype-struct")]
157
Expr::Function {
158
function: FunctionExpr::StructExpr(StructFunction::SelectFields(_)),
159
..
160
} => break,
161
162
_ => {},
163
}
164
}
165
polars_bail!(
166
ComputeError:
167
"unable to find root column name for expr '{expr:?}' when calling 'output_name'",
168
);
169
}
170
171
#[allow(clippy::type_complexity)]
172
pub fn expr_to_leaf_column_names_iter(expr: &Expr) -> impl Iterator<Item = PlSmallStr> + '_ {
173
expr_to_leaf_column_exprs_iter(expr).flat_map(|e| expr_to_leaf_column_name(e).ok())
174
}
175
176
/// This should gradually replace expr_to_root_column as this will get all names in the tree.
177
pub fn expr_to_leaf_column_names(expr: &Expr) -> Vec<PlSmallStr> {
178
expr_to_leaf_column_names_iter(expr).collect()
179
}
180
181
/// unpack alias(col) to name of the root column name
182
pub fn expr_to_leaf_column_name(expr: &Expr) -> PolarsResult<PlSmallStr> {
183
let mut leaves = expr_to_leaf_column_exprs_iter(expr).collect::<Vec<_>>();
184
polars_ensure!(leaves.len() <= 1, ComputeError: "found more than one root column name");
185
match leaves.pop() {
186
Some(Expr::Column(name)) => Ok(name.clone()),
187
Some(Expr::Selector(_)) => polars_bail!(
188
ComputeError: "selector has no root column name",
189
),
190
Some(_) => unreachable!(),
191
None => polars_bail!(
192
ComputeError: "no root column name found",
193
),
194
}
195
}
196
197
#[allow(clippy::type_complexity)]
198
pub(crate) fn aexpr_to_column_nodes_iter<'a>(
199
root: Node,
200
arena: &'a Arena<AExpr>,
201
) -> FlatMap<AExprIter<'a>, Option<ColumnNode>, fn((Node, &'a AExpr)) -> Option<ColumnNode>> {
202
arena.iter(root).flat_map(|(node, ae)| {
203
if matches!(ae, AExpr::Column(_)) {
204
Some(ColumnNode(node))
205
} else {
206
None
207
}
208
})
209
}
210
211
pub fn column_node_to_name(node: ColumnNode, arena: &Arena<AExpr>) -> &PlSmallStr {
212
if let AExpr::Column(name) = arena.get(node.0) {
213
name
214
} else {
215
unreachable!()
216
}
217
}
218
219
/// Get all leaf column expressions in the expression tree.
220
pub(crate) fn expr_to_leaf_column_exprs_iter(expr: &Expr) -> impl Iterator<Item = &Expr> {
221
expr.into_iter().flat_map(|e| match e {
222
Expr::Column(_) => Some(e),
223
_ => None,
224
})
225
}
226
227
/// Take a list of expressions and a schema and determine the output schema.
228
pub fn expressions_to_schema(expr: &[Expr], schema: &Schema) -> PolarsResult<Schema> {
229
let mut expr_arena = Arena::with_capacity(4 * expr.len());
230
expr.iter()
231
.map(|expr| {
232
let mut field = expr.to_field_amortized(schema, &mut expr_arena)?;
233
234
field.dtype = field.dtype.materialize_unknown(true)?;
235
Ok(field)
236
})
237
.collect()
238
}
239
240
pub fn aexpr_to_leaf_names_iter(
241
node: Node,
242
arena: &Arena<AExpr>,
243
) -> impl Iterator<Item = PlSmallStr> + '_ {
244
aexpr_to_column_nodes_iter(node, arena).map(|node| match arena.get(node.0) {
245
AExpr::Column(name) => name.clone(),
246
_ => unreachable!(),
247
})
248
}
249
250
pub fn aexpr_to_leaf_names(node: Node, arena: &Arena<AExpr>) -> Vec<PlSmallStr> {
251
aexpr_to_leaf_names_iter(node, arena).collect()
252
}
253
254
pub fn aexpr_to_leaf_name(node: Node, arena: &Arena<AExpr>) -> PlSmallStr {
255
aexpr_to_leaf_names_iter(node, arena).next().unwrap()
256
}
257
258
/// check if a selection/projection can be done on the downwards schema
259
pub(crate) fn check_input_node(
260
node: Node,
261
input_schema: &Schema,
262
expr_arena: &Arena<AExpr>,
263
) -> bool {
264
aexpr_to_leaf_names_iter(node, expr_arena).all(|name| input_schema.contains(name.as_ref()))
265
}
266
267
pub(crate) fn check_input_column_node(
268
node: ColumnNode,
269
input_schema: &Schema,
270
expr_arena: &Arena<AExpr>,
271
) -> bool {
272
match expr_arena.get(node.0) {
273
AExpr::Column(name) => input_schema.contains(name.as_ref()),
274
// Invariant of `ColumnNode`
275
_ => unreachable!(),
276
}
277
}
278
279
pub(crate) fn aexprs_to_schema<I: IntoIterator<Item = K>, K: Into<Node>>(
280
expr: I,
281
schema: &Schema,
282
arena: &Arena<AExpr>,
283
) -> Schema {
284
expr.into_iter()
285
.map(|node| arena.get(node.into()).to_field(schema, arena).unwrap())
286
.collect()
287
}
288
289
pub(crate) fn expr_irs_to_schema<I: IntoIterator<Item = K>, K: AsRef<ExprIR>>(
290
expr: I,
291
schema: &Schema,
292
arena: &Arena<AExpr>,
293
) -> Schema {
294
expr.into_iter()
295
.map(|e| {
296
let e = e.as_ref();
297
let mut field = e.field(schema, arena).expect("should be resolved");
298
299
// TODO! (can this be removed?)
300
if let Some(name) = e.get_alias() {
301
field.name = name.clone()
302
}
303
field.dtype = field.dtype.materialize_unknown(true).unwrap();
304
field
305
})
306
.collect()
307
}
308
309
/// Concatenate multiple schemas into one, disallowing duplicate field names
310
pub fn merge_schemas(schemas: &[SchemaRef]) -> PolarsResult<Schema> {
311
let schema_size = schemas.iter().map(|schema| schema.len()).sum();
312
let mut merged_schema = Schema::with_capacity(schema_size);
313
314
for schema in schemas {
315
schema.iter().try_for_each(|(name, dtype)| {
316
if merged_schema.with_column(name.clone(), dtype.clone()).is_none() {
317
Ok(())
318
} else {
319
Err(polars_err!(Duplicate: "Column with name '{}' has more than one occurrence", name))
320
}
321
})?;
322
}
323
324
Ok(merged_schema)
325
}
326
327
/// Rename all reference to the column in `map` with their corresponding new name.
328
pub fn rename_columns(
329
node: Node,
330
expr_arena: &mut Arena<AExpr>,
331
map: &PlIndexMap<PlSmallStr, PlSmallStr>,
332
) -> Node {
333
struct RenameColumns<'a>(&'a PlIndexMap<PlSmallStr, PlSmallStr>);
334
impl RewritingVisitor for RenameColumns<'_> {
335
type Node = AexprNode;
336
type Arena = Arena<AExpr>;
337
338
fn mutate(
339
&mut self,
340
node: Self::Node,
341
arena: &mut Self::Arena,
342
) -> PolarsResult<Self::Node> {
343
if let AExpr::Column(name) = arena.get(node.node()) {
344
if let Some(new_name) = self.0.get(name) {
345
return Ok(AexprNode::new(arena.add(AExpr::Column(new_name.clone()))));
346
}
347
}
348
349
Ok(node)
350
}
351
}
352
353
AexprNode::new(node)
354
.rewrite(&mut RenameColumns(map), expr_arena)
355
.unwrap()
356
.node()
357
}
358
359