Path: blob/main/crates/polars-stream/src/nodes/io_sources/parquet/statistics.rs
6939 views
use std::ops::Range;

use arrow::array::{MutablePrimitiveArray, PrimitiveArray};
use arrow::bitmap::Bitmap;
use arrow::pushable::Pushable;
use polars_core::prelude::*;
use polars_io::RowIndex;
use polars_io::predicates::ScanIOPredicate;
use polars_io::prelude::FileMetadata;
use polars_parquet::read::RowGroupMetadata;
use polars_parquet::read::statistics::{ArrowColumnStatisticsArrays, deserialize_all};
use polars_utils::format_pl_smallstr;

use crate::async_executor::{self, TaskPriority};
use crate::nodes::io_sources::parquet::projection::ArrowFieldProjection;

/// Per-row-group min / max / null-count statistics for a single column, each
/// stored as a `Column` whose length equals the number of row groups.
struct StatisticsColumns {
    min: Column,
    max: Column,
    null_count: Column,
}

impl StatisticsColumns {
    /// All-null statistics of the given height (used when statistics are
    /// missing or unsupported, so the skip-batch predicate sees "unknown").
    fn new_null(dtype: &DataType, height: usize) -> Self {
        Self {
            min: Column::full_null(PlSmallStr::EMPTY, height, dtype),
            max: Column::full_null(PlSmallStr::EMPTY, height, dtype),
            // Null counts are row counts, hence the index dtype.
            null_count: Column::full_null(PlSmallStr::EMPTY, height, &IDX_DTYPE),
        }
    }

    /// Converts deserialized parquet column statistics arrays into polars
    /// columns, using the field's arrow dtype / metadata for the min and max.
    fn from_arrow_statistics(
        statistics: ArrowColumnStatisticsArrays,
        field: &ArrowField,
    ) -> PolarsResult<Self> {
        Ok(Self {
            // SAFETY-note (review): relies on the statistics arrays matching
            // `field.dtype()` — upheld by `deserialize_all`, presumably;
            // the unchecked constructor skips dtype validation.
            min: unsafe {
                Series::_try_from_arrow_unchecked_with_md(
                    PlSmallStr::EMPTY,
                    vec![statistics.min_value],
                    field.dtype(),
                    field.metadata.as_deref(),
                )
            }?
            .into_column(),

            max: unsafe {
                Series::_try_from_arrow_unchecked_with_md(
                    PlSmallStr::EMPTY,
                    vec![statistics.max_value],
                    field.dtype(),
                    field.metadata.as_deref(),
                )
            }?
            .into_column(),

            null_count: Series::from_arrow(PlSmallStr::EMPTY, statistics.null_count.boxed())?
                .into_column(),
        })
    }

    /// Renames the columns to the `{name}_min` / `{name}_max` / `{name}_nc`
    /// scheme expected by the skip-batch predicate's statistics DataFrame.
    fn with_base_column_name(self, base_column_name: &str) -> Self {
        let b = base_column_name;

        let min = self.min.with_name(format_pl_smallstr!("{b}_min"));
        let max = self.max.with_name(format_pl_smallstr!("{b}_max"));
        let null_count = self.null_count.with_name(format_pl_smallstr!("{b}_nc"));

        Self {
            min,
            max,
            null_count,
        }
    }
}

