// Path: crates/polars-parquet/src/parquet/metadata/row_metadata.rs
use std::sync::Arc;

use hashbrown::hash_map::RawEntryMut;
use polars_parquet_format::{RowGroup, SortingColumn};
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::idx_vec::UnitVec;
use polars_utils::pl_str::PlSmallStr;
use polars_utils::unitvec;

use super::column_chunk_metadata::{ColumnChunkMetadata, column_metadata_byte_range};
use super::schema_descriptor::SchemaDescriptor;
use crate::parquet::error::{ParquetError, ParquetResult};

/// Maps a root (top-level) column name to the indices of all leaf columns under it.
type ColumnLookup = PlHashMap<PlSmallStr, UnitVec<usize>>;

/// Internal helper for incrementally building a [`ColumnLookup`].
trait InitColumnLookup {
    fn add_column(&mut self, index: usize, column: &ColumnChunkMetadata);
}

impl InitColumnLookup for ColumnLookup {
    /// Registers `column` (the `index`-th leaf column of the row group) under
    /// its root field name.
    #[inline(always)]
    fn add_column(&mut self, index: usize, column: &ColumnChunkMetadata) {
        // The first path segment is the root field of the (possibly nested) column.
        let root_name = &column.descriptor().path_in_schema[0];

        // Raw-entry API: hash the key only once, and clone `root_name` only on
        // first insertion for a given root.
        match self.raw_entry_mut().from_key(root_name) {
            RawEntryMut::Vacant(slot) => {
                slot.insert(root_name.clone(), unitvec![index]);
            },
            RawEntryMut::Occupied(mut slot) => {
                slot.get_mut().push(index);
            },
        };
    }
}

/// Metadata for a row group.
#[derive(Debug, Clone, Default)]
pub struct RowGroupMetadata {
    // Moving of `ColumnChunkMetadata` is very expensive as they are rather big. So, we arc the
    // vec instead of having an arc slice. This way we don't have to move the vec values into an
    // arc when collecting.
    columns: Arc<Vec<ColumnChunkMetadata>>,
    // Root column name -> indices into `columns` of all leaf columns under that root.
    column_lookup: PlHashMap<PlSmallStr, UnitVec<usize>>,
    num_rows: usize,
    total_byte_size: usize,
    // Byte range in the file spanning all column chunks of this row group.
    full_byte_range: core::ops::Range<u64>,
    sorting_columns: Option<Vec<SortingColumn>>,
}

impl RowGroupMetadata {
    /// Number of leaf columns (column chunks) in this row group.
    #[inline(always)]
    pub fn n_columns(&self) -> usize {
        self.columns.len()
    }

    /// Fetch all columns under this root name if it exists.
    pub fn columns_under_root_iter(
        &self,
        root_name: &str,
    ) -> Option<impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator> {
        self.column_lookup
            .get(root_name)
            .map(|x| x.iter().map(|&x| &self.columns[x]))
    }

    /// Fetch the indices of all columns under this root name if it exists.
    pub fn columns_idxs_under_root_iter<'a>(&'a self, root_name: &str) -> Option<&'a [usize]> {
        self.column_lookup.get(root_name).map(|x| x.as_slice())
    }

    /// All column chunk metadata of this row group, in schema order.
    pub fn parquet_columns(&self) -> &[ColumnChunkMetadata] {
        self.columns.as_ref().as_slice()
    }

    /// Number of rows in this row group.
    pub fn num_rows(&self) -> usize {
        self.num_rows
    }

    /// Total byte size of all uncompressed column data in this row group.
    pub fn total_byte_size(&self) -> usize {
        self.total_byte_size
    }

    /// Total size of all compressed column data in this row group.
    pub fn compressed_size(&self) -> usize {
        self.columns
            .iter()
            .map(|c| c.compressed_size() as usize)
            .sum::<usize>()
    }

    /// The byte range in the file spanning all column chunks of this row group.
    pub fn full_byte_range(&self) -> core::ops::Range<u64> {
        self.full_byte_range.clone()
    }

    /// Per-column byte ranges of the column chunks within the file.
    pub fn byte_ranges_iter(&self) -> impl '_ + ExactSizeIterator<Item = core::ops::Range<u64>> {
        self.columns.iter().map(|x| x.byte_range())
    }

    /// The sort order recorded for this row group, if any.
    pub fn sorting_columns(&self) -> Option<&[SortingColumn]> {
        self.sorting_columns.as_deref()
    }

    /// Method to convert from Thrift.
    ///
    /// Validates that the row group has exactly one column chunk per leaf column
    /// in `schema_descr`, then builds the per-root lookup table and the overall
    /// byte range while converting each chunk.
    pub(crate) fn try_from_thrift(
        schema_descr: &SchemaDescriptor,
        rg: RowGroup,
    ) -> ParquetResult<RowGroupMetadata> {
        if schema_descr.columns().len() != rg.columns.len() {
            return Err(ParquetError::oos(format!(
                "The number of columns in the row group ({}) must be equal to the number of columns in the schema ({})",
                rg.columns.len(),
                schema_descr.columns().len()
            )));
        }
        // NOTE(review): these reject negative Thrift i64 values via TryInto.
        let total_byte_size = rg.total_byte_size.try_into()?;
        let num_rows = rg.num_rows.try_into()?;

        let mut column_lookup = ColumnLookup::with_capacity(rg.columns.len());
        // Seed the full byte range from the first chunk; it is widened per column
        // in the loop below (the first chunk is min/max-ed again, harmlessly).
        let mut full_byte_range = if let Some(first_column_chunk) = rg.columns.first() {
            let Some(metadata) = &first_column_chunk.meta_data else {
                return Err(ParquetError::oos("Column chunk requires metadata"));
            };
            column_metadata_byte_range(metadata)
        } else {
            // Empty row group: empty byte range.
            0..0
        };

        let sorting_columns = rg.sorting_columns.clone();

        let columns = rg
            .columns
            .into_iter()
            .zip(schema_descr.columns())
            .enumerate()
            .map(|(i, (column_chunk, descriptor))| {
                let column =
                    ColumnChunkMetadata::try_from_thrift(descriptor.clone(), column_chunk)?;

                column_lookup.add_column(i, &column);

                // Widen the row-group range to include this chunk's bytes.
                let byte_range = column.byte_range();
                full_byte_range = full_byte_range.start.min(byte_range.start)
                    ..full_byte_range.end.max(byte_range.end);

                Ok(column)
            })
            .collect::<ParquetResult<Vec<_>>>()?;
        let columns = Arc::new(columns);

        Ok(RowGroupMetadata {
            columns,
            column_lookup,
            num_rows,
            total_byte_size,
            full_byte_range,
            sorting_columns,
        })
    }
}