// Path: crates/polars-plan/src/plans/optimizer/mod.rs (blob/main)
// 6940 views
use polars_core::prelude::*;12use crate::prelude::*;34mod cache_states;5mod delay_rechunk;67mod cluster_with_columns;8mod collapse_and_project;9mod collapse_joins;10mod collect_members;11mod count_star;12#[cfg(feature = "cse")]13mod cse;14mod flatten_union;15#[cfg(feature = "fused")]16mod fused;17mod join_utils;18pub(crate) use join_utils::ExprOrigin;19mod expand_datasets;20#[cfg(feature = "python")]21pub use expand_datasets::ExpandedPythonScan;22mod predicate_pushdown;23mod projection_pushdown;24pub mod set_order;25mod simplify_expr;26mod slice_pushdown_expr;27mod slice_pushdown_lp;28mod stack_opt;2930use collapse_and_project::SimpleProjectionAndCollapse;31#[cfg(feature = "cse")]32pub use cse::NaiveExprMerger;33use delay_rechunk::DelayRechunk;34pub use expand_datasets::ExpandedDataset;35use polars_core::config::verbose;36use polars_io::predicates::PhysicalIoExpr;37pub use predicate_pushdown::PredicatePushDown;38pub use projection_pushdown::ProjectionPushDown;39pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};40use slice_pushdown_lp::SlicePushDown;41pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};4243use self::flatten_union::FlattenUnionRule;44pub use crate::frame::{AllowedOptimizations, OptFlags};45pub use crate::plans::conversion::type_coercion::TypeCoercionRule;46use crate::plans::optimizer::count_star::CountStar;47#[cfg(feature = "cse")]48use crate::plans::optimizer::cse::CommonSubExprOptimizer;49#[cfg(feature = "cse")]50use crate::plans::optimizer::cse::prune_unused_caches;51use crate::plans::optimizer::predicate_pushdown::ExprEval;52#[cfg(feature = "cse")]53use crate::plans::visitor::*;54use crate::prelude::optimizer::collect_members::MemberCollector;5556pub trait Optimize {57fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;58}5960// arbitrary constant to reduce reallocation.61const HASHMAP_SIZE: usize = 16;6263pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> 
{64PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))65}6667pub(crate) fn pushdown_maintain_errors() -> bool {68std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS").as_deref() == Ok("1")69}7071pub fn optimize(72logical_plan: DslPlan,73mut opt_flags: OptFlags,74lp_arena: &mut Arena<IR>,75expr_arena: &mut Arena<AExpr>,76scratch: &mut Vec<Node>,77expr_eval: ExprEval<'_>,78) -> PolarsResult<Node> {79#[allow(dead_code)]80let verbose = verbose();8182// Gradually fill the rules passed to the optimizer83let opt = StackOptimizer {};84let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);8586// Unset CSE87// This can be turned on again during ir-conversion.88#[allow(clippy::eq_op)]89#[cfg(feature = "cse")]90if opt_flags.contains(OptFlags::EAGER) {91opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);92}93let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena, &mut opt_flags)?;9495// Don't run optimizations that don't make sense on a single node.96// This keeps eager execution more snappy.97#[cfg(feature = "cse")]98let comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);99100#[cfg(feature = "cse")]101let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);102#[cfg(not(feature = "cse"))]103let comm_subexpr_elim = false;104105// Note: This can be in opt_flags in the future if needed.106let pushdown_maintain_errors = pushdown_maintain_errors();107108// During debug we check if the optimizations have not modified the final schema.109#[cfg(debug_assertions)]110let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned();111112let mut _opt_members = &mut None;113114macro_rules! get_or_init_members {115() => {116_get_or_init_members(_opt_members, lp_top, lp_arena, expr_arena)117};118}119120macro_rules! 
get_members_opt {121() => {122_opt_members.as_mut()123};124}125126// Run before slice pushdown127if opt_flags.simplify_expr() {128#[cfg(feature = "fused")]129rules.push(Box::new(fused::FusedArithmetic {}));130}131132#[cfg(feature = "cse")]133let _cse_plan_changed = if comm_subplan_elim {134let members = get_or_init_members!();135if (members.has_sink_multiple || members.has_joins_or_unions)136&& members.has_duplicate_scans()137&& !members.has_cache138{139if verbose {140eprintln!("found multiple sources; run comm_subplan_elim")141}142143let (lp, changed, cid2c) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);144145prune_unused_caches(lp_arena, cid2c);146147lp_top = lp;148members.has_cache |= changed;149changed150} else {151false152}153} else {154false155};156#[cfg(not(feature = "cse"))]157let _cse_plan_changed = false;158159// Should be run before predicate pushdown.160if opt_flags.projection_pushdown() {161let mut projection_pushdown_opt = ProjectionPushDown::new();162let alp = lp_arena.take(lp_top);163let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;164lp_arena.replace(lp_top, alp);165166if projection_pushdown_opt.is_count_star {167let mut count_star_opt = CountStar::new();168count_star_opt.optimize_plan(lp_arena, expr_arena, lp_top)?;169}170}171172if opt_flags.predicate_pushdown() {173let mut predicate_pushdown_opt = PredicatePushDown::new(174expr_eval,175pushdown_maintain_errors,176opt_flags.new_streaming(),177);178let alp = lp_arena.take(lp_top);179let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;180lp_arena.replace(lp_top, alp);181}182183// Make sure it is after predicate pushdown184if opt_flags.collapse_joins() && get_or_init_members!().has_filter_with_join_input {185collapse_joins::optimize(lp_top, lp_arena, expr_arena, opt_flags.new_streaming());186}187188// Make sure its before slice pushdown.189if opt_flags.fast_projection() 
{190rules.push(Box::new(SimpleProjectionAndCollapse::new(191opt_flags.eager(),192)));193}194195if !opt_flags.eager() {196rules.push(Box::new(DelayRechunk::new()));197}198199if opt_flags.slice_pushdown() {200let mut slice_pushdown_opt = SlicePushDown::new(201// We don't maintain errors on slice as the behavior is much more predictable that way.202//203// Even if we enable maintain_errors (thereby preventing the slice from being pushed),204// the new-streaming engine still may not error due to early-stopping.205false, // maintain_errors206opt_flags.new_streaming(),207);208let alp = lp_arena.take(lp_top);209let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;210211lp_arena.replace(lp_top, alp);212213// Expressions use the stack optimizer.214rules.push(Box::new(slice_pushdown_opt));215}216217// This optimization removes branches, so we must do it when type coercion218// is completed.219if opt_flags.simplify_expr() {220rules.push(Box::new(SimplifyBooleanRule {}));221}222223if !opt_flags.eager() {224rules.push(Box::new(FlattenUnionRule {}));225}226227// Note: ExpandDatasets must run after slice and predicate pushdown.228rules.push(Box::new(expand_datasets::ExpandDatasets {}) as Box<dyn OptimizationRule>);229230lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top)?;231232if opt_flags.cluster_with_columns() {233cluster_with_columns::optimize(lp_top, lp_arena, expr_arena)234}235236if _cse_plan_changed237&& get_members_opt!().is_some_and(|members| {238(members.has_joins_or_unions | members.has_sink_multiple) && members.has_cache239})240{241// We only want to run this on cse inserted caches242cache_states::set_cache_states(243lp_top,244lp_arena,245expr_arena,246scratch,247expr_eval,248verbose,249pushdown_maintain_errors,250opt_flags.new_streaming(),251)?;252}253254// This one should run (nearly) last as this modifies the projections255#[cfg(feature = "cse")]256if comm_subexpr_elim && !get_or_init_members!().has_ext_context {257let mut optimizer = 
CommonSubExprOptimizer::new();258let alp_node = IRNode::new_mutate(lp_top);259260lp_top = try_with_ir_arena(lp_arena, expr_arena, |arena| {261let rewritten = alp_node.rewrite(&mut optimizer, arena)?;262Ok(rewritten.node())263})?;264}265266if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {267let members = get_or_init_members!();268if members.has_group_by269| members.has_sort270| members.has_distinct271| members.has_joins_or_unions272{273match lp_arena.get(lp_top) {274IR::SinkMultiple { inputs } => {275let mut roots = inputs.clone();276for root in &mut roots {277if !matches!(lp_arena.get(*root), IR::Sink { .. }) {278*root = lp_arena.add(IR::Sink {279input: *root,280payload: SinkTypeIR::Memory,281});282}283}284set_order::simplify_and_fetch_orderings(&roots, lp_arena, expr_arena);285},286ir => {287let mut tmp_top = lp_top;288if !matches!(ir, IR::Sink { .. }) {289tmp_top = lp_arena.add(IR::Sink {290input: lp_top,291payload: SinkTypeIR::Memory,292});293}294_ = set_order::simplify_and_fetch_orderings(&[tmp_top], lp_arena, expr_arena)295},296}297}298}299300// During debug we check if the optimizations have not modified the final schema.301#[cfg(debug_assertions)]302{303// only check by names because we may supercast types.304assert_eq!(305prev_schema.iter_names().collect::<Vec<_>>(),306lp_arena307.get(lp_top)308.schema(lp_arena)309.iter_names()310.collect::<Vec<_>>()311);312};313314Ok(lp_top)315}316317fn _get_or_init_members<'a>(318opt_members: &'a mut Option<MemberCollector>,319lp_top: Node,320lp_arena: &mut Arena<IR>,321expr_arena: &mut Arena<AExpr>,322) -> &'a mut MemberCollector {323opt_members.get_or_insert_with(|| {324let mut members = MemberCollector::new();325members.collect(lp_top, lp_arena, expr_arena);326327members328})329}330331332