// Path: crates/polars-io/src/parquet/read/read_impl.rs
use std::borrow::Cow;

use arrow::bitmap::Bitmap;
use arrow::datatypes::ArrowSchemaRef;
use polars_core::chunked_array::builder::NullChunkedBuilder;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{POOL, config};
use polars_parquet::read::{self, ColumnChunkMetadata, FileMetadata, Filter, RowGroupMetadata};
use rayon::prelude::*;

use super::mmap::mmap_columns;
use super::utils::materialize_empty_df;
use super::{ParallelStrategy, mmap};
use crate::RowIndex;
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetadataRef;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::utils::slice::split_slice_at_file;

#[cfg(debug_assertions)]
// Ensure we get the proper polars types from schema inference
// This saves unneeded casts.
//
// Debug-only sanity check: panics if schema inference produced an Arrow dtype
// that should already have been normalized (e.g. Utf8 -> Utf8View,
// List -> LargeList). Recurses into nested/dictionary/extension types.
fn assert_dtypes(dtype: &ArrowDataType) {
    use ArrowDataType as D;

    match dtype {
        // These should all be cast to the BinaryView / Utf8View variants
        D::Utf8 | D::Binary | D::LargeUtf8 | D::LargeBinary => unreachable!(),

        // These should be cast to Float32
        D::Float16 => unreachable!(),

        // This should have been converted to a LargeList
        D::List(_) => unreachable!(),

        // This should have been converted to a LargeList(Struct(_))
        D::Map(_, _) => unreachable!(),

        // Recursive checks
        D::Dictionary(_, dtype, _) => assert_dtypes(dtype),
        D::Extension(ext) => assert_dtypes(&ext.inner),
        D::LargeList(inner) => assert_dtypes(&inner.dtype),
        D::FixedSizeList(inner, _) => assert_dtypes(&inner.dtype),
        D::Struct(fields) => fields.iter().for_each(|f| assert_dtypes(f.dtype())),

        _ => {},
    }
}

/// Whether a Parquet `SortingColumn` of this dtype may be copied onto the
/// series' sorted flag. Restricted to plain integer types.
fn should_copy_sortedness(dtype: &DataType) -> bool {
    // @NOTE: For now, we are a bit conservative with this.
    use DataType as D;

    matches!(
        dtype,
        D::Int8 | D::Int16 | D::Int32 | D::Int64 | D::UInt8 | D::UInt16 | D::UInt32 | D::UInt64
    )
}

/// Set the sorted flag on `series` if the row group's sorting metadata says
/// column `col_idx` is sorted.
///
/// Only the *first* entry of `sorting_map` is considered (a secondary sorting
/// column is not globally sorted on its own), and only for dtypes accepted by
/// [`should_copy_sortedness`]. Otherwise this is a no-op.
pub fn try_set_sorted_flag(series: &mut Series, col_idx: usize, sorting_map: &[(usize, IsSorted)]) {
    let Some((sorted_col, is_sorted)) = sorting_map.first() else {
        return;
    };
    if *sorted_col != col_idx || !should_copy_sortedness(series.dtype()) {
        return;
    }
    if config::verbose() {
        eprintln!(
            "Parquet conserved SortingColumn for column chunk of '{}' to {is_sorted:?}",
            series.name()
        );
    }

    series.set_sorted_flag(*is_sorted);
}

/// Extract `(column index, sort order)` pairs from a row group's
/// `SortingColumn` metadata. Empty when the row group declares no sorting.
pub fn create_sorting_map(md: &RowGroupMetadata) -> Vec<(usize, IsSorted)> {
    let capacity = md.sorting_columns().map_or(0, |s| s.len());
    let mut sorting_map = Vec::with_capacity(capacity);

    if let Some(sorting_columns) = md.sorting_columns() {
        for sorting in sorting_columns {
            sorting_map.push((
                sorting.column_idx as usize,
                if sorting.descending {
                    IsSorted::Descending
                } else {
                    IsSorted::Ascending
                },
            ))
        }
    }

    sorting_map
}

/// Deserialize the column chunk(s) for schema column `column_i` into a
/// `Series`, optionally applying `filter`. Also returns the bitmap of rows
/// for which the (predicate) filter evaluated to true.
fn column_idx_to_series(
    column_i: usize,
    // The metadata belonging to this column
    field_md: &[&ColumnChunkMetadata],
    filter: Option<Filter>,
    file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
) -> PolarsResult<(Series, Bitmap)> {
    let field = file_schema.get_at_index(column_i).unwrap().1;

    #[cfg(debug_assertions)]
    {
        assert_dtypes(field.dtype())
    }
    let columns = mmap_columns(store, field_md);
    let (array, pred_true_mask) = mmap::to_deserializer(columns, field.clone(), filter)?;
    let series = Series::try_from((field, array))?;

    Ok((series, pred_true_mask))
}

