Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs
8509 views
use std::io::{BufReader, Cursor};
use std::sync::{LazyLock, RwLock};

use either::Either;
use polars_buffer::Buffer;
use polars_io::csv::read::streaming::read_until_start_and_infer_schema;
use polars_io::prelude::*;
use polars_io::utils::byte_source::{ByteSource, DynByteSourceBuilder};
use polars_io::utils::compression::{ByteSourceReader, CompressedReader, SupportedCompression};
use polars_io::utils::stream_buf_reader::ReaderSource;
use polars_io::{RowIndex, pl_async};

use super::*;

/// Lower a DSL scan node into its IR form, publishing the result into `cached_ir`.
///
/// Steps (skipped entirely when `cached_ir` already holds an IR):
/// 1. Validate / auto-enable hive partitioning from `hive_options`.
/// 2. Expand the scan sources (path expansion is format-dependent).
/// 3. Resolve `FileInfo` and the IR scan type via the `SourcesToFileInfo` cache.
/// 4. Fold the hive schema, `include_file_paths` column, projection and
///    row-index column into the schema.
/// 5. Build either an empty `IR::DataFrameScan` (file list empty after
///    expansion) or an `IR::Scan`, and store it in `cached_ir`.
pub(super) async fn dsl_to_ir(
    sources: ScanSources,
    mut unified_scan_args_box: Box<UnifiedScanArgs>,
    scan_type: Box<FileScanDsl>,
    cached_ir: Arc<Mutex<Option<IR>>>,
    cache_file_info: SourcesToFileInfo,
    verbose: bool,
) -> PolarsResult<()> {
    // Note that the first metadata can still end up being `None` later if the files were
    // filtered from predicate pushdown.
    // Check and drop the lock in its own scope (a `std` mutex guard must not be
    // held across the `.await` points below).
    // NOTE(review): between this check and the `replace` at the end the lock is
    // released, so two concurrent callers may both compute the IR and the last
    // `replace` wins. Presumably acceptable (work looks idempotent) — confirm.
    let is_not_cached = {
        let cached_ir_guard = cached_ir.lock().unwrap();
        cached_ir_guard.is_none()
    };

    if is_not_cached {
        let unified_scan_args = unified_scan_args_box.as_mut();

        if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
            match unified_scan_args.hive_options.enabled {
                // Enable hive_partitioning if it is unspecified but a non-empty hive_schema given
                None if !hive_schema.is_empty() => {
                    unified_scan_args.hive_options.enabled = Some(true)
                },
                // hive_partitioning was explicitly disabled
                Some(false) => polars_bail!(
                    ComputeError:
                    "a hive schema was given but hive_partitioning was disabled"
                ),
                Some(true) | None => {},
            }
        }

        // Kept around for error messages: lets us report the pre-expansion
        // sources when expansion produced an empty list.
        let sources_before_expansion = &sources;

        let sources = match &*scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { .. } => {
                sources
                    .expand_paths_with_hive_update(unified_scan_args)
                    .await?
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { .. } => {
                sources
                    .expand_paths_with_hive_update(unified_scan_args)
                    .await?
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args).await?,
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args).await?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { .. } => {
                // There are a lot of places that short-circuit if the paths is empty,
                // so we just give a dummy path here.
                ScanSources::Paths(Buffer::from_iter([PlRefPath::new("PL_PY_DSET")]))
            },
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { .. } => sources.expand_paths(unified_scan_args).await?,
            FileScanDsl::Anonymous { .. } => sources.clone(),
        };

        // For cloud we must deduplicate files. Serialization/deserialization leads to Arc's losing
        // their sharing.
        let (mut file_info, scan_type_ir) = {
            cache_file_info
                .get_or_insert(
                    &scan_type,
                    &sources,
                    sources_before_expansion,
                    unified_scan_args,
                    verbose,
                )
                .await?
        };

        if unified_scan_args.hive_options.enabled.is_none() {
            // We expect this to be `Some(_)` after this point. If it hasn't been auto-enabled
            // we explicitly set it to disabled.
            unified_scan_args.hive_options.enabled = Some(false);
        }

        let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
            && let Some(file_schema) = file_info.reader_schema.as_ref()
        {
            // Hive partitioning only works on path-backed sources.
            let paths = sources
                .as_paths()
                .ok_or_else(|| polars_err!(nyi = "Hive-partitioning of in-memory buffers"))?;

            #[allow(unused_assignments)]
            let mut owned = None;

            hive_partitions_from_paths(
                paths,
                unified_scan_args.hive_options.hive_start_idx,
                unified_scan_args.hive_options.schema.clone(),
                // Borrow the arrow-backed schema via a temporary owned
                // conversion; `owned` outlives the match so the reference stays
                // valid for the call.
                match file_schema {
                    Either::Left(v) => {
                        owned = Some(Schema::from_arrow_schema(v.as_ref()));
                        owned.as_ref().unwrap()
                    },
                    Either::Right(v) => v.as_ref(),
                },
                unified_scan_args.hive_options.try_parse_dates,
            )?
        } else {
            None
        };

        if let Some(ref hive_parts) = hive_parts {
            let hive_schema = hive_parts.schema();
            file_info.update_schema_with_hive_schema(hive_schema.clone());
        } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
            // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
            // list during path expansion. In this case we still want to return an empty DataFrame with this
            // schema.
            file_info.update_schema_with_hive_schema(hive_schema);
        }

        // Append the file-path column requested via `include_file_paths`;
        // error if a file column already uses that name.
        if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
            let schema: &mut Schema = Arc::make_mut(&mut file_info.schema);

            if schema.contains(file_path_col) {
                polars_bail!(
                    Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                    file_path_col
                );
            }

            schema.insert_at_index(schema.len(), file_path_col.clone(), DataType::String)?;
        }

        unified_scan_args.projection = if let Some(file_schema) = file_info.reader_schema.as_ref() {
            maybe_init_projection_excluding_hive(
                file_schema,
                hive_parts.as_ref().map(|h| h.schema()),
            )
        } else {
            None
        };

        // Prepend the row-index column to the output schema.
        if let Some(row_index) = &unified_scan_args.row_index {
            let schema = Arc::make_mut(&mut file_info.schema);
            *schema = schema
                .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                .unwrap();
        }

        let ir = if sources.is_empty() && !matches!(&(*scan_type), FileScanDsl::Anonymous { .. }) {
            // No files after expansion: degrade to an empty in-memory scan that
            // still carries the resolved schema.
            IR::DataFrameScan {
                df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                schema: file_info.schema,
                output_schema: None,
            }
        } else {
            let unified_scan_args = unified_scan_args_box;

            IR::Scan {
                sources,
                file_info,
                hive_parts,
                predicate: None,
                predicate_file_skip_applied: None,
                scan_type: Box::new(scan_type_ir),
                output_schema: None,
                unified_scan_args,
            }
        };

        let mut cached_ir = cached_ir.lock().unwrap();
        cached_ir.replace(ir);
    }

    Ok(())
}

