Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/collapse_and_project.rs
6940 views
1
use std::collections::BTreeSet;
2
3
use super::*;
4
5
/// Projection in the physical plan is done by selecting an expression per thread.
6
/// In case of many projections and columns this can be expensive when the expressions are simple
7
/// column selections. These can be selected on a single thread. The single thread is faster, because
8
/// the eager selection algorithm hashes the column names, making the projection complexity linear
9
/// instead of quadratic.
10
///
11
/// It is important that this optimization is ran after projection pushdown.
12
///
13
/// The schema reported after this optimization is also
14
pub(super) struct SimpleProjectionAndCollapse {
15
/// Keep track of nodes that are already processed when they
16
/// can be expensive. Schema materialization can be for instance.
17
processed: BTreeSet<Node>,
18
eager: bool,
19
}
20
21
impl SimpleProjectionAndCollapse {
22
pub(super) fn new(eager: bool) -> Self {
23
Self {
24
processed: Default::default(),
25
eager,
26
}
27
}
28
}
29
30
impl OptimizationRule for SimpleProjectionAndCollapse {
31
fn optimize_plan(
32
&mut self,
33
lp_arena: &mut Arena<IR>,
34
expr_arena: &mut Arena<AExpr>,
35
node: Node,
36
) -> PolarsResult<Option<IR>> {
37
use IR::*;
38
let lp = lp_arena.get(node);
39
40
match lp {
41
Select { input, expr, .. } => {
42
if !matches!(lp_arena.get(*input), ExtContext { .. })
43
&& !self.processed.contains(&node)
44
{
45
// First check if we can apply the optimization before we allocate.
46
if !expr.iter().all(|e| {
47
matches!(expr_arena.get(e.node()), AExpr::Column(name) if e.output_name() == name)
48
}) {
49
self.processed.insert(node);
50
return Ok(None);
51
}
52
53
let exprs = expr
54
.iter()
55
.map(|e| e.output_name().clone())
56
.collect::<Vec<_>>();
57
let Some(alp) = IRBuilder::new(*input, expr_arena, lp_arena)
58
.project_simple(exprs.iter().cloned())
59
.ok()
60
else {
61
return Ok(None);
62
};
63
let alp = alp.build();
64
65
Ok(Some(alp))
66
} else {
67
self.processed.insert(node);
68
Ok(None)
69
}
70
},
71
SimpleProjection { columns, input } if !self.eager => {
72
match lp_arena.get(*input) {
73
// If there are 2 subsequent fast projections, flatten them and only take the last
74
SimpleProjection {
75
input: prev_input, ..
76
} => Ok(Some(SimpleProjection {
77
input: *prev_input,
78
columns: columns.clone(),
79
})),
80
// Cleanup projections set in projection pushdown just above caches
81
// they are not needed.
82
cache_lp @ Cache { .. } if self.processed.contains(&node) => {
83
let cache_schema = cache_lp.schema(lp_arena);
84
if cache_schema.len() == columns.len()
85
&& cache_schema.iter_names().zip(columns.iter_names()).all(
86
|(left_name, right_name)| left_name.as_str() == right_name.as_str(),
87
)
88
{
89
Ok(Some(cache_lp.clone()))
90
} else {
91
Ok(None)
92
}
93
},
94
// If a projection does nothing, remove it.
95
other => {
96
let input_schema = other.schema(lp_arena);
97
// This will fail fast if lengths are not equal
98
if *input_schema.as_ref() == *columns {
99
Ok(Some(other.clone()))
100
} else {
101
self.processed.insert(node);
102
Ok(None)
103
}
104
},
105
}
106
},
107
// if there are 2 subsequent caches, flatten them and only take the inner
108
Cache { input, .. } if !self.eager => {
109
if let Cache {
110
input: prev_input,
111
id,
112
} = lp_arena.get(*input)
113
{
114
Ok(Some(Cache {
115
input: *prev_input,
116
id: *id,
117
}))
118
} else {
119
Ok(None)
120
}
121
},
122
_ => Ok(None),
123
}
124
}
125
}
126
127