Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/plans/optimizer/set_order/mod.rs
7889 views
1
//! Pass to obtain and optimize using exhaustive row-order information.
2
//!
3
//! This pass attaches an ordering flag to all edges between IR nodes. When this flag is `true`,
4
//! this edge needs to be ordered.
5
//!
6
//! The pass performs two passes over the IR graph. First, it assigns and pushes ordering down from
7
//! the sinks to the leaves. Second, it pulls those orderings back up from the leaves to the sinks.
8
//! The two passes weaken order guarantees and simplify IR nodes where possible.
9
//!
10
//! When the two passes are done, we are left with a map from all nodes to the ordering status of
11
//! their inputs.
12
13
mod expr_pullup;
14
mod expr_pushdown;
15
mod ir_pullup;
16
mod ir_pushdown;
17
18
use polars_core::prelude::PlHashMap;
19
use polars_utils::arena::{Arena, Node};
20
use polars_utils::idx_vec::UnitVec;
21
use polars_utils::unique_id::UniqueId;
22
23
use super::IR;
24
use crate::plans::AExpr;
25
use crate::plans::ir::inputs::Inputs;
26
27
/// Optimize the orderings used in the IR plan and get the relative orderings of all edges.
28
///
29
/// All roots should be `Sink` nodes and no `SinkMultiple` or `Invalid` are allowed to be part of
30
/// the graph.
31
pub fn simplify_and_fetch_orderings(
32
roots: &[Node],
33
ir_arena: &mut Arena<IR>,
34
expr_arena: &mut Arena<AExpr>,
35
) -> PlHashMap<Node, UnitVec<bool>> {
36
let mut leaves = Vec::new();
37
let mut outputs = PlHashMap::default();
38
let mut cache_proxy = PlHashMap::<UniqueId, Vec<Node>>::default();
39
40
// Get the per-node outputs and leaves
41
{
42
let mut stack = Vec::new();
43
44
for root in roots {
45
assert!(matches!(ir_arena.get(*root), IR::Sink { .. }));
46
outputs.insert(*root, Vec::new());
47
stack.extend(
48
ir_arena
49
.get(*root)
50
.inputs()
51
.enumerate()
52
.map(|(root_input_idx, node)| ((*root, root_input_idx), node)),
53
);
54
}
55
56
while let Some(((parent, parent_input_idx), node)) = stack.pop() {
57
let ir = ir_arena.get(node);
58
let node = match ir {
59
IR::Cache { id, .. } => {
60
let nodes = cache_proxy.entry(*id).or_default();
61
nodes.push(node);
62
nodes[0]
63
},
64
_ => node,
65
};
66
67
let outputs = outputs.entry(node).or_default();
68
let has_been_visisited_before = !outputs.is_empty();
69
outputs.push((parent, parent_input_idx));
70
71
if has_been_visisited_before {
72
continue;
73
}
74
75
let inputs = ir.inputs();
76
if matches!(inputs, Inputs::Empty) {
77
leaves.push(node);
78
}
79
stack.extend(
80
inputs
81
.enumerate()
82
.map(|(node_input_idx, input)| ((node, node_input_idx), input)),
83
);
84
}
85
}
86
87
// Pushdown and optimize orders from the roots to the leaves.
88
let mut orders =
89
ir_pushdown::pushdown_orders(roots, ir_arena, expr_arena, &mut outputs, &cache_proxy);
90
// Pullup orders from the leaves to the roots.
91
ir_pullup::pullup_orders(
92
&leaves,
93
ir_arena,
94
expr_arena,
95
&mut outputs,
96
&mut orders,
97
&cache_proxy,
98
);
99
100
// @Hack. Since not all caches might share the same node and the input of caches might have
101
// been updated, we need to ensure that all caches again have the same input.
102
//
103
// This can be removed when all caches with the same id share the same IR node.
104
for nodes in cache_proxy.into_values() {
105
let updated_node = nodes[0];
106
let order = orders[&updated_node].clone();
107
let IR::Cache {
108
input: updated_input,
109
id: _,
110
} = ir_arena.get(updated_node)
111
else {
112
unreachable!();
113
};
114
let updated_input = *updated_input;
115
for n in &nodes[1..] {
116
let IR::Cache { input, id: _ } = ir_arena.get_mut(*n) else {
117
unreachable!();
118
};
119
120
orders.insert(*n, order.clone());
121
*input = updated_input;
122
}
123
}
124
125
orders
126
}
127
128