/// Evaluates the predicate's skip-batch expression against per-row-group
/// parquet statistics and returns a mask over `row_group_slice` where a set
/// bit means "this row group can be skipped".
///
/// Returns `Ok(None)` when statistics use is disabled, there is no predicate,
/// or the predicate has no skip-batch form.
pub(super) async fn calculate_row_group_pred_pushdown_skip_mask(
    row_group_slice: Range<usize>,
    use_statistics: bool,
    predicate: Option<&ScanIOPredicate>,
    metadata: &Arc<FileMetadata>,
    projected_arrow_fields: Arc<[ArrowFieldProjection]>,
    // This is mut so that the offset is updated to the position of the first
    // row group.
    mut row_index: Option<RowIndex>,
    verbose: bool,
) -> PolarsResult<Option<Bitmap>> {
    if !use_statistics {
        return Ok(None);
    }

    let Some(predicate) = predicate else {
        return Ok(None);
    };

    let Some(sbp) = predicate.skip_batch_predicate.as_ref() else {
        return Ok(None);
    };

    let sbp = sbp.clone();

    let num_row_groups = row_group_slice.len();
    let metadata = metadata.clone();
    let live_columns = predicate.live_columns.clone();

    // Note: We are spawning here onto the computational async runtime because the caller is being run
    // on a tokio async thread.
    let skip_row_group_mask = async_executor::spawn(TaskPriority::High, async move {
        let row_groups_slice = &metadata.row_groups[row_group_slice.clone()];

        // Advance the row-index offset past all row groups before the slice,
        // saturating rather than overflowing on huge files.
        if let Some(ri) = &mut row_index {
            for md in metadata.row_groups[0..row_group_slice.start].iter() {
                ri.offset = ri
                    .offset
                    .saturating_add(IdxSize::try_from(md.num_rows()).unwrap_or(IdxSize::MAX));
            }
        }

        // One `len` column plus min/max/null_count per live column.
        let mut columns = Vec::with_capacity(1 + live_columns.len() * 3);

        let lengths: Vec<IdxSize> = row_groups_slice
            .iter()
            .map(|rg| rg.num_rows() as IdxSize)
            .collect();

        columns.push(Column::new("len".into(), lengths));

        for projection in projected_arrow_fields.iter() {
            let c = projection.output_name();

            // Only columns referenced by the predicate matter here.
            if !live_columns.contains(c) {
                continue;
            }

            let mut statistics = load_parquet_column_statistics(row_groups_slice, projection)?;

            // Note: Order is important here. We re-use the transform for the output column, meaning
            // that it may set the column name.
            statistics.min = projection.apply_transform(statistics.min)?;
            statistics.max = projection.apply_transform(statistics.max)?;

            let statistics = statistics.with_base_column_name(c);

            columns.extend([statistics.min, statistics.max, statistics.null_count]);
        }

        // The row index is synthetic (not stored in the file), so its
        // statistics are computed from row-group lengths and the offset.
        if let Some(row_index) = row_index {
            let statistics = build_row_index_statistics(&row_index, row_groups_slice)
                .with_base_column_name(&row_index.name);

            columns.extend([statistics.min, statistics.max, statistics.null_count]);
        }

        let statistics_df = DataFrame::new_with_height(num_row_groups, columns)?;

        sbp.evaluate_with_stat_df(&statistics_df)
    })
    .await?;

    if verbose {
        // Unset bits are the row groups NOT skipped, i.e. the ones read.
        eprintln!(
            "[ParquetFileReader]: Predicate pushdown: \
            reading {} / {} row groups",
            skip_row_group_mask.unset_bits(),
            num_row_groups,
        );
    }

    Ok(Some(skip_row_group_mask))
}

/// Loads min/max/null-count statistics for one projected column across all
/// given row groups, falling back to all-null statistics whenever the column
/// is missing or its statistics are unavailable / unsupported.
fn load_parquet_column_statistics(
    row_groups: &[RowGroupMetadata],
    projection: &ArrowFieldProjection,
) -> PolarsResult<StatisticsColumns> {
    let arrow_field = projection.arrow_field();

    // Fallback: "unknown" statistics of the correct dtype and height.
    let null_statistics = || {
        Ok(StatisticsColumns::new_null(
            &DataType::from_arrow_field(arrow_field),
            row_groups.len(),
        ))
    };

    // This can be None in the allow_missing_columns case.
    let Some(idxs) = row_groups[0].columns_idxs_under_root_iter(&arrow_field.name) else {
        return null_statistics();
    };

    // 0 is possible for empty structs.
    //
    // 2+ is for structs. We don't support reading nested statistics for now. It does not
    // really make any sense at the moment with how we structure statistics.
    if idxs.is_empty() || idxs.len() > 1 {
        return null_statistics();
    }

    let idx = idxs[0];

    // `deserialize_all` may report that no statistics are present.
    let Some(statistics) = deserialize_all(arrow_field, row_groups, idx)? else {
        return null_statistics();
    };

    StatisticsColumns::from_arrow_statistics(statistics, arrow_field)
}

/// Computes per-row-group min/max statistics for the synthetic row-index
/// column: for a group starting at `offset` with `n` rows, min is `offset`
/// and max is `offset + n - 1`. Null count is always zero.
fn build_row_index_statistics(
    row_index: &RowIndex,
    row_groups: &[RowGroupMetadata],
) -> StatisticsColumns {
    let mut offset = row_index.offset;

    // Row index never contains nulls, so the null-count column is all zeros.
    let null_count = PrimitiveArray::<IdxSize>::full(row_groups.len(), 0, ArrowDataType::IDX_DTYPE);

    let mut min_value = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());
    let mut max_value = MutablePrimitiveArray::<IdxSize>::with_capacity(row_groups.len());

    for rg in row_groups.iter() {
        let n_rows = IdxSize::try_from(rg.num_rows()).unwrap_or(IdxSize::MAX);

        // If the index would overflow IdxSize, the bounds are unknowable:
        // emit nulls and leave `offset` as-is (every later group overflows
        // too, so they also get nulls).
        if offset.checked_add(n_rows).is_none() {
            min_value.push_null();
            max_value.push_null();
            continue;
        }

        if n_rows == 0 {
            // An empty row group has no min/max.
            min_value.push_null();
            max_value.push_null();
        } else {
            min_value.push_value(offset);
            max_value.push_value(offset + n_rows - 1);
        }

        offset = offset.saturating_add(n_rows);
    }

    StatisticsColumns {
        min: Series::from_array(PlSmallStr::EMPTY, min_value.freeze()).into_column(),
        max: Series::from_array(PlSmallStr::EMPTY, max_value.freeze()).into_column(),
        null_count: Series::from_array(PlSmallStr::EMPTY, null_count).into_column(),
    }
}