/// Materialize the row groups `[row_group_start, row_group_end)` into
/// `DataFrame`s, dispatching on the parallel strategy:
/// `Columns`/`None` parallelize (at most) over columns, everything else
/// parallelizes over row groups.
///
/// `pre_slice` is an `(offset, len)` slice over the *file's* rows;
/// `previous_row_count` is updated with the number of rows read and also
/// seeds the optional row index.
#[allow(clippy::too_many_arguments)]
fn rg_to_dfs(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    pre_slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    if config::verbose() {
        eprintln!("parquet scan with parallel = {parallel:?}");
    }

    // If we are only interested in the row_index, we take a little special path here.
    // A null placeholder column of the slice length is built only to give the
    // frame a height, then dropped by the final `select`.
    if projection.is_empty() {
        if let Some(row_index) = row_index {
            let placeholder =
                NullChunkedBuilder::new(PlSmallStr::from_static("__PL_TMP"), pre_slice.1).finish();
            return Ok(vec![
                DataFrame::new(vec![placeholder.into_series().into_column()])?
                    .with_row_index(
                        row_index.name.clone(),
                        Some(row_index.offset + IdxSize::try_from(pre_slice.0).unwrap()),
                    )?
                    .select(std::iter::once(row_index.name))?,
            ]);
        }
    }

    use ParallelStrategy as S;

    match parallel {
        S::Columns | S::None => rg_to_dfs_optionally_par_over_columns(
            store,
            previous_row_count,
            row_group_start,
            row_group_end,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            parallel,
            projection,
            hive_partition_columns,
        ),
        _ => rg_to_dfs_par_over_rg(
            store,
            row_group_start,
            row_group_end,
            previous_row_count,
            pre_slice,
            file_metadata,
            schema,
            row_index,
            projection,
            hive_partition_columns,
        ),
    }
}

#[allow(clippy::too_many_arguments)]
// might parallelize over columns
//
// Row groups are processed sequentially; within a row group the projected
// columns are read in parallel iff `parallel == ParallelStrategy::Columns`.
// Stops early once the requested slice has been fully read.
fn rg_to_dfs_optionally_par_over_columns(
    store: &mmap::ColumnStore,
    previous_row_count: &mut IdxSize,
    row_group_start: usize,
    row_group_end: usize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    parallel: ParallelStrategy,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

    // Rows belonging to row groups before `row_group_start` count towards the
    // slice offset but are never materialized.
    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    for rg_idx in row_group_start..row_group_end {
        let md = &file_metadata.row_groups[rg_idx];

        // Intersection of the global slice with this row group, as a
        // row-group-local (offset, len).
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
        let current_row_count = md.num_rows() as IdxSize;

        let sorting_map = create_sorting_map(md);

        // Read one projected column of this row group into a Column.
        let f = |column_i: &usize| {
            let (name, field) = schema.get_at_index(*column_i).unwrap();

            // Column missing from this row group (e.g. schema evolution):
            // fill with nulls.
            let Some(iter) = md.columns_under_root_iter(name) else {
                return Ok(Column::full_null(
                    name.clone(),
                    rg_slice.1,
                    &DataType::from_arrow_field(field),
                ));
            };

            let part = iter.collect::<Vec<_>>();

            let (mut series, _) = column_idx_to_series(
                *column_i,
                part.as_slice(),
                Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                schema,
                store,
            )?;

            try_set_sorted_flag(&mut series, *column_i, &sorting_map);
            Ok(series.into_column())
        };

        let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                projection
                    .par_iter()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            projection.iter().map(f).collect::<PolarsResult<Vec<_>>>()?
        };

        // SAFETY(review): all columns were read with the same rg_slice, so
        // they share height rg_slice.1 — presumably the invariant
        // `new_no_checks` requires; confirm against DataFrame docs.
        let mut df = unsafe { DataFrame::new_no_checks(rg_slice.1, columns) };
        if let Some(rc) = &row_index {
            unsafe {
                df.with_row_index_mut(
                    rc.name.clone(),
                    Some(*previous_row_count + rc.offset + rg_slice.0 as IdxSize),
                )
            };
        }

        materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

        // Track *full* row-group heights (not just the sliced part); overflow
        // means the file exceeds the IdxSize range.
        *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
            polars_err!(
                ComputeError: "Parquet file produces more than pow(2, 32) rows; \
                consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
                or set 'streaming'"
            ),
        )?;
        dfs.push(df);

        // Early exit: the requested slice is exhausted.
        if *previous_row_count as usize >= slice_end {
            break;
        }
    }

    Ok(dfs)
}

