GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/mod.rs
use polars_core::prelude::*;
use polars_error::feature_gated;

use crate::prelude::*;

mod delay_rechunk;

mod cluster_with_columns;
mod collapse_and_project;
mod collect_members;
mod count_star;
#[cfg(feature = "cse")]
mod cse;
mod flatten_union;
#[cfg(feature = "fused")]
mod fused;
mod join_utils;
pub(crate) use join_utils::ExprOrigin;
mod expand_datasets;
#[cfg(feature = "python")]
pub use expand_datasets::ExpandedPythonScan;
mod predicate_pushdown;
mod projection_pushdown;
pub mod set_order;
mod simplify_expr;
mod slice_pushdown_expr;
mod slice_pushdown_lp;
mod sortedness;
mod stack_opt;

use collapse_and_project::SimpleProjectionAndCollapse;
#[cfg(feature = "cse")]
pub use cse::NaiveExprMerger;
use delay_rechunk::DelayRechunk;
pub use expand_datasets::ExpandedDataset;
use polars_core::config::verbose;
pub use predicate_pushdown::PredicatePushDown;
pub use projection_pushdown::ProjectionPushDown;
pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
use slice_pushdown_lp::SlicePushDown;
pub use sortedness::{AExprSorted, IRSorted, are_keys_sorted_any, is_sorted};
pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};

use self::flatten_union::FlattenUnionRule;
pub use crate::frame::{AllowedOptimizations, OptFlags};
pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
use crate::plans::optimizer::count_star::CountStar;
#[cfg(feature = "cse")]
use crate::plans::optimizer::cse::CommonSubExprOptimizer;
#[cfg(feature = "cse")]
use crate::plans::visitor::*;
use crate::prelude::optimizer::collect_members::MemberCollector;

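/// A plan-level optimization pass: implementors take a `DslPlan` and return a
/// rewritten `DslPlan`.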
pub trait Optimize {
    fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
}

// arbitrary constant to reduce reallocation.
const HASHMAP_SIZE: usize = 16;

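/// Create a `PlHashMap` whose initial capacity is bounded by `HASHMAP_SIZE`, so
/// a large `max_len` hint does not cause a big up-front allocation.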
pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
    PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))
}

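/// Read the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment variable; when it
/// is set to "1", the pushdown optimizers are asked to maintain error behavior.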
pub(crate) fn pushdown_maintain_errors() -> bool {
    std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS").as_deref() == Ok("1")
}

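/// Run projection pushdown followed by predicate pushdown on the IR rooted at
/// `root`, each gated by its flag in `opt_flags`. When the projection pass
/// detects a count-star plan, the `CountStar` rewrite is applied as well.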
pub(super) fn run_projection_predicate_pushdown(
    root: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    pushdown_maintain_errors: bool,
    opt_flags: &OptFlags,
) -> PolarsResult<()> {
    // Should be run before predicate pushdown.
    if opt_flags.projection_pushdown() {
        let mut projection_pushdown_opt = ProjectionPushDown::new();
        let ir = ir_arena.take(root);
        let ir = projection_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);

        if projection_pushdown_opt.is_count_star {
            let mut count_star_opt = CountStar::new();
            count_star_opt.optimize_plan(ir_arena, expr_arena, root)?;
        }
    }

    if opt_flags.predicate_pushdown() {
        let mut predicate_pushdown_opt =
            PredicatePushDown::new(pushdown_maintain_errors, opt_flags.new_streaming());
        let ir = ir_arena.take(root);
        let ir = predicate_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;
        ir_arena.replace(root, ir);
    }

    Ok(())
}

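/// Lower a `DslPlan` to IR and run the enabled optimization passes over it.
///
/// In order: common subplan elimination (when profitable), projection and
/// predicate pushdown, slice pushdown, the stack-optimizer rules (expression
/// simplification, fast projection collapse, delayed rechunk, union flattening),
/// `with_columns` clustering, common subexpression elimination, the
/// order-observability analysis, and dataset expansion. In debug builds the
/// output schema names are asserted to be unchanged. Returns the root node of
/// the optimized IR.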
pub fn optimize(
    logical_plan: DslPlan,
    mut opt_flags: OptFlags,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    scratch: &mut Vec<Node>,
    apply_scan_predicate_to_scan_ir: fn(
        Node,
        &mut Arena<IR>,
        &mut Arena<AExpr>,
    ) -> PolarsResult<()>,
) -> PolarsResult<Node> {
    #[allow(dead_code)]
    let verbose = verbose();

    // Gradually fill the rules passed to the optimizer
    let opt = StackOptimizer {};
    let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);

    // Unset CSE
    // This can be turned on again during ir-conversion.
    #[allow(clippy::eq_op)]
    #[cfg(feature = "cse")]
    if opt_flags.contains(OptFlags::EAGER) {
        opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
    }
    let mut root = to_alp(logical_plan, expr_arena, ir_arena, &mut opt_flags)?;

    #[allow(unused_assignments)]
    let mut comm_subplan_elim = false;
    // Don't run optimizations that don't make sense on a single node.
    // This keeps eager execution more snappy.
    #[cfg(feature = "cse")]
    {
        comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
    }

    #[cfg(feature = "cse")]
    let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
    #[cfg(not(feature = "cse"))]
    let comm_subexpr_elim = false;

    // Note: This can be in opt_flags in the future if needed.
    let pushdown_maintain_errors = pushdown_maintain_errors();

    // During debug we check if the optimizations have not modified the final schema.
    #[cfg(debug_assertions)]
    let prev_schema = ir_arena.get(root).schema(ir_arena).into_owned();

    let mut _opt_members: &mut Option<MemberCollector> = &mut None;

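    // Lazily collect plan members (joins/unions, caches, sinks, group-bys, ...)
    // the first time a pass asks for them, then reuse the collected result.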
    macro_rules! get_or_init_members {
        () => {
            _get_or_init_members(_opt_members, root, ir_arena, expr_arena)
        };
    }

    // Run before slice pushdown
    if opt_flags.simplify_expr() {
        #[cfg(feature = "fused")]
        rules.push(Box::new(fused::FusedArithmetic {}));
    }

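    // Decide whether the standalone pushdown pass still has to run. When common
    // subplan elimination fires (multiple sinks or joins/unions over duplicate
    // scans, and no cache yet), the subplan optimizer is handed the pushdown
    // settings and the separate pass below is skipped.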
    let run_pushdowns = if comm_subplan_elim {
        #[allow(unused_assignments)]
        let mut run_pd = true;

        feature_gated!("cse", {
            let members = get_or_init_members!();
            run_pd = if (members.has_sink_multiple || members.has_joins_or_unions)
                && members.has_duplicate_scans()
                && !members.has_cache
            {
                use self::cse::CommonSubPlanOptimizer;

                if verbose {
                    eprintln!("found multiple sources; run comm_subplan_elim")
                }

                root = CommonSubPlanOptimizer::new().optimize(
                    root,
                    ir_arena,
                    expr_arena,
                    pushdown_maintain_errors,
                    &opt_flags,
                    verbose,
                    scratch,
                )?;
                false
            } else {
                true
            }
        });

        run_pd
    } else {
        true
    };

    if run_pushdowns {
        run_projection_predicate_pushdown(
            root,
            ir_arena,
            expr_arena,
            pushdown_maintain_errors,
            &opt_flags,
        )?;
    }

    if opt_flags.slice_pushdown() {
        let mut slice_pushdown_opt = SlicePushDown::new(
            // We don't maintain errors on slice as the behavior is much more predictable that way.
            //
            // Even if we enable maintain_errors (thereby preventing the slice from being pushed),
            // the new-streaming engine still may not error due to early-stopping.
            false, // maintain_errors
        );
        let ir = ir_arena.take(root);
        let ir = slice_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;

        ir_arena.replace(root, ir);

        // Expressions use the stack optimizer.
        rules.push(Box::new(slice_pushdown_opt));
    }

    if opt_flags.fast_projection() {
        rules.push(Box::new(SimpleProjectionAndCollapse::new(
            opt_flags.eager(),
        )));
    }

    if !opt_flags.eager() {
        rules.push(Box::new(DelayRechunk::new()));
    }

    // This optimization removes branches, so we must do it when type coercion
    // is completed.
    if opt_flags.simplify_expr() {
        rules.push(Box::new(SimplifyBooleanRule {}));
    }

    if !opt_flags.eager() {
        rules.push(Box::new(FlattenUnionRule {}));
    }

    root = opt.optimize_loop(&mut rules, expr_arena, ir_arena, root)?;

    if opt_flags.cluster_with_columns() && get_or_init_members!().with_columns_count > 1 {
        cluster_with_columns::optimize(root, ir_arena, expr_arena)
    }

    // This one should run (nearly) last as this modifies the projections
    #[cfg(feature = "cse")]
    if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
        let mut optimizer =
            CommonSubExprOptimizer::new(opt_flags.contains(OptFlags::NEW_STREAMING));
        let ir_node = IRNode::new_mutate(root);

        root = try_with_ir_arena(ir_arena, expr_arena, |arena| {
            let rewritten = ir_node.rewrite(&mut optimizer, arena)?;
            Ok(rewritten.node())
        })?;
    }

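    // Order observability: only worth analyzing when the plan contains
    // order-sensitive operations (group-by, sort, distinct, joins/unions).
    // Every root is wrapped in a memory sink (if it is not a sink already)
    // before the orderings are simplified and fetched.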
    if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
        let members = get_or_init_members!();
        if members.has_group_by
            | members.has_sort
            | members.has_distinct
            | members.has_joins_or_unions
        {
            match ir_arena.get(root) {
                IR::SinkMultiple { inputs } => {
                    let mut roots = inputs.clone();
                    for root in &mut roots {
                        if !matches!(ir_arena.get(*root), IR::Sink { .. }) {
                            *root = ir_arena.add(IR::Sink {
                                input: *root,
                                payload: SinkTypeIR::Memory,
                            });
                        }
                    }
                    set_order::simplify_and_fetch_orderings(&roots, ir_arena, expr_arena);
                },
                ir => {
                    let mut tmp_top = root;
                    if !matches!(ir, IR::Sink { .. }) {
                        tmp_top = ir_arena.add(IR::Sink {
                            input: root,
                            payload: SinkTypeIR::Memory,
                        });
                    }
                    _ = set_order::simplify_and_fetch_orderings(&[tmp_top], ir_arena, expr_arena)
                },
            }
        }
    }

    expand_datasets::expand_datasets(root, ir_arena, expr_arena, apply_scan_predicate_to_scan_ir)?;

    // During debug we check if the optimizations have not modified the final schema.
    #[cfg(debug_assertions)]
    {
        // only check by names because we may supercast types.
        assert_eq!(
            prev_schema.iter_names().collect::<Vec<_>>(),
            ir_arena
                .get(root)
                .schema(ir_arena)
                .iter_names()
                .collect::<Vec<_>>()
        );
    };

    Ok(root)
}

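/// Build the `MemberCollector` for this plan on first use and cache it in
/// `opt_members`, so repeated `get_or_init_members!()` calls only traverse the
/// plan once.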
fn _get_or_init_members<'a>(
    opt_members: &'a mut Option<MemberCollector>,
    root: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> &'a mut MemberCollector {
    opt_members.get_or_insert_with(|| {
        let mut members = MemberCollector::new();
        members.collect(root, ir_arena, expr_arena);

        members
    })
}