Path: blob/main/crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
8424 views
// Slice pushdown optimization: moves `Slice` nodes in the IR plan as close to
// the data sources as possible so that scans/operators can limit the number of
// rows they materialize.
use polars_core::prelude::*;
use polars_utils::idx_vec::UnitVec;
use polars_utils::slice_enum::Slice;
use recursive::recursive;

use crate::prelude::*;

mod inner {
    use polars_utils::arena::Node;
    use polars_utils::idx_vec::UnitVec;
    use polars_utils::unitvec;

    /// Optimizer pass state. `scratch` is a reusable node buffer so expression
    /// traversals don't allocate per call; it is private so access always goes
    /// through `empty_nodes_scratch_mut`, which clears it first.
    pub struct SlicePushDown {
        scratch: UnitVec<Node>,
        pub(super) maintain_errors: bool,
    }

    impl SlicePushDown {
        pub fn new(maintain_errors: bool) -> Self {
            Self {
                scratch: unitvec![],
                maintain_errors,
            }
        }

        /// Returns shared scratch space after clearing.
        pub fn empty_nodes_scratch_mut(&mut self) -> &mut UnitVec<Node> {
            self.scratch.clear();
            &mut self.scratch
        }
    }
}

pub(super) use inner::SlicePushDown;

/// The slice currently being pushed down: `offset` may be negative
/// (relative to the end), `len` is the row count.
#[derive(Copy, Clone, Debug)]
struct State {
    offset: i64,
    len: IdxSize,
}

impl State {
    /// Converts to the shared `Slice` enum used by the unified scan arguments.
    ///
    /// # Panics
    /// Panics if `len` does not fit in `usize` (only possible when
    /// `IdxSize = u64` on a 32-bit target).
    fn to_slice_enum(self) -> Slice {
        let offset = self.offset;
        let len: usize = usize::try_from(self.len).unwrap();

        (offset, len).into()
    }
}

/// Can push down slice when:
/// * all projections are elementwise
/// * at least 1 projection is based on a column (for height broadcast)
/// * projections not based on any column project as scalars
///
/// Returns (can_pushdown, can_pushdown_and_any_expr_has_column)
fn can_pushdown_slice_past_projections(
    exprs: &[ExprIR],
    arena: &Arena<AExpr>,
    scratch: &mut UnitVec<Node>,
    maintain_errors: bool,
) -> (bool, bool) {
    scratch.clear();

    let mut can_pushdown_and_any_expr_has_column = false;

    for expr_ir in exprs.iter() {
        // Seed the stack with this expression's root; the loop below walks the
        // whole sub-tree (children are pushed by `update_with_expr`).
        scratch.push(expr_ir.node());

        // # "has_column"
        // `select(c = Literal([1, 2, 3])).slice(0, 0)` must block slice pushdown,
        // because `c` projects to a height independent from the input height. We check
        // this by observing that `c` does not have any columns in its input nodes.
        //
        // TODO: Simply checking that a column node is present does not handle e.g.:
        // `select(c = Literal([1, 2, 3]).is_in(col(a)))`, for functions like `is_in`,
        // `str.contains`, `str.contains_any` etc. - observe a column node is present
        // but the output height is not dependent on it.
        let mut has_column = false;
        let mut literals_all_scalar = true;

        let mut pd_group = ExprPushdownGroup::Pushable;

        while let Some(node) = scratch.pop() {
            let ae = arena.get(node);

            // We re-use the logic from predicate pushdown, as slices can be seen as a form of filtering.
            // But we also do some bookkeeping here specific to slice pushdown.

            match ae {
                AExpr::Column(_) => has_column = true,
                AExpr::Literal(v) => literals_all_scalar &= v.is_scalar(),
                _ => {},
            }

            if pd_group
                .update_with_expr(scratch, ae, arena)
                .blocks_pushdown(maintain_errors)
            {
                // A non-pushable expression anywhere blocks pushdown entirely.
                return (false, false);
            }
        }

        // If there is no column then all literals must be scalar
        if !(has_column || literals_all_scalar) {
            return (false, false);
        }

        can_pushdown_and_any_expr_has_column |= has_column
    }

    (true, can_pushdown_and_any_expr_has_column)
}

