// Path: crates/polars-plan/src/plans/optimizer/collapse_and_project.rs
use std::collections::BTreeSet;12use super::*;34/// Projection in the physical plan is done by selecting an expression per thread.5/// In case of many projections and columns this can be expensive when the expressions are simple6/// column selections. These can be selected on a single thread. The single thread is faster, because7/// the eager selection algorithm hashes the column names, making the projection complexity linear8/// instead of quadratic.9///10/// It is important that this optimization is ran after projection pushdown.11///12/// The schema reported after this optimization is also13pub(super) struct SimpleProjectionAndCollapse {14/// Keep track of nodes that are already processed when they15/// can be expensive. Schema materialization can be for instance.16processed: BTreeSet<Node>,17eager: bool,18}1920impl SimpleProjectionAndCollapse {21pub(super) fn new(eager: bool) -> Self {22Self {23processed: Default::default(),24eager,25}26}27}2829impl OptimizationRule for SimpleProjectionAndCollapse {30fn optimize_plan(31&mut self,32lp_arena: &mut Arena<IR>,33expr_arena: &mut Arena<AExpr>,34node: Node,35) -> PolarsResult<Option<IR>> {36use IR::*;37let lp = lp_arena.get(node);3839match lp {40Select { input, expr, .. } => {41if !matches!(lp_arena.get(*input), ExtContext { .. 
})42&& !self.processed.contains(&node)43{44// First check if we can apply the optimization before we allocate.45if !expr.iter().all(|e| {46matches!(expr_arena.get(e.node()), AExpr::Column(name) if e.output_name() == name)47}) {48self.processed.insert(node);49return Ok(None);50}5152let exprs = expr53.iter()54.map(|e| e.output_name().clone())55.collect::<Vec<_>>();56let Some(alp) = IRBuilder::new(*input, expr_arena, lp_arena)57.project_simple(exprs.iter().cloned())58.ok()59else {60return Ok(None);61};62let alp = alp.build();6364Ok(Some(alp))65} else {66self.processed.insert(node);67Ok(None)68}69},70SimpleProjection { columns, input } if !self.eager => {71match lp_arena.get(*input) {72// If there are 2 subsequent fast projections, flatten them and only take the last73SimpleProjection {74input: prev_input, ..75} => Ok(Some(SimpleProjection {76input: *prev_input,77columns: columns.clone(),78})),79// Cleanup projections set in projection pushdown just above caches80// they are not needed.81cache_lp @ Cache { .. } if self.processed.contains(&node) => {82let cache_schema = cache_lp.schema(lp_arena);83if cache_schema.len() == columns.len()84&& cache_schema.iter_names().zip(columns.iter_names()).all(85|(left_name, right_name)| left_name.as_str() == right_name.as_str(),86)87{88Ok(Some(cache_lp.clone()))89} else {90Ok(None)91}92},93// If a projection does nothing, remove it.94other => {95let input_schema = other.schema(lp_arena);96// This will fail fast if lengths are not equal97if *input_schema.as_ref() == *columns {98Ok(Some(other.clone()))99} else {100self.processed.insert(node);101Ok(None)102}103},104}105},106// if there are 2 subsequent caches, flatten them and only take the inner107Cache { input, .. } if !self.eager => {108if let Cache {109input: prev_input,110id,111} = lp_arena.get(*input)112{113Ok(Some(Cache {114input: *prev_input,115id: *id,116}))117} else {118Ok(None)119}120},121_ => Ok(None),122}123}124}125126127