Path: blob/main/crates/polars-parquet/src/arrow/write/nested/dremel/mod.rs
6940 views
//! Implements the Dremel encoding part of Parquet with *repetition-levels* and *definition-levels*12use arrow::bitmap::Bitmap;3use arrow::offset::OffsetsBuffer;4use polars_utils::fixedringbuffer::FixedRingBuffer;56use super::super::pages::Nested;78#[cfg(test)]9mod tests;1011/// A Dremel encoding value12#[derive(Clone, Copy)]13pub struct DremelValue {14/// A *repetition-level* value15pub rep: u16,16/// A *definition-level* value17pub def: u16,18}1920/// This tries to mirror the Parquet Schema structures, so that is simple to reason about the21/// Dremel structures.22enum LevelContent<'a> {23/// Always 1 instance24Required,25/// Zero or more instances26Repeated,27/// Zero or one instance28Optional(Option<&'a Bitmap>),29}3031struct Level<'a> {32content: LevelContent<'a>,33/// "Iterator" with number of elements for the next level34lengths: LevelLength<'a>,35/// Remaining number of elements to process. NOTE: This is **not** equal to `length - offset`.36remaining: usize,37/// Offset into level elements38offset: usize,39/// The definition-level associated with this level40definition_depth: u16,41/// The repetition-level associated with this level42repetition_depth: u16,43}4445/// This contains the number of elements on the next level for each46enum LevelLength<'a> {47/// Fixed number of elements based on the validity of this element48Optional(usize),49/// Fixed number of elements irregardless of the validity of this element50Constant(usize),51/// Variable number of elements and calculated from the difference between two `i32` offsets52OffsetsI32(&'a OffsetsBuffer<i32>),53/// Variable number of elements and calculated from the difference between two `i64` offsets54OffsetsI64(&'a OffsetsBuffer<i64>),55}5657/// A iterator for Dremel *repetition* and *definition-levels* in Parquet58///59/// This buffers many consequentive repetition and definition-levels as to not have to branch in60/// and out of this code constantly.61pub struct BufferedDremelIter<'a> {62buffer: FixedRingBuffer<DremelValue>,6364levels: Box<[Level<'a>]>,65/// Current offset into `levels` that is being explored66current_level: usize,6768last_repetition: u16,69}7071/// return number values of the nested72pub fn num_values(nested: &[Nested]) -> usize {73// @TODO: Make this smarter74//75// This is not that smart because it is really slow, but not doing this would be:76// 1. Error prone77// 2. Repeat much of the logic that you find below78BufferedDremelIter::new(nested).count()79}8081impl Level<'_> {82/// Fetch the number of elements given on the next level at `offset` on this level83fn next_level_length(&self, offset: usize, is_valid: bool) -> usize {84match self.lengths {85LevelLength::Optional(n) if is_valid => n,86LevelLength::Optional(_) => 0,87LevelLength::Constant(n) => n,88LevelLength::OffsetsI32(n) => n.length_at(offset),89LevelLength::OffsetsI64(n) => n.length_at(offset),90}91}92}9394impl<'a> BufferedDremelIter<'a> {95// @NOTE: This can maybe just directly be gotten from the Field and array, this double96// conversion seems rather wasteful.97/// Create a new [`BufferedDremelIter`] from a set of nested structures98///99/// This creates a structure that resembles (but is not exactly the same) the Parquet schema,100/// we can then iterate this quite well.101pub fn new(nested: &'a [Nested]) -> Self {102let mut levels = Vec::with_capacity(nested.len() * 2 - 1);103104let mut definition_depth = 0u16;105let mut repetition_depth = 0u16;106for n in nested {107match n {108Nested::Primitive(n) => {109let (content, lengths) = if n.is_optional {110definition_depth += 1;111(112LevelContent::Optional(n.validity.as_ref()),113LevelLength::Optional(1),114)115} else {116(LevelContent::Required, LevelLength::Constant(1))117};118119levels.push(Level {120content,121lengths,122remaining: n.length,123offset: 0,124definition_depth,125repetition_depth,126});127},128Nested::List(n) => {129if n.is_optional {130definition_depth += 1;131levels.push(Level {132content: LevelContent::Optional(n.validity.as_ref()),133lengths: LevelLength::Constant(1),134remaining: n.offsets.len_proxy(),135offset: 0,136definition_depth,137repetition_depth,138});139}140141definition_depth += 1;142levels.push(Level {143content: LevelContent::Repeated,144lengths: LevelLength::OffsetsI32(&n.offsets),145remaining: n.offsets.len_proxy(),146offset: 0,147definition_depth,148repetition_depth,149});150repetition_depth += 1;151},152Nested::LargeList(n) => {153if n.is_optional {154definition_depth += 1;155levels.push(Level {156content: LevelContent::Optional(n.validity.as_ref()),157lengths: LevelLength::Constant(1),158remaining: n.offsets.len_proxy(),159offset: 0,160definition_depth,161repetition_depth,162});163}164165definition_depth += 1;166levels.push(Level {167content: LevelContent::Repeated,168lengths: LevelLength::OffsetsI64(&n.offsets),169remaining: n.offsets.len_proxy(),170offset: 0,171definition_depth,172repetition_depth,173});174repetition_depth += 1;175},176Nested::FixedSizeList(n) => {177if n.is_optional {178definition_depth += 1;179levels.push(Level {180content: LevelContent::Optional(n.validity.as_ref()),181lengths: LevelLength::Constant(1),182remaining: n.length,183offset: 0,184definition_depth,185repetition_depth,186});187}188189definition_depth += 1;190levels.push(Level {191content: LevelContent::Repeated,192lengths: LevelLength::Constant(n.width),193remaining: n.length,194offset: 0,195definition_depth,196repetition_depth,197});198repetition_depth += 1;199},200Nested::Struct(n) => {201let content = if n.is_optional {202definition_depth += 1;203LevelContent::Optional(n.validity.as_ref())204} else {205LevelContent::Required206};207208levels.push(Level {209content,210lengths: LevelLength::Constant(1),211remaining: n.length,212offset: 0,213definition_depth,214repetition_depth,215});216},217};218}219220let levels = levels.into_boxed_slice();221222Self {223// This size is rather arbitrary, but it seems good to make it not too, too high as to224// reduce memory consumption.225buffer: FixedRingBuffer::new(256),226227levels,228current_level: 0,229last_repetition: 0,230}231}232233/// Attempt to fill the rest to the buffer with as many values as possible234fn fill(&mut self) {235// First exit condition:236// If the buffer is full stop trying to fetch more values and just pop the first237// element in the buffer.238//239// Second exit condition:240// We have exhausted all elements at the final level, there are no elements left.241while !(self.buffer.is_full() || (self.current_level == 0 && self.levels[0].remaining == 0))242{243if self.levels[self.current_level].remaining == 0 {244self.last_repetition = u16::min(245self.last_repetition,246self.levels[self.current_level - 1].repetition_depth,247);248self.current_level -= 1;249continue;250}251252let ns = &mut self.levels;253let lvl = self.current_level;254255let is_last_nesting = ns.len() == self.current_level + 1;256257macro_rules! push_value {258($def:expr) => {259self.buffer260.push(DremelValue {261rep: self.last_repetition,262def: $def,263})264.unwrap();265self.last_repetition = ns[lvl].repetition_depth;266};267}268269let num_done = match (&ns[lvl].content, is_last_nesting) {270(LevelContent::Required | LevelContent::Optional(None), true) => {271push_value!(ns[lvl].definition_depth);2722731 + self.buffer.fill_repeat(274DremelValue {275rep: self.last_repetition,276def: ns[lvl].definition_depth,277},278ns[lvl].remaining - 1,279)280},281(LevelContent::Required, false) => {282self.current_level += 1;283ns[lvl + 1].remaining = ns[lvl].next_level_length(ns[lvl].offset, true);2841285},286287(LevelContent::Optional(Some(validity)), true) => {288let num_possible =289usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);290291let validity = (*validity).clone().sliced(ns[lvl].offset, num_possible);292293// @NOTE: maybe, we can do something here with leading zeros294for is_valid in validity.iter() {295push_value!(ns[lvl].definition_depth - u16::from(!is_valid));296}297298num_possible299},300(LevelContent::Optional(None), false) => {301let num_possible =302usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);303let mut num_done = num_possible;304let def = ns[lvl].definition_depth;305306// @NOTE: maybe, we can do something here with leading zeros307for i in 0..num_possible {308let next_level_length = ns[lvl].next_level_length(ns[lvl].offset + i, true);309310if next_level_length == 0 {311// Zero-sized (fixed) lists312push_value!(def);313} else {314self.current_level += 1;315ns[lvl + 1].remaining = next_level_length;316num_done = i + 1;317break;318}319}320321num_done322},323(LevelContent::Optional(Some(validity)), false) => {324let mut num_done = 0;325let num_possible =326usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);327let def = ns[lvl].definition_depth;328329let validity = (*validity).clone().sliced(ns[lvl].offset, num_possible);330331// @NOTE: we can do something here with trailing ones and trailing zeros332for is_valid in validity.iter() {333num_done += 1;334let next_level_length =335ns[lvl].next_level_length(ns[lvl].offset + num_done - 1, is_valid);336337match (is_valid, next_level_length) {338(true, 0) => {339// Zero-sized (fixed) lists340push_value!(def);341},342(true, _) => {343self.current_level += 1;344ns[lvl + 1].remaining = next_level_length;345break;346},347(false, 0) => {348push_value!(def - 1);349},350(false, _) => {351ns[lvl + 1].remaining = next_level_length;352353// @NOTE:354// This is needed for structs and fixed-size lists. These will have355// a non-zero length even if they are invalid. In that case, we356// need to skip over all the elements that would have been read if357// it was valid.358let mut embed_lvl = lvl + 1;359'embed: while embed_lvl > lvl {360if embed_lvl == ns.len() - 1 {361ns[embed_lvl].offset += ns[embed_lvl].remaining;362} else {363while ns[embed_lvl].remaining > 0 {364let length = ns[embed_lvl]365.next_level_length(ns[embed_lvl].offset, false);366367ns[embed_lvl].offset += 1;368ns[embed_lvl].remaining -= 1;369370if length > 0 {371ns[embed_lvl + 1].remaining = length;372embed_lvl += 1;373continue 'embed;374}375}376}377378embed_lvl -= 1;379}380381push_value!(def - 1);382},383}384}385386num_done387},388(LevelContent::Repeated, _) => {389debug_assert!(!is_last_nesting);390let length = ns[lvl].next_level_length(ns[lvl].offset, true);391392if length == 0 {393push_value!(ns[lvl].definition_depth - 1);394} else {395self.current_level += 1;396ns[lvl + 1].remaining = length;397}3983991400},401};402403ns[lvl].offset += num_done;404ns[lvl].remaining -= num_done;405}406}407}408409impl Iterator for BufferedDremelIter<'_> {410type Item = DremelValue;411412fn next(&mut self) -> Option<Self::Item> {413// Use an item from the buffer if it is available414if let Some(item) = self.buffer.pop_front() {415return Some(item);416}417418self.fill();419self.buffer.pop_front()420}421}422423424