// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs
// (8450 views)
use arrow::array::PrimitiveArray;1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::datatypes::ArrowDataType;3use arrow::types::{AlignedBytes, NativeType};4use bytemuck::Zeroable;56use super::super::utils;7use super::{8AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, IntoDecoderFunction,9PrimitiveDecoder, UnitDecoderFunction,10};11use crate::parquet::encoding::{Encoding, byte_stream_split, delta_bitpacked, hybrid_rle};12use crate::parquet::error::ParquetResult;13use crate::parquet::page::{DataPage, DictPage, split_buffer};14use crate::parquet::types::{NativeType as ParquetNativeType, decode};15use crate::read::Filter;16use crate::read::deserialize::dictionary_encoded;17use crate::read::deserialize::utils::array_chunks::ArrayChunks;18use crate::read::deserialize::utils::{19dict_indices_decoder, freeze_validity, unspecialized_decode,20};21use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};2223#[allow(clippy::large_enum_variant)]24#[derive(Debug)]25pub(crate) enum StateTranslation<'a> {26Plain(&'a [u8]),27Dictionary(hybrid_rle::HybridRleDecoder<'a>),28ByteStreamSplit(byte_stream_split::Decoder<'a>),29DeltaBinaryPacked(delta_bitpacked::Decoder<'a>),30}3132impl<'a, P, T, D> utils::StateTranslation<'a, IntDecoder<P, T, D>> for StateTranslation<'a>33where34T: NativeType,35P: ParquetNativeType,36i64: num_traits::AsPrimitive<P>,37D: DecoderFunction<P, T>,38{39type PlainDecoder = &'a [u8];4041fn new(42_decoder: &IntDecoder<P, T, D>,43page: &'a DataPage,44dict: Option<&'a <IntDecoder<P, T, D> as utils::Decoder>::Dict>,45page_validity: Option<&Bitmap>,46) -> ParquetResult<Self> {47match (page.encoding(), dict) {48(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {49let values =50dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;51Ok(Self::Dictionary(values))52},53(Encoding::Plain, _) => {54let values = split_buffer(page)?.values;55Ok(Self::Plain(values))56},57(Encoding::ByteStreamSplit, _) => {58let 
values = split_buffer(page)?.values;59Ok(Self::ByteStreamSplit(byte_stream_split::Decoder::try_new(60values,61size_of::<P>(),62)?))63},64(Encoding::DeltaBinaryPacked, _) => {65let values = split_buffer(page)?.values;66Ok(Self::DeltaBinaryPacked(67delta_bitpacked::Decoder::try_new(values)?.0,68))69},70_ => Err(utils::not_implemented(page)),71}72}73fn num_rows(&self) -> usize {74match self {75Self::Plain(v) => v.len() / size_of::<P>(),76Self::Dictionary(i) => i.len(),77Self::ByteStreamSplit(i) => i.len(),78Self::DeltaBinaryPacked(i) => i.len(),79}80}81}8283/// Decoder of integer parquet type84#[derive(Debug)]85pub(crate) struct IntDecoder<P, T, D>(PrimitiveDecoder<P, T, D>)86where87T: NativeType,88P: ParquetNativeType,89i64: num_traits::AsPrimitive<P>,90D: DecoderFunction<P, T>;9192impl<P, T, D> IntDecoder<P, T, D>93where94P: ParquetNativeType,95T: NativeType,96i64: num_traits::AsPrimitive<P>,97D: DecoderFunction<P, T>,98{99#[inline]100fn new(decoder: D) -> Self {101Self(PrimitiveDecoder::new(decoder))102}103}104105impl<T> IntDecoder<T, T, UnitDecoderFunction<T>>106where107T: NativeType + ParquetNativeType,108i64: num_traits::AsPrimitive<T>,109UnitDecoderFunction<T>: Default + DecoderFunction<T, T>,110{111pub(crate) fn unit() -> Self {112Self::new(UnitDecoderFunction::<T>::default())113}114}115116impl<P, T> IntDecoder<P, T, AsDecoderFunction<P, T>>117where118P: ParquetNativeType,119T: NativeType,120i64: num_traits::AsPrimitive<P>,121AsDecoderFunction<P, T>: Default + DecoderFunction<P, T>,122{123pub(crate) fn cast_as() -> Self {124Self::new(AsDecoderFunction::<P, T>::default())125}126}127128impl<P, T> IntDecoder<P, T, IntoDecoderFunction<P, T>>129where130P: ParquetNativeType,131T: NativeType,132i64: num_traits::AsPrimitive<P>,133IntoDecoderFunction<P, T>: Default + DecoderFunction<P, T>,134{135pub(crate) fn cast_into() -> Self {136Self::new(IntoDecoderFunction::<P, T>::default())137}138}139140impl<P, T, F> IntDecoder<P, T, ClosureDecoderFunction<P, T, 
F>>141where142P: ParquetNativeType,143T: NativeType,144i64: num_traits::AsPrimitive<P>,145F: Copy + Fn(P) -> T,146{147pub(crate) fn closure(f: F) -> Self {148Self::new(ClosureDecoderFunction(f, std::marker::PhantomData))149}150}151152impl<P, T, D> utils::Decoder for IntDecoder<P, T, D>153where154T: NativeType,155P: ParquetNativeType,156i64: num_traits::AsPrimitive<P>,157D: DecoderFunction<P, T>,158{159type Translation<'a> = StateTranslation<'a>;160type Dict = PrimitiveArray<T>;161type DecodedState = (Vec<T>, BitmapBuilder);162type Output = PrimitiveArray<T>;163164fn with_capacity(&self, capacity: usize) -> Self::DecodedState {165(166Vec::<T>::with_capacity(capacity),167BitmapBuilder::with_capacity(capacity),168)169}170171fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {172let values = page.buffer.as_ref();173174let mut target = Vec::with_capacity(page.num_values);175super::plain::decode(176values,177false,178None,179None,180&mut BitmapBuilder::new(),181&mut self.0.intermediate,182&mut target,183self.0.decoder,184)?;185Ok(PrimitiveArray::new(186T::PRIMITIVE.into(),187target.into(),188None,189))190}191192fn evaluate_predicate(193&mut self,194state: &utils::State<'_, Self>,195predicate: Option<&SpecializedParquetColumnExpr>,196pred_true_mask: &mut BitmapBuilder,197dict_mask: Option<&Bitmap>,198) -> ParquetResult<bool> {199// @Performance: This should be added200if state.page_validity.is_some() {201return Ok(false);202}203204if let StateTranslation::Dictionary(values) = &state.translation {205let dict_mask = dict_mask.unwrap();206super::super::dictionary_encoded::predicate::decode(207values.clone(),208dict_mask,209pred_true_mask,210)?;211return Ok(true);212}213214if !D::CAN_TRANSMUTE || D::NEED_TO_DECODE {215return Ok(false);216}217218let Some(predicate) = predicate else {219return Ok(false);220};221222use SpecializedParquetColumnExpr as S;223match (&state.translation, predicate) {224(StateTranslation::Plain(values), S::Equal(needle)) => 
{225let values = ArrayChunks::new(values).unwrap();226let needle = needle.to_aligned_bytes::<T::AlignedBytes>().unwrap();227super::plain::predicate::decode_equals(values, needle, pred_true_mask);228},229(StateTranslation::Plain(values), S::Between(low, high)) => {230let values = ArrayChunks::new(values).unwrap();231use arrow::types::PrimitiveType as PT;232let is_signed = match T::PRIMITIVE {233PT::Int8 | PT::Int16 | PT::Int32 | PT::Int64 => true,234PT::UInt8 | PT::UInt16 | PT::UInt32 | PT::UInt64 => false,235PT::Int128236| PT::Int256237| PT::UInt128238| PT::Float16239| PT::Float32240| PT::Float64241| PT::DaysMs242| PT::MonthDayNano243| PT::MonthDayMillis => return Ok(false),244};245246let Some(low) = low.to_aligned_bytes::<T::AlignedBytes>() else {247return Ok(false);248};249let Some(high) = high.to_aligned_bytes::<T::AlignedBytes>() else {250return Ok(false);251};252253let mut low1 = low;254let mut high1 = high;255let mut low2 = low;256let mut high2 = high;257258if is_signed && !low.unsigned_leq(high) {259low1 = low;260high1 = T::AlignedBytes::ones();261262low2 = T::AlignedBytes::zeros();263high2 = high;264}265266super::plain::predicate::decode_between(267values,268low1,269high1,270low2,271high2,272pred_true_mask,273);274},275(StateTranslation::Plain(values), S::EqualOneOf(needles))276if (1..=8).contains(&needles.len()) =>277{278let values = ArrayChunks::new(values).unwrap();279let mut needles_array = [<T::AlignedBytes>::zeroed(); 8];280for i in 0..8 {281needles_array[i] = needles[i.min(needles.len() - 1)]282.to_aligned_bytes::<T::AlignedBytes>()283.unwrap();284}285super::plain::predicate::decode_is_in(values, &needles_array, pred_true_mask);286},287_ => return Ok(false),288}289290Ok(true)291}292293fn finalize(294&self,295dtype: ArrowDataType,296_dict: Option<Self::Dict>,297(values, validity): Self::DecodedState,298) -> ParquetResult<Self::Output> {299let validity = freeze_validity(validity);300Ok(PrimitiveArray::try_new(dtype, values.into(), 
validity).unwrap())301}302303fn extend_decoded(304&self,305decoded: &mut Self::DecodedState,306additional: &dyn arrow::array::Array,307is_optional: bool,308) -> ParquetResult<()> {309let additional = additional310.as_any()311.downcast_ref::<PrimitiveArray<T>>()312.unwrap();313decoded.0.extend(additional.values().iter().copied());314match additional.validity() {315Some(v) => decoded.1.extend_from_bitmap(v),316None if is_optional => decoded.1.extend_constant(additional.len(), true),317None => {},318}319320Ok(())321}322323fn extend_constant(324&mut self,325decoded: &mut Self::DecodedState,326length: usize,327value: &ParquetScalar,328) -> ParquetResult<()> {329self.0.extend_constant(decoded, length, value)330}331332fn extend_filtered_with_state(333&mut self,334mut state: utils::State<'_, Self>,335decoded: &mut Self::DecodedState,336filter: Option<Filter>,337_chunks: &mut Vec<Self::Output>,338) -> ParquetResult<()> {339match state.translation {340StateTranslation::Plain(ref mut values) => super::plain::decode(341values,342state.is_optional,343state.page_validity.as_ref(),344filter,345&mut decoded.1,346&mut self.0.intermediate,347&mut decoded.0,348self.0.decoder,349),350StateTranslation::Dictionary(ref mut indexes) => dictionary_encoded::decode_dict(351indexes.clone(),352state.dict.unwrap().values().as_slice(),353state.is_optional,354state.page_validity.as_ref(),355filter,356&mut decoded.1,357&mut decoded.0,358),359StateTranslation::ByteStreamSplit(mut decoder) => {360let num_rows = decoder.len();361let mut iter = decoder.iter_converted(|v| self.0.decoder.decode(decode(v)));362363unspecialized_decode(364num_rows,365|| Ok(iter.next().unwrap()),366filter,367state.page_validity,368state.is_optional,369&mut decoded.1,370&mut decoded.0,371)372},373StateTranslation::DeltaBinaryPacked(decoder) => {374let num_rows = decoder.len();375let values = decoder.collect::<Vec<i64>>()?;376377let mut i = 0;378unspecialized_decode(379num_rows,380|| {381use num_traits::AsPrimitive;382let 
value = values[i];383i += 1;384Ok(self.0.decoder.decode(value.as_()))385},386filter,387state.page_validity,388state.is_optional,389&mut decoded.1,390&mut decoded.0,391)392},393}394}395}396397398