Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
6940 views
use arrow::bitmap::utils::BitmapIter;1use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};23use super::utils::PageDecoder;4use super::{Filter, utils};5use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};6use crate::parquet::error::ParquetResult;7use crate::parquet::page::{DataPage, split_buffer};8use crate::parquet::read::levels::get_bit_width;910pub struct Nested {11validity: Option<BitmapBuilder>,12length: usize,13content: NestedContent,1415// We batch the collection of valids and invalids to amortize the costs. This only really works16// when valids and invalids are grouped or there is a disbalance in the amount of valids vs.17// invalids. This, however, is a very common situation.18num_valids: usize,19num_invalids: usize,20}2122#[derive(Debug)]23pub enum NestedContent {24Primitive,25List { offsets: Vec<i64> },26FixedSizeList { width: usize },27Struct,28}2930impl Nested {31fn primitive(is_nullable: bool) -> Self {32// @NOTE: We allocate with `0` capacity here since we will not be pushing to this bitmap.33// This is because primitive does not keep track of the validity here. It keeps track in34// the decoder. We do still want to put something so that we can check for nullability by35// looking at the option.36let validity = is_nullable.then(|| BitmapBuilder::with_capacity(0));3738Self {39validity,40length: 0,41content: NestedContent::Primitive,4243num_valids: 0,44num_invalids: 0,45}46}4748fn list_with_capacity(is_nullable: bool, capacity: usize) -> Self {49let offsets = Vec::with_capacity(capacity);50let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));51Self {52validity,53length: 0,54content: NestedContent::List { offsets },5556num_valids: 0,57num_invalids: 0,58}59}6061fn fixedlist_with_capacity(is_nullable: bool, width: usize, capacity: usize) -> Self {62let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));63Self {64validity,65length: 0,66content: NestedContent::FixedSizeList { width },6768num_valids: 0,69num_invalids: 0,70}71}7273fn struct_with_capacity(is_nullable: bool, capacity: usize) -> Self {74let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));75Self {76validity,77length: 0,78content: NestedContent::Struct,7980num_valids: 0,81num_invalids: 0,82}83}8485fn take(mut self) -> (usize, Vec<i64>, Option<BitmapBuilder>) {86if !matches!(self.content, NestedContent::Primitive) {87if let Some(validity) = self.validity.as_mut() {88validity.extend_constant(self.num_valids, true);89validity.extend_constant(self.num_invalids, false);90}9192debug_assert!(93self.validity94.as_ref()95.is_none_or(|v| v.len() == self.length)96);97}9899self.num_valids = 0;100self.num_invalids = 0;101102match self.content {103NestedContent::Primitive => {104debug_assert!(self.validity.is_none_or(|validity| validity.is_empty()));105(self.length, Vec::new(), None)106},107NestedContent::List { offsets } => (self.length, offsets, self.validity),108NestedContent::FixedSizeList { .. } => (self.length, Vec::new(), self.validity),109NestedContent::Struct => (self.length, Vec::new(), self.validity),110}111}112113fn is_nullable(&self) -> bool {114self.validity.is_some()115}116117fn is_repeated(&self) -> bool {118match self.content {119NestedContent::Primitive => false,120NestedContent::List { .. } => true,121NestedContent::FixedSizeList { .. } => true,122NestedContent::Struct => false,123}124}125126fn is_required(&self) -> bool {127match self.content {128NestedContent::Primitive => false,129NestedContent::List { .. } => false,130NestedContent::FixedSizeList { .. } => false,131NestedContent::Struct => true,132}133}134135/// number of rows136fn len(&self) -> usize {137self.length138}139140fn invalid_num_values(&self) -> usize {141match &self.content {142NestedContent::Primitive => 1,143NestedContent::List { .. } => 0,144NestedContent::FixedSizeList { width } => *width,145NestedContent::Struct => 1,146}147}148149fn push(&mut self, value: i64, is_valid: bool) {150let is_primitive = matches!(self.content, NestedContent::Primitive);151152if is_valid && self.num_invalids != 0 {153debug_assert!(!is_primitive);154155// @NOTE: Having invalid items might not necessarily mean that we have a validity mask.156//157// For instance, if we have a optional struct with a required list in it, that struct158// will have a validity mask and the list will not. In the arrow representation of this159// array, however, the list will still have invalid items where the struct is null.160//161// Array:162// [163// { 'x': [1] },164// None,165// { 'x': [1, 2] },166// ]167//168// Arrow:169// struct = [ list[0] None list[2] ]170// list = {171// values = [ 1, 1, 2 ],172// offsets = [ 0, 1, 1, 3 ],173// }174//175// Parquet:176// [ 1, 1, 2 ] + definition + repetition levels177//178// As you can see we need to insert an invalid item into the list even though it does179// not have a validity mask.180if let Some(validity) = self.validity.as_mut() {181validity.extend_constant(self.num_valids, true);182validity.extend_constant(self.num_invalids, false);183}184185self.num_valids = 0;186self.num_invalids = 0;187}188189self.num_valids += usize::from(!is_primitive & is_valid);190self.num_invalids += usize::from(!is_primitive & !is_valid);191192self.length += 1;193if let NestedContent::List { offsets } = &mut self.content {194offsets.push(value);195}196}197198fn push_default(&mut self, length: i64) {199let is_primitive = matches!(self.content, NestedContent::Primitive);200self.num_invalids += usize::from(!is_primitive);201202self.length += 1;203if let NestedContent::List { offsets } = &mut self.content {204offsets.push(length);205}206}207}208209/// Utility structure to create a `Filter` and `Validity` mask for the leaf values.210///211/// This batches the extending.212pub struct BatchedNestedDecoder<'a> {213pub(crate) num_waiting_valids: usize,214pub(crate) num_waiting_invalids: usize,215216filter: &'a mut MutableBitmap,217validity: &'a mut MutableBitmap,218}219220impl BatchedNestedDecoder<'_> {221fn push_valid(&mut self) -> ParquetResult<()> {222self.push_n_valids(1)223}224225fn push_invalid(&mut self) -> ParquetResult<()> {226self.push_n_invalids(1)227}228229fn push_n_valids(&mut self, n: usize) -> ParquetResult<()> {230if self.num_waiting_invalids == 0 {231self.num_waiting_valids += n;232return Ok(());233}234235self.filter.extend_constant(self.num_waiting_valids, true);236self.validity.extend_constant(self.num_waiting_valids, true);237238self.filter.extend_constant(self.num_waiting_invalids, true);239self.validity240.extend_constant(self.num_waiting_invalids, false);241242self.num_waiting_valids = n;243self.num_waiting_invalids = 0;244245Ok(())246}247248fn push_n_invalids(&mut self, n: usize) -> ParquetResult<()> {249self.num_waiting_invalids += n;250Ok(())251}252253fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {254if self.num_waiting_valids > 0 {255self.filter.extend_constant(self.num_waiting_valids, true);256self.validity.extend_constant(self.num_waiting_valids, true);257self.num_waiting_valids = 0;258}259if self.num_waiting_invalids > 0 {260self.filter.extend_constant(self.num_waiting_invalids, true);261self.validity262.extend_constant(self.num_waiting_invalids, false);263self.num_waiting_invalids = 0;264}265266self.filter.extend_constant(n, false);267self.validity.extend_constant(n, true);268269Ok(())270}271272fn finalize(self) -> ParquetResult<()> {273self.filter.extend_constant(self.num_waiting_valids, true);274self.validity.extend_constant(self.num_waiting_valids, true);275276self.filter.extend_constant(self.num_waiting_invalids, true);277self.validity278.extend_constant(self.num_waiting_invalids, false);279280Ok(())281}282}283284/// The initial info of nested data types.285/// The `bool` indicates if the type is nullable.286#[derive(Debug, Clone, Copy, PartialEq, Eq)]287pub enum InitNested {288/// Primitive data types289Primitive(bool),290/// List data types291List(bool),292/// Fixed-Size List data types293FixedSizeList(bool, usize),294/// Struct data types295Struct(bool),296}297298/// Initialize [`NestedState`] from `&[InitNested]`.299pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState {300use {InitNested as IN, Nested as N};301302let container = init303.iter()304.map(|init| match init {305IN::Primitive(is_nullable) => N::primitive(*is_nullable),306IN::List(is_nullable) => N::list_with_capacity(*is_nullable, capacity),307IN::FixedSizeList(is_nullable, width) => {308N::fixedlist_with_capacity(*is_nullable, *width, capacity)309},310IN::Struct(is_nullable) => N::struct_with_capacity(*is_nullable, capacity),311})312.collect();313314NestedState::new(container)315}316317/// The state of nested data types.318#[derive(Default)]319pub struct NestedState {320/// The nesteds composing `NestedState`.321nested: Vec<Nested>,322}323324impl NestedState {325/// Creates a new [`NestedState`].326fn new(nested: Vec<Nested>) -> Self {327Self { nested }328}329330pub fn pop(&mut self) -> Option<(usize, Vec<i64>, Option<BitmapBuilder>)> {331Some(self.nested.pop()?.take())332}333334pub fn last(&self) -> Option<&NestedContent> {335self.nested.last().map(|v| &v.content)336}337338/// The number of rows in this state339pub fn len(&self) -> usize {340// outermost is the number of rows341self.nested[0].len()342}343344/// Returns the definition and repetition levels for each nesting level345fn levels(&self) -> (Vec<u16>, Vec<u16>) {346let depth = self.nested.len();347348let mut def_levels = Vec::with_capacity(depth + 1);349let mut rep_levels = Vec::with_capacity(depth + 1);350351def_levels.push(0);352rep_levels.push(0);353354for i in 0..depth {355let nest = &self.nested[i];356357let def_delta = nest.is_nullable() as u16 + nest.is_repeated() as u16;358let rep_delta = nest.is_repeated() as u16;359360def_levels.push(def_levels[i] + def_delta);361rep_levels.push(rep_levels[i] + rep_delta);362}363364(def_levels, rep_levels)365}366}367368fn collect_level_values(369target: &mut Vec<u16>,370hybrid_rle: HybridRleDecoder<'_>,371) -> ParquetResult<()> {372target.reserve(hybrid_rle.len());373374for chunk in hybrid_rle.into_chunk_iter() {375let chunk = chunk?;376377match chunk {378HybridRleChunk::Rle(value, size) => {379target.resize(target.len() + size, value as u16);380},381HybridRleChunk::Bitpacked(decoder) => {382decoder.lower_element::<u16>()?.collect_into(target);383},384}385}386387Ok(())388}389390/// State to keep track of how many top-level values (i.e. rows) still need to be skipped and391/// collected.392///393/// This state should be kept between pages because a top-level value / row value may span several394/// pages.395///396/// - `num_skips = Some(n)` means that it will skip till the `n + 1`-th occurrence of the repetition397/// level of `0` (i.e. the start of a top-level value / row value).398/// - `num_collects = Some(n)` means that it will collect values till the `n + 1`-th occurrence of399/// the repetition level of `0` (i.e. the start of a top-level value / row value).400struct DecodingState {401num_skips: Option<usize>,402num_collects: Option<usize>,403}404405#[allow(clippy::too_many_arguments)]406fn decode_nested(407mut current_def_levels: &[u16],408mut current_rep_levels: &[u16],409410batched_collector: &mut BatchedNestedDecoder<'_>,411nested: &mut [Nested],412413state: &mut DecodingState,414top_level_filter: &mut BitmapIter<'_>,415416// Amortized allocations417def_levels: &[u16],418rep_levels: &[u16],419) -> ParquetResult<()> {420let max_depth = nested.len();421let leaf_def_level = *def_levels.last().unwrap();422423while !current_def_levels.is_empty() {424debug_assert_eq!(current_def_levels.len(), current_rep_levels.len());425426// Handle skips427if let Some(ref mut num_skips) = state.num_skips {428let mut i = 0;429let mut num_skipped_values = 0;430while i < current_def_levels.len() && (*num_skips > 0 || current_rep_levels[i] != 0) {431let def = current_def_levels[i];432let rep = current_rep_levels[i];433434*num_skips -= usize::from(rep == 0);435i += 1;436437// @NOTE:438// We don't need to account for higher def-levels that imply extra values, since we439// don't have those higher levels either.440num_skipped_values += usize::from(def == leaf_def_level);441}442batched_collector.skip_in_place(num_skipped_values)?;443444current_def_levels = ¤t_def_levels[i..];445current_rep_levels = ¤t_rep_levels[i..];446447if current_def_levels.is_empty() {448break;449} else {450state.num_skips = None;451}452}453454// Handle collects455if let Some(ref mut num_collects) = state.num_collects {456let mut i = 0;457while i < current_def_levels.len() && (*num_collects > 0 || current_rep_levels[i] != 0)458{459let def = current_def_levels[i];460let rep = current_rep_levels[i];461462*num_collects -= usize::from(rep == 0);463i += 1;464465let mut is_required = false;466467for depth in 0..max_depth {468// Defines whether this element is defined at `depth`469//470// e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and471// [ [ [ 1 ] ] ].472let is_defined_at_this_depth =473rep <= rep_levels[depth] && def >= def_levels[depth];474475let length = nested476.get(depth + 1)477.map(|x| x.len() as i64)478// the last depth is the leaf, which is always increased by 1479.unwrap_or(1);480481let nest = &mut nested[depth];482483let is_valid = !nest.is_nullable() || def > def_levels[depth];484485if is_defined_at_this_depth && !is_valid {486let mut num_elements = 1;487488nest.push(length, is_valid);489490for embed_depth in depth..max_depth {491let embed_length = nested492.get(embed_depth + 1)493.map(|x| x.len() as i64)494// the last depth is the leaf, which is always increased by 1495.unwrap_or(1);496497let embed_nest = &mut nested[embed_depth];498499if embed_depth > depth {500for _ in 0..num_elements {501embed_nest.push_default(embed_length);502}503}504505let embed_num_values = embed_nest.invalid_num_values();506num_elements *= embed_num_values;507508if embed_num_values == 0 {509break;510}511}512513batched_collector.push_n_invalids(num_elements)?;514515break;516}517518if is_required || is_defined_at_this_depth {519nest.push(length, is_valid);520521if depth == max_depth - 1 {522// the leaf / primitive523let is_valid = (def != def_levels[depth]) || !nest.is_nullable();524525if is_valid {526batched_collector.push_valid()?;527} else {528batched_collector.push_invalid()?;529}530}531}532533is_required = (is_required || is_defined_at_this_depth)534&& nest.is_required()535&& !is_valid;536}537}538539current_def_levels = ¤t_def_levels[i..];540current_rep_levels = ¤t_rep_levels[i..];541542if current_def_levels.is_empty() {543break;544} else {545state.num_collects = None;546}547}548549if top_level_filter.num_remaining() == 0 {550break;551}552553state.num_skips = Some(top_level_filter.take_leading_zeros()).filter(|v| *v != 0);554state.num_collects = Some(top_level_filter.take_leading_ones()).filter(|v| *v != 0);555}556557Ok(())558}559560/// Return the definition and repetition level iterators for this page.561fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder<'_>, HybridRleDecoder<'_>)> {562let split = split_buffer(page)?;563let def = split.def;564let rep = split.rep;565566let max_def_level = page.descriptor.max_def_level;567let max_rep_level = page.descriptor.max_rep_level;568569let def_iter = HybridRleDecoder::new(def, get_bit_width(max_def_level), page.num_values());570let rep_iter = HybridRleDecoder::new(rep, get_bit_width(max_rep_level), page.num_values());571572Ok((def_iter, rep_iter))573}574575impl<D: utils::Decoder> PageDecoder<D> {576pub fn collect_nested(577mut self,578filter: Option<Filter>,579) -> ParquetResult<(NestedState, D::Output, Bitmap)> {580let init = self.init_nested.as_mut().unwrap();581582// @TODO: We should probably count the filter so that we don't overallocate583let mut target = self.decoder.with_capacity(self.iter.total_num_values());584// @TODO: Self capacity585let mut nested_state = init_nested(init, 0);586587if let Some(dict) = self.dict.as_ref() {588self.decoder.apply_dictionary(&mut target, dict)?;589}590591// Amortize the allocations.592let (def_levels, rep_levels) = nested_state.levels();593594let mut current_def_levels = Vec::<u16>::new();595let mut current_rep_levels = Vec::<u16>::new();596597let (mut decode_state, top_level_filter) = match filter {598None => (599DecodingState {600num_skips: None,601num_collects: Some(usize::MAX),602},603Bitmap::new(),604),605Some(Filter::Range(range)) => (606DecodingState {607num_skips: Some(range.start),608num_collects: Some(range.len()),609},610Bitmap::new(),611),612Some(Filter::Mask(mask)) => (613DecodingState {614num_skips: None,615num_collects: None,616},617mask,618),619Some(Filter::Predicate(_)) => todo!(),620};621622let mut top_level_filter = top_level_filter.iter();623624loop {625let Some(page) = self.iter.next() else {626break;627};628let page = page?;629let page = page.decompress(&mut self.iter)?;630631let (mut def_iter, mut rep_iter) = level_iters(&page)?;632633let num_levels = def_iter.len().min(rep_iter.len());634def_iter.limit_to(num_levels);635rep_iter.limit_to(num_levels);636637current_def_levels.clear();638current_rep_levels.clear();639640collect_level_values(&mut current_def_levels, def_iter)?;641collect_level_values(&mut current_rep_levels, rep_iter)?;642643let mut leaf_filter = MutableBitmap::new();644let mut leaf_validity = MutableBitmap::new();645646// @TODO: move this to outside the loop.647let mut batched_collector = BatchedNestedDecoder {648num_waiting_valids: 0,649num_waiting_invalids: 0,650651filter: &mut leaf_filter,652validity: &mut leaf_validity,653};654655decode_nested(656¤t_def_levels,657¤t_rep_levels,658&mut batched_collector,659&mut nested_state.nested,660&mut decode_state,661&mut top_level_filter,662&def_levels,663&rep_levels,664)?;665666batched_collector.finalize()?;667668let leaf_validity = leaf_validity.freeze();669let leaf_filter = leaf_filter.freeze();670671let state = utils::State::new_nested(672&self.decoder,673&page,674self.dict.as_ref(),675Some(leaf_validity),676)?;677state.decode(678&mut self.decoder,679&mut target,680&mut BitmapBuilder::new(), // This will not get used or filled681Some(Filter::Mask(leaf_filter)),682)?;683684self.iter.reuse_page_buffer(page);685}686687// we pop the primitive off here.688debug_assert!(matches!(689nested_state.nested.last().unwrap().content,690NestedContent::Primitive691));692_ = nested_state.pop().unwrap();693694let array = self.decoder.finalize(self.dtype, self.dict, target)?;695696Ok((nested_state, array, Bitmap::new()))697}698}699700701