Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/binview/mod.rs
8500 views
use arrow::array::{Array, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View};1use arrow::bitmap::{Bitmap, BitmapBuilder};2use arrow::datatypes::{ArrowDataType, PhysicalType};3use polars_utils::aliases::PlIndexSet;45use super::dictionary_encoded::{append_validity, constrain_page_validity};6use super::utils::{7dict_indices_decoder, filter_from_range, freeze_validity, unspecialized_decode,8};9use super::{Filter, PredicateFilter, dictionary_encoded};10use crate::parquet::encoding::{Encoding, delta_byte_array, delta_length_byte_array, hybrid_rle};11use crate::parquet::error::{ParquetError, ParquetResult};12use crate::parquet::page::{DataPage, DictPage, split_buffer};13use crate::read::deserialize::utils::{self, Decoded};14use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};1516mod optional;17mod optional_masked;18mod predicate;19mod required;20mod required_masked;2122pub struct DecodedState {23binview: MutableBinaryViewArray<[u8]>,24validity: BitmapBuilder,2526// Used to store the needles for EqualsOneOf::Set that were inserted27// into the buffers (but not the views).28needle_views: Vec<View>,29}3031impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> {32type PlainDecoder = BinaryIter<'a>;3334fn new(35_decoder: &BinViewDecoder,36page: &'a DataPage,37dict: Option<&'a <BinViewDecoder as utils::Decoder>::Dict>,38page_validity: Option<&Bitmap>,39) -> ParquetResult<Self> {40match (page.encoding(), dict) {41(Encoding::Plain, _) => {42let values = split_buffer(page)?.values;43let values = BinaryIter::new(values, page.num_values());4445Ok(Self::Plain(values))46},47(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {48let values =49dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;50Ok(Self::Dictionary(values))51},52(Encoding::DeltaLengthByteArray, _) => {53let values = split_buffer(page)?.values;54Ok(Self::DeltaLengthByteArray(55delta_length_byte_array::Decoder::try_new(values)?,56Vec::new(),57))58},59(Encoding::DeltaByteArray, _) => {60let values = split_buffer(page)?.values;61Ok(Self::DeltaBytes(delta_byte_array::Decoder::try_new(62values,63)?))64},65_ => Err(utils::not_implemented(page)),66}67}6869fn num_rows(&self) -> usize {70match self {71StateTranslation::Plain(i) => i.max_num_values,72StateTranslation::Dictionary(i) => i.len(),73StateTranslation::DeltaLengthByteArray(i, _) => i.len(),74StateTranslation::DeltaBytes(i) => i.len(),75}76}77}7879enum EqualsOneOf {80Empty,81Inlinable([View; 4]),82Set(PlIndexSet<Box<[u8]>>),83}8485pub(crate) struct BinViewDecoder {86is_string: bool,87equals_one_of: Option<Box<EqualsOneOf>>,88}8990impl BinViewDecoder {91pub fn new(is_string: bool) -> Self {92Self {93is_string,94equals_one_of: None,95}96}9798pub fn new_string() -> Self {99Self::new(true)100}101102fn initialize_predicate_equals_one_of(&mut self, needles: &[ParquetScalar]) -> &EqualsOneOf {103self.equals_one_of.get_or_insert_with(|| {104if needles.is_empty() {105return Box::new(EqualsOneOf::Empty);106}107108let is_inlinable = needles.len() <= 4109&& needles.iter().all(|needle| {110let needle = if self.is_string {111needle.as_str().unwrap().as_bytes()112} else {113needle.as_binary().unwrap()114};115needle.len() < View::MAX_INLINE_SIZE as usize116});117118Box::new(if is_inlinable {119let mut views = [View::default(); 4];120for (i, needle) in needles.iter().enumerate() {121let needle = if self.is_string {122needle.as_str().unwrap().as_bytes()123} else {124needle.as_binary().unwrap()125};126views[i] = View::new_inline(needle);127}128for i in needles.len()..4 {129views[i] = views[0];130}131EqualsOneOf::Inlinable(views)132} else {133let mut needle_set = PlIndexSet::<Box<_>>::default();134needle_set.extend(needles.iter().map(|needle| {135assert!(!needle.is_null());136let needle = if self.is_string {137needle.as_str().unwrap().as_bytes()138} else {139needle.as_binary().unwrap()140};141needle.into()142}));143EqualsOneOf::Set(needle_set)144})145})146}147148fn initialize_decode_equals_one_of_state(149&mut self,150target: &mut DecodedState,151) -> Option<&EqualsOneOf> {152if let Some(EqualsOneOf::Set(needles)) = self.equals_one_of.as_deref_mut() {153if target.needle_views.is_empty() {154target.needle_views.extend(155needles156.iter()157.map(|needle| target.binview.push_value_into_buffer(needle)),158);159}160}161self.equals_one_of.as_deref()162}163}164165#[allow(clippy::large_enum_variant)]166#[derive(Debug)]167pub(crate) enum StateTranslation<'a> {168Plain(BinaryIter<'a>),169Dictionary(hybrid_rle::HybridRleDecoder<'a>),170DeltaLengthByteArray(delta_length_byte_array::Decoder<'a>, Vec<u32>),171DeltaBytes(delta_byte_array::Decoder<'a>),172}173174impl utils::Decoded for DecodedState {175fn len(&self) -> usize {176self.binview.len()177}178179fn extend_nulls(&mut self, n: usize) {180self.binview.extend_constant(n, Some(&[]));181self.validity.extend_constant(n, false);182}183184fn remaining_capacity(&self) -> usize {185(self.binview.capacity() - self.binview.len())186.min(self.validity.capacity() - self.validity.len())187}188}189190#[allow(clippy::too_many_arguments)]191fn decode_plain(192values: &[u8],193max_num_values: usize,194state: &mut DecodedState,195is_optional: bool,196197page_validity: Option<&Bitmap>,198filter: Option<Filter>,199200equals_one_of_state: Option<&EqualsOneOf>,201verify_utf8: bool,202) -> ParquetResult<()> {203if is_optional {204append_validity(205page_validity,206filter.as_ref(),207&mut state.validity,208max_num_values,209);210}211212if let Some(equals_one_of_state) = equals_one_of_state213&& page_validity.is_none()214{215let mut total_bytes_len = 0;216match equals_one_of_state {217EqualsOneOf::Empty => {},218EqualsOneOf::Inlinable(views) => {219predicate::decode_is_in_inlinable(220max_num_values,221values,222views,223unsafe { state.binview.views_mut() },224&mut total_bytes_len,225)?;226},227EqualsOneOf::Set(needles) => {228predicate::decode_is_in_non_inlinable(229max_num_values,230values,231needles,232&state.needle_views,233unsafe { state.binview.views_mut() },234&mut total_bytes_len,235)?;236},237}238239let new_total_bytes_len = state.binview.total_bytes_len() + total_bytes_len;240241// SAFETY: We know that the view is valid since we added it safely and we242// update the total_bytes_len afterwards. The total_buffer_len is not affected.243unsafe {244state.binview.set_total_bytes_len(new_total_bytes_len);245}246247return Ok(());248}249250let page_validity = constrain_page_validity(max_num_values, page_validity, filter.as_ref());251252match (filter, page_validity) {253(None, None) => required::decode(254max_num_values,255values,256None,257&mut state.binview,258verify_utf8,259),260(Some(Filter::Range(rng)), None) if rng.start == 0 => required::decode(261max_num_values,262values,263Some(rng.end),264&mut state.binview,265verify_utf8,266),267(None, Some(page_validity)) => optional::decode(268page_validity.set_bits(),269values,270&mut state.binview,271&page_validity,272verify_utf8,273),274(Some(Filter::Range(rng)), Some(page_validity)) if rng.start == 0 => optional::decode(275page_validity.set_bits(),276values,277&mut state.binview,278&page_validity,279verify_utf8,280),281(Some(Filter::Mask(mask)), None) => required_masked::decode(282max_num_values,283values,284&mut state.binview,285&mask,286verify_utf8,287),288(Some(Filter::Mask(mask)), Some(page_validity)) => optional_masked::decode(289page_validity.set_bits(),290values,291&mut state.binview,292&page_validity,293&mask,294verify_utf8,295),296(Some(Filter::Range(rng)), None) => required_masked::decode(297max_num_values,298values,299&mut state.binview,300&filter_from_range(rng),301verify_utf8,302),303(Some(Filter::Range(rng)), Some(page_validity)) => optional_masked::decode(304page_validity.set_bits(),305values,306&mut state.binview,307&page_validity,308&filter_from_range(rng),309verify_utf8,310),311(Some(Filter::Predicate(_)), _) => unreachable!(),312}?;313314Ok(())315}316317#[cold]318fn invalid_input_err() -> ParquetError {319ParquetError::oos("String data does not match given length")320}321322#[cold]323fn invalid_utf8_err() -> ParquetError {324ParquetError::oos("String data contained invalid UTF-8")325}326327pub fn decode_plain_generic(328values: &[u8],329target: &mut MutableBinaryViewArray<[u8]>,330331num_rows: usize,332mut next: impl FnMut() -> Option<(bool, bool)>,333334verify_utf8: bool,335) -> ParquetResult<()> {336// Since the offset in the buffer is decided by the interleaved lengths, every value has to be337// walked no matter what. This makes decoding rather inefficient in general.338//339// There are three cases:340// 1. All inlinable values341// - Most time is spend in decoding342// - No additional buffer has to be formed343// - Possible UTF-8 verification is fast because the len_below_128 trick344// 2. All non-inlinable values345// - Little time is spend in decoding346// - Most time is spend in buffer memcopying (we remove the interleaved lengths)347// - Possible UTF-8 verification is fast because the continuation byte trick348// 3. Mixed inlinable and non-inlinable values349// - Time shared between decoding and buffer forming350// - UTF-8 verification might still use len_below_128 trick, but might need to fall back to351// slow path.352353target.finish_in_progress();354unsafe { target.views_mut() }.reserve(num_rows);355356let start_target_length = target.len();357358let buffer_idx = target.completed_buffers().len() as u32;359let mut buffer = Vec::with_capacity(values.len() + 1);360let mut none_starting_with_continuation_byte = true; // Whether the transition from between strings is valid361// UTF-8362let mut all_len_below_128 = true; // Whether all the lengths of the values are below 128, this363// allows us to make UTF-8 verification a lot faster.364365let mut total_bytes_len = 0;366let mut num_inlined = 0;367368let mut mvalues = values;369while let Some((is_valid, is_selected)) = next() {370if !is_valid {371if is_selected {372unsafe { target.views_mut() }.push(unsafe { View::new_inline_unchecked(&[]) });373}374continue;375}376377if mvalues.len() < 4 {378return Err(invalid_input_err());379}380381let length;382(length, mvalues) = mvalues.split_at(4);383let length: &[u8; 4] = unsafe { length.try_into().unwrap_unchecked() };384let length = u32::from_le_bytes(*length);385386if mvalues.len() < length as usize {387return Err(invalid_input_err());388}389390let value;391(value, mvalues) = mvalues.split_at(length as usize);392393all_len_below_128 &= value.len() < 128;394// Everything starting with 10.. .... is a continuation byte.395none_starting_with_continuation_byte &=396value.is_empty() || value[0] & 0b1100_0000 != 0b1000_0000;397398if !is_selected {399continue;400}401402let offset = buffer.len() as u32;403404if value.len() <= View::MAX_INLINE_SIZE as usize {405unsafe { target.views_mut() }.push(unsafe { View::new_inline_unchecked(value) });406num_inlined += 1;407} else {408buffer.extend_from_slice(value);409unsafe { target.views_mut() }410.push(unsafe { View::new_noninline_unchecked(value, buffer_idx, offset) });411}412413total_bytes_len += value.len();414}415416unsafe {417target.set_total_bytes_len(target.total_bytes_len() + total_bytes_len);418}419420if verify_utf8 {421// This is a trick that allows us to check the resulting buffer which allows to batch the422// UTF-8 verification.423//424// This is allowed if none of the strings start with a UTF-8 continuation byte, so we keep425// track of that during the decoding.426if num_inlined == 0 {427if !none_starting_with_continuation_byte || simdutf8::basic::from_utf8(&buffer).is_err()428{429return Err(invalid_utf8_err());430}431432// This is a small trick that allows us to check the Parquet buffer instead of the view433// buffer. Batching the UTF-8 verification is more performant. For this to be allowed,434// all the interleaved lengths need to be valid UTF-8.435//436// Every strings prepended by 4 bytes (L, 0, 0, 0), since we check here L < 128. L is437// only a valid first byte of a UTF-8 code-point and (L, 0, 0, 0) is valid UTF-8.438// Consequently, it is valid to just check the whole buffer.439} else if all_len_below_128 {440if simdutf8::basic::from_utf8(&values[..values.len() - mvalues.len()]).is_err() {441return Err(invalid_utf8_err());442}443} else {444// We check all the non-inlined values here.445if !none_starting_with_continuation_byte || simdutf8::basic::from_utf8(&buffer).is_err()446{447return Err(invalid_utf8_err());448}449450let mut all_inlined_are_ascii = true;451452// @NOTE: This is only valid because we initialize our inline View's to be zeroes on453// non-included bytes.454for view in &target.views()[start_target_length..] {455all_inlined_are_ascii &= (view.length > View::MAX_INLINE_SIZE)456| (view.as_u128() & 0x0000_0000_8080_8080_8080_8080_8080_8080 == 0);457}458459// This is the very slow path.460if !all_inlined_are_ascii {461let mut is_valid = true;462for view in &target.views()[start_target_length..] {463if view.length <= View::MAX_INLINE_SIZE {464is_valid &=465std::str::from_utf8(unsafe { view.get_inlined_slice_unchecked() })466.is_ok();467}468}469470if !is_valid {471return Err(invalid_utf8_err());472}473}474}475}476477target.push_buffer(buffer.into());478479Ok(())480}481482impl utils::Decoder for BinViewDecoder {483type Translation<'a> = StateTranslation<'a>;484type Dict = BinaryViewArray;485type DecodedState = DecodedState;486type Output = Box<dyn Array>;487488fn with_capacity(&self, capacity: usize) -> Self::DecodedState {489DecodedState {490binview: MutableBinaryViewArray::with_capacity(capacity),491validity: BitmapBuilder::with_capacity(capacity),492needle_views: Vec::new(),493}494}495496fn evaluate_dict_predicate(497&self,498dict: &Self::Dict,499predicate: &PredicateFilter,500) -> ParquetResult<Bitmap> {501let utf8_array;502let mut dict_arr = dict as &dyn Array;503504if self.is_string {505utf8_array = unsafe { dict.to_utf8view_unchecked() };506dict_arr = &utf8_array507}508509Ok(predicate.predicate.evaluate(dict_arr))510}511512fn evaluate_predicate(513&mut self,514state: &utils::State<'_, Self>,515predicate: Option<&SpecializedParquetColumnExpr>,516pred_true_mask: &mut BitmapBuilder,517dict_mask: Option<&Bitmap>,518) -> ParquetResult<bool> {519if state.page_validity.is_some() {520// @Performance. This should be implemented.521return Ok(false);522}523524if let StateTranslation::Dictionary(values) = &state.translation {525let dict_mask = dict_mask.unwrap();526super::dictionary_encoded::predicate::decode(527values.clone(),528dict_mask,529pred_true_mask,530)?;531return Ok(true);532}533534let Some(predicate) = predicate else {535return Ok(false);536};537538use {SpecializedParquetColumnExpr as Spce, StateTranslation as St};539match (&state.translation, predicate) {540(St::Plain(iter), Spce::Equal(needle)) => {541assert!(!needle.is_null());542543let needle = if self.is_string {544needle.as_str().unwrap().as_bytes()545} else {546needle.as_binary().unwrap()547};548predicate::decode_equals(iter.max_num_values, iter.values, needle, pred_true_mask)?;549},550(St::Plain(iter), Spce::EqualOneOf(needles)) => {551let e = self.initialize_predicate_equals_one_of(needles);552553match e {554EqualsOneOf::Empty => {555pred_true_mask.extend_constant(iter.max_num_values, false)556},557EqualsOneOf::Inlinable(views) => {558predicate::decode_is_in_no_values_inlinable(559iter.max_num_values,560iter.values,561views,562pred_true_mask,563)?;564},565EqualsOneOf::Set(needle_set) => {566predicate::decode_is_in_no_values_non_inlinable(567iter.max_num_values,568iter.values,569needle_set,570pred_true_mask,571)?;572},573}574},575(St::Plain(iter), Spce::StartsWith(pattern)) => predicate::decode_matches(576iter.max_num_values,577iter.values,578|v| v.starts_with(pattern),579pred_true_mask,580)?,581(St::Plain(iter), Spce::EndsWith(pattern)) => predicate::decode_matches(582iter.max_num_values,583iter.values,584|v| v.ends_with(pattern),585pred_true_mask,586)?,587(St::Plain(iter), Spce::RegexMatch(regex)) => predicate::decode_matches(588iter.max_num_values,589iter.values,590|v| regex.is_match(v),591pred_true_mask,592)?,593_ => return Ok(false),594}595596Ok(true)597}598599fn apply_dictionary(600&mut self,601state: &mut Self::DecodedState,602dict: &Self::Dict,603) -> ParquetResult<()> {604if state.binview.completed_buffers().len() < dict.data_buffers().len() {605for buffer in dict.data_buffers().as_ref() {606state.binview.push_buffer(buffer.clone());607}608}609610assert!(state.binview.completed_buffers().len() == dict.data_buffers().len());611612Ok(())613}614615fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {616let values = &page.buffer;617let num_values = page.num_values;618619let mut arr = MutableBinaryViewArray::new();620required::decode(num_values, values, None, &mut arr, self.is_string)?;621622Ok(arr.freeze())623}624625fn extend_decoded(626&self,627decoded: &mut Self::DecodedState,628additional: &dyn Array,629is_optional: bool,630) -> ParquetResult<()> {631let is_utf8 = self.is_string;632if is_utf8 {633let array = additional.as_any().downcast_ref::<Utf8ViewArray>().unwrap();634let mut array = array.to_binview();635636if let Some(validity) = array.take_validity() {637decoded.binview.extend_from_array(&array);638decoded.validity.extend_from_bitmap(&validity);639} else {640decoded.binview.extend_from_array(&array);641if is_optional {642decoded.validity.extend_constant(array.len(), true);643}644}645} else {646let array = additional647.as_any()648.downcast_ref::<BinaryViewArray>()649.unwrap();650let mut array = array.clone();651652if let Some(validity) = array.take_validity() {653decoded.binview.extend_from_array(&array);654decoded.validity.extend_from_bitmap(&validity);655} else {656decoded.binview.extend_from_array(&array);657if is_optional {658decoded.validity.extend_constant(array.len(), true);659}660}661}662663Ok(())664}665666fn extend_filtered_with_state(667&mut self,668mut state: utils::State<'_, Self>,669decoded: &mut Self::DecodedState,670filter: Option<super::Filter>,671_chunks: &mut Vec<Self::Output>,672) -> ParquetResult<()> {673let is_string = self.is_string;674let equals_one_of_state = self.initialize_decode_equals_one_of_state(decoded);675match state.translation {676StateTranslation::Plain(iter) => decode_plain(677iter.values,678iter.max_num_values,679decoded,680state.is_optional,681state.page_validity.as_ref(),682filter,683equals_one_of_state,684is_string,685),686StateTranslation::Dictionary(ref mut indexes) => {687let dict = state.dict.unwrap();688689let start_length = decoded.binview.views().len();690691dictionary_encoded::decode_dict(692indexes.clone(),693dict.views().as_slice(),694state.is_optional,695state.page_validity.as_ref(),696filter,697&mut decoded.validity,698unsafe { decoded.binview.views_mut() },699)?;700701let total_length: usize = decoded702.binview703.views()704.iter()705.skip(start_length)706.map(|view| view.length as usize)707.sum();708unsafe {709decoded710.binview711.set_total_bytes_len(decoded.binview.total_bytes_len() + total_length);712}713714Ok(())715},716StateTranslation::DeltaLengthByteArray(decoder, _vec) => {717let values = decoder.values;718let lengths = decoder.lengths.collect::<Vec<i64>>()?;719720if self.is_string {721let mut none_starting_with_continuation_byte = true;722let mut offset = 0;723for length in &lengths {724none_starting_with_continuation_byte &=725*length == 0 || values[offset] & 0xC0 != 0x80;726offset += *length as usize;727}728729if !none_starting_with_continuation_byte {730return Err(invalid_utf8_err());731}732733if simdutf8::basic::from_utf8(&values[..offset]).is_err() {734return Err(invalid_utf8_err());735}736}737738let mut i = 0;739let mut offset = 0;740unspecialized_decode(741lengths.len(),742|| {743let length = lengths[i] as usize;744745let value = &values[offset..offset + length];746747i += 1;748offset += length;749750Ok(value)751},752filter,753state.page_validity,754state.is_optional,755&mut decoded.validity,756&mut decoded.binview,757)758},759StateTranslation::DeltaBytes(mut decoder) => {760let check_utf8 = self.is_string;761762unspecialized_decode(763decoder.len(),764|| {765let value = decoder.next().unwrap()?;766767if check_utf8 && simdutf8::basic::from_utf8(&value[..]).is_err() {768return Err(invalid_utf8_err());769}770771Ok(value)772},773filter,774state.page_validity,775state.is_optional,776&mut decoded.validity,777&mut decoded.binview,778)779},780}781}782783fn extend_constant(784&mut self,785decoded: &mut Self::DecodedState,786length: usize,787value: &ParquetScalar,788) -> ParquetResult<()> {789if value.is_null() {790decoded.extend_nulls(length);791return Ok(());792}793794let value = match value {795ParquetScalar::String(v) => v.as_bytes(),796ParquetScalar::Binary(v) => v.as_ref(),797_ => unreachable!(),798};799800decoded.binview.extend_constant(length, Some(value));801decoded.validity.extend_constant(length, true);802803Ok(())804}805806fn finalize(807&self,808dtype: ArrowDataType,809_dict: Option<Self::Dict>,810state: Self::DecodedState,811) -> ParquetResult<Box<dyn Array>> {812let mut array: BinaryViewArray = state.binview.freeze();813814let validity = freeze_validity(state.validity);815array = array.with_validity(validity);816817match dtype.to_physical_type() {818PhysicalType::BinaryView => Ok(array.boxed()),819PhysicalType::Utf8View => {820// SAFETY: we already checked utf8821unsafe {822Ok(Utf8ViewArray::new_unchecked(823dtype,824array.views().clone(),825array.data_buffers().clone(),826array.validity().cloned(),827array.try_total_bytes_len(),828array.total_buffer_len(),829)830.boxed())831}832},833_ => unreachable!(),834}835}836}837838#[derive(Debug)]839pub struct BinaryIter<'a> {840values: &'a [u8],841842/// A maximum number of items that this [`BinaryIter`] may produce.843///844/// This equal the length of the iterator i.f.f. the data encoded by the [`BinaryIter`] is not845/// nullable.846max_num_values: usize,847}848849impl<'a> BinaryIter<'a> {850pub fn new(values: &'a [u8], max_num_values: usize) -> Self {851Self {852values,853max_num_values,854}855}856}857858impl<'a> Iterator for BinaryIter<'a> {859type Item = &'a [u8];860861#[inline]862fn next(&mut self) -> Option<Self::Item> {863if self.max_num_values == 0 {864assert!(self.values.is_empty());865return None;866}867868let (length, remaining) = self.values.split_at(4);869let length: [u8; 4] = unsafe { length.try_into().unwrap_unchecked() };870let length = u32::from_le_bytes(length) as usize;871let (result, remaining) = remaining.split_at(length);872self.max_num_values -= 1;873self.values = remaining;874Some(result)875}876877#[inline]878fn size_hint(&self) -> (usize, Option<usize>) {879(0, Some(self.max_num_values))880}881}882883884