Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs
use std::sync::Arc;

use arrow::datatypes::ArrowSchemaRef;
use async_trait::async_trait;
use polars_core::prelude::ArrowSchema;
use polars_core::schema::{Schema, SchemaExt, SchemaRef};
use polars_error::{PolarsResult, polars_err};
use polars_io::cloud::CloudOptions;
use polars_io::predicates::ScanIOPredicate;
use polars_io::prelude::{FileMetadata, ParquetOptions};
use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder, MemSliceByteSource};
use polars_io::{RowIndex, pl_async};
use polars_parquet::read::schema::infer_schema_with_options;
use polars_plan::dsl::ScanSource;
use polars_utils::IdxSize;
use polars_utils::mem::prefetch::get_memory_prefetch_func;
use polars_utils::slice_enum::Slice;

use super::multi_scan::reader_interface::output::{FileReaderOutputRecv, FileReaderOutputSend};
use super::multi_scan::reader_interface::{
    BeginReadArgs, FileReader, FileReaderCallbacks, calc_row_position_after_slice,
};
use crate::async_executor::{self};
use crate::nodes::compute_node_prelude::*;
use crate::nodes::io_sources::parquet::projection::{
    ArrowFieldProjection, resolve_arrow_field_projections,
};
use crate::nodes::{TaskPriority, io_sources};
use crate::utils::task_handles_ext;

pub mod builder;
mod init;
mod metadata_utils;
mod projection;
mod row_group_data_fetch;
mod row_group_decode;
mod statistics;

pub struct ParquetFileReader {
    scan_source: ScanSource,
    cloud_options: Option<Arc<CloudOptions>>,
    config: Arc<ParquetOptions>,
    /// Set by the builder if we have metadata left over from DSL conversion.
    metadata: Option<Arc<FileMetadata>>,
    byte_source_builder: DynByteSourceBuilder,
    verbose: bool,

    /// Set during initialize()
    init_data: Option<InitializedState>,
}

#[derive(Clone)]
struct InitializedState {
    file_metadata: Arc<FileMetadata>,
    file_schema: Arc<ArrowSchema>,
    file_schema_pl: Option<SchemaRef>,
    byte_source: Arc<DynByteSource>,
}

#[async_trait]
impl FileReader for ParquetFileReader {
    async fn initialize(&mut self) -> PolarsResult<()> {
        let verbose = self.verbose;

        if self.init_data.is_some() {
            return Ok(());
        }

        let scan_source = self.scan_source.clone();
        let byte_source_builder = self.byte_source_builder.clone();
        let cloud_options = self.cloud_options.clone();

        let byte_source = pl_async::get_runtime()
            .spawn(async move {
                scan_source
                    .as_scan_source_ref()
                    .to_dyn_byte_source(&byte_source_builder, cloud_options.as_deref())
                    .await
            })
            .await
            .unwrap()?;

        let mut byte_source = Arc::new(byte_source);

        let file_metadata = if let Some(v) = self.metadata.clone() {
            v
        } else {
            let (metadata_bytes, opt_full_bytes) = {
                let byte_source = byte_source.clone();

                pl_async::get_runtime()
                    .spawn(async move {
                        metadata_utils::read_parquet_metadata_bytes(&byte_source, verbose).await
                    })
                    .await
                    .unwrap()?
            };

            if let Some(full_bytes) = opt_full_bytes {
                byte_source = Arc::new(DynByteSource::MemSlice(MemSliceByteSource(full_bytes)));
            }

            Arc::new(polars_parquet::parquet::read::deserialize_metadata(
                metadata_bytes.as_ref(),
                metadata_bytes.len() * 2 + 1024,
            )?)
        };

        let file_schema = Arc::new(infer_schema_with_options(&file_metadata, &None)?);

        self.init_data = Some(InitializedState {
            file_metadata,
            file_schema,
            file_schema_pl: None,
            byte_source,
        });

        Ok(())
    }
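
    // Illustrative sketch (not part of this module) of how a caller drives a
    // `FileReader`; the real orchestration lives in the multi_scan machinery,
    // and `args` stands in for a fully populated `BeginReadArgs`:
    //
    //     reader.initialize().await?; // fetch and cache metadata + schema
    //     let (morsels, handle) = reader.begin_read(args)?;
    //     // ... consume `morsels`, then await `handle` for the final result.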

    fn begin_read(
        &mut self,
        args: BeginReadArgs,
    ) -> PolarsResult<(FileReaderOutputRecv, JoinHandle<PolarsResult<()>>)> {
        let verbose = self.verbose;

        let InitializedState {
            file_metadata,
            file_schema: file_arrow_schema,
            file_schema_pl: _,
            byte_source,
        } = self.init_data.clone().unwrap();

        let BeginReadArgs {
            projection,
            row_index,
            pre_slice: pre_slice_arg,
            predicate,
            cast_columns_policy,
            num_pipelines,
            callbacks:
                FileReaderCallbacks {
                    file_schema_tx,
                    n_rows_in_file_tx,
                    row_position_on_end_tx,
                },
        } = args;

        let file_schema = self._file_schema().clone();

        let projected_arrow_fields = resolve_arrow_field_projections(
            &file_arrow_schema,
            &file_schema,
            projection,
            cast_columns_policy,
        )?;

        let n_rows_in_file = self._n_rows_in_file()?;

        let normalized_pre_slice = pre_slice_arg
            .clone()
            .map(|x| x.restrict_to_bounds(usize::try_from(n_rows_in_file).unwrap()));

        // Send all callbacks to unblock the next reader. We can do this immediately as we know
        // the total row count upfront.

        if let Some(mut n_rows_in_file_tx) = n_rows_in_file_tx {
            _ = n_rows_in_file_tx.try_send(n_rows_in_file);
        }

        // We are allowed to send this value immediately, even though we haven't "ended" yet
        // (see its definition under FileReaderCallbacks).
        if let Some(mut row_position_on_end_tx) = row_position_on_end_tx {
            _ = row_position_on_end_tx
                .try_send(self._row_position_after_slice(normalized_pre_slice.clone())?);
        }

        if let Some(mut file_schema_tx) = file_schema_tx {
            _ = file_schema_tx.try_send(file_schema.clone());
        }

        if normalized_pre_slice.as_ref().is_some_and(|x| x.len() == 0) {
            let (_, rx) = FileReaderOutputSend::new_serial();

            if verbose {
                eprintln!(
                    "[ParquetFileReader]: early return: \
                    n_rows_in_file: {n_rows_in_file}, \
                    pre_slice: {pre_slice_arg:?}, \
                    resolved_pre_slice: {normalized_pre_slice:?} \
                    "
                )
            }

            return Ok((
                rx,
                async_executor::spawn(TaskPriority::Low, std::future::ready(Ok(()))),
            ));
        }

        // Prepare parameters for dispatch

        let memory_prefetch_func = get_memory_prefetch_func(verbose);
        let row_group_prefetch_size = polars_core::config::get_rg_prefetch_size();

        // This can be set to 1 to force column-per-thread parallelism, e.g. for bug reproduction.
        let target_values_per_thread =
            std::env::var("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD")
                .map(|x| x.parse::<usize>().expect("integer").max(1))
                .unwrap_or(16_777_216);

        let is_full_projection = projected_arrow_fields.len() == file_schema.len();

        if verbose {
            eprintln!(
                "[ParquetFileReader]: \
                project: {} / {}, \
                pre_slice: {:?}, \
                resolved_pre_slice: {:?}, \
                row_index: {:?}, \
                predicate: {:?} \
                ",
                projected_arrow_fields.len(),
                file_schema.len(),
                pre_slice_arg,
                normalized_pre_slice,
                &row_index,
                predicate.as_ref().map(|_| "<predicate>"),
            )
        }

        let (output_recv, handle) = ParquetReadImpl {
            projected_arrow_fields,
            is_full_projection,
            predicate,
            // TODO: Refactor to avoid full clone
            options: Arc::unwrap_or_clone(self.config.clone()),
            byte_source,
            normalized_pre_slice: normalized_pre_slice.map(|x| match x {
                Slice::Positive { offset, len } => (offset, len),
                Slice::Negative { .. } => unreachable!(),
            }),
            metadata: file_metadata,
            config: io_sources::parquet::Config {
                num_pipelines,
                row_group_prefetch_size,
                target_values_per_thread,
            },
            verbose,
            memory_prefetch_func,
            row_index,
        }
        .run();

        Ok((
            output_recv,
            async_executor::spawn(TaskPriority::Low, async move { handle.await.unwrap() }),
        ))
    }
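
    // Worked example of the `pre_slice` normalization above (assumed semantics
    // of `Slice::restrict_to_bounds`, inferred from the zero-length early
    // return and the `Slice::Negative` arm being unreachable afterwards).
    // For a 120-row file:
    //
    //     Slice::Positive { offset: 100, len: 50 }.restrict_to_bounds(120)
    //         // -> Positive { offset: 100, len: 20 }: clamped to end of file
    //     Slice::Positive { offset: 200, len: 10 }.restrict_to_bounds(120)
    //         // -> Positive { offset: 120, len: 0 }: triggers the early return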

    async fn file_schema(&mut self) -> PolarsResult<SchemaRef> {
        Ok(self._file_schema().clone())
    }

    async fn file_arrow_schema(&mut self) -> PolarsResult<Option<ArrowSchemaRef>> {
        Ok(Some(self._file_arrow_schema().clone()))
    }

    async fn n_rows_in_file(&mut self) -> PolarsResult<IdxSize> {
        self._n_rows_in_file()
    }

    async fn fast_n_rows_in_file(&mut self) -> PolarsResult<Option<IdxSize>> {
        self._n_rows_in_file().map(Some)
    }

    async fn row_position_after_slice(
        &mut self,
        pre_slice: Option<Slice>,
    ) -> PolarsResult<IdxSize> {
        self._row_position_after_slice(pre_slice)
    }
}

impl ParquetFileReader {
    fn _file_schema(&mut self) -> &SchemaRef {
        let InitializedState {
            file_schema,
            file_schema_pl,
            ..
        } = self.init_data.as_mut().unwrap();

        if file_schema_pl.is_none() {
            *file_schema_pl = Some(Arc::new(Schema::from_arrow_schema(file_schema.as_ref())))
        }

        file_schema_pl.as_ref().unwrap()
    }

    fn _file_arrow_schema(&mut self) -> &ArrowSchemaRef {
        let InitializedState { file_schema, .. } = self.init_data.as_mut().unwrap();
        file_schema
    }

    fn _n_rows_in_file(&self) -> PolarsResult<IdxSize> {
        let n = self.init_data.as_ref().unwrap().file_metadata.num_rows;
        IdxSize::try_from(n).map_err(|_| polars_err!(bigidx, ctx = "parquet file", size = n))
    }

    fn _row_position_after_slice(&self, pre_slice: Option<Slice>) -> PolarsResult<IdxSize> {
        Ok(calc_row_position_after_slice(
            self._n_rows_in_file()?,
            pre_slice,
        ))
    }
}

type AsyncTaskData = (
    FileReaderOutputRecv,
    task_handles_ext::AbortOnDropHandle<PolarsResult<()>>,
);

struct ParquetReadImpl {
    projected_arrow_fields: Arc<[ArrowFieldProjection]>,
    is_full_projection: bool,
    predicate: Option<ScanIOPredicate>,
    options: ParquetOptions,
    byte_source: Arc<DynByteSource>,
    normalized_pre_slice: Option<(usize, usize)>,
    metadata: Arc<FileMetadata>,
    // Run-time vars
    config: Config,
    verbose: bool,
    memory_prefetch_func: fn(&[u8]) -> (),
    row_index: Option<RowIndex>,
}

#[derive(Debug)]
struct Config {
    num_pipelines: usize,
    /// Number of row groups to pre-fetch concurrently; this can be across files.
    row_group_prefetch_size: usize,
    /// Minimum number of values for a parallel spawned task to process to amortize
    /// parallelism overhead.
    target_values_per_thread: usize,
}

impl ParquetReadImpl {
    fn run(mut self) -> AsyncTaskData {
        if self.verbose {
            eprintln!("[ParquetFileReader]: {:?}", &self.config);
        }

        self.init_morsel_distributor()
    }
}
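
// A minimal sketch (assumed partitioning, based only on the doc comment on
// `Config::target_values_per_thread`) of how decode work for one column chunk
// could be split across tasks; `n_values` is a hypothetical input:
//
//     let n_values: usize = 40_000_000;           // values in one column chunk
//     let target: usize = 16_777_216;             // default from `begin_read`
//     let max_tasks = (n_values / target).max(1); // == 2 parallel decode tasks
//
// Setting POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD=1 removes this bound
// and forces column-per-thread parallelism, as noted in `begin_read`.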