// Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/utils.rs
// 7889 views
use polars_core::prelude::*;1use polars_utils::idx_vec::UnitVec;2use polars_utils::unitvec;34use super::keys::*;5use crate::plans::visitor::{6AExprArena, AexprNode, RewriteRecursion, RewritingVisitor, TreeWalker,7};8use crate::prelude::*;9fn combine_by_and(left: Node, right: Node, arena: &mut Arena<AExpr>) -> Node {10arena.add(AExpr::BinaryExpr {11left,12op: Operator::And,13right,14})15}1617/// Inserts a predicate into the map, with some basic de-duplication.18///19/// The map is keyed in a way that may cause some predicates to fall into the same bucket. In that20/// case the predicate is AND'ed with the existing node in that bucket.21pub(super) fn insert_predicate_dedup(22acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,23predicate: &ExprIR,24expr_arena: &mut Arena<AExpr>,25) {26let name = predicate_to_key(predicate.node(), expr_arena);2728let mut new_min_terms = unitvec![];2930acc_predicates31.entry(name)32.and_modify(|existing_predicate| {33let mut out_node = existing_predicate.node();3435new_min_terms.clear();36new_min_terms.extend(MintermIter::new(predicate.node(), expr_arena));3738// Limit the number of existing min-terms that we check against so that we have linear-time performance.39// Without this limit the loop below will be quadratic. 
The side effect is that we may not perfectly40// identify duplicates when there are large amounts of filter expressions.41const CHECK_LIMIT: usize = 32;4243'next_new_min_term: for new_predicate in new_min_terms {44let new_min_term_eq_wrap = AExprArena::new(new_predicate, expr_arena);4546if MintermIter::new(existing_predicate.node(), expr_arena)47.take(CHECK_LIMIT)48.any(|existing_min_term| {49new_min_term_eq_wrap == AExprArena::new(existing_min_term, expr_arena)50})51{52continue 'next_new_min_term;53}5455out_node = combine_by_and(new_predicate, out_node, expr_arena);56}5758existing_predicate.set_node(out_node)59})60.or_insert_with(|| predicate.clone());61}6263pub(super) fn temporary_unique_key(acc_predicates: &PlHashMap<PlSmallStr, ExprIR>) -> PlSmallStr {64// TODO: Don't heap allocate during construction.65let mut out_key = '\u{1D17A}'.to_string();66let mut existing_keys = acc_predicates.keys();6768while acc_predicates.contains_key(&*out_key) {69out_key.push_str(existing_keys.next().unwrap());70}7172PlSmallStr::from_string(out_key)73}7475pub(super) fn combine_predicates<I>(iter: I, arena: &mut Arena<AExpr>) -> ExprIR76where77I: Iterator<Item = ExprIR>,78{79let mut single_pred = None;80for e in iter {81single_pred = match single_pred {82None => Some(e.node()),83Some(left) => Some(arena.add(AExpr::BinaryExpr {84left,85op: Operator::And,86right: e.node(),87})),88};89}90single_pred91.map(|node| ExprIR::from_node(node, arena))92.expect("an empty iterator was passed")93}9495pub(super) fn predicate_at_scan(96acc_predicates: PlHashMap<PlSmallStr, ExprIR>,97predicate: Option<ExprIR>,98expr_arena: &mut Arena<AExpr>,99) -> Option<ExprIR> {100if !acc_predicates.is_empty() {101let mut new_predicate = combine_predicates(acc_predicates.into_values(), expr_arena);102if let Some(pred) = predicate {103new_predicate.set_node(combine_by_and(104new_predicate.node(),105pred.node(),106expr_arena,107));108}109Some(new_predicate)110} else {111None112}113}114115/// Evaluates a condition on 
the column name inputs of every predicate, where if116/// the condition evaluates to true on any column name the predicate is117/// transferred to local.118pub(super) fn transfer_to_local_by_expr_ir<F>(119expr_arena: &Arena<AExpr>,120acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,121mut condition: F,122) -> Vec<ExprIR>123where124F: FnMut(&ExprIR) -> bool,125{126let mut remove_keys = Vec::with_capacity(acc_predicates.len());127128for predicate in acc_predicates.values() {129if condition(predicate) {130if let Some(name) = aexpr_to_leaf_names_iter(predicate.node(), expr_arena).next() {131remove_keys.push(name);132}133}134}135let mut local_predicates = Vec::with_capacity(remove_keys.len());136for key in remove_keys {137if let Some(pred) = acc_predicates.remove(key) {138local_predicates.push(pred)139}140}141local_predicates142}143144/// Evaluates a condition on the column name inputs of every predicate, where if145/// the condition evaluates to true on any column name the predicate is146/// transferred to local.147pub(super) fn transfer_to_local_by_name<F>(148expr_arena: &Arena<AExpr>,149acc_predicates: &mut PlHashMap<PlSmallStr, ExprIR>,150mut condition: F,151) -> Vec<ExprIR>152where153F: FnMut(&PlSmallStr) -> bool,154{155let mut remove_keys = Vec::with_capacity(acc_predicates.len());156157for (key, predicate) in &*acc_predicates {158let root_names = aexpr_to_leaf_names_iter(predicate.node(), expr_arena);159for name in root_names {160if condition(name) {161remove_keys.push(key.clone());162break;163}164}165}166let mut local_predicates = Vec::with_capacity(remove_keys.len());167for key in remove_keys {168if let Some(pred) = acc_predicates.remove(&*key) {169local_predicates.push(pred)170}171}172local_predicates173}174175/// * `col(A).alias(B).alias(C) => (C, A)`176/// * `col(A) => (A, A)`177/// * `col(A).sum().alias(B) => None`178fn get_maybe_aliased_projection_to_input_name_map(179e: &ExprIR,180expr_arena: &Arena<AExpr>,181) -> Option<(PlSmallStr, PlSmallStr)> 
{182let ae = expr_arena.get(e.node());183match e.get_alias() {184Some(alias) => match ae {185AExpr::Column(c_name) => Some((alias.clone(), c_name.clone())),186_ => None,187},188_ => match ae {189AExpr::Column(c_name) => Some((c_name.clone(), c_name.clone())),190_ => None,191},192}193}194195#[derive(Debug)]196pub enum PushdownEligibility {197Full,198// Partial can happen when there are window exprs.199Partial { to_local: Vec<PlSmallStr> },200NoPushdown,201}202203#[allow(clippy::type_complexity)]204pub fn pushdown_eligibility(205projection_nodes: &[ExprIR],206// Predicates that need to be checked (key, expr_ir)207new_predicates: &[(&PlSmallStr, ExprIR)],208// Note: These predicates have already passed checks.209acc_predicates: &PlHashMap<PlSmallStr, ExprIR>,210expr_arena: &mut Arena<AExpr>,211scratch: &mut UnitVec<Node>,212maintain_errors: bool,213input_ir: &IR,214) -> PolarsResult<(PushdownEligibility, PlHashMap<PlSmallStr, PlSmallStr>)> {215scratch.clear();216let ae_nodes_stack = scratch;217218let mut alias_to_col_map =219optimizer::init_hashmap::<PlSmallStr, PlSmallStr>(Some(projection_nodes.len()));220let mut col_to_alias_map = alias_to_col_map.clone();221222let mut modified_projection_columns =223PlHashSet::<PlSmallStr>::with_capacity(projection_nodes.len());224let mut has_window = false;225let mut common_window_inputs = PlHashSet::<PlSmallStr>::new();226227// Important: Names inserted into any data structure by this function are228// all non-aliased.229// This function returns false if pushdown cannot be performed.230let process_projection_or_predicate = |ae_nodes_stack: &mut UnitVec<Node>,231has_window: &mut bool,232common_window_inputs: &mut PlHashSet<PlSmallStr>|233-> ExprPushdownGroup {234debug_assert_eq!(ae_nodes_stack.len(), 1);235236let mut partition_by_names = PlHashSet::<PlSmallStr>::new();237let mut expr_pushdown_eligibility = ExprPushdownGroup::Pushable;238239while let Some(node) = ae_nodes_stack.pop() {240let ae = expr_arena.get(node);241242match ae 
{243#[cfg(feature = "dynamic_group_by")]244AExpr::Rolling { .. } => return ExprPushdownGroup::Barrier,245AExpr::Over {246function: _,247partition_by,248order_by: _,249mapping: _,250} => {251partition_by_names.clear();252partition_by_names.reserve(partition_by.len());253254for node in partition_by.iter() {255// Only accept col()256if let AExpr::Column(name) = expr_arena.get(*node) {257partition_by_names.insert(name.clone());258} else {259// Nested windows can also qualify for push down.260// e.g.:261// * expr1 = min().over(A)262// * expr2 = sum().over(A, expr1)263// Both exprs window over A, so predicates referring264// to A can still be pushed.265ae_nodes_stack.push(*node);266}267}268269if !*has_window {270for name in partition_by_names.drain() {271common_window_inputs.insert(name);272}273274*has_window = true;275} else {276common_window_inputs.retain(|k| partition_by_names.contains(k))277}278279// Cannot push into disjoint windows:280// e.g.:281// * sum().over(A)282// * sum().over(B)283if common_window_inputs.is_empty() {284return ExprPushdownGroup::Barrier;285}286},287_ => {288if let ExprPushdownGroup::Barrier =289expr_pushdown_eligibility.update_with_expr(ae_nodes_stack, ae, expr_arena)290{291return ExprPushdownGroup::Barrier;292}293},294}295}296297expr_pushdown_eligibility298};299300for e in projection_nodes.iter() {301if let Some((alias, column_name)) =302get_maybe_aliased_projection_to_input_name_map(e, expr_arena)303{304if alias != column_name {305alias_to_col_map.insert(alias.clone(), column_name.clone());306col_to_alias_map.insert(column_name, alias);307}308continue;309}310311if !does_not_modify_rec(e.node(), expr_arena) {312modified_projection_columns.insert(e.output_name().clone());313}314315debug_assert!(ae_nodes_stack.is_empty());316ae_nodes_stack.push(e.node());317318if process_projection_or_predicate(319ae_nodes_stack,320&mut has_window,321&mut common_window_inputs,322)323.blocks_pushdown(maintain_errors)324{325return 
Ok((PushdownEligibility::NoPushdown, alias_to_col_map));326}327}328329if has_window && !col_to_alias_map.is_empty() {330// Rename to aliased names.331let mut new = PlHashSet::<PlSmallStr>::with_capacity(2 * common_window_inputs.len());332333for key in common_window_inputs.into_iter() {334if let Some(aliased) = col_to_alias_map.get(&key) {335new.insert(aliased.clone());336}337// Ensure predicate does not refer to a different column that338// got aliased to the same name as the window column. E.g.:339// .with_columns(col(A).alias(C), sum=sum().over(C))340// .filter(col(C) == ..)341if !alias_to_col_map.contains_key(&key) {342new.insert(key);343}344}345346if new.is_empty() {347return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));348}349350common_window_inputs = new;351}352353for (_, e) in new_predicates.iter() {354debug_assert!(ae_nodes_stack.is_empty());355ae_nodes_stack.push(e.node());356357let pd_group = process_projection_or_predicate(358ae_nodes_stack,359&mut has_window,360&mut common_window_inputs,361);362363if pd_group.blocks_pushdown(maintain_errors) {364return Ok((PushdownEligibility::NoPushdown, alias_to_col_map));365}366}367368// Should have returned early.369debug_assert!(!common_window_inputs.is_empty() || !has_window);370371// Note: has_window is constant.372let can_use_column = |col: &str| {373if has_window {374common_window_inputs.contains(col)375} else {376!modified_projection_columns.contains(col)377}378};379380// For an allocation-free dyn iterator381let mut check_predicates_all: Option<_> = None;382let mut check_predicates_only_new: Option<_> = None;383384// We only need to check the new predicates if no columns were renamed and there are no window385// aggregations.386if !has_window387&& modified_projection_columns.is_empty()388&& !(389// If there is only a single predicate, it may be fallible390acc_predicates.len() == 1 && ir_removes_rows(input_ir)391)392{393check_predicates_only_new = Some(new_predicates.iter().map(|(key, expr)| (*key, 
expr)))394} else {395check_predicates_all = Some(acc_predicates.iter())396}397398let to_check_iter: &mut dyn Iterator<Item = (&PlSmallStr, &ExprIR)> = check_predicates_all399.as_mut()400.map(|x| x as _)401.unwrap_or_else(|| check_predicates_only_new.as_mut().map(|x| x as _).unwrap());402403let mut allow_single_fallible = !ir_removes_rows(input_ir);404ae_nodes_stack.clear();405406let to_local = to_check_iter407.filter_map(|(key, e)| {408debug_assert!(ae_nodes_stack.is_empty());409410ae_nodes_stack.push(e.node());411412let mut uses_blocked_name = false;413let mut pd_group = ExprPushdownGroup::Pushable;414415while let Some(node) = ae_nodes_stack.pop() {416let ae = expr_arena.get(node);417418if let AExpr::Column(name) = ae {419uses_blocked_name |= !can_use_column(name);420} else {421pd_group.update_with_expr(ae_nodes_stack, ae, expr_arena);422};423424if uses_blocked_name {425break;426};427}428429ae_nodes_stack.clear();430431if uses_blocked_name || matches!(pd_group, ExprPushdownGroup::Barrier) {432allow_single_fallible = false;433}434435if uses_blocked_name436|| matches!(437// Note: We do not use `blocks_pushdown()`, this fallible indicates that the438// predicate we are checking to push is fallible.439pd_group,440ExprPushdownGroup::Fallible | ExprPushdownGroup::Barrier441)442{443Some(key.clone())444} else {445None446}447})448.collect::<Vec<_>>();449450Ok(match to_local.len() {4510 => (PushdownEligibility::Full, alias_to_col_map),452len if len == acc_predicates.len() => {453if len == 1 && allow_single_fallible {454(PushdownEligibility::Full, alias_to_col_map)455} else {456(PushdownEligibility::NoPushdown, alias_to_col_map)457}458},459_ => (PushdownEligibility::Partial { to_local }, alias_to_col_map),460})461}462463/// Note: This may give false positives as it is a conservative function.464pub(crate) fn ir_removes_rows(ir: &IR) -> bool {465use IR::*;466467// NOTE468// At time of writing predicate pushdown runs before slice pushdown, so469// some of the below checks for 
slice may never be hit.470471match ir {472DataFrameScan { .. }473| SimpleProjection { .. }474| Select { .. }475| Cache { .. }476| HStack { .. }477| HConcat { .. } => false,478479GroupBy { options, .. } => options.slice.is_some(),480481Sort { slice, .. } => slice.is_some(),482483#[cfg(feature = "merge_sorted")]484MergeSorted { .. } => false,485486#[cfg(feature = "python")]487PythonScan { options } => options.n_rows.is_some(),488489// Scan currently may evaluate the predicate on the statistics of the490// entire files list.491Scan {492unified_scan_args, ..493} => unified_scan_args.pre_slice.is_some(),494495_ => true,496}497}498499/// Maps column references within an expression. Used to handle column renaming when pushing500/// predicates.501///502/// This will add a new expression tree in the arena (i.e. it won't mutate the existing node in-place).503pub(super) fn map_column_references(504expr: &mut ExprIR,505expr_arena: &mut Arena<AExpr>,506rename_map: &PlHashMap<PlSmallStr, PlSmallStr>,507) {508if rename_map.is_empty() {509return;510}511512let node = AexprNode::new(expr.node())513.rewrite(514&mut MapColumnReferences {515rename_map,516column_nodes: PlHashMap::with_capacity(rename_map.len()),517},518expr_arena,519)520.unwrap()521.node();522523*expr = ExprIR::from_node(node, expr_arena);524525struct MapColumnReferences<'a> {526rename_map: &'a PlHashMap<PlSmallStr, PlSmallStr>,527column_nodes: PlHashMap<&'a str, Node>,528}529530impl RewritingVisitor for MapColumnReferences<'_> {531type Node = AexprNode;532type Arena = Arena<AExpr>;533534fn pre_visit(535&mut self,536node: &Self::Node,537arena: &mut Self::Arena,538) -> polars_core::prelude::PolarsResult<crate::prelude::visitor::RewriteRecursion> {539let AExpr::Column(colname) = arena.get(node.node()) else {540return Ok(RewriteRecursion::NoMutateAndContinue);541};542543if !self.rename_map.contains_key(colname) {544return 
Ok(RewriteRecursion::NoMutateAndContinue);545}546547Ok(RewriteRecursion::MutateAndContinue)548}549550fn mutate(551&mut self,552node: Self::Node,553arena: &mut Self::Arena,554) -> polars_core::prelude::PolarsResult<Self::Node> {555let AExpr::Column(colname) = arena.get(node.node()) else {556unreachable!();557};558559let new_colname = self.rename_map.get(colname).unwrap();560561if !self.column_nodes.contains_key(new_colname.as_str()) {562self.column_nodes.insert(563new_colname.as_str(),564arena.add(AExpr::Column(new_colname.clone())),565);566}567568// Safety: Checked in pre_visit()569Ok(AexprNode::new(570*self.column_nodes.get(new_colname.as_str()).unwrap(),571))572}573}574}575576577