Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
8475 views
use arrow::array::{BooleanArray, Splitable};1use arrow::bitmap::bitmask::BitMask;2use arrow::bitmap::utils::BitmapIter;3use arrow::bitmap::{Bitmap, BitmapBuilder};4use arrow::datatypes::ArrowDataType;5use polars_compute::filter::filter_boolean_kernel;67use super::Filter;8use super::dictionary_encoded::{append_validity, constrain_page_validity};9use super::utils::{10self, Decoded, Decoder, decode_hybrid_rle_into_bitmap, filter_from_range, freeze_validity,11};12use crate::parquet::encoding::Encoding;13use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};14use crate::parquet::error::ParquetResult;15use crate::parquet::page::{DataPage, DictPage, split_buffer};16use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};1718#[allow(clippy::large_enum_variant)]19#[derive(Debug)]20pub(crate) enum StateTranslation<'a> {21Plain(BitMask<'a>),22Rle(HybridRleDecoder<'a>),23}2425impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {26type PlainDecoder = BitmapIter<'a>;2728fn new(29_decoder: &BooleanDecoder,30page: &'a DataPage,31_dict: Option<&'a <BooleanDecoder as Decoder>::Dict>,32page_validity: Option<&Bitmap>,33) -> ParquetResult<Self> {34let values = split_buffer(page)?.values;3536match page.encoding() {37Encoding::Plain => {38let max_num_values = values.len() * u8::BITS as usize;39let num_values = if page_validity.is_some() {40// @NOTE: We overestimate the amount of values here, but in the V141// specification we don't really have a way to know the number of valid items.42// Without traversing the list.43max_num_values44} else {45// @NOTE: We cannot really trust the value from this as it might relate to the46// number of top-level nested values. Therefore, we do a `min` with the maximum47// number of possible values.48usize::min(page.num_values(), max_num_values)49};5051Ok(Self::Plain(BitMask::new(values, 0, num_values)))52},53Encoding::Rle => {54// @NOTE: For a nullable list, we might very well overestimate the amount of55// values, but we never collect those items. We don't really have a way to know the56// number of valid items in the V1 specification.5758// For RLE boolean values the length in bytes is pre-pended.59// https://github.com/apache/parquet-format/blob/e517ac4dbe08d518eb5c2e58576d4c711973db94/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--360let (_len_in_bytes, values) = values.split_at(4);61Ok(Self::Rle(HybridRleDecoder::new(62values,631,64page.num_values(),65)))66},67_ => Err(utils::not_implemented(page)),68}69}70fn num_rows(&self) -> usize {71match self {72Self::Plain(m) => m.len(),73Self::Rle(m) => m.len(),74}75}76}7778fn decode_required_rle(79values: HybridRleDecoder<'_>,80limit: Option<usize>,81target: &mut BitmapBuilder,82) -> ParquetResult<()> {83decode_hybrid_rle_into_bitmap(values, limit, target)?;84Ok(())85}8687fn decode_optional_rle(88values: HybridRleDecoder<'_>,89target: &mut BitmapBuilder,90page_validity: &Bitmap,91) -> ParquetResult<()> {92debug_assert!(page_validity.set_bits() <= values.len());9394if page_validity.unset_bits() == 0 {95return decode_required_rle(values, Some(page_validity.len()), target);96}9798target.reserve(page_validity.len());99100let mut validity_mask = BitMask::from_bitmap(page_validity);101102for chunk in values.into_chunk_iter() {103let chunk = chunk?;104105match chunk {106HybridRleChunk::Rle(value, size) => {107let offset = validity_mask108.nth_set_bit_idx(size, 0)109.unwrap_or(validity_mask.len());110111let t;112(t, validity_mask) = validity_mask.split_at(offset);113114target.extend_constant(t.len(), value != 0);115},116HybridRleChunk::Bitpacked(decoder) => {117let decoder_slice = decoder.as_slice();118let offset = validity_mask119.nth_set_bit_idx(decoder.len(), 0)120.unwrap_or(validity_mask.len());121122let decoder_validity;123(decoder_validity, validity_mask) = validity_mask.split_at(offset);124125let mut offset = 0;126let mut validity_iter = decoder_validity.iter();127while validity_iter.num_remaining() > 0 {128let num_valid = validity_iter.take_leading_ones();129target.extend_from_slice(decoder_slice, offset, num_valid);130offset += num_valid;131132let num_invalid = validity_iter.take_leading_zeros();133target.extend_constant(num_invalid, false);134}135},136}137}138139if cfg!(debug_assertions) {140assert_eq!(validity_mask.set_bits(), 0);141}142target.extend_constant(validity_mask.len(), false);143144Ok(())145}146147fn decode_masked_required_rle(148values: HybridRleDecoder<'_>,149target: &mut BitmapBuilder,150mask: &Bitmap,151) -> ParquetResult<()> {152debug_assert!(mask.len() <= values.len());153154if mask.unset_bits() == 0 {155return decode_required_rle(values, Some(mask.len()), target);156}157158let mut im_target = BitmapBuilder::new();159decode_required_rle(values, Some(mask.len()), &mut im_target)?;160161target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));162163Ok(())164}165166fn decode_masked_optional_rle(167values: HybridRleDecoder<'_>,168target: &mut BitmapBuilder,169page_validity: &Bitmap,170mask: &Bitmap,171) -> ParquetResult<()> {172debug_assert_eq!(page_validity.len(), mask.len());173debug_assert!(mask.len() <= values.len());174175if mask.unset_bits() == 0 {176return decode_optional_rle(values, target, page_validity);177}178179if page_validity.unset_bits() == 0 {180return decode_masked_required_rle(values, target, mask);181}182183let mut im_target = BitmapBuilder::new();184decode_optional_rle(values, &mut im_target, page_validity)?;185186target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));187188Ok(())189}190191fn decode_required_plain(values: BitMask<'_>, target: &mut BitmapBuilder) -> ParquetResult<()> {192target.extend_from_bitmask(values);193Ok(())194}195196fn decode_optional_plain(197mut values: BitMask<'_>,198target: &mut BitmapBuilder,199mut page_validity: Bitmap,200) -> ParquetResult<()> {201debug_assert!(page_validity.set_bits() <= values.len());202203if page_validity.unset_bits() == 0 {204return decode_required_plain(values.sliced(0, page_validity.len()), target);205}206207target.reserve(page_validity.len());208209while !page_validity.is_empty() {210let num_valid = page_validity.take_leading_ones();211let iv;212(iv, values) = values.split_at(num_valid);213target.extend_from_bitmask(iv);214215let num_invalid = page_validity.take_leading_zeros();216target.extend_constant(num_invalid, false);217}218219Ok(())220}221222fn decode_masked_required_plain(223mut values: BitMask,224target: &mut BitmapBuilder,225mut mask: Bitmap,226) -> ParquetResult<()> {227debug_assert!(mask.len() <= values.len());228229let leading_zeros = mask.take_leading_zeros();230mask.take_trailing_zeros();231232values = values.sliced(leading_zeros, mask.len());233234if mask.unset_bits() == 0 {235return decode_required_plain(values, target);236}237238let mut im_target = BitmapBuilder::new();239decode_required_plain(values, &mut im_target)?;240241target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));242243Ok(())244}245246fn decode_masked_optional_plain(247mut values: BitMask<'_>,248target: &mut BitmapBuilder,249mut page_validity: Bitmap,250mut mask: Bitmap,251) -> ParquetResult<()> {252debug_assert_eq!(page_validity.len(), mask.len());253debug_assert!(page_validity.set_bits() <= values.len());254255let leading_zeros = mask.take_leading_zeros();256mask.take_trailing_zeros();257258let (skipped, truncated);259(skipped, page_validity) = page_validity.split_at(leading_zeros);260(page_validity, truncated) = page_validity.split_at(mask.len());261262let skipped_values = skipped.set_bits();263let truncated_values = truncated.set_bits();264values = values.sliced(265skipped_values,266values.len() - skipped_values - truncated_values,267);268269if mask.unset_bits() == 0 {270return decode_optional_plain(values, target, page_validity);271}272273if page_validity.unset_bits() == 0 {274return decode_masked_required_plain(values, target, mask);275}276277let mut im_target = BitmapBuilder::new();278decode_optional_plain(values, &mut im_target, page_validity)?;279280target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));281282Ok(())283}284285impl Decoded for (BitmapBuilder, BitmapBuilder) {286fn len(&self) -> usize {287self.0.len()288}289290fn extend_nulls(&mut self, n: usize) {291self.0.extend_constant(n, false);292self.1.extend_constant(n, false);293}294295fn remaining_capacity(&self) -> usize {296self.0.capacity().min(self.1.capacity())297}298}299300pub(crate) struct BooleanDecoder;301302impl Decoder for BooleanDecoder {303type Translation<'a> = StateTranslation<'a>;304type Dict = BooleanArray;305type DecodedState = (BitmapBuilder, BitmapBuilder);306type Output = BooleanArray;307308fn with_capacity(&self, capacity: usize) -> Self::DecodedState {309(310BitmapBuilder::with_capacity(capacity),311BitmapBuilder::with_capacity(capacity),312)313}314315fn deserialize_dict(&mut self, _: DictPage) -> ParquetResult<Self::Dict> {316Ok(BooleanArray::new_empty(ArrowDataType::Boolean))317}318319fn finalize(320&self,321dtype: ArrowDataType,322_dict: Option<Self::Dict>,323(values, validity): Self::DecodedState,324) -> ParquetResult<Self::Output> {325let validity = freeze_validity(validity);326Ok(BooleanArray::new(dtype, values.freeze(), validity))327}328329fn evaluate_predicate(330&mut self,331_state: &utils::State<'_, Self>,332_predicate: Option<&SpecializedParquetColumnExpr>,333_pred_true_mask: &mut BitmapBuilder,334_dict_mask: Option<&Bitmap>,335) -> ParquetResult<bool> {336// @TODO: This can be enabled for the fast paths337Ok(false)338}339340fn extend_decoded(341&self,342decoded: &mut Self::DecodedState,343additional: &dyn arrow::array::Array,344is_optional: bool,345) -> ParquetResult<()> {346let additional = additional.as_any().downcast_ref::<BooleanArray>().unwrap();347decoded.0.extend_from_bitmap(additional.values());348match additional.validity() {349Some(v) => decoded.1.extend_from_bitmap(v),350None if is_optional => decoded.1.extend_constant(additional.len(), true),351None => {},352}353354Ok(())355}356357fn extend_filtered_with_state(358&mut self,359state: utils::State<'_, Self>,360(target, validity): &mut Self::DecodedState,361filter: Option<super::Filter>,362_chunks: &mut Vec<Self::Output>,363) -> ParquetResult<()> {364match state.translation {365StateTranslation::Plain(values) => {366if state.is_optional {367append_validity(368state.page_validity.as_ref(),369filter.as_ref(),370validity,371values.len(),372);373}374375let page_validity = constrain_page_validity(376values.len(),377state.page_validity.as_ref(),378filter.as_ref(),379);380381match (filter, page_validity) {382(None, None) => decode_required_plain(values, target),383(Some(Filter::Range(rng)), None) => {384decode_required_plain(values.sliced(rng.start, rng.len()), target)385},386(None, Some(page_validity)) => {387decode_optional_plain(values, target, page_validity)388},389(Some(Filter::Range(rng)), Some(mut page_validity)) => {390let (skipped, truncated);391(skipped, page_validity) = page_validity.split_at(rng.start);392(page_validity, truncated) = page_validity.split_at(rng.len());393394let skipped_values = skipped.set_bits();395let truncated_values = truncated.set_bits();396let values = values.sliced(397skipped_values,398values.len() - skipped_values - truncated_values,399);400401decode_optional_plain(values, target, page_validity)402},403(Some(Filter::Mask(mask)), None) => {404decode_masked_required_plain(values, target, mask)405},406(Some(Filter::Mask(mask)), Some(page_validity)) => {407decode_masked_optional_plain(values, target, page_validity, mask)408},409(Some(Filter::Predicate(_)), _) => todo!(),410}?;411412Ok(())413},414StateTranslation::Rle(values) => {415if state.is_optional {416append_validity(417state.page_validity.as_ref(),418filter.as_ref(),419validity,420values.len(),421);422}423424let page_validity = constrain_page_validity(425values.len(),426state.page_validity.as_ref(),427filter.as_ref(),428);429430match (filter, page_validity) {431(None, None) => decode_required_rle(values, None, target),432(Some(Filter::Range(rng)), None) if rng.start == 0 => {433decode_required_rle(values, Some(rng.end), target)434},435(None, Some(page_validity)) => {436decode_optional_rle(values, target, &page_validity)437},438(Some(Filter::Range(rng)), Some(page_validity)) if rng.start == 0 => {439decode_optional_rle(values, target, &page_validity)440},441(Some(Filter::Mask(filter)), None) => {442decode_masked_required_rle(values, target, &filter)443},444(Some(Filter::Mask(filter)), Some(page_validity)) => {445decode_masked_optional_rle(values, target, &page_validity, &filter)446},447(Some(Filter::Range(rng)), None) => {448decode_masked_required_rle(values, target, &filter_from_range(rng))449},450(Some(Filter::Range(rng)), Some(page_validity)) => decode_masked_optional_rle(451values,452target,453&page_validity,454&filter_from_range(rng),455),456(Some(Filter::Predicate(_)), _) => todo!(),457}?;458459Ok(())460},461}462}463464fn extend_constant(465&mut self,466decoded: &mut Self::DecodedState,467length: usize,468value: &ParquetScalar,469) -> ParquetResult<()> {470if value.is_null() {471decoded.extend_nulls(length);472return Ok(());473}474475let value = value.as_bool().unwrap();476477decoded.0.extend_constant(length, value);478decoded.1.extend_constant(length, true);479480Ok(())481}482}483484485