// Path: blob/main/crates/polars-plan/src/plans/optimizer/set_order/mod.rs
// 7889 views
//! Pass to obtain and optimize using exhaustive row-order information.1//!2//! This pass attaches an ordering flag to all edges between IR nodes. When this flag is `true`,3//! this edge needs to be ordered.4//!5//! The pass performs two passes over the IR graph. First, it assigns and pushes ordering down from6//! the sinks to the leaves. Second, it pulls those orderings back up from the leaves to the sinks.7//! The two passes weaken order guarantees and simplify IR nodes where possible.8//!9//! When the two passes are done, we are left with a map from all nodes to the ordering status of10//! their inputs.1112mod expr_pullup;13mod expr_pushdown;14mod ir_pullup;15mod ir_pushdown;1617use polars_core::prelude::PlHashMap;18use polars_utils::arena::{Arena, Node};19use polars_utils::idx_vec::UnitVec;20use polars_utils::unique_id::UniqueId;2122use super::IR;23use crate::plans::AExpr;24use crate::plans::ir::inputs::Inputs;2526/// Optimize the orderings used in the IR plan and get the relative orderings of all edges.27///28/// All roots should be `Sink` nodes and no `SinkMultiple` or `Invalid` are allowed to be part of29/// the graph.30pub fn simplify_and_fetch_orderings(31roots: &[Node],32ir_arena: &mut Arena<IR>,33expr_arena: &mut Arena<AExpr>,34) -> PlHashMap<Node, UnitVec<bool>> {35let mut leaves = Vec::new();36let mut outputs = PlHashMap::default();37let mut cache_proxy = PlHashMap::<UniqueId, Vec<Node>>::default();3839// Get the per-node outputs and leaves40{41let mut stack = Vec::new();4243for root in roots {44assert!(matches!(ir_arena.get(*root), IR::Sink { .. }));45outputs.insert(*root, Vec::new());46stack.extend(47ir_arena48.get(*root)49.inputs()50.enumerate()51.map(|(root_input_idx, node)| ((*root, root_input_idx), node)),52);53}5455while let Some(((parent, parent_input_idx), node)) = stack.pop() {56let ir = ir_arena.get(node);57let node = match ir {58IR::Cache { id, .. 
} => {59let nodes = cache_proxy.entry(*id).or_default();60nodes.push(node);61nodes[0]62},63_ => node,64};6566let outputs = outputs.entry(node).or_default();67let has_been_visisited_before = !outputs.is_empty();68outputs.push((parent, parent_input_idx));6970if has_been_visisited_before {71continue;72}7374let inputs = ir.inputs();75if matches!(inputs, Inputs::Empty) {76leaves.push(node);77}78stack.extend(79inputs80.enumerate()81.map(|(node_input_idx, input)| ((node, node_input_idx), input)),82);83}84}8586// Pushdown and optimize orders from the roots to the leaves.87let mut orders =88ir_pushdown::pushdown_orders(roots, ir_arena, expr_arena, &mut outputs, &cache_proxy);89// Pullup orders from the leaves to the roots.90ir_pullup::pullup_orders(91&leaves,92ir_arena,93expr_arena,94&mut outputs,95&mut orders,96&cache_proxy,97);9899// @Hack. Since not all caches might share the same node and the input of caches might have100// been updated, we need to ensure that all caches again have the same input.101//102// This can be removed when all caches with the same id share the same IR node.103for nodes in cache_proxy.into_values() {104let updated_node = nodes[0];105let order = orders[&updated_node].clone();106let IR::Cache {107input: updated_input,108id: _,109} = ir_arena.get(updated_node)110else {111unreachable!();112};113let updated_input = *updated_input;114for n in &nodes[1..] {115let IR::Cache { input, id: _ } = ir_arena.get_mut(*n) else {116unreachable!();117};118119orders.insert(*n, order.clone());120*input = updated_input;121}122}123124orders125}126127128