// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
//! Shared utilities for deserializing Parquet data pages into Arrow arrays.
//!
//! The central pieces are:
//! - [`State`]: per-page decoding state (dictionary, optionality, validity, translation),
//! - [`Decoder`] / [`StateTranslation`] / [`Decoded`]: the traits each physical-type decoder
//!   implements,
//! - [`PageDecoder`]: drives decompression + decoding over all pages of a column chunk,
//!   optionally applying a row [`Filter`] or predicate push-down.

pub(crate) mod array_chunks;
pub(crate) mod filter;

use std::fmt;
use std::ops::Range;
use std::sync::OnceLock;

use arrow::array::{Array, IntoBoxedArray, Splitable};
use arrow::bitmap::{Bitmap, BitmapBuilder};
use arrow::datatypes::ArrowDataType;
use arrow::pushable::Pushable;
use polars_compute::filter::filter_boolean_kernel;
use polars_utils::pl_str::PlSmallStr;

use self::filter::Filter;
use super::{BasicDecompressor, InitNested, NestedState, PredicateFilter};
use crate::parquet::encoding::hybrid_rle::{self, HybridRleChunk, HybridRleDecoder};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage, split_buffer};
use crate::parquet::schema::Repetition;
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};

/// Decoding state derived from a single [`DataPage`].
///
/// Bundles the (optional) dictionary, whether the column is nullable at this level, the decoded
/// page validity (`None` means "all values valid"), and the decoder-specific translation of the
/// page's value buffer.
#[derive(Debug)]
pub(crate) struct State<'a, D: Decoder> {
    pub(crate) dict: Option<&'a D::Dict>,
    pub(crate) is_optional: bool,
    pub(crate) page_validity: Option<Bitmap>,
    pub(crate) translation: D::Translation<'a>,
}

/// Decoder-specific view over a page's value buffer (e.g. plain, dictionary-indexed, RLE).
pub(crate) trait StateTranslation<'a, D: Decoder>: Sized {
    type PlainDecoder;

    /// Build the translation for `page`, given the decoder, an optional dictionary and the
    /// already-decoded page validity (if any).
    fn new(
        decoder: &D,
        page: &'a DataPage,
        dict: Option<&'a D::Dict>,
        page_validity: Option<&Bitmap>,
    ) -> ParquetResult<Self>;
    /// Number of rows encoded by this page from the translation's point of view.
    fn num_rows(&self) -> usize;
}

impl<'a, D: Decoder> State<'a, D> {
    /// Build the state for a *flat* (non-nested) page, decoding the definition levels into a
    /// validity bitmap when the column is optional.
    pub fn new(decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>) -> ParquetResult<Self> {
        let is_optional =
            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

        let mut page_validity = None;

        // Make the page_validity None if there are no nulls in the page
        if is_optional && page.null_count().is_none_or(|nc| nc != 0) {
            let pv = page_validity_decoder(page)?;
            page_validity = decode_page_validity(pv, None)?;
        }

        let translation = D::Translation::new(decoder, page, dict, page_validity.as_ref())?;

        Ok(Self {
            dict,
            is_optional,
            page_validity,
            translation,
        })
    }

    /// Build the state for a *nested* page, where the caller already decoded the validity from
    /// the nesting information and passes it in directly.
    pub fn new_nested(
        decoder: &D,
        page: &'a DataPage,
        dict: Option<&'a D::Dict>,
        mut page_validity: Option<Bitmap>,
    ) -> ParquetResult<Self> {
        let translation = D::Translation::new(decoder, page, dict, None)?;

        let is_optional =
            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

        // Normalize an all-valid bitmap to `None` so downstream code can take the fast paths.
        if page_validity
            .as_ref()
            .is_some_and(|bm| bm.unset_bits() == 0)
        {
            page_validity = None;
        }

        Ok(Self {
            dict,
            translation,
            is_optional,
            page_validity,
        })
    }

    /// Decode this page into `decoded`, applying the optional row `filter`.
    ///
    /// Chunked decoders may push completed chunks into `chunks` instead of growing `decoded`.
    pub fn decode(
        self,
        decoder: &mut D,
        decoded: &mut D::DecodedState,
        filter: Option<Filter>,
        chunks: &mut Vec<D::Output>,
    ) -> ParquetResult<()> {
        decoder.extend_filtered_with_state(self, decoded, filter, chunks)
    }
}

