Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/binary/mod.rs
8475 views
use arrow::array::{Array, BinaryArray};1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::datatypes::ArrowDataType;3use arrow::offset::OffsetsBuffer;4use polars_buffer::Buffer;5use polars_compute::filter::filter_with_bitmap;67use super::utils::dict_indices_decoder;8use super::{Filter, PredicateFilter};9use crate::parquet::encoding::{Encoding, hybrid_rle};10use crate::parquet::error::ParquetResult;11use crate::parquet::page::{DataPage, DictPage, split_buffer};12use crate::read::deserialize::utils::{self};13use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};1415mod dictionary;16mod plain;1718type DecodedStateTuple = (Vec<u8>, Vec<i64>);1920impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> {21type PlainDecoder = BinaryIter<'a>;2223fn new(24_decoder: &BinaryDecoder,25page: &'a DataPage,26dict: Option<&'a <BinaryDecoder as utils::Decoder>::Dict>,27page_validity: Option<&Bitmap>,28) -> ParquetResult<Self> {29match (page.encoding(), dict) {30(Encoding::Plain, _) => {31let values = split_buffer(page)?.values;32let values = BinaryIter::new(values, page.num_values());3334Ok(Self::Plain(values))35},36(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {37let values =38dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;39Ok(Self::Dictionary(values))40},41_ => Err(utils::not_implemented(page)),42}43}4445fn num_rows(&self) -> usize {46match self {47StateTranslation::Plain(i) => i.max_num_values,48StateTranslation::Dictionary(i) => i.len(),49}50}51}5253pub struct BinaryDecoder;5455#[allow(clippy::large_enum_variant)]56#[derive(Debug)]57pub(crate) enum StateTranslation<'a> {58Plain(BinaryIter<'a>),59Dictionary(hybrid_rle::HybridRleDecoder<'a>),60}6162impl utils::Decoded for DecodedStateTuple {63fn len(&self) -> usize {64self.1.len().saturating_sub(1)65}6667fn extend_nulls(&mut self, n: usize) {68let last = *self.1.last().unwrap();69self.1.extend(std::iter::repeat_n(last, n));70}7172fn remaining_capacity(&self) -> usize {73self.1.capacity() - self.1.len()74}75}7677impl utils::Decoder for BinaryDecoder {78type Translation<'a> = StateTranslation<'a>;79type Dict = BinaryArray<i64>;80type DecodedState = DecodedStateTuple;81type Output = BinaryArray<i64>;8283const CHUNKED: bool = true;8485fn with_capacity(&self, _capacity: usize) -> Self::DecodedState {86// Handled in extend87(Vec::new(), Vec::new())88}8990fn evaluate_dict_predicate(91&self,92dict: &Self::Dict,93predicate: &PredicateFilter,94) -> ParquetResult<Bitmap> {95Ok(predicate.predicate.evaluate(dict as &dyn Array))96}9798fn evaluate_predicate(99&mut self,100_state: &utils::State<'_, Self>,101_predicate: Option<&SpecializedParquetColumnExpr>,102_pred_true_mask: &mut BitmapBuilder,103_dict_mask: Option<&Bitmap>,104) -> ParquetResult<bool> {105Ok(false)106}107108fn apply_dictionary(109&mut self,110_decoded: &mut Self::DecodedState,111_dict: &Self::Dict,112) -> ParquetResult<()> {113Ok(())114}115116fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {117let values = &page.buffer;118let num_values = page.num_values;119let mut target = Vec::<u8>::new();120let mut offsets = Vec::<i64>::with_capacity(page.num_values + 1);121122plain::decode_plain(values, num_values, &mut target, &mut offsets)?;123124let values = target.into();125let offsets = offsets.into();126let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };127let arr = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);128129Ok(arr)130}131132fn extend_decoded(133&self,134decoded: &mut Self::DecodedState,135additional: &dyn Array,136is_optional: bool,137) -> ParquetResult<()> {138assert!(!is_optional);139140let array = additional141.as_any()142.downcast_ref::<BinaryArray<i64>>()143.unwrap();144145let offsets = array.offsets();146let fst = *offsets.first();147let lst = *offsets.last();148let current_lst = *decoded.1.last().unwrap();149decoded150.0151.extend_from_slice(&array.values()[fst as usize..lst as usize]);152decoded153.1154.extend(offsets.iter().map(|o| o + current_lst - fst));155156Ok(())157}158159fn extend_filtered_with_state(160&mut self,161mut state: utils::State<'_, Self>,162_decoded: &mut Self::DecodedState,163filter: Option<super::Filter>,164chunks: &mut Vec<Self::Output>,165) -> ParquetResult<()> {166if state.page_validity.is_some() || matches!(filter, Some(Filter::Predicate(_))) {167// Currently we only use BinaryArray for internal (de)serialization, so this is a168// limited implementation to save effort.169unimplemented!()170}171172let mut target = Vec::new();173let mut offsets =174Vec::with_capacity(utils::StateTranslation::num_rows(&state.translation) + 1);175176match state.translation {177StateTranslation::Plain(iter) => {178plain::decode_plain(iter.values, iter.max_num_values, &mut target, &mut offsets)?179},180StateTranslation::Dictionary(ref mut indexes) => {181let dict = state.dict.unwrap();182dictionary::decode_dictionary(indexes.clone(), &mut target, &mut offsets, dict)?;183},184}185186if let Some(Filter::Range(slice)) = &filter187&& (slice.start > 0 || slice.end < offsets.len() - 1)188{189let mut new_target = Vec::new();190let mut new_offsets = Vec::new();191192let buffer_start = offsets[slice.start] as usize;193let buffer_end = offsets[slice.end.min(offsets.len() - 1)] as usize;194195new_target.extend(target.drain(buffer_start..buffer_end));196new_offsets.extend(offsets.drain(slice.start..slice.end.min(offsets.len() - 1)));197198target = new_target;199offsets = new_offsets;200}201202let values = target.into();203let offsets = offsets.into();204let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };205206let mut array = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);207208if let Some(Filter::Mask(mask)) = &filter {209array = filter_with_bitmap(&array, mask)210.as_any()211.downcast_ref::<BinaryArray<i64>>()212.unwrap()213.clone();214}215216chunks.push(array);217218Ok(())219}220221fn finalize(222&self,223dtype: ArrowDataType,224_dict: Option<Self::Dict>,225(values, mut offsets): Self::DecodedState,226) -> ParquetResult<Self::Output> {227assert!(values.is_empty());228assert!(offsets.is_empty());229offsets.push(0);230let values: Buffer<u8> = values.into();231let offsets: OffsetsBuffer<i64> = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };232Ok(BinaryArray::<i64>::new(dtype, offsets, values, None))233}234235fn extend_constant(236&mut self,237_decoded: &mut Self::DecodedState,238_length: usize,239_value: &ParquetScalar,240) -> ParquetResult<()> {241todo!()242}243}244245#[derive(Debug)]246pub struct BinaryIter<'a> {247values: &'a [u8],248249/// A maximum number of items that this [`BinaryIter`] may produce.250///251/// This equal the length of the iterator i.f.f. the data encoded by the [`BinaryIter`] is not252/// nullable.253max_num_values: usize,254}255256impl<'a> BinaryIter<'a> {257pub fn new(values: &'a [u8], max_num_values: usize) -> Self {258Self {259values,260max_num_values,261}262}263}264265impl<'a> Iterator for BinaryIter<'a> {266type Item = &'a [u8];267268#[inline]269fn next(&mut self) -> Option<Self::Item> {270if self.max_num_values == 0 {271assert!(self.values.is_empty());272return None;273}274275let (length, remaining) = self.values.split_at(4);276let length: [u8; 4] = unsafe { length.try_into().unwrap_unchecked() };277let length = u32::from_le_bytes(length) as usize;278let (result, remaining) = remaining.split_at(length);279self.max_num_values -= 1;280self.values = remaining;281Some(result)282}283284#[inline]285fn size_hint(&self) -> (usize, Option<usize>) {286(0, Some(self.max_num_values))287}288}289290291