Path: blob/main/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs
7889 views
use std::sync::Arc;12use polars_core::prelude::InitHashMaps;3use polars_error::PolarsResult;4use polars_ops::frame::{JoinCoalesce, JoinType};5use polars_utils::arena::Arena;6use polars_utils::format_pl_smallstr;7use polars_utils::pl_str::PlSmallStr;89use crate::plans::{10AExpr, ColumnNode, ExprIR, ExprOrigin, IR, IRBuilder, OutputName, PlHashSet, det_join_schema,11};12use crate::prelude::optimizer::projection_pushdown::ProjectionContext;13use crate::prelude::{ProjectionOptions, ProjectionPushDown};14use crate::utils::{aexpr_to_leaf_names_iter, column_node_to_name};1516/// # Panics17/// Panics if `join_ir` is not `IR::Join`.18pub(super) fn process_join(19mut join_ir: IR,20proj_cx: ProjectionContext,21proj_pd: &mut ProjectionPushDown,22ir_arena: &mut Arena<IR>,23expr_arena: &mut Arena<AExpr>,24) -> PolarsResult<IR> {25let IR::Join {26input_left,27input_right,28schema: join_output_schema,29left_on,30right_on,31options,32} = &mut join_ir33else {34panic!()35};3637let is_projected =38|name: &str| proj_cx.projected_names.contains(name) || !proj_cx.has_pushed_down();3940let input_schema_left = ir_arena.get(*input_left).schema(ir_arena).into_owned();41let input_schema_right = ir_arena.get(*input_right).schema(ir_arena).into_owned();4243let mut project_left = PlHashSet::with_capacity(input_schema_left.len());44let mut project_right = PlHashSet::with_capacity(input_schema_right.len());4546let mut coalesced_to_right: PlHashSet<PlSmallStr> = Default::default();47if options.args.should_coalesce()48&& let JoinType::Right = &options.args.how49{50coalesced_to_right = left_on51.iter()52.map(|expr| {53let node = match expr_arena.get(expr.node()) {54AExpr::Cast {55expr,56dtype: _,57options: _,58} => *expr,5960_ => expr.node(),61};6263let AExpr::Column(name) = expr_arena.get(node) else {64// All keys should be columns when coalesce=True65unreachable!()66};6768name.clone()69})70.collect()71}7273// Add accumulated projections74for output_name in join_output_schema.iter_names() {75if !is_projected(output_name) {76continue;77}7879match ExprOrigin::get_column_origin(80output_name,81&input_schema_left,82&input_schema_right,83options.args.suffix(),84Some(&|name| coalesced_to_right.contains(name)),85)? {86ExprOrigin::None => {},87ExprOrigin::Left => {88project_left.insert(output_name.clone());89},90ExprOrigin::Right => {91let name = if !input_schema_right.contains(output_name.as_str()) {92PlSmallStr::from_str(93output_name94.strip_suffix(options.args.suffix().as_str())95.unwrap(),96)97} else {98output_name.clone()99};100101debug_assert!(input_schema_right.contains(name.as_str()));102103project_right.insert(name);104},105ExprOrigin::Both => unreachable!(),106}107}108109// Add projections required by the join itself110for expr_ir in left_on.as_slice() {111for name in aexpr_to_leaf_names_iter(expr_ir.node(), expr_arena).cloned() {112project_left.insert(name);113}114}115116for expr_ir in right_on.as_slice() {117for name in aexpr_to_leaf_names_iter(expr_ir.node(), expr_arena).cloned() {118project_right.insert(name);119}120}121122#[cfg(feature = "asof_join")]123if let JoinType::AsOf(asof_options) = &options.args.how {124if let Some(left_by) = asof_options.left_by.as_deref() {125for name in left_by {126project_left.insert(name.clone());127}128}129130if let Some(right_by) = asof_options.right_by.as_deref() {131for name in right_by {132project_right.insert(name.clone());133}134}135}136137// Turn on coalesce if non-coalesced keys are not included in projection. Reduces materialization.138if !options.args.should_coalesce()139&& matches!(options.args.how, JoinType::Inner | JoinType::Left)140&& left_on141.iter()142.all(|e| matches!(expr_arena.get(e.node()), AExpr::Column(_)))143&& right_on.iter().all(|e| {144let AExpr::Column(name) = expr_arena.get(e.node()) else {145return false;146};147148let projected = if input_schema_left.contains(name.as_str()) {149let name = format_pl_smallstr!("{}{}", name, options.args.suffix());150is_projected(&name)151} else {152is_projected(name)153};154155!projected156})157{158Arc::make_mut(options).args.coalesce = JoinCoalesce::CoalesceColumns;159}160161// Pushdown left/right projections.162{163let input = *input_left;164let acc_projections = input_schema_left165.iter_names()166.filter(|x| project_left.contains(*x))167.map(|name| ColumnNode(expr_arena.add(AExpr::Column(name.clone()))))168.collect();169let projected_names = project_left;170171proj_pd.pushdown_and_assign(172input,173ProjectionContext::new(acc_projections, projected_names, proj_cx.inner),174ir_arena,175expr_arena,176)?;177}178179{180let input = *input_right;181let acc_projections = input_schema_right182.iter_names()183.filter(|x| project_right.contains(*x))184.map(|name| ColumnNode(expr_arena.add(AExpr::Column(name.clone()))))185.collect();186let projected_names = project_right;187188proj_pd.pushdown_and_assign(189input,190ProjectionContext::new(acc_projections, projected_names, proj_cx.inner),191ir_arena,192expr_arena,193)?;194}195196// Resolve new schemas after pushdown to left/right.197let input_schema_left = ir_arena.get(*input_left).schema(ir_arena).into_owned();198let input_schema_right = ir_arena.get(*input_right).schema(ir_arena).into_owned();199let new_join_output_schema = det_join_schema(200&input_schema_left,201&input_schema_right,202left_on,203right_on,204options,205expr_arena,206)207.unwrap();208209let post_project: Option<Vec<ExprIR>> = if proj_cx.has_pushed_down() {210let mut needs_post_project = proj_cx.acc_projections.len() != new_join_output_schema.len();211212// Build post-projection to re-order the columns and add suffixes if necessary.213let post_project: Vec<ExprIR> = proj_cx214.acc_projections215.iter()216.enumerate()217.map(|(i, col_node)| {218let original_projected_name = column_node_to_name(*col_node, expr_arena);219220if new_join_output_schema.index_of(original_projected_name.as_str()) != Some(i) {221needs_post_project = true;222}223224if !new_join_output_schema.contains(original_projected_name.as_str()) {225// This name is no longer suffixed in the new output schema, we restore it with an226// alias here.227let new_output_name = PlSmallStr::from_str(228original_projected_name229.strip_suffix(options.args.suffix().as_str())230.unwrap(),231);232233debug_assert!(new_join_output_schema.contains(new_output_name.as_str()));234let original_projected_name = original_projected_name.clone();235236ExprIR::new(237expr_arena.add(AExpr::Column(new_output_name)),238OutputName::Alias(original_projected_name),239)240} else {241ExprIR::from_node(col_node.0, expr_arena)242}243})244.collect();245246needs_post_project.then_some(post_project)247} else {248None249};250251*join_output_schema = new_join_output_schema;252253let out: IR = if let Some(post_project) = post_project {254IRBuilder::from_lp(join_ir, expr_arena, ir_arena)255.project(256post_project,257ProjectionOptions {258run_parallel: false,259duplicate_check: false,260should_broadcast: false,261},262)263.build()264} else {265join_ir266};267268Ok(out)269}270271272