// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/mod.rs
// 6940 views
//! APIs to read from Parquet format.12mod binview;3mod boolean;4mod categorical;5mod dictionary_encoded;6mod fixed_size_binary;7mod nested;8mod nested_utils;9mod null;10mod primitive;11mod simple;12mod utils;1314use arrow::array::{Array, FixedSizeListArray, ListArray, MapArray};15use arrow::bitmap::Bitmap;16use arrow::datatypes::{ArrowDataType, Field};17use arrow::offset::Offsets;18use polars_utils::mmap::MemReader;19use simple::page_iter_to_array;2021pub use self::nested_utils::{InitNested, NestedState, init_nested};22pub use self::utils::filter::{Filter, PredicateFilter};23use self::utils::freeze_validity;24use super::*;25use crate::parquet::error::ParquetResult;26use crate::parquet::read::get_page_iterator as _get_page_iterator;27use crate::parquet::schema::types::PrimitiveType;2829/// Creates a new iterator of compressed pages.30pub fn get_page_iterator(31column_metadata: &ColumnChunkMetadata,32reader: MemReader,33buffer: Vec<u8>,34max_header_size: usize,35) -> PolarsResult<PageReader> {36Ok(_get_page_iterator(37column_metadata,38reader,39buffer,40max_header_size,41)?)42}4344/// Creates a new [`ListArray`] or [`FixedSizeListArray`].45pub fn create_list(46dtype: ArrowDataType,47nested: &mut NestedState,48values: Box<dyn Array>,49) -> Box<dyn Array> {50let (length, mut offsets, validity) = nested.pop().unwrap();51let validity = validity.and_then(freeze_validity);52match dtype.to_logical_type() {53ArrowDataType::List(_) => {54offsets.push(values.len() as i64);5556let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();5758let offsets: Offsets<i32> = offsets59.try_into()60.expect("i64 offsets do not fit in i32 offsets");6162Box::new(ListArray::<i32>::new(63dtype,64offsets.into(),65values,66validity,67))68},69ArrowDataType::LargeList(_) => {70offsets.push(values.len() as i64);7172Box::new(ListArray::<i64>::new(73dtype,74offsets.try_into().expect("List too large"),75values,76validity,77))78},79ArrowDataType::FixedSizeList(_, _) => 
{80Box::new(FixedSizeListArray::new(dtype, length, values, validity))81},82_ => unreachable!(),83}84}8586/// Creates a new [`MapArray`].87pub fn create_map(88dtype: ArrowDataType,89nested: &mut NestedState,90values: Box<dyn Array>,91) -> Box<dyn Array> {92let (_, mut offsets, validity) = nested.pop().unwrap();93match dtype.to_logical_type() {94ArrowDataType::Map(_, _) => {95offsets.push(values.len() as i64);96let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();9798let offsets: Offsets<i32> = offsets99.try_into()100.expect("i64 offsets do not fit in i32 offsets");101102Box::new(MapArray::new(103dtype,104offsets.into(),105values,106validity.and_then(freeze_validity),107))108},109_ => unreachable!(),110}111}112113fn is_primitive(dtype: &ArrowDataType) -> bool {114matches!(115dtype.to_physical_type(),116arrow::datatypes::PhysicalType::Primitive(_)117| arrow::datatypes::PhysicalType::Null118| arrow::datatypes::PhysicalType::Boolean119| arrow::datatypes::PhysicalType::Utf8120| arrow::datatypes::PhysicalType::LargeUtf8121| arrow::datatypes::PhysicalType::Binary122| arrow::datatypes::PhysicalType::BinaryView123| arrow::datatypes::PhysicalType::Utf8View124| arrow::datatypes::PhysicalType::LargeBinary125| arrow::datatypes::PhysicalType::FixedSizeBinary126| arrow::datatypes::PhysicalType::Dictionary(_)127)128}129130fn columns_to_iter_recursive(131mut columns: Vec<BasicDecompressor>,132mut types: Vec<&PrimitiveType>,133field: Field,134init: Vec<InitNested>,135filter: Option<Filter>,136) -> ParquetResult<(NestedState, Box<dyn Array>, Bitmap)> {137if init.is_empty() && is_primitive(&field.dtype) {138let (_, array, pred_true_mask) = page_iter_to_array(139columns.pop().unwrap(),140types.pop().unwrap(),141field,142filter,143None,144)?;145146return Ok((NestedState::default(), array, pred_true_mask));147}148149nested::columns_to_iter_recursive(columns, types, field, init, filter)150}151152/// Returns the number of (parquet) columns that a [`ArrowDataType`] 
contains.153pub fn n_columns(dtype: &ArrowDataType) -> usize {154use arrow::datatypes::PhysicalType::*;155match dtype.to_physical_type() {156Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8157| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => 1,158List | FixedSizeList | LargeList => {159let a = dtype.to_logical_type();160if let ArrowDataType::List(inner) = a {161n_columns(&inner.dtype)162} else if let ArrowDataType::LargeList(inner) = a {163n_columns(&inner.dtype)164} else if let ArrowDataType::FixedSizeList(inner, _) = a {165n_columns(&inner.dtype)166} else {167unreachable!()168}169},170Map => {171let a = dtype.to_logical_type();172if let ArrowDataType::Map(inner, _) = a {173n_columns(&inner.dtype)174} else {175unreachable!()176}177},178Struct => {179if let ArrowDataType::Struct(fields) = dtype.to_logical_type() {180fields.iter().map(|inner| n_columns(&inner.dtype)).sum()181} else {182unreachable!()183}184},185_ => todo!(),186}187}188189/// An iterator adapter that maps multiple iterators of [`PagesIter`] into an iterator of [`Array`]s.190///191/// For a non-nested datatypes such as [`ArrowDataType::Int32`], this function requires a single element in `columns` and `types`.192/// For nested types, `columns` must be composed by all parquet columns with associated types `types`.193///194/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.dtype`.195pub fn column_iter_to_arrays(196columns: Vec<BasicDecompressor>,197types: Vec<&PrimitiveType>,198field: Field,199filter: Option<Filter>,200) -> PolarsResult<(Box<dyn Array>, Bitmap)> {201let (_, array, pred_true_mask) =202columns_to_iter_recursive(columns, types, field, vec![], filter)?;203Ok((array, pred_true_mask))204}205206207