/// Create the standard "not yet supported" error for a page whose (physical type, encoding,
/// repetition) combination has no decoder implementation.
pub fn not_implemented(page: &DataPage) -> ParquetError {
    let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
    let required = if is_optional { "optional" } else { "required" };
    ParquetError::not_supported(format!(
        "Decoding {:?} \"{:?}\"-encoded {required} parquet pages not yet supported",
        page.descriptor.primitive_type.physical_type,
        page.encoding(),
    ))
}

pub(crate) type PageValidity<'a> = HybridRleDecoder<'a>;
/// Create a hybrid-RLE decoder over the page's definition levels (bit width 1, i.e. flat
/// optional columns only).
pub(crate) fn page_validity_decoder(page: &DataPage) -> ParquetResult<PageValidity<'_>> {
    let validity = split_buffer(page)?.def;
    let decoder = hybrid_rle::HybridRleDecoder::new(validity, 1, page.num_values());
    Ok(decoder)
}

/// Fallback decode loop used by decoders without a specialized kernel.
///
/// Pulls values one at a time via `decode_one` and pushes them into `target`, honoring an
/// optional row `filter` (range or mask) and an optional `page_validity` bitmap. Invalid slots
/// push `T::default()`. `validity` is extended to mirror what was pushed when `is_optional`.
///
/// Note: `decode_one` is still called for values that are skipped/filtered out, because the
/// underlying value stream must be advanced past them.
pub(crate) fn unspecialized_decode<T: Default>(
    mut num_rows: usize,

    mut decode_one: impl FnMut() -> ParquetResult<T>,

    mut filter: Option<Filter>,
    mut page_validity: Option<Bitmap>,

    is_optional: bool,

    validity: &mut BitmapBuilder,
    target: &mut impl Pushable<T>,
) -> ParquetResult<()> {
    // First, consume the filter's leading (and for masks, trailing) skipped rows so that the
    // main loop below only has to deal with either no filter or a trimmed mask.
    match &mut filter {
        None => {},
        Some(Filter::Range(range)) => {
            match page_validity.as_mut() {
                None => {
                    // No validity: every skipped row has an encoded value to step over.
                    for _ in 0..range.start {
                        decode_one()?;
                    }
                },
                Some(pv) => {
                    // With validity: only *valid* skipped rows have encoded values.
                    let c;
                    (c, *pv) = pv.split_at(range.start);
                    for _ in 0..c.set_bits() {
                        decode_one()?;
                    }
                    *pv = std::mem::take(pv).sliced(0, range.len());
                },
            }

            // A range filter reduces to "no filter" over the remaining `range.len()` rows.
            num_rows = range.len();
            filter = None;
        },
        Some(Filter::Mask(mask)) => {
            let leading_zeros = mask.take_leading_zeros();
            mask.take_trailing_zeros();

            match page_validity.as_mut() {
                None => {
                    for _ in 0..leading_zeros {
                        decode_one()?;
                    }
                },
                Some(pv) => {
                    let c;
                    (c, *pv) = pv.split_at(leading_zeros);
                    for _ in 0..c.set_bits() {
                        decode_one()?;
                    }
                    *pv = std::mem::take(pv).sliced(0, mask.len());
                },
            }

            num_rows = mask.len();
            // An all-ones (after trimming) mask is equivalent to no filter.
            if mask.unset_bits() == 0 {
                filter = None;
            }
        },
        // Predicate filters are handled elsewhere; reaching this is a programming error.
        Some(Filter::Predicate(_)) => todo!(),
    };

    // Normalize an all-valid bitmap to `None`.
    page_validity = page_validity.filter(|pv| pv.unset_bits() > 0);

    match (filter, page_validity) {
        (None, None) => {
            target.reserve(num_rows);
            for _ in 0..num_rows {
                target.push(decode_one()?);
            }

            if is_optional {
                validity.extend_constant(num_rows, true);
            }
        },
        (None, Some(page_validity)) => {
            target.reserve(page_validity.len());
            for is_valid in page_validity.iter() {
                let v = if is_valid {
                    decode_one()?
                } else {
                    T::default()
                };
                target.push(v);
            }

            validity.extend_from_bitmap(&page_validity);
        },
        // Range filters were converted to `None` above.
        (Some(Filter::Range(_)), _) => unreachable!(),
        (Some(Filter::Mask(mut mask)), None) => {
            target.reserve(num_rows);

            // Alternate runs of kept (ones) and skipped (zeros) rows.
            while !mask.is_empty() {
                let num_ones = mask.take_leading_ones();
                for _ in 0..num_ones {
                    target.push(decode_one()?);
                }

                let num_zeros = mask.take_leading_zeros();
                for _ in 0..num_zeros {
                    decode_one()?;
                }
            }

            if is_optional {
                validity.extend_constant(num_rows, true);
            }
        },
        (Some(Filter::Mask(mask)), Some(page_validity)) => {
            assert_eq!(mask.len(), page_validity.len());

            let num_rows = mask.set_bits();
            target.reserve(num_rows);

            // Walk both bitmaps in 56-bit words: `f` = filter word, `v` = validity word.
            let mut mask_iter = mask.fast_iter_u56();
            let mut validity_iter = page_validity.fast_iter_u56();

            let mut iter = |mut f: u64, mut v: u64| {
                while f != 0 {
                    // Next kept row within this word.
                    let offset = f.trailing_zeros();

                    // Valid-but-filtered-out rows before it still consume encoded values.
                    let skip = (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;
                    for _ in 0..skip {
                        decode_one()?;
                    }

                    if (v >> offset) & 1 != 0 {
                        target.push(decode_one()?);
                    } else {
                        target.push(T::default());
                    }

                    // Safe: words come from fast_iter_u56, so offset <= 55 and the shift is < 64.
                    v >>= offset + 1;
                    f >>= offset + 1;
                }

                // Remaining valid rows in the word are all filtered out; step over them.
                for _ in 0..v.count_ones() as usize {
                    decode_one()?;
                }

                ParquetResult::Ok(())
            };

            for (f, v) in mask_iter.by_ref().zip(validity_iter.by_ref()) {
                iter(f, v)?;
            }

            let (f, fl) = mask_iter.remainder();
            let (v, vl) = validity_iter.remainder();

            assert_eq!(fl, vl);

            iter(f, v)?;

            validity.extend_from_bitmap(&filter_boolean_kernel(&page_validity, &mask));
        },
        (Some(Filter::Predicate(_)), _) => todo!(),
    }

    Ok(())
}

/// The state that will be decoded into.
///
/// This is usually an Array and a validity mask as a MutableBitmap.
pub(super) trait Decoded {
    /// The number of items in the container
    fn len(&self) -> usize;
    /// How much capacity is left.
    fn remaining_capacity(&self) -> usize;
    /// Extend the decoded state with `n` nulls.
    fn extend_nulls(&mut self, n: usize);
}

/// A decoder that knows how to map `State` -> Array
pub(super) trait Decoder: Sized {
    /// The state that this decoder derives from a [`DataPage`]. This is bound to the page.
    type Translation<'a>: StateTranslation<'a, Self>;
    /// The dictionary representation that the decoder uses
    type Dict: Array + Clone;
    /// The target state that this Decoder decodes into.
    type DecodedState: Decoded;
    /// The finalized array type produced by [`Self::finalize`].
    type Output: IntoBoxedArray;

    /// Whether this decoder produces fixed-size chunks rather than one growing buffer.
    const CHUNKED: bool = false;

    /// Evaluate `predicate` over the dictionary values, yielding a per-dictionary-entry match
    /// mask. Used to skip whole dictionary-encoded pages when nothing matches.
    fn evaluate_dict_predicate(
        &self,
        dict: &Self::Dict,
        predicate: &PredicateFilter,
    ) -> ParquetResult<Bitmap> {
        Ok(predicate.predicate.evaluate(dict))
    }

    /// Initializes a new [`Self::DecodedState`].
    fn with_capacity(&self, capacity: usize) -> Self::DecodedState;

    /// Deserializes a [`DictPage`] into [`Self::Dict`].
    fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict>;

    /// Try to evaluate `predicate` directly in a specialized kernel, appending the per-row
    /// result to `pred_true_mask`. Returns `Ok(false)` when the predicate cannot be handled
    /// here and the caller must fall back to [`Self::unspecialized_predicate_decode`].
    fn evaluate_predicate(
        &mut self,
        state: &State<'_, Self>,
        predicate: Option<&SpecializedParquetColumnExpr>,
        pred_true_mask: &mut BitmapBuilder,
        dict_mask: Option<&Bitmap>,
    ) -> ParquetResult<bool>;

    /// Append an already-materialized `additional` array to `decoded`.
    fn extend_decoded(
        &self,
        decoded: &mut Self::DecodedState,
        additional: &dyn Array,
        is_optional: bool,
    ) -> ParquetResult<()>;

    /// Fallback predicate path: fully decode the page into a temporary array, evaluate the
    /// predicate on it, and append only the matching rows (and their mask bits).
    fn unspecialized_predicate_decode(
        &mut self,
        state: State<'_, Self>,
        decoded: &mut Self::DecodedState,
        pred_true_mask: &mut BitmapBuilder,
        predicate: &PredicateFilter,
        dict: Option<Self::Dict>,
        dtype: &ArrowDataType,
    ) -> ParquetResult<()> {
        let is_optional = state.is_optional;

        let mut intermediate_array = self.with_capacity(if Self::CHUNKED {
            0
        } else {
            state.translation.num_rows()
        });
        if let Some(dict) = dict.as_ref() {
            self.apply_dictionary(&mut intermediate_array, dict)?;
        }
        let mut chunks = Vec::new();
        // Decode the whole page unfiltered into a boxed intermediate array.
        self.extend_filtered_with_state(state, &mut intermediate_array, None, &mut chunks)?;
        let intermediate_array = if !chunks.is_empty() {
            chunks.pop().unwrap()
        } else {
            self.finalize(dtype.underlying_physical_type(), dict, intermediate_array)?
        }
        .into_boxed();

        // Evaluate ignoring validity first, then combine with the null mask depending on
        // whether the predicate itself matches nulls.
        let mask = if let Some(validity) = intermediate_array.validity() {
            let ignore_validity_array = intermediate_array.with_validity(None);
            let mask = predicate.predicate.evaluate(ignore_validity_array.as_ref());

            if predicate.predicate.evaluate_null() {
                arrow::bitmap::or_not(&mask, validity)
            } else {
                &mask & validity
            }
        } else {
            predicate.predicate.evaluate(intermediate_array.as_ref())
        };

        let filtered =
            polars_compute::filter::filter_with_bitmap(intermediate_array.as_ref(), &mask);

        pred_true_mask.extend_from_bitmap(&mask);
        self.extend_decoded(decoded, filtered.as_ref(), is_optional)?;

        Ok(())
    }

    /// Decode `state` into `decoded`, honoring the optional row `filter`. Chunked decoders may
    /// emit completed chunks into `chunks`.
    fn extend_filtered_with_state(
        &mut self,
        state: State<'_, Self>,
        decoded: &mut Self::DecodedState,
        filter: Option<Filter>,
        chunks: &mut Vec<Self::Output>,
    ) -> ParquetResult<()>;

    /// Extend the decoded state with `length` times the same `value`.
    fn extend_constant(
        &mut self,
        decoded: &mut Self::DecodedState,
        length: usize,
        value: &ParquetScalar,
    ) -> ParquetResult<()>;

    /// Hook for decoders that need to register the dictionary with the decoded state.
    /// Default: no-op.
    fn apply_dictionary(
        &mut self,
        _decoded: &mut Self::DecodedState,
        _dict: &Self::Dict,
    ) -> ParquetResult<()> {
        Ok(())
    }

    /// Turn the accumulated `decoded` state into the final output array.
    fn finalize(
        &self,
        dtype: ArrowDataType,
        dict: Option<Self::Dict>,
        decoded: Self::DecodedState,
    ) -> ParquetResult<Self::Output>;
}

/// Which decode path was taken, reported in the `PQ-Metrics` output line.
enum DecodeType {
    Plain,
    Range,
    Mask,
    Predicate,
}

impl fmt::Display for DecodeType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            DecodeType::Plain => "plain",
            DecodeType::Range => "range",
            DecodeType::Mask => "mask",
            DecodeType::Predicate => "predicate",
        })
    }
}

