Path: blob/main/crates/polars-plan/src/plans/optimizer/expand_datasets.rs
8446 views
use std::fmt::Debug;1use std::sync::Arc;23use polars_core::config;4use polars_core::error::{PolarsResult, polars_bail};5use polars_utils::arena::{Arena, Node};6use polars_utils::pl_str::PlSmallStr;7#[cfg(feature = "python")]8use polars_utils::python_function::PythonObject;9use polars_utils::slice_enum::Slice;10use polars_utils::{format_pl_smallstr, unitvec};1112#[cfg(feature = "python")]13use crate::dsl::python_dsl::PythonScanSource;14use crate::dsl::{DslPlan, FileScanIR, UnifiedScanArgs};15use crate::plans::{AExpr, IR};1617pub(super) fn expand_datasets(18root: Node,19ir_arena: &mut Arena<IR>,20expr_arena: &mut Arena<AExpr>,21apply_scan_predicate_to_scan_ir: fn(22Node,23&mut Arena<IR>,24&mut Arena<AExpr>,25) -> PolarsResult<()>,26) -> PolarsResult<()> {27let mut stack = unitvec![root];2829while let Some(node) = stack.pop() {30ir_arena.get(node).copy_inputs(&mut stack);3132let IR::Scan {33sources,34scan_type,35unified_scan_args,3637file_info,38hive_parts,39predicate,40predicate_file_skip_applied: _,41output_schema: _,42} = ir_arena.get_mut(node)43else {44continue;45};4647let mut projection = unified_scan_args.projection.clone();4849if let Some(row_index) = &unified_scan_args.row_index50&& let Some(projection) = projection.as_mut()51{52*projection = projection53.iter()54.filter(|x| *x != &row_index.name)55.cloned()56.collect();57}5859let limit = match unified_scan_args.pre_slice.clone() {60Some(v @ Slice::Positive { .. }) => Some(v.end_position()),61_ => None,62};6364match scan_type.as_mut() {65#[cfg(feature = "python")]66FileScanIR::PythonDataset {67dataset_object,68cached_ir,69} => {70use crate::plans::pyarrow::predicate_to_pa;7172let cached_ir = cached_ir.clone();73let mut guard = cached_ir.lock().unwrap();7475if config::verbose() {76eprintln!(77"expand_datasets(): python[{}]: limit: {:?}, project: {}",78dataset_object.name(),79limit,80projection.as_ref().map_or(81PlSmallStr::from_static("all"),82|x| format_pl_smallstr!("{}", x.len())83)84)85}8687// Note88// row_index is removed from projection/live_columns set, and is therefore not89// considered when comparing cached expansion equality. This is safe as the90// `row_index_in_live_filter` variable does not depend on the cached values.9192let mut row_index_in_live_filter = false;9394let live_filter_columns: Option<Arc<[PlSmallStr]>> = predicate.as_ref().map(|x| {95use polars_core::prelude::PlHashSet;9697use crate::utils::aexpr_to_leaf_names_iter;9899let mut out: Arc<[PlSmallStr]> =100PlHashSet::from_iter(aexpr_to_leaf_names_iter(x.node(), expr_arena))101.into_iter()102.filter(|&live_col| {103if unified_scan_args104.row_index105.as_ref()106.is_some_and(|ri| live_col == &ri.name)107{108row_index_in_live_filter = true;109false110} else {111true112}113})114.cloned()115.collect();116117Arc::get_mut(&mut out).unwrap().sort_unstable();118119out120});121122let pyarrow_predicate: Option<String> = if !unified_scan_args123.has_row_index_or_slice()124&& let Some(predicate) = &predicate125{126use crate::plans::aexpr::MintermIter;127128// Convert minterms independently, can allow conversion to partially succeed if there are unsupported expressions129let parts: Vec<String> = MintermIter::new(predicate.node(), expr_arena)130.filter_map(|node| predicate_to_pa(node, expr_arena, Default::default()))131.collect();132match parts.len() {1330 => None,1341 => Some(parts.into_iter().next().unwrap()),135_ => Some(format!("({})", parts.join(" & "))),136}137} else {138None139};140141let existing_resolved_version_key = match guard.as_ref() {142Some(resolved) => {143let ExpandedDataset {144version,145limit: cached_limit,146projection: cached_projection,147live_filter_columns: cached_live_filter_columns,148pyarrow_predicate: cached_pyarrow_predicate,149expanded_dsl: _,150python_scan: _,151} = resolved;152153(&limit == cached_limit154&& &projection == cached_projection155&& &live_filter_columns == cached_live_filter_columns156&& &pyarrow_predicate == cached_pyarrow_predicate)157.then_some(version.as_str())158},159160None => None,161};162163if let Some((expanded_dsl, version)) = dataset_object.to_dataset_scan(164existing_resolved_version_key,165limit,166projection.as_deref(),167live_filter_columns.as_deref(),168pyarrow_predicate.as_deref(),169)? {170*guard = Some(ExpandedDataset {171version,172limit,173projection,174live_filter_columns,175pyarrow_predicate,176expanded_dsl,177python_scan: None,178})179}180181let ExpandedDataset {182version: _,183limit: _,184projection: _,185live_filter_columns: _,186pyarrow_predicate: _,187expanded_dsl,188python_scan,189} = guard.as_mut().unwrap();190191match expanded_dsl {192DslPlan::Scan {193sources: resolved_sources,194unified_scan_args: resolved_unified_scan_args,195scan_type: resolved_scan_type,196cached_ir: _,197} => {198use crate::dsl::FileScanDsl;199200// We only want a few configuration flags from here (e.g. column casting config).201// The rest we either expect to be None (e.g. projection / row_index), or ignore.202let UnifiedScanArgs {203schema: _,204cloud_options,205hive_options,206rechunk,207cache,208glob: _,209hidden_file_prefix: _hidden_file_prefix @ None,210projection: _projection @ None,211column_mapping,212default_values,213row_index: _row_index @ None,214pre_slice: _pre_slice @ None,215cast_columns_policy,216missing_columns_policy,217extra_columns_policy,218include_file_paths: _include_file_paths @ None,219deletion_files,220table_statistics,221row_count,222} = resolved_unified_scan_args.as_ref()223else {224panic!(225"invalid scan args from python dataset resolve: {:?}",226&resolved_unified_scan_args227)228};229230unified_scan_args.cloud_options = cloud_options.clone();231unified_scan_args.rechunk = *rechunk;232unified_scan_args.cache = *cache;233unified_scan_args.cast_columns_policy = cast_columns_policy.clone();234unified_scan_args.missing_columns_policy = *missing_columns_policy;235unified_scan_args.extra_columns_policy = *extra_columns_policy;236unified_scan_args.column_mapping = column_mapping.clone();237unified_scan_args.default_values = default_values.clone();238unified_scan_args.deletion_files = deletion_files.clone();239unified_scan_args.table_statistics = table_statistics.clone();240unified_scan_args.row_count = *row_count;241242if row_index_in_live_filter {243use polars_core::prelude::{Column, DataType, IdxCa, IntoColumn};244use polars_core::series::IntoSeries;245246let row_index_name =247&unified_scan_args.row_index.as_ref().unwrap().name;248let table_statistics =249unified_scan_args.table_statistics.as_mut().unwrap();250251let statistics_df = Arc::make_mut(&mut table_statistics.0);252assert!(253!statistics_df254.schema()255.contains(&format_pl_smallstr!("{}_nc", row_index_name))256);257258unsafe { statistics_df.columns_mut() }.extend([259IdxCa::from_vec(260format_pl_smallstr!("{}_nc", row_index_name),261vec![0],262)263.into_series()264.into_column()265.new_from_index(0, sources.len()),266Column::full_null(267format_pl_smallstr!("{}_min", row_index_name),268sources.len(),269&DataType::IDX_DTYPE,270),271Column::full_null(272format_pl_smallstr!("{}_max", row_index_name),273sources.len(),274&DataType::IDX_DTYPE,275),276]);277}278279*sources = resolved_sources.clone();280281**scan_type = match *resolved_scan_type.clone() {282#[cfg(feature = "csv")]283FileScanDsl::Csv { options } => FileScanIR::Csv { options },284285#[cfg(feature = "ipc")]286FileScanDsl::Ipc { options } => FileScanIR::Ipc {287options,288metadata: None,289},290291#[cfg(feature = "parquet")]292FileScanDsl::Parquet { options } => FileScanIR::Parquet {293options,294metadata: None,295},296297#[cfg(feature = "json")]298FileScanDsl::NDJson { options } => FileScanIR::NDJson { options },299300#[cfg(feature = "python")]301FileScanDsl::PythonDataset { dataset_object } => {302FileScanIR::PythonDataset {303dataset_object,304cached_ir: Default::default(),305}306},307308#[cfg(feature = "scan_lines")]309FileScanDsl::Lines { name } => FileScanIR::Lines { name },310311FileScanDsl::Anonymous {312options,313function,314file_info: _,315} => FileScanIR::Anonymous { options, function },316};317318if hive_options.enabled == Some(true)319&& let Some(paths) = sources.as_paths()320{321use arrow::Either;322323use crate::plans::hive::hive_partitions_from_paths;324325let owned;326327*hive_parts = hive_partitions_from_paths(328paths,329hive_options.hive_start_idx,330hive_options.schema.clone(),331match file_info.reader_schema.as_ref().unwrap() {332Either::Left(v) => {333use polars_core::schema::{Schema, SchemaExt as _};334335owned = Some(Schema::from_arrow_schema(v.as_ref()));336owned.as_ref().unwrap()337},338Either::Right(v) => v.as_ref(),339},340hive_options.try_parse_dates,341)?;342}343},344345DslPlan::PythonScan { options } => {346*python_scan = Some(ExpandedPythonScan {347name: dataset_object.name(),348scan_fn: options.scan_fn.clone().unwrap(),349variant: options.python_source.clone(),350})351},352353dsl => {354polars_bail!(355ComputeError:356"unknown DSL when resolving python dataset scan: {}",357dsl.display()?358)359},360};361},362363_ => {},364}365366apply_scan_predicate_to_scan_ir(node, ir_arena, expr_arena)?;367}368369Ok(())370}371372#[derive(Clone)]373#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]374#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]375pub struct ExpandedDataset {376version: PlSmallStr,377limit: Option<usize>,378projection: Option<Arc<[PlSmallStr]>>,379live_filter_columns: Option<Arc<[PlSmallStr]>>,380pyarrow_predicate: Option<String>,381expanded_dsl: DslPlan,382383/// Fallback python scan384#[cfg(feature = "python")]385python_scan: Option<ExpandedPythonScan>,386}387388#[cfg(feature = "python")]389#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]390#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]391#[derive(Clone)]392pub struct ExpandedPythonScan {393pub name: PlSmallStr,394pub scan_fn: PythonObject,395pub variant: PythonScanSource,396}397398impl ExpandedDataset {399#[cfg(feature = "python")]400pub fn python_scan(&self) -> Option<&ExpandedPythonScan> {401self.python_scan.as_ref()402}403}404405impl Debug for ExpandedDataset {406fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {407let ExpandedDataset {408version,409limit,410projection,411live_filter_columns,412pyarrow_predicate,413expanded_dsl,414415#[cfg(feature = "python")]416python_scan,417} = self;418419return display::ExpandedDataset {420version,421limit,422projection,423live_filter_columns,424expanded_dsl: &match expanded_dsl.display() {425Ok(v) => v.to_string(),426Err(e) => e.to_string(),427},428pyarrow_predicate: if pyarrow_predicate.is_some() {429"Some(<redacted>)"430} else {431"None"432},433#[cfg(feature = "python")]434python_scan: python_scan.as_ref().map(435|ExpandedPythonScan {436name,437scan_fn: _,438variant,439}| {440format_pl_smallstr!("python-scan[{} @ {:?}]", name, variant)441},442),443}444.fmt(f);445446mod display {447use std::fmt::Debug;448use std::sync::Arc;449450use polars_utils::pl_str::PlSmallStr;451452#[derive(Debug)]453#[expect(unused)]454pub struct ExpandedDataset<'a> {455pub version: &'a str,456pub limit: &'a Option<usize>,457pub projection: &'a Option<Arc<[PlSmallStr]>>,458pub live_filter_columns: &'a Option<Arc<[PlSmallStr]>>,459pub pyarrow_predicate: &'static str,460pub expanded_dsl: &'a str,461462#[cfg(feature = "python")]463pub python_scan: Option<PlSmallStr>,464}465}466}467}468469470