Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
8430 views
1
mod group_by;
2
mod join;
3
mod keys;
4
mod utils;
5
6
use polars_core::datatypes::PlHashMap;
7
use polars_core::prelude::*;
8
use polars_utils::idx_vec::UnitVec;
9
use recursive::recursive;
10
use utils::*;
11
12
use super::*;
13
use crate::prelude::optimizer::predicate_pushdown::group_by::process_group_by;
14
use crate::prelude::optimizer::predicate_pushdown::join::process_join;
15
use crate::utils::{check_input_node, has_aexpr};
16
17
/// The struct is wrapped in a mod to prevent direct member access of `nodes_scratch`
mod inner {
    use polars_utils::arena::Node;
    use polars_utils::idx_vec::UnitVec;
    use polars_utils::unitvec;

    /// State of the predicate-pushdown optimizer pass.
    pub struct PredicatePushDown {
        // How many cache nodes a predicate may be pushed down to.
        // Normally this is 0. Only needed for CSPE.
        pub(super) caches_pass_allowance: u32,
        // Shared scratch space. Private on purpose: it is only reachable via
        // `empty_nodes_scratch_mut`, so callers always receive it cleared.
        nodes_scratch: UnitVec<Node>,
        // Forwarded to join pushdown (`process_join`).
        pub(super) new_streaming: bool,
        // Controls pushing filters past fallible projections
        pub(super) maintain_errors: bool,
    }

    impl PredicatePushDown {
        /// Creates a pass that by default does not push predicates past caches.
        pub fn new(maintain_errors: bool, new_streaming: bool) -> Self {
            Self {
                caches_pass_allowance: 0,
                nodes_scratch: unitvec![],
                new_streaming,
                maintain_errors,
            }
        }

