Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
7889 views
1
mod group_by;
2
mod join;
3
mod keys;
4
mod utils;
5
6
use polars_core::datatypes::PlHashMap;
7
use polars_core::prelude::*;
8
use polars_utils::idx_vec::UnitVec;
9
use recursive::recursive;
10
use utils::*;
11
12
use super::*;
13
use crate::prelude::optimizer::predicate_pushdown::group_by::process_group_by;
14
use crate::prelude::optimizer::predicate_pushdown::join::process_join;
15
use crate::utils::{check_input_node, has_aexpr};
16
17
/// The struct is wrapped in a mod to prevent direct member access of `nodes_scratch`
18
mod inner {
19
use polars_core::config::verbose;
20
use polars_utils::arena::Node;
21
use polars_utils::idx_vec::UnitVec;
22
use polars_utils::unitvec;
23
24
pub struct PredicatePushDown {
25
// TODO: Remove unused
26
#[expect(unused)]
27
pub(super) verbose: bool,
28
// How many cache nodes a predicate may be pushed down to.
29
// Normally this is 0. Only needed for CSPE.
30
pub(super) caches_pass_allowance: u32,
31
nodes_scratch: UnitVec<Node>,
32
pub(super) new_streaming: bool,
33
// Controls pushing filters past fallible projections
34
pub(super) maintain_errors: bool,
35
}
36
37
impl PredicatePushDown {
38
pub fn new(maintain_errors: bool, new_streaming: bool) -> Self {
39
Self {
40
verbose: verbose(),
41
caches_pass_allowance: 0,
42
nodes_scratch: unitvec![],
43
new_streaming,
44
maintain_errors,
45
}
46
}
47
48
/// Returns shared scratch space after clearing.
49
pub(super) fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {
50
self.nodes_scratch.clear();
51
&mut self.nodes_scratch
52
}
53
}
54
}
55
56
pub use inner::PredicatePushDown;
57
58
impl PredicatePushDown {
59
/// Builder-style setter: allows predicates to be pushed through up to `count` cache nodes
/// before the `Cache` arm of `push_down` starts keeping them above the cache.
pub(crate) fn block_at_cache(self, count: u32) -> Self {
    let mut this = self;
    this.caches_pass_allowance = count;
    this
}
63
64
fn optional_apply_predicate(
65
&mut self,
66
lp: IR,
67
local_predicates: Vec<ExprIR>,
68
lp_arena: &mut Arena<IR>,
69
expr_arena: &mut Arena<AExpr>,
70
) -> IR {
71
if !local_predicates.is_empty() {
72
let predicate = combine_predicates(local_predicates.into_iter(), expr_arena);
73
let input = lp_arena.add(lp);
74
75
IR::Filter { input, predicate }
76
} else {
77
lp
78
}
79
}
80
81
fn pushdown_and_assign(
82
&mut self,
83
input: Node,
84
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
85
lp_arena: &mut Arena<IR>,
86
expr_arena: &mut Arena<AExpr>,
87
) -> PolarsResult<()> {
88
let alp = lp_arena.take(input);
89
let lp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
90
lp_arena.replace(input, lp);
91
Ok(())
92
}
93
94
/// Filter will be pushed down.
95
fn pushdown_and_continue(
96
&mut self,
97
lp: IR,
98
mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
99
lp_arena: &mut Arena<IR>,
100
expr_arena: &mut Arena<AExpr>,
101
has_projections: bool,
102
) -> PolarsResult<IR> {
103
if has_projections {
104
let input = {
105
let mut inputs = lp.inputs();
106
let input = inputs.next().unwrap();
107
// projections should only have a single input.
108
if inputs.next().is_some() {
109
// except for ExtContext
110
assert!(matches!(lp, IR::ExtContext { .. }));
111
}
112
input
113
};
114
115
let maintain_errors = self.maintain_errors;
116
let (eligibility, alias_rename_map) = pushdown_eligibility(
117
&lp.exprs().cloned().collect::<Vec<_>>(),
118
&[],
119
&acc_predicates,
120
expr_arena,
121
self.empty_nodes_scratch_mut(),
122
maintain_errors,
123
lp_arena.get(input),
124
)?;
125
126
let local_predicates = match eligibility {
127
PushdownEligibility::Full => vec![],
128
PushdownEligibility::Partial { to_local } => {
129
let mut out = Vec::with_capacity(to_local.len());
130
for key in to_local {
131
out.push(acc_predicates.remove(&key).unwrap());
132
}
133
out
134
},
135
PushdownEligibility::NoPushdown => {
136
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
137
},
138
};
139
140
if !alias_rename_map.is_empty() {
141
for (_, expr_ir) in acc_predicates.iter_mut() {
142
map_column_references(expr_ir, expr_arena, &alias_rename_map);
143
}
144
}
145
146
let alp = lp_arena.take(input);
147
let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
148
lp_arena.replace(input, alp);
149
150
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
151
} else {
152
let mut local_predicates = Vec::with_capacity(acc_predicates.len());
153
154
let inputs = lp.get_inputs();
155
156
// determine new inputs by pushing down predicates
157
let new_inputs = inputs
158
.into_iter()
159
.map(|node| {
160
// first we check if we are able to push down the predicate passed this node
161
// it could be that this node just added the column where we base the predicate on
162
let input_schema = lp_arena.get(node).schema(lp_arena);
163
let mut pushdown_predicates =
164
optimizer::init_hashmap(Some(acc_predicates.len()));
165
for (_, predicate) in acc_predicates.iter() {
166
// we can pushdown the predicate
167
if check_input_node(predicate.node(), &input_schema, expr_arena) {
168
insert_predicate_dedup(&mut pushdown_predicates, predicate, expr_arena)
169
}
170
// we cannot pushdown the predicate we do it here
171
else {
172
local_predicates.push(predicate.clone());
173
}
174
}
175
176
let alp = lp_arena.take(node);
177
let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;
178
lp_arena.replace(node, alp);
179
Ok(node)
180
})
181
.collect::<PolarsResult<UnitVec<_>>>()?;
182
183
let lp = lp.with_inputs(new_inputs);
184
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
185
}
186
}
187
188
/// Filter will be done at this node, but we continue optimization
189
fn no_pushdown_restart_opt(
190
&mut self,
191
lp: IR,
192
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
193
lp_arena: &mut Arena<IR>,
194
expr_arena: &mut Arena<AExpr>,
195
) -> PolarsResult<IR> {
196
let inputs = lp.inputs();
197
198
let new_inputs = inputs
199
.map(|node| {
200
let alp = lp_arena.take(node);
201
let alp = self.push_down(
202
alp,
203
init_hashmap(Some(acc_predicates.len())),
204
lp_arena,
205
expr_arena,
206
)?;
207
lp_arena.replace(node, alp);
208
Ok(node)
209
})
210
.collect::<PolarsResult<Vec<_>>>()?;
211
let lp = lp.with_inputs(new_inputs);
212
213
// all predicates are done locally
214
let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();
215
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
216
}
217
218
fn no_pushdown(
219
&mut self,
220
lp: IR,
221
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
222
lp_arena: &mut Arena<IR>,
223
expr_arena: &mut Arena<AExpr>,
224
) -> PolarsResult<IR> {
225
// all predicates are done locally
226
let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();
227
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
228
}
229
230
/// Predicate pushdown optimizer.
///
/// Recursively moves the accumulated predicates as far down the plan as each node allows.
/// Predicates that must stay at a node are applied there in a `Filter`. Returns the
/// rewritten plan for `lp`.
///
/// # Arguments
///
/// * `lp` - Arena based logical plan node (an `IR`) representing the query.
/// * `acc_predicates` - The predicates we accumulate during tree traversal.
///   The hashmap maps from leaf-column name to predicates on that column.
///   If the key is already taken we combine the predicate with a bitand operation.
///   The `Node`s are indexes in the `expr_arena`
/// * `lp_arena` - The local memory arena for the logical plan.
/// * `expr_arena` - The local memory arena for the expressions.
///
/// # Errors
///
/// Propagates errors from `pushdown_eligibility` and from the per-node helpers
/// (`process_join`, `process_group_by`, `pushdown_and_continue`, ...).
#[recursive]
fn push_down(
    &mut self,
    lp: IR,
    mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<IR> {
    use IR::*;

    // Note: The logic within the match block should ensure `acc_predicates` is left in a state
    // where it contains only pushable exprs after it is done (although in some cases it may
    // contain a single fallible expression).

    match lp {
        Filter {
            // Note: We assume AND'ed predicates have already been split to separate IR filter
            // nodes during DSL conversion so we don't do that here.
            ref predicate,
            input,
        } => {
            // Use a tmp_key to avoid inadvertently combining predicates that otherwise would have
            // been partially pushed:
            //
            // (1) .filter(pl.count().over("key") == 1)
            // (2) .filter(pl.col("key") == 1)
            //
            // (2) can be pushed past (1) but they both have the same predicate
            // key name in the hashtable.
            let tmp_key = temporary_unique_key(&acc_predicates);
            acc_predicates.insert(tmp_key.clone(), predicate.clone());

            // Copied out first because `empty_nodes_scratch_mut` borrows `self` mutably below.
            let maintain_errors = self.maintain_errors;

            // Decide which accumulated predicates (including this filter's own, stored under
            // `tmp_key`) may move below `input`; the others become local to this level.
            let local_predicates = match pushdown_eligibility(
                &[],
                &[(&tmp_key, predicate.clone())],
                &acc_predicates,
                expr_arena,
                self.empty_nodes_scratch_mut(),
                maintain_errors,
                lp_arena.get(input),
            )?
            .0
            {
                PushdownEligibility::Full => vec![],
                PushdownEligibility::Partial { to_local } => {
                    let mut out = Vec::with_capacity(to_local.len());
                    for key in to_local {
                        out.push(acc_predicates.remove(&key).unwrap());
                    }
                    out
                },
                PushdownEligibility::NoPushdown => {
                    let out = acc_predicates.drain().map(|t| t.1).collect();
                    // NOTE(review): `drain()` already leaves the map empty, so this `clear()`
                    // looks redundant.
                    acc_predicates.clear();
                    out
                },
            };

            // If this filter's own predicate is still pushable, move it from `tmp_key` back
            // into the map through `insert_predicate_dedup` (presumably under its regular
            // column-derived key).
            if let Some(predicate) = acc_predicates.remove(&tmp_key) {
                insert_predicate_dedup(&mut acc_predicates, &predicate, expr_arena);
            }

            // The `Filter` node itself is dropped: its optimized input is returned directly,
            // wrapped in a new `Filter` only when some predicates stayed local.
            let alp = lp_arena.take(input);
            let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;

            // TODO!
            // If a predicates result would be influenced by earlier applied
            // predicates, we simply don't pushdown this one past this node
            // However, we can do better and let it pass but store the order of the predicates
            // so that we can apply them in correct order at the deepest level
            Ok(
                self.optional_apply_predicate(
                    new_input,
                    local_predicates,
                    lp_arena,
                    expr_arena,
                ),
            )
        },
        DataFrameScan {
            df,
            schema,
            output_schema,
        } => {
            // In-memory scans take no predicate themselves: everything accumulated is
            // combined and applied in a `Filter` directly above the scan.
            let selection = predicate_at_scan(acc_predicates, None, expr_arena);
            let mut lp = DataFrameScan {
                df,
                schema,
                output_schema,
            };

            if let Some(predicate) = selection {
                let input = lp_arena.add(lp);

                lp = IR::Filter { input, predicate }
            }

            Ok(lp)
        },
        Scan {
            sources,
            file_info,
            hive_parts: scan_hive_parts,
            ref predicate,
            predicate_file_skip_applied,
            scan_type,
            unified_scan_args,
            output_schema,
        } => {
            let mut blocked_names = Vec::with_capacity(2);

            // TODO: Allow predicates on file names, this should be supported by new-streaming.
            if let Some(col) = unified_scan_args.include_file_paths.as_deref() {
                blocked_names.push(col);
            }

            // Predicates touching a blocked column (currently only the file-path column) are
            // kept above the scan.
            let local_predicates = if blocked_names.is_empty() {
                vec![]
            } else {
                transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {
                    blocked_names.contains(&name.as_ref())
                })
            };
            // Merge the remaining accumulated predicates with any predicate the scan already has.
            let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena);

            // Whether the scan itself may evaluate the predicate. A CSV scan with a pre-slice
            // does not take it (presumably because the slice must be applied before filtering).
            let mut do_optimization = match &*scan_type {
                #[cfg(feature = "csv")]
                FileScanIR::Csv { .. } => unified_scan_args.pre_slice.is_none(),
                FileScanIR::Anonymous { function, .. } => function.allows_predicate_pushdown(),
                #[cfg(feature = "json")]
                FileScanIR::NDJson { .. } => true,
                #[allow(unreachable_patterns)]
                _ => true,
            };
            do_optimization &= predicate.is_some();

            let hive_parts = scan_hive_parts;

            let lp = if do_optimization {
                Scan {
                    sources,
                    file_info,
                    hive_parts,
                    predicate,
                    predicate_file_skip_applied,
                    unified_scan_args,
                    output_schema,
                    scan_type,
                }
            } else {
                // The scan keeps no predicate; any combined predicate becomes a `Filter` on top.
                let lp = Scan {
                    sources,
                    file_info,
                    hive_parts,
                    predicate: None,
                    predicate_file_skip_applied,
                    unified_scan_args,
                    output_schema,
                    scan_type,
                };
                if let Some(predicate) = predicate {
                    let input = lp_arena.add(lp);
                    Filter { input, predicate }
                } else {
                    lp
                }
            };

            Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
        },
        Distinct { input, options } => {
            let subset = if let Some(ref subset) = options.subset {
                subset.as_ref()
            } else {
                &[]
            };
            let mut names_set = PlHashSet::<PlSmallStr>::with_capacity(subset.len());
            for name in subset.iter() {
                names_set.insert(name.clone());
            }

            let local_predicates = match options.keep_strategy {
                UniqueKeepStrategy::Any => {
                    let condition = |e: &ExprIR| {
                        // if not elementwise -> to local
                        !is_elementwise_rec(e.node(), expr_arena)
                    };
                    transfer_to_local_by_expr_ir(expr_arena, &mut acc_predicates, condition)
                },
                UniqueKeepStrategy::First
                | UniqueKeepStrategy::Last
                | UniqueKeepStrategy::None => {
                    // Keep predicates on columns outside a non-empty `subset` at this node
                    // (presumably because filtering on them first could change which duplicate
                    // row is kept). An empty subset lets every predicate through.
                    let condition = |name: &PlSmallStr| {
                        !subset.is_empty() && !names_set.contains(name.as_str())
                    };
                    transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition)
                },
            };

            self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
            let lp = Distinct { input, options };
            Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            schema,
            options,
        } => process_join(
            self,
            lp_arena,
            expr_arena,
            input_left,
            input_right,
            left_on,
            right_on,
            schema,
            options,
            acc_predicates,
            self.new_streaming,
        ),
        MapFunction { ref function, .. } => {
            if function.allow_predicate_pd() {
                match function {
                    FunctionIR::Explode { columns, .. } => {
                        let condition = |name: &PlSmallStr| columns.iter().any(|s| s == name);

                        // first columns that refer to the exploded columns should be done here
                        let local_predicates = transfer_to_local_by_name(
                            expr_arena,
                            &mut acc_predicates,
                            condition,
                        );

                        let lp = self.pushdown_and_continue(
                            lp,
                            acc_predicates,
                            lp_arena,
                            expr_arena,
                            false,
                        )?;
                        Ok(self.optional_apply_predicate(
                            lp,
                            local_predicates,
                            lp_arena,
                            expr_arena,
                        ))
                    },
                    #[cfg(feature = "pivot")]
                    FunctionIR::Unpivot { args, .. } => {
                        // predicates that will be done at this level
                        let condition = |name: &PlSmallStr| {
                            name == &args.variable_name || name == &args.value_name
                        };
                        let local_predicates = transfer_to_local_by_name(
                            expr_arena,
                            &mut acc_predicates,
                            condition,
                        );

                        let lp = self.pushdown_and_continue(
                            lp,
                            acc_predicates,
                            lp_arena,
                            expr_arena,
                            false,
                        )?;
                        Ok(self.optional_apply_predicate(
                            lp,
                            local_predicates,
                            lp_arena,
                            expr_arena,
                        ))
                    },
                    FunctionIR::Unnest {
                        columns,
                        separator: _,
                    } => {
                        // Predicates referring to the unnested column names stay at this level.
                        let exclude = columns.iter().cloned().collect::<PlHashSet<_>>();

                        let local_predicates =
                            transfer_to_local_by_name(expr_arena, &mut acc_predicates, |x| {
                                exclude.contains(x)
                            });

                        let lp = self.pushdown_and_continue(
                            lp,
                            acc_predicates,
                            lp_arena,
                            expr_arena,
                            false,
                        )?;
                        Ok(self.optional_apply_predicate(
                            lp,
                            local_predicates,
                            lp_arena,
                            expr_arena,
                        ))
                    },
                    _ => self.pushdown_and_continue(
                        lp,
                        acc_predicates,
                        lp_arena,
                        expr_arena,
                        false,
                    ),
                }
            } else {
                // The function blocks pushdown: filter here, then optimize the input afresh.
                self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
            }
        },
        GroupBy {
            input,
            keys,
            aggs,
            schema,
            apply,
            maintain_order,
            options,
        } => process_group_by(
            self,
            lp_arena,
            expr_arena,
            input,
            keys,
            aggs,
            schema,
            maintain_order,
            apply,
            options,
            acc_predicates,
        ),
        lp @ Union { .. } => {
            self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
        },
        lp @ Sort { .. } => {
            self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
        },
        lp @ Sink { .. } | lp @ SinkMultiple { .. } => {
            self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
        },
        // Pushed down past these nodes
        lp @ HStack { .. }
        | lp @ Select { .. }
        | lp @ SimpleProjection { .. }
        | lp @ ExtContext { .. } => {
            self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
        },
        // NOT pushed down past these nodes
        // predicates influence slice sizes
        lp @ Slice { .. } => {
            self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
        },
        lp @ HConcat { .. } => {
            self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
        },
        // Caches will run predicate push-down in the `cache_states` run.
        Cache { .. } => {
            // With no allowance left, predicates stay above the cache and its input is not
            // visited here; otherwise one unit of allowance is spent to pass this cache.
            if self.caches_pass_allowance == 0 {
                self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena)
            } else {
                self.caches_pass_allowance = self.caches_pass_allowance.saturating_sub(1);
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
            }
        },
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let predicate = predicate_at_scan(acc_predicates, None, expr_arena);
            if let Some(predicate) = predicate {
                // Classify the combined predicate to decide whether the Python scan may take it.
                match ExprPushdownGroup::Pushable.update_with_expr_rec(
                    expr_arena.get(predicate.node()),
                    expr_arena,
                    None,
                ) {
                    ExprPushdownGroup::Barrier => {
                        if cfg!(debug_assertions) {
                            // Expression should not be pushed here by the optimizer
                            panic!()
                        }

                        // Release builds fall back to a `Filter` above the scan.
                        return Ok(self.optional_apply_predicate(
                            PythonScan { options },
                            vec![predicate],
                            lp_arena,
                            expr_arena,
                        ));
                    },

                    ExprPushdownGroup::Pushable | ExprPushdownGroup::Fallible => {
                        options.predicate = PythonPredicate::Polars(predicate);
                    },
                }
            }

            Ok(PythonScan { options })
        },
        #[cfg(feature = "merge_sorted")]
        lp @ MergeSorted { .. } => {
            self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
        },
        Invalid => unreachable!(),
    }
}
648
649
pub(crate) fn optimize(
650
&mut self,
651
logical_plan: IR,
652
lp_arena: &mut Arena<IR>,
653
expr_arena: &mut Arena<AExpr>,
654
) -> PolarsResult<IR> {
655
let acc_predicates = PlHashMap::new();
656
self.push_down(logical_plan, acc_predicates, lp_arena, expr_arena)
657
}
658
}
659
660