Path: blob/main/crates/polars-parquet/src/arrow/read/mod.rs
6940 views
//! APIs to read from Parquet format.1#![allow(clippy::type_complexity)]23mod deserialize;4pub mod expr;5pub mod schema;6pub mod statistics;78use std::io::{Read, Seek};910use arrow::types::{NativeType, i256};11pub use deserialize::{12Filter, InitNested, NestedState, PredicateFilter, column_iter_to_arrays, create_list,13create_map, get_page_iterator, init_nested, n_columns,14};15#[cfg(feature = "async")]16use futures::{AsyncRead, AsyncSeek};17use polars_error::PolarsResult;18pub use schema::{FileMetadata, infer_schema};1920#[cfg(feature = "async")]21pub use crate::parquet::read::{get_page_stream, read_metadata_async as _read_metadata_async};22// re-exports of crate::parquet's relevant APIs23pub use crate::parquet::{24FallibleStreamingIterator,25error::ParquetError,26fallible_streaming_iterator,27metadata::{ColumnChunkMetadata, ColumnDescriptor, RowGroupMetadata},28page::{CompressedDataPage, DataPageHeader, Page},29read::{30BasicDecompressor, MutStreamingIterator, PageReader, ReadColumnIterator, State, decompress,31get_column_iterator, read_metadata as _read_metadata,32},33schema::types::{34GroupLogicalType, ParquetType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType,35TimeUnit as ParquetTimeUnit,36},37types::int96_to_i64_ns,38};3940/// Returns all [`ColumnChunkMetadata`] associated to `field_name`.41/// For non-nested parquet types, this returns a single column42pub fn get_field_pages<'a, T>(43columns: &'a [ColumnChunkMetadata],44items: &'a [T],45field_name: &str,46) -> Vec<&'a T> {47columns48.iter()49.zip(items)50.filter(|(metadata, _)| metadata.descriptor().path_in_schema[0].as_str() == field_name)51.map(|(_, item)| item)52.collect()53}5455/// Reads parquets' metadata synchronously.56pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetadata> {57Ok(_read_metadata(reader)?)58}5960/// Reads parquets' metadata asynchronously.61#[cfg(feature = "async")]62pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(63reader: &mut R,64) -> PolarsResult<FileMetadata> {65Ok(_read_metadata_async(reader).await?)66}6768fn convert_year_month(value: &[u8]) -> i32 {69i32::from_le_bytes(value[..4].try_into().unwrap())70}7172fn convert_days_ms(value: &[u8]) -> arrow::types::days_ms {73arrow::types::days_ms(74i32::from_le_bytes(value[4..8].try_into().unwrap()),75i32::from_le_bytes(value[8..12].try_into().unwrap()),76)77}7879fn convert_i128(value: &[u8], n: usize) -> i128 {80// Copy the fixed-size byte value to the start of a 16 byte stack81// allocated buffer, then use an arithmetic right shift to fill in82// MSBs, which accounts for leading 1's in negative (two's complement)83// values.84let mut bytes = [0u8; 16];85bytes[..n].copy_from_slice(value);86i128::from_be_bytes(bytes) >> (8 * (16 - n))87}8889fn convert_i256(value: &[u8]) -> i256 {90if value[0] >= 128 {91let mut neg_bytes = [255u8; 32];92neg_bytes[32 - value.len()..].copy_from_slice(value);93i256::from_be_bytes(neg_bytes)94} else {95let mut bytes = [0u8; 32];96bytes[32 - value.len()..].copy_from_slice(value);97i256::from_be_bytes(bytes)98}99}100101102