// Path: blob/main/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs
// 6940 views
//! Optimizer pass over stacked `IR::HStack` (`WITH_COLUMNS`) nodes.
//!
//! For every `HStack` whose direct input is also an `HStack`, the pass tries to move
//! expressions from the upper (`current`) node into the lower (`input`) node. When every
//! expression can be moved, the two nodes are merged into one.

use std::sync::Arc;

use arrow::bitmap::MutableBitmap;
use polars_core::schema::Schema;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::arena::{Arena, Node};
use polars_utils::vec::inplace_zip_filtermap;

use super::aexpr::AExpr;
use super::ir::IR;
use super::{PlSmallStr, aexpr_to_leaf_names_iter};

/// Maps a column name to the bit index that represents it in the bitsets of this pass.
type ColumnMap = PlHashMap<PlSmallStr, usize>;

/// Pads `bitset` with `false` bits until it is as long as `column_map`.
///
/// # Panics
///
/// Panics if `bitset` is already longer than `column_map`.
fn column_map_finalize_bitset(bitset: &mut MutableBitmap, column_map: &ColumnMap) {
    assert!(bitset.len() <= column_map.len());

    let size = bitset.len();
    bitset.extend_constant(column_map.len() - size, false);
}

/// Marks `column` in `bitset`, registering it in `column_map` when it is not known yet.
///
/// * Known column: the bit at its existing index is set.
/// * New column: it receives index `column_map.len()` and a `true` bit is pushed onto
///   `bitset`.
///
/// The pushed bit only lands on the new index when `bitset.len() == column_map.len()` on
/// entry. The callers in this file arrange that by starting each bitset at the current size
/// of the map before populating it.
fn column_map_set(bitset: &mut MutableBitmap, column_map: &mut ColumnMap, column: PlSmallStr) {
    let size = column_map.len();
    column_map
        .entry(column)
        .and_modify(|idx| bitset.set(*idx, true))
        .or_insert_with(|| {
            bitset.push(true);
            size
        });
}