/// Per-column counters collected when `POLARS_PARQUET_METRICS=1`; printed to stderr as a CSV
/// `PQ-Metrics:` line when the column finishes decoding.
struct DecodeMetrics {
    field_name: PlSmallStr,
    num_compressed_bytes: u64,
    num_uncompressed_bytes: u64,
    num_decompressed_pages: u64,
    num_micros_spent_decompressing: u128,
    num_micros_spent_decoding: u128,
    decode_type: DecodeType,
}

impl DecodeMetrics {
    /// Zero-initialized metrics for the given field.
    fn new(field_name: &str) -> Self {
        Self {
            field_name: PlSmallStr::from_str(field_name),
            num_compressed_bytes: 0,
            num_uncompressed_bytes: 0,
            num_decompressed_pages: 0,
            num_micros_spent_decompressing: 0,
            num_micros_spent_decoding: 0,
            decode_type: DecodeType::Plain,
        }
    }
}

/// Drives decoding of one column chunk: pulls pages from `iter`, decompresses them, and feeds
/// them through `decoder`, optionally applying a row filter or predicate push-down.
pub struct PageDecoder<D: Decoder> {
    pub iter: BasicDecompressor,
    pub dtype: ArrowDataType,
    pub dict: Option<D::Dict>,
    pub decoder: D,

