// Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs
// 8506 views
use arrow::array::{PrimitiveArray, Splitable};1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::types::{AlignedBytes, NativeType, PrimitiveType};3use polars_utils::vec::with_cast_mut_vec;45use super::DecoderFunction;6use crate::parquet::error::ParquetResult;7use crate::parquet::types::NativeType as ParquetNativeType;8use crate::read::deserialize::dictionary_encoded::{append_validity, constrain_page_validity};9use crate::read::deserialize::utils::array_chunks::ArrayChunks;10use crate::read::deserialize::utils::freeze_validity;11use crate::read::expr::SpecializedParquetColumnExpr;12use crate::read::{Filter, ParquetError};1314pub mod predicate;15mod required;1617#[allow(clippy::too_many_arguments)]18pub fn decode<P: ParquetNativeType, T: NativeType, D: DecoderFunction<P, T>>(19values: &[u8],20is_optional: bool,21page_validity: Option<&Bitmap>,22filter: Option<Filter>,23validity: &mut BitmapBuilder,24intermediate: &mut Vec<P>,25target: &mut Vec<T>,26dfn: D,27) -> ParquetResult<()> {28let can_filter_on_raw_data =29// Floats have different equality that just byte-wise comparison.30// @TODO: Maybe be smarter about this, because most predicates should not hit this problem.31!matches!(T::PRIMITIVE, PrimitiveType::Float16 | PrimitiveType::Float32 | PrimitiveType::Float64) &&32D::CAN_TRANSMUTE && !D::NEED_TO_DECODE;3334match filter {35Some(Filter::Predicate(p))36if !can_filter_on_raw_data37|| matches!(38p.predicate.as_specialized(),39Some(SpecializedParquetColumnExpr::Equal(_))40) =>41{42let num_values = values.len() / size_of::<P::AlignedBytes>();4344// @TODO: Do something smarter with the validity45let mut unfiltered_target = Vec::with_capacity(num_values);46let mut unfiltered_validity = if page_validity.is_some() {47BitmapBuilder::with_capacity(num_values)48} else {49Default::default()50};5152decode_no_incompact_predicates(53values,54is_optional,55page_validity,56None,57&mut unfiltered_validity,58intermediate,59&mut unfiltered_target,60dfn,61)?;6263let 
unfiltered_validity = freeze_validity(unfiltered_validity);6465let array = PrimitiveArray::new(66T::PRIMITIVE.into(),67unfiltered_target.into(),68unfiltered_validity,69);70let intermediate_pred_true_mask = p.predicate.evaluate(&array);7172let array =73polars_compute::filter::filter_with_bitmap(&array, &intermediate_pred_true_mask);74let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();7576target.extend(array.values().iter().copied());77if is_optional {78match array.validity() {79None => validity.extend_constant(array.len(), true),80Some(v) => validity.extend_from_bitmap(v),81}82}83},84f => {85decode_no_incompact_predicates(86values,87is_optional,88page_validity,89f,90validity,91intermediate,92target,93dfn,94)?;95},96}9798Ok(())99}100101#[allow(clippy::too_many_arguments)]102pub fn decode_no_incompact_predicates<103P: ParquetNativeType,104T: NativeType,105D: DecoderFunction<P, T>,106>(107values: &[u8],108is_optional: bool,109page_validity: Option<&Bitmap>,110filter: Option<Filter>,111validity: &mut BitmapBuilder,112intermediate: &mut Vec<P>,113target: &mut Vec<T>,114dfn: D,115) -> ParquetResult<()> {116if cfg!(debug_assertions) && is_optional {117assert_eq!(target.len(), validity.len());118}119120if D::CAN_TRANSMUTE {121let values = ArrayChunks::<'_, T::AlignedBytes>::new(values).ok_or_else(|| {122ParquetError::oos("Page content does not align with expected element size")123})?;124125let start_length = target.len();126with_cast_mut_vec::<T, T::AlignedBytes, _, _>(target, |aligned_bytes_vec| {127decode_aligned_bytes_dispatch(128values,129is_optional,130page_validity,131filter,132validity,133aligned_bytes_vec,134)135})?;136137if D::NEED_TO_DECODE {138let to_decode: &mut [P] = bytemuck::cast_slice_mut(&mut target[start_length..]);139140for v in to_decode {141*v = bytemuck::cast(dfn.decode(*v));142}143}144} else {145let values = ArrayChunks::<'_, P::AlignedBytes>::new(values).ok_or_else(|| {146ParquetError::oos("Page content does not align with 
expected element size")147})?;148149intermediate.clear();150with_cast_mut_vec::<P, P::AlignedBytes, _, _>(intermediate, |aligned_bytes_vec| {151decode_aligned_bytes_dispatch(152values,153is_optional,154page_validity,155filter,156validity,157aligned_bytes_vec,158)159})?;160161target.extend(intermediate.iter().copied().map(|v| dfn.decode(v)));162}163164if cfg!(debug_assertions) && is_optional {165assert_eq!(target.len(), validity.len());166}167168Ok(())169}170171#[inline(never)]172pub fn decode_aligned_bytes_dispatch<B: AlignedBytes>(173values: ArrayChunks<'_, B>,174is_optional: bool,175page_validity: Option<&Bitmap>,176filter: Option<Filter>,177validity: &mut BitmapBuilder,178target: &mut Vec<B>,179) -> ParquetResult<()> {180if is_optional {181append_validity(page_validity, filter.as_ref(), validity, values.len());182}183184let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());185186match (filter, page_validity) {187(None, None) => required::decode(values, target),188(None, Some(page_validity)) => decode_optional(values, page_validity, target),189190(Some(Filter::Range(rng)), None) => {191required::decode(values.slice(rng.start, rng.len()), target)192},193(Some(Filter::Range(rng)), Some(mut page_validity)) => {194let mut values = values;195if rng.start > 0 {196let prevalidity;197(prevalidity, page_validity) = page_validity.split_at(rng.start);198page_validity.slice(0, rng.len());199let values_start = prevalidity.set_bits();200values = values.slice(values_start, values.len() - values_start);201}202203decode_optional(values, page_validity, target)204},205206(Some(Filter::Mask(filter)), None) => decode_masked_required(values, filter, target),207(Some(Filter::Mask(filter)), Some(page_validity)) => {208decode_masked_optional(values, page_validity, filter, target)209},210(Some(Filter::Predicate(_)), _) => unreachable!(),211}?;212213Ok(())214}215216#[inline(never)]217fn decode_optional<B: AlignedBytes>(218values: ArrayChunks<'_, 
B>,219mut validity: Bitmap,220target: &mut Vec<B>,221) -> ParquetResult<()> {222target.reserve(validity.len());223224// Handle the leading and trailing zeros. This may allow dispatch to a faster kernel or225// possibly removes iterations from the lower kernel.226let num_leading_nulls = validity.take_leading_zeros();227target.resize(target.len() + num_leading_nulls, B::zeroed());228let num_trailing_nulls = validity.take_trailing_zeros();229230// Dispatch to a faster kernel if possible.231let num_values = validity.set_bits();232if num_values == validity.len() {233required::decode(values.truncate(validity.len()), target)?;234target.resize(target.len() + num_trailing_nulls, B::zeroed());235return Ok(());236}237238assert!(num_values <= values.len());239240let start_length = target.len();241let end_length = target.len() + validity.len();242let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };243244let mut validity_iter = validity.fast_iter_u56();245let mut num_values_remaining = num_values;246let mut value_offset = 0;247248let mut iter = |mut v: u64, len: usize| {249debug_assert!(len < 64);250251let num_chunk_values = v.count_ones() as usize;252253if num_values_remaining == num_chunk_values {254for i in 0..len {255let is_valid = v & 1 != 0;256let value = if is_valid {257unsafe { values.get_unchecked(value_offset) }258} else {259B::zeroed()260};261unsafe { target_ptr.add(i).write(value) };262263value_offset += (v & 1) as usize;264v >>= 1;265}266} else {267for i in 0..len {268let value = unsafe { values.get_unchecked(value_offset) };269unsafe { target_ptr.add(i).write(value) };270271value_offset += (v & 1) as usize;272v >>= 1;273}274}275276num_values_remaining -= num_chunk_values;277unsafe {278target_ptr = target_ptr.add(len);279}280};281282let mut num_remaining = validity.len();283for v in validity_iter.by_ref() {284if num_remaining < 56 {285iter(v, num_remaining);286} else {287iter(v, 56);288}289num_remaining -= 56;290}291292let (v, vl) = 
validity_iter.remainder();293294iter(v, vl.min(num_remaining));295296unsafe { target.set_len(end_length) };297target.resize(target.len() + num_trailing_nulls, B::zeroed());298299Ok(())300}301302#[inline(never)]303fn decode_masked_required<B: AlignedBytes>(304values: ArrayChunks<'_, B>,305mut mask: Bitmap,306target: &mut Vec<B>,307) -> ParquetResult<()> {308// Remove leading or trailing filtered values. This may allow dispatch to a faster kernel or309// may remove iterations from the slower kernel below.310let num_leading_filtered = mask.take_leading_zeros();311mask.take_trailing_zeros();312let values = values.slice(num_leading_filtered, mask.len());313314// Dispatch to a faster kernel if possible.315let num_rows = mask.set_bits();316if num_rows == mask.len() {317return required::decode(values.truncate(num_rows), target);318}319320assert!(mask.len() <= values.len());321322let start_length = target.len();323target.reserve(num_rows);324let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };325326let mut mask_iter = mask.fast_iter_u56();327let mut num_rows_left = num_rows;328let mut value_offset = 0;329330let mut iter = |mut f: u64, len: usize| {331if num_rows_left == 0 {332return false;333}334335let mut num_read = 0;336let mut num_written = 0;337338while f != 0 {339let offset = f.trailing_zeros() as usize;340341num_read += offset;342343// SAFETY:344// 1. `values_buffer` starts out as only zeros, which we know is in the345// dictionary following the original `dict.is_empty` check.346// 2. 
Each time we write to `values_buffer`, it is followed by a347// `verify_dict_indices`.348let value = unsafe { values.get_unchecked(value_offset + num_read) };349unsafe { target_ptr.add(num_written).write(value) };350351num_written += 1;352num_read += 1;353354f >>= offset + 1; // Clear least significant bit.355}356357unsafe {358target_ptr = target_ptr.add(num_written);359}360value_offset += len;361num_rows_left -= num_written;362363true364};365366for f in mask_iter.by_ref() {367if !iter(f, 56) {368break;369}370}371let (f, fl) = mask_iter.remainder();372iter(f, fl);373374unsafe { target.set_len(start_length + num_rows) };375376Ok(())377}378379#[inline(never)]380fn decode_masked_optional<B: AlignedBytes>(381values: ArrayChunks<'_, B>,382mut validity: Bitmap,383mut mask: Bitmap,384target: &mut Vec<B>,385) -> ParquetResult<()> {386assert_eq!(validity.len(), mask.len());387388let num_leading_filtered = mask.take_leading_zeros();389mask.take_trailing_zeros();390let leading_validity;391(leading_validity, validity) = validity.split_at(num_leading_filtered);392validity.slice(0, mask.len());393394let num_rows = mask.set_bits();395let num_values = validity.set_bits();396397let values = values.slice(leading_validity.set_bits(), num_values);398399// Dispatch to a faster kernel if possible.400if num_rows == mask.len() {401return decode_optional(values, validity, target);402}403if num_values == validity.len() {404return decode_masked_required(values, mask, target);405}406407assert!(num_values <= values.len());408409let start_length = target.len();410target.reserve(num_rows);411let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };412413let mut validity_iter = validity.fast_iter_u56();414let mut mask_iter = mask.fast_iter_u56();415let mut num_values_left = num_values;416let mut num_rows_left = num_rows;417let mut value_offset = 0;418419let mut iter = |mut f: u64, mut v: u64| {420if num_rows_left == 0 {421return false;422}423424let num_chunk_values = v.count_ones() 
as usize;425426let mut num_read = 0;427let mut num_written = 0;428429if num_chunk_values == num_values_left {430while f != 0 {431let offset = f.trailing_zeros() as usize;432433num_read += (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;434v >>= offset;435436let is_valid = v & 1 != 0;437let value = if is_valid {438unsafe { values.get_unchecked(value_offset + num_read) }439} else {440B::zeroed()441};442unsafe { target_ptr.add(num_written).write(value) };443444num_written += 1;445num_read += (v & 1) as usize;446447f >>= offset + 1; // Clear least significant bit.448v >>= 1;449}450} else {451while f != 0 {452let offset = f.trailing_zeros() as usize;453454num_read += (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;455v >>= offset;456457let value = unsafe { values.get_unchecked(value_offset + num_read) };458unsafe { target_ptr.add(num_written).write(value) };459460num_written += 1;461num_read += (v & 1) as usize;462463f >>= offset + 1; // Clear least significant bit.464v >>= 1;465}466}467468unsafe {469target_ptr = target_ptr.add(num_written);470}471value_offset += num_chunk_values;472num_rows_left -= num_written;473num_values_left -= num_chunk_values;474475true476};477478for (f, v) in mask_iter.by_ref().zip(validity_iter.by_ref()) {479if !iter(f, v) {480break;481}482}483484let (f, fl) = mask_iter.remainder();485let (v, vl) = validity_iter.remainder();486assert_eq!(fl, vl);487iter(f, v);488489unsafe { target.set_len(start_length + num_rows) };490491Ok(())492}493494#[cfg(test)]495mod tests {496use arrow::bitmap::proptest::bitmap;497use proptest::collection::size_range;498use proptest::prelude::*;499500use super::*;501502fn values_and_mask() -> impl Strategy<Value = (Vec<u32>, Bitmap)> {503any_with::<Vec<u32>>(size_range(0..100).lift()).prop_flat_map(|vec| {504let len = vec.len();505(Just(vec), bitmap(len))506})507}508509fn validity_values_and_mask() -> impl Strategy<Value = (Bitmap, Vec<u32>, Bitmap)> {510bitmap(0..100).prop_flat_map(|validity| 
{511let len = validity.len();512let values_length = validity.set_bits();513514(515Just(validity),516any_with::<Vec<u32>>(size_range(values_length).lift()),517bitmap(len),518)519})520}521522fn _test_decode_masked_required(values: &Vec<u32>, mask: &Bitmap) {523let mut reference_result = Vec::with_capacity(mask.set_bits());524for (value, is_selected) in values.iter().zip(mask.iter()) {525if is_selected {526reference_result.push(*value);527}528}529530let mut result = Vec::<arrow::types::Bytes4Alignment4>::with_capacity(mask.set_bits());531decode_masked_required(532ArrayChunks::new(bytemuck::cast_slice(values.as_slice())).unwrap(),533mask.clone(),534&mut result,535)536.unwrap();537538let result = bytemuck::cast_vec::<_, u32>(result);539assert_eq!(reference_result, result);540}541542fn _test_decode_masked_optional(validity: &Bitmap, values: &Vec<u32>, mask: &Bitmap) {543let mut result = Vec::<arrow::types::Bytes4Alignment4>::with_capacity(mask.set_bits());544decode_masked_optional(545ArrayChunks::new(bytemuck::cast_slice(values.as_slice())).unwrap(),546validity.clone(),547mask.clone(),548&mut result,549)550.unwrap();551552let result = bytemuck::cast_vec::<_, u32>(result);553554let mut result_i = 0;555let mut values_i = 0;556for (is_valid, is_selected) in validity.iter().zip(mask.iter()) {557if is_selected {558if is_valid {559assert_eq!(result[result_i], values[values_i]);560}561result_i += 1;562}563564if is_valid {565values_i += 1;566}567}568}569570proptest! {571#[test]572fn test_decode_masked_required(573(ref values, ref mask) in values_and_mask()574) {575_test_decode_masked_required(values, mask)576}577}578579proptest! {580#[test]581fn test_decode_masked_optional(582(ref validity, ref values, ref mask) in validity_values_and_mask()583) {584_test_decode_masked_optional(validity, values, mask)585}586}587}588589590