/// Insert the row-index column `name` at position 0 with `IDX_DTYPE`.
///
/// # Errors
/// `Duplicate` if the schema already contains a column called `name`.
pub(super) fn insert_row_index_to_schema(
    schema: &mut Schema,
    name: PlSmallStr,
) -> PolarsResult<()> {
    if schema.contains(&name) {
        polars_bail!(
            Duplicate:
            "cannot add row_index with name '{}': \
            column already exists in file.",
            name,
        )
    }

    schema.insert_at_index(0, name, IDX_DTYPE)?;

    Ok(())
}

/// Build the output schema: `schema` with the row-index column (if any)
/// prepended at position 0.
#[cfg(any(feature = "parquet", feature = "ipc"))]
fn prepare_output_schema(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<SchemaRef> {
    if let Some(rc) = row_index {
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
    }
    Ok(Arc::new(schema))
}

/// Split `schema` into a pair of schemas for a scan with an optional row index.
///
/// With a row index, returns `(schema without the row-index column, schema
/// with it prepended)`; without one, both elements are the same `Arc`.
/// NOTE(review): the caller in `ndjson_file_info` destructures this as
/// `(schema, reader_schema)`, i.e. the row-index-free schema becomes the
/// output schema — the opposite of `csv_file_info`'s convention. Confirm which
/// order is intended.
#[cfg(any(feature = "json", feature = "csv"))]
fn prepare_schemas(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<(SchemaRef, SchemaRef)> {
    Ok(if let Some(rc) = row_index {
        let reader_schema = schema.clone();
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
        (Arc::new(reader_schema), Arc::new(schema))
    } else {
        let schema = Arc::new(schema);
        (schema.clone(), schema)
    })
}

/// Read schema, row count and metadata from the first parquet source (cloud or
/// local) and assemble a `FileInfo` for the scan.
///
/// The row estimate assumes every source resembles the first
/// (`num_rows * n_sources`); the size is only `known` when there is exactly
/// one source.
#[cfg(feature = "parquet")]
pub(super) async fn parquet_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    #[allow(unused)] cloud_options: Option<&polars_io::cloud::CloudOptions>,
    n_sources: usize,
) -> PolarsResult<(FileInfo, Option<FileMetadataRef>)> {
    use polars_core::error::feature_gated;

    let (reader_schema, num_rows, metadata) = {
        if first_scan_source.is_cloud_url() {
            let first_path = first_scan_source.as_path().unwrap();
            feature_gated!("cloud", {
                let mut reader =
                    ParquetObjectStore::from_uri(first_path.clone(), cloud_options, None).await?;

                (
                    reader.schema().await?,
                    reader.num_rows().await?,
                    reader.get_metadata().await?.clone(),
                )
            })
        } else {
            let memslice = first_scan_source.to_memslice()?;
            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
            (
                reader.schema()?,
                reader.num_rows()?,
                reader.get_metadata()?.clone(),
            )
        }
    };

    let schema =
        prepare_output_schema(Schema::from_arrow_schema(reader_schema.as_ref()), row_index)?;

    let known_size = if n_sources == 1 { Some(num_rows) } else { None };

    let file_info = FileInfo::new(
        schema,
        Some(Either::Left(reader_schema)),
        (known_size, num_rows * n_sources),
    );

    Ok((file_info, Some(metadata)))
}

/// Maximum number of entries the metadata/file-info scan cache may hold.
///
/// Read once from `POLARS_MAX_CACHED_METADATA_SCANS` (default 8); a value of
/// `0` means unlimited (`usize::MAX`). Panics on an unparsable value.
pub fn max_metadata_scan_cached() -> usize {
    static MAX_SCANS_METADATA_CACHED: LazyLock<usize> = LazyLock::new(|| {
        let value = std::env::var("POLARS_MAX_CACHED_METADATA_SCANS").map_or(8, |v| {
            v.parse::<usize>()
                .expect("invalid `POLARS_MAX_CACHED_METADATA_SCANS` value")
        });
        if value == 0 {
            return usize::MAX;
        }
        value
    });
    *MAX_SCANS_METADATA_CACHED
}
// TODO! return metadata arced
/// Read the IPC file metadata from the first source (cloud URI, local path,
/// open file or in-memory buffer) and build a `FileInfo` from its schema.
/// Row count is unknown (`(None, usize::MAX)`).
#[cfg(feature = "ipc")]
pub(super) async fn ipc_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
    use polars_core::error::feature_gated;

    let metadata = match first_scan_source {
        ScanSourceRef::Path(path) => {
            if path.has_scheme() {
                feature_gated!("cloud", {
                    polars_io::ipc::IpcReaderAsync::from_uri(path.clone(), cloud_options)
                        .await?
                        .metadata()
                        .await?
                })
            } else {
                arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
                    polars_utils::open_file(path.as_std_path())?,
                ))?
            }
        },
        ScanSourceRef::File(file) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(file))?
        },
        ScanSourceRef::Buffer(buff) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::Cursor::new(buff))?
        },
    };

    let file_info = FileInfo::new(
        prepare_output_schema(
            Schema::from_arrow_schema(metadata.schema.as_ref()),
            row_index,
        )?,
        Some(Either::Left(Arc::clone(&metadata.schema))),
        (None, usize::MAX),
    );

    Ok((file_info, metadata))
}

