// Path: crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
use std::sync::Arc;12use arrow::datatypes::ArrowSchemaRef;3use async_trait::async_trait;4use polars_core::prelude::ArrowSchema;5use polars_core::schema::{Schema, SchemaExt, SchemaRef};6use polars_error::{PolarsResult, polars_err};7use polars_io::cloud::CloudOptions;8use polars_io::predicates::ScanIOPredicate;9use polars_io::prelude::{FileMetadata, ParquetOptions};10use polars_io::utils::byte_source::{BufferByteSource, DynByteSource, DynByteSourceBuilder};11use polars_io::{RowIndex, pl_async};12use polars_parquet::read::schema::infer_schema_with_options;13use polars_plan::dsl::ScanSource;14use polars_utils::IdxSize;15use polars_utils::mem::prefetch::get_memory_prefetch_func;16use polars_utils::slice_enum::Slice;1718use super::multi_scan::reader_interface::output::{FileReaderOutputRecv, FileReaderOutputSend};19use super::multi_scan::reader_interface::{20BeginReadArgs, FileReader, FileReaderCallbacks, calc_row_position_after_slice,21};22use crate::async_executor::{self};23use crate::async_primitives::wait_group::{WaitGroup, WaitToken};24use crate::metrics::OptIOMetrics;25use crate::morsel::SourceToken;26use crate::nodes::compute_node_prelude::*;27use crate::nodes::io_sources::parquet::projection::{28ArrowFieldProjection, resolve_arrow_field_projections,29};30use crate::nodes::{TaskPriority, io_sources};31use crate::utils::tokio_handle_ext;3233pub mod builder;34pub mod init;35mod metadata_utils;36mod projection;37mod row_group_data_fetch;38mod row_group_decode;39mod statistics;4041pub struct ParquetFileReader {42scan_source: ScanSource,43cloud_options: Option<Arc<CloudOptions>>,44config: Arc<ParquetOptions>,45/// Set by the builder if we have metadata left over from DSL conversion.46metadata: Option<Arc<FileMetadata>>,47byte_source_builder: DynByteSourceBuilder,48row_group_prefetch_sync: RowGroupPrefetchSync,49io_metrics: OptIOMetrics,50verbose: bool,5152/// Set during initialize()53init_data: Option<InitializedState>,54}5556struct RowGroupPrefetchSync 
{57prefetch_limit: usize,58prefetch_semaphore: Arc<tokio::sync::Semaphore>,59shared_prefetch_wait_group_slot: Arc<std::sync::Mutex<Option<WaitGroup>>>,6061/// Waits for the previous reader to finish spawning prefetches.62prev_all_spawned: Option<WaitGroup>,63/// Dropped once the current reader has finished spawning prefetches.64current_all_spawned: Option<WaitToken>,65}6667#[derive(Clone)]68struct InitializedState {69file_metadata: Arc<FileMetadata>,70file_schema: Arc<ArrowSchema>,71file_schema_pl: Option<SchemaRef>,72byte_source: Arc<DynByteSource>,73}7475#[async_trait]76impl FileReader for ParquetFileReader {77async fn initialize(&mut self) -> PolarsResult<()> {78let verbose = self.verbose;7980if self.init_data.is_some() {81return Ok(());82}8384let scan_source = self.scan_source.clone();85let byte_source_builder = self.byte_source_builder.clone();86let cloud_options = self.cloud_options.clone();87let io_metrics = self.io_metrics.clone();8889let byte_source = pl_async::get_runtime()90.spawn(async move {91scan_source92.as_scan_source_ref()93.to_dyn_byte_source(94&byte_source_builder,95cloud_options.as_deref(),96io_metrics.0,97)98.await99})100.await101.unwrap()?;102103let mut byte_source = Arc::new(byte_source);104105let file_metadata = if let Some(v) = self.metadata.clone() {106v107} else {108let (metadata_bytes, opt_full_bytes) = {109let byte_source = byte_source.clone();110111pl_async::get_runtime()112.spawn(async move {113metadata_utils::read_parquet_metadata_bytes(&byte_source, verbose).await114})115.await116.unwrap()?117};118119if let Some(full_bytes) = opt_full_bytes {120byte_source = Arc::new(DynByteSource::Buffer(BufferByteSource(full_bytes)));121}122123Arc::new(polars_parquet::parquet::read::deserialize_metadata(124metadata_bytes.as_ref(),125metadata_bytes.len() * 2 + 1024,126)?)127};128129let file_schema = Arc::new(infer_schema_with_options(&file_metadata, &None)?);130131self.init_data = Some(InitializedState 
{132file_metadata,133file_schema,134file_schema_pl: None,135byte_source,136});137138Ok(())139}140141fn prepare_read(&mut self) -> PolarsResult<()> {142let wait_group_this_reader = WaitGroup::default();143let prefetch_all_spawned_token = wait_group_this_reader.token();144145let prev_wait_group: Option<WaitGroup> = self146.row_group_prefetch_sync147.shared_prefetch_wait_group_slot148.try_lock()149.unwrap()150.replace(wait_group_this_reader);151152self.row_group_prefetch_sync.prev_all_spawned = prev_wait_group;153self.row_group_prefetch_sync.current_all_spawned = Some(prefetch_all_spawned_token);154155Ok(())156}157158fn begin_read(159&mut self,160args: BeginReadArgs,161) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {162let verbose = self.verbose;163164let InitializedState {165file_metadata,166file_schema: file_arrow_schema,167file_schema_pl: _,168byte_source,169} = self.init_data.clone().unwrap();170171let n_rows_in_file = self._n_rows_in_file()?;172173let single_morsel_height: Option<usize> = if let BeginReadArgs {174projection,175row_index: None,176pre_slice,177predicate: None,178cast_columns_policy: _,179num_pipelines: _,180disable_morsel_split: true,181callbacks:182FileReaderCallbacks {183file_schema_tx: _,184n_rows_in_file_tx: _,185row_position_on_end_tx: _,186},187} = &args188&& projection.is_empty()189{190let mut h: usize = n_rows_in_file as _;191192if let Some(pre_slice) = pre_slice.clone() {193h = usize::min(h, pre_slice.restrict_to_bounds(h).len());194}195196Some(h)197} else {198None199};200201let BeginReadArgs {202projection,203row_index,204pre_slice: pre_slice_arg,205predicate,206cast_columns_policy,207num_pipelines,208disable_morsel_split,209callbacks:210FileReaderCallbacks {211file_schema_tx,212n_rows_in_file_tx,213row_position_on_end_tx,214},215} = args;216217let file_schema = self._file_schema().clone();218219let normalized_pre_slice = pre_slice_arg220.clone()221.map(|x| 
x.restrict_to_bounds(usize::try_from(n_rows_in_file).unwrap()));222223// Send all callbacks to unblock the next reader. We can do this immediately as we know224// the total row count upfront.225226if let Some(n_rows_in_file_tx) = n_rows_in_file_tx {227_ = n_rows_in_file_tx.send(n_rows_in_file);228}229230// We are allowed to send this value immediately, even though we haven't "ended" yet231// (see its definition under FileReaderCallbacks).232if let Some(row_position_on_end_tx) = row_position_on_end_tx {233_ = row_position_on_end_tx234.send(self._row_position_after_slice(normalized_pre_slice.clone())?);235}236237if let Some(file_schema_tx) = file_schema_tx {238_ = file_schema_tx.send(file_schema.clone());239}240241if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {242let (_, rx) = FileReaderOutputSend::new_serial();243244if verbose {245eprintln!(246"[ParquetFileReader]: early return: \247n_rows_in_file: {n_rows_in_file}, \248pre_slice: {pre_slice_arg:?}, \249resolved_pre_slice: {normalized_pre_slice:?}"250)251}252253return Ok((254rx,255async_executor::spawn(TaskPriority::Low, std::future::ready(Ok(()))),256));257}258259let mut _projected_arrow_fields: Option<Arc<[ArrowFieldProjection]>> = None;260let mut projected_arrow_fields = || {261if _projected_arrow_fields.is_none() {262_projected_arrow_fields = Some(resolve_arrow_field_projections(263&file_arrow_schema,264&file_schema,265projection.clone(),266cast_columns_policy.clone(),267)?);268}269PolarsResult::Ok(_projected_arrow_fields.as_ref().unwrap().clone())270};271272if verbose {273eprintln!(274"[ParquetFileReader]: \275project: {} / {}, \276pre_slice: {:?}, \277resolved_pre_slice: {:?}, \278row_index: {:?}, \279predicate: {:?}",280projected_arrow_fields()?.len(),281file_schema.len(),282pre_slice_arg,283normalized_pre_slice,284&row_index,285predicate.as_ref().map(|_| "<predicate>"),286)287}288289if let Some(single_morsel_height) = single_morsel_height {290let (mut tx, rx) = 
FileReaderOutputSend::new_serial();291292let handle = async_executor::spawn(TaskPriority::Low, async move {293let _ = tx294.send_morsel(Morsel::new(295DataFrame::empty_with_height(single_morsel_height),296MorselSeq::default(),297SourceToken::default(),298))299.await;300Ok(())301});302303return Ok((rx, handle));304}305306// Prepare parameters for dispatch307let projected_arrow_fields = projected_arrow_fields()?.clone();308let memory_prefetch_func = get_memory_prefetch_func(verbose);309let row_group_prefetch_size = self310.row_group_prefetch_sync311.prefetch_limit312.min(file_metadata.row_groups.len())313.max(1);314315// This can be set to 1 to force column-per-thread parallelism, e.g. for bug reproduction.316let target_values_per_thread =317std::env::var("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD")318.map(|x| x.parse::<usize>().expect("integer").max(1))319.unwrap_or(16_777_216);320321let is_full_projection = projected_arrow_fields.len() == file_schema.len();322323let (output_recv, handle) = ParquetReadImpl {324projected_arrow_fields,325is_full_projection,326predicate,327// TODO: Refactor to avoid full clone328options: Arc::unwrap_or_clone(self.config.clone()),329byte_source,330normalized_pre_slice: normalized_pre_slice.map(|x| match x {331Slice::Positive { offset, len } => (offset, len),332Slice::Negative { .. 
} => unreachable!(),333}),334metadata: file_metadata,335config: io_sources::parquet::Config {336num_pipelines,337row_group_prefetch_size,338target_values_per_thread,339},340verbose,341memory_prefetch_func,342row_index,343rg_prefetch_semaphore: Arc::clone(&self.row_group_prefetch_sync.prefetch_semaphore),344rg_prefetch_prev_all_spawned: Option::take(345&mut self.row_group_prefetch_sync.prev_all_spawned,346),347rg_prefetch_current_all_spawned: Option::take(348&mut self.row_group_prefetch_sync.current_all_spawned,349),350disable_morsel_split,351}352.run();353354Ok((355output_recv,356async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),357))358}359360async fn file_schema(&mut self) -> PolarsResult<SchemaRef> {361Ok(self._file_schema().clone())362}363364async fn file_arrow_schema(&mut self) -> PolarsResult<Option<ArrowSchemaRef>> {365Ok(Some(self._file_arrow_schema().clone()))366}367368async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {369self._n_rows_in_file()370}371372async fn fast_n_rows_in_file(&mut self) -> PolarsResult<Option<IdxSize>> {373self._n_rows_in_file().map(Some)374}375376async fn row_position_after_slice(377&mut self,378pre_slice: Option<Slice>,379) -> PolarsResult<IdxSize> {380self._row_position_after_slice(pre_slice)381}382}383384impl ParquetFileReader {385fn _file_schema(&mut self) -> &SchemaRef {386let InitializedState {387file_schema,388file_schema_pl,389..390} = self.init_data.as_mut().unwrap();391392if file_schema_pl.is_none() {393*file_schema_pl = Some(Arc::new(Schema::from_arrow_schema(file_schema.as_ref())))394}395396file_schema_pl.as_ref().unwrap()397}398399fn _file_arrow_schema(&mut self) -> &ArrowSchemaRef {400let InitializedState { file_schema, .. 
} = self.init_data.as_mut().unwrap();401file_schema402}403404fn _n_rows_in_file(&self) -> PolarsResult<IdxSize> {405let n = self.init_data.as_ref().unwrap().file_metadata.num_rows;406IdxSize::try_from(n).map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = n))407}408409fn _row_position_after_slice(&self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {410Ok(calc_row_position_after_slice(411self._n_rows_in_file()?,412pre_slice,413))414}415}416417type AsyncTaskData = (418FileReaderOutputRecv,419tokio_handle_ext::AbortOnDropHandle<PolarsResult<()>>,420);421422struct ParquetReadImpl {423projected_arrow_fields: Arc<[ArrowFieldProjection]>,424is_full_projection: bool,425predicate: Option<ScanIOPredicate>,426options: ParquetOptions,427byte_source: Arc<DynByteSource>,428normalized_pre_slice: Option<(usize, usize)>,429metadata: Arc<FileMetadata>,430// Run-time vars431config: Config,432verbose: bool,433memory_prefetch_func: fn(&[u8]) -> (),434row_index: Option<RowIndex>,435436rg_prefetch_semaphore: Arc<tokio::sync::Semaphore>,437rg_prefetch_prev_all_spawned: Option<WaitGroup>,438rg_prefetch_current_all_spawned: Option<WaitToken>,439disable_morsel_split: bool,440}441442#[derive(Debug)]443struct Config {444num_pipelines: usize,445/// Number of row groups to pre-fetch concurrently, this can be across files446row_group_prefetch_size: usize,447/// Minimum number of values for a parallel spawned task to process to amortize448/// parallelism overhead.449target_values_per_thread: usize,450}451452impl ParquetReadImpl {453fn run(mut self) -> AsyncTaskData {454if self.verbose {455eprintln!("[ParquetFileReader]: {:?}", &self.config);456}457458self.init_morsel_distributor()459}460}461462463