Path: blob/main/crates/polars-plan/src/dsl/file_scan/mod.rs
6940 views
use std::hash::Hash;1use std::sync::Mutex;23use deletion::DeletionFilesList;4use polars_core::schema::iceberg::IcebergSchemaRef;5use polars_core::utils::get_numeric_upcast_supertype_lossless;6use polars_io::cloud::CloudOptions;7#[cfg(feature = "csv")]8use polars_io::csv::read::CsvReadOptions;9#[cfg(feature = "ipc")]10use polars_io::ipc::IpcScanOptions;11#[cfg(feature = "parquet")]12use polars_io::parquet::metadata::FileMetadataRef;13#[cfg(feature = "parquet")]14use polars_io::parquet::read::ParquetOptions;15use polars_io::{HiveOptions, RowIndex};16use polars_utils::slice_enum::Slice;17#[cfg(feature = "serde")]18use serde::{Deserialize, Serialize};19use strum_macros::IntoStaticStr;2021use super::*;22use crate::dsl::default_values::DefaultFieldValues;23pub mod default_values;24pub mod deletion;2526#[cfg(feature = "python")]27pub mod python_dataset;28#[cfg(feature = "python")]29pub use python_dataset::{DATASET_PROVIDER_VTABLE, PythonDatasetProviderVTable};3031bitflags::bitflags! {32#[derive(Debug, Clone, Copy, PartialEq, Eq)]33pub struct ScanFlags : u32 {34const SPECIALIZED_PREDICATE_FILTER = 0x01;35}36}3738#[derive(Clone, Debug, IntoStaticStr)]39#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]40#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]41// TODO: Arc<> some of the options and the cloud options.42pub enum FileScanDsl {43#[cfg(feature = "csv")]44Csv { options: CsvReadOptions },4546#[cfg(feature = "json")]47NDJson { options: NDJsonReadOptions },4849#[cfg(feature = "parquet")]50Parquet { options: ParquetOptions },5152#[cfg(feature = "ipc")]53Ipc { options: IpcScanOptions },5455#[cfg(feature = "python")]56PythonDataset {57dataset_object: Arc<python_dataset::PythonDatasetProvider>,58},5960#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]61Anonymous {62options: Arc<AnonymousScanOptions>,63function: Arc<dyn AnonymousScan>,64file_info: FileInfo,65},66}6768#[derive(Clone, Debug, IntoStaticStr)]69#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]70#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]71// TODO: Arc<> some of the options and the cloud options.72pub enum FileScanIR {73#[cfg(feature = "csv")]74Csv { options: CsvReadOptions },7576#[cfg(feature = "json")]77NDJson { options: NDJsonReadOptions },7879#[cfg(feature = "parquet")]80Parquet {81options: ParquetOptions,82#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]83metadata: Option<FileMetadataRef>,84},8586#[cfg(feature = "ipc")]87Ipc {88options: IpcScanOptions,89#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]90metadata: Option<Arc<arrow::io::ipc::read::FileMetadata>>,91},9293#[cfg(feature = "python")]94PythonDataset {95dataset_object: Arc<python_dataset::PythonDatasetProvider>,96cached_ir: Arc<Mutex<Option<ExpandedDataset>>>,97},9899#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]100Anonymous {101options: Arc<AnonymousScanOptions>,102function: Arc<dyn AnonymousScan>,103},104}105106impl FileScanIR {107pub fn flags(&self) -> ScanFlags {108match self {109#[cfg(feature = "csv")]110Self::Csv { .. } => ScanFlags::empty(),111#[cfg(feature = "ipc")]112Self::Ipc { .. } => ScanFlags::empty(),113#[cfg(feature = "parquet")]114Self::Parquet { .. } => ScanFlags::SPECIALIZED_PREDICATE_FILTER,115#[cfg(feature = "json")]116Self::NDJson { .. } => ScanFlags::empty(),117#[allow(unreachable_patterns)]118_ => ScanFlags::empty(),119}120}121122pub(crate) fn sort_projection(&self, _has_row_index: bool) -> bool {123match self {124#[cfg(feature = "csv")]125Self::Csv { .. } => true,126#[cfg(feature = "ipc")]127Self::Ipc { .. } => _has_row_index,128#[cfg(feature = "parquet")]129Self::Parquet { .. } => false,130#[allow(unreachable_patterns)]131_ => false,132}133}134135pub fn streamable(&self) -> bool {136match self {137#[cfg(feature = "csv")]138Self::Csv { .. } => true,139#[cfg(feature = "ipc")]140Self::Ipc { .. } => false,141#[cfg(feature = "parquet")]142Self::Parquet { .. } => true,143#[cfg(feature = "json")]144Self::NDJson { .. } => false,145#[allow(unreachable_patterns)]146_ => false,147}148}149}150151#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]152#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]153#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]154pub enum MissingColumnsPolicy {155#[default]156Raise,157/// Inserts full-NULL columns for the missing ones.158Insert,159}160161/// Used by scans.162#[derive(Debug, Clone, PartialEq, Eq, Hash)]163#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]164#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]165pub struct CastColumnsPolicy {166/// Allow casting when target dtype is lossless supertype167pub integer_upcast: bool,168169/// Allow Float32 -> Float64170pub float_upcast: bool,171/// Allow Float64 -> Float32172pub float_downcast: bool,173174/// Allow datetime[ns] to be casted to any lower precision. Important for175/// being able to read datasets written by spark.176pub datetime_nanoseconds_downcast: bool,177/// Allow datetime[us] to datetime[ms]178pub datetime_microseconds_downcast: bool,179180/// Allow casting to change time units.181pub datetime_convert_timezone: bool,182183/// DataType::Null to any184pub null_upcast: bool,185186pub missing_struct_fields: MissingColumnsPolicy,187pub extra_struct_fields: ExtraColumnsPolicy,188}189190impl CastColumnsPolicy {191/// Configuration variant that defaults to raising on mismatch.192pub const ERROR_ON_MISMATCH: Self = Self {193integer_upcast: false,194float_upcast: false,195float_downcast: false,196datetime_nanoseconds_downcast: false,197datetime_microseconds_downcast: false,198datetime_convert_timezone: false,199null_upcast: true,200missing_struct_fields: MissingColumnsPolicy::Raise,201extra_struct_fields: ExtraColumnsPolicy::Raise,202};203}204205impl Default for CastColumnsPolicy {206fn default() -> Self {207Self::ERROR_ON_MISMATCH208}209}210211#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Hash)]212#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]213#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]214pub enum ExtraColumnsPolicy {215/// Error if there are extra columns outside the target schema.216#[default]217Raise,218Ignore,219}220221#[derive(Debug, Clone, Eq, Hash, PartialEq)]222#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]223#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]224pub enum ColumnMapping {225Iceberg(IcebergSchemaRef),226}227228/// Scan arguments shared across different scan types.229#[derive(Debug, Clone, PartialEq, Eq, Hash)]230#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]231#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]232pub struct UnifiedScanArgs {233/// User-provided schema of the file. Will be inferred during IR conversion234/// if None.235pub schema: Option<SchemaRef>,236pub cloud_options: Option<CloudOptions>,237pub hive_options: HiveOptions,238239pub rechunk: bool,240pub cache: bool,241pub glob: bool,242243pub projection: Option<Arc<[PlSmallStr]>>,244pub column_mapping: Option<ColumnMapping>,245/// Default values for missing columns.246pub default_values: Option<DefaultFieldValues>,247pub row_index: Option<RowIndex>,248/// Slice applied before predicates249pub pre_slice: Option<Slice>,250251pub cast_columns_policy: CastColumnsPolicy,252pub missing_columns_policy: MissingColumnsPolicy,253pub extra_columns_policy: ExtraColumnsPolicy,254pub include_file_paths: Option<PlSmallStr>,255256pub deletion_files: Option<DeletionFilesList>,257}258259impl Default for UnifiedScanArgs {260fn default() -> Self {261Self {262schema: None,263cloud_options: None,264hive_options: HiveOptions::new_enabled(),265rechunk: false,266cache: false,267glob: true,268projection: None,269column_mapping: None,270default_values: None,271row_index: None,272pre_slice: None,273cast_columns_policy: CastColumnsPolicy::default(),274missing_columns_policy: MissingColumnsPolicy::default(),275extra_columns_policy: ExtraColumnsPolicy::default(),276include_file_paths: None,277deletion_files: None,278}279}280}281282/// Manual impls of Eq/Hash, as some fields are `Arc<T>` where T does not have Eq/Hash. For these283/// fields we compare the pointer addresses instead.284mod _file_scan_eq_hash {285use std::hash::{Hash, Hasher};286use std::sync::Arc;287288use super::FileScanIR;289290impl Eq for FileScanIR {}291292impl Hash for FileScanIR {293fn hash<H: Hasher>(&self, state: &mut H) {294FileScanEqHashWrap::from(self).hash(state)295}296}297298impl PartialEq for FileScanIR {299fn eq(&self, other: &Self) -> bool {300FileScanEqHashWrap::from(self) == FileScanEqHashWrap::from(other)301}302}303304/// # Hash / Eq safety305/// * All usizes originate from `Arc<>`s, and the lifetime of this enum is bound to that of the306/// input ref.307#[derive(PartialEq, Hash)]308pub enum FileScanEqHashWrap<'a> {309#[cfg(feature = "csv")]310Csv {311options: &'a polars_io::csv::read::CsvReadOptions,312},313314#[cfg(feature = "json")]315NDJson {316options: &'a crate::prelude::NDJsonReadOptions,317},318319#[cfg(feature = "parquet")]320Parquet {321options: &'a polars_io::prelude::ParquetOptions,322metadata: Option<usize>,323},324325#[cfg(feature = "ipc")]326Ipc {327options: &'a polars_io::prelude::IpcScanOptions,328metadata: Option<usize>,329},330331#[cfg(feature = "python")]332PythonDataset {333dataset_object: usize,334cached_ir: usize,335},336337Anonymous {338options: &'a crate::dsl::AnonymousScanOptions,339function: usize,340},341342/// Variant to ensure the lifetime is used regardless of feature gate combination.343#[expect(unused)]344Phantom(&'a ()),345}346347impl<'a> From<&'a FileScanIR> for FileScanEqHashWrap<'a> {348fn from(value: &'a FileScanIR) -> Self {349match value {350#[cfg(feature = "csv")]351FileScanIR::Csv { options } => FileScanEqHashWrap::Csv { options },352353#[cfg(feature = "json")]354FileScanIR::NDJson { options } => FileScanEqHashWrap::NDJson { options },355356#[cfg(feature = "parquet")]357FileScanIR::Parquet { options, metadata } => FileScanEqHashWrap::Parquet {358options,359metadata: metadata.as_ref().map(arc_as_ptr),360},361362#[cfg(feature = "ipc")]363FileScanIR::Ipc { options, metadata } => FileScanEqHashWrap::Ipc {364options,365metadata: metadata.as_ref().map(arc_as_ptr),366},367368#[cfg(feature = "python")]369FileScanIR::PythonDataset {370dataset_object,371cached_ir,372} => FileScanEqHashWrap::PythonDataset {373dataset_object: arc_as_ptr(dataset_object),374cached_ir: arc_as_ptr(cached_ir),375},376377FileScanIR::Anonymous { options, function } => FileScanEqHashWrap::Anonymous {378options,379function: arc_as_ptr(function),380},381}382}383}384385fn arc_as_ptr<T: ?Sized>(arc: &Arc<T>) -> usize {386Arc::as_ptr(arc) as *const () as usize387}388}389390impl CastColumnsPolicy {391/// Checks if casting can be done to a dtype with a configured policy.392///393/// # Returns394/// * Ok(true): Cast needed to target dtype395/// * Ok(false): No casting needed396/// * Err(_): Forbidden by configuration, or incompatible types.397pub fn should_cast_column(398&self,399column_name: &str,400target_dtype: &DataType,401incoming_dtype: &DataType,402) -> PolarsResult<bool> {403let mismatch_err = |hint: &str| {404let hint_spacing = if hint.is_empty() { "" } else { ", " };405406polars_bail!(407SchemaMismatch:408"data type mismatch for column {}: incoming: {:?} != target: {:?}{}{}",409column_name,410incoming_dtype,411target_dtype,412hint_spacing,413hint,414)415};416417if incoming_dtype.is_null() && !target_dtype.is_null() {418return if self.null_upcast {419Ok(true)420} else {421mismatch_err("unimplemented: 'null-upcast' in scan cast options")422};423}424425// We intercept the nested types first to prevent an expensive recursive eq - recursion426// is instead done manually through this function.427428#[cfg(feature = "dtype-struct")]429if let DataType::Struct(target_fields) = target_dtype {430let DataType::Struct(incoming_fields) = incoming_dtype else {431return mismatch_err("");432};433434let incoming_fields_schema = PlHashMap::from_iter(435incoming_fields436.iter()437.enumerate()438.map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),439);440441let mut should_cast = incoming_fields.len() != target_fields.len();442443for (target_idx, target_field) in target_fields.iter().enumerate() {444let Some((incoming_idx, incoming_field_dtype)) =445incoming_fields_schema.get(target_field.name().as_str())446else {447match self.missing_struct_fields {448MissingColumnsPolicy::Raise => {449return mismatch_err(&format!(450"encountered missing struct field: {}, \451hint: pass cast_options=pl.ScanCastOptions(missing_struct_fields='insert')",452target_field.name(),453));454},455MissingColumnsPolicy::Insert => {456should_cast = true;457// Must keep checking the rest of the fields.458continue;459},460};461};462463// # Note464// We also need to cast if the struct fields are out of order. Currently there is465// no API parameter to control this - we always do this by default.466should_cast |= *incoming_idx != target_idx;467468should_cast |= self.should_cast_column(469column_name,470&target_field.dtype,471incoming_field_dtype,472)?;473}474475// Casting is also needed if there are extra fields, check them here.476477// Take and re-use hashmap478let mut target_fields_schema = incoming_fields_schema;479target_fields_schema.clear();480481target_fields_schema.extend(482target_fields483.iter()484.enumerate()485.map(|(i, fld)| (fld.name.as_str(), (i, &fld.dtype))),486);487488for fld in incoming_fields {489if !target_fields_schema.contains_key(fld.name.as_str()) {490match self.extra_struct_fields {491ExtraColumnsPolicy::Ignore => {492should_cast = true;493break;494},495ExtraColumnsPolicy::Raise => {496return mismatch_err(&format!(497"encountered extra struct field: {}, \498hint: specify this field in the schema, or pass \499cast_options=pl.ScanCastOptions(extra_struct_fields='ignore')",500&fld.name,501));502},503}504}505}506507return Ok(should_cast);508}509510if let DataType::List(target_inner) = target_dtype {511let DataType::List(incoming_inner) = incoming_dtype else {512return mismatch_err("");513};514515return self.should_cast_column(column_name, target_inner, incoming_inner);516}517518#[cfg(feature = "dtype-array")]519if let DataType::Array(target_inner, target_width) = target_dtype {520let DataType::Array(incoming_inner, incoming_width) = incoming_dtype else {521return mismatch_err("");522};523524if incoming_width != target_width {525return mismatch_err("");526}527528return self.should_cast_column(column_name, target_inner, incoming_inner);529}530531// Eq here should be cheap as we have intercepted all nested types above.532533debug_assert!(!target_dtype.is_nested());534535// If we were to drop cast on an `Unknown` incoming_dtype, it could eventually536// lead to dtype errors. The reason is that the logic used by type coercion differs537// from the casting logic used by `materialize_unknown`.538if incoming_dtype.contains_unknown() {539return Ok(true);540}541542// Note: Only call this with non-nested types for performance543let materialize_unknown = |dtype: &DataType| -> std::borrow::Cow<DataType> {544dtype545.clone()546.materialize_unknown(true)547.map(std::borrow::Cow::Owned)548.unwrap_or(std::borrow::Cow::Borrowed(incoming_dtype))549};550551let incoming_dtype = std::borrow::Cow::Borrowed(incoming_dtype);552let target_dtype = materialize_unknown(target_dtype);553554if target_dtype == incoming_dtype {555return Ok(false);556}557558let incoming_dtype = incoming_dtype.as_ref();559let target_dtype = target_dtype.as_ref();560561//562// After this point the dtypes are mismatching.563//564565if target_dtype.is_integer() && incoming_dtype.is_integer() {566if !self.integer_upcast {567return mismatch_err(568"hint: pass cast_options=pl.ScanCastOptions(integer_cast='upcast')",569);570}571572return match get_numeric_upcast_supertype_lossless(incoming_dtype, target_dtype) {573Some(ref v) if v == target_dtype => Ok(true),574_ => mismatch_err("incoming dtype cannot safely cast to target dtype"),575};576}577578if target_dtype.is_float() && incoming_dtype.is_float() {579return match (target_dtype, incoming_dtype) {580(DataType::Float64, DataType::Float32) => {581if self.float_upcast {582Ok(true)583} else {584mismatch_err(585"hint: pass cast_options=pl.ScanCastOptions(float_cast='upcast')",586)587}588},589590(DataType::Float32, DataType::Float64) => {591if self.float_downcast {592Ok(true)593} else {594mismatch_err(595"hint: pass cast_options=pl.ScanCastOptions(float_cast='downcast')",596)597}598},599600_ => unreachable!(),601};602}603604if let (605DataType::Datetime(target_unit, target_zone),606DataType::Datetime(incoming_unit, incoming_zone),607) = (target_dtype, incoming_dtype)608{609// Check timezone610if !self.datetime_convert_timezone611&& !TimeZone::eq_none_as_utc(incoming_zone.as_ref(), target_zone.as_ref())612{613return mismatch_err(614"hint: pass cast_options=pl.ScanCastOptions(datetime_cast='convert-timezone')",615);616}617618// Check unit619if target_unit != incoming_unit {620return match (incoming_unit, target_unit) {621(TimeUnit::Nanoseconds, _) => {622if self.datetime_nanoseconds_downcast {623Ok(true)624} else {625mismatch_err(626"hint: pass cast_options=pl.ScanCastOptions(datetime_cast='nanosecond-downcast')",627)628}629},630631(TimeUnit::Microseconds, TimeUnit::Milliseconds) => {632if self.datetime_microseconds_downcast {633Ok(true)634} else {635// TODO636mismatch_err(637"unimplemented: 'microsecond-downcast' in scan cast options",638)639}640},641642_ => mismatch_err(""),643};644}645646// Dtype differs and we are allowed to coerce647return Ok(true);648}649650mismatch_err("")651}652}653654655