// crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
mod functions;1mod generic;2mod group_by;3mod hconcat;4mod hstack;5mod joins;6mod projection;78use polars_core::datatypes::PlHashSet;9use polars_core::prelude::*;10use polars_io::RowIndex;11use polars_utils::idx_vec::UnitVec;12use recursive::recursive;1314use crate::prelude::optimizer::projection_pushdown::generic::process_generic;15use crate::prelude::optimizer::projection_pushdown::group_by::process_group_by;16use crate::prelude::optimizer::projection_pushdown::hconcat::process_hconcat;17use crate::prelude::optimizer::projection_pushdown::hstack::process_hstack;18use crate::prelude::optimizer::projection_pushdown::joins::process_join;19use crate::prelude::optimizer::projection_pushdown::projection::process_projection;20use crate::prelude::*;21use crate::utils::aexpr_to_leaf_names;2223#[derive(Default, Copy, Clone)]24struct ProjectionCopyState {25projections_seen: usize,26is_count_star: bool,27}2829#[derive(Clone, Default)]30struct ProjectionContext {31acc_projections: Vec<ColumnNode>,32projected_names: PlHashSet<PlSmallStr>,33inner: ProjectionCopyState,34}3536impl ProjectionContext {37fn new(38acc_projections: Vec<ColumnNode>,39projected_names: PlHashSet<PlSmallStr>,40inner: ProjectionCopyState,41) -> Self {42Self {43acc_projections,44projected_names,45inner,46}47}4849/// If this is `true`, other nodes should add the columns50/// they need to the push down state51fn has_pushed_down(&self) -> bool {52// count star also acts like a pushdown as we will select a single column at the source53// when there were no other projections.54!self.acc_projections.is_empty() || self.inner.is_count_star55}5657fn process_count_star_at_scan(&mut self, schema: &Schema, expr_arena: &mut Arena<AExpr>) {58if self.acc_projections.is_empty() {59let (name, _dt) = match schema.len() {600 => return,611 => schema.get_at_index(0).unwrap(),62_ => {63// skip first as that can be the row index.64// We look for a relative cheap type, such as a numeric or 
bool65schema66.iter()67.skip(1)68.find(|(_name, dt)| {69let phys = dt;70phys.is_null()71|| phys.is_primitive_numeric()72|| phys.is_bool()73|| phys.is_temporal()74})75.unwrap_or_else(|| schema.get_at_index(schema.len() - 1).unwrap())76},77};7879let node = expr_arena.add(AExpr::Column(name.clone()));80self.acc_projections.push(ColumnNode(node));81self.projected_names.insert(name.clone());82}83}84}8586/// utility function to get names of the columns needed in projection at scan level87fn get_scan_columns(88acc_projections: &[ColumnNode],89expr_arena: &Arena<AExpr>,90row_index: Option<&RowIndex>,91file_path_col: Option<&str>,92// When set, the column order will match the order from the provided schema93normalize_order_schema: Option<&Schema>,94) -> Option<Arc<[PlSmallStr]>> {95if acc_projections.is_empty() {96return None;97}9899let mut column_names = acc_projections100.iter()101.filter_map(|node| {102let name = column_node_to_name(*node, expr_arena);103104if let Some(ri) = row_index {105if ri.name == name {106return None;107}108}109110if let Some(file_path_col) = file_path_col {111if file_path_col == name.as_str() {112return None;113}114}115116Some(name.clone())117})118.collect::<Vec<_>>();119120if let Some(schema) = normalize_order_schema {121column_names.sort_unstable_by_key(|name| schema.try_get_full(name).unwrap().0);122}123124Some(column_names.into_iter().collect::<Arc<[_]>>())125}126127/// split in a projection vec that can be pushed down and a projection vec that should be used128/// in this node129///130/// # Returns131/// accumulated_projections, local_projections, accumulated_names132///133/// - `expands_schema`. 
An unnest adds more columns to a schema, so we cannot use fast path134fn split_acc_projections(135acc_projections: Vec<ColumnNode>,136down_schema: &Schema,137expr_arena: &Arena<AExpr>,138expands_schema: bool,139) -> (Vec<ColumnNode>, Vec<ColumnNode>, PlHashSet<PlSmallStr>) {140// If node above has as many columns as the projection there is nothing to pushdown.141if !expands_schema && down_schema.len() == acc_projections.len() {142let local_projections = acc_projections;143(vec![], local_projections, PlHashSet::new())144} else {145let (acc_projections, local_projections): (Vec<_>, Vec<_>) = acc_projections146.into_iter()147.partition(|expr| check_input_column_node(*expr, down_schema, expr_arena));148let mut names = PlHashSet::default();149for proj in &acc_projections {150let name = column_node_to_name(*proj, expr_arena).clone();151names.insert(name);152}153(acc_projections, local_projections, names)154}155}156157/// utility function such that we can recurse all binary expressions in the expression tree158fn add_expr_to_accumulated(159expr: Node,160acc_projections: &mut Vec<ColumnNode>,161projected_names: &mut PlHashSet<PlSmallStr>,162expr_arena: &Arena<AExpr>,163) {164for root_node in aexpr_to_column_nodes_iter(expr, expr_arena) {165let name = column_node_to_name(root_node, expr_arena).clone();166if projected_names.insert(name) {167acc_projections.push(root_node)168}169}170}171172fn add_str_to_accumulated(173name: PlSmallStr,174ctx: &mut ProjectionContext,175expr_arena: &mut Arena<AExpr>,176) {177// if not pushed down: all columns are already projected.178if ctx.has_pushed_down() && !ctx.projected_names.contains(&name) {179let node = expr_arena.add(AExpr::Column(name));180add_expr_to_accumulated(181node,182&mut ctx.acc_projections,183&mut ctx.projected_names,184expr_arena,185);186}187}188189fn update_scan_schema(190acc_projections: &[ColumnNode],191expr_arena: &Arena<AExpr>,192schema: &Schema,193sort_projections: bool,194) -> PolarsResult<Schema> {195let mut 
new_schema = Schema::with_capacity(acc_projections.len());196let mut new_cols = Vec::with_capacity(acc_projections.len());197for node in acc_projections.iter() {198let name = column_node_to_name(*node, expr_arena);199let item = schema.try_get_full(name)?;200new_cols.push(item);201}202// make sure that the projections are sorted by the schema.203if sort_projections {204new_cols.sort_unstable_by_key(|item| item.0);205}206for item in new_cols {207new_schema.with_column(item.1.clone(), item.2.clone());208}209Ok(new_schema)210}211212pub struct ProjectionPushDown {213pub is_count_star: bool,214}215216impl ProjectionPushDown {217pub(super) fn new() -> Self {218Self {219is_count_star: false,220}221}222223/// Projection will be done at this node, but we continue optimization224fn no_pushdown_restart_opt(225&mut self,226lp: IR,227ctx: ProjectionContext,228lp_arena: &mut Arena<IR>,229expr_arena: &mut Arena<AExpr>,230) -> PolarsResult<IR> {231let inputs = lp.get_inputs();232233let new_inputs = inputs234.into_iter()235.map(|node| {236let alp = lp_arena.take(node);237let ctx = ProjectionContext::new(Default::default(), Default::default(), ctx.inner);238let alp = self.push_down(alp, ctx, lp_arena, expr_arena)?;239lp_arena.replace(node, alp);240Ok(node)241})242.collect::<PolarsResult<UnitVec<_>>>()?;243let lp = lp.with_inputs(new_inputs);244245let builder = IRBuilder::from_lp(lp, expr_arena, lp_arena);246Ok(self.finish_node_simple_projection(&ctx.acc_projections, builder))247}248249fn finish_node_simple_projection(250&mut self,251local_projections: &[ColumnNode],252builder: IRBuilder,253) -> IR {254if !local_projections.is_empty() {255builder256.project_simple_nodes(local_projections.iter().map(|node| node.0))257.unwrap()258.build()259} else {260builder.build()261}262}263264fn finish_node(&mut self, local_projections: Vec<ExprIR>, builder: IRBuilder) -> IR {265if !local_projections.is_empty() {266builder267.project(local_projections, Default::default())268.build()269} else 
{270builder.build()271}272}273274#[allow(clippy::too_many_arguments)]275#[allow(unused)]276fn join_push_down(277&mut self,278schema_left: &Schema,279schema_right: &Schema,280proj: ColumnNode,281pushdown_left: &mut Vec<ColumnNode>,282pushdown_right: &mut Vec<ColumnNode>,283names_left: &mut PlHashSet<PlSmallStr>,284names_right: &mut PlHashSet<PlSmallStr>,285expr_arena: &Arena<AExpr>,286) -> (bool, bool) {287let mut pushed_at_least_one = false;288let mut already_projected = false;289290let name = column_node_to_name(proj, expr_arena);291let is_in_left = names_left.contains(name);292let is_in_right = names_right.contains(name);293already_projected |= is_in_left;294already_projected |= is_in_right;295296if check_input_column_node(proj, schema_left, expr_arena) && !is_in_left {297names_left.insert(name.clone());298pushdown_left.push(proj);299pushed_at_least_one = true;300}301if check_input_column_node(proj, schema_right, expr_arena) && !is_in_right {302names_right.insert(name.clone());303pushdown_right.push(proj);304pushed_at_least_one = true;305}306307(pushed_at_least_one, already_projected)308}309310/// This pushes down current node and assigns the result to this node.311fn pushdown_and_assign(312&mut self,313input: Node,314ctx: ProjectionContext,315lp_arena: &mut Arena<IR>,316expr_arena: &mut Arena<AExpr>,317) -> PolarsResult<()> {318let alp = lp_arena.take(input);319let lp = self.push_down(alp, ctx, lp_arena, expr_arena)?;320lp_arena.replace(input, lp);321Ok(())322}323324/// This pushes down the projection that are validated325/// that they can be done successful at the schema above326/// The result is assigned to this node.327///328/// The local projections are return and still have to be applied329fn pushdown_and_assign_check_schema(330&mut self,331input: Node,332mut ctx: ProjectionContext,333lp_arena: &mut Arena<IR>,334expr_arena: &mut Arena<AExpr>,335// an unnest changes/expands the schema336expands_schema: bool,337) -> PolarsResult<Vec<ColumnNode>> {338let alp = 
lp_arena.take(input);339let down_schema = alp.schema(lp_arena);340341let (acc_projections, local_projections, names) = split_acc_projections(342ctx.acc_projections,343&down_schema,344expr_arena,345expands_schema,346);347348ctx.acc_projections = acc_projections;349ctx.projected_names = names;350351let lp = self.push_down(alp, ctx, lp_arena, expr_arena)?;352lp_arena.replace(input, lp);353Ok(local_projections)354}355356/// Projection pushdown optimizer357///358/// # Arguments359///360/// * `IR` - Arena based logical plan tree representing the query.361/// * `acc_projections` - The projections we accumulate during tree traversal.362/// * `names` - We keep track of the names to ensure we don't do duplicate projections.363/// * `projections_seen` - Count the number of projection operations during tree traversal.364/// * `lp_arena` - The local memory arena for the logical plan.365/// * `expr_arena` - The local memory arena for the expressions.366#[recursive]367fn push_down(368&mut self,369logical_plan: IR,370mut ctx: ProjectionContext,371lp_arena: &mut Arena<IR>,372expr_arena: &mut Arena<AExpr>,373) -> PolarsResult<IR> {374use IR::*;375376match logical_plan {377Select { expr, input, .. } => {378process_projection(self, input, expr, ctx, lp_arena, expr_arena, false)379},380SimpleProjection { columns, input, .. 
} => {381let exprs = names_to_expr_irs(columns.iter_names_cloned(), expr_arena);382process_projection(self, input, exprs, ctx, lp_arena, expr_arena, true)383},384DataFrameScan {385df,386schema,387mut output_schema,388..389} => {390// TODO: Just project 0-width morsels.391if self.is_count_star {392ctx.process_count_star_at_scan(&schema, expr_arena);393}394if ctx.has_pushed_down() {395output_schema = Some(Arc::new(update_scan_schema(396&ctx.acc_projections,397expr_arena,398&schema,399false,400)?));401}402let lp = DataFrameScan {403df,404schema,405output_schema,406};407Ok(lp)408},409#[cfg(feature = "python")]410PythonScan { mut options } => {411if self.is_count_star {412ctx.process_count_star_at_scan(&options.schema, expr_arena);413}414415let normalize_order_schema = Some(&*options.schema);416417options.with_columns = get_scan_columns(418&ctx.acc_projections,419expr_arena,420None,421None,422normalize_order_schema,423);424425options.output_schema = if options.with_columns.is_none() {426None427} else {428Some(Arc::new(update_scan_schema(429&ctx.acc_projections,430expr_arena,431&options.schema,432true,433)?))434};435Ok(PythonScan { options })436},437Scan {438sources,439mut file_info,440hive_parts,441scan_type,442predicate,443predicate_file_skip_applied,444mut unified_scan_args,445mut output_schema,446} => {447let do_optimization = match &*scan_type {448FileScanIR::Anonymous { function, .. } => function.allows_projection_pushdown(),449#[cfg(feature = "json")]450FileScanIR::NDJson { .. } => true,451#[cfg(feature = "ipc")]452FileScanIR::Ipc { .. } => true,453#[cfg(feature = "csv")]454FileScanIR::Csv { .. } => true,455#[cfg(feature = "parquet")]456FileScanIR::Parquet { .. } => true,457#[cfg(feature = "scan_lines")]458FileScanIR::Lines { .. } => true,459// MultiScan will handle it if the PythonDataset cannot do projections.460#[cfg(feature = "python")]461FileScanIR::PythonDataset { .. 
} => true,462};463464#[expect(clippy::never_loop)]465loop {466if !do_optimization {467break;468}469470if self.is_count_star {471if let FileScanIR::Anonymous { .. } = &*scan_type {472// Anonymous scan is not controlled by us, we don't know if it can support473// 0-column projections, so we always project one.474use either::Either;475476let projection: Arc<[PlSmallStr]> = match &file_info.reader_schema {477Some(Either::Left(s)) => s.iter_names().next(),478Some(Either::Right(s)) => s.iter_names().next(),479None => None,480}481.into_iter()482.cloned()483.collect();484485unified_scan_args.projection = Some(projection.clone());486487if projection.is_empty() {488output_schema = Some(Default::default());489break;490}491492ctx.acc_projections.push(ColumnNode(493expr_arena.add(AExpr::Column(projection[0].clone())),494));495496unified_scan_args.projection = Some(projection)497} else {498// All nodes in new-streaming support projecting empty morsels with the correct height499// from the file.500unified_scan_args.projection = Some(Arc::from([]));501output_schema = Some(Default::default());502break;503};504}505506unified_scan_args.projection = get_scan_columns(507&ctx.acc_projections,508expr_arena,509unified_scan_args.row_index.as_ref(),510unified_scan_args.include_file_paths.as_deref(),511None,512);513514output_schema = if unified_scan_args.projection.is_some() {515let mut schema = update_scan_schema(516&ctx.acc_projections,517expr_arena,518&file_info.schema,519scan_type.sort_projection(unified_scan_args.row_index.is_some()),520)?;521522if let Some(ref file_path_col) = unified_scan_args.include_file_paths {523if let Some(i) = schema.index_of(file_path_col) {524let (name, dtype) = schema.shift_remove_index(i).unwrap();525schema.insert_at_index(schema.len(), name, dtype)?;526}527}528529Some(Arc::new(schema))530} else {531None532};533534break;535}536537// File builder has a row index, but projected columns538// do not include it, so cull.539if let Some(RowIndex { ref name, .. 
}) = unified_scan_args.row_index {540if output_schema541.as_ref()542.is_some_and(|schema| !schema.contains(name))543{544// Need to remove it from the input schema so545// that projection indices are correct.546let mut file_schema = Arc::unwrap_or_clone(file_info.schema);547file_schema.shift_remove(name);548file_info.schema = Arc::new(file_schema);549unified_scan_args.row_index = None;550}551};552553if let Some(col_name) = &unified_scan_args.include_file_paths {554if output_schema555.as_ref()556.is_some_and(|schema| !schema.contains(col_name))557{558// Need to remove it from the input schema so559// that projection indices are correct.560let mut file_schema = Arc::unwrap_or_clone(file_info.schema);561file_schema.shift_remove(col_name);562file_info.schema = Arc::new(file_schema);563unified_scan_args.include_file_paths = None;564}565};566567let lp = Scan {568sources,569file_info,570hive_parts,571output_schema,572scan_type,573predicate,574predicate_file_skip_applied,575unified_scan_args,576};577578Ok(lp)579},580Sort {581input,582by_column,583slice,584sort_options,585} => {586if ctx.has_pushed_down() {587// Make sure that the column(s) used for the sort is projected588by_column.iter().for_each(|node| {589add_expr_to_accumulated(590node.node(),591&mut ctx.acc_projections,592&mut ctx.projected_names,593expr_arena,594);595});596}597598self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;599Ok(Sort {600input,601by_column,602slice,603sort_options,604})605},606Distinct { input, options } => {607// make sure that the set of unique columns is projected608if ctx.has_pushed_down() {609if let Some(subset) = options.subset.as_ref() {610subset.iter().for_each(|name| {611add_str_to_accumulated(name.clone(), &mut ctx, expr_arena)612})613} else {614// distinct needs all columns615let input_schema = lp_arena.get(input).schema(lp_arena);616for name in input_schema.iter_names() {617add_str_to_accumulated(name.clone(), &mut ctx, 
expr_arena)618}619}620}621622self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;623Ok(Distinct { input, options })624},625Filter { predicate, input } => {626if ctx.has_pushed_down() {627// make sure that the filter column is projected628add_expr_to_accumulated(629predicate.node(),630&mut ctx.acc_projections,631&mut ctx.projected_names,632expr_arena,633);634};635self.pushdown_and_assign(input, ctx, lp_arena, expr_arena)?;636Ok(Filter { predicate, input })637},638GroupBy {639input,640keys,641aggs,642apply,643schema,644maintain_order,645options,646} => process_group_by(647self,648input,649keys,650aggs,651apply,652schema,653maintain_order,654options,655ctx,656lp_arena,657expr_arena,658),659join_ir @ Join { .. } => process_join(join_ir, ctx, self, lp_arena, expr_arena),660HStack {661input,662exprs,663options,664..665} => process_hstack(self, input, exprs, options, ctx, lp_arena, expr_arena),666ExtContext {667input, contexts, ..668} => {669// local projections are ignored. These are just root nodes670// complex expression will still be done later671let _local_projections =672self.pushdown_and_assign_check_schema(input, ctx, lp_arena, expr_arena, false)?;673674let mut new_schema = lp_arena675.get(input)676.schema(lp_arena)677.as_ref()678.as_ref()679.clone();680681for node in &contexts {682let other_schema = lp_arena.get(*node).schema(lp_arena);683for fld in other_schema.iter_fields() {684if new_schema.get(fld.name()).is_none() {685new_schema.with_column(fld.name, fld.dtype);686}687}688}689690Ok(ExtContext {691input,692contexts,693schema: Arc::new(new_schema),694})695},696MapFunction { input, function } => {697functions::process_functions(self, input, function, ctx, lp_arena, expr_arena)698},699HConcat {700inputs,701schema,702options,703} => process_hconcat(self, inputs, schema, options, ctx, lp_arena, expr_arena),704lp @ Union { .. 
} => process_generic(self, lp, ctx, lp_arena, expr_arena),705// These nodes only have inputs and exprs, so we can use same logic.706lp @ Slice { .. } | lp @ Sink { .. } | lp @ SinkMultiple { .. } => {707process_generic(self, lp, ctx, lp_arena, expr_arena)708},709Cache { .. } => {710// projections above this cache will be accumulated and pushed down711// later712// the redundant projection will be cleaned in the fast projection optimization713// phase.714if ctx.acc_projections.is_empty() {715Ok(logical_plan)716} else {717Ok(IRBuilder::from_lp(logical_plan, expr_arena, lp_arena)718.project_simple_nodes(ctx.acc_projections)719.unwrap()720.build())721}722},723#[cfg(feature = "merge_sorted")]724MergeSorted {725input_left,726input_right,727key,728} => {729if ctx.has_pushed_down() {730// make sure that the filter column is projected731add_str_to_accumulated(key.clone(), &mut ctx, expr_arena);732};733734self.pushdown_and_assign(input_left, ctx.clone(), lp_arena, expr_arena)?;735self.pushdown_and_assign(input_right, ctx, lp_arena, expr_arena)?;736737Ok(MergeSorted {738input_left,739input_right,740key,741})742},743Invalid => unreachable!(),744}745}746747pub fn optimize(748&mut self,749logical_plan: IR,750lp_arena: &mut Arena<IR>,751expr_arena: &mut Arena<AExpr>,752) -> PolarsResult<IR> {753let ctx = ProjectionContext::default();754self.push_down(logical_plan, ctx, lp_arena, expr_arena)755}756}757758759