impl SlicePushDown {
    // slice will be done at this node if we found any
    // we also stop optimization
    fn no_pushdown_finish_opt(
        &self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
    ) -> PolarsResult<IR> {
        match state {
            Some(state) => {
                // Re-materialize the pending slice as an explicit Slice node
                // directly above `lp`.
                let input = lp_arena.add(lp);

                let lp = IR::Slice {
                    input,
                    offset: state.offset,
                    len: state.len,
                };
                Ok(lp)
            },
            None => Ok(lp),
        }
    }

    /// slice will be done at this node, but we continue optimization
    fn no_pushdown_restart_opt(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                // No state, so we do not push down the slice here.
                let state = None;
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        let lp = lp.with_inputs(new_inputs);

        // Insert the pending slice (if any) above this node.
        self.no_pushdown_finish_opt(lp, state, lp_arena)
    }

    /// slice will be pushed down.
    fn pushdown_and_continue(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        let inputs = lp.get_inputs();

        let new_inputs = inputs
            .into_iter()
            .map(|node| {
                let alp = lp_arena.take(node);
                // Unlike `no_pushdown_restart_opt`, the current slice state is
                // forwarded into every input.
                let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
                lp_arena.replace(node, alp);
                Ok(node)
            })
            .collect::<PolarsResult<UnitVec<_>>>()?;
        Ok(lp.with_inputs(new_inputs))
    }

