// Path: blob/main/crates/polars-plan/src/plans/optimizer/mod.rs
// 8424 views
use polars_core::prelude::*;1use polars_error::feature_gated;23use crate::prelude::*;45mod delay_rechunk;67mod cluster_with_columns;8mod collapse_and_project;9mod collect_members;10mod count_star;11#[cfg(feature = "cse")]12mod cse;13mod flatten_union;14#[cfg(feature = "fused")]15mod fused;16mod join_utils;17pub(crate) use join_utils::ExprOrigin;18mod expand_datasets;19#[cfg(feature = "python")]20pub use expand_datasets::ExpandedPythonScan;21mod predicate_pushdown;22mod projection_pushdown;23pub mod set_order;24mod simplify_expr;25mod slice_pushdown_expr;26mod slice_pushdown_lp;27mod sortedness;28mod stack_opt;2930use collapse_and_project::SimpleProjectionAndCollapse;31#[cfg(feature = "cse")]32pub use cse::NaiveExprMerger;33use delay_rechunk::DelayRechunk;34pub use expand_datasets::ExpandedDataset;35use polars_core::config::verbose;36pub use predicate_pushdown::PredicatePushDown;37pub use projection_pushdown::ProjectionPushDown;38pub use simplify_expr::{SimplifyBooleanRule, SimplifyExprRule};39use slice_pushdown_lp::SlicePushDown;40pub use sortedness::{AExprSorted, IRSorted, are_keys_sorted_any, is_sorted};41pub use stack_opt::{OptimizationRule, OptimizeExprContext, StackOptimizer};4243use self::flatten_union::FlattenUnionRule;44pub use crate::frame::{AllowedOptimizations, OptFlags};45pub use crate::plans::conversion::type_coercion::TypeCoercionRule;46use crate::plans::optimizer::count_star::CountStar;47#[cfg(feature = "cse")]48use crate::plans::optimizer::cse::CommonSubExprOptimizer;49#[cfg(feature = "cse")]50use crate::plans::visitor::*;51use crate::prelude::optimizer::collect_members::MemberCollector;5253pub trait Optimize {54fn optimize(&self, logical_plan: DslPlan) -> PolarsResult<DslPlan>;55}5657// arbitrary constant to reduce reallocation.58const HASHMAP_SIZE: usize = 16;5960pub(crate) fn init_hashmap<K, V>(max_len: Option<usize>) -> PlHashMap<K, V> {61PlHashMap::with_capacity(std::cmp::min(max_len.unwrap_or(HASHMAP_SIZE), HASHMAP_SIZE))62}6364pub(crate) fn 
pushdown_maintain_errors() -> bool {65std::env::var("POLARS_PUSHDOWN_OPT_MAINTAIN_ERRORS").as_deref() == Ok("1")66}6768pub(super) fn run_projection_predicate_pushdown(69root: Node,70ir_arena: &mut Arena<IR>,71expr_arena: &mut Arena<AExpr>,72pushdown_maintain_errors: bool,73opt_flags: &OptFlags,74) -> PolarsResult<()> {75// Should be run before predicate pushdown.76if opt_flags.projection_pushdown() {77let mut projection_pushdown_opt = ProjectionPushDown::new();78let ir = ir_arena.take(root);79let ir = projection_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;80ir_arena.replace(root, ir);8182if projection_pushdown_opt.is_count_star {83let mut count_star_opt = CountStar::new();84count_star_opt.optimize_plan(ir_arena, expr_arena, root)?;85}86}8788if opt_flags.predicate_pushdown() {89let mut predicate_pushdown_opt =90PredicatePushDown::new(pushdown_maintain_errors, opt_flags.new_streaming());91let ir = ir_arena.take(root);92let ir = predicate_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;93ir_arena.replace(root, ir);94}9596Ok(())97}9899pub fn optimize(100logical_plan: DslPlan,101mut opt_flags: OptFlags,102ir_arena: &mut Arena<IR>,103expr_arena: &mut Arena<AExpr>,104scratch: &mut Vec<Node>,105apply_scan_predicate_to_scan_ir: fn(106Node,107&mut Arena<IR>,108&mut Arena<AExpr>,109) -> PolarsResult<()>,110) -> PolarsResult<Node> {111#[allow(dead_code)]112let verbose = verbose();113114// Gradually fill the rules passed to the optimizer115let opt = StackOptimizer {};116let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);117118// Unset CSE119// This can be turned on again during ir-conversion.120#[allow(clippy::eq_op)]121#[cfg(feature = "cse")]122if opt_flags.contains(OptFlags::EAGER) {123opt_flags &= !(OptFlags::COMM_SUBEXPR_ELIM | OptFlags::COMM_SUBEXPR_ELIM);124}125let mut root = to_alp(logical_plan, expr_arena, ir_arena, &mut opt_flags)?;126127#[allow(unused_assignments)]128let mut comm_subplan_elim = false;129// Don't run optimizations that don't 
make sense on a single node.130// This keeps eager execution more snappy.131#[cfg(feature = "cse")]132{133comm_subplan_elim = opt_flags.contains(OptFlags::COMM_SUBPLAN_ELIM);134}135136#[cfg(feature = "cse")]137let comm_subexpr_elim = opt_flags.contains(OptFlags::COMM_SUBEXPR_ELIM);138#[cfg(not(feature = "cse"))]139let comm_subexpr_elim = false;140141// Note: This can be in opt_flags in the future if needed.142let pushdown_maintain_errors = pushdown_maintain_errors();143144// During debug we check if the optimizations have not modified the final schema.145#[cfg(debug_assertions)]146let prev_schema = ir_arena.get(root).schema(ir_arena).into_owned();147148let mut _opt_members: &mut Option<MemberCollector> = &mut None;149150macro_rules! get_or_init_members {151() => {152_get_or_init_members(_opt_members, root, ir_arena, expr_arena)153};154}155156// Run before slice pushdown157if opt_flags.simplify_expr() {158#[cfg(feature = "fused")]159rules.push(Box::new(fused::FusedArithmetic {}));160}161162let run_pushdowns = if comm_subplan_elim {163#[allow(unused_assignments)]164let mut run_pd = true;165166feature_gated!("cse", {167let members = get_or_init_members!();168run_pd = if (members.has_sink_multiple || members.has_joins_or_unions)169&& members.has_duplicate_scans()170&& !members.has_cache171{172use self::cse::CommonSubPlanOptimizer;173174if verbose {175eprintln!("found multiple sources; run comm_subplan_elim")176}177178root = CommonSubPlanOptimizer::new().optimize(179root,180ir_arena,181expr_arena,182pushdown_maintain_errors,183&opt_flags,184verbose,185scratch,186)?;187false188} else {189true190}191});192193run_pd194} else {195true196};197198if run_pushdowns {199run_projection_predicate_pushdown(200root,201ir_arena,202expr_arena,203pushdown_maintain_errors,204&opt_flags,205)?;206}207208if opt_flags.slice_pushdown() {209let mut slice_pushdown_opt = SlicePushDown::new(210// We don't maintain errors on slice as the behavior is much more predictable that way.211//212// Even 
if we enable maintain_errors (thereby preventing the slice from being pushed),213// the new-streaming engine still may not error due to early-stopping.214false, // maintain_errors215);216let ir = ir_arena.take(root);217let ir = slice_pushdown_opt.optimize(ir, ir_arena, expr_arena)?;218219ir_arena.replace(root, ir);220221// Expressions use the stack optimizer.222rules.push(Box::new(slice_pushdown_opt));223}224225if opt_flags.fast_projection() {226rules.push(Box::new(SimpleProjectionAndCollapse::new(227opt_flags.eager(),228)));229}230231if !opt_flags.eager() {232rules.push(Box::new(DelayRechunk::new()));233}234235// This optimization removes branches, so we must do it when type coercion236// is completed.237if opt_flags.simplify_expr() {238rules.push(Box::new(SimplifyBooleanRule {}));239}240241if !opt_flags.eager() {242rules.push(Box::new(FlattenUnionRule {}));243}244245root = opt.optimize_loop(&mut rules, expr_arena, ir_arena, root)?;246247if opt_flags.cluster_with_columns() && get_or_init_members!().with_columns_count > 1 {248cluster_with_columns::optimize(root, ir_arena, expr_arena)249}250251// This one should run (nearly) last as this modifies the projections252#[cfg(feature = "cse")]253if comm_subexpr_elim && !get_or_init_members!().has_ext_context {254let mut optimizer =255CommonSubExprOptimizer::new(opt_flags.contains(OptFlags::NEW_STREAMING));256let ir_node = IRNode::new_mutate(root);257258root = try_with_ir_arena(ir_arena, expr_arena, |arena| {259let rewritten = ir_node.rewrite(&mut optimizer, arena)?;260Ok(rewritten.node())261})?;262}263264if opt_flags.contains(OptFlags::CHECK_ORDER_OBSERVE) {265let members = get_or_init_members!();266if members.has_group_by267| members.has_sort268| members.has_distinct269| members.has_joins_or_unions270{271match ir_arena.get(root) {272IR::SinkMultiple { inputs } => {273let mut roots = inputs.clone();274for root in &mut roots {275if !matches!(ir_arena.get(*root), IR::Sink { .. 
}) {276*root = ir_arena.add(IR::Sink {277input: *root,278payload: SinkTypeIR::Memory,279});280}281}282set_order::simplify_and_fetch_orderings(&roots, ir_arena, expr_arena);283},284ir => {285let mut tmp_top = root;286if !matches!(ir, IR::Sink { .. }) {287tmp_top = ir_arena.add(IR::Sink {288input: root,289payload: SinkTypeIR::Memory,290});291}292_ = set_order::simplify_and_fetch_orderings(&[tmp_top], ir_arena, expr_arena)293},294}295}296}297298expand_datasets::expand_datasets(root, ir_arena, expr_arena, apply_scan_predicate_to_scan_ir)?;299300// During debug we check if the optimizations have not modified the final schema.301#[cfg(debug_assertions)]302{303// only check by names because we may supercast types.304assert_eq!(305prev_schema.iter_names().collect::<Vec<_>>(),306ir_arena307.get(root)308.schema(ir_arena)309.iter_names()310.collect::<Vec<_>>()311);312};313314Ok(root)315}316317fn _get_or_init_members<'a>(318opt_members: &'a mut Option<MemberCollector>,319root: Node,320ir_arena: &mut Arena<IR>,321expr_arena: &mut Arena<AExpr>,322) -> &'a mut MemberCollector {323opt_members.get_or_insert_with(|| {324let mut members = MemberCollector::new();325members.collect(root, ir_arena, expr_arena);326327members328})329}330331332