/// Walks the plan reachable from `root` and clusters stacked `HStack` nodes.
///
/// * `root` - node at which the traversal starts.
/// * `lp_arena` - arena holding the `IR` nodes; nodes are rewritten in place.
/// * `expr_arena` - arena holding the expressions; only read, to find the leaf column
///   names each expression uses.
///
/// For each `HStack` directly on top of another `HStack`, an expression of the upper node
/// is handled in one of these ways:
///
/// * it stays, when it reads a column that the input node assigns;
/// * it replaces the input's assignment to the same column, when no expression in the
///   upper node reads that column;
/// * it is pushed into the input node otherwise, provided that does not change what the
///   remaining expressions of the upper node observe.
pub fn optimize(root: Node, lp_arena: &mut Arena<IR>, expr_arena: &Arena<AExpr>) {
    let mut ir_stack = Vec::with_capacity(16);
    ir_stack.push(root);

    // We define these here to reuse the allocations across the loops
    let mut column_map = ColumnMap::with_capacity(8);
    // Columns assigned (generated) by the input node.
    let mut input_genset = MutableBitmap::with_capacity(16);
    // Per expression of the current node: the columns it reads.
    let mut current_expr_livesets: Vec<MutableBitmap> = Vec::with_capacity(16);
    // Union of the per-expression livesets.
    let mut current_liveset = MutableBitmap::with_capacity(16);
    // Per retained expression of the current node: whether it will be pushed down.
    let mut pushable = MutableBitmap::with_capacity(16);
    // Indices into `pushable` that get a second chance after the first classification.
    let mut potential_pushable = Vec::with_capacity(4);

    while let Some(current) = ir_stack.pop() {
        let current_ir = lp_arena.get(current);
        // Schedule the inputs for traversal regardless of the kind of node.
        current_ir.copy_inputs(&mut ir_stack);
        let IR::HStack { input, .. } = current_ir else {
            continue;
        };
        let input = *input;

        let [current_ir, input_ir] = lp_arena.get_many_mut([current, input]);

        let IR::HStack {
            input: current_input,
            exprs: current_exprs,
            schema: current_schema,
            options: current_options,
        } = current_ir
        else {
            // `current` was matched as an `HStack` just above.
            unreachable!();
        };
        let IR::HStack {
            input: input_input,
            exprs: input_exprs,
            schema: input_schema,
            options: input_options,
        } = input_ir
        else {
            // Only an `HStack` directly on top of another `HStack` is handled.
            continue;
        };

        let column_map = &mut column_map;

        // Reuse the allocations of the previous loop
        column_map.clear();
        input_genset.clear();
        current_expr_livesets.clear();
        current_liveset.clear();
        pushable.clear();
        potential_pushable.clear();

        pushable.reserve(current_exprs.len());
        potential_pushable.reserve(current_exprs.len());

        // @NOTE
        // We can pushdown any column that utilizes no live columns that are generated in the
        // input.

        // Register every column the input node assigns.
        for input_expr in input_exprs.iter() {
            column_map_set(
                &mut input_genset,
                column_map,
                input_expr.output_name().clone(),
            );
        }

        // Build one liveset per expression of the current node. Each starts at the current
        // size of the map so that newly discovered columns are appended at the right index.
        for expr in current_exprs.iter() {
            let mut liveset = MutableBitmap::from_len_zeroed(column_map.len());

            for live in aexpr_to_leaf_names_iter(expr.node(), expr_arena) {
                column_map_set(&mut liveset, column_map, live.clone());
            }

            current_expr_livesets.push(liveset);
        }

        // Force that column_map is not further mutated from this point on
        let column_map = column_map as &_;

        // Bitsets built early are shorter than the final map; pad them all to one length
        // before they are combined or compared.
        column_map_finalize_bitset(&mut input_genset, column_map);

        current_liveset.extend_constant(column_map.len(), false);
        for expr_liveset in &mut current_expr_livesets {
            use std::ops::BitOrAssign;
            column_map_finalize_bitset(expr_liveset, column_map);
            (&mut current_liveset).bitor_assign(expr_liveset as &_);
        }

        // Check for every expression in the current WITH_COLUMNS node whether it can be pushed
        // down or pruned.
        //
        // Returning `None` from the closure drops the pair from both vectors, returning
        // `Some` keeps it. `pushable` gets exactly one entry per kept expression, so its
        // indices line up with `current_exprs` afterwards (see the `debug_assert_eq!` below).
        inplace_zip_filtermap(
            current_exprs,
            &mut current_expr_livesets,
            |mut expr, liveset| {
                let does_input_assign_column_that_expr_used =
                    input_genset.intersects_with(&liveset);

                // The expression reads a value produced by the input node: it has to stay.
                if does_input_assign_column_that_expr_used {
                    pushable.push(false);
                    return Some((expr, liveset));
                }

                let column_name = expr.output_name();
                let is_pushable = if let Some(idx) = column_map.get(column_name) {
                    let does_input_alias_also_expr = input_genset.get(*idx);
                    let is_alias_live_in_current = current_liveset.get(*idx);

                    if does_input_alias_also_expr && !is_alias_live_in_current {
                        // @NOTE: Pruning of re-assigned columns
                        //
                        // We checked if this expression output is also assigned by the input and
                        // that this assignment is not used in the current WITH_COLUMNS.
                        // Consequently, we are free to prune the input's assignment to the output.
                        //
                        // We immediately prune here to simplify the later code.
                        //
                        // @NOTE: Expressions in a `WITH_COLUMNS` cannot alias to the same column.
                        // Otherwise, this would be faulty and would panic.
                        let input_expr = input_exprs
                            .iter_mut()
                            .find(|input_expr| column_name == input_expr.output_name())
                            .expect("No assigning expression for generated column");

                        // @NOTE
                        // Since we are reassigning a column and we are pushing to the input, we do
                        // not need to change the schema of the current or input nodes.
                        //
                        // NOTE(review): `input_schema` is left untouched here, so it keeps the
                        // dtype recorded for the old assignment. Confirm the replacing
                        // expression cannot produce a different dtype for this column.
                        std::mem::swap(&mut expr, input_expr);
                        // The old input expression now sits in `expr` and is dropped with it.
                        return None;
                    }

                    // We cannot have multiple assignments to the same column in one WITH_COLUMNS
                    // and we need to make sure that we are not changing the column value that
                    // neighbouring expressions are seeing.

                    // @NOTE: In this case it might be possible to push this down if all the
                    // expressions that use the output are also being pushed down.
                    if !does_input_alias_also_expr && is_alias_live_in_current {
                        // Remember the index this expression gets in `pushable`.
                        potential_pushable.push(pushable.len());
                        pushable.push(false);
                        return Some((expr, liveset));
                    }

                    // Only the "input assigns it and the current node reads it" case can reach
                    // this point with a known column, which evaluates to `false`.
                    !does_input_alias_also_expr && !is_alias_live_in_current
                } else {
                    // The output column is neither assigned by the input nor read by any
                    // expression in the current node.
                    true
                };

                pushable.push(is_pushable);
                Some((expr, liveset))
            },
        );

        debug_assert_eq!(pushable.len(), current_exprs.len());

        // Here we do a last check for expressions to push down.
        // This will pushdown the expressions that "has an output column that is mentioned by
        // neighbour columns, but all those neighbours were being pushed down".
        for candidate in potential_pushable.iter().copied() {
            let column_name = current_exprs[candidate].output_name();
            let column_idx = column_map.get(column_name).unwrap();

            // Rebuild `current_liveset` as the union of the livesets of the expressions that
            // stay in the current node, leaving out the candidate itself.
            current_liveset.clear();
            current_liveset.extend_constant(column_map.len(), false);
            for (i, expr_liveset) in current_expr_livesets.iter().enumerate() {
                if pushable.get(i) || i == candidate {
                    continue;
                }
                use std::ops::BitOrAssign;
                (&mut current_liveset).bitor_assign(expr_liveset as &_);
            }

            // No remaining expression reads the candidate's output column.
            if !current_liveset.get(*column_idx) {
                pushable.set(candidate, true);
            }
        }

        let pushable_set_bits = pushable.set_bits();

        // If all columns are pushable, we can merge the input into the current. This should be
        // a relatively common case.
        if pushable_set_bits == pushable.len() {
            // @NOTE: To keep the schema correct, we reverse the order here. As a
            // `WITH_COLUMNS` higher up produces later columns. This also allows us not to
            // have to deal with schemas.
            //
            // After these two statements `current_exprs` holds the input's expressions
            // followed by the current ones, and `input_exprs` is empty.
            input_exprs.extend(std::mem::take(current_exprs));
            std::mem::swap(current_exprs, input_exprs);

            // Here, we perform the trick where we switch the inputs. This makes it possible to
            // essentially remove the `current` node without knowing the parent of
            // `current`. Essentially, we move the input node to the current node.
            *current_input = *input_input;
            *current_options = current_options.merge_options(input_options);

            // Let us just make this node invalid so we can detect when someone tries to
            // mention it later.
            lp_arena.take(input);

            // Since we merged the current and input nodes and the input node might have
            // optimizations with their input, we loop again on this node.
            //
            // NOTE(review): the `pop` presumably discards the old `input` that `copy_inputs`
            // pushed at the top of this iteration - confirm against `IR::copy_inputs`.
            ir_stack.pop();
            ir_stack.push(current);
            continue;
        }

        // There is nothing to push down. Move on.
        if pushable_set_bits == 0 {
            continue;
        }

        let input_schema_inner = Arc::make_mut(input_schema);

        // @NOTE: We don't have to insert a SimpleProjection or redo the `current_schema` if
        // `pushable` contains only 0..N for some N. We use these two variables to keep track
        // of this.
        let mut has_seen_unpushable = false;
        let mut needs_simple_projection = false;

        input_schema_inner.reserve(pushable_set_bits);
        input_exprs.reserve(pushable_set_bits);
        // Split the expressions: pushed ones move into the input node (together with their
        // schema entry), the others are collected back into `current_exprs`.
        *current_exprs = std::mem::take(current_exprs)
            .into_iter()
            .zip(pushable.iter())
            .filter_map(|(expr, do_pushdown)| {
                if do_pushdown {
                    // `has_seen_unpushable` never goes back to `false`, so this ends up `true`
                    // exactly when some pushed expression follows a kept one.
                    needs_simple_projection = has_seen_unpushable;

                    let column = expr.output_name().as_ref();
                    // @NOTE: we cannot just use the index here, as there might be renames that sit
                    // earlier in the schema
                    let datatype = current_schema.get(column).unwrap();
                    input_schema_inner.with_column(column.into(), datatype.clone());
                    input_exprs.push(expr);

                    None
                } else {
                    has_seen_unpushable = true;
                    Some(expr)
                }
            })
            .collect();

        // Both nodes end up with the same, merged options.
        let options = current_options.merge_options(input_options);
        *current_options = options;
        *input_options = options;

        // @NOTE: Here we add a simple projection to make sure that the output still
        // has the right schema.
        if needs_simple_projection {
            // @NOTE: This may seem stupid, but this way we prioritize the input columns and then
            // the existing columns which is exactly what we want.
            let mut new_current_schema = Schema::with_capacity(current_schema.len());
            new_current_schema.merge_from_ref(input_schema.as_ref());
            new_current_schema.merge_from_ref(current_schema.as_ref());

            debug_assert_eq!(new_current_schema.len(), current_schema.len());

            // The `HStack` gets the reordered schema; the original schema is kept for the
            // projection that is placed on top of it.
            let proj_schema = std::mem::replace(current_schema, Arc::new(new_current_schema));

            // Move the `HStack` to a fresh arena slot and put the projection at the original
            // node, so that whatever refers to `current` now sees the projection.
            let moved_current = lp_arena.add(IR::Invalid);
            let projection = IR::SimpleProjection {
                input: moved_current,
                columns: proj_schema,
            };
            let current = lp_arena.replace(current, projection);
            lp_arena.replace(moved_current, current);
        }
    }
}