#[allow(clippy::too_many_arguments)]
// parallelizes over row groups
//
// First pass (sequential) computes, per row group, the slice intersection and
// the absolute row offset; second pass reads the surviving row groups in
// parallel on the rayon pool.
fn rg_to_dfs_par_over_rg(
    store: &mmap::ColumnStore,
    row_group_start: usize,
    row_group_end: usize,
    rows_read: &mut IdxSize,
    slice: (usize, usize),
    file_metadata: &FileMetadata,
    schema: &ArrowSchemaRef,
    row_index: Option<RowIndex>,
    projection: &[usize],
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
    // compute the limits per row group and the row count offsets
    let mut row_groups = Vec::with_capacity(row_group_end - row_group_start);

    let mut n_rows_processed: usize = (0..row_group_start)
        .map(|i| file_metadata.row_groups[i].num_rows())
        .sum();
    let slice_end = slice.0 + slice.1;

    // rows_scanned is the number of rows that have been scanned so far when checking for overlap with the slice.
    // rows_read is the number of rows found to overlap with the slice, and thus the number of rows that will be
    // read into a dataframe.
    let mut rows_scanned: IdxSize;

    if row_group_start > 0 {
        // In the case of async reads, we need to account for the fact that row_group_start may be greater than
        // zero due to earlier processing.
        // For details, see: https://github.com/pola-rs/polars/pull/20508#discussion_r1900165649
        rows_scanned = (0..row_group_start)
            .map(|i| file_metadata.row_groups[i].num_rows() as IdxSize)
            .sum();
    } else {
        rows_scanned = 0;
    }

    for i in row_group_start..row_group_end {
        let row_count_start = rows_scanned;
        let rg_md = &file_metadata.row_groups[i];
        let n_rows_this_file = rg_md.num_rows();
        let rg_slice =
            split_slice_at_file(&mut n_rows_processed, n_rows_this_file, slice.0, slice_end);
        rows_scanned = rows_scanned
            .checked_add(n_rows_this_file as IdxSize)
            .ok_or(ROW_COUNT_OVERFLOW_ERR)?;

        *rows_read += rg_slice.1 as IdxSize;

        // Row group does not overlap the requested slice.
        if rg_slice.1 == 0 {
            continue;
        }

        row_groups.push((rg_md, rg_slice, row_count_start));
    }

    let dfs = POOL.install(|| {
        // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
        row_groups
            .into_par_iter()
            .map(|(md, slice, row_count_start)| {
                if slice.1 == 0 {
                    return Ok(None);
                }
                // test we don't read the parquet file if this env var is set
                #[cfg(debug_assertions)]
                {
                    assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
                }

                let sorting_map = create_sorting_map(md);

                let columns = projection
                    .iter()
                    .map(|column_i| {
                        let (name, field) = schema.get_at_index(*column_i).unwrap();

                        // Column missing from this row group: fill with nulls.
                        let Some(iter) = md.columns_under_root_iter(name) else {
                            return Ok(Column::full_null(
                                name.clone(),
                                md.num_rows(),
                                &DataType::from_arrow_field(field),
                            ));
                        };

                        let part = iter.collect::<Vec<_>>();

                        let (mut series, _) = column_idx_to_series(
                            *column_i,
                            part.as_slice(),
                            Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                            schema,
                            store,
                        )?;

                        try_set_sorted_flag(&mut series, *column_i, &sorting_map);
                        Ok(series.into_column())
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // SAFETY(review): every column was filtered to the same
                // (offset, len) range, so heights agree — confirm this is the
                // invariant `new_no_checks` relies on.
                let mut df = unsafe { DataFrame::new_no_checks(slice.1, columns) };

                if let Some(rc) = &row_index {
                    unsafe {
                        df.with_row_index_mut(
                            rc.name.clone(),
                            Some(row_count_start as IdxSize + rc.offset + slice.0 as IdxSize),
                        )
                    };
                }

                materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns);

                Ok(Some(df))
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;
    Ok(dfs.into_iter().flatten().collect())
}

/// Read a Parquet file from `reader` into a single `DataFrame`.
///
/// * `pre_slice` — `(offset, len)` row slice to materialize; `len == 0`
///   short-circuits to an empty frame.
/// * `projection` — schema column indices to read; `None` reads all columns.
/// * `metadata` — pre-parsed file metadata, read from `reader` when absent.
/// * `parallel` — `Auto` is resolved here to row-group or column parallelism
///   based on row-group count vs. projection width and pool size.
#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    mut reader: R,
    pre_slice: (usize, usize),
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchemaRef,
    metadata: Option<FileMetadataRef>,
    mut parallel: ParallelStrategy,
    row_index: Option<RowIndex>,
    hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<DataFrame> {
    // Fast path.
    if pre_slice.1 == 0 {
        return Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ));
    }

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader).map(Arc::new))?;
    let n_row_groups = file_metadata.row_groups.len();

    // No projection given: read every column of the reader schema.
    let materialized_projection = projection
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..reader_schema.len()).collect::<Vec<_>>()));

    if ParallelStrategy::Auto == parallel {
        if n_row_groups > materialized_projection.len() || n_row_groups > POOL.current_num_threads()
        {
            parallel = ParallelStrategy::RowGroups;
        } else {
            parallel = ParallelStrategy::Columns;
        }
    }

    // A single projected column leaves nothing to parallelize over columns.
    if let (ParallelStrategy::Columns, true) = (parallel, materialized_projection.len() == 1) {
        parallel = ParallelStrategy::None;
    }

    let reader = ReaderBytes::from(&mut reader);
    // SAFETY(review): the 'static transmute erases the borrow of `reader`;
    // soundness presumably rests on `store` (and everything derived from it)
    // not outliving this function, where `reader: R` is still alive — confirm
    // `to_memslice` does not let the bytes escape beyond the returned frames.
    let store = mmap::ColumnStore::Local(unsafe {
        std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
    });

    let dfs = rg_to_dfs(
        &store,
        // Row-count accumulator is not needed by this caller.
        &mut 0,
        0,
        n_row_groups,
        pre_slice,
        &file_metadata,
        reader_schema,
        row_index.clone(),
        parallel,
        &materialized_projection,
        hive_partition_columns,
    )?;

    if dfs.is_empty() {
        Ok(materialize_empty_df(
            projection,
            reader_schema,
            hive_partition_columns,
            row_index.as_ref(),
        ))
    } else {
        accumulate_dataframes_vertical(dfs)
    }
}