/// Infer a CSV schema over all sources and estimate the total row count.
///
/// Every file is inferred (first file on the current thread, the rest in a
/// rayon parallel reduce) and the per-file schemas are merged via
/// `Schema::to_supertype`. For async (cloud) sources with a finite
/// `infer_schema_length`, only a prefix of each file is downloaded using a
/// progressive trial-and-error strategy; otherwise each file is fully
/// materialized. Must not be called when the user supplied a schema
/// (asserted below).
#[cfg(feature = "csv")]
pub async fn csv_file_info(
    sources: &ScanSources,
    _first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    csv_options: &mut CsvReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::POOL;
    use polars_core::error::feature_gated;
    use rayon::iter::{IntoParallelIterator, ParallelIterator};

    // Holding _first_scan_source should guarantee sources is not empty.
    debug_assert!(!sources.is_empty());

    // TODO:
    // * See if we can do better than scanning all files if there is a row limit

    // prints the error message if paths is empty.
    let run_async =
        sources.is_cloud_url() || (sources.is_paths() && polars_config::config().force_async());

    // Pre-register cloud URIs with the file cache so the per-file closure can
    // fetch through it.
    let cache_entries = {
        if run_async {
            let sources = sources.clone();
            assert!(sources.as_paths().is_some());

            feature_gated!("cloud", {
                Some(
                    polars_io::file_cache::init_entries_from_uri_list(
                        (0..sources.len())
                            .map(move |i| sources.as_paths().unwrap().get(i).unwrap().clone()),
                        cloud_options,
                    )
                    .await?,
                )
            })
        } else {
            None
        }
    };

    let infer_schema_length = csv_options.infer_schema_length;
    // Infers (schema, estimated_rows) for source `i`.
    let infer_schema_func = |i| {
        const ASSUMED_COMPRESSION_RATIO: usize = 4;
        let source = sources.at(i);

        let (mem_slice_raw, file_size, decompressed_slice_size_hint) = if run_async
            && let Some(infer_schema_length) = infer_schema_length
        {
            // Only download what we need for schema inference.
            // To do so, we use an iterative two-way progressive trial-and-error download strategy
            // until we either have enough rows, or reached EOF. In every iteration, we either
            // increase fetch_size (download progressively more), or try_read_size (try and
            // decompress more of what we have, in the case of compressed).
            const INITIAL_FETCH: usize = 64 * 1024;

            // Collect metadata.
            let byte_source = pl_async::get_runtime().block_on(async move {
                source
                    .to_dyn_byte_source(&DynByteSourceBuilder::ObjectStore, cloud_options, None)
                    .await
            })?;
            let byte_source = Arc::new(byte_source);

            let file_size = {
                let byte_source = byte_source.clone();
                pl_async::get_runtime().block_on(async move { byte_source.get_size().await })?
            };

            // Sniff the compression format from the first 4 (magic) bytes.
            let compression = if file_size >= 4 {
                let byte_source = byte_source.clone();
                let magic_range = 0..4;
                let magic_bytes = pl_async::get_runtime()
                    .block_on(async move { byte_source.get_range(magic_range).await })?;
                SupportedCompression::check(&magic_bytes)
            } else {
                None
            };

            let mut offset = 0;
            let mut fetch_size = INITIAL_FETCH;
            let mut try_read_size = INITIAL_FETCH * ASSUMED_COMPRESSION_RATIO;
            let mut truncated_bytes: Vec<u8> = Vec::with_capacity(INITIAL_FETCH);
            let mut reached_eof = false;

            // Collect enough rows to satisfy infer_schema_length.
            let (mem_slice_raw, decompressed_slice_size_hint) = loop {
                let range = offset..std::cmp::min(file_size, offset + fetch_size);

                if range.is_empty() {
                    reached_eof = true
                } else {
                    let byte_source = byte_source.clone();
                    let fetch_bytes = pl_async::get_runtime()
                        .block_on(async move { byte_source.get_range(range).await })?;
                    offset += fetch_bytes.len();
                    truncated_bytes.extend_from_slice(fetch_bytes.as_ref());
                }

                let decompressed_size_hint =
                    Some(offset * compression.map_or(1, |_| ASSUMED_COMPRESSION_RATIO));
                let mut reader = ByteSourceReader::<ReaderSource>::from_memory(
                    Buffer::from_owner(truncated_bytes.clone()),
                )?;

                let read_size = if compression.is_none() {
                    offset
                } else if reached_eof {
                    usize::MAX
                } else {
                    try_read_size
                };

                // Note: if `count_rows_from_reader_par` and therefore also `read_next_slice` were to
                // handle truncated compressed bytes gracefully, we could avoid the following EoF check
                // and remove `try_read_size` from the loop.
                let (slice, bytes_read) =
                    match reader.read_next_slice(&Buffer::new(), read_size, decompressed_size_hint)
                    {
                        Ok(v) => v,
                        // We assume that unexpected EOF indicates that we lack sufficient data.
                        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                            fetch_size *= 2;
                            continue;
                        },
                        Err(e) => Err(e)?,
                    };

                let row_count = polars_io::csv::read::count_rows_from_slice_par(
                    slice.clone(),
                    csv_options.parse_options.quote_char,
                    csv_options.parse_options.comment_prefix.as_ref(),
                    csv_options.parse_options.eol_char,
                    csv_options.has_header,
                    csv_options.skip_lines,
                    csv_options.skip_rows,
                    csv_options.skip_rows_after_header,
                    csv_options.raise_if_empty,
                )?;

                if row_count < infer_schema_length && !reached_eof {
                    if compression.is_some() && bytes_read == read_size {
                        // Decompressor had more to give — read_size too small
                        try_read_size *= 2;
                    } else {
                        // Decompressor exhausted input — need more compressed bytes
                        // Or, no compression
                        fetch_size *= 2;
                    }
                    continue;
                }

                break (Buffer::from_owner(truncated_bytes), Some(bytes_read));
            };
            (mem_slice_raw, file_size, decompressed_slice_size_hint)
        } else {
            // Local / fully-materialized path: load the whole file.
            let mem_slice_raw =
                source.to_buffer_possibly_async(run_async, cache_entries.as_ref(), i)?;
            let file_size = mem_slice_raw.len();
            let compression = SupportedCompression::check(&mem_slice_raw);
            let decompressed_slice_size_hint = Some(match compression {
                None => file_size,
                Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
            });
            (mem_slice_raw, file_size, decompressed_slice_size_hint)
        };

        let mut reader = ByteSourceReader::from_memory(mem_slice_raw)?;
        let compression = reader.compression();

        let mut first_row_len = 0;
        let (schema, _) = read_until_start_and_infer_schema(
            csv_options,
            None,
            decompressed_slice_size_hint,
            Some(Box::new(|line| {
                first_row_len = line.len() + 1;
            })),
            &mut reader,
        )?;

        let decompressed_file_size_hint = match compression {
            None => file_size,
            Some(_) => file_size * ASSUMED_COMPRESSION_RATIO,
        };

        // TODO. We can do (much) better by collect statistics as part of row count and/or schema
        // inference, including observed average row_length and compression ratio.
        // NOTE(review): if the line callback above never fires, `first_row_len`
        // stays 0 and this division yields inf/NaN before the usize cast —
        // confirm inference guarantees at least one observed row here.
        let estimated_rows =
            (decompressed_file_size_hint as f64 / first_row_len as f64).round() as usize;

        Ok((schema, estimated_rows))
    };

    // Merge two per-file results: first error wins; empty schemas are skipped;
    // otherwise widen `schema_a` to the supertype and sum the row estimates.
    let merge_func =
        |a: PolarsResult<(Schema, usize)>, b: PolarsResult<(Schema, usize)>| match (a, b) {
            (Err(e), _) | (_, Err(e)) => Err(e),
            (Ok((mut schema_a, row_estimate_a)), Ok((schema_b, row_estimate_b))) => {
                match (schema_a.is_empty(), schema_b.is_empty()) {
                    (true, _) => Ok((schema_b, row_estimate_b)),
                    (_, true) => Ok((schema_a, row_estimate_a)),
                    _ => {
                        schema_a.to_supertype(&schema_b)?;
                        Ok((schema_a, row_estimate_a.saturating_add(row_estimate_b)))
                    },
                }
            },
        };

    assert!(
        csv_options.schema.is_none(),
        "DSL to IR schema inference should not run if user provides a schema."
    );
    // Run inference in parallel with a specific merge order.
    // TODO: flatten to single level once Schema::to_supertype is commutative.
    let si_results = POOL.join(
        || infer_schema_func(0),
        || {
            (1..sources.len())
                .into_par_iter()
                .map(infer_schema_func)
                .reduce(|| Ok(Default::default()), merge_func)
        },
    );

    let (inferred_schema, estimated_n_rows) = merge_func(si_results.0, si_results.1)?;
    let inferred_schema_ref = Arc::new(inferred_schema);

    // Output schema carries the row-index column; reader schema does not.
    let (schema, reader_schema) = if let Some(rc) = row_index {
        let mut output_schema = (*inferred_schema_ref).clone();
        insert_row_index_to_schema(&mut output_schema, rc.name.clone())?;

        (Arc::new(output_schema), inferred_schema_ref)
    } else {
        (inferred_schema_ref.clone(), inferred_schema_ref)
    };

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, estimated_n_rows),
    ))
}

