// Path: blob/main/crates/polars-mem-engine/src/scan_predicate/functions.rs
// 7884 views
use std::cell::LazyCell;1use std::sync::Arc;23use polars_core::config;4use polars_core::error::PolarsResult;5use polars_core::prelude::{IDX_DTYPE, IdxCa, InitHashMaps, PlHashMap, PlIndexMap, PlIndexSet};6use polars_core::schema::Schema;7use polars_error::polars_warn;8use polars_expr::{ExpressionConversionState, create_physical_expr};9use polars_io::predicates::ScanIOPredicate;10use polars_plan::dsl::default_values::{11DefaultFieldValues, IcebergIdentityTransformedPartitionFields,12};13use polars_plan::dsl::deletion::DeletionFilesList;14use polars_plan::dsl::{15FileScanIR, Operator, PredicateFileSkip, ScanSources, TableStatistics, UnifiedScanArgs,16};17use polars_plan::plans::expr_ir::{ExprIR, OutputName};18use polars_plan::plans::hive::HivePartitionsDf;19use polars_plan::plans::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};20use polars_plan::plans::{AExpr, ExprIRDisplay, FileInfo, IR, MintermIter};21use polars_plan::utils::aexpr_to_leaf_names_iter;22use polars_utils::arena::{Arena, Node};23use polars_utils::pl_str::PlSmallStr;24use polars_utils::{IdxSize, format_pl_smallstr};2526use crate::scan_predicate::skip_files_mask::SkipFilesMask;27use crate::scan_predicate::{PhysicalColumnPredicates, ScanPredicate};2829pub fn create_scan_predicate(30predicate: &ExprIR,31expr_arena: &mut Arena<AExpr>,32schema: &Arc<Schema>,33hive_schema: Option<&Schema>,34state: &mut ExpressionConversionState,35create_skip_batch_predicate: bool,36create_column_predicates: bool,37) -> PolarsResult<ScanPredicate> {38let mut predicate = predicate.clone();3940let mut hive_predicate = None;41let mut hive_predicate_is_full_predicate = false;4243#[expect(clippy::never_loop)]44loop {45let Some(hive_schema) = hive_schema else {46break;47};4849let mut hive_predicate_parts = vec![];50let mut non_hive_predicate_parts = vec![];5152for predicate_part in MintermIter::new(predicate.node(), expr_arena) {53if aexpr_to_leaf_names_iter(predicate_part, expr_arena)54.all(|name| 
hive_schema.contains(name))55{56hive_predicate_parts.push(predicate_part)57} else {58non_hive_predicate_parts.push(predicate_part)59}60}6162if hive_predicate_parts.is_empty() {63break;64}6566if non_hive_predicate_parts.is_empty() {67hive_predicate_is_full_predicate = true;68break;69}7071{72let mut iter = hive_predicate_parts.into_iter();73let mut node = iter.next().unwrap();7475for next_node in iter {76node = expr_arena.add(AExpr::BinaryExpr {77left: node,78op: Operator::And,79right: next_node,80});81}8283hive_predicate = Some(create_physical_expr(84&ExprIR::from_node(node, expr_arena),85expr_arena,86schema,87state,88)?)89}9091{92let mut iter = non_hive_predicate_parts.into_iter();93let mut node = iter.next().unwrap();9495for next_node in iter {96node = expr_arena.add(AExpr::BinaryExpr {97left: node,98op: Operator::And,99right: next_node,100});101}102103predicate = ExprIR::from_node(node, expr_arena);104}105106break;107}108109let phys_predicate = create_physical_expr(&predicate, expr_arena, schema, state)?;110111if hive_predicate_is_full_predicate {112hive_predicate = Some(phys_predicate.clone());113}114115let live_columns = Arc::new(PlIndexSet::from_iter(116aexpr_to_leaf_names_iter(predicate.node(), expr_arena).cloned(),117));118119let mut skip_batch_predicate = None;120121if create_skip_batch_predicate {122if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {123let expr = ExprIR::new(node, predicate.output_name_inner().clone());124125if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {126eprintln!("predicate: {}", predicate.display(expr_arena));127eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));128}129130let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());131132skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);133for (col, dtype) in schema.iter() {134if !live_columns.contains(col) 
{135continue;136}137138skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());139skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());140skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);141}142143skip_batch_predicate = Some(create_physical_expr(144&expr,145expr_arena,146&Arc::new(skip_batch_schema),147state,148)?);149}150}151152let column_predicates = if create_column_predicates {153let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);154if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {155eprintln!("column_predicates: {{");156eprintln!(" [");157for (pred, spec) in column_predicates.predicates.values() {158eprintln!(159" {} ({spec:?}),",160ExprIRDisplay::display_node(*pred, expr_arena)161);162}163eprintln!(" ],");164eprintln!(165" is_sumwise_complete: {}",166column_predicates.is_sumwise_complete167);168eprintln!("}}");169}170PhysicalColumnPredicates {171predicates: column_predicates172.predicates173.into_iter()174.map(|(n, (p, s))| {175PolarsResult::Ok((176n,177(178create_physical_expr(179&ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),180expr_arena,181schema,182state,183)?,184s,185),186))187})188.collect::<PolarsResult<PlHashMap<_, _>>>()?,189is_sumwise_complete: column_predicates.is_sumwise_complete,190}191} else {192PhysicalColumnPredicates {193predicates: PlHashMap::default(),194is_sumwise_complete: false,195}196};197198PolarsResult::Ok(ScanPredicate {199predicate: phys_predicate,200live_columns,201skip_batch_predicate,202column_predicates,203hive_predicate,204hive_predicate_is_full_predicate,205})206}207208/// # Returns209/// (skip_files_mask, predicate)210pub fn initialize_scan_predicate<'a>(211predicate: Option<&'a ScanIOPredicate>,212hive_parts: Option<&HivePartitionsDf>,213table_statsitics: Option<&TableStatistics>,214verbose: bool,215) -> PolarsResult<(Option<SkipFilesMask>, Option<&'a ScanIOPredicate>)> 
{216#[expect(clippy::never_loop)]217loop {218let Some(predicate) = predicate else {219break;220};221222let expected_mask_len: usize;223224let (skip_files_mask, send_predicate_to_readers) = if let Some(hive_parts) = hive_parts225&& let Some(hive_predicate) = &predicate.hive_predicate226{227if verbose {228eprintln!(229"initialize_scan_predicate: Source filter mask initialization via hive partitions"230);231}232233expected_mask_len = hive_parts.df().height();234235let inclusion_mask = hive_predicate236.evaluate_io(hive_parts.df())?237.bool()?238.rechunk()239.into_owned()240.downcast_into_iter()241.next()242.unwrap()243.values()244.clone();245246(247SkipFilesMask::Inclusion(inclusion_mask),248!predicate.hive_predicate_is_full_predicate,249)250} else if let Some(table_statsitics) = table_statsitics251&& let Some(skip_batch_predicate) = &predicate.skip_batch_predicate252{253if verbose {254eprintln!(255"initialize_scan_predicate: Source filter mask initialization via table statistics"256);257}258259expected_mask_len = table_statsitics.0.height();260261let exclusion_mask = skip_batch_predicate.evaluate_with_stat_df(&table_statsitics.0)?;262263(SkipFilesMask::Exclusion(exclusion_mask), true)264} else {265break;266};267268if skip_files_mask.len() != expected_mask_len {269polars_warn!(270"WARNING: \271initialize_scan_predicate: \272filter mask length mismatch (length: {}, expected: {}). Files \273will not be skipped. 
This is a bug; please open an issue with \274a reproducible example if possible.",275skip_files_mask.len(),276expected_mask_len277);278return Ok((None, Some(predicate)));279}280281if verbose {282eprintln!(283"initialize_scan_predicate: Predicate pushdown allows skipping {} / {} files",284skip_files_mask.num_skipped_files(),285skip_files_mask.len()286);287}288289return Ok((290Some(skip_files_mask),291send_predicate_to_readers.then_some(predicate),292));293}294295Ok((None, predicate))296}297298/// Filters the list of files in an `IR::Scan` based on the contained predicate. This is possible299/// if the predicate has components that refer to only the hive parts and there is no e.g.300/// row index / slice.301///302/// This also applies the projection onto the hive parts.303///304/// # Panics305/// Panics if `scan_ir_node` is not `IR::Scan`.306pub fn apply_scan_predicate_to_scan_ir(307scan_ir_node: Node,308ir_arena: &mut Arena<IR>,309expr_arena: &mut Arena<AExpr>,310) -> PolarsResult<()> {311let scan_ir_schema = IR::schema(ir_arena.get(scan_ir_node), ir_arena).into_owned();312let scan_ir = ir_arena.get_mut(scan_ir_node);313314let IR::Scan {315sources,316hive_parts,317predicate,318predicate_file_skip_applied,319unified_scan_args,320file_info,321..322} = scan_ir323else {324unreachable!()325};326327if let Some(hive_parts) = hive_parts.as_mut() {328*hive_parts = hive_parts.filter_columns(&scan_ir_schema);329}330331if unified_scan_args.has_row_index_or_slice() || predicate_file_skip_applied.is_some() {332return Ok(());333}334335let Some(predicate) = predicate else {336return Ok(());337};338339match sources {340// Files cannot be `gather()`ed.341ScanSources::Files(_) => return Ok(()),342ScanSources::Paths(_) | ScanSources::Buffers(_) => {},343}344345let verbose = config::verbose();346347let scan_predicate = create_scan_predicate(348predicate,349expr_arena,350&scan_ir_schema,351hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),352&mut 
ExpressionConversionState::new(true),353true, // create_skip_batch_predicate354false, // create_column_predicates355)?356.to_io(None, file_info.schema.clone());357358let (skip_files_mask, predicate_to_readers) = initialize_scan_predicate(359Some(&scan_predicate),360hive_parts.as_ref(),361unified_scan_args.table_statistics.as_ref(),362verbose,363)?;364365if let Some(skip_files_mask) = skip_files_mask {366assert_eq!(skip_files_mask.len(), sources.len());367368let predicate_file_skip = PredicateFileSkip {369no_residual_predicate: predicate_to_readers.is_none(),370original_len: sources.len(),371};372373if verbose {374eprintln!("apply_scan_predicate_to_scan_ir: {predicate_file_skip:?}");375}376377*predicate_file_skip_applied = Some(predicate_file_skip);378379if skip_files_mask.num_skipped_files() > 0 {380filter_scan_ir(scan_ir, skip_files_mask.non_skipped_files_idx_iter())381}382}383384Ok(())385}386387/// Filters the paths for a scan IR. This also involves performing selections on388/// e.g. 
hive partitions, deletion files.389///390/// Note: `selected_path_indices` should be cheaply cloneable.391///392/// # Panics393/// Panics if `scan_ir` is not `IR::Scan`.394pub fn filter_scan_ir<I>(scan_ir: &mut IR, selected_path_indices: I)395where396I: Iterator<Item = usize> + Clone,397{398let IR::Scan {399sources,400file_info:401FileInfo {402schema: _,403reader_schema,404row_estimation,405},406hive_parts,407predicate: _,408predicate_file_skip_applied: _,409output_schema: _,410scan_type,411unified_scan_args,412} = scan_ir413else {414panic!("{:?}", scan_ir);415};416417let size_hint = selected_path_indices.size_hint();418419if size_hint.0 == sources.len()420&& size_hint.1 == Some(sources.len())421&& selected_path_indices422.clone()423.enumerate()424.all(|(i, x)| i == x)425{426return;427}428429let UnifiedScanArgs {430schema: _,431cloud_options: _,432hive_options: _,433rechunk: _,434cache: _,435glob: _,436hidden_file_prefix: _,437projection: _,438column_mapping: _,439default_values,440// Ensure these are None.441row_index: None,442pre_slice: None,443cast_columns_policy: _,444missing_columns_policy: _,445extra_columns_policy: _,446include_file_paths: _,447table_statistics,448deletion_files,449row_count,450} = unified_scan_args.as_mut()451else {452panic!("{unified_scan_args:?}")453};454455*row_count = None;456457if selected_path_indices.clone().next() != Some(0) {458*reader_schema = None;459460// Ensure the metadata is unset, otherwise it may incorrectly be used at461// scan. 
This is especially important for Parquet as it requires the462// correct `is_nullable` in the arrow field.463match scan_type.as_mut() {464#[cfg(feature = "parquet")]465FileScanIR::Parquet {466options: _,467metadata,468} => *metadata = None,469470#[cfg(feature = "ipc")]471FileScanIR::Ipc {472options: _,473metadata,474} => *metadata = None,475476#[cfg(feature = "csv")]477FileScanIR::Csv { options: _ } => {},478479#[cfg(feature = "json")]480FileScanIR::NDJson { options: _ } => {},481482#[cfg(feature = "python")]483FileScanIR::PythonDataset {484dataset_object: _,485cached_ir,486} => *cached_ir.lock().unwrap() = None,487488#[cfg(feature = "scan_lines")]489FileScanIR::Lines { name: _ } => {},490491FileScanIR::Anonymous {492options: _,493function: _,494} => {},495}496}497498let selected_path_indices_idxsize = LazyCell::new(|| {499selected_path_indices500.clone()501.map(|i| IdxSize::try_from(i).unwrap())502.collect::<Vec<_>>()503});504505*deletion_files = deletion_files.as_ref().and_then(|x| match x {506DeletionFilesList::IcebergPositionDelete(deletions) => {507let mut out = None;508509for (out_idx, source_idx) in selected_path_indices.clone().enumerate() {510if let Some(v) = deletions.get(&source_idx) {511out.get_or_insert_with(|| {512PlIndexMap::with_capacity(selected_path_indices.size_hint().0 - out_idx)513})514.insert(out_idx, v.clone());515}516}517518out.map(|x| DeletionFilesList::IcebergPositionDelete(Arc::new(x)))519},520});521522*table_statistics = table_statistics.as_ref().map(|x| {523let df_height = IdxSize::try_from(x.0.height()).unwrap();524525assert!(selected_path_indices_idxsize.iter().all(|x| *x < df_height));526527TableStatistics(Arc::new(unsafe {528x.0.take_slice_unchecked(&selected_path_indices_idxsize)529}))530});531532let original_sources_len = sources.len();533*sources = sources.gather(selected_path_indices.clone()).unwrap();534*row_estimation = 
(535None,536row_estimation537.1538.div_ceil(original_sources_len)539.saturating_mul(sources.len()),540);541542*hive_parts = hive_parts.as_ref().map(|hp| {543let df = hp.df();544let df_height = IdxSize::try_from(df.height()).unwrap();545546assert!(selected_path_indices_idxsize.iter().all(|x| *x < df_height));547548// Safety: Asserted all < df.height() above.549unsafe { df.take_slice_unchecked(&selected_path_indices_idxsize) }.into()550});551552*default_values = default_values.as_ref().map(|x| match x {553DefaultFieldValues::Iceberg(v) => {554let mut out = PlIndexMap::with_capacity(v.len());555let mut gather_indices = PlHashMap::with_capacity(v.len());556557for (k, v) in v.iter() {558out.insert(559*k,560v.as_ref().map_err(Clone::clone).map(|partition_values| {561if !gather_indices.contains_key(&partition_values.len()) {562gather_indices.insert(563partition_values.len(),564selected_path_indices565.clone()566.map(|i| {567(i < partition_values.len())568.then(|| IdxSize::try_from(i).unwrap())569})570.collect::<IdxCa>(),571);572}573574unsafe {575partition_values.take_unchecked(576gather_indices.get(&partition_values.len()).unwrap(),577)578}579}),580);581}582583DefaultFieldValues::Iceberg(Arc::new(IcebergIdentityTransformedPartitionFields(out)))584},585});586}587588589