Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/builder.rs
8509 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use polars_core::config;4use polars_io::cloud::CloudOptions;5use polars_io::prelude::{FileMetadata, ParallelStrategy, ParquetOptions};6use polars_io::utils::byte_source::DynByteSourceBuilder;7use polars_plan::dsl::ScanSource;8use polars_utils::relaxed_cell::RelaxedCell;910use super::{FileReader, ParquetFileReader};11use crate::async_primitives::wait_group::WaitGroup;12use crate::metrics::{IOMetrics, OptIOMetrics};13use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;14use crate::nodes::io_sources::multi_scan::reader_interface::capabilities::ReaderCapabilities;1516#[derive(Clone)]17pub struct ParquetReaderBuilder {18pub first_metadata: Option<Arc<FileMetadata>>,19pub options: Arc<ParquetOptions>,20pub prefetch_limit: RelaxedCell<usize>,21pub prefetch_semaphore: std::sync::OnceLock<Arc<tokio::sync::Semaphore>>,22pub shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,23pub io_metrics: std::sync::OnceLock<Arc<IOMetrics>>,24}2526impl std::fmt::Debug for ParquetReaderBuilder {27fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {28f.debug_struct("ParquetReaderBuilder")29.field("first_metadata", &self.first_metadata)30.field("options", &self.options)31.field("prefetch_semaphore", &self.prefetch_semaphore)32.finish()33}34}3536impl FileReaderBuilder for ParquetReaderBuilder {37fn reader_name(&self) -> &str {38"parquet"39}4041fn reader_capabilities(&self) -> ReaderCapabilities {42use ReaderCapabilities as RC;4344let mut capabilities = RC::ROW_INDEX45| RC::PRE_SLICE46| RC::NEGATIVE_PRE_SLICE47| RC::PARTIAL_FILTER48| RC::MAPPED_COLUMN_PROJECTION;4950if matches!(51self.options.parallel,52ParallelStrategy::Auto | ParallelStrategy::Prefiltered53) {54capabilities |= RC::FULL_FILTER;55}56capabilities57}5859fn set_execution_state(&self, execution_state: &crate::execute::StreamingExecutionState) {60let prefetch_limit = std::env::var("POLARS_ROW_GROUP_PREFETCH_SIZE")61.map(|x| {62x.parse::<NonZeroUsize>()63.unwrap_or_else(|_| {64panic!("invalid value for POLARS_ROW_GROUP_PREFETCH_SIZE: {x}")65})66.get()67})68.unwrap_or(69execution_state70.num_pipelines71.saturating_mul(2)72.clamp(8, 512),73)74.max(1);7576self.prefetch_limit.store(prefetch_limit);7778if config::verbose() {79eprintln!(80"[ParquetReaderBuilder]: prefetch_limit: {}",81self.prefetch_limit.load()82);83}8485self.prefetch_semaphore86.set(Arc::new(tokio::sync::Semaphore::new(prefetch_limit)))87.unwrap()88}8990fn set_io_metrics(&self, io_metrics: Arc<IOMetrics>) {91let _ = self.io_metrics.set(io_metrics);92}9394fn build_file_reader(95&self,96source: ScanSource,97cloud_options: Option<Arc<CloudOptions>>,98scan_source_idx: usize,99) -> Box<dyn FileReader> {100use crate::nodes::io_sources::parquet::RowGroupPrefetchSync;101102let scan_source = source;103let config = self.options.clone();104let verbose = config::verbose();105106let byte_source_builder =107if scan_source.is_cloud_url() || polars_config::config().force_async() {108DynByteSourceBuilder::ObjectStore109} else {110DynByteSourceBuilder::Mmap111};112113assert!(self.prefetch_limit.load() > 0);114115let reader = ParquetFileReader {116scan_source,117cloud_options,118config,119metadata: if scan_source_idx == 0 {120self.first_metadata.clone()121} else {122None123},124byte_source_builder,125row_group_prefetch_sync: RowGroupPrefetchSync {126prefetch_limit: self.prefetch_limit.load(),127prefetch_semaphore: Arc::clone(self.prefetch_semaphore.get().unwrap()),128shared_prefetch_wait_group_slot: Arc::clone(&self.shared_prefetch_wait_group_slot),129prev_all_spawned: None,130current_all_spawned: None,131},132io_metrics: OptIOMetrics(self.io_metrics.get().cloned()),133verbose,134135init_data: None,136};137138Box::new(reader) as Box<dyn FileReader>139}140}141142143