    pub init_nested: Option<Vec<InitNested>>,

    /// Used to track metrics with `POLARS_PARQUET_METRICS=1`.
    metrics: Option<Box<DecodeMetrics>>,
}

/// Run `f`, additionally returning its wall-clock duration in microseconds when `do_time`.
///
/// NOTE(review): this uses `SystemTime` (not monotonic) and `elapsed().unwrap()`, which can
/// panic if the clock steps backwards while timing — acceptable for an opt-in debug metric.
#[inline(always)]
fn option_time<T>(do_time: bool, f: impl FnOnce() -> T) -> (T, Option<u128>) {
    if do_time {
        let start = std::time::SystemTime::now();
        // black_box prevents the timed work from being optimized across the measurement.
        let result = std::hint::black_box(f());
        let elapsed = start.elapsed().unwrap().as_micros();
        (result, Some(elapsed))
    } else {
        (f(), None)
    }
}

/// Caches whether `POLARS_PARQUET_METRICS=1` was set, checked once per process.
static POLARS_PARQUET_METRICS: OnceLock<bool> = OnceLock::new();

impl<D: Decoder> PageDecoder<D> {
    /// Create a decoder for one column chunk, eagerly reading and deserializing the dictionary
    /// page (if present).
    pub fn new(
        field_name: &str,
        mut iter: BasicDecompressor,
        dtype: ArrowDataType,
        mut decoder: D,

        init_nested: Option<Vec<InitNested>>,
    ) -> ParquetResult<Self> {
        let dict_page = iter.read_dict_page()?;
        let dict = dict_page.map(|d| decoder.deserialize_dict(d)).transpose()?;

        let do_metrics = POLARS_PARQUET_METRICS
            .get_or_init(|| std::env::var("POLARS_PARQUET_METRICS").as_deref() == Ok("1"));

        Ok(Self {
            iter,
            dtype,
            dict,
            decoder,

            init_nested,
            metrics: do_metrics.then(|| Box::new(DecodeMetrics::new(field_name))),
        })
    }