    /// Recursive driver: dispatches on `(node, pending slice)` and decides per
    /// node kind whether the slice is absorbed (scans, sort, group-by, join),
    /// forwarded (elementwise projections, sinks, HConcat), combined (nested
    /// Slice nodes), or materialized in place (blocking nodes / catch-all).
    #[recursive]
    fn pushdown(
        &mut self,
        lp: IR,
        state: Option<State>,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        use IR::*;

        match (lp, state) {
            #[cfg(feature = "python")]
            (PythonScan {
                mut options,
            },
            // TODO! we currently skip slice pushdown if there is a predicate.
            // we can modify the readers to only limit after predicates have been applied
            Some(state)) if state.offset == 0 && matches!(options.predicate, PythonPredicate::None) => {
                options.n_rows = Some(state.len as usize);
                let lp = PythonScan {
                    options,
                };
                Ok(lp)
            }

            // Absorb the slice into the scan itself, but only for scan types
            // that support pre-slicing and only when no predicate is present
            // (the slice must apply before filtering).
            (Scan {
                sources,
                file_info,
                hive_parts,
                output_schema,
                mut unified_scan_args,
                predicate,
                predicate_file_skip_applied,
                scan_type,
            }, Some(state)) if predicate.is_none() && match &*scan_type {
                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { .. } => true,

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { .. } => true,

                #[cfg(feature = "csv")]
                FileScanIR::Csv { .. } => true,

                #[cfg(feature = "json")]
                FileScanIR::NDJson { .. } => true,

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset { .. } => true,

                #[cfg(feature = "scan_lines")]
                FileScanIR::Lines { .. } => true,

                // TODO: This can be `true` after Anonymous scan dispatches to new-streaming.
                FileScanIR::Anonymous { .. } => state.offset == 0,
            } => {
                unified_scan_args.pre_slice = Some(state.to_slice_enum());

                let lp = Scan {
                    sources,
                    file_info,
                    hive_parts,
                    output_schema,
                    scan_type,
                    unified_scan_args,
                    predicate,
                    predicate_file_skip_applied,
                };

                Ok(lp)
            },

            // An in-memory frame can simply be sliced eagerly.
            (DataFrameScan {df, schema, output_schema, }, Some(state)) => {
                let df = df.slice(state.offset, state.len as usize);
                let lp = DataFrameScan {
                    df: Arc::new(df),
                    schema,
                    output_schema,
                };
                Ok(lp)
            }
            (Union {mut inputs, mut options }, opt_state) => {
                // Each subplan only needs its first `offset + len` rows; this
                // is valid only for non-negative offsets (relative-to-end
                // offsets depend on the total height). `checked_add` drops the
                // optimization on overflow rather than wrapping.
                let subplan_slice: Option<State> = opt_state
                    .filter(|x| x.offset >= 0)
                    .and_then(|x| x.len.checked_add(x.offset.try_into().unwrap()))
                    .map(|len| State {
                        offset: 0,
                        len,
                    });

                for input in &mut inputs {
                    let input_lp = lp_arena.take(*input);
                    let input_lp = self.pushdown(input_lp, subplan_slice, lp_arena, expr_arena)?;
                    lp_arena.replace(*input, input_lp);
                }
                // The exact slice is still applied by the union itself.
                options.slice = opt_state.map(|x| (x.offset, x.len.try_into().unwrap()));
                let lp = Union {inputs, options};
                Ok(lp)
            },
            (Join {
                input_left,
                input_right,
                schema,
                left_on,
                right_on,
                mut options
            }, Some(state)) if !matches!(options.options, Some(JoinTypeOptionsIR::CrossAndFilter { .. })) => {
                // first restart optimization in both inputs and get the updated LP
                let lp_left = lp_arena.take(input_left);
                let lp_left = self.pushdown(lp_left, None, lp_arena, expr_arena)?;
                let input_left = lp_arena.add(lp_left);

                let lp_right = lp_arena.take(input_right);
                let lp_right = self.pushdown(lp_right, None, lp_arena, expr_arena)?;
                let input_right = lp_arena.add(lp_right);

                // then assign the slice state to the join operation

                let mut_options = Arc::make_mut(&mut options);
                mut_options.args.slice = Some((state.offset, state.len as usize));

                Ok(Join {
                    input_left,
                    input_right,
                    schema,
                    left_on,
                    right_on,
                    options
                })
            }
            (GroupBy { input, keys, aggs, schema, apply, maintain_order, mut options }, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);

                let mut_options = Arc::make_mut(&mut options);
                mut_options.slice = Some((state.offset, state.len as usize));

                Ok(GroupBy {
                    input,
                    keys,
                    aggs,
                    schema,
                    apply,
                    maintain_order,
                    options
                })
            }
            (Distinct {input, mut options}, Some(state)) => {
                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);
                options.slice = Some((state.offset, state.len as usize));
                Ok(Distinct {
                    input,
                    options,
                })
            }
            (Sort {input, by_column, slice, sort_options}, Some(state)) => {
                // The slice argument on Sort should be inserted by slice pushdown,
                // so it shouldn't exist yet (or be idempotently the same).
                let new_slice = Some((state.offset, state.len as usize));
                assert!(slice.is_none() || slice == new_slice);

                // first restart optimization in inputs and get the updated LP
                let input_lp = lp_arena.take(input);
                let input_lp = self.pushdown(input_lp, None, lp_arena, expr_arena)?;
                let input = lp_arena.add(input_lp);

                Ok(Sort {
                    input,
                    by_column,
                    slice: new_slice,
                    sort_options
                })
            }
            // A slice pushed onto another Slice node: try to fuse the two.
            (Slice {
                input,
                offset,
                mut len
            }, Some(outer_slice)) => {
                let alp = lp_arena.take(input);

                // Both are positive, can combine into a single slice.
                if outer_slice.offset >= 0 && offset >= 0 {
                    let state = State {
                        offset: offset.checked_add(outer_slice.offset).unwrap(),
                        len: if len as i128 > outer_slice.offset as i128 {
                            (len - outer_slice.offset as IdxSize).min(outer_slice.len)
                        } else {
                            0
                        },
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }

                // If offset is negative the length can never be greater than it.
                if offset < 0 {
                    #[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
                    if len as u64 > offset.unsigned_abs() as u64 {
                        len = offset.unsigned_abs() as IdxSize;
                    }
                }

                // Both are negative, can also combine (but not so simply).
                if outer_slice.offset < 0 && offset < 0 {
                    // We use 128-bit arithmetic to avoid overflows, clamping at the end.
                    // All positions are expressed relative to the END of the input.
                    let inner_start_rel_end = offset as i128;
                    let inner_stop_rel_end = inner_start_rel_end + len as i128;
                    let naive_outer_start_rel_end = inner_stop_rel_end + outer_slice.offset as i128;
                    let naive_outer_stop_rel_end = naive_outer_start_rel_end + outer_slice.len as i128;
                    // Clamp so the fused window can't start before the inner
                    // window, nor end before it starts.
                    let clamped_outer_start_rel_end = naive_outer_start_rel_end.max(inner_start_rel_end);
                    let clamped_outer_stop_rel_end = naive_outer_stop_rel_end.max(clamped_outer_start_rel_end);

                    let state = State {
                        offset: clamped_outer_start_rel_end.clamp(i64::MIN as i128, i64::MAX as i128) as i64,
                        len: (clamped_outer_stop_rel_end - clamped_outer_start_rel_end).min(IdxSize::MAX as i128) as IdxSize,
                    };
                    return self.pushdown(alp, Some(state), lp_arena, expr_arena);
                }

                // Mixed signs: cannot fuse. Push the inner slice further down
                // and keep the outer slice as an explicit node here.
                let inner_slice = Some(State { offset, len });
                let lp = self.pushdown(alp, inner_slice, lp_arena, expr_arena)?;
                let input = lp_arena.add(lp);
                Ok(Slice {
                    input,
                    offset: outer_slice.offset,
                    len: outer_slice.len
                })
            }
            // A Slice node with no pending slice: start pushing it down.
            (Slice {
                input,
                offset,
                mut len
            }, None) => {
                let alp = lp_arena.take(input);

                // If offset is negative the length can never be greater than it.
                if offset < 0 {
                    #[allow(clippy::unnecessary_cast)] // Necessary when IdxSize = u64.
                    if len as u64 > offset.unsigned_abs() as u64 {
                        len = offset.unsigned_abs() as IdxSize;
                    }
                }

                let state = Some(State {
                    offset,
                    len
                });
                self.pushdown(alp, state, lp_arena, expr_arena)
            }
            // [Do not pushdown] boundary
            // here we do not pushdown.
            // we reset the state and then start the optimization again
            m @ (Filter { .. }, _)
            // other blocking nodes
            | m @ (DataFrameScan {..}, _)
            | m @ (Sort {..}, _)
            | m @ (MapFunction {function: FunctionIR::Explode {..}, ..}, _)
            | m @ (Cache {..}, _)
            | m @ (Distinct {..}, _)
            | m @ (GroupBy{..},_)
            // blocking in streaming
            | m @ (Join{..},_)
            => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            #[cfg(feature = "pivot")]
            m @ (MapFunction {function: FunctionIR::Unpivot {..}, ..}, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            },
            // [Pushdown]
            (MapFunction {input, function}, _) if function.allow_predicate_pd() => {
                let lp = MapFunction {input, function};
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            },
            // [NO Pushdown]
            m @ (MapFunction {..}, _) => {
                let (lp, state) = m;
                self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
            }
            // [Pushdown]
            // these nodes will be pushed down.
            // State is None, we can continue
            m @ (Select {..}, None)
            | m @ (HStack {..}, None)
            | m @ (SimpleProjection {..}, _)
            => {
                let (lp, state) = m;
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            // there is state, inspect the projection to determine how to deal with it
            (Select {input, expr, schema, options}, Some(_)) => {
                let maintain_errors = self.maintain_errors;
                // `.1`: Select requires at least one column-based expression so
                // the output height tracks the input height.
                if can_pushdown_slice_past_projections(&expr, expr_arena, self.empty_nodes_scratch_mut(), maintain_errors).1 {
                    let lp = Select {input, expr, schema, options};
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = Select {input, expr, schema, options};
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            }
            (HStack {input, exprs, schema, options}, _) => {
                let maintain_errors = self.maintain_errors;
                let (can_pushdown, can_pushdown_and_any_expr_has_column) = can_pushdown_slice_past_projections(&exprs, expr_arena, self.empty_nodes_scratch_mut(), maintain_errors);

                if can_pushdown_and_any_expr_has_column || (
                    // If the schema length is greater then an input column is being projected, so
                    // the exprs in with_columns do not need to have an input column name.
                    schema.len() > exprs.len() && can_pushdown
                )
                {
                    let lp = HStack {input, exprs, schema, options};
                    self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
                }
                // don't push down slice, but restart optimization
                else {
                    let lp = HStack {input, exprs, schema, options};
                    self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
                }
            }
            (HConcat {inputs, schema, options}, _) => {
                // Slice can always be pushed down for horizontal concatenation
                let lp = HConcat {inputs, schema, options};
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            (lp @ Sink { .. }, _) | (lp @ SinkMultiple { .. }, _) => {
                // Slice can always be pushed down for sinks
                self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
            }
            // Unknown/unhandled nodes: materialize the pending slice here and
            // stop descending.
            (catch_all, state) => {
                self.no_pushdown_finish_opt(catch_all, state, lp_arena)
            }
        }
    }

    /// Entry point of the pass: runs pushdown over the whole plan with no
    /// pending slice.
    pub fn optimize(
        &mut self,
        logical_plan: IR,
        lp_arena: &mut Arena<IR>,
        expr_arena: &mut Arena<AExpr>,
    ) -> PolarsResult<IR> {
        self.pushdown(logical_plan, None, lp_arena, expr_arena)
    }
}