        /// Returns shared scratch space after clearing.
        pub(super) fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {
            self.nodes_scratch.clear();
            &mut self.nodes_scratch
        }
    }
}
50
51
// Re-export so the rest of the optimizer can use `PredicatePushDown` while
// the `nodes_scratch` field stays private to `inner`.
pub use inner::PredicatePushDown;
53
impl PredicatePushDown {
54
    /// Sets how many `Cache` nodes predicates may still be pushed past.
    /// Builder-style: consumes and returns `self`.
    pub(crate) fn block_at_cache(mut self, count: u32) -> Self {
        self.caches_pass_allowance = count;
        self
    }
58
59
fn optional_apply_predicate(
60
&mut self,
61
lp: IR,
62
local_predicates: Vec<ExprIR>,
63
lp_arena: &mut Arena<IR>,
64
expr_arena: &mut Arena<AExpr>,
65
) -> IR {
66
if !local_predicates.is_empty() {
67
let predicate = combine_predicates(local_predicates.into_iter(), expr_arena);
68
let input = lp_arena.add(lp);
69
70
IR::Filter { input, predicate }
71
} else {
72
lp
73
}
74
}
75
76
fn pushdown_and_assign(
77
&mut self,
78
input: Node,
79
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
80
lp_arena: &mut Arena<IR>,
81
expr_arena: &mut Arena<AExpr>,
82
) -> PolarsResult<()> {
83
let alp = lp_arena.take(input);
84
let lp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
85
lp_arena.replace(input, lp);
86
Ok(())
87
}
88
89
    /// Filter will be pushed down.
    ///
    /// Pushes `acc_predicates` past this node into its input(s). Predicates
    /// deemed non-pushable by `pushdown_eligibility` are kept as a local
    /// filter placed on top of this node.
    fn pushdown_and_continue(
        &mut self,
        lp: IR,
        mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        has_projections: bool,
    ) -> PolarsResult<IR> {
        if has_projections {
            // Resolve the single input of this projection-like node.
            let input = {
                let mut inputs = lp.inputs();
                let input = inputs.next().unwrap();
                // projections should only have a single input.
                if inputs.next().is_some() {
                    // except for ExtContext
                    assert!(matches!(lp, IR::ExtContext { .. }));
                }
                input
            };

            let maintain_errors = self.maintain_errors;
            // Determine which predicates may pass this node's expressions and
            // whether aliases rename columns on the way down.
            let (eligibility, alias_rename_map) = pushdown_eligibility(
                &lp.exprs().cloned().collect::<Vec<_>>(),
                &[],
                &acc_predicates,
                expr_arena,
                self.empty_nodes_scratch_mut(),
                maintain_errors,
                lp_arena.get(input),
            )?;

            let local_predicates = match eligibility {
                PushdownEligibility::Full => vec![],
                PushdownEligibility::Partial { to_local } => {
                    // Move the non-pushable predicates out of the accumulator;
                    // they will be applied locally on top of this node.
                    let mut out = Vec::with_capacity(to_local.len());
                    for key in to_local {
                        out.push(acc_predicates.remove(&key).unwrap());
                    }
                    out
                },
                PushdownEligibility::NoPushdown => {
                    return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
                },
            };

            // Rewrite column references in the remaining predicates so they
            // refer to the pre-alias names used below this node.
            if !alias_rename_map.is_empty() {
                for (_, expr_ir) in acc_predicates.iter_mut() {
                    map_column_references(expr_ir, expr_arena, &alias_rename_map);
                }
            }

            // Recurse into the input and splice the optimized subplan back.
            let alp = lp_arena.take(input);
            let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
            lp_arena.replace(input, alp);

            Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
        } else {
            let mut local_predicates = Vec::with_capacity(acc_predicates.len());

            let inputs = lp.get_inputs();

            // determine new inputs by pushing down predicates
            let new_inputs = inputs
                .into_iter()
                .map(|node| {
                    // first we check if we are able to push down the predicate passed this node
                    // it could be that this node just added the column where we base the predicate on
                    let input_schema = lp_arena.get(node).schema(lp_arena);
                    let mut pushdown_predicates =
                        optimizer::init_hashmap(Some(acc_predicates.len()));
                    for (_, predicate) in acc_predicates.iter() {
                        // we can pushdown the predicate
                        if check_input_node(predicate.node(), &input_schema, expr_arena) {
                            insert_predicate_dedup(&mut pushdown_predicates, predicate, expr_arena)
                        }
                        // we cannot pushdown the predicate we do it here
                        else {
                            local_predicates.push(predicate.clone());
                        }
                    }

                    let alp = lp_arena.take(node);
                    let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;
                    lp_arena.replace(node, alp);
                    Ok(node)
                })
                .collect::<PolarsResult<UnitVec<_>>>()?;

            let lp = lp.with_inputs(new_inputs);
            Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
        }
    }
182
183
/// Filter will be done at this node, but we continue optimization
184
fn no_pushdown_restart_opt(
185
&mut self,
186
lp: IR,
187
acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
188
lp_arena: &mut Arena<IR>,
189
expr_arena: &mut Arena<AExpr>,
190
) -> PolarsResult<IR> {
191
let inputs = lp.inputs();
192
193
let new_inputs = inputs
194
.map(|node| {
195
let alp = lp_arena.take(node);
196
let alp = self.push_down(
197
alp,
198
init_hashmap(Some(acc_predicates.len())),
199
lp_arena,
200
expr_arena,
201
)?;
202
lp_arena.replace(node, alp);
203
Ok(node)
204
})
205
.collect::<PolarsResult<Vec<_>>>()?;
206
let lp = lp.with_inputs(new_inputs);
207
208
// all predicates are done locally
209
let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();
210
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
211
}
212
213
    /// Applies all accumulated predicates at this node; nothing is pushed
    /// further down and the inputs are left untouched.
    fn no_pushdown(
        &mut self,
        lp: IR,
        acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        // all predicates are done locally
        let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();
        Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
    }
224
225
    /// Predicate pushdown optimizer
    ///
    /// # Arguments
    ///
    /// * `IR` - Arena based logical plan tree representing the query.
    /// * `acc_predicates` - The predicates we accumulate during tree traversal.
    ///   The hashmap maps from leaf-column name to predicates on that column.
    ///   If the key is already taken we combine the predicate with a bitand operation.
    ///   The `Node`s are indexes in the `expr_arena`
    /// * `lp_arena` - The local memory arena for the logical plan.
    /// * `expr_arena` - The local memory arena for the expressions.
    #[recursive]
    fn push_down(
        &mut self,
        lp: IR,
        mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        use IR::*;

        // Note: The logic within the match block should ensure `acc_predicates` is left in a state
        // where it contains only pushable exprs after it is done (although in some cases it may
        // contain a single fallible expression).

        match lp {
            Filter {
                // Note: We assume AND'ed predicates have already been split to separate IR filter
                // nodes during DSL conversion so we don't do that here.
                ref predicate,
                input,
            } => {
                // Use a tmp_key to avoid inadvertently combining predicates that otherwise would have
                // been partially pushed:
                //
                // (1) .filter(pl.count().over("key") == 1)
                // (2) .filter(pl.col("key") == 1)
                //
                // (2) can be pushed past (1) but they both have the same predicate
                // key name in the hashtable.
                let tmp_key = temporary_unique_key(&acc_predicates);
                acc_predicates.insert(tmp_key.clone(), predicate.clone());

                let maintain_errors = self.maintain_errors;

                // Decide which of the accumulated predicates (including this
                // filter's own predicate, stored under `tmp_key`) must be
                // applied locally rather than pushed past `input`.
                let local_predicates = match pushdown_eligibility(
                    &[],
                    &[(&tmp_key, predicate.clone())],
                    &acc_predicates,
                    expr_arena,
                    self.empty_nodes_scratch_mut(),
                    maintain_errors,
                    lp_arena.get(input),
                )?
                .0
                {
                    PushdownEligibility::Full => vec![],
                    PushdownEligibility::Partial { to_local } => {
                        let mut out = Vec::with_capacity(to_local.len());
                        for key in to_local {
                            out.push(acc_predicates.remove(&key).unwrap());
                        }
                        out
                    },
                    PushdownEligibility::NoPushdown => {
                        let out = acc_predicates.drain().map(|t| t.1).collect();
                        // NOTE(review): `drain()` already leaves the map empty,
                        // so this `clear()` is redundant and could be removed.
                        acc_predicates.clear();
                        out
                    },
                };

                // If this filter's predicate may be pushed, merge it back into
                // the accumulator under its real key (deduplicated).
                if let Some(predicate) = acc_predicates.remove(&tmp_key) {
                    insert_predicate_dedup(&mut acc_predicates, &predicate, expr_arena);
                }

                let alp = lp_arena.take(input);
                let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;

                // TODO!
                // If a predicates result would be influenced by earlier applied
                // predicates, we simply don't pushdown this one passed this node
                // However, we can do better and let it pass but store the order of the predicates
                // so that we can apply them in correct order at the deepest level
                Ok(self.optional_apply_predicate(new_input, local_predicates, lp_arena, expr_arena))
            },
            DataFrameScan {
                df,
                schema,
                output_schema,
            } => {
                // In-memory leaf: combine everything that reached this scan
                // into one predicate and apply it as a filter on top.
                let selection = predicate_at_scan(acc_predicates, None, expr_arena);
                let mut lp = DataFrameScan {
                    df,
                    schema,
                    output_schema,
                };

                if let Some(predicate) = selection {
                    let input = lp_arena.add(lp);

                    lp = IR::Filter { input, predicate }
                }

                Ok(lp)
            },
            Scan {
                sources,
                file_info,
                hive_parts: scan_hive_parts,
                ref predicate,
                predicate_file_skip_applied,
                scan_type,
                unified_scan_args,
                output_schema,
            } => {
                // Column names whose predicates must not reach the scan.
                let mut blocked_names = Vec::with_capacity(2);

                // TODO: Allow predicates on file names, this should be supported by new-streaming.
                if let Some(col) = unified_scan_args.include_file_paths.as_deref() {
                    blocked_names.push(col);
                }

                let local_predicates = if blocked_names.is_empty() {
                    vec![]
                } else {
                    transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {
                        blocked_names.contains(&name.as_ref())
                    })
                };
                // Merge the accumulated predicates with any predicate already
                // present on the scan node.
                let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena);

                // Whether the predicate may be stored on the scan node itself.
                let mut do_optimization = match &*scan_type {
                    #[cfg(feature = "csv")]
                    FileScanIR::Csv { .. } => unified_scan_args.pre_slice.is_none(),
                    FileScanIR::Anonymous { function, .. } => function.allows_predicate_pushdown(),
                    #[cfg(feature = "json")]
                    FileScanIR::NDJson { .. } => true,
                    #[allow(unreachable_patterns)]
                    _ => true,
                };
                do_optimization &= predicate.is_some();

                let hive_parts = scan_hive_parts;

                let lp = if do_optimization {
                    // Embed the predicate into the scan node.
                    Scan {
                        sources,
                        file_info,
                        hive_parts,
                        predicate,
                        predicate_file_skip_applied,
                        unified_scan_args,
                        output_schema,
                        scan_type,
                    }
                } else {
                    // Keep the scan predicate-free and apply the predicate as
                    // a separate filter node directly above it.
                    let lp = Scan {
                        sources,
                        file_info,
                        hive_parts,
                        predicate: None,
                        predicate_file_skip_applied,
                        unified_scan_args,
                        output_schema,
                        scan_type,
                    };
                    if let Some(predicate) = predicate {
                        let input = lp_arena.add(lp);
                        Filter { input, predicate }
                    } else {
                        lp
                    }
                };

                Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
            },
            Distinct { input, options } => {
                let subset = if let Some(ref subset) = options.subset {
                    subset.as_ref()
                } else {
                    &[]
                };
                let mut names_set = PlHashSet::<PlSmallStr>::with_capacity(subset.len());
                for name in subset.iter() {
                    names_set.insert(name.clone());
                }

                let local_predicates = match options.keep_strategy {
                    UniqueKeepStrategy::Any => {
                        let condition = |e: &ExprIR| {
                            // if not elementwise -> to local
                            !is_elementwise_rec(e.node(), expr_arena)
                        };
                        transfer_to_local_by_expr_ir(expr_arena, &mut acc_predicates, condition)
                    },
                    UniqueKeepStrategy::First
                    | UniqueKeepStrategy::Last
                    | UniqueKeepStrategy::None => {
                        // When a subset is given, only predicates on subset
                        // columns may pass; the rest stay local.
                        let condition = |name: &PlSmallStr| {
                            !subset.is_empty() && !names_set.contains(name.as_str())
                        };
                        transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition)
                    },
                };