    /// Decode the whole column chunk, dispatching to the nested, predicate, or flat path.
    ///
    /// Returns the nested state (for nested columns), the decoded output chunks, and the
    /// predicate-true mask (empty unless a predicate filter was applied).
    pub fn collect(
        self,
        filter: Option<Filter>,
    ) -> ParquetResult<(Option<NestedState>, Vec<D::Output>, Bitmap)> {
        if self.init_nested.is_some() {
            self.collect_nested(filter)
                .map(|(nested, arr, ptm)| (Some(nested), arr, ptm))
        } else {
            match filter {
                Some(Filter::Predicate(p)) => self
                    .collect_predicate_flat(&p)
                    .map(|(arr, ptm)| (None, arr, ptm)),
                filter => self
                    .collect_flat(filter)
                    .map(|arrays| (None, arrays, Bitmap::new())),
            }
        }
    }

    /// Flat-column decode with predicate push-down: evaluates `p` page by page, skipping pages
    /// that provably contain no matches, and materializes values only where the predicate holds
    /// (and only if `p.include_values`).
    pub fn collect_predicate_flat(
        mut self,
        p: &PredicateFilter,
    ) -> ParquetResult<(Vec<D::Output>, Bitmap)> {
        let mut target = self.decoder.with_capacity(0);
        let mut pred_true_mask = BitmapBuilder::with_capacity(self.iter.total_num_values());

        let specialized_pred = p.predicate.as_specialized();
        let pred_is_eq_null = matches!(
            specialized_pred,
            Some(SpecializedParquetColumnExpr::Equal(ParquetScalar::Null)),
        );
        let pred_tracks_nulls = p.predicate.evaluate_null();

        let mut dict_mask = None;
        if let Some(dict) = self.dict.as_ref() {
            // @Performance. If we have a predicate, we can prune stuff out of the dictionary and
            // reduce memory consumption.
            self.decoder.apply_dictionary(&mut target, dict)?;
            dict_mask = Some(self.decoder.evaluate_dict_predicate(dict, p)?);
        }

        if let Some(metrics) = self.metrics.as_deref_mut() {
            metrics.decode_type = DecodeType::Predicate;
        }

        const MINIMUM_CHUNK_SIZE: usize = 256;
        let mut chunks = Vec::new();
        while let Some(page) = self.iter.next() {
            let page = page?;

            let mut can_skip_page = false;

            // Skip a dictionary encoded page if none of the dictionary values match the predicate.
            // This is essentially a slower version of statistics skipping.
            can_skip_page |= dict_mask.as_ref().is_some_and(|dm| dm.set_bits() == 0)
                && page.page().header().is_dictionary_encoded()
                && (!pred_tracks_nulls
                    || page.page().null_count() == Some(0)
                    || page.page().descriptor.primitive_type.field_info.repetition
                        != Repetition::Optional);

            // If we are looking for nulls, but this page does not contain any nulls.
            can_skip_page |= pred_is_eq_null
                && (page.page().descriptor.primitive_type.field_info.repetition
                    == Repetition::Required
                    || page.page().null_count() == Some(0));

            if can_skip_page {
                // Skipped pages still contribute all-false bits to the predicate mask.
                pred_true_mask.extend_constant(page.num_values(), false);
                continue;
            }

            if let Some(metrics) = self.metrics.as_deref_mut() {
                metrics.num_compressed_bytes += page.page().buffer.len() as u64;
                metrics.num_uncompressed_bytes += page.page().uncompressed_size() as u64;
            }

            let iter = &mut self.iter;
            let (page, time) = option_time(self.metrics.is_some(), move || page.decompress(iter));
            let page = page?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decompressing += time;
                metrics.num_decompressed_pages += 1;
            }

            let state = State::new(&self.decoder, &page, self.dict.as_ref())?;

            let (result, time) = option_time(self.metrics.is_some(), || {
                // Handle the case where column is held equal to Null. This can be the same for all
                // non-nested columns.
                if matches!(
                    p.predicate.as_specialized(),
                    Some(SpecializedParquetColumnExpr::Equal(ParquetScalar::Null)),
                ) {
                    if state.is_optional
                        && let Some(v) = &state.page_validity
                    {
                        // Rows match exactly where the validity bit is unset.
                        let start_set_bits = pred_true_mask.set_bits();
                        pred_true_mask.extend_from_bitmap(&!v);
                        if p.include_values {
                            target.extend_nulls(pred_true_mask.set_bits() - start_set_bits);
                        }
                    } else {
                        pred_true_mask.extend_constant(page.num_values(), false)
                    };

                    return Ok(());
                }

                // For now, we have a function that indicates whether the predicate can actually be
                // handled in the kernels. If it cannot be handled in the kernels, catch it here
                // and load it as if it weren't filtered.
                let mut page_ptm = BitmapBuilder::new();
                if self.decoder.evaluate_predicate(
                    &state,
                    specialized_pred,
                    &mut page_ptm,
                    dict_mask.as_ref(),
                )? {
                    if page_ptm.set_bits() == 0 {
                        pred_true_mask.extend_constant(page.num_values(), false);
                        return Ok(());
                    }

                    let page_ptm = page_ptm.freeze().sliced(0, page.num_values());
                    pred_true_mask.extend_from_bitmap(&page_ptm);
                    let num_filtered_values = page_ptm.set_bits();

                    // If we would need to move data, just create a new chunk.
                    if p.include_values && num_filtered_values > target.remaining_capacity() {
                        let previous_target = std::mem::replace(
                            &mut target,
                            self.decoder
                                .with_capacity(usize::max(num_filtered_values, MINIMUM_CHUNK_SIZE)),
                        );
                        if previous_target.len() > 0 {
                            let chunk = self.decoder.finalize(
                                self.dtype.clone(),
                                self.dict.clone(),
                                previous_target,
                            )?;
                            chunks.push(chunk);
                        }

                        if let Some(dict) = self.dict.as_ref() {
                            self.decoder.apply_dictionary(&mut target, dict)?;
                        }
                    }

                    if p.include_values {
                        // For `col == constant`, every kept row is the constant itself — no need
                        // to decode the values.
                        if let Some(SpecializedParquetColumnExpr::Equal(needle)) = specialized_pred
                        {
                            self.decoder.extend_constant(
                                &mut target,
                                num_filtered_values,
                                needle,
                            )?;
                        } else {
                            state.decode(
                                &mut self.decoder,
                                &mut target,
                                Some(Filter::Mask(page_ptm)),
                                &mut chunks,
                            )?;
                        }
                    }
                } else {
                    self.decoder.unspecialized_predicate_decode(
                        state,
                        &mut target,
                        &mut pred_true_mask,
                        p,
                        self.dict.clone(),
                        &self.dtype,
                    )?;
                }

                ParquetResult::Ok(())
            });
            result?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decoding += time;
            }

