Path: blob/main/crates/polars-plan/src/dsl/file_scan/mod.rs
use std::hash::Hash;
use std::sync::Mutex;

use deletion::DeletionFilesList;
use polars_core::schema::iceberg::IcebergSchemaRef;
use polars_core::utils::get_numeric_upcast_supertype_lossless;
use polars_io::cloud::CloudOptions;
#[cfg(feature = "csv")]
use polars_io::csv::read::CsvReadOptions;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcScanOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::metadata::FileMetadataRef;
#[cfg(feature = "parquet")]
use polars_io::parquet::read::ParquetOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_utils::slice_enum::Slice;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use strum_macros::IntoStaticStr;

use super::*;
use crate::dsl::default_values::DefaultFieldValues;

pub mod default_values;
pub mod deletion;

#[cfg(feature = "python")]
pub mod python_dataset;
#[cfg(feature = "python")]
pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};

bitflags::bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ScanFlags : u32 {
        const SPECIALIZED_PREDICATE_FILTER = 0x01;
    }
}
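// Illustrative sketch (not part of the original source): how a `ScanFlags`
// value is typically queried through the API generated by `bitflags!`.
#[cfg(test)]
mod scan_flags_example {
    use super::ScanFlags;

    #[test]
    fn query_scan_flags() {
        let flags = ScanFlags::SPECIALIZED_PREDICATE_FILTER;
        // `contains`, `empty` and `is_empty` are generated by `bitflags!`.
        assert!(flags.contains(ScanFlags::SPECIALIZED_PREDICATE_FILTER));
        assert!(ScanFlags::empty().is_empty());
    }
}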
const _: () = {
    assert!(std::mem::size_of::<FileScanDsl>() <= 100);
};

#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Note: This is cheaply cloneable.
pub enum FileScanDsl {
    #[cfg(feature = "csv")]
    Csv { options: Arc<CsvReadOptions> },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet { options: ParquetOptions },

    #[cfg(feature = "ipc")]
    Ipc { options: IpcScanOptions },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
    },

    #[cfg(feature = "scan_lines")]
    Lines { name: PlSmallStr },

    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
        file_info: FileInfo,
    },
}

const _: () = {
    assert!(std::mem::size_of::<FileScanIR>() <= 80);
};

#[derive(Clone, Debug, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
/// Note: This is cheaply cloneable.
pub enum FileScanIR {
    #[cfg(feature = "csv")]
    Csv { options: Arc<CsvReadOptions> },

    #[cfg(feature = "json")]
    NDJson { options: NDJsonReadOptions },

    #[cfg(feature = "parquet")]
    Parquet {
        options: ParquetOptions,
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<FileMetadataRef>,
    },

    #[cfg(feature = "ipc")]
    Ipc {
        options: IpcScanOptions,
        #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
        metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,
    },

    #[cfg(feature = "python")]
    PythonDataset {
        dataset_object: Arc<python_dataset::PythonDatasetProvider>,
        cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,
    },

    #[cfg(feature = "scan_lines")]
    Lines { name: PlSmallStr },

    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
    Anonymous {
        options: Arc<AnonymousScanOptions>,
        function: Arc<dyn AnonymousScan>,
    },
}

impl FileScanIR {
    pub fn flags(&self) -> ScanFlags {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => ScanFlags::empty(),
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => ScanFlags::empty(),
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => ScanFlags::empty(),
            #[allow(unreachable_patterns)]
            _ => ScanFlags::empty(),
        }
    }

    pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => _has_row_index,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }

    pub fn streamable(&self) -> bool {
        match self {
            #[cfg(feature = "csv")]
            Self::Csv { .. } => true,
            #[cfg(feature = "ipc")]
            Self::Ipc { .. } => false,
            #[cfg(feature = "parquet")]
            Self::Parquet { .. } => true,
            #[cfg(feature = "json")]
            Self::NDJson { .. } => false,
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }
}
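// Illustrative sketch (not part of the original source): how the per-format
// capabilities above combine for a CSV scan. Assumes `CsvReadOptions`
// implements `Default`.
#[cfg(all(test, feature = "csv"))]
mod file_scan_ir_example {
    use std::sync::Arc;

    use polars_io::csv::read::CsvReadOptions;

    use super::{FileScanIR, ScanFlags};

    #[test]
    fn csv_scan_capabilities() {
        let scan = FileScanIR::Csv {
            options: Arc::new(CsvReadOptions::default()),
        };
        // CSV scans stream, but have no specialized predicate filter.
        assert!(scan.streamable());
        assert_eq!(scan.flags(), ScanFlags::empty());
    }
}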
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum MissingColumnsPolicy {
    #[default]
    Raise,
    /// Inserts full-NULL columns for the missing ones.
    Insert,
}

/// Used by scans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct CastColumnsPolicy {
    /// Allow casting when the target dtype is a lossless supertype.
    pub integer_upcast: bool,

    /// Allow casting integers to floats.
    #[cfg_attr(feature = "serde", serde(default))]
    pub integer_to_float_cast: bool,

    /// Allow upcasting from small floats to bigger floats.
    pub float_upcast: bool,
    /// Allow downcasting from big floats to smaller floats.
    pub float_downcast: bool,

    /// Allow datetime[ns] to be cast to any lower precision. Important for
    /// being able to read datasets written by Spark.
    pub datetime_nanoseconds_downcast: bool,
    /// Allow datetime[us] to datetime[ms].
    pub datetime_microseconds_downcast: bool,

    /// Allow casting to change the timezone.
    pub datetime_convert_timezone: bool,

    /// DataType::Null to any.
    pub null_upcast: bool,

    /// DataType::Categorical to string.
    pub categorical_to_string: bool,

    pub missing_struct_fields: MissingColumnsPolicy,
    pub extra_struct_fields: ExtraColumnsPolicy,
}

impl CastColumnsPolicy {
    /// Configuration variant that defaults to raising on mismatch.
    pub const ERROR_ON_MISMATCH: Self = Self {
        integer_upcast: false,
        integer_to_float_cast: false,
        float_upcast: false,
        float_downcast: false,
        datetime_nanoseconds_downcast: false,
        datetime_microseconds_downcast: false,
        datetime_convert_timezone: false,
        null_upcast: true,
        categorical_to_string: false,
        missing_struct_fields: MissingColumnsPolicy::Raise,
        extra_struct_fields: ExtraColumnsPolicy::Raise,
    };
}

impl Default for CastColumnsPolicy {
    fn default() -> Self {
        Self::ERROR_ON_MISMATCH
    }
}
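// Illustrative sketch (not part of the original source): relaxing individual
// fields on top of the strict `ERROR_ON_MISMATCH` baseline with struct
// update syntax.
#[cfg(test)]
mod cast_columns_policy_example {
    use super::CastColumnsPolicy;

    #[test]
    fn relax_selected_casts() {
        let policy = CastColumnsPolicy {
            integer_upcast: true,
            float_upcast: true,
            ..CastColumnsPolicy::ERROR_ON_MISMATCH
        };
        assert!(policy.integer_upcast);
        // Everything not overridden stays strict.
        assert!(!policy.float_downcast);
    }
}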
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ExtraColumnsPolicy {
    /// Error if there are extra columns outside the target schema.
    #[default]
    Raise,
    Ignore,
}

#[derive(Debug, Clone, Eq, Hash, PartialEq, strum_macros::IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum ColumnMapping {
    Iceberg(IcebergSchemaRef),
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct TableStatistics(pub Arc<DataFrame>);

impl PartialEq for TableStatistics {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for TableStatistics {}

impl Hash for TableStatistics {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        state.write_usize(Arc::as_ptr(&self.0) as *const () as usize);
    }
}

impl std::ops::Deref for TableStatistics {
    type Target = Arc<DataFrame>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
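// Illustrative sketch (not part of the original source): `TableStatistics`
// equality and hashing go by `Arc` pointer identity, so two handles compare
// equal only if they share the same allocation.
#[cfg(test)]
mod table_statistics_example {
    use std::sync::Arc;

    use polars_core::frame::DataFrame;

    use super::TableStatistics;

    #[test]
    fn pointer_identity_semantics() {
        let df = Arc::new(DataFrame::empty());
        let a = TableStatistics(df.clone());
        let b = TableStatistics(df);
        let c = TableStatistics(Arc::new(DataFrame::empty()));
        assert_eq!(a, b); // same allocation
        assert_ne!(a, c); // equal contents, different allocation
    }
}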
/// Scan arguments shared across different scan types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct UnifiedScanArgs {
    /// User-provided schema of the file. Will be inferred during IR conversion
    /// if None.
    pub schema: Option<SchemaRef>,
    pub cloud_options: Option<CloudOptions>,
    pub hive_options: HiveOptions,

    pub rechunk: bool,
    pub cache: bool,
    pub glob: bool,
    /// Files with these prefixes will not be read.
    pub hidden_file_prefix: Option<Arc<[PlSmallStr]>>,

    pub projection: Option<Arc<[PlSmallStr]>>,
    pub column_mapping: Option<ColumnMapping>,
    /// Default values for missing columns.
    pub default_values: Option<DefaultFieldValues>,
    pub row_index: Option<RowIndex>,
    /// Slice applied before predicates.
    pub pre_slice: Option<Slice>,

    pub cast_columns_policy: CastColumnsPolicy,
    pub missing_columns_policy: MissingColumnsPolicy,
    pub extra_columns_policy: ExtraColumnsPolicy,
    pub include_file_paths: Option<PlSmallStr>,

    pub deletion_files: Option<DeletionFilesList>,
    pub table_statistics: Option<TableStatistics>,
    /// Stores (physical, deleted) row counts of the table if known upfront (e.g. for Iceberg).
    /// This allows row-count queries to succeed without scanning all files.
    ///
    /// Note: we intentionally store u64 instead of IdxSize to avoid erroring if it's unused.
    pub row_count: Option<(u64, u64)>,
}

#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct PredicateFileSkip {
    /// If `true`, the predicate can be skipped at runtime.
    pub no_residual_predicate: bool,
    /// Number of files before skipping.
    pub original_len: usize,
}

impl UnifiedScanArgs {
    pub fn has_row_index_or_slice(&self) -> bool {
        self.row_index.is_some() || self.pre_slice.is_some()
    }
}

// Manual Default impl: we want `glob: true` by default.
impl Default for UnifiedScanArgs {
    fn default() -> Self {
        Self {
            schema: None,
            cloud_options: None,
            hive_options: HiveOptions::new_enabled(),
            rechunk: false,
            cache: false,
            glob: true,
            hidden_file_prefix: None,
            projection: None,
            column_mapping: None,
            default_values: None,
            row_index: None,
            pre_slice: None,
            cast_columns_policy: CastColumnsPolicy::default(),
            missing_columns_policy: MissingColumnsPolicy::default(),
            extra_columns_policy: ExtraColumnsPolicy::default(),
            include_file_paths: None,
            deletion_files: None,
            table_statistics: None,
            row_count: None,
        }
    }
}
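// Illustrative sketch (not part of the original source): overriding a couple
// of fields on top of the manual `Default` (note that `glob` defaults to
// `true`).
#[cfg(test)]
mod unified_scan_args_example {
    use super::UnifiedScanArgs;

    #[test]
    fn defaults_and_overrides() {
        let args = UnifiedScanArgs {
            rechunk: true,
            ..Default::default()
        };
        assert!(args.glob);
        assert!(!args.has_row_index_or_slice());
    }
}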
/// Manual impls of Eq/Hash, as some fields are `Arc<T>` where T does not have Eq/Hash. For these
/// fields we compare the pointer addresses instead.
mod _file_scan_eq_hash {
    use std::hash::{Hash, Hasher};
    use std::sync::Arc;

    #[cfg(feature = "scan_lines")]
    use polars_utils::pl_str::PlSmallStr;

    use super::FileScanIR;

    impl Eq for FileScanIR {}

    impl Hash for FileScanIR {
        fn hash<H: Hasher>(&self, state: &mut H) {
            FileScanEqHashWrap::from(self).hash(state)
        }
    }

    impl PartialEq for FileScanIR {
        fn eq(&self, other: &Self) -> bool {
            FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)
        }
    }

    /// # Hash / Eq safety
    /// * All usizes originate from `Arc<>`s, and the lifetime of this enum is bound to that of
    ///   the input ref.
    #[derive(PartialEq, Hash)]
    pub enum FileScanEqHashWrap<'a> {
        #[cfg(feature = "csv")]
        Csv {
            options: &'a polars_io::csv::read::CsvReadOptions,
        },

        #[cfg(feature = "json")]
        NDJson {
            options: &'a crate::prelude::NDJsonReadOptions,
        },

        #[cfg(feature = "parquet")]
        Parquet {
            options: &'a polars_io::prelude::ParquetOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "ipc")]
        Ipc {
            options: &'a polars_io::prelude::IpcScanOptions,
            metadata: Option<usize>,
        },

        #[cfg(feature = "python")]
        PythonDataset {
            dataset_object: usize,
            cached_ir: usize,
        },

        #[cfg(feature = "scan_lines")]
        Lines { name: &'a PlSmallStr },

        Anonymous {
            options: &'a crate::dsl::AnonymousScanOptions,
            function: usize,
        },

        /// Variant to ensure the lifetime is used regardless of feature gate combination.
        #[expect(unused)]
        Phantom(&'a ()),
    }

    impl<'a> From<&'a FileScanIR> for FileScanEqHashWrap<'a> {
        fn from(value: &'a FileScanIR) -> Self {
            match value {
                #[cfg(feature = "csv")]
                FileScanIR::Csv { options } => FileScanEqHashWrap::Csv { options },

                #[cfg(feature = "json")]
                FileScanIR::NDJson { options } => FileScanEqHashWrap::NDJson { options },

                #[cfg(feature = "parquet")]
                FileScanIR::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "ipc")]
                FileScanIR::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {
                    options,
                    metadata: metadata.as_ref().map(arc_as_ptr),
                },

                #[cfg(feature = "python")]
                FileScanIR::PythonDataset {
                    dataset_object,
                    cached_ir,
                } => FileScanEqHashWrap::PythonDataset {
                    dataset_object: arc_as_ptr(dataset_object),
                    cached_ir: arc_as_ptr(cached_ir),
                },

                #[cfg(feature = "scan_lines")]
                FileScanIR::Lines { name } => FileScanEqHashWrap::Lines { name },

                FileScanIR::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {
                    options,
                    function: arc_as_ptr(function),
                },
            }
        }
    }

    fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {
        Arc::as_ptr(arc) as *const () as usize
    }
}
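// Illustrative sketch (not part of the original source): clones observe the
// manual Eq/Hash impls above. Assumes `CsvReadOptions` implements `Default`
// and that `std::hash::DefaultHasher` (Rust 1.76+) is available.
#[cfg(all(test, feature = "csv"))]
mod file_scan_ir_eq_hash_example {
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::Arc;

    use polars_io::csv::read::CsvReadOptions;

    use super::FileScanIR;

    fn hash_of(scan: &FileScanIR) -> u64 {
        let mut hasher = DefaultHasher::new();
        scan.hash(&mut hasher);
        hasher.finish()
    }

    #[test]
    fn clone_is_equal_and_hashes_identically() {
        let a = FileScanIR::Csv {
            options: Arc::new(CsvReadOptions::default()),
        };
        let b = a.clone();
        assert_eq!(a, b);
        assert_eq!(hash_of(&a), hash_of(&b));
    }
}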
impl CastColumnsPolicy {
    /// Checks if casting can be done to a dtype with a configured policy.
    ///
    /// # Returns
    /// * Ok(true): Cast needed to target dtype
    /// * Ok(false): No casting needed
    /// * Err(_): Forbidden by configuration, or incompatible types.
    pub fn should_cast_column(
        &self,
        column_name: &str,
        target_dtype: &DataType,
        incoming_dtype: &DataType,
    ) -> PolarsResult<bool> {
        let mismatch_err = |hint: &str| {
            let hint_spacing = if hint.is_empty() { "" } else { ", " };

            polars_bail!(
                SchemaMismatch:
                "data type mismatch for column {}: incoming: {:?} != target: {:?}{}{}",
                column_name,
                incoming_dtype,
                target_dtype,
                hint_spacing,
                hint,
            )
        };

        if incoming_dtype.is_null() && !target_dtype.is_null() {
            return if self.null_upcast {
                Ok(true)
            } else {
                mismatch_err("unimplemented: 'null-upcast' in scan cast options")
            };
        }

        // We intercept the nested types first to prevent an expensive recursive eq - recursion
        // is instead done manually through this function.

        #[cfg(feature = "dtype-struct")]
        if let DataType::Struct(target_fields) = target_dtype {
            let DataType::Struct(incoming_fields) = incoming_dtype else {
                return mismatch_err("");
            };

            let incoming_fields_schema = PlHashMap::from_iter(
                incoming_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            let mut should_cast = incoming_fields.len() != target_fields.len();

            for (target_idx, target_field) in target_fields.iter().enumerate() {
                let Some((incoming_idx, incoming_field_dtype)) =
                    incoming_fields_schema.get(target_field.name().as_str())
                else {
                    match self.missing_struct_fields {
                        MissingColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered missing struct field: {}, \
                                hint: pass cast_options=pl.ScanCastOptions(missing_struct_fields='insert')",
                                target_field.name(),
                            ));
                        },
                        MissingColumnsPolicy::Insert => {
                            should_cast = true;
                            // Must keep checking the rest of the fields.
                            continue;
                        },
                    };
                };

                // # Note
                // We also need to cast if the struct fields are out of order. Currently there is
                // no API parameter to control this - we always do this by default.
                should_cast |= *incoming_idx != target_idx;

                should_cast |= self.should_cast_column(
                    column_name,
                    &target_field.dtype,
                    incoming_field_dtype,
                )?;
            }

            // Casting is also needed if there are extra fields, check them here.

            // Take and re-use the hashmap.
            let mut target_fields_schema = incoming_fields_schema;
            target_fields_schema.clear();

            target_fields_schema.extend(
                target_fields
                    .iter()
                    .enumerate()
                    .map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),
            );

            for fld in incoming_fields {
                if !target_fields_schema.contains_key(fld.name.as_str()) {
                    match self.extra_struct_fields {
                        ExtraColumnsPolicy::Ignore => {
                            should_cast = true;
                            break;
                        },
                        ExtraColumnsPolicy::Raise => {
                            return mismatch_err(&format!(
                                "encountered extra struct field: {}, \
                                hint: specify this field in the schema, or pass \
                                cast_options=pl.ScanCastOptions(extra_struct_fields='ignore')",
                                &fld.name,
                            ));
                        },
                    }
                }
            }

            return Ok(should_cast);
        }

        if let DataType::List(target_inner) = target_dtype {
            let DataType::List(incoming_inner) = incoming_dtype else {
                return mismatch_err("");
            };

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        #[cfg(feature = "dtype-array")]
        if let DataType::Array(target_inner, target_width) = target_dtype {
            let DataType::Array(incoming_inner, incoming_width) = incoming_dtype else {
                return mismatch_err("");
            };

            if incoming_width != target_width {
                return mismatch_err("");
            }

            return self.should_cast_column(column_name, target_inner, incoming_inner);
        }

        // Eq here should be cheap as we have intercepted all nested types above.

        debug_assert!(!target_dtype.is_nested());

        // If we were to drop the cast on an `Unknown` incoming_dtype, it could eventually
        // lead to dtype errors. The reason is that the logic used by type coercion differs
        // from the casting logic used by `materialize_unknown`.
        if incoming_dtype.contains_unknown() {
            return Ok(true);
        }

        // Note: Only call this with non-nested types for performance.
        let materialize_unknown = |dtype: &DataType| -> std::borrow::Cow<DataType> {
            dtype
                .clone()
                .materialize_unknown(true)
                .map(std::borrow::Cow::Owned)
                .unwrap_or(std::borrow::Cow::Borrowed(incoming_dtype))
        };

        let incoming_dtype = std::borrow::Cow::Borrowed(incoming_dtype);
        let target_dtype = materialize_unknown(target_dtype);

        if target_dtype == incoming_dtype {
            return Ok(false);
        }

        let incoming_dtype = incoming_dtype.as_ref();
        let target_dtype = target_dtype.as_ref();

        //
        // After this point the dtypes are mismatching.
        //

        if target_dtype.is_integer() && incoming_dtype.is_integer() {
            if !self.integer_upcast {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(integer_cast='upcast')",
                );
            }

            return match get_numeric_upcast_supertype_lossless(incoming_dtype, target_dtype) {
                Some(ref v) if v == target_dtype => Ok(true),
                _ => mismatch_err("incoming dtype cannot be safely cast to target dtype"),
            };
        }

        if target_dtype.is_float() && incoming_dtype.is_float() {
            return match (target_dtype, incoming_dtype) {
                (DataType::Float64, DataType::Float32)
                | (DataType::Float64, DataType::Float16)
                | (DataType::Float32, DataType::Float16) => {
                    if self.float_upcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='upcast')",
                        )
                    }
                },

                (DataType::Float16, DataType::Float32)
                | (DataType::Float16, DataType::Float64)
                | (DataType::Float32, DataType::Float64) => {
                    if self.float_downcast {
                        Ok(true)
                    } else {
                        mismatch_err(
                            "hint: pass cast_options=pl.ScanCastOptions(float_cast='downcast')",
                        )
                    }
                },

                _ => unreachable!(),
            };
        }

        if target_dtype.is_float() && incoming_dtype.is_integer() {
            return if !self.integer_to_float_cast {
                mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(integer_cast='allow-float')",
                )
            } else {
                Ok(true)
            };
        }

        if let (
            DataType::Datetime(target_unit, target_zone),
            DataType::Datetime(incoming_unit, incoming_zone),
        ) = (target_dtype, incoming_dtype)
        {
            // Check the timezone.
            if !self.datetime_convert_timezone
                && !TimeZone::eq_none_as_utc(incoming_zone.as_ref(), target_zone.as_ref())
            {
                return mismatch_err(
                    "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='convert-timezone')",
                );
            }

            // Check the time unit.
            if target_unit != incoming_unit {
                return match (incoming_unit, target_unit) {
                    (TimeUnit::Nanoseconds, _) => {
                        if self.datetime_nanoseconds_downcast {
                            Ok(true)
                        } else {
                            mismatch_err(
                                "hint: pass cast_options=pl.ScanCastOptions(datetime_cast='nanosecond-downcast')",
                            )
                        }
                    },

                    (TimeUnit::Microseconds, TimeUnit::Milliseconds) => {
                        if self.datetime_microseconds_downcast {
                            Ok(true)
                        } else {
                            // TODO
                            mismatch_err(
                                "unimplemented: 'microsecond-downcast' in scan cast options",
                            )
                        }
                    },

                    _ => mismatch_err(""),
                };
            }

            // Dtype differs and we are allowed to coerce.
            return Ok(true);
        }

        mismatch_err("")
    }
}
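// Illustrative sketch (not part of the original source): `should_cast_column`
// distinguishes "no cast needed", "cast needed", and "forbidden by policy".
#[cfg(test)]
mod should_cast_column_example {
    use polars_core::prelude::DataType;

    use super::CastColumnsPolicy;

    #[test]
    fn integer_upcast_gated_by_policy() {
        let strict = CastColumnsPolicy::ERROR_ON_MISMATCH;
        // Same dtype: no cast needed.
        assert!(!strict
            .should_cast_column("a", &DataType::Int64, &DataType::Int64)
            .unwrap());
        // Int32 -> Int64 is refused under the strict baseline...
        assert!(strict
            .should_cast_column("a", &DataType::Int64, &DataType::Int32)
            .is_err());
        // ...but reported as a needed, lossless cast once enabled.
        let relaxed = CastColumnsPolicy {
            integer_upcast: true,
            ..strict
        };
        assert!(relaxed
            .should_cast_column("a", &DataType::Int64, &DataType::Int32)
            .unwrap());
    }
}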