Path: blob/main/crates/polars-plan/src/plans/conversion/stack_opt.rs
6940 views
use std::borrow::Borrow;12use self::type_check::TypeCheckRule;3use super::*;45/// Applies expression simplification and type coercion during conversion to IR.6pub struct ConversionOptimizer {7scratch: Vec<(Node, usize)>,8schemas: Vec<Schema>,910simplify: Option<SimplifyExprRule>,11coerce: Option<TypeCoercionRule>,12check: Option<TypeCheckRule>,13// IR's can be cached in the DSL.14// But if they are used multiple times in DSL (e.g. concat/join)15// then it can occur that we take a slot multiple times.16// So we keep track of the arena versions used and allow only17// one unique IR cache to be reused.18pub(super) used_arenas: PlHashSet<u32>,19}2021struct ExtendVec<'a> {22out: &'a mut Vec<(Node, usize)>,23schema_idx: usize,24}25impl Extend<Node> for ExtendVec<'_> {26fn extend<T: IntoIterator<Item = Node>>(&mut self, iter: T) {27self.out28.extend(iter.into_iter().map(|n| (n, self.schema_idx)))29}30}3132impl ConversionOptimizer {33pub fn new(simplify: bool, type_coercion: bool, type_check: bool) -> Self {34let simplify = if simplify {35Some(SimplifyExprRule {})36} else {37None38};3940let coerce = if type_coercion {41Some(TypeCoercionRule {})42} else {43None44};4546let check = if type_check {47Some(TypeCheckRule)48} else {49None50};5152ConversionOptimizer {53scratch: Vec::with_capacity(8),54schemas: Vec::new(),55simplify,56coerce,57check,58used_arenas: Default::default(),59}60}6162pub fn push_scratch(&mut self, expr: Node, expr_arena: &Arena<AExpr>) {63self.scratch.push((expr, 0));64// traverse all subexpressions and add to the stack65let expr = unsafe { expr_arena.get_unchecked(expr) };66expr.inputs_rev(&mut ExtendVec {67out: &mut self.scratch,68schema_idx: 0,69});70}7172pub fn fill_scratch<I, N>(&mut self, exprs: I, expr_arena: &Arena<AExpr>)73where74I: IntoIterator<Item = N>,75N: Borrow<Node>,76{77for e in exprs {78let node = *e.borrow();79self.push_scratch(node, expr_arena);80}81}8283/// Optimizes the expressions in the scratch space. This should be called after filling the84/// scratch space with the expressions that you want to optimize.85pub fn optimize_exprs(86&mut self,87expr_arena: &mut Arena<AExpr>,88ir_arena: &mut Arena<IR>,89current_ir_node: Node,90// Use the schema of `current_ir_node` instead of its input when resolving expr fields.91use_current_node_schema: bool,92) -> PolarsResult<()> {93// Different from the stack-opt in the optimizer phase, this does a single pass until fixed point per expression.9495if let Some(rule) = &mut self.check {96while let Some(x) = rule.optimize_plan(ir_arena, expr_arena, current_ir_node)? {97ir_arena.replace(current_ir_node, x);98}99}100101// process the expressions on the stack and apply optimizations.102let schema = if use_current_node_schema {103ir_arena.get(current_ir_node).schema(ir_arena)104} else {105get_input_schema(ir_arena, current_ir_node)106};107let plan = ir_arena.get(current_ir_node);108let mut ctx = OptimizeExprContext {109in_filter: matches!(plan, IR::Filter { .. }),110has_inputs: !get_input(ir_arena, current_ir_node).is_empty(),111..Default::default()112};113#[cfg(feature = "python")]114{115use crate::dsl::python_dsl::PythonScanSource;116ctx.in_pyarrow_scan = matches!(plan, IR::PythonScan { options } if options.python_source == PythonScanSource::Pyarrow);117ctx.in_io_plugin = matches!(plan, IR::PythonScan { options } if options.python_source == PythonScanSource::IOPlugin);118};119120self.schemas.clear();121while let Some((current_expr_node, schema_idx)) = self.scratch.pop() {122let expr = unsafe { expr_arena.get_unchecked(current_expr_node) };123124if expr.is_leaf() {125continue;126}127128// Evaluation expressions still need to do rules on the evaluation expression but the129// schema is not the same and it is not concluded in the inputs. Therefore, we handl130if let AExpr::Eval {131expr,132evaluation,133variant,134} = expr135{136let schema = if schema_idx == 0 {137&schema138} else {139&self.schemas[schema_idx - 1]140};141let expr = expr_arena.get(*expr).get_dtype(schema, expr_arena)?;142143let element_dtype = variant.element_dtype(&expr)?;144let schema = Schema::from_iter([(PlSmallStr::EMPTY, element_dtype.clone())]);145self.schemas.push(schema);146self.scratch.push((*evaluation, self.schemas.len()));147}148149let schema = if schema_idx == 0 {150&schema151} else {152&self.schemas[schema_idx - 1]153};154155if let Some(rule) = &mut self.simplify {156while let Some(x) =157rule.optimize_expr(expr_arena, current_expr_node, schema, ctx)?158{159expr_arena.replace(current_expr_node, x);160}161}162if let Some(rule) = &mut self.coerce {163while let Some(x) =164rule.optimize_expr(expr_arena, current_expr_node, schema, ctx)?165{166expr_arena.replace(current_expr_node, x);167}168}169170let expr = unsafe { expr_arena.get_unchecked(current_expr_node) };171// traverse subexpressions and add to the stack172expr.inputs_rev(&mut ExtendVec {173out: &mut self.scratch,174schema_idx,175});176}177178Ok(())179}180}181182183