/// Infer an NDJSON schema from the first source and build a `FileInfo`.
///
/// Uses the user schema when given; for async (cloud) sources with a finite
/// `infer_schema_length` only a prefix is downloaded (same progressive
/// strategy as `csv_file_info`); otherwise the whole first object is
/// materialized. `schema_overwrite` is applied on top of the result. Row count
/// is unknown (`(None, usize::MAX)`).
#[cfg(feature = "json")]
pub async fn ndjson_file_info(
    sources: &ScanSources,
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    ndjson_options: &NDJsonReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::error::feature_gated;

    let run_async =
        sources.is_cloud_url() || (sources.is_paths() && polars_config::config().force_async());

    // Pre-register cloud URIs with the file cache (used by the full-download
    // fallback below).
    let cache_entries = {
        if run_async {
            let sources = sources.clone();
            assert!(sources.as_paths().is_some());

            feature_gated!("cloud", {
                Some(
                    polars_io::file_cache::init_entries_from_uri_list(
                        (0..sources.len())
                            .map(move |i| sources.as_paths().unwrap().get(i).unwrap().clone()),
                        cloud_options,
                    )
                    .await?,
                )
            })
        } else {
            None
        }
    };

    let infer_schema_length = ndjson_options.infer_schema_length;

    let mut schema = if let Some(schema) = ndjson_options.schema.clone() {
        schema
    } else if run_async && let Some(infer_schema_length) = infer_schema_length {
        // Only download what we need for schema inference.
        // To do so, we use an iterative two-way progressive trial-and-error download strategy
        // until we either have enough rows, or reached EOF. In every iteration, we either
        // increase fetch_size (download progressively more), or try_read_size (try and
        // decompress more of what we have, in the case of compressed).
        use polars_io::utils::compression::{ByteSourceReader, SupportedCompression};
        use polars_io::utils::stream_buf_reader::ReaderSource;

        const INITIAL_FETCH: usize = 64 * 1024;
        const ASSUMED_COMPRESSION_RATIO: usize = 4;

        let first_scan_source = first_scan_source.into_owned()?.clone();
        let cloud_options = cloud_options.cloned();
        // TODO. Support IOMetrics collection during planning phase.
        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                first_scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(
                        &DynByteSourceBuilder::ObjectStore,
                        cloud_options.as_ref(),
                        None,
                    )
                    .await
            })
            .await
            .unwrap()?;
        let byte_source = Arc::new(byte_source);

        let file_size = {
            let byte_source = byte_source.clone();
            pl_async::get_runtime()
                .spawn(async move { byte_source.get_size().await })
                .await
                .unwrap()?
        };

        let mut offset = 0;
        let mut fetch_size = INITIAL_FETCH;
        let mut try_read_size = INITIAL_FETCH * ASSUMED_COMPRESSION_RATIO;
        let mut truncated_bytes: Vec<u8> = Vec::with_capacity(INITIAL_FETCH);
        let mut reached_eof = false;

        // Collect enough rows to satisfy infer_schema_length
        let memslice = loop {
            let range = offset..std::cmp::min(file_size, offset + fetch_size);

            if range.is_empty() {
                reached_eof = true
            } else {
                let byte_source = byte_source.clone();
                let fetch_bytes = pl_async::get_runtime()
                    .spawn(async move { byte_source.get_range(range).await })
                    .await
                    .unwrap()?;
                offset += fetch_bytes.len();
                truncated_bytes.extend_from_slice(fetch_bytes.as_ref());
            }

            let compression = SupportedCompression::check(&truncated_bytes);
            let mut reader = ByteSourceReader::<ReaderSource>::from_memory(Buffer::from_owner(
                truncated_bytes.clone(),
            ))?;
            let read_size = if compression.is_none() {
                offset
            } else if reached_eof {
                usize::MAX
            } else {
                try_read_size
            };

            let uncompressed_size_hint = Some(
                offset
                    * if compression.is_none() {
                        1
                    } else {
                        ASSUMED_COMPRESSION_RATIO
                    },
            );

            let (slice, bytes_read) =
                match reader.read_next_slice(&Buffer::new(), read_size, uncompressed_size_hint) {
                    Ok(v) => v,
                    // We assume that unexpected EOF indicates that we lack sufficient data.
                    Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                        fetch_size *= 2;
                        continue;
                    },
                    Err(e) => Err(e)?,
                };

            if polars_io::ndjson::count_rows(&slice) < infer_schema_length.into() && !reached_eof {
                if compression.is_some() && bytes_read == read_size {
                    // Decompressor had more to give — read_size too small
                    try_read_size *= 2;
                } else {
                    // Decompressor exhausted input — need more compressed bytes
                    // Or, no compression
                    fetch_size *= 2;
                }
                continue;
            }

            break slice;
        };

        let mut buf_reader = BufReader::new(Cursor::new(memslice));
        Arc::new(polars_io::ndjson::infer_schema(
            &mut buf_reader,
            ndjson_options.infer_schema_length,
        )?)
    } else {
        // Download the entire object.
        // Warning - this is potentially memory-expensive in the case of a cloud source, and goes
        // against the design goal of a streaming reader. This can be optimized.
        let mem_slice =
            first_scan_source.to_buffer_possibly_async(run_async, cache_entries.as_ref(), 0)?;
        let mut reader = BufReader::new(CompressedReader::try_new(mem_slice)?);

        Arc::new(polars_io::ndjson::infer_schema(
            &mut reader,
            ndjson_options.infer_schema_length,
        )?)
    };

    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
        overwrite_schema(Arc::make_mut(&mut schema), overwriting_schema)?;
    }

    let mut reader_schema = schema.clone();

    // NOTE(review): `prepare_schemas` returns (schema WITHOUT row index, schema
    // WITH row index); this assignment puts the row-index column into
    // `reader_schema` instead of `schema` — the opposite of what
    // `csv_file_info` does. Looks swapped; confirm intended.
    if row_index.is_some() {
        (schema, reader_schema) = prepare_schemas(Arc::unwrap_or_clone(schema), row_index)?
    }

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, usize::MAX),
    ))
}

