// Source path: crates/polars-plan/src/plans/optimizer/set_order.rs
//! Pass to obtain and optimize using exhaustive row-order information.1//!2//! This pass attaches order information to all the IR node input and output ports.3//!4//! The pass performs two passes over the IR graph. First, it assigns and pushes ordering down from5//! the sinks to the leaves. Second, it pulls those orderings back up from the leaves to the sinks.6//! The two passes weaken order guarantees and simplify IR nodes where possible.7//!8//! When the two passes are done, we are left with a map from all the nodes to `PortOrder` which9//! contains the input and output port ordering information.1011use std::sync::Arc;1213use polars_core::frame::UniqueKeepStrategy;14use polars_core::prelude::PlHashMap;15use polars_ops::frame::{JoinType, MaintainOrderJoin};16use polars_utils::arena::{Arena, Node};17use polars_utils::idx_vec::UnitVec;18use polars_utils::unique_id::UniqueId;1920use super::IR;21use crate::dsl::{SinkTypeIR, UnionOptions};22use crate::plans::ir::inputs::Inputs;23use crate::plans::{AExpr, is_order_sensitive_amortized, is_scalar_ae};2425#[derive(Debug, Clone, Copy)]26pub enum InputOrder {27/// The input may receive data in an undefined order.28Unordered,29/// The input may propagate ordering into one or more of its outputs.30Preserving,31/// The input observes ordering and may propagate ordering into one or more of its outputs.32Observing,33/// The input observes and terminates ordering.34Consuming,35}3637/// The ordering of the input and output ports of an IR node.38///39/// This gives information about how row ordering may be received, observed and passed an IR node.40///41/// Some general rules:42/// - An input or output being `Unordered` signifies that on that port can receive rows in any43/// order i.e. 
a shuffle could be inserted and the result would still be correct.44/// - If any input is `Observing` or `Consuming`, it is important that the rows on this input are45/// received in the order specified by the input.46/// - If any output is ordered it is able to propgate on the ordering from any input that is47/// `Preserving` or `Observing`. Conversely, if no output is ordered or no input is `Preserving`48/// or `Observing`, no input order may be propagated to any of the outputs.49#[derive(Debug, Clone)]50pub struct PortOrder {51pub inputs: UnitVec<InputOrder>,52pub output_ordered: UnitVec<bool>,53}5455impl PortOrder {56pub fn new(57inputs: impl IntoIterator<Item = InputOrder>,58output_ordered: impl IntoIterator<Item = bool>,59) -> Self {60Self {61inputs: inputs.into_iter().collect(),62output_ordered: output_ordered.into_iter().collect(),63}64}6566fn set_unordered_output(&mut self) {67self.output_ordered.iter_mut().for_each(|o| *o = false);68}69}7071/// Remove ordering from both sides if either side has an undefined order.72fn simplify_edge(tx: bool, rx: InputOrder) -> (bool, InputOrder) {73use InputOrder as I;74match (tx, rx) {75(false, _) | (_, I::Unordered) => (false, I::Unordered),76(o, i) => (o, i),77}78}7980fn pushdown_orders(81roots: &[Node],82ir_arena: &mut Arena<IR>,83expr_arena: &Arena<AExpr>,84outputs: &mut PlHashMap<Node, UnitVec<Node>>,85cache_proxy: &PlHashMap<UniqueId, Vec<Node>>,86) -> PlHashMap<Node, PortOrder> {87let mut orders: PlHashMap<Node, PortOrder> = PlHashMap::default();88let mut node_hits: PlHashMap<Node, Vec<(usize, Node)>> = PlHashMap::default();89let mut aexpr_stack = Vec::new();90let mut stack = Vec::new();91let mut output_port_orderings = Vec::new();9293stack.extend(roots.iter().map(|n| (*n, None)));9495while let Some((node, outgoing)) = stack.pop() {96// @Hack. The IR creates caches for every path at the moment. That is super hacky. 
So is97// this, but we need to work around it.98let node = match ir_arena.get(node) {99IR::Cache { id, .. } => cache_proxy.get(id).unwrap()[0],100_ => node,101};102103debug_assert!(!orders.contains_key(&node));104105let node_outputs = &outputs[&node];106let hits = node_hits.entry(node).or_default();107if let Some(outgoing) = outgoing {108hits.push(outgoing);109if hits.len() < node_outputs.len() {110continue;111}112}113114output_port_orderings.clear();115output_port_orderings.extend(116hits.iter().map(|(to_input_idx, to_node)| {117orders.get_mut(to_node).unwrap().inputs[*to_input_idx]118}),119);120121let all_outputs_unordered = output_port_orderings122.iter()123.all(|i| matches!(i, I::Unordered));124125// Pushdown simplification rules.126let ir = ir_arena.get_mut(node);127use {InputOrder as I, MaintainOrderJoin as MOJ, PortOrder as P};128let mut node_ordering = match ir {129IR::Cache { .. } if all_outputs_unordered => P::new([I::Unordered], [false]),130IR::Cache { .. } => P::new(131[I::Preserving],132output_port_orderings133.iter()134.map(|i| !matches!(i, I::Unordered))135.collect::<Box<[_]>>(),136),137IR::Sort { input, slice, .. 
} if slice.is_none() && all_outputs_unordered => {138// _ -> Unordered139//140// Remove sort.141let input = *input;142let (to_input_idx, to_node) = outgoing.unwrap();143*ir_arena144.get_mut(to_node)145.inputs_mut()146.nth(to_input_idx)147.unwrap() = input;148149// @Performance: Linear search150*outputs151.get_mut(&input)152.unwrap()153.iter_mut()154.find(|o| **o == node)155.unwrap() = to_node;156157stack.push((input, outgoing));158continue;159},160IR::Sort {161by_column,162sort_options,163..164} => {165let input = if sort_options.maintain_order {166I::Consuming167} else {168let mut has_order_sensitive = false;169for e in by_column {170let aexpr = expr_arena.get(e.node());171has_order_sensitive |=172is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);173}174175if has_order_sensitive {176I::Consuming177} else {178I::Unordered179}180};181182P::new([input], [true])183},184IR::GroupBy {185keys,186aggs,187maintain_order,188apply,189options,190..191} => {192*maintain_order &= !all_outputs_unordered;193194let (input, output) = if apply.is_some()195|| options.is_dynamic()196|| options.is_rolling()197|| *maintain_order198{199(I::Consuming, true)200} else {201// _ -> Unordered202// to203// maintain_order = false204// and205// Unordered -> Unordered (if no order sensitive expressions)206207*maintain_order = false;208let mut has_order_sensitive = false;209for e in keys.iter().chain(aggs.iter()) {210let aexpr = expr_arena.get(e.node());211has_order_sensitive |=212is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);213}214215// The auto-implode is also other sensitive.216has_order_sensitive |=217aggs.iter().any(|agg| !is_scalar_ae(agg.node(), expr_arena));218219(220if has_order_sensitive {221I::Consuming222} else {223I::Unordered224},225false,226)227};228229P::new([input], [output])230},231#[cfg(feature = "merge_sorted")]232IR::MergeSorted {233input_left,234input_right,235..236} => {237if all_outputs_unordered {238// MergeSorted239// (_, _) -> 
Unordered240// to241// UnorderedUnion([left, right])242*ir = IR::Union {243inputs: vec![*input_left, *input_right],244options: UnionOptions {245maintain_order: false,246..Default::default()247},248};249P::new([I::Unordered, I::Unordered], [false])250} else {251P::new([I::Observing, I::Observing], [true])252}253},254#[cfg(feature = "asof_join")]255IR::Join { options, .. } if matches!(options.args.how, JoinType::AsOf(_)) => {256P::new([I::Observing, I::Observing], [!all_outputs_unordered])257},258IR::Join {259input_left: _,260input_right: _,261schema: _,262left_on,263right_on,264options,265} if all_outputs_unordered => {266// If the join maintains order, but the output has undefined order. Remove the267// ordering.268if !matches!(options.args.maintain_order, MOJ::None) {269let mut new_options = options.as_ref().clone();270new_options.args.maintain_order = MOJ::None;271*options = Arc::new(new_options);272}273274let mut inputs = [I::Consuming, I::Consuming];275276// If either side does not need to maintain order, don't maintain the old on that277// side.278for (i, on) in [left_on, right_on].iter().enumerate() {279let mut has_order_sensitive = false;280for e in on.iter() {281let aexpr = expr_arena.get(e.node());282has_order_sensitive |=283is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);284}285286if !has_order_sensitive {287inputs[i] = I::Unordered;288}289}290291P::new(inputs, [false])292},293IR::Join {294input_left: _,295input_right: _,296schema: _,297left_on,298right_on,299options,300} => {301let mut left_has_order_sensitive = false;302let mut right_has_order_sensitive = false;303304for e in left_on {305let aexpr = expr_arena.get(e.node());306left_has_order_sensitive |=307is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);308}309for e in right_on {310let aexpr = expr_arena.get(e.node());311right_has_order_sensitive |=312is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);313}314315use MaintainOrderJoin as M;316let 
left_input = match (317options.args.maintain_order,318left_has_order_sensitive,319options.args.slice.is_some(),320) {321(M::Left | M::LeftRight, true, _)322| (M::Left | M::LeftRight | M::RightLeft, _, true) => I::Observing,323(M::Left | M::LeftRight, false, _) => I::Preserving,324(M::RightLeft, _, _) | (_, true, _) => I::Consuming,325_ => I::Unordered,326};327let right_input = match (328options.args.maintain_order,329right_has_order_sensitive,330options.args.slice.is_some(),331) {332(M::Right | M::RightLeft, true, _)333| (M::Right | M::LeftRight | M::RightLeft, _, true) => I::Observing,334(M::Right | M::RightLeft, false, _) => I::Preserving,335(M::LeftRight, _, _) | (_, true, _) => I::Consuming,336_ => I::Unordered,337};338let output = !matches!(options.args.maintain_order, M::None);339340P::new([left_input, right_input], [output])341},342IR::Distinct { input: _, options } => {343options.maintain_order &= !all_outputs_unordered;344345let input = if options.maintain_order346|| matches!(347options.keep_strategy,348UniqueKeepStrategy::First | UniqueKeepStrategy::Last349) {350I::Observing351} else {352I::Unordered353};354P::new([input], [options.maintain_order])355},356IR::MapFunction { input: _, function } => {357let input = if function.is_streamable() {358if all_outputs_unordered {359I::Unordered360} else {361I::Preserving362}363} else {364I::Consuming365};366367P::new([input], [!all_outputs_unordered])368},369IR::SimpleProjection { .. } => {370let input = if all_outputs_unordered {371I::Unordered372} else {373I::Preserving374};375P::new([input], [!all_outputs_unordered])376},377IR::Slice { .. } => P::new([I::Observing], [!all_outputs_unordered]),378IR::HStack { exprs, .. 
} => {379let mut has_order_sensitive = false;380for e in exprs {381let aexpr = expr_arena.get(e.node());382has_order_sensitive |=383is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);384}385386let input = if has_order_sensitive {387I::Observing388} else if all_outputs_unordered {389I::Unordered390} else {391I::Preserving392};393394P::new([input], [!all_outputs_unordered])395},396IR::Select { expr: exprs, .. } => {397let mut has_order_sensitive = false;398let mut all_scalar = true;399400for e in exprs {401let aexpr = expr_arena.get(e.node());402has_order_sensitive |=403is_order_sensitive_amortized(aexpr, expr_arena, &mut aexpr_stack);404all_scalar &= is_scalar_ae(e.node(), expr_arena);405}406407let input = if has_order_sensitive {408I::Observing409} else if all_outputs_unordered {410I::Unordered411} else {412I::Preserving413};414let output = !all_outputs_unordered && !all_scalar;415416P::new([input], [output])417},418419IR::Filter {420input: _,421predicate,422} => {423let is_order_sensitive = is_order_sensitive_amortized(424expr_arena.get(predicate.node()),425expr_arena,426&mut aexpr_stack,427);428429let input = if is_order_sensitive {430I::Observing431} else if all_outputs_unordered {432I::Unordered433} else {434I::Preserving435};436437P::new([input], [!all_outputs_unordered])438},439440IR::Union { inputs, options } => {441if options.slice.is_none() && all_outputs_unordered {442options.maintain_order = false;443}444445let input = match (options.slice.is_some(), options.maintain_order) {446(true, true) => I::Observing,447(true, false) => I::Consuming,448(false, true) => I::Preserving,449(false, false) => I::Unordered,450};451let output = options.maintain_order && !all_outputs_unordered;452453P::new(std::iter::repeat_n(input, inputs.len()), [output])454},455456IR::HConcat { inputs, .. } => P::new(457std::iter::repeat_n(I::Observing, inputs.len()),458[!all_outputs_unordered],459),460461#[cfg(feature = "python")]462IR::PythonScan { .. 
} => P::new([], [!all_outputs_unordered]),463464IR::Sink { payload, .. } => {465let input = if payload.maintain_order() {466I::Consuming467} else {468I::Unordered469};470P::new([input], [])471},472IR::Scan { .. } | IR::DataFrameScan { .. } => P::new([], [!all_outputs_unordered]),473474IR::ExtContext { contexts, .. } => {475// This node is nonsense. Just do the most conservative thing you can.476P::new(477std::iter::repeat_n(I::Consuming, contexts.len() + 1),478[!all_outputs_unordered],479)480},481482IR::SinkMultiple { .. } | IR::Invalid => unreachable!(),483};484485// We make the code above simpler by pretending every node except caches always only has486// one output. We correct for that here.487if output_port_orderings.len() > 1 && node_ordering.output_ordered.len() == 1 {488node_ordering.output_ordered = UnitVec::from(vec![489node_ordering.output_ordered[0];490output_port_orderings.len()491])492}493494let prev_value = orders.insert(node, node_ordering);495assert!(prev_value.is_none());496497stack.extend(498ir.inputs()499.enumerate()500.map(|(to_input_idx, input)| (input, Some((to_input_idx, node)))),501);502}503504orders505}506507fn pullup_orders(508leaves: &[Node],509ir_arena: &mut Arena<IR>,510outputs: &mut PlHashMap<Node, UnitVec<Node>>,511orders: &mut PlHashMap<Node, PortOrder>,512cache_proxy: &PlHashMap<UniqueId, Vec<Node>>,513) {514let mut hits: PlHashMap<Node, Vec<(usize, Node)>> = PlHashMap::default();515let mut stack = Vec::new();516517let mut txs = Vec::new();518519for leaf in leaves {520stack.extend(521outputs[leaf]522.iter()523.enumerate()524.map(|(i, v)| (*v, (i, *leaf))),525);526}527528while let Some((node, outgoing)) = stack.pop() {529// @Hack. The IR creates caches for every path at the moment. That is super hacky. So is530// this, but we need to work around it.531let node = match ir_arena.get(node) {532IR::Cache { id, .. 
} => cache_proxy.get(id).unwrap()[0],533_ => node,534};535536let hits = hits.entry(node).or_default();537hits.push(outgoing);538if hits.len() < orders[&node].inputs.len() {539continue;540}541542let node_outputs = &outputs[&node];543let ir = ir_arena.get_mut(node);544545txs.clear();546txs.extend(547hits.iter()548.map(|(to_output_idx, to_node)| orders[to_node].output_ordered[*to_output_idx]),549);550551let node_ordering = orders.get_mut(&node).unwrap();552assert_eq!(txs.len(), node_ordering.inputs.len());553for (tx, rx) in txs.iter().zip(node_ordering.inputs.iter_mut()) {554// @NOTE: We don't assign tx back here since it would be redundant.555(_, *rx) = simplify_edge(*tx, *rx);556}557558// Pullup simplification rules.559use {InputOrder as I, MaintainOrderJoin as MOJ};560match ir {561IR::Sort { sort_options, .. } => {562// Unordered -> _ ==> maintain_order=false563sort_options.maintain_order &= !matches!(node_ordering.inputs[0], I::Unordered);564},565IR::GroupBy { maintain_order, .. } => {566if matches!(node_ordering.inputs[0], I::Unordered) {567// Unordered -> _568// to569// maintain_order = false570// and571// Unordered -> Unordered572573*maintain_order = false;574node_ordering.set_unordered_output();575}576},577IR::Sink { input: _, payload } => {578if matches!(node_ordering.inputs[0], I::Unordered) {579// Set maintain order to false if input is unordered580match payload {581SinkTypeIR::Memory => {},582SinkTypeIR::File(s) => s.sink_options.maintain_order = false,583SinkTypeIR::Partition(s) => s.sink_options.maintain_order = false,584}585}586},587IR::Join { options, .. 
} => {588let left_unordered = matches!(node_ordering.inputs[0], I::Unordered);589let right_unordered = matches!(node_ordering.inputs[1], I::Unordered);590591let maintain_order = options.args.maintain_order;592593if (left_unordered594&& matches!(maintain_order, MOJ::Left | MOJ::RightLeft | MOJ::LeftRight))595|| (right_unordered596&& matches!(maintain_order, MOJ::Right | MOJ::RightLeft | MOJ::LeftRight))597{598// If we are maintaining order of a side, but that input has no guaranteed order,599// remove the maintain ordering from that side.600601let mut new_options = options.as_ref().clone();602new_options.args.maintain_order = match maintain_order {603_ if left_unordered && right_unordered => MOJ::None,604MOJ::Left | MOJ::LeftRight if left_unordered => MOJ::None,605MOJ::RightLeft if left_unordered => MOJ::Right,606MOJ::Right | MOJ::RightLeft if right_unordered => MOJ::None,607MOJ::LeftRight if right_unordered => MOJ::Left,608_ => unreachable!(),609};610611if matches!(new_options.args.maintain_order, MOJ::None) {612node_ordering.set_unordered_output();613}614*options = Arc::new(new_options);615}616},617IR::Distinct { input: _, options } => {618if matches!(node_ordering.inputs[0], I::Unordered) {619options.maintain_order = false;620if options.keep_strategy != UniqueKeepStrategy::None {621options.keep_strategy = UniqueKeepStrategy::Any;622}623node_ordering.set_unordered_output();624}625},626627#[cfg(feature = "python")]628IR::PythonScan { .. } => {},629IR::Scan { .. } | IR::DataFrameScan { .. } => {},630#[cfg(feature = "merge_sorted")]631IR::MergeSorted { .. } => {632// An input being unordered is technically valid as it is possible for all values633// to be the same in which case the rows are sorted.634},635IR::Union { options, .. } => {636// Even if the inputs are unordered. 
The output still has an order given by the637// order of the inputs.638639if node_ordering640.inputs641.iter()642.all(|i| matches!(i, I::Unordered))643{644if !options.maintain_order {645node_ordering.set_unordered_output();646}647}648},649IR::MapFunction { input: _, function } => {650if function.is_streamable() && matches!(node_ordering.inputs[0], I::Unordered) {651node_ordering.set_unordered_output();652}653},654655IR::Cache { .. }656| IR::SimpleProjection { .. }657| IR::Slice { .. }658| IR::HStack { .. }659| IR::Filter { .. }660| IR::Select { .. }661| IR::HConcat { .. }662| IR::ExtContext { .. } => {663if node_ordering664.inputs665.iter()666.all(|i| matches!(i, I::Unordered))667{668node_ordering.set_unordered_output();669}670},671672IR::SinkMultiple { .. } | IR::Invalid => unreachable!(),673}674675stack.extend(676node_outputs677.iter()678.enumerate()679.map(|(i, v)| (*v, (i, node))),680);681}682}683684/// Optimize the orderings used in the IR plan and get the relative orderings of all edges.685///686/// All roots should be `Sink` nodes and no `SinkMultiple` or `Invalid` are allowed to be part of687/// the graph.688pub fn simplify_and_fetch_orderings(689roots: &[Node],690ir_arena: &mut Arena<IR>,691expr_arena: &Arena<AExpr>,692) -> PlHashMap<Node, PortOrder> {693let mut leaves = Vec::new();694let mut outputs = PlHashMap::default();695let mut cache_proxy = PlHashMap::<UniqueId, Vec<Node>>::default();696697// Get the per-node outputs and leaves698{699let mut stack = Vec::new();700701for root in roots {702assert!(matches!(ir_arena.get(*root), IR::Sink { .. }));703outputs.insert(*root, UnitVec::new());704stack.extend(ir_arena.get(*root).inputs().map(|node| (*root, node)));705}706707while let Some((parent, node)) = stack.pop() {708let ir = ir_arena.get(node);709let node = match ir {710IR::Cache { id, .. 
} => {711let nodes = cache_proxy.entry(*id).or_default();712nodes.push(node);713nodes[0]714},715_ => node,716};717718let outputs = outputs.entry(node).or_default();719let has_been_visisited_before = !outputs.is_empty();720outputs.push(parent);721722if has_been_visisited_before {723continue;724}725726let inputs = ir.inputs();727if matches!(inputs, Inputs::Empty) {728leaves.push(node);729}730stack.extend(inputs.map(|input| (node, input)));731}732}733734// Pushdown and optimize orders from the roots to the leaves.735let mut orders = pushdown_orders(roots, ir_arena, expr_arena, &mut outputs, &cache_proxy);736// Pullup orders from the leaves to the roots.737pullup_orders(&leaves, ir_arena, &mut outputs, &mut orders, &cache_proxy);738739// @Hack. Since not all caches might share the same node and the input of caches might have740// been updated, we need to ensure that all caches again have the same input.741//742// This can be removed when all caches with the same id share the same IR node.743for nodes in cache_proxy.into_values() {744let updated_node = nodes[0];745let order = orders[&updated_node].clone();746let IR::Cache {747input: updated_input,748id: _,749} = ir_arena.get(updated_node)750else {751unreachable!();752};753let updated_input = *updated_input;754for n in &nodes[1..] {755let IR::Cache { input, id: _ } = ir_arena.get_mut(*n) else {756unreachable!();757};758759orders.insert(*n, order.clone());760*input = updated_input;761}762}763764orders765}766767768