// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/simple.rs
// (scraped page metadata: 6940 views)
use arrow::array::{Array, FixedSizeBinaryArray, PrimitiveArray};1use arrow::bitmap::Bitmap;2use arrow::datatypes::{3ArrowDataType, DTYPE_CATEGORICAL_LEGACY, DTYPE_CATEGORICAL_NEW, DTYPE_ENUM_VALUES_LEGACY,4DTYPE_ENUM_VALUES_NEW, Field, IntegerType, IntervalUnit, TimeUnit,5};6use arrow::types::{NativeType, days_ms, i256};7use ethnum::I256;8use polars_compute::cast::CastOptionsImpl;910use super::utils::filter::Filter;11use super::{12BasicDecompressor, InitNested, NestedState, boolean, fixed_size_binary, null, primitive,13};14use crate::parquet::error::ParquetResult;15use crate::parquet::schema::types::{16PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,17};18use crate::parquet::types::int96_to_i64_ns;19use crate::read::ParquetError;20use crate::read::deserialize::binview;21use crate::read::deserialize::categorical::CategoricalDecoder;22use crate::read::deserialize::utils::PageDecoder;2324/// An iterator adapter that maps an iterator of Pages a boxed [`Array`] of [`ArrowDataType`]25/// `dtype` with a maximum of `num_rows` elements.26pub fn page_iter_to_array(27pages: BasicDecompressor,28type_: &PrimitiveType,29field: Field,30filter: Option<Filter>,31init_nested: Option<Vec<InitNested>>,32) -> ParquetResult<(Option<NestedState>, Box<dyn Array>, Bitmap)> {33use ArrowDataType::*;3435let physical_type = &type_.physical_type;36let logical_type = &type_.logical_type;37let dtype = field.dtype;3839Ok(match (physical_type, dtype.to_logical_type()) {40(_, Null) => PageDecoder::new(&field.name, pages, dtype, null::NullDecoder, init_nested)?41.collect_boxed(filter)?,42(PhysicalType::Boolean, Boolean) => PageDecoder::new(43&field.name,44pages,45dtype,46boolean::BooleanDecoder,47init_nested,48)?49.collect_boxed(filter)?,50(PhysicalType::Int32, UInt8) => PageDecoder::new(51&field.name,52pages,53dtype,54primitive::IntDecoder::<i32, u8, _>::cast_as(),55init_nested,56)?57.collect_boxed(filter)?,58(PhysicalType::Int32, UInt16) => 
PageDecoder::new(59&field.name,60pages,61dtype,62primitive::IntDecoder::<i32, u16, _>::cast_as(),63init_nested,64)?65.collect_boxed(filter)?,66(PhysicalType::Int32, UInt32) => PageDecoder::new(67&field.name,68pages,69dtype,70primitive::IntDecoder::<i32, u32, _>::cast_as(),71init_nested,72)?73.collect_boxed(filter)?,74(PhysicalType::Int64, UInt32) => PageDecoder::new(75&field.name,76pages,77dtype,78primitive::IntDecoder::<i64, u32, _>::cast_as(),79init_nested,80)?81.collect_boxed(filter)?,82(PhysicalType::Int32, Int8) => PageDecoder::new(83&field.name,84pages,85dtype,86primitive::IntDecoder::<i32, i8, _>::cast_as(),87init_nested,88)?89.collect_boxed(filter)?,90(PhysicalType::Int32, Int16) => PageDecoder::new(91&field.name,92pages,93dtype,94primitive::IntDecoder::<i32, i16, _>::cast_as(),95init_nested,96)?97.collect_boxed(filter)?,98(PhysicalType::Int32, Int32 | Date32 | Time32(_)) => PageDecoder::new(99&field.name,100pages,101dtype,102primitive::IntDecoder::<i32, _, _>::unit(),103init_nested,104)?105.collect_boxed(filter)?,106(PhysicalType::Int64 | PhysicalType::Int96, Timestamp(time_unit, _)) => {107let time_unit = *time_unit;108return timestamp(109&field.name,110pages,111physical_type,112logical_type,113dtype,114filter,115time_unit,116init_nested,117);118},119(PhysicalType::FixedLenByteArray(_), FixedSizeBinary(_)) => {120let size = FixedSizeBinaryArray::get_size(&dtype);121122PageDecoder::new(123&field.name,124pages,125dtype,126fixed_size_binary::BinaryDecoder { size },127init_nested,128)?129.collect_boxed(filter)?130},131(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::YearMonth)) => {132// @TODO: Make a separate decoder for this133134let n = 12;135let (nested, array, ptm) = PageDecoder::new(136&field.name,137pages,138ArrowDataType::FixedSizeBinary(n),139fixed_size_binary::BinaryDecoder { size: n },140init_nested,141)?142.collect(filter)?;143144let values = array145.values()146.chunks_exact(n)147.map(|value: &[u8]| 
i32::from_le_bytes(value[..4].try_into().unwrap()))148.collect::<Vec<_>>();149let validity = array.validity().cloned();150151(152nested,153PrimitiveArray::<i32>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),154ptm,155)156},157(PhysicalType::FixedLenByteArray(12), Interval(IntervalUnit::DayTime)) => {158// @TODO: Make a separate decoder for this159160let n = 12;161let (nested, array, ptm) = PageDecoder::new(162&field.name,163pages,164ArrowDataType::FixedSizeBinary(n),165fixed_size_binary::BinaryDecoder { size: n },166init_nested,167)?168.collect(filter)?;169170let values = array171.values()172.chunks_exact(n)173.map(super::super::convert_days_ms)174.collect::<Vec<_>>();175let validity = array.validity().cloned();176177(178nested,179PrimitiveArray::<days_ms>::try_new(dtype.clone(), values.into(), validity)?180.to_boxed(),181ptm,182)183},184(PhysicalType::FixedLenByteArray(16), Int128) => {185let n = 16;186let (nested, array, ptm) = PageDecoder::new(187&field.name,188pages,189ArrowDataType::FixedSizeBinary(n),190fixed_size_binary::BinaryDecoder { size: n },191init_nested,192)?193.collect(filter)?;194195let (_, values, validity) = array.into_inner();196let values = values197.try_transmute()198.expect("this should work since the parquet decoder has alignment constraints");199200(201nested,202PrimitiveArray::<i128>::try_new(dtype.clone(), values, validity)?.to_boxed(),203ptm,204)205},206(PhysicalType::Int32, Decimal(_, _)) => PageDecoder::new(207&field.name,208pages,209dtype,210primitive::IntDecoder::<i32, i128, _>::cast_into(),211init_nested,212)?213.collect_boxed(filter)?,214(PhysicalType::Int64, Decimal(_, _)) => PageDecoder::new(215&field.name,216pages,217dtype,218primitive::IntDecoder::<i64, i128, _>::cast_into(),219init_nested,220)?221.collect_boxed(filter)?,222(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) if *n > 16 => {223return Err(ParquetError::not_supported(format!(224"not implemented: can't decode Decimal128 type from Fixed Size Byte 
Array of len {n:?}"225)));226},227(PhysicalType::FixedLenByteArray(n), Decimal(_, _)) => {228// @TODO: Make a separate decoder for this229230let n = *n;231232let (nested, array, ptm) = PageDecoder::new(233&field.name,234pages,235ArrowDataType::FixedSizeBinary(n),236fixed_size_binary::BinaryDecoder { size: n },237init_nested,238)?239.collect(filter)?;240241let values = array242.values()243.chunks_exact(n)244.map(|value: &[u8]| super::super::convert_i128(value, n))245.collect::<Vec<_>>();246let validity = array.validity().cloned();247248(249nested,250PrimitiveArray::<i128>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),251ptm,252)253},254(PhysicalType::Int32, Decimal256(_, _)) => PageDecoder::new(255&field.name,256pages,257dtype,258primitive::IntDecoder::closure(|x: i32| i256(I256::new(x as i128))),259init_nested,260)?261.collect_boxed(filter)?,262(PhysicalType::Int64, Decimal256(_, _)) => PageDecoder::new(263&field.name,264pages,265dtype,266primitive::IntDecoder::closure(|x: i64| i256(I256::new(x as i128))),267init_nested,268)?269.collect_boxed(filter)?,270(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 16 => {271// @TODO: Make a separate decoder for this272273let n = *n;274275let (nested, array, ptm) = PageDecoder::new(276&field.name,277pages,278ArrowDataType::FixedSizeBinary(n),279fixed_size_binary::BinaryDecoder { size: n },280init_nested,281)?282.collect(filter)?;283284let values = array285.values()286.chunks_exact(n)287.map(|value: &[u8]| i256(I256::new(super::super::convert_i128(value, n))))288.collect::<Vec<_>>();289let validity = array.validity().cloned();290291(292nested,293PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),294ptm,295)296},297(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n <= 32 => {298// @TODO: Make a separate decoder for this299300let n = *n;301302let (nested, array, ptm) = 
PageDecoder::new(303&field.name,304pages,305ArrowDataType::FixedSizeBinary(n),306fixed_size_binary::BinaryDecoder { size: n },307init_nested,308)?309.collect(filter)?;310311let values = array312.values()313.chunks_exact(n)314.map(super::super::convert_i256)315.collect::<Vec<_>>();316let validity = array.validity().cloned();317318(319nested,320PrimitiveArray::<i256>::try_new(dtype.clone(), values.into(), validity)?.to_boxed(),321ptm,322)323},324(PhysicalType::FixedLenByteArray(n), Decimal256(_, _)) if *n > 32 => {325return Err(ParquetError::not_supported(format!(326"Can't decode Decimal256 type from Fixed Size Byte Array of len {n:?}",327)));328},329(PhysicalType::Int32, Date64) => PageDecoder::new(330&field.name,331pages,332dtype,333primitive::IntDecoder::closure(|x: i32| i64::from(x) * 86400000),334init_nested,335)?336.collect_boxed(filter)?,337(PhysicalType::Int64, Date64) => PageDecoder::new(338&field.name,339pages,340dtype,341primitive::IntDecoder::<i64, _, _>::unit(),342init_nested,343)?344.collect_boxed(filter)?,345(PhysicalType::Int64, Int64 | Time64(_) | Duration(_)) => PageDecoder::new(346&field.name,347pages,348dtype,349primitive::IntDecoder::<i64, _, _>::unit(),350init_nested,351)?352.collect_boxed(filter)?,353354(PhysicalType::Int64, UInt64) => PageDecoder::new(355&field.name,356pages,357dtype,358primitive::IntDecoder::<i64, u64, _>::cast_as(),359init_nested,360)?361.collect_boxed(filter)?,362363// Float16364(PhysicalType::FixedLenByteArray(2), Float32) => {365// @NOTE: To reduce code bloat, we just use the FixedSizeBinary decoder.366367let (nested, mut fsb_array, ptm) = PageDecoder::new(368&field.name,369pages,370ArrowDataType::FixedSizeBinary(2),371fixed_size_binary::BinaryDecoder { size: 2 },372init_nested,373)?374.collect(filter)?;375376let validity = fsb_array.take_validity();377let values = fsb_array.values().as_slice();378assert_eq!(values.len() % 2, 0);379let values = values.chunks_exact(2);380let values = values381.map(|v| {382// SAFETY: We 
know that `v` is always of size two.383let le_bytes: [u8; 2] = unsafe { v.try_into().unwrap_unchecked() };384let v = arrow::types::f16::from_le_bytes(le_bytes);385v.to_f32()386})387.collect();388389(390nested,391PrimitiveArray::<f32>::new(dtype, values, validity).to_boxed(),392ptm,393)394},395396(PhysicalType::Float, Float32) => PageDecoder::new(397&field.name,398pages,399dtype,400primitive::FloatDecoder::<f32, _, _>::unit(),401init_nested,402)?403.collect_boxed(filter)?,404(PhysicalType::Double, Float64) => PageDecoder::new(405&field.name,406pages,407dtype,408primitive::FloatDecoder::<f64, _, _>::unit(),409init_nested,410)?411.collect_boxed(filter)?,412// Don't compile this code with `i32` as we don't use this in polars413(PhysicalType::ByteArray, LargeBinary | LargeUtf8) => {414let is_string = matches!(dtype, LargeUtf8);415PageDecoder::new(416&field.name,417pages,418dtype,419binview::BinViewDecoder { is_string },420init_nested,421)?422.collect(filter)?423},424(_, Binary | Utf8) => unreachable!(),425(PhysicalType::ByteArray, BinaryView | Utf8View) => {426let is_string = matches!(dtype, Utf8View);427PageDecoder::new(428&field.name,429pages,430dtype,431binview::BinViewDecoder { is_string },432init_nested,433)?434.collect(filter)?435},436(_, Dictionary(key_type, value_type, _)) => {437// @NOTE: This should only hit in two cases:438// - Polars enum's and categorical's439// - Int -> String which can be turned into categoricals440assert_eq!(value_type.as_ref(), &ArrowDataType::Utf8View);441442if field.metadata.is_some_and(|md| {443md.contains_key(DTYPE_ENUM_VALUES_LEGACY)444|| md.contains_key(DTYPE_ENUM_VALUES_NEW)445|| md.contains_key(DTYPE_CATEGORICAL_NEW)446|| md.contains_key(DTYPE_CATEGORICAL_LEGACY)447}) && matches!(448key_type,449IntegerType::UInt8 | IntegerType::UInt16 | IntegerType::UInt32450) {451match key_type {452IntegerType::UInt8 => 
PageDecoder::new(453&field.name,454pages,455dtype,456CategoricalDecoder::<u8>::new(),457init_nested,458)?459.collect_boxed(filter)?,460IntegerType::UInt16 => PageDecoder::new(461&field.name,462pages,463dtype,464CategoricalDecoder::<u16>::new(),465init_nested,466)?467.collect_boxed(filter)?,468IntegerType::UInt32 => PageDecoder::new(469&field.name,470pages,471dtype,472CategoricalDecoder::<u32>::new(),473init_nested,474)?475.collect_boxed(filter)?,476_ => unreachable!(),477}478} else {479let (nested, array, ptm) = PageDecoder::new(480&field.name,481pages,482ArrowDataType::Utf8View,483binview::BinViewDecoder::new_string(),484init_nested,485)?486.collect(filter)?;487488(489nested,490polars_compute::cast::cast(array.as_ref(), &dtype, CastOptionsImpl::default())491.unwrap(),492ptm,493)494}495},496(from, to) => {497return Err(ParquetError::not_supported(format!(498"reading parquet type {from:?} to {to:?} still not implemented",499)));500},501})502}503504/// Unify the timestamp unit from parquet TimeUnit into arrow's TimeUnit505/// Returns (a int64 factor, is_multiplier)506fn unify_timestamp_unit(507logical_type: &Option<PrimitiveLogicalType>,508time_unit: TimeUnit,509) -> (i64, bool) {510if let Some(PrimitiveLogicalType::Timestamp { unit, .. 
}) = logical_type {511match (*unit, time_unit) {512(ParquetTimeUnit::Milliseconds, TimeUnit::Millisecond)513| (ParquetTimeUnit::Microseconds, TimeUnit::Microsecond)514| (ParquetTimeUnit::Nanoseconds, TimeUnit::Nanosecond) => (1, true),515516(ParquetTimeUnit::Milliseconds, TimeUnit::Second)517| (ParquetTimeUnit::Microseconds, TimeUnit::Millisecond)518| (ParquetTimeUnit::Nanoseconds, TimeUnit::Microsecond) => (1000, false),519520(ParquetTimeUnit::Microseconds, TimeUnit::Second)521| (ParquetTimeUnit::Nanoseconds, TimeUnit::Millisecond) => (1_000_000, false),522523(ParquetTimeUnit::Nanoseconds, TimeUnit::Second) => (1_000_000_000, false),524525(ParquetTimeUnit::Milliseconds, TimeUnit::Microsecond)526| (ParquetTimeUnit::Microseconds, TimeUnit::Nanosecond) => (1_000, true),527528(ParquetTimeUnit::Milliseconds, TimeUnit::Nanosecond) => (1_000_000, true),529}530} else {531(1, true)532}533}534535#[inline]536pub fn int96_to_i64_us(value: [u32; 3]) -> i64 {537const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;538const SECONDS_PER_DAY: i64 = 86_400;539const MICROS_PER_SECOND: i64 = 1_000_000;540541let day = value[2] as i64;542let microseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000;543let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;544545seconds * MICROS_PER_SECOND + microseconds546}547548#[inline]549pub fn int96_to_i64_ms(value: [u32; 3]) -> i64 {550const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;551const SECONDS_PER_DAY: i64 = 86_400;552const MILLIS_PER_SECOND: i64 = 1_000;553554let day = value[2] as i64;555let milliseconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000;556let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;557558seconds * MILLIS_PER_SECOND + milliseconds559}560561#[inline]562pub fn int96_to_i64_s(value: [u32; 3]) -> i64 {563const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;564const SECONDS_PER_DAY: i64 = 86_400;565566let day = value[2] as i64;567let seconds = (((value[1] as i64) << 32) + value[0] as i64) / 1_000_000_000;568let 
day_seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;569570day_seconds + seconds571}572573#[expect(clippy::too_many_arguments)]574fn timestamp(575field_name: &str,576pages: BasicDecompressor,577physical_type: &PhysicalType,578logical_type: &Option<PrimitiveLogicalType>,579dtype: ArrowDataType,580filter: Option<Filter>,581time_unit: TimeUnit,582nested: Option<Vec<InitNested>>,583) -> ParquetResult<(Option<NestedState>, Box<dyn Array>, Bitmap)> {584if physical_type == &PhysicalType::Int96 {585return match time_unit {586TimeUnit::Nanosecond => PageDecoder::new(587field_name,588pages,589dtype,590primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)),591nested,592)?593.collect_boxed(filter),594TimeUnit::Microsecond => PageDecoder::new(595field_name,596pages,597dtype,598primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)),599nested,600)?601.collect_boxed(filter),602TimeUnit::Millisecond => PageDecoder::new(603field_name,604pages,605dtype,606primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)),607nested,608)?609.collect_boxed(filter),610TimeUnit::Second => PageDecoder::new(611field_name,612pages,613dtype,614primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)),615nested,616)?617.collect_boxed(filter),618};619};620621if physical_type != &PhysicalType::Int64 {622return Err(ParquetError::not_supported(623"can't decode a timestamp from a non-int64 parquet type",624));625}626627let (factor, is_multiplier) = unify_timestamp_unit(logical_type, time_unit);628match (factor, is_multiplier) {629(1, _) => PageDecoder::new(630field_name,631pages,632dtype,633primitive::IntDecoder::<i64, _, _>::unit(),634nested,635)?636.collect_boxed(filter),637(a, true) => PageDecoder::new(638field_name,639pages,640dtype,641primitive::IntDecoder::closure(|x: i64| x * a),642nested,643)?644.collect_boxed(filter),645(a, false) => PageDecoder::new(646field_name,647pages,648dtype,649primitive::IntDecoder::closure(|x: i64| x / 
a),650nested,651)?652.collect_boxed(filter),653}654}655656657