// Source path: crates/polars-parquet/src/arrow/read/deserialize/nested.rs (main branch)
use arrow::array::StructArray;1use arrow::datatypes::{2DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,3DTYPE_ENUM_VALUES_NEW, IntegerType,4};5use polars_compute::cast::CastOptionsImpl;67use self::categorical::CategoricalDecoder;8use self::nested::deserialize::utils::freeze_validity;9use self::nested_utils::NestedContent;10use self::utils::PageDecoder;11use super::*;12use crate::parquet::error::ParquetResult;1314pub fn columns_to_iter_recursive(15mut columns: Vec<BasicDecompressor>,16mut types: Vec<&PrimitiveType>,17field: Field,18mut init: Vec<InitNested>,19filter: Option<Filter>,20) -> ParquetResult<(NestedState, Vec<Box<dyn Array>>, Bitmap)> {21if !field.dtype().is_nested() || field.is_pl_pq_empty_struct() {22let pages = columns.pop().unwrap();23init.push(InitNested::Primitive(field.is_nullable));24let type_ = types.pop().unwrap();25let (nested, arr, pdm) = page_iter_to_array(pages, type_, field, filter, Some(init))?;26Ok((nested.unwrap(), arr, pdm))27} else {28match field.dtype() {29ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => {30init.push(InitNested::List(field.is_nullable));31let (mut nested, array, ptm) = columns_to_iter_recursive(32columns,33types,34inner.as_ref().clone(),35init,36filter,37)?;38let array = array39.into_iter()40.map(|array| create_list(field.dtype().clone(), &mut nested, array))41.collect();42Ok((nested, array, ptm))43},44ArrowDataType::FixedSizeList(inner, width) => {45init.push(InitNested::FixedSizeList(field.is_nullable, *width));46let (mut nested, array, ptm) = columns_to_iter_recursive(47columns,48types,49inner.as_ref().clone(),50init,51filter,52)?;53let array = array54.into_iter()55.map(|array| create_list(field.dtype().clone(), &mut nested, array))56.collect();57Ok((nested, array, ptm))58},59ArrowDataType::Struct(fields) => {60// This definitely does not support Filter predicate yet.61assert!(!matches!(&filter, Some(Filter::Predicate(_))));6263// @NOTE:64// We go back to front here, because 
we constantly split off the end of the array65// to grab the relevant columns and types.66//67// Is this inefficient? Yes. Is this how we are going to do it for now? Yes.6869let Some(last_field) = fields.last() else {70return Err(ParquetError::not_supported("Struct has zero fields"));71};7273let field_to_nested_array =74|mut init: Vec<InitNested>,75columns: &mut Vec<BasicDecompressor>,76types: &mut Vec<&PrimitiveType>,77struct_field: &Field| {78init.push(InitNested::Struct(field.is_nullable));79let n = n_columns(&struct_field.dtype);80let columns = columns.split_off(columns.len() - n);81let types = types.split_off(types.len() - n);8283columns_to_iter_recursive(84columns,85types,86struct_field.clone(),87init,88filter.clone(),89)90};9192let (mut nested, mut last_array, _) =93field_to_nested_array(init.clone(), &mut columns, &mut types, last_field)?;94debug_assert!(matches!(nested.last().unwrap(), NestedContent::Struct));95let (length, _, struct_validity) = nested.pop().unwrap();9697let mut field_arrays = Vec::<Box<dyn Array>>::with_capacity(fields.len());98assert_eq!(last_array.len(), 1);99field_arrays.push(last_array.pop().unwrap());100101for field in fields.iter().rev().skip(1) {102let (mut _nested, mut array, _) =103field_to_nested_array(init.clone(), &mut columns, &mut types, field)?;104105#[cfg(debug_assertions)]106{107debug_assert!(matches!(_nested.last().unwrap(), NestedContent::Struct));108debug_assert_eq!(109_nested.pop().unwrap().2.and_then(freeze_validity),110struct_validity.clone().and_then(freeze_validity),111);112}113114assert_eq!(array.len(), 1);115field_arrays.push(array.pop().unwrap());116}117118field_arrays.reverse();119let struct_validity = struct_validity.and_then(freeze_validity);120121Ok((122nested,123vec![124StructArray::new(125ArrowDataType::Struct(fields.clone()),126length,127field_arrays,128struct_validity,129)130.to_boxed(),131],132Bitmap::new(),133))134},135ArrowDataType::Map(inner, _) => 
{136init.push(InitNested::List(field.is_nullable));137let (mut nested, array, ptm) = columns_to_iter_recursive(138columns,139types,140inner.as_ref().clone(),141init,142filter,143)?;144let array = array145.into_iter()146.map(|array| create_map(field.dtype().clone(), &mut nested, array))147.collect();148Ok((nested, array, ptm))149},150151ArrowDataType::Dictionary(key_type, value_type, _) => {152// @note: this should only hit in two cases:153// - polars enum's and categorical's154// - int -> string which can be turned into categoricals155assert!(matches!(value_type.as_ref(), ArrowDataType::Utf8View));156157init.push(InitNested::Primitive(field.is_nullable));158159if field.metadata.as_ref().is_none_or(|md| {160!md.contains_key(DTYPE_ENUM_VALUES_LEGACY)161&& !md.contains_key(DTYPE_ENUM_VALUES_NEW)162&& !md.contains_key(DTYPE_CATEGORICAL_NEW)163&& !md.contains_key(DTYPE_CATEGORICAL_LEGACY)164}) {165let (nested, arrays, ptm) = PageDecoder::new(166&field.name,167columns.pop().unwrap(),168ArrowDataType::Utf8View,169binview::BinViewDecoder::new_string(),170Some(init),171)?172.collect_nested(filter)?;173174let arrays = arrays175.into_iter()176.map(|arr| {177polars_compute::cast::cast(178arr.as_ref(),179field.dtype(),180CastOptionsImpl::default(),181)182.unwrap()183})184.collect();185186Ok((nested, arrays, ptm))187} else {188let (nested, arr, ptm) = match key_type {189IntegerType::UInt8 => PageDecoder::new(190&field.name,191columns.pop().unwrap(),192field.dtype().clone(),193CategoricalDecoder::<u8>::new(),194Some(init),195)?196.collect_boxed(filter)?,197IntegerType::UInt16 => PageDecoder::new(198&field.name,199columns.pop().unwrap(),200field.dtype().clone(),201CategoricalDecoder::<u16>::new(),202Some(init),203)?204.collect_boxed(filter)?,205IntegerType::UInt32 => PageDecoder::new(206&field.name,207columns.pop().unwrap(),208field.dtype().clone(),209CategoricalDecoder::<u32>::new(),210Some(init),211)?212.collect_boxed(filter)?,213_ => 
unimplemented!(),214};215216Ok((nested.unwrap(), arr, ptm))217}218},219220ArrowDataType::Extension(ext) => {221// Perform deserialization for the storage type.222let (nested, mut array, ptm) = columns_to_iter_recursive(223columns,224types,225field.with_dtype(ext.inner.clone()),226init,227filter,228)?;229230// Restore the extension type.231for arr in &mut array {232assert!(arr.dtype() == &ext.inner);233*arr.dtype_mut() = field.dtype.clone();234}235236Ok((nested, array, ptm))237},238239other => Err(ParquetError::not_supported(format!(240"Deserializing type {other:?} from parquet"241))),242}243}244}245246247