// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/float.rs
// 8480 views
use arrow::array::PrimitiveArray;1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::datatypes::ArrowDataType;3use arrow::types::NativeType;45use super::super::utils;6use super::{ClosureDecoderFunction, DecoderFunction, PrimitiveDecoder, UnitDecoderFunction};7use crate::parquet::encoding::{Encoding, byte_stream_split, hybrid_rle};8use crate::parquet::error::ParquetResult;9use crate::parquet::page::{DataPage, DictPage, split_buffer};10use crate::parquet::types::{NativeType as ParquetNativeType, decode};11use crate::read::Filter;12use crate::read::deserialize::dictionary_encoded;13use crate::read::deserialize::utils::{14dict_indices_decoder, freeze_validity, unspecialized_decode,15};16use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};1718#[allow(clippy::large_enum_variant)]19#[derive(Debug)]20pub(crate) enum StateTranslation<'a> {21Plain(&'a [u8]),22Dictionary(hybrid_rle::HybridRleDecoder<'a>),23ByteStreamSplit(byte_stream_split::Decoder<'a>),24}2526impl<'a, P, T, D> utils::StateTranslation<'a, FloatDecoder<P, T, D>> for StateTranslation<'a>27where28T: NativeType,29P: ParquetNativeType + for<'b> TryFrom<&'b ParquetScalar>,30D: DecoderFunction<P, T>,31{32type PlainDecoder = &'a [u8];3334fn new(35_decoder: &FloatDecoder<P, T, D>,36page: &'a DataPage,37dict: Option<&'a <FloatDecoder<P, T, D> as utils::Decoder>::Dict>,38page_validity: Option<&Bitmap>,39) -> ParquetResult<Self> {40match (page.encoding(), dict) {41(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {42let values =43dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;44Ok(Self::Dictionary(values))45},46(Encoding::Plain, _) => {47let values = split_buffer(page)?.values;48Ok(Self::Plain(values))49},50(Encoding::ByteStreamSplit, _) => {51let values = split_buffer(page)?.values;52Ok(Self::ByteStreamSplit(byte_stream_split::Decoder::try_new(53values,54size_of::<P>(),55)?))56},57_ => Err(utils::not_implemented(page)),58}59}60fn num_rows(&self) -> usize 
{61match self {62Self::Plain(v) => v.len() / size_of::<P>(),63Self::Dictionary(i) => i.len(),64Self::ByteStreamSplit(i) => i.len(),65}66}67}6869#[derive(Debug)]70pub(crate) struct FloatDecoder<P, T, D>(PrimitiveDecoder<P, T, D>)71where72P: ParquetNativeType,73T: NativeType,74D: DecoderFunction<P, T>;7576impl<P, T, D> FloatDecoder<P, T, D>77where78P: ParquetNativeType,79T: NativeType,80D: DecoderFunction<P, T>,81{82#[inline]83fn new(decoder: D) -> Self {84Self(PrimitiveDecoder::new(decoder))85}86}8788impl<T> FloatDecoder<T, T, UnitDecoderFunction<T>>89where90T: NativeType + ParquetNativeType,91UnitDecoderFunction<T>: Default + DecoderFunction<T, T>,92{93pub(crate) fn unit() -> Self {94Self::new(UnitDecoderFunction::<T>::default())95}96}9798impl<P, T, F> FloatDecoder<P, T, ClosureDecoderFunction<P, T, F>>99where100P: ParquetNativeType,101T: NativeType,102F: Copy + Fn(P) -> T,103{104pub(crate) fn closure(f: F) -> Self {105Self::new(ClosureDecoderFunction(f, std::marker::PhantomData))106}107}108109impl<T: NativeType> utils::Decoded for (Vec<T>, BitmapBuilder) {110fn len(&self) -> usize {111self.0.len()112}113114fn extend_nulls(&mut self, n: usize) {115self.0.resize(self.0.len() + n, T::default());116self.1.extend_constant(n, false);117}118119fn remaining_capacity(&self) -> usize {120(self.0.capacity() - self.0.len()).min(self.1.capacity() - self.1.len())121}122}123124impl<P, T, D> utils::Decoder for FloatDecoder<P, T, D>125where126T: NativeType,127P: ParquetNativeType,128D: DecoderFunction<P, T>,129{130type Translation<'a> = StateTranslation<'a>;131type Dict = PrimitiveArray<T>;132type DecodedState = (Vec<T>, BitmapBuilder);133type Output = PrimitiveArray<T>;134135fn with_capacity(&self, capacity: usize) -> Self::DecodedState {136(137Vec::<T>::with_capacity(capacity),138BitmapBuilder::with_capacity(capacity),139)140}141142fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {143let values = page.buffer.as_ref();144145let mut target = 
Vec::with_capacity(page.num_values);146super::plain::decode(147values,148false,149None,150None,151&mut BitmapBuilder::new(),152&mut self.0.intermediate,153&mut target,154self.0.decoder,155)?;156Ok(PrimitiveArray::new(157T::PRIMITIVE.into(),158target.into(),159None,160))161}162163fn evaluate_predicate(164&mut self,165state: &utils::State<'_, Self>,166_predicate: Option<&SpecializedParquetColumnExpr>,167pred_true_mask: &mut BitmapBuilder,168dict_mask: Option<&Bitmap>,169) -> ParquetResult<bool> {170if state.page_validity.is_some() {171// @Performance: implement validity aware172return Ok(false);173}174175if let StateTranslation::Dictionary(values) = &state.translation {176let dict_mask = dict_mask.unwrap();177super::super::dictionary_encoded::predicate::decode(178values.clone(),179dict_mask,180pred_true_mask,181)?;182return Ok(true);183}184185Ok(false)186}187188fn extend_decoded(189&self,190decoded: &mut Self::DecodedState,191additional: &dyn arrow::array::Array,192is_optional: bool,193) -> ParquetResult<()> {194let additional = additional195.as_any()196.downcast_ref::<PrimitiveArray<T>>()197.unwrap();198decoded.0.extend(additional.values().iter().copied());199match additional.validity() {200Some(v) => decoded.1.extend_from_bitmap(v),201None if is_optional => decoded.1.extend_constant(additional.len(), true),202None => {},203}204205Ok(())206}207208fn extend_filtered_with_state(209&mut self,210mut state: utils::State<'_, Self>,211decoded: &mut Self::DecodedState,212filter: Option<Filter>,213_chunks: &mut Vec<Self::Output>,214) -> ParquetResult<()> {215match state.translation {216StateTranslation::Plain(ref mut values) => super::plain::decode(217values,218state.is_optional,219state.page_validity.as_ref(),220filter,221&mut decoded.1,222&mut self.0.intermediate,223&mut decoded.0,224self.0.decoder,225),226StateTranslation::Dictionary(ref mut indexes) => 
dictionary_encoded::decode_dict(227indexes.clone(),228state.dict.unwrap().values().as_slice(),229state.is_optional,230state.page_validity.as_ref(),231filter,232&mut decoded.1,233&mut decoded.0,234),235StateTranslation::ByteStreamSplit(mut decoder) => {236let num_rows = decoder.len();237let mut iter = decoder.iter_converted(|v| self.0.decoder.decode(decode(v)));238239unspecialized_decode(240num_rows,241|| Ok(iter.next().unwrap()),242filter,243state.page_validity,244state.is_optional,245&mut decoded.1,246&mut decoded.0,247)248},249}250}251252fn extend_constant(253&mut self,254decoded: &mut Self::DecodedState,255length: usize,256value: &ParquetScalar,257) -> ParquetResult<()> {258self.0.extend_constant(decoded, length, value)259}260261fn finalize(262&self,263dtype: ArrowDataType,264_dict: Option<Self::Dict>,265(values, validity): Self::DecodedState,266) -> ParquetResult<Self::Output> {267let validity = freeze_validity(validity);268Ok(PrimitiveArray::try_new(dtype, values.into(), validity).unwrap())269}270}271272273