// Add flags that influence metadata/schema here
/// Cache key for resolved file info. Parquet/IPC are keyed by the first path
/// only (plus schema overwrite); CSV/NDJSON are keyed by the full path list
/// and both schema options.
#[derive(Eq, Hash, PartialEq)]
enum CachedSourceKey {
    ParquetIpc {
        first_path: PlRefPath,
        schema_overwrite: Option<SchemaRef>,
    },
    CsvJson {
        paths: Buffer<PlRefPath>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<SchemaRef>,
    },
}

/// Shared cache mapping scan sources to their resolved `(FileInfo, FileScanIR)`
/// so repeated scans of the same files skip metadata/schema resolution.
#[derive(Default, Clone)]
pub(super) struct SourcesToFileInfo {
    inner: Arc<RwLock<PlHashMap<CachedSourceKey, (FileInfo, FileScanIR)>>>,
}

impl SourcesToFileInfo {
    /// Resolve `(FileInfo, FileScanIR)` for `scan_type` without consulting the
    /// cache: use a user-provided schema when present, otherwise probe the
    /// first (or all, for CSV) sources via the per-format `*_file_info`
    /// helpers above.
    async fn infer_or_parse(
        &self,
        scan_type: FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Fetch the first source, or error mentioning the pre-expansion
        // sources when expansion yielded nothing.
        let require_first_source = |failed_operation_name: &'static str, hint: &'static str| {
            sources.first_or_empty_expand_err(
                failed_operation_name,
                sources_before_expansion,
                unified_scan_args.glob,
                hint,
            )
        };

        let n_sources = sources.len();
        let cloud_options = unified_scan_args.cloud_options.as_ref();

        Ok(match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                if let Some(schema) = &options.schema {
                    // We were passed a schema, we don't have to call `parquet_file_info`,
                    // but this does mean we don't have `row_estimation` and `first_metadata`.

                    (
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Left(Arc::new(
                                schema.to_arrow(CompatLevel::newest()),
                            ))),
                            row_estimation: (None, usize::MAX),
                        },
                        FileScanIR::Parquet {
                            options,
                            metadata: None,
                        },
                    )
                } else {
                    {
                        let first_scan_source = require_first_source(
                            "failed to retrieve first file schema (parquet)",
                            "\
                            passing a schema can allow \
                            this scan to succeed with an empty DataFrame.",
                        )?;

                        if verbose() {
                            eprintln!(
                                "sourcing parquet scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        let (mut file_info, mut metadata) = scans::parquet_file_info(
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                            n_sources,
                        )
                        .await?;

                        // An exact row count (e.g. from a deletion-aware
                        // source) overrides the estimate.
                        if let Some((total, deleted)) = unified_scan_args.row_count {
                            let size = (total - deleted) as usize;
                            file_info.row_estimation = (Some(size), size);
                        }

                        // Don't retain metadata once the cache exceeds its cap.
                        if self.inner.read().unwrap().len() > max_metadata_scan_cached() {
                            _ = metadata.take();
                        }

                        PolarsResult::Ok((file_info, FileScanIR::Parquet { options, metadata }))
                    }
                    .map_err(|e| e.context(failed_here!(parquet scan)))?
                }
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options } => {
                let first_scan_source =
                    require_first_source("failed to retrieve first file schema (ipc)", "")?;

                if verbose() {
                    eprintln!(
                        "sourcing ipc scan file schema from: '{}'",
                        first_scan_source.to_include_path_name()
                    )
                }

                let (file_info, md) = scans::ipc_file_info(
                    first_scan_source,
                    unified_scan_args.row_index.as_ref(),
                    cloud_options,
                )
                .await?;

                PolarsResult::Ok((
                    file_info,
                    FileScanIR::Ipc {
                        options,
                        metadata: Some(Arc::new(md)),
                    },
                ))
            }
            .map_err(|e| e.context(failed_here!(ipc scan)))?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { mut options } => {
                {
                    // TODO: This is a hack. We conditionally set `allow_missing_columns` to
                    // mimic existing behavior, but this should be taken from a user provided
                    // parameter instead.
                    if options.schema.is_some() && options.has_header {
                        unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                    }

                    let file_info = if let Some(schema) = options.schema.clone() {
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Right(schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    } else {
                        let first_scan_source =
                            require_first_source("failed to retrieve file schemas (csv)", "")?;

                        if verbose() {
                            eprintln!(
                                "sourcing csv scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        scans::csv_file_info(
                            sources,
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            Arc::make_mut(&mut options),
                            cloud_options,
                        )
                        .await?
                    };

                    PolarsResult::Ok((file_info, FileScanIR::Csv { options }))
                }
                .map_err(|e| e.context(failed_here!(csv scan)))?
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let file_info = if let Some(schema) = options.schema.clone() {
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema)),
                        row_estimation: (None, usize::MAX),
                    }
                } else {
                    let first_scan_source =
                        require_first_source("failed to retrieve first file schema (ndjson)", "")?;

                    if verbose() {
                        eprintln!(
                            "sourcing ndjson scan file schema from: '{}'",
                            first_scan_source.to_include_path_name()
                        )
                    }

                    scans::ndjson_file_info(
                        sources,
                        first_scan_source,
                        unified_scan_args.row_index.as_ref(),
                        &options,
                        cloud_options,
                    )
                    .await?
                };

