// Path: blob/main/crates/polars-plan/src/plans/optimizer/expand_datasets.rs
// 6940 views
use std::fmt::Debug;1use std::sync::Arc;23use polars_core::config;4use polars_core::error::{PolarsResult, polars_bail};5use polars_utils::arena::{Arena, Node};6use polars_utils::format_pl_smallstr;7use polars_utils::pl_str::PlSmallStr;8#[cfg(feature = "python")]9use polars_utils::python_function::PythonObject;10use polars_utils::slice_enum::Slice;1112use super::OptimizationRule;13#[cfg(feature = "python")]14use crate::dsl::python_dsl::PythonScanSource;15use crate::dsl::{DslPlan, FileScanIR, UnifiedScanArgs};16use crate::plans::IR;1718/// Note: Currently only used for iceberg. This is so that we can call iceberg to fetch the files19/// list with a potential row limit from slice pushdown.20///21/// In the future this can also apply to hive path expansion with predicates.22pub(super) struct ExpandDatasets;2324impl OptimizationRule for ExpandDatasets {25fn optimize_plan(26&mut self,27lp_arena: &mut Arena<IR>,28_expr_arena: &mut Arena<crate::prelude::AExpr>,29node: Node,30) -> PolarsResult<Option<IR>> {31// # Note32// This function mutates the IR node in-place rather than returning the new IR - the33// StackOptimizer will re-call this function otherwise.34if let IR::Scan {35sources,36scan_type,37unified_scan_args,3839file_info: _,40hive_parts: _,41predicate: _,42output_schema: _,43} = lp_arena.get_mut(node)44{45let projection = unified_scan_args.projection.clone();46let limit = match unified_scan_args.pre_slice.clone() {47Some(v @ Slice::Positive { .. 
}) => Some(v.end_position()),48_ => None,49};5051match scan_type.as_mut() {52#[cfg(feature = "python")]53FileScanIR::PythonDataset {54dataset_object,55cached_ir,56} => {57let cached_ir = cached_ir.clone();58let mut guard = cached_ir.lock().unwrap();5960if config::verbose() {61eprintln!(62"expand_datasets(): python[{}]: limit: {:?}, project: {}",63dataset_object.name(),64limit,65projection.as_ref().map_or(66PlSmallStr::from_static("all"),67|x| format_pl_smallstr!("{}", x.len())68)69)70}7172let existing_resolved_version_key = match guard.as_ref() {73Some(resolved) => {74let ExpandedDataset {75version,76limit: cached_limit,77projection: cached_projection,78expanded_dsl: _,79python_scan: _,80} = resolved;8182(cached_limit == &limit && cached_projection == &projection)83.then_some(version.as_str())84},8586None => None,87};8889if let Some((expanded_dsl, version)) = dataset_object.to_dataset_scan(90existing_resolved_version_key,91limit,92projection.as_deref(),93)? {94*guard = Some(ExpandedDataset {95version,96limit,97projection,98expanded_dsl,99python_scan: None,100})101}102103let ExpandedDataset {104version: _,105limit: _,106projection: _,107expanded_dsl,108python_scan,109} = guard.as_mut().unwrap();110111match expanded_dsl {112DslPlan::Scan {113sources: resolved_sources,114unified_scan_args: resolved_unified_scan_args,115scan_type: resolved_scan_type,116cached_ir: _,117} => {118use crate::dsl::FileScanDsl;119120// We only want a few configuration flags from here (e.g. column casting config).121// The rest we either expect to be None (e.g. 
projection / row_index), or ignore.122let UnifiedScanArgs {123schema: _,124cloud_options,125hive_options: _,126rechunk,127cache,128glob: _,129projection: _projection @ None,130column_mapping,131default_values,132row_index: _row_index @ None,133pre_slice: _pre_slice @ None,134cast_columns_policy,135missing_columns_policy,136extra_columns_policy,137include_file_paths: _include_file_paths @ None,138deletion_files,139} = resolved_unified_scan_args.as_ref()140else {141panic!(142"invalid scan args from python dataset resolve: {:?}",143&resolved_unified_scan_args144)145};146147unified_scan_args.cloud_options = cloud_options.clone();148unified_scan_args.rechunk = *rechunk;149unified_scan_args.cache = *cache;150unified_scan_args.cast_columns_policy = cast_columns_policy.clone();151unified_scan_args.missing_columns_policy = *missing_columns_policy;152unified_scan_args.extra_columns_policy = *extra_columns_policy;153unified_scan_args.column_mapping = column_mapping.clone();154unified_scan_args.default_values = default_values.clone();155unified_scan_args.deletion_files = deletion_files.clone();156157*sources = resolved_sources.clone();158159*scan_type = Box::new(match *resolved_scan_type.clone() {160#[cfg(feature = "csv")]161FileScanDsl::Csv { options } => FileScanIR::Csv { options },162163#[cfg(feature = "ipc")]164FileScanDsl::Ipc { options } => FileScanIR::Ipc {165options,166metadata: None,167},168169#[cfg(feature = "parquet")]170FileScanDsl::Parquet { options } => FileScanIR::Parquet {171options,172metadata: None,173},174175#[cfg(feature = "json")]176FileScanDsl::NDJson { options } => FileScanIR::NDJson { options },177178#[cfg(feature = "python")]179FileScanDsl::PythonDataset { dataset_object } => {180FileScanIR::PythonDataset {181dataset_object,182cached_ir: Default::default(),183}184},185186FileScanDsl::Anonymous {187options,188function,189file_info: _,190} => FileScanIR::Anonymous { options, function },191});192},193194DslPlan::PythonScan { options } => {195*python_scan 
= Some(ExpandedPythonScan {196name: dataset_object.name(),197scan_fn: options.scan_fn.clone().unwrap(),198variant: options.python_source.clone(),199})200},201202dsl => {203polars_bail!(204ComputeError:205"unknown DSL when resolving python dataset scan: {}",206dsl.display()?207)208},209};210},211212_ => {},213}214}215216Ok(None)217}218}219220#[derive(Clone)]221#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]222#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]223pub struct ExpandedDataset {224version: PlSmallStr,225limit: Option<usize>,226projection: Option<Arc<[PlSmallStr]>>,227expanded_dsl: DslPlan,228229/// Fallback python scan230#[cfg(feature = "python")]231python_scan: Option<ExpandedPythonScan>,232}233234#[cfg(feature = "python")]235#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]236#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]237#[derive(Clone)]238pub struct ExpandedPythonScan {239pub name: PlSmallStr,240pub scan_fn: PythonObject,241pub variant: PythonScanSource,242}243244impl ExpandedDataset {245#[cfg(feature = "python")]246pub fn python_scan(&self) -> Option<&ExpandedPythonScan> {247self.python_scan.as_ref()248}249}250251impl Debug for ExpandedDataset {252fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {253let ExpandedDataset {254version,255limit,256projection,257expanded_dsl,258259#[cfg(feature = "python")]260python_scan,261} = self;262263return display::ExpandedDataset {264version,265limit,266projection,267expanded_dsl: &match expanded_dsl.display() {268Ok(v) => v.to_string(),269Err(e) => e.to_string(),270},271#[cfg(feature = "python")]272python_scan: python_scan.as_ref().map(273|ExpandedPythonScan {274name,275scan_fn: _,276variant,277}| {278format_pl_smallstr!("python-scan[{} @ {:?}]", name, variant)279},280),281}282.fmt(f);283284mod display {285use std::fmt::Debug;286use std::sync::Arc;287288use 
polars_utils::pl_str::PlSmallStr;289290#[derive(Debug)]291#[expect(unused)]292pub struct ExpandedDataset<'a> {293pub version: &'a str,294pub limit: &'a Option<usize>,295pub projection: &'a Option<Arc<[PlSmallStr]>>,296pub expanded_dsl: &'a str,297298#[cfg(feature = "python")]299pub python_scan: Option<PlSmallStr>,300}301}302}303}304305306