            self.iter.reuse_page_buffer(page);
        }

        if let Some(metrics) = self.metrics.as_ref() {
            eprintln!(
                "PQ-Metrics: {},{},{},{},{},{},{}",
                metrics.field_name,
                metrics.num_micros_spent_decompressing,
                metrics.num_micros_spent_decoding,
                metrics.num_compressed_bytes,
                metrics.num_uncompressed_bytes,
                metrics.num_decompressed_pages,
                metrics.decode_type,
            );
        }

        // Always produce at least one (possibly empty) chunk.
        if target.len() > 0 || chunks.is_empty() {
            chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);
        }
        Ok((chunks, pred_true_mask.freeze()))
    }

    /// Flat-column decode with an optional range/mask `filter` (no predicate). Splits the
    /// filter per page, skips pages contributing no rows, and stops once enough rows were read.
    pub fn collect_flat(mut self, mut filter: Option<Filter>) -> ParquetResult<Vec<D::Output>> {
        let mut num_rows_remaining = Filter::opt_num_rows(&filter, self.iter.total_num_values());

        let mut target =
            self.decoder
                .with_capacity(if D::CHUNKED { 0 } else { num_rows_remaining });

        if let Some(dict) = self.dict.as_ref() {
            // @Performance. If we have a predicate, we can prune stuff out of the dictionary and
            // reduce memory consumption.
            self.decoder.apply_dictionary(&mut target, dict)?;
        }

        if let Some(metrics) = self.metrics.as_deref_mut() {
            metrics.decode_type = match &filter {
                None => DecodeType::Plain,
                Some(Filter::Range(_)) => DecodeType::Range,
                Some(Filter::Mask(_)) => DecodeType::Mask,
                // Predicate filters are routed to collect_predicate_flat by `collect`.
                Some(Filter::Predicate(_)) => unreachable!(),
            };
        }

        let mut chunks = Vec::new();
        while num_rows_remaining > 0 {
            let Some(page) = self.iter.next() else {
                break;
            };
            let page = page?;

            let page_num_values = page.num_values();

            // Split off the part of the filter that applies to this page.
            let mut state_filter;
            (state_filter, filter) = Filter::opt_split_at(&filter, page_num_values);

            state_filter = state_filter.or(Some(Filter::Range(0..page_num_values)));

            // Skip the whole page if we don't need any rows from it
            if state_filter
                .as_ref()
                .is_some_and(|f| f.num_rows(page_num_values) == 0)
            {
                continue;
            }

            if let Some(metrics) = self.metrics.as_deref_mut() {
                metrics.num_compressed_bytes += page.page().buffer.len() as u64;
                metrics.num_uncompressed_bytes += page.page().uncompressed_size() as u64;
            }

            let iter = &mut self.iter;
            let (page, time) = option_time(self.metrics.is_some(), move || page.decompress(iter));
            let page = page?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decompressing += time;
                metrics.num_decompressed_pages += 1;
            }

            let state = State::new(&self.decoder, &page, self.dict.as_ref())?;

            let start_length = target.len();
            let (result, time) = option_time(self.metrics.is_some(), || {
                state.decode(&mut self.decoder, &mut target, state_filter, &mut chunks)?;

                ParquetResult::Ok(())
            });
            result?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decoding += time;
            }

            let end_length = target.len();

            // NOTE(review): assumes `target.len()` only grows within a page decode — confirm
            // this holds for chunked decoders that move data into `chunks`.
            num_rows_remaining -= end_length - start_length;

            self.iter.reuse_page_buffer(page);
        }

        if let Some(metrics) = self.metrics.as_ref() {
            eprintln!(
                "PQ-Metrics: {},{},{},{},{},{},{}",
                metrics.field_name,
                metrics.num_micros_spent_decompressing,
                metrics.num_micros_spent_decoding,
                metrics.num_compressed_bytes,
                metrics.num_uncompressed_bytes,
                metrics.num_decompressed_pages,
                metrics.decode_type,
            );
        }

        // Always produce at least one (possibly empty) chunk.
        if target.len() > 0 || chunks.is_empty() {
            chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);
        }

        Ok(chunks)
    }

    /// Like [`Self::collect`], but boxes each output array as `Box<dyn Array>`.
    pub fn collect_boxed(
        self,
        filter: Option<Filter>,
    ) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {
        use arrow::array::IntoBoxedArray;
        let (nested, array, ptm) = self.collect(filter)?;
        let array = array.into_iter().map(|arr| arr.into_boxed()).collect();
        Ok((nested, array, ptm))
    }
}

