Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/mod.rs
8503 views
//! APIs to read from Parquet format.12mod binary;3mod binview;4mod boolean;5mod categorical;6mod dictionary_encoded;7mod fixed_size_binary;8mod nested;9mod nested_utils;10mod null;11mod primitive;12mod simple;13mod utils;1415use std::io::Cursor;1617use arrow::array::{Array, FixedSizeListArray, ListArray, MapArray};18use arrow::bitmap::Bitmap;19use arrow::datatypes::{ArrowDataType, Field};20use arrow::offset::Offsets;21use polars_buffer::Buffer;22use simple::page_iter_to_array;2324pub use self::nested_utils::{InitNested, NestedState, init_nested};25pub use self::utils::filter::{Filter, PredicateFilter};26use self::utils::freeze_validity;27use super::*;28use crate::parquet::error::ParquetResult;29use crate::parquet::read::get_page_iterator as _get_page_iterator;30use crate::parquet::schema::types::PrimitiveType;3132/// Creates a new iterator of compressed pages.33pub fn get_page_iterator(34column_metadata: &ColumnChunkMetadata,35reader: Cursor<Buffer<u8>>,36buffer: Vec<u8>,37max_header_size: usize,38) -> PolarsResult<PageReader> {39Ok(_get_page_iterator(40column_metadata,41reader,42buffer,43max_header_size,44)?)45}4647/// Creates a new [`ListArray`] or [`FixedSizeListArray`].48pub fn create_list(49dtype: ArrowDataType,50nested: &mut NestedState,51values: Box<dyn Array>,52) -> Box<dyn Array> {53let (length, mut offsets, validity) = nested.pop().unwrap();54let validity = validity.and_then(freeze_validity);55match dtype.to_storage() {56ArrowDataType::List(_) => {57offsets.push(values.len() as i64);5859let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();6061let offsets: Offsets<i32> = offsets62.try_into()63.expect("i64 offsets do not fit in i32 offsets");6465Box::new(ListArray::<i32>::new(66dtype,67offsets.into(),68values,69validity,70))71},72ArrowDataType::LargeList(_) => {73offsets.push(values.len() as i64);7475Box::new(ListArray::<i64>::new(76dtype,77offsets.try_into().expect("List too large"),78values,79validity,80))81},82ArrowDataType::FixedSizeList(_, _) => {83Box::new(FixedSizeListArray::new(dtype, length, values, validity))84},85_ => unreachable!(),86}87}8889/// Creates a new [`MapArray`].90pub fn create_map(91dtype: ArrowDataType,92nested: &mut NestedState,93values: Box<dyn Array>,94) -> Box<dyn Array> {95let (_, mut offsets, validity) = nested.pop().unwrap();96match dtype.to_storage() {97ArrowDataType::Map(_, _) => {98offsets.push(values.len() as i64);99let offsets = offsets.iter().map(|x| *x as i32).collect::<Vec<_>>();100101let offsets: Offsets<i32> = offsets102.try_into()103.expect("i64 offsets do not fit in i32 offsets");104105Box::new(MapArray::new(106dtype,107offsets.into(),108values,109validity.and_then(freeze_validity),110))111},112_ => unreachable!(),113}114}115116fn is_primitive(dtype: &ArrowDataType) -> bool {117matches!(118dtype.to_physical_type(),119arrow::datatypes::PhysicalType::Primitive(_)120| arrow::datatypes::PhysicalType::Null121| arrow::datatypes::PhysicalType::Boolean122| arrow::datatypes::PhysicalType::Utf8123| arrow::datatypes::PhysicalType::LargeUtf8124| arrow::datatypes::PhysicalType::Binary125| arrow::datatypes::PhysicalType::BinaryView126| arrow::datatypes::PhysicalType::Utf8View127| arrow::datatypes::PhysicalType::LargeBinary128| arrow::datatypes::PhysicalType::FixedSizeBinary129| arrow::datatypes::PhysicalType::Dictionary(_)130) && !matches!(dtype, ArrowDataType::Extension(_))131}132133fn columns_to_iter_recursive(134mut columns: Vec<BasicDecompressor>,135mut types: Vec<&PrimitiveType>,136field: Field,137init: Vec<InitNested>,138filter: Option<Filter>,139) -> ParquetResult<(NestedState, Vec<Box<dyn Array>>, Bitmap)> {140if init.is_empty() && is_primitive(&field.dtype) {141let (_, array, pred_true_mask) = page_iter_to_array(142columns.pop().unwrap(),143types.pop().unwrap(),144field,145filter,146None,147)?;148149return Ok((NestedState::default(), array, pred_true_mask));150}151152nested::columns_to_iter_recursive(columns, types, field, init, filter)153}154155/// Returns the number of (parquet) columns that a [`ArrowDataType`] contains.156pub fn n_columns(dtype: &ArrowDataType) -> usize {157use arrow::datatypes::PhysicalType::*;158match dtype.to_physical_type() {159Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8160| Dictionary(_) | LargeUtf8 | BinaryView | Utf8View => 1,161List | FixedSizeList | LargeList => {162let a = dtype.to_storage();163if let ArrowDataType::List(inner) = a {164n_columns(&inner.dtype)165} else if let ArrowDataType::LargeList(inner) = a {166n_columns(&inner.dtype)167} else if let ArrowDataType::FixedSizeList(inner, _) = a {168n_columns(&inner.dtype)169} else {170unreachable!()171}172},173Map => {174let a = dtype.to_storage();175if let ArrowDataType::Map(inner, _) = a {176n_columns(&inner.dtype)177} else {178unreachable!()179}180},181Struct => {182if let ArrowDataType::Struct(fields) = dtype.to_storage() {183fields.iter().map(|inner| n_columns(&inner.dtype)).sum()184} else {185unreachable!()186}187},188_ => todo!(),189}190}191192/// An iterator adapter that maps multiple iterators of [`PagesIter`] into an iterator of [`Array`]s.193///194/// For a non-nested datatypes such as [`ArrowDataType::Int32`], this function requires a single element in `columns` and `types`.195/// For nested types, `columns` must be composed by all parquet columns with associated types `types`.196///197/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.dtype`.198pub fn column_iter_to_arrays(199columns: Vec<BasicDecompressor>,200types: Vec<&PrimitiveType>,201field: Field,202filter: Option<Filter>,203) -> PolarsResult<(Vec<Box<dyn Array>>, Bitmap)> {204let (_, array, pred_true_mask) =205columns_to_iter_recursive(columns, types, field, vec![], filter)?;206Ok((array, pred_true_mask))207}208209210