Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
7889 views
mod group_by;1mod join;2mod keys;3mod utils;45use polars_core::datatypes::PlHashMap;6use polars_core::prelude::*;7use polars_utils::idx_vec::UnitVec;8use recursive::recursive;9use utils::*;1011use super::*;12use crate::prelude::optimizer::predicate_pushdown::group_by::process_group_by;13use crate::prelude::optimizer::predicate_pushdown::join::process_join;14use crate::utils::{check_input_node, has_aexpr};1516/// The struct is wrapped in a mod to prevent direct member access of `nodes_scratch`17mod inner {18use polars_core::config::verbose;19use polars_utils::arena::Node;20use polars_utils::idx_vec::UnitVec;21use polars_utils::unitvec;2223pub struct PredicatePushDown {24// TODO: Remove unused25#[expect(unused)]26pub(super) verbose: bool,27// How many cache nodes a predicate may be pushed down to.28// Normally this is 0. Only needed for CSPE.29pub(super) caches_pass_allowance: u32,30nodes_scratch: UnitVec<Node>,31pub(super) new_streaming: bool,32// Controls pushing filters past fallible projections33pub(super) maintain_errors: bool,34}3536impl PredicatePushDown {37pub fn new(maintain_errors: bool, new_streaming: bool) -> Self {38Self {39verbose: verbose(),40caches_pass_allowance: 0,41nodes_scratch: unitvec![],42new_streaming,43maintain_errors,44}45}4647/// Returns shared scratch space after clearing.48pub(super) fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {49self.nodes_scratch.clear();50&mut self.nodes_scratch51}52}53}5455pub use inner::PredicatePushDown;5657impl PredicatePushDown {58pub(crate) fn block_at_cache(mut self, count: u32) -> Self {59self.caches_pass_allowance = count;60self61}6263fn optional_apply_predicate(64&mut self,65lp: IR,66local_predicates: Vec<ExprIR>,67lp_arena: &mut Arena<IR>,68expr_arena: &mut Arena<AExpr>,69) -> IR {70if !local_predicates.is_empty() {71let predicate = combine_predicates(local_predicates.into_iter(), expr_arena);72let input = lp_arena.add(lp);7374IR::Filter { input, predicate }75} else {76lp77}78}7980fn pushdown_and_assign(81&mut self,82input: Node,83acc_predicates: PlHashMap<PlSmallStr, ExprIR>,84lp_arena: &mut Arena<IR>,85expr_arena: &mut Arena<AExpr>,86) -> PolarsResult<()> {87let alp = lp_arena.take(input);88let lp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;89lp_arena.replace(input, lp);90Ok(())91}9293/// Filter will be pushed down.94fn pushdown_and_continue(95&mut self,96lp: IR,97mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,98lp_arena: &mut Arena<IR>,99expr_arena: &mut Arena<AExpr>,100has_projections: bool,101) -> PolarsResult<IR> {102if has_projections {103let input = {104let mut inputs = lp.inputs();105let input = inputs.next().unwrap();106// projections should only have a single input.107if inputs.next().is_some() {108// except for ExtContext109assert!(matches!(lp, IR::ExtContext { .. }));110}111input112};113114let maintain_errors = self.maintain_errors;115let (eligibility, alias_rename_map) = pushdown_eligibility(116&lp.exprs().cloned().collect::<Vec<_>>(),117&[],118&acc_predicates,119expr_arena,120self.empty_nodes_scratch_mut(),121maintain_errors,122lp_arena.get(input),123)?;124125let local_predicates = match eligibility {126PushdownEligibility::Full => vec![],127PushdownEligibility::Partial { to_local } => {128let mut out = Vec::with_capacity(to_local.len());129for key in to_local {130out.push(acc_predicates.remove(&key).unwrap());131}132out133},134PushdownEligibility::NoPushdown => {135return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);136},137};138139if !alias_rename_map.is_empty() {140for (_, expr_ir) in acc_predicates.iter_mut() {141map_column_references(expr_ir, expr_arena, &alias_rename_map);142}143}144145let alp = lp_arena.take(input);146let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;147lp_arena.replace(input, alp);148149Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))150} else {151let mut local_predicates = Vec::with_capacity(acc_predicates.len());152153let inputs = lp.get_inputs();154155// determine new inputs by pushing down predicates156let new_inputs = inputs157.into_iter()158.map(|node| {159// first we check if we are able to push down the predicate passed this node160// it could be that this node just added the column where we base the predicate on161let input_schema = lp_arena.get(node).schema(lp_arena);162let mut pushdown_predicates =163optimizer::init_hashmap(Some(acc_predicates.len()));164for (_, predicate) in acc_predicates.iter() {165// we can pushdown the predicate166if check_input_node(predicate.node(), &input_schema, expr_arena) {167insert_predicate_dedup(&mut pushdown_predicates, predicate, expr_arena)168}169// we cannot pushdown the predicate we do it here170else {171local_predicates.push(predicate.clone());172}173}174175let alp = lp_arena.take(node);176let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;177lp_arena.replace(node, alp);178Ok(node)179})180.collect::<PolarsResult<UnitVec<_>>>()?;181182let lp = lp.with_inputs(new_inputs);183Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))184}185}186187/// Filter will be done at this node, but we continue optimization188fn no_pushdown_restart_opt(189&mut self,190lp: IR,191acc_predicates: PlHashMap<PlSmallStr, ExprIR>,192lp_arena: &mut Arena<IR>,193expr_arena: &mut Arena<AExpr>,194) -> PolarsResult<IR> {195let inputs = lp.inputs();196197let new_inputs = inputs198.map(|node| {199let alp = lp_arena.take(node);200let alp = self.push_down(201alp,202init_hashmap(Some(acc_predicates.len())),203lp_arena,204expr_arena,205)?;206lp_arena.replace(node, alp);207Ok(node)208})209.collect::<PolarsResult<Vec<_>>>()?;210let lp = lp.with_inputs(new_inputs);211212// all predicates are done locally213let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();214Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))215}216217fn no_pushdown(218&mut self,219lp: IR,220acc_predicates: PlHashMap<PlSmallStr, ExprIR>,221lp_arena: &mut Arena<IR>,222expr_arena: &mut Arena<AExpr>,223) -> PolarsResult<IR> {224// all predicates are done locally225let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();226Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))227}228229/// Predicate pushdown optimizer230///231/// # Arguments232///233/// * `IR` - Arena based logical plan tree representing the query.234/// * `acc_predicates` - The predicates we accumulate during tree traversal.235/// The hashmap maps from leaf-column name to predicates on that column.236/// If the key is already taken we combine the predicate with a bitand operation.237/// The `Node`s are indexes in the `expr_arena`238/// * `lp_arena` - The local memory arena for the logical plan.239/// * `expr_arena` - The local memory arena for the expressions.240#[recursive]241fn push_down(242&mut self,243lp: IR,244mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,245lp_arena: &mut Arena<IR>,246expr_arena: &mut Arena<AExpr>,247) -> PolarsResult<IR> {248use IR::*;249250// Note: The logic within the match block should ensure `acc_predicates` is left in a state251// where it contains only pushable exprs after it is done (although in some cases it may252// contain a single fallible expression).253254match lp {255Filter {256// Note: We assume AND'ed predicates have already been split to separate IR filter257// nodes during DSL conversion so we don't do that here.258ref predicate,259input,260} => {261// Use a tmp_key to avoid inadvertently combining predicates that otherwise would have262// been partially pushed:263//264// (1) .filter(pl.count().over("key") == 1)265// (2) .filter(pl.col("key") == 1)266//267// (2) can be pushed past (1) but they both have the same predicate268// key name in the hashtable.269let tmp_key = temporary_unique_key(&acc_predicates);270acc_predicates.insert(tmp_key.clone(), predicate.clone());271272let maintain_errors = self.maintain_errors;273274let local_predicates = match pushdown_eligibility(275&[],276&[(&tmp_key, predicate.clone())],277&acc_predicates,278expr_arena,279self.empty_nodes_scratch_mut(),280maintain_errors,281lp_arena.get(input),282)?283.0284{285PushdownEligibility::Full => vec![],286PushdownEligibility::Partial { to_local } => {287let mut out = Vec::with_capacity(to_local.len());288for key in to_local {289out.push(acc_predicates.remove(&key).unwrap());290}291out292},293PushdownEligibility::NoPushdown => {294let out = acc_predicates.drain().map(|t| t.1).collect();295acc_predicates.clear();296out297},298};299300if let Some(predicate) = acc_predicates.remove(&tmp_key) {301insert_predicate_dedup(&mut acc_predicates, &predicate, expr_arena);302}303304let alp = lp_arena.take(input);305let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;306307// TODO!308// If a predicates result would be influenced by earlier applied309// predicates, we simply don't pushdown this one passed this node310// However, we can do better and let it pass but store the order of the predicates311// so that we can apply them in correct order at the deepest level312Ok(313self.optional_apply_predicate(314new_input,315local_predicates,316lp_arena,317expr_arena,318),319)320},321DataFrameScan {322df,323schema,324output_schema,325} => {326let selection = predicate_at_scan(acc_predicates, None, expr_arena);327let mut lp = DataFrameScan {328df,329schema,330output_schema,331};332333if let Some(predicate) = selection {334let input = lp_arena.add(lp);335336lp = IR::Filter { input, predicate }337}338339Ok(lp)340},341Scan {342sources,343file_info,344hive_parts: scan_hive_parts,345ref predicate,346predicate_file_skip_applied,347scan_type,348unified_scan_args,349output_schema,350} => {351let mut blocked_names = Vec::with_capacity(2);352353// TODO: Allow predicates on file names, this should be supported by new-streaming.354if let Some(col) = unified_scan_args.include_file_paths.as_deref() {355blocked_names.push(col);356}357358let local_predicates = if blocked_names.is_empty() {359vec![]360} else {361transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {362blocked_names.contains(&name.as_ref())363})364};365let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena);366367let mut do_optimization = match &*scan_type {368#[cfg(feature = "csv")]369FileScanIR::Csv { .. } => unified_scan_args.pre_slice.is_none(),370FileScanIR::Anonymous { function, .. } => function.allows_predicate_pushdown(),371#[cfg(feature = "json")]372FileScanIR::NDJson { .. } => true,373#[allow(unreachable_patterns)]374_ => true,375};376do_optimization &= predicate.is_some();377378let hive_parts = scan_hive_parts;379380let lp = if do_optimization {381Scan {382sources,383file_info,384hive_parts,385predicate,386predicate_file_skip_applied,387unified_scan_args,388output_schema,389scan_type,390}391} else {392let lp = Scan {393sources,394file_info,395hive_parts,396predicate: None,397predicate_file_skip_applied,398unified_scan_args,399output_schema,400scan_type,401};402if let Some(predicate) = predicate {403let input = lp_arena.add(lp);404Filter { input, predicate }405} else {406lp407}408};409410Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))411},412Distinct { input, options } => {413let subset = if let Some(ref subset) = options.subset {414subset.as_ref()415} else {416&[]417};418let mut names_set = PlHashSet::<PlSmallStr>::with_capacity(subset.len());419for name in subset.iter() {420names_set.insert(name.clone());421}422423let local_predicates = match options.keep_strategy {424UniqueKeepStrategy::Any => {425let condition = |e: &ExprIR| {426// if not elementwise -> to local427!is_elementwise_rec(e.node(), expr_arena)428};429transfer_to_local_by_expr_ir(expr_arena, &mut acc_predicates, condition)430},431UniqueKeepStrategy::First432| UniqueKeepStrategy::Last433| UniqueKeepStrategy::None => {434let condition = |name: &PlSmallStr| {435!subset.is_empty() && !names_set.contains(name.as_str())436};437transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition)438},439};440441self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;442let lp = Distinct { input, options };443Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))444},445Join {446input_left,447input_right,448left_on,449right_on,450schema,451options,452} => process_join(453self,454lp_arena,455expr_arena,456input_left,457input_right,458left_on,459right_on,460schema,461options,462acc_predicates,463self.new_streaming,464),465MapFunction { ref function, .. } => {466if function.allow_predicate_pd() {467match function {468FunctionIR::Explode { columns, .. } => {469let condition = |name: &PlSmallStr| columns.iter().any(|s| s == name);470471// first columns that refer to the exploded columns should be done here472let local_predicates = transfer_to_local_by_name(473expr_arena,474&mut acc_predicates,475condition,476);477478let lp = self.pushdown_and_continue(479lp,480acc_predicates,481lp_arena,482expr_arena,483false,484)?;485Ok(self.optional_apply_predicate(486lp,487local_predicates,488lp_arena,489expr_arena,490))491},492#[cfg(feature = "pivot")]493FunctionIR::Unpivot { args, .. } => {494// predicates that will be done at this level495let condition = |name: &PlSmallStr| {496name == &args.variable_name || name == &args.value_name497};498let local_predicates = transfer_to_local_by_name(499expr_arena,500&mut acc_predicates,501condition,502);503504let lp = self.pushdown_and_continue(505lp,506acc_predicates,507lp_arena,508expr_arena,509false,510)?;511Ok(self.optional_apply_predicate(512lp,513local_predicates,514lp_arena,515expr_arena,516))517},518FunctionIR::Unnest {519columns,520separator: _,521} => {522let exclude = columns.iter().cloned().collect::<PlHashSet<_>>();523524let local_predicates =525transfer_to_local_by_name(expr_arena, &mut acc_predicates, |x| {526exclude.contains(x)527});528529let lp = self.pushdown_and_continue(530lp,531acc_predicates,532lp_arena,533expr_arena,534false,535)?;536Ok(self.optional_apply_predicate(537lp,538local_predicates,539lp_arena,540expr_arena,541))542},543_ => self.pushdown_and_continue(544lp,545acc_predicates,546lp_arena,547expr_arena,548false,549),550}551} else {552self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)553}554},555GroupBy {556input,557keys,558aggs,559schema,560apply,561maintain_order,562options,563} => process_group_by(564self,565lp_arena,566expr_arena,567input,568keys,569aggs,570schema,571maintain_order,572apply,573options,574acc_predicates,575),576lp @ Union { .. } => {577self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)578},579lp @ Sort { .. } => {580self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)581},582lp @ Sink { .. } | lp @ SinkMultiple { .. } => {583self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)584},585// Pushed down passed these nodes586lp @ HStack { .. }587| lp @ Select { .. }588| lp @ SimpleProjection { .. }589| lp @ ExtContext { .. } => {590self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)591},592// NOT Pushed down passed these nodes593// predicates influence slice sizes594lp @ Slice { .. } => {595self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)596},597lp @ HConcat { .. } => {598self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)599},600// Caches will run predicate push-down in the `cache_states` run.601Cache { .. } => {602if self.caches_pass_allowance == 0 {603self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena)604} else {605self.caches_pass_allowance = self.caches_pass_allowance.saturating_sub(1);606self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)607}608},609#[cfg(feature = "python")]610PythonScan { mut options } => {611let predicate = predicate_at_scan(acc_predicates, None, expr_arena);612if let Some(predicate) = predicate {613match ExprPushdownGroup::Pushable.update_with_expr_rec(614expr_arena.get(predicate.node()),615expr_arena,616None,617) {618ExprPushdownGroup::Barrier => {619if cfg!(debug_assertions) {620// Expression should not be pushed here by the optimizer621panic!()622}623624return Ok(self.optional_apply_predicate(625PythonScan { options },626vec![predicate],627lp_arena,628expr_arena,629));630},631632ExprPushdownGroup::Pushable | ExprPushdownGroup::Fallible => {633options.predicate = PythonPredicate::Polars(predicate);634},635}636}637638Ok(PythonScan { options })639},640#[cfg(feature = "merge_sorted")]641lp @ MergeSorted { .. } => {642self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)643},644Invalid => unreachable!(),645}646}647648pub(crate) fn optimize(649&mut self,650logical_plan: IR,651lp_arena: &mut Arena<IR>,652expr_arena: &mut Arena<AExpr>,653) -> PolarsResult<IR> {654let acc_predicates = PlHashMap::new();655self.push_down(logical_plan, acc_predicates, lp_arena, expr_arena)656}657}658659660