// Path: crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
use arrow::bitmap::utils::BitmapIter;1use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};23use super::utils::PageDecoder;4use super::{Filter, utils};5use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};6use crate::parquet::error::ParquetResult;7use crate::parquet::page::{DataPage, split_buffer};8use crate::parquet::read::levels::get_bit_width;9use crate::read::deserialize::utils::Decoded;1011pub struct Nested {12validity: Option<BitmapBuilder>,13length: usize,14content: NestedContent,1516// We batch the collection of valids and invalids to amortize the costs. This only really works17// when valids and invalids are grouped or there is a disbalance in the amount of valids vs.18// invalids. This, however, is a very common situation.19num_valids: usize,20num_invalids: usize,21}2223#[derive(Debug)]24pub enum NestedContent {25Primitive,26List { offsets: Vec<i64> },27FixedSizeList { width: usize },28Struct,29}3031impl Nested {32fn primitive(is_nullable: bool) -> Self {33// @NOTE: We allocate with `0` capacity here since we will not be pushing to this bitmap.34// This is because primitive does not keep track of the validity here. It keeps track in35// the decoder. 
We do still want to put something so that we can check for nullability by36// looking at the option.37let validity = is_nullable.then(|| BitmapBuilder::with_capacity(0));3839Self {40validity,41length: 0,42content: NestedContent::Primitive,4344num_valids: 0,45num_invalids: 0,46}47}4849fn list_with_capacity(is_nullable: bool, capacity: usize) -> Self {50let offsets = Vec::with_capacity(capacity);51let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));52Self {53validity,54length: 0,55content: NestedContent::List { offsets },5657num_valids: 0,58num_invalids: 0,59}60}6162fn fixedlist_with_capacity(is_nullable: bool, width: usize, capacity: usize) -> Self {63let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));64Self {65validity,66length: 0,67content: NestedContent::FixedSizeList { width },6869num_valids: 0,70num_invalids: 0,71}72}7374fn struct_with_capacity(is_nullable: bool, capacity: usize) -> Self {75let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));76Self {77validity,78length: 0,79content: NestedContent::Struct,8081num_valids: 0,82num_invalids: 0,83}84}8586fn take(mut self) -> (usize, Vec<i64>, Option<BitmapBuilder>) {87if !matches!(self.content, NestedContent::Primitive) {88if let Some(validity) = self.validity.as_mut() {89validity.extend_constant(self.num_valids, true);90validity.extend_constant(self.num_invalids, false);91}9293debug_assert!(94self.validity95.as_ref()96.is_none_or(|v| v.len() == self.length)97);98}99100self.num_valids = 0;101self.num_invalids = 0;102103match self.content {104NestedContent::Primitive => {105debug_assert!(self.validity.is_none_or(|validity| validity.is_empty()));106(self.length, Vec::new(), None)107},108NestedContent::List { offsets } => (self.length, offsets, self.validity),109NestedContent::FixedSizeList { .. 
} => (self.length, Vec::new(), self.validity),110NestedContent::Struct => (self.length, Vec::new(), self.validity),111}112}113114fn is_nullable(&self) -> bool {115self.validity.is_some()116}117118fn is_repeated(&self) -> bool {119match self.content {120NestedContent::Primitive => false,121NestedContent::List { .. } => true,122NestedContent::FixedSizeList { .. } => true,123NestedContent::Struct => false,124}125}126127fn is_required(&self) -> bool {128match self.content {129NestedContent::Primitive => false,130NestedContent::List { .. } => false,131NestedContent::FixedSizeList { .. } => false,132NestedContent::Struct => true,133}134}135136/// number of rows137fn len(&self) -> usize {138self.length139}140141fn invalid_num_values(&self) -> usize {142match &self.content {143NestedContent::Primitive => 1,144NestedContent::List { .. } => 0,145NestedContent::FixedSizeList { width } => *width,146NestedContent::Struct => 1,147}148}149150fn push(&mut self, value: i64, is_valid: bool) {151let is_primitive = matches!(self.content, NestedContent::Primitive);152153if is_valid && self.num_invalids != 0 {154debug_assert!(!is_primitive);155156// @NOTE: Having invalid items might not necessarily mean that we have a validity mask.157//158// For instance, if we have a optional struct with a required list in it, that struct159// will have a validity mask and the list will not. 
In the arrow representation of this160// array, however, the list will still have invalid items where the struct is null.161//162// Array:163// [164// { 'x': [1] },165// None,166// { 'x': [1, 2] },167// ]168//169// Arrow:170// struct = [ list[0] None list[2] ]171// list = {172// values = [ 1, 1, 2 ],173// offsets = [ 0, 1, 1, 3 ],174// }175//176// Parquet:177// [ 1, 1, 2 ] + definition + repetition levels178//179// As you can see we need to insert an invalid item into the list even though it does180// not have a validity mask.181if let Some(validity) = self.validity.as_mut() {182validity.extend_constant(self.num_valids, true);183validity.extend_constant(self.num_invalids, false);184}185186self.num_valids = 0;187self.num_invalids = 0;188}189190self.num_valids += usize::from(!is_primitive & is_valid);191self.num_invalids += usize::from(!is_primitive & !is_valid);192193self.length += 1;194if let NestedContent::List { offsets } = &mut self.content {195offsets.push(value);196}197}198199fn push_default(&mut self, length: i64) {200let is_primitive = matches!(self.content, NestedContent::Primitive);201self.num_invalids += usize::from(!is_primitive);202203self.length += 1;204if let NestedContent::List { offsets } = &mut self.content {205offsets.push(length);206}207}208}209210/// Utility structure to create a `Filter` and `Validity` mask for the leaf values.211///212/// This batches the extending.213pub struct BatchedNestedDecoder<'a> {214pub(crate) num_waiting_valids: usize,215pub(crate) num_waiting_invalids: usize,216217filter: &'a mut MutableBitmap,218validity: &'a mut MutableBitmap,219}220221impl BatchedNestedDecoder<'_> {222fn push_valid(&mut self) -> ParquetResult<()> {223self.push_n_valids(1)224}225226fn push_invalid(&mut self) -> ParquetResult<()> {227self.push_n_invalids(1)228}229230fn push_n_valids(&mut self, n: usize) -> ParquetResult<()> {231if self.num_waiting_invalids == 0 {232self.num_waiting_valids += n;233return 
Ok(());234}235236self.filter.extend_constant(self.num_waiting_valids, true);237self.validity.extend_constant(self.num_waiting_valids, true);238239self.filter.extend_constant(self.num_waiting_invalids, true);240self.validity241.extend_constant(self.num_waiting_invalids, false);242243self.num_waiting_valids = n;244self.num_waiting_invalids = 0;245246Ok(())247}248249fn push_n_invalids(&mut self, n: usize) -> ParquetResult<()> {250self.num_waiting_invalids += n;251Ok(())252}253254fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {255if self.num_waiting_valids > 0 {256self.filter.extend_constant(self.num_waiting_valids, true);257self.validity.extend_constant(self.num_waiting_valids, true);258self.num_waiting_valids = 0;259}260if self.num_waiting_invalids > 0 {261self.filter.extend_constant(self.num_waiting_invalids, true);262self.validity263.extend_constant(self.num_waiting_invalids, false);264self.num_waiting_invalids = 0;265}266267self.filter.extend_constant(n, false);268self.validity.extend_constant(n, true);269270Ok(())271}272273fn finalize(self) -> ParquetResult<()> {274self.filter.extend_constant(self.num_waiting_valids, true);275self.validity.extend_constant(self.num_waiting_valids, true);276277self.filter.extend_constant(self.num_waiting_invalids, true);278self.validity279.extend_constant(self.num_waiting_invalids, false);280281Ok(())282}283}284285/// The initial info of nested data types.286/// The `bool` indicates if the type is nullable.287#[derive(Debug, Clone, Copy, PartialEq, Eq)]288pub enum InitNested {289/// Primitive data types290Primitive(bool),291/// List data types292List(bool),293/// Fixed-Size List data types294FixedSizeList(bool, usize),295/// Struct data types296Struct(bool),297}298299/// Initialize [`NestedState`] from `&[InitNested]`.300pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState {301use {InitNested as IN, Nested as N};302303let container = init304.iter()305.map(|init| match init 
{306IN::Primitive(is_nullable) => N::primitive(*is_nullable),307IN::List(is_nullable) => N::list_with_capacity(*is_nullable, capacity),308IN::FixedSizeList(is_nullable, width) => {309N::fixedlist_with_capacity(*is_nullable, *width, capacity)310},311IN::Struct(is_nullable) => N::struct_with_capacity(*is_nullable, capacity),312})313.collect();314315NestedState::new(container)316}317318/// The state of nested data types.319#[derive(Default)]320pub struct NestedState {321/// The nesteds composing `NestedState`.322nested: Vec<Nested>,323}324325impl NestedState {326/// Creates a new [`NestedState`].327fn new(nested: Vec<Nested>) -> Self {328Self { nested }329}330331pub fn pop(&mut self) -> Option<(usize, Vec<i64>, Option<BitmapBuilder>)> {332Some(self.nested.pop()?.take())333}334335pub fn last(&self) -> Option<&NestedContent> {336self.nested.last().map(|v| &v.content)337}338339/// The number of rows in this state340pub fn len(&self) -> usize {341// outermost is the number of rows342self.nested[0].len()343}344345/// Returns the definition and repetition levels for each nesting level346fn levels(&self) -> (Vec<u16>, Vec<u16>) {347let depth = self.nested.len();348349let mut def_levels = Vec::with_capacity(depth + 1);350let mut rep_levels = Vec::with_capacity(depth + 1);351352def_levels.push(0);353rep_levels.push(0);354355for i in 0..depth {356let nest = &self.nested[i];357358let def_delta = nest.is_nullable() as u16 + nest.is_repeated() as u16;359let rep_delta = nest.is_repeated() as u16;360361def_levels.push(def_levels[i] + def_delta);362rep_levels.push(rep_levels[i] + rep_delta);363}364365(def_levels, rep_levels)366}367}368369fn collect_level_values(370target: &mut Vec<u16>,371hybrid_rle: HybridRleDecoder<'_>,372) -> ParquetResult<()> {373target.reserve(hybrid_rle.len());374375for chunk in hybrid_rle.into_chunk_iter() {376let chunk = chunk?;377378match chunk {379HybridRleChunk::Rle(value, size) => {380target.resize(target.len() + size, value as 
u16);381},382HybridRleChunk::Bitpacked(decoder) => {383decoder.lower_element::<u16>()?.collect_into(target);384},385}386}387388Ok(())389}390391/// State to keep track of how many top-level values (i.e. rows) still need to be skipped and392/// collected.393///394/// This state should be kept between pages because a top-level value / row value may span several395/// pages.396///397/// - `num_skips = Some(n)` means that it will skip till the `n + 1`-th occurrence of the repetition398/// level of `0` (i.e. the start of a top-level value / row value).399/// - `num_collects = Some(n)` means that it will collect values till the `n + 1`-th occurrence of400/// the repetition level of `0` (i.e. the start of a top-level value / row value).401struct DecodingState {402num_skips: Option<usize>,403num_collects: Option<usize>,404}405406#[allow(clippy::too_many_arguments)]407fn decode_nested(408mut current_def_levels: &[u16],409mut current_rep_levels: &[u16],410411batched_collector: &mut BatchedNestedDecoder<'_>,412nested: &mut [Nested],413414state: &mut DecodingState,415top_level_filter: &mut BitmapIter<'_>,416417// Amortized allocations418def_levels: &[u16],419rep_levels: &[u16],420) -> ParquetResult<()> {421let max_depth = nested.len();422let leaf_def_level = *def_levels.last().unwrap();423424while !current_def_levels.is_empty() {425debug_assert_eq!(current_def_levels.len(), current_rep_levels.len());426427// Handle skips428if let Some(ref mut num_skips) = state.num_skips {429let mut i = 0;430let mut num_skipped_values = 0;431while i < current_def_levels.len() && (*num_skips > 0 || current_rep_levels[i] != 0) {432let def = current_def_levels[i];433let rep = current_rep_levels[i];434435*num_skips -= usize::from(rep == 0);436i += 1;437438// @NOTE:439// We don't need to account for higher def-levels that imply extra values, since we440// don't have those higher levels either.441num_skipped_values += usize::from(def == 
leaf_def_level);442}443batched_collector.skip_in_place(num_skipped_values)?;444445current_def_levels = ¤t_def_levels[i..];446current_rep_levels = ¤t_rep_levels[i..];447448if current_def_levels.is_empty() {449break;450} else {451state.num_skips = None;452}453}454455// Handle collects456if let Some(ref mut num_collects) = state.num_collects {457let mut i = 0;458while i < current_def_levels.len() && (*num_collects > 0 || current_rep_levels[i] != 0)459{460let def = current_def_levels[i];461let rep = current_rep_levels[i];462463*num_collects -= usize::from(rep == 0);464i += 1;465466let mut is_required = false;467468for depth in 0..max_depth {469// Defines whether this element is defined at `depth`470//471// e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and472// [ [ [ 1 ] ] ].473let is_defined_at_this_depth =474rep <= rep_levels[depth] && def >= def_levels[depth];475476let length = nested477.get(depth + 1)478.map(|x| x.len() as i64)479// the last depth is the leaf, which is always increased by 1480.unwrap_or(1);481482let nest = &mut nested[depth];483484let is_valid = !nest.is_nullable() || def > def_levels[depth];485486if is_defined_at_this_depth && !is_valid {487let mut num_elements = 1;488489nest.push(length, is_valid);490491for embed_depth in depth..max_depth {492let embed_length = nested493.get(embed_depth + 1)494.map(|x| x.len() as i64)495// the last depth is the leaf, which is always increased by 1496.unwrap_or(1);497498let embed_nest = &mut nested[embed_depth];499500if embed_depth > depth {501for _ in 0..num_elements {502embed_nest.push_default(embed_length);503}504}505506let embed_num_values = embed_nest.invalid_num_values();507num_elements *= embed_num_values;508509if embed_num_values == 0 {510break;511}512}513514batched_collector.push_n_invalids(num_elements)?;515516break;517}518519if is_required || is_defined_at_this_depth {520nest.push(length, is_valid);521522if depth == max_depth - 1 {523// the leaf / primitive524let is_valid = (def 
!= def_levels[depth]) || !nest.is_nullable();525526if is_valid {527batched_collector.push_valid()?;528} else {529batched_collector.push_invalid()?;530}531}532}533534is_required = (is_required || is_defined_at_this_depth)535&& nest.is_required()536&& !is_valid;537}538}539540current_def_levels = ¤t_def_levels[i..];541current_rep_levels = ¤t_rep_levels[i..];542543if current_def_levels.is_empty() {544break;545} else {546state.num_collects = None;547}548}549550if top_level_filter.num_remaining() == 0 {551break;552}553554state.num_skips = Some(top_level_filter.take_leading_zeros()).filter(|v| *v != 0);555state.num_collects = Some(top_level_filter.take_leading_ones()).filter(|v| *v != 0);556}557558Ok(())559}560561/// Return the definition and repetition level iterators for this page.562fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder<'_>, HybridRleDecoder<'_>)> {563let split = split_buffer(page)?;564let def = split.def;565let rep = split.rep;566567let max_def_level = page.descriptor.max_def_level;568let max_rep_level = page.descriptor.max_rep_level;569570let def_iter = HybridRleDecoder::new(def, get_bit_width(max_def_level), page.num_values());571let rep_iter = HybridRleDecoder::new(rep, get_bit_width(max_rep_level), page.num_values());572573Ok((def_iter, rep_iter))574}575576impl<D: utils::Decoder> PageDecoder<D> {577pub fn collect_nested(578mut self,579filter: Option<Filter>,580) -> ParquetResult<(NestedState, Vec<D::Output>, Bitmap)> {581let init = self.init_nested.as_mut().unwrap();582583// @TODO: We should probably count the filter so that we don't overallocate584let mut target = self.decoder.with_capacity(self.iter.total_num_values());585// @TODO: Self capacity586let mut nested_state = init_nested(init, 0);587588if let Some(dict) = self.dict.as_ref() {589self.decoder.apply_dictionary(&mut target, dict)?;590}591592// Amortize the allocations.593let (def_levels, rep_levels) = nested_state.levels();594595let mut current_def_levels = 
Vec::<u16>::new();596let mut current_rep_levels = Vec::<u16>::new();597598let (mut decode_state, top_level_filter) = match filter {599None => (600DecodingState {601num_skips: None,602num_collects: Some(usize::MAX),603},604Bitmap::new(),605),606Some(Filter::Range(range)) => (607DecodingState {608num_skips: Some(range.start),609num_collects: Some(range.len()),610},611Bitmap::new(),612),613Some(Filter::Mask(mask)) => (614DecodingState {615num_skips: None,616num_collects: None,617},618mask,619),620Some(Filter::Predicate(_)) => todo!(),621};622623let mut top_level_filter = top_level_filter.iter();624625let mut chunks = Vec::new();626while let Some(page) = self.iter.next() {627let page = page?;628let page = page.decompress(&mut self.iter)?;629630let (mut def_iter, mut rep_iter) = level_iters(&page)?;631632let num_levels = def_iter.len().min(rep_iter.len());633def_iter.limit_to(num_levels);634rep_iter.limit_to(num_levels);635636current_def_levels.clear();637current_rep_levels.clear();638639collect_level_values(&mut current_def_levels, def_iter)?;640collect_level_values(&mut current_rep_levels, rep_iter)?;641642let mut leaf_filter = MutableBitmap::new();643let mut leaf_validity = MutableBitmap::new();644645// @TODO: move this to outside the loop.646let mut batched_collector = BatchedNestedDecoder {647num_waiting_valids: 0,648num_waiting_invalids: 0,649650filter: &mut leaf_filter,651validity: &mut leaf_validity,652};653654decode_nested(655¤t_def_levels,656¤t_rep_levels,657&mut batched_collector,658&mut nested_state.nested,659&mut decode_state,660&mut top_level_filter,661&def_levels,662&rep_levels,663)?;664665batched_collector.finalize()?;666667let leaf_validity = leaf_validity.freeze();668let leaf_filter = leaf_filter.freeze();669670let state = utils::State::new_nested(671&self.decoder,672&page,673self.dict.as_ref(),674Some(leaf_validity),675)?;676state.decode(677&mut self.decoder,678&mut target,679Some(Filter::Mask(leaf_filter)),680&mut 
chunks,681)?;682683self.iter.reuse_page_buffer(page);684}685686// we pop the primitive off here.687debug_assert!(matches!(688nested_state.nested.last().unwrap().content,689NestedContent::Primitive690));691_ = nested_state.pop().unwrap();692693if target.len() > 0 || chunks.is_empty() {694chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);695}696697Ok((nested_state, chunks, Bitmap::new()))698}699}700701702