Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
7889 views
1
mod functions;
2
mod generic;
3
mod group_by;
4
mod hconcat;
5
mod hstack;
6
mod joins;
7
mod projection;
8
9
use polars_core::datatypes::PlHashSet;
10
use polars_core::prelude::*;
11
use polars_io::RowIndex;
12
use polars_utils::idx_vec::UnitVec;
13
use recursive::recursive;
14
15
use crate::prelude::optimizer::projection_pushdown::generic::process_generic;
16
use crate::prelude::optimizer::projection_pushdown::group_by::process_group_by;
17
use crate::prelude::optimizer::projection_pushdown::hconcat::process_hconcat;
18
use crate::prelude::optimizer::projection_pushdown::hstack::process_hstack;
19
use crate::prelude::optimizer::projection_pushdown::joins::process_join;
20
use crate::prelude::optimizer::projection_pushdown::projection::process_projection;
21
use crate::prelude::*;
22
use crate::utils::aexpr_to_leaf_names;
23
24
/// Cheap-to-copy state that is propagated unchanged into the fresh (empty)
/// contexts created for sub-plans (see `ProjectionContext::new` call sites).
#[derive(Default, Copy, Clone)]
struct ProjectionCopyState {
    // Number of projection nodes seen while traversing down the plan.
    projections_seen: usize,
    // Whether this is a `count(*)`-style query; scans may then project a
    // single cheap column (or none) instead of all columns.
    is_count_star: bool,
}
29
30
/// Projection state accumulated while pushing projections down the plan tree.
#[derive(Clone, Default)]
struct ProjectionContext {
    // Column nodes accumulated so far that should be projected at/below this node.
    acc_projections: Vec<ColumnNode>,
    // Names of the columns in `acc_projections`; used to avoid duplicates.
    projected_names: PlHashSet<PlSmallStr>,
    // Copyable state carried over into child contexts.
    inner: ProjectionCopyState,
}
36
37
impl ProjectionContext {
38
fn new(
39
acc_projections: Vec<ColumnNode>,
40
projected_names: PlHashSet<PlSmallStr>,
41
inner: ProjectionCopyState,
42
) -> Self {
43
Self {
44
acc_projections,
45
projected_names,
46
inner,
47
}
48
}
49
50
/// If this is `true`, other nodes should add the columns
51
/// they need to the push down state
52
fn has_pushed_down(&self) -> bool {
53
// count star also acts like a pushdown as we will select a single column at the source
54
// when there were no other projections.
55
!self.acc_projections.is_empty() || self.inner.is_count_star
56
}
57
58
fn process_count_star_at_scan(&mut self, schema: &Schema, expr_arena: &mut Arena<AExpr>) {
59
if self.acc_projections.is_empty() {
60
let (name, _dt) = match schema.len() {
61
0 => return,
62
1 => schema.get_at_index(0).unwrap(),
63
_ => {
64
// skip first as that can be the row index.
65
// We look for a relative cheap type, such as a numeric or bool
66
schema
67
.iter()
68
.skip(1)
69
.find(|(_name, dt)| {
70
let phys = dt;
71
phys.is_null()
72
|| phys.is_primitive_numeric()
73
|| phys.is_bool()
74
|| phys.is_temporal()
75
})
76
.unwrap_or_else(|| schema.get_at_index(schema.len() - 1).unwrap())
77
},
78
};
79
80
let node = expr_arena.add(AExpr::Column(name.clone()));
81
self.acc_projections.push(ColumnNode(node));
82
self.projected_names.insert(name.clone());
83
}
84
}
85
}
86
87
/// utility function to get names of the columns needed in projection at scan level
88
fn get_scan_columns(
89
acc_projections: &[ColumnNode],
90
expr_arena: &Arena<AExpr>,
91
row_index: Option<&RowIndex>,
92
file_path_col: Option<&str>,
93
// When set, the column order will match the order from the provided schema
94
normalize_order_schema: Option<&Schema>,
95
) -> Option<Arc<[PlSmallStr]>> {
96
if acc_projections.is_empty() {
97
return None;
98
}
99
100
let mut column_names = acc_projections
101
.iter()
102
.filter_map(|node| {
103
let name = column_node_to_name(*node, expr_arena);
104
105
if let Some(ri) = row_index {
106
if ri.name == name {
107
return None;
108
}
109
}
110
111
if let Some(file_path_col) = file_path_col {
112
if file_path_col == name.as_str() {
113
return None;
114
}
115
}
116
117
Some(name.clone())
118
})
119
.collect::<Vec<_>>();
120
121
if let Some(schema) = normalize_order_schema {
122
column_names.sort_unstable_by_key(|name| schema.try_get_full(name).unwrap().0);
123
}
124
125
Some(column_names.into_iter().collect::<Arc<[_]>>())
126
}
127
128
/// split in a projection vec that can be pushed down and a projection vec that should be used
129
/// in this node
130
///
131
/// # Returns
132
/// accumulated_projections, local_projections, accumulated_names
133
///
134
/// - `expands_schema`. An unnest adds more columns to a schema, so we cannot use fast path
135
fn split_acc_projections(
136
acc_projections: Vec<ColumnNode>,
137
down_schema: &Schema,
138
expr_arena: &Arena<AExpr>,
139
expands_schema: bool,
140
) -> (Vec<ColumnNode>, Vec<ColumnNode>, PlHashSet<PlSmallStr>) {
141
// If node above has as many columns as the projection there is nothing to pushdown.
142
if !expands_schema && down_schema.len() == acc_projections.len() {
143
let local_projections = acc_projections;
144
(vec![], local_projections, PlHashSet::new())
145
} else {
146
let (acc_projections, local_projections): (Vec<_>, Vec<_>) = acc_projections
147
.into_iter()
148
.partition(|expr| check_input_column_node(*expr, down_schema, expr_arena));
149
let mut names = PlHashSet::default();
150
for proj in &acc_projections {
151
let name = column_node_to_name(*proj, expr_arena).clone();
152
names.insert(name);
153
}
154
(acc_projections, local_projections, names)
155
}
156
}
157
158
/// utility function such that we can recurse all binary expressions in the expression tree
159
fn add_expr_to_accumulated(
160
expr: Node,
161
acc_projections: &mut Vec<ColumnNode>,
162
projected_names: &mut PlHashSet<PlSmallStr>,
163
expr_arena: &Arena<AExpr>,
164
) {
165
for root_node in aexpr_to_column_nodes_iter(expr, expr_arena) {
166
let name = column_node_to_name(root_node, expr_arena).clone();
167
if projected_names.insert(name) {
168
acc_projections.push(root_node)
169
}
170
}
171
}
172
173
fn add_str_to_accumulated(
174
name: PlSmallStr,
175
ctx: &mut ProjectionContext,
176
expr_arena: &mut Arena<AExpr>,
177
) {
178
// if not pushed down: all columns are already projected.
179
if ctx.has_pushed_down() && !ctx.projected_names.contains(&name) {
180
let node = expr_arena.add(AExpr::Column(name));
181
add_expr_to_accumulated(
182
node,
183
&mut ctx.acc_projections,
184
&mut ctx.projected_names,
185
expr_arena,
186
);
187
}
188
}
189
190
fn update_scan_schema(
191
acc_projections: &[ColumnNode],
192
expr_arena: &Arena<AExpr>,
193
schema: &Schema,
194
sort_projections: bool,
195
) -> PolarsResult<Schema> {
196
let mut new_schema = Schema::with_capacity(acc_projections.len());
197
let mut new_cols = Vec::with_capacity(acc_projections.len());
198
for node in acc_projections.iter() {
199
let name = column_node_to_name(*node, expr_arena);
200
let item = schema.try_get_full(name)?;
201
new_cols.push(item);
202
}
203
// make sure that the projections are sorted by the schema.
204
if sort_projections {
205
new_cols.sort_unstable_by_key(|item| item.0);
206
}
207
for item in new_cols {
208
new_schema.with_column(item.1.clone(), item.2.clone());
209
}
210
Ok(new_schema)
211
}
212
213
/// Optimizer that pushes column projections down to (or as close as possible
/// to) the scan level.
pub struct ProjectionPushDown {
    // Set when the query only counts rows; scans may then project a single
    // cheap column (or none) instead of materializing all columns.
    pub is_count_star: bool,
}
216
217
impl ProjectionPushDown {
    /// Create a new optimizer; `is_count_star` starts unset and is toggled by
    /// the caller for count-only queries.
    pub(super) fn new() -> Self {
        Self {
            is_count_star: false,
        }
    }