/// Build a hybrid-RLE decoder over a dictionary-encoded page's index buffer.
///
/// `null_count` is subtracted because null slots carry no index values.
#[inline]
pub(super) fn dict_indices_decoder(
    page: &DataPage,
    null_count: usize,
) -> ParquetResult<hybrid_rle::HybridRleDecoder<'_>> {
    let indices_buffer = split_buffer(page)?.values;

    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
    let bit_width = indices_buffer[0];
    let indices_buffer = &indices_buffer[1..];

    Ok(hybrid_rle::HybridRleDecoder::new(
        indices_buffer,
        bit_width as u32,
        page.num_values() - null_count,
    ))
}

/// Freeze a [`MutableBitmap`] into a `Option<Bitmap>`.
///
/// This will turn the several instances where `None` (representing "all valid") suffices.
pub fn freeze_validity(validity: BitmapBuilder) -> Option<Bitmap> {
    if validity.is_empty() || validity.unset_bits() == 0 {
        return None;
    }

    let validity = validity.freeze();
    Some(validity)
}

/// Build a selection mask from a row range: `false` for `0..rng.start`, `true` for the range
/// itself (total length `rng.end`).
pub(crate) fn filter_from_range(rng: Range<usize>) -> Bitmap {
    let mut bm = BitmapBuilder::with_capacity(rng.end);

    bm.extend_constant(rng.start, false);
    bm.extend_constant(rng.len(), true);

    bm.freeze()
}