                PolarsResult::Ok((file_info, FileScanIR::NDJson { options }))
            }
            .map_err(|e| e.context(failed_here!(ndjson scan)))?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { dataset_object } => (|| {
                if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                    polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                }

                let mut schema = dataset_object.schema()?;
                let reader_schema = schema.clone();

                if let Some(row_index) = &unified_scan_args.row_index {
                    insert_row_index_to_schema(Arc::make_mut(&mut schema), row_index.name.clone())?;
                }

                PolarsResult::Ok((
                    FileInfo {
                        schema,
                        reader_schema: Some(either::Either::Right(reader_schema)),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::PythonDataset {
                        dataset_object,
                        cached_ir: Default::default(),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(python dataset scan)))?,
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { name } => {
                // Lines scans always expose a single String column named
                // `name`; no file probing is needed, so no `row_estimation`.
                let schema = Arc::new(Schema::from_iter([(name.clone(), DataType::String)]));

                (
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema.clone())),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::Lines { name },
                )
            },
            FileScanDsl::Anonymous {
                file_info,
                options,
                function,
            } => (file_info, FileScanIR::Anonymous { options, function }),
        })
    }

    /// Cached front-end to [`Self::infer_or_parse`].
    ///
    /// Only non-empty path-backed sources are cached; anything else (buffers,
    /// empty lists, uncacheable scan types) is resolved directly.
    /// NOTE(review): the read lock is dropped before `infer_or_parse` runs, so
    /// two concurrent misses may both resolve and the last `insert` wins —
    /// presumably acceptable for a cache; confirm.
    pub(super) async fn get_or_insert(
        &self,
        scan_type: &FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
        verbose: bool,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Only cache non-empty paths. Others are directly parsed.
        let paths = match sources {
            ScanSources::Paths(paths) if !paths.is_empty() => paths.clone(),

            _ => {
                return self
                    .infer_or_parse(
                        scan_type.clone(),
                        sources,
                        sources_before_expansion,
                        unified_scan_args,
                    )
                    .await;
            },
        };

        // Build the cache key for this scan type and look up any cached value.
        let (k, v): (CachedSourceKey, Option<(FileInfo, FileScanIR)>) = match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: options.schema.clone(),
                };

                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options: _ } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: None,
                };

                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let guard = self.inner.read().unwrap();
                let v = guard.get(&key);
                (key, v.cloned())
            },
            _ => {
                return self
                    .infer_or_parse(
                        scan_type.clone(),
                        sources,
                        sources_before_expansion,
                        unified_scan_args,
                    )
                    .await;
            },
        };

        if let Some(out) = v {
            if verbose {
                eprintln!("FILE_INFO CACHE HIT")
            }
            Ok(out)
        } else {
            let v = self
                .infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                )
                .await?;
            self.inner.write().unwrap().insert(k, v.clone());
            Ok(v)
        }
    }
}