Path: blob/main/crates/polars-mem-engine/src/planner/lp.rs
6940 views
use polars_core::POOL;1use polars_core::prelude::*;2use polars_expr::state::ExecutionState;3use polars_plan::plans::expr_ir::ExprIR;4use polars_utils::format_pl_smallstr;5use polars_utils::unique_id::UniqueId;6use recursive::recursive;78use self::expr_ir::OutputName;9use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};10#[cfg(feature = "python")]11use self::python_dsl::PythonScanSource;12use super::*;13use crate::ScanPredicate;14use crate::executors::{15self, CachePrefiller, Executor, PartitionedSinkExecutor, SinkExecutor, sink_name,16};17use crate::predicate::PhysicalColumnPredicates;1819pub type StreamingExecutorBuilder =20fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;2122fn partitionable_gb(23keys: &[ExprIR],24aggs: &[ExprIR],25input_schema: &Schema,26expr_arena: &Arena<AExpr>,27apply: &Option<PlanCallback<DataFrame, DataFrame>>,28) -> bool {29// checks:30// 1. complex expressions in the group_by itself are also not partitionable31// in this case anything more than col("foo")32// 2. a custom function cannot be partitioned33// 3. we don't bother with more than 2 keys, as the cardinality likely explodes34// by the combinations35if !keys.is_empty() && keys.len() < 3 && apply.is_none() {36// complex expressions in the group_by itself are also not partitionable37// in this case anything more than col("foo")38for key in keys {39if (expr_arena).iter(key.node()).count() > 140|| has_aexpr(key.node(), expr_arena, |ae| match ae {41AExpr::Literal(lv) => !lv.is_scalar(),42_ => false,43})44{45return false;46}47}4849can_pre_agg_exprs(aggs, expr_arena, input_schema)50} else {51false52}53}5455#[derive(Clone)]56struct ConversionState {57has_cache_child: bool,58has_cache_parent: bool,59}6061impl ConversionState {62fn new() -> PolarsResult<Self> {63Ok(ConversionState {64has_cache_child: false,65has_cache_parent: false,66})67}6869fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {70let mut new_state = self.clone();71new_state.has_cache_child = false;72let out = func(&mut new_state);73self.has_cache_child = new_state.has_cache_child;74out75}76}7778pub fn create_physical_plan(79root: Node,80lp_arena: &mut Arena<IR>,81expr_arena: &mut Arena<AExpr>,82build_streaming_executor: Option<StreamingExecutorBuilder>,83) -> PolarsResult<Box<dyn Executor>> {84let mut state = ConversionState::new()?;85let mut cache_nodes = Default::default();86let plan = create_physical_plan_impl(87root,88lp_arena,89expr_arena,90&mut state,91&mut cache_nodes,92build_streaming_executor,93)?;9495if cache_nodes.is_empty() {96Ok(plan)97} else {98Ok(Box::new(CachePrefiller {99caches: cache_nodes,100phys_plan: plan,101}))102}103}104105pub struct MultiplePhysicalPlans {106pub cache_prefiller: Option<Box<dyn Executor>>,107pub physical_plans: Vec<Box<dyn Executor>>,108}109pub fn create_multiple_physical_plans(110roots: &[Node],111lp_arena: &mut Arena<IR>,112expr_arena: &mut Arena<AExpr>,113build_streaming_executor: Option<StreamingExecutorBuilder>,114) -> PolarsResult<MultiplePhysicalPlans> {115let mut state = ConversionState::new()?;116let mut cache_nodes = Default::default();117let plans = state.with_new_branch(|new_state| {118roots119.iter()120.map(|&node| {121create_physical_plan_impl(122node,123lp_arena,124expr_arena,125new_state,126&mut cache_nodes,127build_streaming_executor,128)129})130.collect::<PolarsResult<Vec<_>>>()131})?;132133let cache_prefiller = (!cache_nodes.is_empty()).then(|| {134struct Empty;135impl Executor for Empty {136fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {137Ok(DataFrame::empty())138}139}140Box::new(CachePrefiller {141caches: cache_nodes,142phys_plan: Box::new(Empty),143}) as _144});145146Ok(MultiplePhysicalPlans {147cache_prefiller,148physical_plans: plans,149})150}151152#[cfg(feature = "python")]153#[allow(clippy::type_complexity)]154pub fn python_scan_predicate(155options: &mut PythonOptions,156expr_arena: &Arena<AExpr>,157state: &mut ExpressionConversionState,158) -> PolarsResult<(159Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,160Option<Vec<u8>>,161)> {162let mut predicate_serialized = None;163let predicate = if let PythonPredicate::Polars(e) = &options.predicate {164// Convert to a pyarrow eval string.165if matches!(options.python_source, PythonScanSource::Pyarrow) {166use polars_core::config::verbose_print_sensitive;167168let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(169e.node(),170expr_arena,171Default::default(),172);173174verbose_print_sensitive(|| {175format!(176"python_scan_predicate: \177predicate node: {}, \178converted pyarrow predicate: {}",179ExprIRDisplay::display_node(e.node(), expr_arena),180&predicate_pa.as_deref().unwrap_or("<conversion failed>")181)182});183184if let Some(eval_str) = predicate_pa {185options.predicate = PythonPredicate::PyArrow(eval_str);186// We don't have to use a physical expression as pyarrow deals with the filter.187None188} else {189Some(create_physical_expr(190e,191Context::Default,192expr_arena,193&options.schema,194state,195)?)196}197}198// Convert to physical expression for the case the reader cannot consume the predicate.199else {200let dsl_expr = e.to_expr(expr_arena);201predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;202203Some(create_physical_expr(204e,205Context::Default,206expr_arena,207&options.schema,208state,209)?)210}211} else {212None213};214215Ok((predicate, predicate_serialized))216}217218#[recursive]219fn create_physical_plan_impl(220root: Node,221lp_arena: &mut Arena<IR>,222expr_arena: &mut Arena<AExpr>,223state: &mut ConversionState,224// Cache nodes in order of discovery225cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,226build_streaming_executor: Option<StreamingExecutorBuilder>,227) -> PolarsResult<Box<dyn Executor>> {228use IR::*;229230macro_rules! recurse {231($node:expr, $state: expr) => {232create_physical_plan_impl(233$node,234lp_arena,235expr_arena,236$state,237cache_nodes,238build_streaming_executor,239)240};241}242243let logical_plan = if state.has_cache_parent244|| matches!(245lp_arena.get(root),246IR::Scan { .. } // Needed for the streaming impl247| IR::Cache { .. } // Needed for plans branching from the same cache node248| IR::Sink { // Needed for the streaming impl249payload: SinkTypeIR::Partition(_),250..251}252) {253lp_arena.get(root).clone()254} else {255lp_arena.take(root)256};257258match logical_plan {259#[cfg(feature = "python")]260PythonScan { mut options } => {261let mut expr_conv_state = ExpressionConversionState::new(true);262let (predicate, predicate_serialized) =263python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;264Ok(Box::new(executors::PythonScanExec {265options,266predicate,267predicate_serialized,268}))269},270Sink { input, payload } => {271let input = recurse!(input, state)?;272match payload {273SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {274input,275name: "mem".to_string(),276f: Box::new(move |df, _state| Ok(Some(df))),277})),278SinkTypeIR::File(FileSinkType {279file_type,280target,281sink_options,282cloud_options,283}) => {284let name = sink_name(&file_type).to_owned();285Ok(Box::new(SinkExecutor {286input,287name,288f: Box::new(move |mut df, _state| {289let mut file = target290.open_into_writeable(&sink_options, cloud_options.as_ref())?;291let writer = &mut *file;292293use std::io::BufWriter;294match &file_type {295#[cfg(feature = "parquet")]296FileType::Parquet(options) => {297use polars_io::parquet::write::ParquetWriter;298ParquetWriter::new(BufWriter::new(writer))299.with_compression(options.compression)300.with_statistics(options.statistics)301.with_row_group_size(options.row_group_size)302.with_data_page_size(options.data_page_size)303.with_key_value_metadata(options.key_value_metadata.clone())304.finish(&mut df)?;305},306#[cfg(feature = "ipc")]307FileType::Ipc(options) => {308use polars_io::SerWriter;309use polars_io::ipc::IpcWriter;310IpcWriter::new(BufWriter::new(writer))311.with_compression(options.compression)312.with_compat_level(options.compat_level)313.finish(&mut df)?;314},315#[cfg(feature = "csv")]316FileType::Csv(options) => {317use polars_io::SerWriter;318use polars_io::csv::write::CsvWriter;319CsvWriter::new(BufWriter::new(writer))320.include_bom(options.include_bom)321.include_header(options.include_header)322.with_separator(options.serialize_options.separator)323.with_line_terminator(324options.serialize_options.line_terminator.clone(),325)326.with_quote_char(options.serialize_options.quote_char)327.with_batch_size(options.batch_size)328.with_datetime_format(329options.serialize_options.datetime_format.clone(),330)331.with_date_format(332options.serialize_options.date_format.clone(),333)334.with_time_format(335options.serialize_options.time_format.clone(),336)337.with_float_scientific(338options.serialize_options.float_scientific,339)340.with_float_precision(341options.serialize_options.float_precision,342)343.with_decimal_comma(options.serialize_options.decimal_comma)344.with_null_value(options.serialize_options.null.clone())345.with_quote_style(options.serialize_options.quote_style)346.finish(&mut df)?;347},348#[cfg(feature = "json")]349FileType::Json(_options) => {350use polars_io::SerWriter;351use polars_io::json::{JsonFormat, JsonWriter};352353JsonWriter::new(BufWriter::new(writer))354.with_json_format(JsonFormat::JsonLines)355.finish(&mut df)?;356},357#[allow(unreachable_patterns)]358_ => panic!("enable filetype feature"),359}360361file.sync_on_close(sink_options.sync_on_close)?;362file.close()?;363364Ok(None)365}),366}))367},368SinkTypeIR::Partition(_) => {369let builder = build_streaming_executor370.expect("invalid build. Missing feature new-streaming");371372let executor = Box::new(PartitionedSinkExecutor::new(373input, builder, root, lp_arena, expr_arena,374));375376// Use cache so that this runs during the cache pre-filling stage and not on the377// thread pool, it could deadlock since the streaming engine uses the thread378// pool internally.379let mut prefill = executors::CachePrefill::new_sink(executor);380let exec = prefill.make_exec();381let existing = cache_nodes.insert(prefill.id(), prefill);382383assert!(existing.is_none());384385Ok(Box::new(exec))386},387}388},389SinkMultiple { .. } => {390unreachable!("should be handled with create_multiple_physical_plans")391},392Union { inputs, options } => {393let inputs = state.with_new_branch(|new_state| {394inputs395.into_iter()396.map(|node| recurse!(node, new_state))397.collect::<PolarsResult<Vec<_>>>()398});399let inputs = inputs?;400Ok(Box::new(executors::UnionExec { inputs, options }))401},402HConcat {403inputs, options, ..404} => {405let inputs = state.with_new_branch(|new_state| {406inputs407.into_iter()408.map(|node| recurse!(node, new_state))409.collect::<PolarsResult<Vec<_>>>()410});411412let inputs = inputs?;413414Ok(Box::new(executors::HConcatExec { inputs, options }))415},416Slice { input, offset, len } => {417let input = recurse!(input, state)?;418Ok(Box::new(executors::SliceExec { input, offset, len }))419},420Filter { input, predicate } => {421let streamable = is_elementwise_rec(predicate.node(), expr_arena);422let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();423let input = recurse!(input, state)?;424let mut state = ExpressionConversionState::new(true);425let predicate = create_physical_expr(426&predicate,427Context::Default,428expr_arena,429&input_schema,430&mut state,431)?;432Ok(Box::new(executors::FilterExec::new(433predicate,434input,435state.has_windows,436streamable,437)))438},439#[allow(unused_variables)]440Scan {441sources,442file_info,443hive_parts,444output_schema,445scan_type,446predicate,447unified_scan_args,448} => {449let mut expr_conversion_state = ExpressionConversionState::new(true);450451let mut create_skip_batch_predicate = false;452#[cfg(feature = "parquet")]453{454create_skip_batch_predicate |= matches!(455&*scan_type,456FileScanIR::Parquet {457options: polars_io::prelude::ParquetOptions {458use_statistics: true,459..460},461..462}463);464}465466let predicate = predicate467.map(|predicate| {468create_scan_predicate(469&predicate,470expr_arena,471output_schema.as_ref().unwrap_or(&file_info.schema),472None, // hive_schema473&mut expr_conversion_state,474create_skip_batch_predicate,475false,476)477})478.transpose()?;479480match *scan_type {481FileScanIR::Anonymous { function, .. } => {482Ok(Box::new(executors::AnonymousScanExec {483function,484predicate,485unified_scan_args,486file_info,487output_schema,488predicate_has_windows: expr_conversion_state.has_windows,489}))490},491#[allow(unreachable_patterns)]492_ => {493// We wrap in a CacheExec so that the new-streaming scan gets called from the494// CachePrefiller. This ensures it is called from outside of rayon to avoid495// deadlocks.496//497// Note that we don't actually want it to be kept in memory after being used,498// so we set the count to have it be dropped after a single use (or however499// many times it is referenced after CSE (subplan)).500state.has_cache_parent = true;501state.has_cache_child = true;502503let build_func = build_streaming_executor504.expect("invalid build. Missing feature new-streaming");505506let executor = build_func(root, lp_arena, expr_arena)?;507508let mut prefill = executors::CachePrefill::new_scan(executor);509let exec = prefill.make_exec();510511let existing = cache_nodes.insert(prefill.id(), prefill);512513assert!(existing.is_none());514515Ok(Box::new(exec))516},517#[allow(unreachable_patterns)]518_ => unreachable!(),519}520},521522Select {523expr,524input,525schema: _schema,526options,527..528} => {529let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();530let input = recurse!(input, state)?;531let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());532let phys_expr = create_physical_expressions_from_irs(533&expr,534Context::Default,535expr_arena,536&input_schema,537&mut state,538)?;539540let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))541// If all columns are literal we would get a 1 row per thread.542&& !phys_expr.iter().all(|p| {543p.is_literal()544});545546Ok(Box::new(executors::ProjectionExec {547input,548expr: phys_expr,549has_windows: state.has_windows,550input_schema,551#[cfg(test)]552schema: _schema,553options,554allow_vertical_parallelism,555}))556},557DataFrameScan {558df, output_schema, ..559} => Ok(Box::new(executors::DataFrameExec {560df,561projection: output_schema.map(|s| s.iter_names_cloned().collect()),562})),563Sort {564input,565by_column,566slice,567sort_options,568} => {569debug_assert!(!by_column.is_empty());570let input_schema = lp_arena.get(input).schema(lp_arena);571let by_column = create_physical_expressions_from_irs(572&by_column,573Context::Default,574expr_arena,575input_schema.as_ref(),576&mut ExpressionConversionState::new(true),577)?;578let input = recurse!(input, state)?;579Ok(Box::new(executors::SortExec {580input,581by_column,582slice,583sort_options,584}))585},586Cache { input, id } => {587state.has_cache_parent = true;588state.has_cache_child = true;589590if let Some(cache) = cache_nodes.get_mut(&id) {591Ok(Box::new(cache.make_exec()))592} else {593let input = recurse!(input, state)?;594595let mut prefill = executors::CachePrefill::new_cache(input, id);596let exec = prefill.make_exec();597598cache_nodes.insert(id, prefill);599600Ok(Box::new(exec))601}602},603Distinct { input, options } => {604let input = recurse!(input, state)?;605Ok(Box::new(executors::UniqueExec { input, options }))606},607GroupBy {608input,609keys,610aggs,611apply,612schema,613maintain_order,614options,615} => {616let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();617let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());618let phys_keys = create_physical_expressions_from_irs(619&keys,620Context::Default,621expr_arena,622&input_schema,623&mut ExpressionConversionState::new(true),624)?;625let phys_aggs = create_physical_expressions_from_irs(626&aggs,627Context::Aggregation,628expr_arena,629&input_schema,630&mut ExpressionConversionState::new(true),631)?;632633let _slice = options.slice;634#[cfg(feature = "dynamic_group_by")]635if let Some(options) = options.dynamic {636let input = recurse!(input, state)?;637return Ok(Box::new(executors::GroupByDynamicExec {638input,639keys: phys_keys,640aggs: phys_aggs,641options,642input_schema,643slice: _slice,644apply,645}));646}647648#[cfg(feature = "dynamic_group_by")]649if let Some(options) = options.rolling {650let input = recurse!(input, state)?;651return Ok(Box::new(executors::GroupByRollingExec {652input,653keys: phys_keys,654aggs: phys_aggs,655options,656input_schema,657slice: _slice,658apply,659}));660}661662// We first check if we can partition the group_by on the latest moment.663let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);664if partitionable {665let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {666if let Union { options, .. } = lp {667options.from_partitioned_ds668} else {669false670}671});672let input = recurse!(input, state)?;673let keys = keys674.iter()675.map(|e| e.to_expr(expr_arena))676.collect::<Vec<_>>();677let aggs = aggs678.iter()679.map(|e| e.to_expr(expr_arena))680.collect::<Vec<_>>();681Ok(Box::new(executors::PartitionGroupByExec::new(682input,683phys_keys,684phys_aggs,685maintain_order,686options.slice,687input_schema,688schema,689from_partitioned_ds,690keys,691aggs,692)))693} else {694let input = recurse!(input, state)?;695Ok(Box::new(executors::GroupByExec::new(696input,697phys_keys,698phys_aggs,699apply,700maintain_order,701input_schema,702options.slice,703)))704}705},706Join {707input_left,708input_right,709left_on,710right_on,711options,712schema,713..714} => {715let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();716let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();717718let (input_left, input_right) = state.with_new_branch(|new_state| {719(720recurse!(input_left, new_state),721recurse!(input_right, new_state),722)723});724let input_left = input_left?;725let input_right = input_right?;726727// Todo! remove the force option. It can deadlock.728let parallel = if options.force_parallel {729true730} else {731options.allow_parallel732};733734let left_on = create_physical_expressions_from_irs(735&left_on,736Context::Default,737expr_arena,738&schema_left,739&mut ExpressionConversionState::new(true),740)?;741let right_on = create_physical_expressions_from_irs(742&right_on,743Context::Default,744expr_arena,745&schema_right,746&mut ExpressionConversionState::new(true),747)?;748let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());749750// Convert the join options, to the physical join options. This requires the physical751// planner, so we do this last minute.752let join_type_options = options753.options754.map(|o| {755o.compile(|e| {756let phys_expr = create_physical_expr(757e,758Context::Default,759expr_arena,760&schema,761&mut ExpressionConversionState::new(false),762)?;763764let execution_state = ExecutionState::default();765766Ok(Arc::new(move |df: DataFrame| {767let mask = phys_expr.evaluate(&df, &execution_state)?;768let mask = mask.as_materialized_series();769let mask = mask.bool()?;770df._filter_seq(mask)771}))772})773})774.transpose()?;775776Ok(Box::new(executors::JoinExec::new(777input_left,778input_right,779left_on,780right_on,781parallel,782options.args,783join_type_options,784)))785},786HStack {787input,788exprs,789schema: output_schema,790options,791} => {792let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();793let input = recurse!(input, state)?;794795let allow_vertical_parallelism = options.should_broadcast796&& exprs797.iter()798.all(|e| is_elementwise_rec(e.node(), expr_arena));799800let mut state =801ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());802803let phys_exprs = create_physical_expressions_from_irs(804&exprs,805Context::Default,806expr_arena,807&input_schema,808&mut state,809)?;810Ok(Box::new(executors::StackExec {811input,812has_windows: state.has_windows,813exprs: phys_exprs,814input_schema,815output_schema,816options,817allow_vertical_parallelism,818}))819},820MapFunction {821input, function, ..822} => {823let input = recurse!(input, state)?;824Ok(Box::new(executors::UdfExec { input, function }))825},826ExtContext {827input, contexts, ..828} => {829let input = recurse!(input, state)?;830let contexts = contexts831.into_iter()832.map(|node| recurse!(node, state))833.collect::<PolarsResult<_>>()?;834Ok(Box::new(executors::ExternalContext { input, contexts }))835},836SimpleProjection { input, columns } => {837let input = recurse!(input, state)?;838let exec = executors::ProjectionSimple { input, columns };839Ok(Box::new(exec))840},841#[cfg(feature = "merge_sorted")]842MergeSorted {843input_left,844input_right,845key,846} => {847let (input_left, input_right) = state.with_new_branch(|new_state| {848(849recurse!(input_left, new_state),850recurse!(input_right, new_state),851)852});853let input_left = input_left?;854let input_right = input_right?;855856let exec = executors::MergeSorted {857input_left,858input_right,859key,860};861Ok(Box::new(exec))862},863Invalid => unreachable!(),864}865}866867pub fn create_scan_predicate(868predicate: &ExprIR,869expr_arena: &mut Arena<AExpr>,870schema: &Arc<Schema>,871hive_schema: Option<&Schema>,872state: &mut ExpressionConversionState,873create_skip_batch_predicate: bool,874create_column_predicates: bool,875) -> PolarsResult<ScanPredicate> {876let mut predicate = predicate.clone();877878let mut hive_predicate = None;879let mut hive_predicate_is_full_predicate = false;880881#[expect(clippy::never_loop)]882loop {883let Some(hive_schema) = hive_schema else {884break;885};886887let mut hive_predicate_parts = vec![];888let mut non_hive_predicate_parts = vec![];889890for predicate_part in MintermIter::new(predicate.node(), expr_arena) {891if aexpr_to_leaf_names_iter(predicate_part, expr_arena)892.all(|name| hive_schema.contains(&name))893{894hive_predicate_parts.push(predicate_part)895} else {896non_hive_predicate_parts.push(predicate_part)897}898}899900if hive_predicate_parts.is_empty() {901break;902}903904if non_hive_predicate_parts.is_empty() {905hive_predicate_is_full_predicate = true;906break;907}908909{910let mut iter = hive_predicate_parts.into_iter();911let mut node = iter.next().unwrap();912913for next_node in iter {914node = expr_arena.add(AExpr::BinaryExpr {915left: node,916op: Operator::And,917right: next_node,918});919}920921hive_predicate = Some(create_physical_expr(922&ExprIR::from_node(node, expr_arena),923Context::Default,924expr_arena,925schema,926state,927)?)928}929930{931let mut iter = non_hive_predicate_parts.into_iter();932let mut node = iter.next().unwrap();933934for next_node in iter {935node = expr_arena.add(AExpr::BinaryExpr {936left: node,937op: Operator::And,938right: next_node,939});940}941942predicate = ExprIR::from_node(node, expr_arena);943}944945break;946}947948let phys_predicate =949create_physical_expr(&predicate, Context::Default, expr_arena, schema, state)?;950951if hive_predicate_is_full_predicate {952hive_predicate = Some(phys_predicate.clone());953}954955let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(956predicate.node(),957expr_arena,958)));959960let mut skip_batch_predicate = None;961962if create_skip_batch_predicate {963if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {964let expr = ExprIR::new(node, predicate.output_name_inner().clone());965966if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {967eprintln!("predicate: {}", predicate.display(expr_arena));968eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));969}970971let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());972973skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);974for (col, dtype) in schema.iter() {975if !live_columns.contains(col) {976continue;977}978979skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());980skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());981skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);982}983984skip_batch_predicate = Some(create_physical_expr(985&expr,986Context::Default,987expr_arena,988&Arc::new(skip_batch_schema),989state,990)?);991}992}993994let column_predicates = if create_column_predicates {995let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);996if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {997eprintln!("column_predicates: {{");998eprintln!(" [");999for (pred, spec) in column_predicates.predicates.values() {1000eprintln!(1001" {} ({spec:?}),",1002ExprIRDisplay::display_node(*pred, expr_arena)1003);1004}1005eprintln!(" ],");1006eprintln!(1007" is_sumwise_complete: {}",1008column_predicates.is_sumwise_complete1009);1010eprintln!("}}");1011}1012PhysicalColumnPredicates {1013predicates: column_predicates1014.predicates1015.into_iter()1016.map(|(n, (p, s))| {1017PolarsResult::Ok((1018n,1019(1020create_physical_expr(1021&ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),1022Context::Default,1023expr_arena,1024schema,1025state,1026)?,1027s,1028),1029))1030})1031.collect::<PolarsResult<PlHashMap<_, _>>>()?,1032is_sumwise_complete: column_predicates.is_sumwise_complete,1033}1034} else {1035PhysicalColumnPredicates {1036predicates: PlHashMap::default(),1037is_sumwise_complete: false,1038}1039};10401041PolarsResult::Ok(ScanPredicate {1042predicate: phys_predicate,1043live_columns,1044skip_batch_predicate,1045column_predicates,1046hive_predicate,1047hive_predicate_is_full_predicate,1048})1049}10501051#[cfg(test)]1052mod tests {1053use super::*;10541055#[test]1056fn test_create_multiple_physical_plans_reused_cache() {1057// Check that reusing the same cache node doesn't panic.1058// CSE creates duplicate cache nodes with the same ID, but cloud reuses them.10591060let mut ir = Arena::new();10611062let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);1063let scan = ir.add(IR::DataFrameScan {1064df: Arc::new(DataFrame::empty_with_schema(&schema)),1065schema: Arc::new(schema),1066output_schema: None,1067});10681069let cache = ir.add(IR::Cache {1070input: scan,1071id: UniqueId::new(),1072});10731074let left_sink = ir.add(IR::Sink {1075input: cache,1076payload: SinkTypeIR::Memory,1077});1078let right_sink = ir.add(IR::Sink {1079input: cache,1080payload: SinkTypeIR::Memory,1081});10821083let _multiplan = create_multiple_physical_plans(1084&[left_sink, right_sink],1085&mut ir,1086&mut Arena::new(),1087None,1088)1089.unwrap();1090}1091}109210931094