/// Heuristic cost in `[0.0, 1.0]` for applying a predicate mask as a
/// pre-filter: the mask's edge count (skip/collect switches) relative to its
/// length. Lower values favor pre-filtering.
pub fn calc_prefilter_cost(mask: &arrow::bitmap::Bitmap) -> f64 {
    let num_edges = mask.num_edges() as f64;
    let rg_len = mask.len() as f64;

    // @GB: I did quite some analysis on this.
    //
    // Pre-filtered and Post-filtered can both be faster in certain scenarios.
    //
    // - Pre-filtered is faster when there is some amount of clustering or
    // sorting involved or if the number of values selected is small.
    // - Post-filtering is faster when the predicate selects a somewhat random
    // elements throughout the row group.
    //
    // The following is a heuristic value to try and estimate which one is
    // faster. Essentially, it sees how many times it needs to switch between
    // skipping items and collecting items and compares it against the number
    // of values that it will collect.
    //
    // Closer to 0: pre-filtering is probably better.
    // Closer to 1: post-filtering is probably better.
    (num_edges / rg_len).clamp(0.0, 1.0)
}

/// Strategy for applying a predicate mask: decide automatically, always
/// pre-filter, or always post-filter.
#[derive(Clone, Copy)]
pub enum PrefilterMaskSetting {
    Auto,
    Pre,
    Post,
}

impl PrefilterMaskSetting {
    /// Read the setting from `POLARS_PQ_PREFILTERED_MASK`
    /// (`auto` / `pre` / `post`); defaults to `Auto` when unset.
    ///
    /// # Panics
    /// Panics on any other value of the environment variable.
    pub fn init_from_env() -> Self {
        std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(Self::Auto, |v| match &v[..] {
            "auto" => Self::Auto,
            "pre" => Self::Pre,
            "post" => Self::Post,
            _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
        })
    }

    /// Decide whether to pre-filter given the heuristic cost from
    /// [`calc_prefilter_cost`] and the column's Arrow dtype.
    pub fn should_prefilter(&self, prefilter_cost: f64, dtype: &ArrowDataType) -> bool {
        match self {
            Self::Auto => {
                // Prefiltering is only expensive for nested types so we make the cut-off quite
                // high.
                let is_nested = dtype.is_nested();

                // We empirically selected these numbers.
                !is_nested && prefilter_cost <= 0.01
            },
            Self::Pre => true,
            Self::Post => false,
        }
    }
}