Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs
8506 views
use arrow::bitmap::bitmask::BitMask;1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::types::{3AlignedBytes, Bytes1Alignment1, Bytes2Alignment2, Bytes4Alignment4, NativeType,4};5use polars_compute::filter::filter_boolean_kernel;6use polars_utils::vec::with_cast_mut_vec;78use super::ParquetError;9use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;10use crate::parquet::error::ParquetResult;11use crate::read::Filter;1213mod optional;14mod optional_masked_dense;15pub mod predicate;16mod required;17mod required_masked_dense;1819/// A mapping from a `u32` to a value. This is used in to map dictionary encoding to a value.20pub trait IndexMapping {21type Output: Copy + AlignedBytes;2223fn is_empty(&self) -> bool {24self.len() == 025}26fn len(&self) -> usize;27fn get(&self, idx: u32) -> Option<Self::Output> {28((idx as usize) < self.len()).then(|| unsafe { self.get_unchecked(idx) })29}30unsafe fn get_unchecked(&self, idx: u32) -> Self::Output;31}3233// Base mapping used for everything except the CategoricalDecoder.34impl<T: Copy + AlignedBytes> IndexMapping for &[T] {35type Output = T;3637#[inline(always)]38fn len(&self) -> usize {39<[T]>::len(self)40}4142#[inline(always)]43unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {44*unsafe { <[T]>::get_unchecked(self, idx as usize) }45}46}4748// Unit mapping used in the CategoricalDecoder.49impl IndexMapping for u8 {50type Output = Bytes1Alignment1;5152#[inline(always)]53fn len(&self) -> usize {54*self as usize55}5657#[inline(always)]58unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {59bytemuck::must_cast(idx as u8)60}61}6263impl IndexMapping for u16 {64type Output = Bytes2Alignment2;6566#[inline(always)]67fn len(&self) -> usize {68*self as usize69}7071#[inline(always)]72unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {73bytemuck::must_cast(idx as u16)74}75}7677impl IndexMapping for u32 {78type Output = Bytes4Alignment4;7980#[inline(always)]81fn len(&self) -> usize {82*self as usize83}8485#[inline(always)]86unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {87bytemuck::must_cast(idx)88}89}9091#[allow(clippy::too_many_arguments)]92pub fn decode_dict<T: NativeType>(93values: HybridRleDecoder<'_>,94dict: &[T],95is_optional: bool,96page_validity: Option<&Bitmap>,97filter: Option<Filter>,98validity: &mut BitmapBuilder,99target: &mut Vec<T>,100) -> ParquetResult<()> {101with_cast_mut_vec::<T, T::AlignedBytes, _, _>(target, |aligned_bytes_vec| {102decode_dict_dispatch(103values,104bytemuck::cast_slice(dict),105is_optional,106page_validity,107filter,108validity,109aligned_bytes_vec,110)111})112}113114#[inline(never)]115#[allow(clippy::too_many_arguments)]116pub fn decode_dict_dispatch<B: AlignedBytes, D: IndexMapping<Output = B>>(117mut values: HybridRleDecoder<'_>,118dict: D,119is_optional: bool,120page_validity: Option<&Bitmap>,121filter: Option<Filter>,122validity: &mut BitmapBuilder,123target: &mut Vec<B>,124) -> ParquetResult<()> {125if is_optional {126append_validity(page_validity, filter.as_ref(), validity, values.len());127}128129let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());130131match (filter, page_validity) {132(None, None) => required::decode(values, dict, target, 0),133(Some(Filter::Range(rng)), None) => {134values.limit_to(rng.end);135required::decode(values, dict, target, rng.start)136},137(None, Some(page_validity)) => optional::decode(values, dict, page_validity, target, 0),138(Some(Filter::Range(rng)), Some(page_validity)) => {139optional::decode(values, dict, page_validity, target, rng.start)140},141(Some(Filter::Mask(filter)), None) => {142required_masked_dense::decode(values, dict, filter, target)143},144(Some(Filter::Mask(filter)), Some(page_validity)) => {145optional_masked_dense::decode(values, dict, filter, page_validity, target)146},147(Some(Filter::Predicate(_)), _) => unreachable!(),148}?;149150Ok(())151}152153pub(crate) fn append_validity(154page_validity: Option<&Bitmap>,155filter: Option<&Filter>,156validity: &mut BitmapBuilder,157values_len: usize,158) {159match (page_validity, filter) {160(None, None) => validity.extend_constant(values_len, true),161(None, Some(Filter::Range(range))) => validity.extend_constant(range.len(), true),162(None, Some(Filter::Mask(mask))) => validity.extend_constant(mask.set_bits(), true),163(None, Some(Filter::Predicate(_))) => {164// Done later.165},166(Some(page_validity), None) => validity.extend_from_bitmap(page_validity),167(Some(page_validity), Some(Filter::Range(rng))) => {168let page_validity = page_validity.clone();169validity.extend_from_bitmap(&page_validity.sliced(rng.start, rng.len()))170},171(Some(page_validity), Some(Filter::Mask(mask))) => {172validity.extend_from_bitmap(&filter_boolean_kernel(page_validity, mask))173},174(_, Some(Filter::Predicate(_))) => todo!(),175}176}177178pub(crate) fn constrain_page_validity(179values_len: usize,180page_validity: Option<&Bitmap>,181filter: Option<&Filter>,182) -> Option<Bitmap> {183let num_unfiltered_rows = match (filter.as_ref(), page_validity) {184(None, None) => values_len,185(None, Some(pv)) => pv.len(),186(Some(f), Some(pv)) => {187debug_assert!(pv.len() >= f.max_offset(pv.len()));188f.max_offset(pv.len())189},190(Some(f), None) => f.max_offset(values_len),191};192193page_validity.map(|pv| {194if pv.len() > num_unfiltered_rows {195pv.clone().sliced(0, num_unfiltered_rows)196} else {197pv.clone()198}199})200}201202#[cold]203fn oob_dict_idx() -> ParquetError {204ParquetError::oos("Dictionary Index is out-of-bounds")205}206207#[cold]208fn no_more_bitpacked_values() -> ParquetError {209ParquetError::oos("Bitpacked Hybrid-RLE ran out before all values were served")210}211212#[inline(always)]213pub(super) fn verify_dict_indices(indices: &[u32], dict_size: usize) -> ParquetResult<()> {214debug_assert!(dict_size <= u32::MAX as usize);215let dict_size = dict_size as u32;216217let mut is_valid = true;218for &idx in indices {219is_valid &= idx < dict_size;220}221222if is_valid {223Ok(())224} else {225Err(oob_dict_idx())226}227}228229/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include230/// more than `num_values_to_skip` values.231#[inline(always)]232fn required_skip_whole_chunks(233values: &mut HybridRleDecoder<'_>,234num_values_to_skip: &mut usize,235) -> ParquetResult<()> {236if *num_values_to_skip == 0 {237return Ok(());238}239240loop {241let mut values_clone = values.clone();242let Some(chunk_len) = values_clone.next_chunk_length()? else {243break;244};245if *num_values_to_skip < chunk_len {246break;247}248*values = values_clone;249*num_values_to_skip -= chunk_len;250}251252Ok(())253}254255/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include256/// more than `num_values_to_skip` values.257#[inline(always)]258fn optional_skip_whole_chunks(259values: &mut HybridRleDecoder<'_>,260validity: &mut BitMask<'_>,261num_rows_to_skip: &mut usize,262num_values_to_skip: &mut usize,263) -> ParquetResult<()> {264if *num_values_to_skip == 0 {265return Ok(());266}267268let mut total_num_skipped_values = 0;269270loop {271let mut values_clone = values.clone();272let Some(chunk_len) = values_clone.next_chunk_length()? else {273break;274};275if *num_values_to_skip < chunk_len {276break;277}278*values = values_clone;279*num_values_to_skip -= chunk_len;280total_num_skipped_values += chunk_len;281}282283if total_num_skipped_values > 0 {284let offset = validity285.nth_set_bit_idx(total_num_skipped_values - 1, 0)286.map_or(validity.len(), |v| v + 1);287*num_rows_to_skip -= offset;288validity.advance_by(offset);289}290291Ok(())292}293294295