Path: blob/main/crates/polars-parquet/src/parquet/encoding/byte_stream_split/decoder.rs
7887 views
use crate::parquet::error::ParquetError;12const MAX_ELEMENT_SIZE: usize = 8;34/// Decodes using the [Byte Stream Split](https://github.com/apache/parquet-format/blob/master/Encodings.md#byte-stream-split-byte_stream_split--9) encoding.5/// # Implementation6/// A fixed size buffer is stored inline to support reading types of up to 8 bytes in size.7#[derive(Debug)]8pub struct Decoder<'a> {9values: &'a [u8],10buffer: [u8; MAX_ELEMENT_SIZE],11num_elements: usize,12position: usize,13element_size: usize,14}1516impl<'a> Decoder<'a> {17pub fn try_new(values: &'a [u8], element_size: usize) -> Result<Self, ParquetError> {18if element_size > MAX_ELEMENT_SIZE {19// Since Parquet format version 2.11 it's valid to use byte stream split for fixed-length byte array data,20// which could be larger than 8 bytes, but Polars doesn't yet support reading byte stream split encoded FLBA data.21return Err(ParquetError::oos(format!(22"Byte stream split decoding only supports up to {MAX_ELEMENT_SIZE} byte element sizes"23)));24}2526let values_size = values.len();27if !values_size.is_multiple_of(element_size) {28return Err(ParquetError::oos(format!(29"Values array length ({values_size}) is not a multiple of the element size ({element_size})"30)));31}32let num_elements = values.len() / element_size;3334Ok(Self {35values,36buffer: [0; MAX_ELEMENT_SIZE],37num_elements,38position: 0,39element_size,40})41}4243pub fn move_next(&mut self) -> bool {44if self.position >= self.num_elements {45return false;46}4748debug_assert!(self.element_size <= MAX_ELEMENT_SIZE);49debug_assert!(self.values.len() >= self.num_elements * self.element_size);50for n in 0..self.element_size {51unsafe {52// SAFETY:53// We have the invariants that element_size <= MAX_ELEMENT_SIZE,54// buffer.len() == MAX_ELEMENT_SIZE,55// position < num_elements and56// values.len() >= num_elements * element_size.57*self.buffer.get_unchecked_mut(n) = *self58.values59.get_unchecked((self.num_elements * n) + self.position)60}61}6263self.position += 1;64true65}6667/// The number of remaining values68pub fn len(&self) -> usize {69self.num_elements - self.position70}7172pub fn current_value(&self) -> &[u8] {73&self.buffer[0..self.element_size]74}7576pub fn iter_converted<'b, T, F>(&'b mut self, converter: F) -> DecoderIterator<'a, 'b, T, F>77where78F: Copy + Fn(&[u8]) -> T,79{80DecoderIterator {81decoder: self,82converter,83}84}85}8687#[derive(Debug)]88pub struct DecoderIterator<'a, 'b, T, F>89where90F: Copy + Fn(&[u8]) -> T,91{92decoder: &'b mut Decoder<'a>,93converter: F,94}9596impl<T, F> Iterator for DecoderIterator<'_, '_, T, F>97where98F: Copy + Fn(&[u8]) -> T,99{100type Item = T;101102#[inline]103fn next(&mut self) -> Option<Self::Item> {104if self.decoder.move_next() {105Some((self.converter)(self.decoder.current_value()))106} else {107None108}109}110111#[inline]112fn size_hint(&self) -> (usize, Option<usize>) {113(self.decoder.len(), Some(self.decoder.len()))114}115}116117118