                self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
                let lp = Distinct { input, options };
                Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
            },
            Join {
                input_left,
                input_right,
                left_on,
                right_on,
                schema,
                options,
            } => process_join(
                self,
                lp_arena,
                expr_arena,
                input_left,
                input_right,
                left_on,
                right_on,
                schema,
                options,
                acc_predicates,
                self.new_streaming,
            ),
            MapFunction { ref function, .. } => {
                if function.allow_predicate_pd() {
                    match function {
                        FunctionIR::Explode { columns, .. } => {
                            let condition = |name: &PlSmallStr| columns.iter().any(|s| s == name);

                            // first columns that refer to the exploded columns should be done here
                            let local_predicates = transfer_to_local_by_name(
                                expr_arena,
                                &mut acc_predicates,
                                condition,
                            );

                            let lp = self.pushdown_and_continue(
                                lp,
                                acc_predicates,
                                lp_arena,
                                expr_arena,
                                false,
                            )?;
                            Ok(self.optional_apply_predicate(
                                lp,
                                local_predicates,
                                lp_arena,
                                expr_arena,
                            ))
                        },
                        #[cfg(feature = "pivot")]
                        FunctionIR::Unpivot { args, .. } => {
                            // predicates that will be done at this level
                            let condition = |name: &PlSmallStr| {
                                name == &args.variable_name || name == &args.value_name
                            };
                            let local_predicates = transfer_to_local_by_name(
                                expr_arena,
                                &mut acc_predicates,
                                condition,
                            );

                            let lp = self.pushdown_and_continue(
                                lp,
                                acc_predicates,
                                lp_arena,
                                expr_arena,
                                false,
                            )?;
                            Ok(self.optional_apply_predicate(
                                lp,
                                local_predicates,
                                lp_arena,
                                expr_arena,
                            ))
                        },
                        FunctionIR::Unnest {
                            columns,
                            separator: _,
                        } => {
                            // Predicates on the unnested columns must run
                            // after the unnest, i.e. locally.
                            let exclude = columns.iter().cloned().collect::<PlHashSet<_>>();

                            let local_predicates =
                                transfer_to_local_by_name(expr_arena, &mut acc_predicates, |x| {
                                    exclude.contains(x)
                                });

                            let lp = self.pushdown_and_continue(
                                lp,
                                acc_predicates,
                                lp_arena,
                                expr_arena,
                                false,
                            )?;
                            Ok(self.optional_apply_predicate(
                                lp,
                                local_predicates,
                                lp_arena,
                                expr_arena,
                            ))
                        },
                        _ => self.pushdown_and_continue(
                            lp,
                            acc_predicates,
                            lp_arena,
                            expr_arena,
                            false,
                        ),
                    }
                } else {
                    // The function blocks predicate pushdown entirely.
                    self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
                }
            },
            GroupBy {
                input,
                keys,
                aggs,
                schema,
                apply,
                maintain_order,
                options,
            } => process_group_by(
                self,
                lp_arena,
                expr_arena,
                input,
                keys,
                aggs,
                schema,
                maintain_order,
                apply,
                options,
                acc_predicates,
            ),
            lp @ Union { .. } => {
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
            },
            lp @ Sort { .. } => {
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
            },
            lp @ Sink { .. } | lp @ SinkMultiple { .. } => {
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
            },
            // Pushed down passed these nodes
            lp @ HStack { .. }
            | lp @ Select { .. }
            | lp @ SimpleProjection { .. }
            | lp @ ExtContext { .. } => {
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
            },
            // NOT Pushed down passed these nodes
            // predicates influence slice sizes
            lp @ Slice { .. } => {
                self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
            },
            lp @ HConcat { .. } => {
                self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
            },
            // Caches will run predicate push-down in the `cache_states` run.
            Cache { .. } => {
                if self.caches_pass_allowance == 0 {
                    self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena)
                } else {
                    // CSPE mode: allowed to pass a limited number of caches.
                    self.caches_pass_allowance = self.caches_pass_allowance.saturating_sub(1);
                    self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
                }
            },
            #[cfg(feature = "python")]
            PythonScan { mut options } => {
                let predicate = predicate_at_scan(acc_predicates, None, expr_arena);
                if let Some(predicate) = predicate {
                    match ExprPushdownGroup::Pushable.update_with_expr_rec(
                        expr_arena.get(predicate.node()),
                        expr_arena,
                        None,
                    ) {
                        ExprPushdownGroup::Barrier => {
                            if cfg!(debug_assertions) {
                                // Expression should not be pushed here by the optimizer
                                panic!()
                            }

                            // Release builds: keep the barrier predicate as a
                            // local filter instead of panicking.
                            return Ok(self.optional_apply_predicate(
                                PythonScan { options },
                                vec![predicate],
                                lp_arena,
                                expr_arena,
                            ));
                        },

                        ExprPushdownGroup::Pushable | ExprPushdownGroup::Fallible => {
                            options.predicate = PythonPredicate::Polars(predicate);
                        },
                    }
                }

                Ok(PythonScan { options })
            },
            #[cfg(feature = "merge_sorted")]
            lp @ MergeSorted { .. } => {
                self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
            },
            Invalid => unreachable!(),
        }
    }
643
644
pub(crate) fn optimize(
645
&mut self,
646
logical_plan: IR,
647
lp_arena: &mut Arena<IR>,
648
expr_arena: &mut Arena<AExpr>,
649
) -> PolarsResult<IR> {
650
let acc_predicates = PlHashMap::new();
651
self.push_down(logical_plan, acc_predicates, lp_arena, expr_arena)
652
}
653
}
654
655