Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/mod.rs
6940 views
1
use polars_core::prelude::*;
2
3
use crate::prelude::*;
4
5
mod cache_states;
6
mod delay_rechunk;
7
8
mod cluster_with_columns;
9
mod collapse_and_project;
10
mod collapse_joins;
11
mod collect_members;
12
mod count_star;
13
#[cfg(feature = "cse")]
14
mod cse;
15
mod flatten_union;
16
#[cfg(feature = "fused")]
17
mod fused;
18
mod join_utils;
19
pub(crate) use join_utils::ExprOrigin;
20
mod expand_datasets;
21
#[cfg(feature = "python")]
22
pub use expand_datasets::ExpandedPythonScan;
23
mod predicate_pushdown;
24
mod projection_pushdown;
25
pub mod set_order;
26
mod simplify_expr;
27
mod slice_pushdown_expr;
28
mod slice_pushdown_lp;
29
mod stack_opt;
30
31
use collapse_and_project::SimpleProjectionAndCollapse;
32
#[cfg(feature = "cse")]
33
pub use cse::NaiveExprMerger;
34
use delay_rechunk::DelayRechunk;
35
pub use expand_datasets::ExpandedDataset;
36
use polars_core::config::verbose;
37
use polars_io::predicates::PhysicalIoExpr;
38
pub use predicate_pushdown::PredicatePushDown;
39
pub use projection_pushdown::ProjectionPushDown;
40
pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};
41
use slice_pushdown_lp::SlicePushDown;
42
pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};
43
44
use self::flatten_union::FlattenUnionRule;
45
pub use crate::frame::{AllowedOptimizations, OptFlags};
46
pub use crate::plans::conversion::type_coercion::TypeCoercionRule;
47
use crate::plans::optimizer::count_star::CountStar;
48
#[cfg(feature = "cse")]
49
use crate::plans::optimizer::cse::CommonSubExprOptimizer;
50
#[cfg(feature = "cse")]
51
use crate::plans::optimizer::cse::prune_unused_caches;
52
use crate::plans::optimizer::predicate_pushdown::ExprEval;
53
#[cfg(feature = "cse")]
54
use crate::plans::visitor::*;
55
use crate::prelude::optimizer::collect_members::MemberCollector;
56
57
pub trait Optimize {
58
fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;
59
}
60
61
// arbitrary constant to reduce reallocation.
62
const HASHMAP_SIZE: usize = 16;
63
64
pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {
65
PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))
66
}
67
68
/// Reports whether the `POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS` environment
/// variable is set to exactly `"1"`.
///
/// Any other value, an unset variable, or a value that is not valid Unicode
/// all yield `false`. The environment is read on every call.
pub(crate) fn pushdown_maintain_errors() -> bool {
    matches!(
        std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS").as_deref(),
        Ok("1")
    )
}
71
72
pub fn optimize(
73
logical_plan: DslPlan,
74
mut opt_flags: OptFlags,
75
lp_arena: &mut Arena<IR>,
76
expr_arena: &mut Arena<AExpr>,
77
scratch: &mut Vec<Node>,
78
expr_eval: ExprEval<'_>,
79
) -> PolarsResult<Node> {
80
#[allow(dead_code)]
81
let verbose = verbose();
82
83
// Gradually fill the rules passed to the optimizer
84
let opt = StackOptimizer {};
85
let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);
86
87
// Unset CSE
88
// This can be turned on again during ir-conversion.
89
#[allow(clippy::eq_op)]
90
#[cfg(feature = "cse")]
91
if opt_flags.contains(OptFlags::EAGER) {
92
opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);
93
}
94
let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_flags)?;
95
96
// Don't run optimizations that don't make sense on a single node.
97
// This keeps eager execution more snappy.
98
#[cfg(feature = "cse")]
99
let comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);
100
101
#[cfg(feature = "cse")]
102
let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);
103
#[cfg(not(feature = "cse"))]
104
let comm_subexpr_elim = false;
105
106
// Note: This can be in opt_flags in the future if needed.
107
let pushdown_maintain_errors = pushdown_maintain_errors();
108
109
// During debug we check if the optimizations have not modified the final schema.
110
#[cfg(debug_assertions)]
111
let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();
112
113
let mut _opt_members = &mut None;
114
115
macro_rules! get_or_init_members {
116
() => {
117
_get_or_init_members(_opt_members, lp_top, lp_arena, expr_arena)
118
};
119
}
120
121
macro_rules! get_members_opt {
122
() => {
123
_opt_members.as_mut()
124
};
125
}
126
127
// Run before slice pushdown
128
if opt_flags.simplify_expr() {
129
#[cfg(feature = "fused")]
130
rules.push(Box::new(fused::FusedArithmetic {}));
131
}
132
133
#[cfg(feature = "cse")]
134
let _cse_plan_changed = if comm_subplan_elim {
135
let members = get_or_init_members!();
136
if (members.has_sink_multiple || members.has_joins_or_unions)
137
&& members.has_duplicate_scans()
138
&& !members.has_cache
139
{
140
if verbose {
141
eprintln!("found multiple sources; run comm_subplan_elim")
142
}
143
144
let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
145
146
prune_unused_caches(lp_arena, cid2c);
147
148
lp_top = lp;
149
members.has_cache |= changed;
150
changed
151
} else {
152
false
153
}
154
} else {
155
false
156
};
157
#[cfg(not(feature = "cse"))]
158
let _cse_plan_changed = false;
159
160
// Should be run before predicate pushdown.
161
if opt_flags.projection_pushdown() {
162
let mut projection_pushdown_opt = ProjectionPushDown::new();
163
let alp = lp_arena.take(lp_top);
164
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
165
lp_arena.replace(lp_top, alp);
166
167
if projection_pushdown_opt.is_count_star {
168
let mut count_star_opt = CountStar::new();
169
count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;
170
}
171
}
172
173
if opt_flags.predicate_pushdown() {
174
let mut predicate_pushdown_opt = PredicatePushDown::new(
175
expr_eval,
176
pushdown_maintain_errors,
177
opt_flags.new_streaming(),
178
);
179
let alp = lp_arena.take(lp_top);
180
let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
181
lp_arena.replace(lp_top, alp);
182
}
183
184
// Make sure it is after predicate pushdown
185
if opt_flags.collapse_joins() && get_or_init_members!().has_filter_with_join_input {
186
collapse_joins::optimize(lp_top, lp_arena, expr_arena, opt_flags.new_streaming());
187
}
188
189
// Make sure its before slice pushdown.
190
if opt_flags.fast_projection() {
191
rules.push(Box::new(SimpleProjectionAndCollapse::new(
192
opt_flags.eager(),
193
)));
194
}
195
196
if !opt_flags.eager() {
197
rules.push(Box::new(DelayRechunk::new()));
198
}
199
200
if opt_flags.slice_pushdown() {
201
let mut slice_pushdown_opt = SlicePushDown::new(
202
// We don't maintain errors on slice as the behavior is much more predictable that way.
203
//
204
// Even if we enable maintain_errors (thereby preventing the slice from being pushed),
205
// the new-streaming engine still may not error due to early-stopping.
206
false, // maintain_errors
207
opt_flags.new_streaming(),
208
);
209
let alp = lp_arena.take(lp_top);
210
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
211
212
lp_arena.replace(lp_top, alp);
213
214
// Expressions use the stack optimizer.
215
rules.push(Box::new(slice_pushdown_opt));
216
}
217
218
// This optimization removes branches, so we must do it when type coercion
219
// is completed.
220
if opt_flags.simplify_expr() {
221
rules.push(Box::new(SimplifyBooleanRule {}));
222
}
223
224
if !opt_flags.eager() {
225
rules.push(Box::new(FlattenUnionRule {}));
226
}
227
228
// Note: ExpandDatasets must run after slice and predicate pushdown.
229
rules.push(Box::new(expand_datasets::ExpandDatasets {}) as Box<dyn OptimizationRule>);
230
231
lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;
232
233
if opt_flags.cluster_with_columns() {
234
cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)
235
}
236
237
if _cse_plan_changed
238
&& get_members_opt!().is_some_and(|members| {
239
(members.has_joins_or_unions | members.has_sink_multiple) && members.has_cache
240
})
241
{
242
// We only want to run this on cse inserted caches
243
cache_states::set_cache_states(
244
lp_top,
245
lp_arena,
246
expr_arena,
247
scratch,
248
expr_eval,
249
verbose,
250
pushdown_maintain_errors,
251
opt_flags.new_streaming(),
252
)?;
253
}
254
255
// This one should run (nearly) last as this modifies the projections
256
#[cfg(feature = "cse")]
257
if comm_subexpr_elim && !get_or_init_members!().has_ext_context {
258
let mut optimizer = CommonSubExprOptimizer::new();
259
let alp_node = IRNode::new_mutate(lp_top);
260
261
lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {
262
let rewritten = alp_node.rewrite(&mut optimizer, arena)?;
263
Ok(rewritten.node())
264
})?;
265
}
266
267
if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {
268
let members = get_or_init_members!();
269
if members.has_group_by
270
| members.has_sort
271
| members.has_distinct
272
| members.has_joins_or_unions
273
{
274
match lp_arena.get(lp_top) {
275
IR::SinkMultiple { inputs } => {
276
let mut roots = inputs.clone();
277
for root in &mut roots {
278
if !matches!(lp_arena.get(*root), IR::Sink { .. }) {
279
*root = lp_arena.add(IR::Sink {
280
input: *root,
281
payload: SinkTypeIR::Memory,
282
});
283
}
284
}
285
set_order::simplify_and_fetch_orderings(&roots, lp_arena, expr_arena);
286
},
287
ir => {
288
let mut tmp_top = lp_top;
289
if !matches!(ir, IR::Sink { .. }) {
290
tmp_top = lp_arena.add(IR::Sink {
291
input: lp_top,
292
payload: SinkTypeIR::Memory,
293
});
294
}
295
_ = set_order::simplify_and_fetch_orderings(&[tmp_top], lp_arena, expr_arena)
296
},
297
}
298
}
299
}
300
301
// During debug we check if the optimizations have not modified the final schema.
302
#[cfg(debug_assertions)]
303
{
304
// only check by names because we may supercast types.
305
assert_eq!(
306
prev_schema.iter_names().collect::<Vec<_>>(),
307
lp_arena
308
.get(lp_top)
309
.schema(lp_arena)
310
.iter_names()
311
.collect::<Vec<_>>()
312
);
313
};
314
315
Ok(lp_top)
316
}
317
318
fn _get_or_init_members<'a>(
319
opt_members: &'a mut Option<MemberCollector>,
320
lp_top: Node,
321
lp_arena: &mut Arena<IR>,
322
expr_arena: &mut Arena<AExpr>,
323
) -> &'a mut MemberCollector {
324
opt_members.get_or_insert_with(|| {
325
let mut members = MemberCollector::new();
326
members.collect(lp_top, lp_arena, expr_arena);
327
328
members
329
})
330
}
331
332