Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/delay_rechunk.rs
6940 views
1
use std::collections::BTreeSet;
2
3
use super::*;
4
5
#[derive(Default)]
6
pub(super) struct DelayRechunk {
7
processed: BTreeSet<usize>,
8
}
9
10
impl DelayRechunk {
11
pub(super) fn new() -> Self {
12
Default::default()
13
}
14
}
15
16
impl OptimizationRule for DelayRechunk {
17
fn optimize_plan(
18
&mut self,
19
lp_arena: &mut Arena<IR>,
20
_expr_arena: &mut Arena<AExpr>,
21
node: Node,
22
) -> PolarsResult<Option<IR>> {
23
match lp_arena.get(node) {
24
// An aggregation can be partitioned, its wasteful to rechunk before that partition.
25
#[allow(unused_mut)]
26
IR::GroupBy { input, keys, .. } => {
27
// Multiple keys on multiple chunks is much slower, so rechunk.
28
if !self.processed.insert(node.0) || keys.len() > 1 {
29
return Ok(None);
30
};
31
32
use IR::*;
33
let mut input_node = None;
34
for (node, lp) in lp_arena.iter(*input) {
35
match lp {
36
Scan { .. } => {
37
input_node = Some(node);
38
break;
39
},
40
Union { .. } => {
41
input_node = Some(node);
42
break;
43
},
44
// don't delay rechunk if there is a join first
45
Join { .. } => break,
46
_ => {},
47
}
48
}
49
50
if let Some(node) = input_node {
51
match lp_arena.get_mut(node) {
52
Scan {
53
unified_scan_args, ..
54
} => {
55
unified_scan_args.rechunk = false;
56
},
57
Union { options, .. } => {
58
options.rechunk = false;
59
},
60
_ => unreachable!(),
61
}
62
};
63
64
Ok(None)
65
},
66
_ => Ok(None),
67
}
68
}
69
}
70
71