    /// Projection will be done at this node, but we continue optimization:
    /// every input is restarted with an empty pushdown state and the
    /// accumulated projections are applied locally on top of this node.
    fn no_pushdown_restart_opt(
        &mut self,
        lp: IR,
        ctx: ProjectionContext,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                // Take the child out of the arena, optimize it with a fresh
                // (empty) projection state, and put it back in place.
                let alp = lp_arena.take(node);
                let ctx = ProjectionContext::new(Default::default(), Default::default(), ctx.inner);
                let alp = self.push_down(alp, ctx, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        let lp = lp.with_inputs(new_inputs);

        let builder = IRBuilder::from_lp(lp, expr_arena, lp_arena);
        Ok(self.finish_node_simple_projection(&ctx.acc_projections, builder))
    }

    /// Apply `local_projections` as a simple (column-only) projection on top
    /// of `builder`; a no-op when there are no local projections.
    fn finish_node_simple_projection(
        &mut self,
        local_projections: &[ColumnNode],
        builder: IRBuilder,
    ) -> IR {
        if !local_projections.is_empty() {
            builder
                .project_simple_nodes(local_projections.iter().map(|node| node.0))
                .unwrap()
                .build()
        } else {
            builder.build()
        }
    }

    /// Apply `local_projections` as a full expression projection on top of
    /// `builder`; a no-op when there are no local projections.
    fn finish_node(&mut self, local_projections: Vec<ExprIR>, builder: IRBuilder) -> IR {
        if !local_projections.is_empty() {
            builder
                .project(local_projections, Default::default())
                .build()
        } else {
            builder.build()
        }
    }

    /// Route a single projected column to the left and/or right join input,
    /// depending on which input schema it resolves against.
    ///
    /// Returns `(pushed_at_least_one, already_projected)`.
    #[allow(clippy::too_many_arguments)]
    #[allow(unused)]
    fn join_push_down(
        &mut self,
        schema_left: &Schema,
        schema_right: &Schema,
        proj: ColumnNode,
        pushdown_left: &mut Vec<ColumnNode>,
        pushdown_right: &mut Vec<ColumnNode>,
        names_left: &mut PlHashSet<PlSmallStr>,
        names_right: &mut PlHashSet<PlSmallStr>,
        expr_arena: &Arena<AExpr>,
    ) -> (bool, bool) {
        let mut pushed_at_least_one = false;
        let mut already_projected = false;

        let name = column_node_to_name(proj, expr_arena);
        let is_in_left = names_left.contains(name);
        let is_in_right = names_right.contains(name);
        // A column counts as already projected when either side tracks it.
        already_projected |= is_in_left;
        already_projected |= is_in_right;

        if check_input_column_node(proj, schema_left, expr_arena) && !is_in_left {
            names_left.insert(name.clone());
            pushdown_left.push(proj);
            pushed_at_least_one = true;
        }
        if check_input_column_node(proj, schema_right, expr_arena) && !is_in_right {
            names_right.insert(name.clone());
            pushdown_right.push(proj);
            pushed_at_least_one = true;
        }

        (pushed_at_least_one, already_projected)
    }

    /// This pushes down the current node and assigns the result to this node.
    fn pushdown_and_assign(
        &mut self,
        input: Node,
        ctx: ProjectionContext,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<()> {
        let alp = lp_arena.take(input);
        let lp = self.push_down(alp, ctx, lp_arena, expr_arena)?;
        lp_arena.replace(input, lp);
        Ok(())
    }

    /// Push down only the projections that are valid for the input's schema;
    /// the result is assigned to this node.
    ///
    /// The local projections are returned and still have to be applied at
    /// this node.
    fn pushdown_and_assign_check_schema(
        &mut self,
        input: Node,
        mut ctx: ProjectionContext,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
        // an unnest changes/expands the schema
        expands_schema: bool,
    ) -> PolarsResult<Vec<ColumnNode>> {
        let alp = lp_arena.take(input);
        let down_schema = alp.schema(lp_arena);

        let (acc_projections, local_projections, names) = split_acc_projections(
            ctx.acc_projections,
            &down_schema,
            expr_arena,
            expands_schema,
        );

        ctx.acc_projections = acc_projections;
        ctx.projected_names = names;

        let lp = self.push_down(alp, ctx, lp_arena, expr_arena)?;
        lp_arena.replace(input, lp);
        Ok(local_projections)
    }

    /// Projection pushdown optimizer
    ///
    /// # Arguments
    ///
    /// * `IR` - Arena based logical plan tree representing the query.
    /// * `acc_projections` - The projections we accumulate during tree traversal.
    /// * `names` - We keep track of the names to ensure we don't do duplicate projections.
    /// * `projections_seen` - Count the number of projection operations during tree traversal.
    /// * `lp_arena` - The local memory arena for the logical plan.
    /// * `expr_arena` - The local memory arena for the expressions.
    #[recursive]
    fn push_down(
        &mut self,
        logical_plan: IR,
        mut ctx: ProjectionContext,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        use IR::*;

        match logical_plan {
            // Expression projections; the dedicated module decides what to
            // push further down vs. keep local.
            Select { expr, input, .. } => {
                process_projection(self, input, expr, ctx, lp_arena, expr_arena, false)
            },
            // A column-only projection is lowered to expressions and handled
            // by the same code path as `Select`.
            SimpleProjection { columns, input, .. } => {
                let exprs = names_to_expr_irs(columns.iter_names_cloned(), expr_arena);
                process_projection(self, input, exprs, ctx, lp_arena, expr_arena, true)
            },
            DataFrameScan {
                df,
                schema,
                mut output_schema,
                ..
            } => {
                // TODO: Just project 0-width morsels.
                if self.is_count_star {
                    ctx.process_count_star_at_scan(&schema, expr_arena);
                }
                if ctx.has_pushed_down() {
                    output_schema = Some(Arc::new(update_scan_schema(
                        &ctx.acc_projections,
                        expr_arena,
                        &schema,
                        false,
                    )?));
                }
                let lp = DataFrameScan {
                    df,
                    schema,
                    output_schema,
                };
                Ok(lp)
            },
            #[cfg(feature = "python")]
            PythonScan { mut options } => {
                if self.is_count_star {
                    ctx.process_count_star_at_scan(&options.schema, expr_arena);
                }

                let normalize_order_schema = Some(&*options.schema);

                options.with_columns = get_scan_columns(
                    &ctx.acc_projections,
                    expr_arena,
                    None,
                    None,
                    normalize_order_schema,
                );

                options.output_schema = if options.with_columns.is_none() {
                    None
                } else {
                    Some(Arc::new(update_scan_schema(
                        &ctx.acc_projections,
                        expr_arena,
                        &options.schema,
                        true,
                    )?))
                };
                Ok(PythonScan { options })
            },
            Scan {
                sources,
                mut file_info,
                hive_parts,
                scan_type,
                predicate,
                predicate_file_skip_applied,
                mut unified_scan_args,
                mut output_schema,
            } => {
                let do_optimization = match &*scan_type {
                    FileScanIR::Anonymous { function, .. } => function.allows_projection_pushdown(),
                    #[cfg(feature = "json")]
                    FileScanIR::NDJson { .. } => true,
                    #[cfg(feature = "ipc")]
                    FileScanIR::Ipc { .. } => true,
                    #[cfg(feature = "csv")]
                    FileScanIR::Csv { .. } => true,
                    #[cfg(feature = "parquet")]
                    FileScanIR::Parquet { .. } => true,
                    #[cfg(feature = "scan_lines")]
                    FileScanIR::Lines { .. } => true,
                    // MultiScan will handle it if the PythonDataset cannot do projections.
                    #[cfg(feature = "python")]
                    FileScanIR::PythonDataset { .. } => true,
                };

                // The loop never iterates; it only exists so `break` can be
                // used as a structured early-exit from this arm.
                #[expect(clippy::never_loop)]
                loop {
                    if !do_optimization {
                        break;
                    }

                    if self.is_count_star {
                        if let FileScanIR::Anonymous { .. } = &*scan_type {
                            // Anonymous scan is not controlled by us, we don't know if it can support
                            // 0-column projections, so we always project one.
                            use either::Either;

                            let projection: Arc<[PlSmallStr]> = match &file_info.reader_schema {
                                Some(Either::Left(s)) => s.iter_names().next(),
                                Some(Either::Right(s)) => s.iter_names().next(),
                                None => None,
                            }
                            .into_iter()
                            .cloned()
                            .collect();

                            unified_scan_args.projection = Some(projection.clone());

                            if projection.is_empty() {
                                output_schema = Some(Default::default());
                                break;
                            }

                            ctx.acc_projections.push(ColumnNode(
                                expr_arena.add(AExpr::Column(projection[0].clone())),
                            ));

                            // NOTE(review): this assignment (and the cloned one
                            // above) is overwritten by the `get_scan_columns`
                            // call below; the column pushed into
                            // `ctx.acc_projections` makes that call return it
                            // again, so only the empty-projection `break` path
                            // seems to rely on the assignments — confirm.
                            unified_scan_args.projection = Some(projection)
                        } else {
                            // All nodes in new-streaming support projecting empty morsels with the correct height
                            // from the file.
                            unified_scan_args.projection = Some(Arc::from([]));
                            output_schema = Some(Default::default());
                            break;
                        };
                    }

                    unified_scan_args.projection = get_scan_columns(
                        &ctx.acc_projections,
                        expr_arena,
                        unified_scan_args.row_index.as_ref(),
                        unified_scan_args.include_file_paths.as_deref(),
                        None,
                    );

                    output_schema = if unified_scan_args.projection.is_some() {
                        let mut schema = update_scan_schema(
                            &ctx.acc_projections,
                            expr_arena,
                            &file_info.schema,
                            scan_type.sort_projection(unified_scan_args.row_index.is_some()),
                        )?;

                        // Keep the file-path column at the end of the schema.
                        if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
                            if let Some(i) = schema.index_of(file_path_col) {
                                let (name, dtype) = schema.shift_remove_index(i).unwrap();
                                schema.insert_at_index(schema.len(), name, dtype)?;
                            }
                        }

                        Some(Arc::new(schema))
                    } else {
                        None
                    };

                    break;
                }

                // File builder has a row index, but projected columns
                // do not include it, so cull.
                if let Some(RowIndex { ref name, .. }) = unified_scan_args.row_index {
                    if output_schema
                        .as_ref()
                        .is_some_and(|schema| !schema.contains(name))
                    {
                        // Need to remove it from the input schema so
                        // that projection indices are correct.
                        let mut file_schema = Arc::unwrap_or_clone(file_info.schema);
                        file_schema.shift_remove(name);
                        file_info.schema = Arc::new(file_schema);
                        unified_scan_args.row_index = None;
                    }
                };

                // Same culling for the file-path column.
                if let Some(col_name) = &unified_scan_args.include_file_paths {
                    if output_schema
                        .as_ref()
                        .is_some_and(|schema| !schema.contains(col_name))
                    {
                        // Need to remove it from the input schema so
                        // that projection indices are correct.
                        let mut file_schema = Arc::unwrap_or_clone(file_info.schema);
                        file_schema.shift_remove(col_name);
                        file_info.schema = Arc::new(file_schema);
                        unified_scan_args.include_file_paths = None;
                    }
                };

                let lp = Scan {
                    sources,
                    file_info,
                    hive_parts,
                    output_schema,
                    scan_type,
                    predicate,
                    predicate_file_skip_applied,
                    unified_scan_args,
                };

                Ok(lp)
            },
            Sort {
                input,
                by_column,
                slice,
                sort_options,
            } => {
                if ctx.has_pushed_down() {
                    // Make sure that the column(s) used for the sort is projected
                    by_column.iter().for_each(|node| {
                        add_expr_to_accumulated(
                            node.node(),
                            &mut ctx.acc_projections,
                            &mut ctx.projected_names,
                            expr_arena,
                        );
                    });
                }

                self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
                Ok(Sort {
                    input,
                    by_column,
                    slice,
                    sort_options,
                })
            },
            Distinct { input, options } => {
                // make sure that the set of unique columns is projected
                if ctx.has_pushed_down() {
                    if let Some(subset) = options.subset.as_ref() {
                        subset.iter().for_each(|name| {
                            add_str_to_accumulated(name.clone(), &mut ctx, expr_arena)
                        })
                    } else {
                        // distinct needs all columns
                        let input_schema = lp_arena.get(input).schema(lp_arena);
                        for name in input_schema.iter_names() {
                            add_str_to_accumulated(name.clone(), &mut ctx, expr_arena)
                        }
                    }
                }

                self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
                Ok(Distinct { input, options })
            },
            Filter { predicate, input } => {
                if ctx.has_pushed_down() {
                    // make sure that the filter column is projected
                    add_expr_to_accumulated(
                        predicate.node(),
                        &mut ctx.acc_projections,
                        &mut ctx.projected_names,
                        expr_arena,
                    );
                };
                self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;
                Ok(Filter { predicate, input })
            },
            GroupBy {
                input,
                keys,
                aggs,
                apply,
                schema,
                maintain_order,
                options,
            } => process_group_by(
                self,
                input,
                keys,
                aggs,
                apply,
                schema,
                maintain_order,
                options,
                ctx,
                lp_arena,
                expr_arena,
            ),
            join_ir @ Join { .. } => process_join(join_ir, ctx, self, lp_arena, expr_arena),
            HStack {
                input,
                exprs,
                options,
                ..
            } => process_hstack(self, input, exprs, options, ctx, lp_arena, expr_arena),
            ExtContext {
                input, contexts, ..
            } => {
                // local projections are ignored. These are just root nodes
                // complex expression will still be done later
                let _local_projections =
                    self.pushdown_and_assign_check_schema(input, ctx, lp_arena, expr_arena, false)?;

                // Rebuild the schema: the (possibly pruned) input schema
                // extended with any context-only columns.
                let mut new_schema = lp_arena
                    .get(input)
                    .schema(lp_arena)
                    .as_ref()
                    .as_ref()
                    .clone();

                for node in &contexts {
                    let other_schema = lp_arena.get(*node).schema(lp_arena);
                    for fld in other_schema.iter_fields() {
                        if new_schema.get(fld.name()).is_none() {
                            new_schema.with_column(fld.name, fld.dtype);
                        }
                    }
                }

                Ok(ExtContext {
                    input,
                    contexts,
                    schema: Arc::new(new_schema),
                })
            },
            MapFunction { input, function } => {
                functions::process_functions(self, input, function, ctx, lp_arena, expr_arena)
            },
            HConcat {
                inputs,
                schema,
                options,
            } => process_hconcat(self, inputs, schema, options, ctx, lp_arena, expr_arena),
            lp @ Union { .. } => process_generic(self, lp, ctx, lp_arena, expr_arena),
            // These nodes only have inputs and exprs, so we can use same logic.
            lp @ Slice { .. } | lp @ Sink { .. } | lp @ SinkMultiple { .. } => {
                process_generic(self, lp, ctx, lp_arena, expr_arena)
            },
            Cache { .. } => {
                // projections above this cache will be accumulated and pushed down
                // later
                // the redundant projection will be cleaned in the fast projection optimization
                // phase.
                if ctx.acc_projections.is_empty() {
                    Ok(logical_plan)
                } else {
                    Ok(IRBuilder::from_lp(logical_plan, expr_arena, lp_arena)
                        .project_simple_nodes(ctx.acc_projections)
                        .unwrap()
                        .build())
                }
            },
            #[cfg(feature = "merge_sorted")]
            MergeSorted {
                input_left,
                input_right,
                key,
            } => {
                if ctx.has_pushed_down() {
                    // make sure that the merge key column is projected
                    add_str_to_accumulated(key.clone(), &mut ctx, expr_arena);
                };

                // Both inputs receive the same accumulated projections.
                self.pushdown_and_assign(input_left, ctx.clone(), lp_arena, expr_arena)?;
                self.pushdown_and_assign(input_right, ctx, lp_arena, expr_arena)?;

                Ok(MergeSorted {
                    input_left,
                    input_right,
                    key,
                })
            },
            Invalid => unreachable!(),
        }
    }

    /// Entry point: run projection pushdown over the whole plan, starting
    /// with an empty projection state.
    pub fn optimize(
        &mut self,
        logical_plan: IR,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let ctx = ProjectionContext::default();
        self.push_down(logical_plan, ctx, lp_arena, expr_arena)
    }
}
758
759