Path: blob/main/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
8430 views
mod group_by;1mod join;2mod keys;3mod utils;45use polars_core::datatypes::PlHashMap;6use polars_core::prelude::*;7use polars_utils::idx_vec::UnitVec;8use recursive::recursive;9use utils::*;1011use super::*;12use crate::prelude::optimizer::predicate_pushdown::group_by::process_group_by;13use crate::prelude::optimizer::predicate_pushdown::join::process_join;14use crate::utils::{check_input_node, has_aexpr};1516/// The struct is wrapped in a mod to prevent direct member access of `nodes_scratch`17mod inner {18use polars_utils::arena::Node;19use polars_utils::idx_vec::UnitVec;20use polars_utils::unitvec;2122pub struct PredicatePushDown {23// How many cache nodes a predicate may be pushed down to.24// Normally this is 0. Only needed for CSPE.25pub(super) caches_pass_allowance: u32,26nodes_scratch: UnitVec<Node>,27pub(super) new_streaming: bool,28// Controls pushing filters past fallible projections29pub(super) maintain_errors: bool,30}3132impl PredicatePushDown {33pub fn new(maintain_errors: bool, new_streaming: bool) -> Self {34Self {35caches_pass_allowance: 0,36nodes_scratch: unitvec![],37new_streaming,38maintain_errors,39}40}4142/// Returns shared scratch space after clearing.43pub(super) fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {44self.nodes_scratch.clear();45&mut self.nodes_scratch46}47}48}4950pub use inner::PredicatePushDown;5152impl PredicatePushDown {53pub(crate) fn block_at_cache(mut self, count: u32) -> Self {54self.caches_pass_allowance = count;55self56}5758fn optional_apply_predicate(59&mut self,60lp: IR,61local_predicates: Vec<ExprIR>,62lp_arena: &mut Arena<IR>,63expr_arena: &mut Arena<AExpr>,64) -> IR {65if !local_predicates.is_empty() {66let predicate = combine_predicates(local_predicates.into_iter(), expr_arena);67let input = lp_arena.add(lp);6869IR::Filter { input, predicate }70} else {71lp72}73}7475fn pushdown_and_assign(76&mut self,77input: Node,78acc_predicates: PlHashMap<PlSmallStr, ExprIR>,79lp_arena: &mut Arena<IR>,80expr_arena: &mut Arena<AExpr>,81) -> PolarsResult<()> {82let alp = lp_arena.take(input);83let lp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;84lp_arena.replace(input, lp);85Ok(())86}8788/// Filter will be pushed down.89fn pushdown_and_continue(90&mut self,91lp: IR,92mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,93lp_arena: &mut Arena<IR>,94expr_arena: &mut Arena<AExpr>,95has_projections: bool,96) -> PolarsResult<IR> {97if has_projections {98let input = {99let mut inputs = lp.inputs();100let input = inputs.next().unwrap();101// projections should only have a single input.102if inputs.next().is_some() {103// except for ExtContext104assert!(matches!(lp, IR::ExtContext { .. }));105}106input107};108109let maintain_errors = self.maintain_errors;110let (eligibility, alias_rename_map) = pushdown_eligibility(111&lp.exprs().cloned().collect::<Vec<_>>(),112&[],113&acc_predicates,114expr_arena,115self.empty_nodes_scratch_mut(),116maintain_errors,117lp_arena.get(input),118)?;119120let local_predicates = match eligibility {121PushdownEligibility::Full => vec![],122PushdownEligibility::Partial { to_local } => {123let mut out = Vec::with_capacity(to_local.len());124for key in to_local {125out.push(acc_predicates.remove(&key).unwrap());126}127out128},129PushdownEligibility::NoPushdown => {130return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);131},132};133134if !alias_rename_map.is_empty() {135for (_, expr_ir) in acc_predicates.iter_mut() {136map_column_references(expr_ir, expr_arena, &alias_rename_map);137}138}139140let alp = lp_arena.take(input);141let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;142lp_arena.replace(input, alp);143144Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))145} else {146let mut local_predicates = Vec::with_capacity(acc_predicates.len());147148let inputs = lp.get_inputs();149150// determine new inputs by pushing down predicates151let new_inputs = inputs152.into_iter()153.map(|node| {154// first we check if we are able to push down the predicate passed this node155// it could be that this node just added the column where we base the predicate on156let input_schema = lp_arena.get(node).schema(lp_arena);157let mut pushdown_predicates =158optimizer::init_hashmap(Some(acc_predicates.len()));159for (_, predicate) in acc_predicates.iter() {160// we can pushdown the predicate161if check_input_node(predicate.node(), &input_schema, expr_arena) {162insert_predicate_dedup(&mut pushdown_predicates, predicate, expr_arena)163}164// we cannot pushdown the predicate we do it here165else {166local_predicates.push(predicate.clone());167}168}169170let alp = lp_arena.take(node);171let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;172lp_arena.replace(node, alp);173Ok(node)174})175.collect::<PolarsResult<UnitVec<_>>>()?;176177let lp = lp.with_inputs(new_inputs);178Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))179}180}181182/// Filter will be done at this node, but we continue optimization183fn no_pushdown_restart_opt(184&mut self,185lp: IR,186acc_predicates: PlHashMap<PlSmallStr, ExprIR>,187lp_arena: &mut Arena<IR>,188expr_arena: &mut Arena<AExpr>,189) -> PolarsResult<IR> {190let inputs = lp.inputs();191192let new_inputs = inputs193.map(|node| {194let alp = lp_arena.take(node);195let alp = self.push_down(196alp,197init_hashmap(Some(acc_predicates.len())),198lp_arena,199expr_arena,200)?;201lp_arena.replace(node, alp);202Ok(node)203})204.collect::<PolarsResult<Vec<_>>>()?;205let lp = lp.with_inputs(new_inputs);206207// all predicates are done locally208let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();209Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))210}211212fn no_pushdown(213&mut self,214lp: IR,215acc_predicates: PlHashMap<PlSmallStr, ExprIR>,216lp_arena: &mut Arena<IR>,217expr_arena: &mut Arena<AExpr>,218) -> PolarsResult<IR> {219// all predicates are done locally220let local_predicates = acc_predicates.into_values().collect::<Vec<_>>();221Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))222}223224/// Predicate pushdown optimizer225///226/// # Arguments227///228/// * `IR` - Arena based logical plan tree representing the query.229/// * `acc_predicates` - The predicates we accumulate during tree traversal.230/// The hashmap maps from leaf-column name to predicates on that column.231/// If the key is already taken we combine the predicate with a bitand operation.232/// The `Node`s are indexes in the `expr_arena`233/// * `lp_arena` - The local memory arena for the logical plan.234/// * `expr_arena` - The local memory arena for the expressions.235#[recursive]236fn push_down(237&mut self,238lp: IR,239mut acc_predicates: PlHashMap<PlSmallStr, ExprIR>,240lp_arena: &mut Arena<IR>,241expr_arena: &mut Arena<AExpr>,242) -> PolarsResult<IR> {243use IR::*;244245// Note: The logic within the match block should ensure `acc_predicates` is left in a state246// where it contains only pushable exprs after it is done (although in some cases it may247// contain a single fallible expression).248249match lp {250Filter {251// Note: We assume AND'ed predicates have already been split to separate IR filter252// nodes during DSL conversion so we don't do that here.253ref predicate,254input,255} => {256// Use a tmp_key to avoid inadvertently combining predicates that otherwise would have257// been partially pushed:258//259// (1) .filter(pl.count().over("key") == 1)260// (2) .filter(pl.col("key") == 1)261//262// (2) can be pushed past (1) but they both have the same predicate263// key name in the hashtable.264let tmp_key = temporary_unique_key(&acc_predicates);265acc_predicates.insert(tmp_key.clone(), predicate.clone());266267let maintain_errors = self.maintain_errors;268269let local_predicates = match pushdown_eligibility(270&[],271&[(&tmp_key, predicate.clone())],272&acc_predicates,273expr_arena,274self.empty_nodes_scratch_mut(),275maintain_errors,276lp_arena.get(input),277)?278.0279{280PushdownEligibility::Full => vec![],281PushdownEligibility::Partial { to_local } => {282let mut out = Vec::with_capacity(to_local.len());283for key in to_local {284out.push(acc_predicates.remove(&key).unwrap());285}286out287},288PushdownEligibility::NoPushdown => {289let out = acc_predicates.drain().map(|t| t.1).collect();290acc_predicates.clear();291out292},293};294295if let Some(predicate) = acc_predicates.remove(&tmp_key) {296insert_predicate_dedup(&mut acc_predicates, &predicate, expr_arena);297}298299let alp = lp_arena.take(input);300let new_input = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;301302// TODO!303// If a predicates result would be influenced by earlier applied304// predicates, we simply don't pushdown this one passed this node305// However, we can do better and let it pass but store the order of the predicates306// so that we can apply them in correct order at the deepest level307Ok(308self.optional_apply_predicate(309new_input,310local_predicates,311lp_arena,312expr_arena,313),314)315},316DataFrameScan {317df,318schema,319output_schema,320} => {321let selection = predicate_at_scan(acc_predicates, None, expr_arena);322let mut lp = DataFrameScan {323df,324schema,325output_schema,326};327328if let Some(predicate) = selection {329let input = lp_arena.add(lp);330331lp = IR::Filter { input, predicate }332}333334Ok(lp)335},336Scan {337sources,338file_info,339hive_parts: scan_hive_parts,340ref predicate,341predicate_file_skip_applied,342scan_type,343unified_scan_args,344output_schema,345} => {346let mut blocked_names = Vec::with_capacity(2);347348// TODO: Allow predicates on file names, this should be supported by new-streaming.349if let Some(col) = unified_scan_args.include_file_paths.as_deref() {350blocked_names.push(col);351}352353let local_predicates = if blocked_names.is_empty() {354vec![]355} else {356transfer_to_local_by_name(expr_arena, &mut acc_predicates, |name| {357blocked_names.contains(&name.as_ref())358})359};360let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena);361362let mut do_optimization = match &*scan_type {363#[cfg(feature = "csv")]364FileScanIR::Csv { .. } => unified_scan_args.pre_slice.is_none(),365FileScanIR::Anonymous { function, .. } => function.allows_predicate_pushdown(),366#[cfg(feature = "json")]367FileScanIR::NDJson { .. } => true,368#[allow(unreachable_patterns)]369_ => true,370};371do_optimization &= predicate.is_some();372373let hive_parts = scan_hive_parts;374375let lp = if do_optimization {376Scan {377sources,378file_info,379hive_parts,380predicate,381predicate_file_skip_applied,382unified_scan_args,383output_schema,384scan_type,385}386} else {387let lp = Scan {388sources,389file_info,390hive_parts,391predicate: None,392predicate_file_skip_applied,393unified_scan_args,394output_schema,395scan_type,396};397if let Some(predicate) = predicate {398let input = lp_arena.add(lp);399Filter { input, predicate }400} else {401lp402}403};404405Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))406},407Distinct { input, options } => {408let subset = if let Some(ref subset) = options.subset {409subset.as_ref()410} else {411&[]412};413let mut names_set = PlHashSet::<PlSmallStr>::with_capacity(subset.len());414for name in subset.iter() {415names_set.insert(name.clone());416}417418let local_predicates = match options.keep_strategy {419UniqueKeepStrategy::Any => {420let condition = |e: &ExprIR| {421// if not elementwise -> to local422!is_elementwise_rec(e.node(), expr_arena)423};424transfer_to_local_by_expr_ir(expr_arena, &mut acc_predicates, condition)425},426UniqueKeepStrategy::First427| UniqueKeepStrategy::Last428| UniqueKeepStrategy::None => {429let condition = |name: &PlSmallStr| {430!subset.is_empty() && !names_set.contains(name.as_str())431};432transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition)433},434};435436self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;437let lp = Distinct { input, options };438Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))439},440Join {441input_left,442input_right,443left_on,444right_on,445schema,446options,447} => process_join(448self,449lp_arena,450expr_arena,451input_left,452input_right,453left_on,454right_on,455schema,456options,457acc_predicates,458self.new_streaming,459),460MapFunction { ref function, .. } => {461if function.allow_predicate_pd() {462match function {463FunctionIR::Explode { columns, .. } => {464let condition = |name: &PlSmallStr| columns.iter().any(|s| s == name);465466// first columns that refer to the exploded columns should be done here467let local_predicates = transfer_to_local_by_name(468expr_arena,469&mut acc_predicates,470condition,471);472473let lp = self.pushdown_and_continue(474lp,475acc_predicates,476lp_arena,477expr_arena,478false,479)?;480Ok(self.optional_apply_predicate(481lp,482local_predicates,483lp_arena,484expr_arena,485))486},487#[cfg(feature = "pivot")]488FunctionIR::Unpivot { args, .. } => {489// predicates that will be done at this level490let condition = |name: &PlSmallStr| {491name == &args.variable_name || name == &args.value_name492};493let local_predicates = transfer_to_local_by_name(494expr_arena,495&mut acc_predicates,496condition,497);498499let lp = self.pushdown_and_continue(500lp,501acc_predicates,502lp_arena,503expr_arena,504false,505)?;506Ok(self.optional_apply_predicate(507lp,508local_predicates,509lp_arena,510expr_arena,511))512},513FunctionIR::Unnest {514columns,515separator: _,516} => {517let exclude = columns.iter().cloned().collect::<PlHashSet<_>>();518519let local_predicates =520transfer_to_local_by_name(expr_arena, &mut acc_predicates, |x| {521exclude.contains(x)522});523524let lp = self.pushdown_and_continue(525lp,526acc_predicates,527lp_arena,528expr_arena,529false,530)?;531Ok(self.optional_apply_predicate(532lp,533local_predicates,534lp_arena,535expr_arena,536))537},538_ => self.pushdown_and_continue(539lp,540acc_predicates,541lp_arena,542expr_arena,543false,544),545}546} else {547self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)548}549},550GroupBy {551input,552keys,553aggs,554schema,555apply,556maintain_order,557options,558} => process_group_by(559self,560lp_arena,561expr_arena,562input,563keys,564aggs,565schema,566maintain_order,567apply,568options,569acc_predicates,570),571lp @ Union { .. } => {572self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)573},574lp @ Sort { .. } => {575self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)576},577lp @ Sink { .. } | lp @ SinkMultiple { .. } => {578self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)579},580// Pushed down passed these nodes581lp @ HStack { .. }582| lp @ Select { .. }583| lp @ SimpleProjection { .. }584| lp @ ExtContext { .. } => {585self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)586},587// NOT Pushed down passed these nodes588// predicates influence slice sizes589lp @ Slice { .. } => {590self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)591},592lp @ HConcat { .. } => {593self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)594},595// Caches will run predicate push-down in the `cache_states` run.596Cache { .. } => {597if self.caches_pass_allowance == 0 {598self.no_pushdown(lp, acc_predicates, lp_arena, expr_arena)599} else {600self.caches_pass_allowance = self.caches_pass_allowance.saturating_sub(1);601self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)602}603},604#[cfg(feature = "python")]605PythonScan { mut options } => {606let predicate = predicate_at_scan(acc_predicates, None, expr_arena);607if let Some(predicate) = predicate {608match ExprPushdownGroup::Pushable.update_with_expr_rec(609expr_arena.get(predicate.node()),610expr_arena,611None,612) {613ExprPushdownGroup::Barrier => {614if cfg!(debug_assertions) {615// Expression should not be pushed here by the optimizer616panic!()617}618619return Ok(self.optional_apply_predicate(620PythonScan { options },621vec![predicate],622lp_arena,623expr_arena,624));625},626627ExprPushdownGroup::Pushable | ExprPushdownGroup::Fallible => {628options.predicate = PythonPredicate::Polars(predicate);629},630}631}632633Ok(PythonScan { options })634},635#[cfg(feature = "merge_sorted")]636lp @ MergeSorted { .. } => {637self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)638},639Invalid => unreachable!(),640}641}642643pub(crate) fn optimize(644&mut self,645logical_plan: IR,646lp_arena: &mut Arena<IR>,647expr_arena: &mut Arena<AExpr>,648) -> PolarsResult<IR> {649let acc_predicates = PlHashMap::new();650self.push_down(logical_plan, acc_predicates, lp_arena, expr_arena)651}652}653654655