Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/utils.rs
7889 views
1
use polars_core::prelude::*;
2
use polars_utils::idx_vec::UnitVec;
3
use polars_utils::unitvec;
4
5
use super::keys::*;
6
use crate::plans::visitor::{
7
AExprArena, AexprNode, RewriteRecursion, RewritingVisitor, TreeWalker,
8
};
9
use crate::prelude::*;
10
fn combine_by_and(left: Node, right: Node, arena: &mut Arena<AExpr>) -> Node {
11
arena.add(AExpr::BinaryExpr {
12
left,
13
op: Operator::And,
14
right,
15
})
16
}
17
18
/// Inserts a predicate into the map, with some basic de-duplication.
19
///
20
/// The map is keyed in a way that may cause some predicates to fall into the same bucket. In that
21
/// case the predicate is AND'ed with the existing node in that bucket.
22
pub(super) fn insert_predicate_dedup(
23
acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,
24
predicate: &ExprIR,
25
expr_arena: &mut Arena<AExpr>,
26
) {
27
let name = predicate_to_key(predicate.node(), expr_arena);
28
29
let mut new_min_terms = unitvec![];
30
31
acc_predicates
32
.entry(name)
33
.and_modify(|existing_predicate| {
34
let mut out_node = existing_predicate.node();
35
36
new_min_terms.clear();
37
new_min_terms.extend(MintermIter::new(predicate.node(), expr_arena));
38
39
// Limit the number of existing min-terms that we check against so that we have linear-time performance.
40
// Without this limit the loop below will be quadratic. The side effect is that we may not perfectly
41
// identify duplicates when there are large amounts of filter expressions.
42
const CHECK_LIMIT: usize = 32;
43
44
'next_new_min_term: for new_predicate in new_min_terms {
45
let new_min_term_eq_wrap = AExprArena::new(new_predicate, expr_arena);
46
47
if MintermIter::new(existing_predicate.node(), expr_arena)
48
.take(CHECK_LIMIT)
49
.any(|existing_min_term| {
50
new_min_term_eq_wrap == AExprArena::new(existing_min_term, expr_arena)
51
})
52
{
53
continue 'next_new_min_term;
54
}
55
56
out_node = combine_by_and(new_predicate, out_node, expr_arena);
57
}
58
59
existing_predicate.set_node(out_node)
60
})
61
.or_insert_with(|| predicate.clone());
62
}
63
64
pub(super) fn temporary_unique_key(acc_predicates: &PlHashMap<PlSmallStr, ExprIR>) -> PlSmallStr {
65
// TODO: Don't heap allocate during construction.
66
let mut out_key = '\u{1D17A}'.to_string();
67
let mut existing_keys = acc_predicates.keys();
68
69
while acc_predicates.contains_key(&*out_key) {
70
out_key.push_str(existing_keys.next().unwrap());
71
}
72
73
PlSmallStr::from_string(out_key)
74
}
75
76
pub(super) fn combine_predicates<I>(iter: I, arena: &mut Arena<AExpr>) -> ExprIR
77
where
78
I: Iterator<Item = ExprIR>,
79
{
80
let mut single_pred = None;
81
for e in iter {
82
single_pred = match single_pred {
83
None => Some(e.node()),
84
Some(left) => Some(arena.add(AExpr::BinaryExpr {
85
left,
86
op: Operator::And,
87
right: e.node(),
88
})),
89
};
90
}
91
single_pred
92
.map(|node| ExprIR::from_node(node, arena))
93
.expect("an empty iterator was passed")
94
}
95
96
pub(super) fn predicate_at_scan(
97
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
98
predicate: Option<ExprIR>,
99
expr_arena: &mut Arena<AExpr>,
100
) -> Option<ExprIR> {
101
if !acc_predicates.is_empty() {
102
let mut new_predicate = combine_predicates(acc_predicates.into_values(), expr_arena);
103
if let Some(pred) = predicate {
104
new_predicate.set_node(combine_by_and(
105
new_predicate.node(),
106
pred.node(),
107
expr_arena,
108
));
109
}
110
Some(new_predicate)
111
} else {
112
None
113
}
114
}
115
116
/// Evaluates a condition on the column name inputs of every predicate, where if
117
/// the condition evaluates to true on any column name the predicate is
118
/// transferred to local.
119
pub(super) fn transfer_to_local_by_expr_ir<F>(
120
expr_arena: &Arena<AExpr>,
121
acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,
122
mut condition: F,
123
) -> Vec<ExprIR>
124
where
125
F: FnMut(&ExprIR) -> bool,
126
{
127
let mut remove_keys = Vec::with_capacity(acc_predicates.len());
128
129
for predicate in acc_predicates.values() {
130
if condition(predicate) {
131
if let Some(name) = aexpr_to_leaf_names_iter(predicate.node(), expr_arena).next() {
132
remove_keys.push(name);
133
}
134
}
135
}
136
let mut local_predicates = Vec::with_capacity(remove_keys.len());
137
for key in remove_keys {
138
if let Some(pred) = acc_predicates.remove(key) {
139
local_predicates.push(pred)
140
}
141
}
142
local_predicates
143
}
144
145
/// Evaluates a condition on the column name inputs of every predicate, where if
146
/// the condition evaluates to true on any column name the predicate is
147
/// transferred to local.
148
pub(super) fn transfer_to_local_by_name<F>(
149
expr_arena: &Arena<AExpr>,
150
acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,
151
mut condition: F,
152
) -> Vec<ExprIR>
153
where
154
F: FnMut(&PlSmallStr) -> bool,
155
{
156
let mut remove_keys = Vec::with_capacity(acc_predicates.len());
157
158
for (key, predicate) in &*acc_predicates {
159
let root_names = aexpr_to_leaf_names_iter(predicate.node(), expr_arena);
160
for name in root_names {
161
if condition(name) {
162
remove_keys.push(key.clone());
163
break;
164
}
165
}
166
}
167
let mut local_predicates = Vec::with_capacity(remove_keys.len());
168
for key in remove_keys {
169
if let Some(pred) = acc_predicates.remove(&*key) {
170
local_predicates.push(pred)
171
}
172
}
173
local_predicates
174
}
175
176
/// * `col(A).alias(B).alias(C) => (C, A)`
177
/// * `col(A) => (A, A)`
178
/// * `col(A).sum().alias(B) => None`
179
fn get_maybe_aliased_projection_to_input_name_map(
180
e: &ExprIR,
181
expr_arena: &Arena<AExpr>,
182
) -> Option<(PlSmallStr, PlSmallStr)> {
183
let ae = expr_arena.get(e.node());
184
match e.get_alias() {
185
Some(alias) => match ae {
186
AExpr::Column(c_name) => Some((alias.clone(), c_name.clone())),
187
_ => None,
188
},
189
_ => match ae {
190
AExpr::Column(c_name) => Some((c_name.clone(), c_name.clone())),
191
_ => None,
192
},
193
}
194
}
195
196
#[derive(Debug)]
197
pub enum PushdownEligibility {
198
Full,
199
// Partial can happen when there are window exprs.
200
Partial { to_local: Vec<PlSmallStr> },
201
NoPushdown,
202
}
203
204
#[allow(clippy::type_complexity)]
205
pub fn pushdown_eligibility(
206
projection_nodes: &[ExprIR],
207
// Predicates that need to be checked (key, expr_ir)
208
new_predicates: &[(&PlSmallStr, ExprIR)],
209
// Note: These predicates have already passed checks.
210
acc_predicates: &PlHashMap<PlSmallStr, ExprIR>,
211
expr_arena: &mut Arena<AExpr>,
212
scratch: &mut UnitVec<Node>,
213
maintain_errors: bool,
214
input_ir: &IR,
215
) -> PolarsResult<(PushdownEligibility, PlHashMap<PlSmallStr, PlSmallStr>)> {
216
scratch.clear();
217
let ae_nodes_stack = scratch;
218
219
let mut alias_to_col_map =
220
optimizer::init_hashmap::<PlSmallStr, PlSmallStr>(Some(projection_nodes.len()));
221
let mut col_to_alias_map = alias_to_col_map.clone();
222
223
let mut modified_projection_columns =
224
PlHashSet::<PlSmallStr>::with_capacity(projection_nodes.len());
225
let mut has_window = false;
226
let mut common_window_inputs = PlHashSet::<PlSmallStr>::new();
227
228
// Important: Names inserted into any data structure by this function are
229
// all non-aliased.
230
// This function returns false if pushdown cannot be performed.
231
let process_projection_or_predicate = |ae_nodes_stack: &mut UnitVec<Node>,
232
has_window: &mut bool,
233
common_window_inputs: &mut PlHashSet<PlSmallStr>|
234
-> ExprPushdownGroup {
235
debug_assert_eq!(ae_nodes_stack.len(), 1);
236
237
let mut partition_by_names = PlHashSet::<PlSmallStr>::new();
238
let mut expr_pushdown_eligibility = ExprPushdownGroup::Pushable;
239
240
while let Some(node) = ae_nodes_stack.pop() {
241
let ae = expr_arena.get(node);
242
243
match ae {
244
#[cfg(feature = "dynamic_group_by")]
245
AExpr::Rolling { .. } => return ExprPushdownGroup::Barrier,
246
AExpr::Over {
247
function: _,
248
partition_by,
249
order_by: _,
250
mapping: _,
251
} => {
252
partition_by_names.clear();
253
partition_by_names.reserve(partition_by.len());
254
255
for node in partition_by.iter() {
256
// Only accept col()
257
if let AExpr::Column(name) = expr_arena.get(*node) {
258
partition_by_names.insert(name.clone());
259
} else {
260
// Nested windows can also qualify for push down.
261
// e.g.:
262
// * expr1 = min().over(A)
263
// * expr2 = sum().over(A, expr1)
264
// Both exprs window over A, so predicates referring
265
// to A can still be pushed.
266
ae_nodes_stack.push(*node);
267
}
268
}
269
270
if !*has_window {
271
for name in partition_by_names.drain() {
272
common_window_inputs.insert(name);
273
}
274
275
*has_window = true;
276
} else {
277
common_window_inputs.retain(|k| partition_by_names.contains(k))
278
}
279
280
// Cannot push into disjoint windows:
281
// e.g.:
282
// * sum().over(A)
283
// * sum().over(B)
284
if common_window_inputs.is_empty() {
285
return ExprPushdownGroup::Barrier;
286
}
287
},
288
_ => {
289
if let ExprPushdownGroup::Barrier =
290
expr_pushdown_eligibility.update_with_expr(ae_nodes_stack, ae, expr_arena)
291
{
292
return ExprPushdownGroup::Barrier;
293
}
294
},
295
}
296
}
297
298
expr_pushdown_eligibility
299
};
300
301
for e in projection_nodes.iter() {
302
if let Some((alias, column_name)) =
303
get_maybe_aliased_projection_to_input_name_map(e, expr_arena)
304
{
305
if alias != column_name {
306
alias_to_col_map.insert(alias.clone(), column_name.clone());
307
col_to_alias_map.insert(column_name, alias);
308
}
309
continue;
310
}
311
312
if !does_not_modify_rec(e.node(), expr_arena) {
313
modified_projection_columns.insert(e.output_name().clone());
314
}
315
316
debug_assert!(ae_nodes_stack.is_empty());
317
ae_nodes_stack.push(e.node());
318
319
if process_projection_or_predicate(
320
ae_nodes_stack,
321
&mut has_window,
322
&mut common_window_inputs,
323
)
324
.blocks_pushdown(maintain_errors)
325
{
326
return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));
327
}
328
}
329
330
if has_window && !col_to_alias_map.is_empty() {
331
// Rename to aliased names.
332
let mut new = PlHashSet::<PlSmallStr>::with_capacity(2 * common_window_inputs.len());
333
334
for key in common_window_inputs.into_iter() {
335
if let Some(aliased) = col_to_alias_map.get(&key) {
336
new.insert(aliased.clone());
337
}
338
// Ensure predicate does not refer to a different column that
339
// got aliased to the same name as the window column. E.g.:
340
// .with_columns(col(A).alias(C), sum=sum().over(C))
341
// .filter(col(C) == ..)
342
if !alias_to_col_map.contains_key(&key) {
343
new.insert(key);
344
}
345
}
346
347
if new.is_empty() {
348
return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));
349
}
350
351
common_window_inputs = new;
352
}
353
354
for (_, e) in new_predicates.iter() {
355
debug_assert!(ae_nodes_stack.is_empty());
356
ae_nodes_stack.push(e.node());
357
358
let pd_group = process_projection_or_predicate(
359
ae_nodes_stack,
360
&mut has_window,
361
&mut common_window_inputs,
362
);
363
364
if pd_group.blocks_pushdown(maintain_errors) {
365
return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));
366
}
367
}
368
369
// Should have returned early.
370
debug_assert!(!common_window_inputs.is_empty() || !has_window);
371
372
// Note: has_window is constant.
373
let can_use_column = |col: &str| {
374
if has_window {
375
common_window_inputs.contains(col)
376
} else {
377
!modified_projection_columns.contains(col)
378
}
379
};
380
381
// For an allocation-free dyn iterator
382
let mut check_predicates_all: Option<_> = None;
383
let mut check_predicates_only_new: Option<_> = None;
384
385
// We only need to check the new predicates if no columns were renamed and there are no window
386
// aggregations.
387
if !has_window
388
&& modified_projection_columns.is_empty()
389
&& !(
390
// If there is only a single predicate, it may be fallible
391
acc_predicates.len() == 1 && ir_removes_rows(input_ir)
392
)
393
{
394
check_predicates_only_new = Some(new_predicates.iter().map(|(key, expr)| (*key, expr)))
395
} else {
396
check_predicates_all = Some(acc_predicates.iter())
397
}
398
399
let to_check_iter: &mut dyn Iterator<Item = (&PlSmallStr, &ExprIR)> = check_predicates_all
400
.as_mut()
401
.map(|x| x as _)
402
.unwrap_or_else(|| check_predicates_only_new.as_mut().map(|x| x as _).unwrap());
403
404
let mut allow_single_fallible = !ir_removes_rows(input_ir);
405
ae_nodes_stack.clear();
406
407
let to_local = to_check_iter
408
.filter_map(|(key, e)| {
409
debug_assert!(ae_nodes_stack.is_empty());
410
411
ae_nodes_stack.push(e.node());
412
413
let mut uses_blocked_name = false;
414
let mut pd_group = ExprPushdownGroup::Pushable;
415
416
while let Some(node) = ae_nodes_stack.pop() {
417
let ae = expr_arena.get(node);
418
419
if let AExpr::Column(name) = ae {
420
uses_blocked_name |= !can_use_column(name);
421
} else {
422
pd_group.update_with_expr(ae_nodes_stack, ae, expr_arena);
423
};
424
425
if uses_blocked_name {
426
break;
427
};
428
}
429
430
ae_nodes_stack.clear();
431
432
if uses_blocked_name || matches!(pd_group, ExprPushdownGroup::Barrier) {
433
allow_single_fallible = false;
434
}
435
436
if uses_blocked_name
437
|| matches!(
438
// Note: We do not use `blocks_pushdown()`, this fallible indicates that the
439
// predicate we are checking to push is fallible.
440
pd_group,
441
ExprPushdownGroup::Fallible | ExprPushdownGroup::Barrier
442
)
443
{
444
Some(key.clone())
445
} else {
446
None
447
}
448
})
449
.collect::<Vec<_>>();
450
451
Ok(match to_local.len() {
452
0 => (PushdownEligibility::Full, alias_to_col_map),
453
len if len == acc_predicates.len() => {
454
if len == 1 && allow_single_fallible {
455
(PushdownEligibility::Full, alias_to_col_map)
456
} else {
457
(PushdownEligibility::NoPushdown, alias_to_col_map)
458
}
459
},
460
_ => (PushdownEligibility::Partial { to_local }, alias_to_col_map),
461
})
462
}
463
464
/// Note: This may give false positives as it is a conservative function.
465
pub(crate) fn ir_removes_rows(ir: &IR) -> bool {
466
use IR::*;
467
468
// NOTE
469
// At time of writing predicate pushdown runs before slice pushdown, so
470
// some of the below checks for slice may never be hit.
471
472
match ir {
473
DataFrameScan { .. }
474
| SimpleProjection { .. }
475
| Select { .. }
476
| Cache { .. }
477
| HStack { .. }
478
| HConcat { .. } => false,
479
480
GroupBy { options, .. } => options.slice.is_some(),
481
482
Sort { slice, .. } => slice.is_some(),
483
484
#[cfg(feature = "merge_sorted")]
485
MergeSorted { .. } => false,
486
487
#[cfg(feature = "python")]
488
PythonScan { options } => options.n_rows.is_some(),
489
490
// Scan currently may evaluate the predicate on the statistics of the
491
// entire files list.
492
Scan {
493
unified_scan_args, ..
494
} => unified_scan_args.pre_slice.is_some(),
495
496
_ => true,
497
}
498
}
499
500
/// Maps column references within an expression. Used to handle column renaming when pushing
501
/// predicates.
502
///
503
/// This will add a new expression tree in the arena (i.e. it won't mutate the existing node in-place).
504
pub(super) fn map_column_references(
505
expr: &mut ExprIR,
506
expr_arena: &mut Arena<AExpr>,
507
rename_map: &PlHashMap<PlSmallStr, PlSmallStr>,
508
) {
509
if rename_map.is_empty() {
510
return;
511
}
512
513
let node = AexprNode::new(expr.node())
514
.rewrite(
515
&mut MapColumnReferences {
516
rename_map,
517
column_nodes: PlHashMap::with_capacity(rename_map.len()),
518
},
519
expr_arena,
520
)
521
.unwrap()
522
.node();
523
524
*expr = ExprIR::from_node(node, expr_arena);
525
526
struct MapColumnReferences<'a> {
527
rename_map: &'a PlHashMap<PlSmallStr, PlSmallStr>,
528
column_nodes: PlHashMap<&'a str, Node>,
529
}
530
531
impl RewritingVisitor for MapColumnReferences<'_> {
532
type Node = AexprNode;
533
type Arena = Arena<AExpr>;
534
535
fn pre_visit(
536
&mut self,
537
node: &Self::Node,
538
arena: &mut Self::Arena,
539
) -> polars_core::prelude::PolarsResult<crate::prelude::visitor::RewriteRecursion> {
540
let AExpr::Column(colname) = arena.get(node.node()) else {
541
return Ok(RewriteRecursion::NoMutateAndContinue);
542
};
543
544
if !self.rename_map.contains_key(colname) {
545
return Ok(RewriteRecursion::NoMutateAndContinue);
546
}
547
548
Ok(RewriteRecursion::MutateAndContinue)
549
}
550
551
fn mutate(
552
&mut self,
553
node: Self::Node,
554
arena: &mut Self::Arena,
555
) -> polars_core::prelude::PolarsResult<Self::Node> {
556
let AExpr::Column(colname) = arena.get(node.node()) else {
557
unreachable!();
558
};
559
560
let new_colname = self.rename_map.get(colname).unwrap();
561
562
if !self.column_nodes.contains_key(new_colname.as_str()) {
563
self.column_nodes.insert(
564
new_colname.as_str(),
565
arena.add(AExpr::Column(new_colname.clone())),
566
);
567
}
568
569
// Safety: Checked in pre_visit()
570
Ok(AexprNode::new(
571
*self.column_nodes.get(new_colname.as_str()).unwrap(),
572
))
573
}
574
}
575
}
576
577