Path: blob/main/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs
7887 views
use super::super::delta_bitpacked;1use crate::parquet::encoding::delta_bitpacked::SumGatherer;2use crate::parquet::error::ParquetResult;34/// Decodes according to [Delta strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7),5/// prefixes, lengths and values6/// # Implementation7/// This struct does not allocate on the heap.8#[derive(Debug)]9pub struct Decoder<'a> {10pub(crate) prefix_lengths: delta_bitpacked::Decoder<'a>,11pub(crate) suffix_lengths: delta_bitpacked::Decoder<'a>,12pub(crate) values: &'a [u8],1314pub(crate) offset: usize,15pub(crate) last: Vec<u8>,16}1718impl<'a> Decoder<'a> {19pub fn try_new(values: &'a [u8]) -> ParquetResult<Self> {20let (prefix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?;21let (suffix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?;2223Ok(Self {24prefix_lengths,25suffix_lengths,26values,2728offset: 0,29last: Vec::with_capacity(32),30})31}3233pub fn values(&self) -> &'a [u8] {34self.values35}3637pub fn len(&self) -> usize {38debug_assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());39self.prefix_lengths.len()40}4142pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {43let mut prefix_sum = 0usize;44self.prefix_lengths45.gather_n_into(&mut prefix_sum, n, &mut SumGatherer(0))?;46let mut suffix_sum = 0usize;47self.suffix_lengths48.gather_n_into(&mut suffix_sum, n, &mut SumGatherer(0))?;49self.offset += prefix_sum + suffix_sum;50Ok(())51}52}5354impl Iterator for Decoder<'_> {55type Item = ParquetResult<Vec<u8>>;5657fn next(&mut self) -> Option<Self::Item> {58if self.len() == 0 {59return None;60}6162let mut prefix_length = vec![];63let mut suffix_length = vec![];64if let Err(e) = self.prefix_lengths.collect_n(&mut prefix_length, 1) {65return Some(Err(e));66}67if let Err(e) = self.suffix_lengths.collect_n(&mut suffix_length, 1) {68return Some(Err(e));69}70let prefix_length = prefix_length[0];71let suffix_length = suffix_length[0];7273let prefix_length = prefix_length as usize;74let suffix_length = suffix_length as usize;7576let mut value = Vec::with_capacity(prefix_length + suffix_length);7778value.extend_from_slice(&self.last[..prefix_length]);79value.extend_from_slice(&self.values[self.offset..self.offset + suffix_length]);8081self.last.clear();82self.last.extend_from_slice(&value);8384self.offset += suffix_length;8586Some(Ok(value))87}8889fn size_hint(&self) -> (usize, Option<usize>) {90(self.prefix_lengths.len(), Some(self.prefix_lengths.len()))91}92}9394impl ExactSizeIterator for Decoder<'_> {}9596#[cfg(test)]97mod tests {98use super::*;99100#[test]101fn test_bla() -> ParquetResult<()> {102// VALIDATED from spark==3.1.1103let data = &[104128, 1, 4, 2, 0, 0, 0, 0, 0, 0, 128, 1, 4, 2, 10, 0, 0, 0, 0, 0, 72, 101, 108, 108,105111, 87, 111, 114, 108, 100,106// extra bytes are not from spark, but they should be ignored by the decoder107// because they are beyond the sum of all lengths.1081, 2, 3,109];110111let decoder = Decoder::try_new(data)?;112let values = decoder.collect::<Result<Vec<_>, _>>()?;113assert_eq!(values, vec![b"Hello".to_vec(), b"World".to_vec()]);114115Ok(())116}117118#[test]119fn test_with_prefix() -> ParquetResult<()> {120// VALIDATED from spark==3.1.1121let data = &[122128, 1, 4, 2, 0, 6, 0, 0, 0, 0, 128, 1, 4, 2, 10, 4, 0, 0, 0, 0, 72, 101, 108, 108,123111, 105, 99, 111, 112, 116, 101, 114,124// extra bytes are not from spark, but they should be ignored by the decoder125// because they are beyond the sum of all lengths.1261, 2, 3,127];128129let decoder = Decoder::try_new(data)?;130let prefixes = decoder.collect::<Result<Vec<_>, _>>()?;131assert_eq!(prefixes, vec![b"Hello".to_vec(), b"Helicopter".to_vec()]);132133Ok(())134}135}136137138