/// Decode a 1-bit hybrid-RLE stream into `bitmap`, stopping after `limit` bits
/// (defaults to the full stream).
pub(crate) fn decode_hybrid_rle_into_bitmap(
    mut page_validity: HybridRleDecoder<'_>,
    limit: Option<usize>,
    bitmap: &mut BitmapBuilder,
) -> ParquetResult<()> {
    assert!(page_validity.num_bits() <= 1);

    let mut limit = limit.unwrap_or(page_validity.len());
    bitmap.reserve(limit);

    while let Some(chunk) = page_validity.next_chunk()? {
        if limit == 0 {
            break;
        }

        match chunk {
            HybridRleChunk::Rle(value, size) => {
                let size = size.min(limit);
                bitmap.extend_constant(size, value != 0);
                limit -= size;
            },
            HybridRleChunk::Bitpacked(decoder) => {
                let len = decoder.len().min(limit);
                bitmap.extend_from_slice(decoder.as_slice(), 0, len);
                limit -= len;
            },
        }
    }

    Ok(())
}

/// Decode a page's 1-bit definition levels into a validity bitmap.
///
/// Returns `Ok(None)` when every value is valid, avoiding the allocation entirely.
pub(crate) fn decode_page_validity(
    mut page_validity: HybridRleDecoder<'_>,
    limit: Option<usize>,
) -> ParquetResult<Option<Bitmap>> {
    assert!(page_validity.num_bits() <= 1);

    let mut num_ones = 0;

    let mut bm = BitmapBuilder::new();
    let limit = limit.unwrap_or(page_validity.len());
    page_validity.limit_to(limit);
    let num_values = page_validity.len();

    // If all values are valid anyway, we will return a None so don't allocate until we disprove
    // that that is the case.
    while let Some(chunk) = page_validity.next_chunk()? {
        match chunk {
            // Keep counting leading all-valid runs without allocating.
            HybridRleChunk::Rle(value, size) if value != 0 => num_ones += size,
            HybridRleChunk::Rle(value, size) => {
                // First chunk with invalid bits: materialize the counted ones, then this run.
                bm.reserve(num_values);
                bm.extend_constant(num_ones, true);
                bm.extend_constant(size, value != 0);
                break;
            },
            HybridRleChunk::Bitpacked(decoder) => {
                let len = decoder.len();
                bm.reserve(num_values);
                bm.extend_constant(num_ones, true);
                bm.extend_from_slice(decoder.as_slice(), 0, len);
                break;
            },
        }
    }

    // Stream exhausted without materializing anything => all valid.
    if page_validity.len() == 0 && bm.is_empty() {
        return Ok(None);
    }

    // Decode whatever remains after the break above.
    decode_hybrid_rle_into_bitmap(page_validity, None, &mut bm)?;
    Ok(Some(bm.freeze()))
}