Path: blob/main/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs
7887 views
use polars_utils::chunks::Chunks;12use super::{Packed, Unpackable, Unpacked};3use crate::parquet::error::{ParquetError, ParquetResult};45/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.6/// # Implementation7/// This iterator unpacks bytes in chunks and does not allocate.8#[derive(Debug, Clone)]9pub struct Decoder<'a, T: Unpackable> {10packed: Chunks<'a, u8>,11num_bits: usize,12/// number of items13pub(crate) length: usize,14_pd: std::marker::PhantomData<T>,15}1617impl<T: Unpackable> Default for Decoder<'_, T> {18fn default() -> Self {19Self {20packed: Chunks::new(&[], 1),21num_bits: 0,22length: 0,23_pd: std::marker::PhantomData,24}25}26}2728#[inline]29fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {30if packed.len() < T::Unpacked::LENGTH * num_bits / 8 {31let mut buf = T::Packed::zero();32buf.as_mut()[..packed.len()].copy_from_slice(packed);33T::unpack(buf.as_ref(), num_bits, unpacked)34} else {35T::unpack(packed, num_bits, unpacked)36}37}3839impl<'a, T: Unpackable> Decoder<'a, T> {40/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.41pub fn new(packed: &'a [u8], num_bits: usize, length: usize) -> Self {42Self::try_new(packed, num_bits, length).unwrap()43}4445/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.46///47/// `num_bits` is allowed to be `0`.48pub fn new_allow_zero(packed: &'a [u8], num_bits: usize, length: usize) -> Self {49Self::try_new_allow_zero(packed, num_bits, length).unwrap()50}5152/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.53///54/// `num_bits` is allowed to be `0`.55pub fn try_new_allow_zero(56packed: &'a [u8],57num_bits: usize,58length: usize,59) -> ParquetResult<Self> {60let block_size = size_of::<T>() * num_bits;6162if packed.len() * 8 < length * num_bits {63return Err(ParquetError::oos(format!(64"Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.",65length * num_bits / 866)));67}6869debug_assert!(num_bits != 0 || packed.is_empty());70let block_size = block_size.max(1);71let packed = Chunks::new(packed, block_size);7273Ok(Self {74length,75packed,76num_bits,77_pd: Default::default(),78})79}8081/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.82pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> ParquetResult<Self> {83let block_size = size_of::<T>() * num_bits;8485if num_bits == 0 {86return Err(ParquetError::oos("Bitpacking requires num_bits > 0"));87}8889if packed.len() * 8 < length * num_bits {90return Err(ParquetError::oos(format!(91"Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.",92length * num_bits / 893)));94}9596let packed = Chunks::new(packed, block_size);9798Ok(Self {99length,100packed,101num_bits,102_pd: Default::default(),103})104}105106pub fn num_bits(&self) -> usize {107self.num_bits108}109110pub fn as_slice(&self) -> &[u8] {111self.packed.as_slice()112}113114pub fn lower_element<N: Unpackable>(self) -> ParquetResult<Decoder<'a, u16>> {115let packed = self.packed.as_slice();116Decoder::try_new(packed, self.num_bits, self.length)117}118}119120/// A iterator over the exact chunks in a [`Decoder`].121///122/// The remainder can be accessed using `remainder` or `next_inexact`.123#[derive(Debug)]124pub struct ChunkedDecoder<'a, 'b, T: Unpackable> {125pub(crate) decoder: &'b mut Decoder<'a, T>,126}127128impl<T: Unpackable> Iterator for ChunkedDecoder<'_, '_, T> {129type Item = T::Unpacked;130131#[inline]132fn next(&mut self) -> Option<Self::Item> {133if self.decoder.len() < T::Unpacked::LENGTH {134return None;135}136137let mut unpacked = T::Unpacked::zero();138self.next_into(&mut unpacked)?;139Some(unpacked)140}141142fn size_hint(&self) -> (usize, Option<usize>) {143let len = self.decoder.len() / T::Unpacked::LENGTH;144(len, Some(len))145}146}147148impl<T: Unpackable> ExactSizeIterator for ChunkedDecoder<'_, '_, T> {}149150impl<T: Unpackable> ChunkedDecoder<'_, '_, T> {151/// Get and consume the remainder chunk if it exists.152///153/// This should only be called after all the chunks full are consumed.154pub fn remainder(&mut self) -> Option<(T::Unpacked, usize)> {155if self.decoder.len() == 0 {156return None;157}158159debug_assert!(self.decoder.len() < T::Unpacked::LENGTH);160let remainder_len = self.decoder.len() % T::Unpacked::LENGTH;161162let mut unpacked = T::Unpacked::zero();163let packed = self.decoder.packed.next()?;164decode_pack::<T>(packed, self.decoder.num_bits, &mut unpacked);165self.decoder.length -= remainder_len;166Some((unpacked, remainder_len))167}168169/// Get the next (possibly partial) chunk and its filled length170pub fn next_inexact(&mut self) -> Option<(T::Unpacked, usize)> {171if self.decoder.len() >= T::Unpacked::LENGTH {172Some((self.next().unwrap(), T::Unpacked::LENGTH))173} else {174self.remainder()175}176}177178/// Consume the next chunk into `unpacked`.179pub fn next_into(&mut self, unpacked: &mut T::Unpacked) -> Option<usize> {180if self.decoder.len() == 0 {181return None;182}183184let unpacked_len = self.decoder.len().min(T::Unpacked::LENGTH);185let packed = self.decoder.packed.next()?;186decode_pack::<T>(packed, self.decoder.num_bits, unpacked);187self.decoder.length -= unpacked_len;188189Some(unpacked_len)190}191}192193impl<'a, T: Unpackable> Decoder<'a, T> {194pub fn chunked<'b>(&'b mut self) -> ChunkedDecoder<'a, 'b, T> {195ChunkedDecoder { decoder: self }196}197198pub fn len(&self) -> usize {199self.length200}201202pub fn skip_chunks(&mut self, n: usize) {203debug_assert!(n * T::Unpacked::LENGTH <= self.length);204205for _ in (&mut self.packed).take(n) {}206self.length -= n * T::Unpacked::LENGTH;207}208209pub fn take(&mut self) -> Self {210let block_size = self.packed.chunk_size();211let packed = std::mem::replace(&mut self.packed, Chunks::new(&[], block_size));212let length = self.length;213self.length = 0;214215Self {216packed,217num_bits: self.num_bits,218length,219_pd: Default::default(),220}221}222223#[inline]224pub fn collect_into(mut self, vec: &mut Vec<T>) {225// @NOTE:226// When microbenchmarking changing this from a element-wise iterator to a collect into227// improves the speed by around 4x.228//229// The unsafe code here allows us to not have to do a double memcopy. This saves us 20% in230// our microbenchmark.231//232// GB: I did some profiling on this function using the Yellow NYC Taxi dataset. There, the233// average self.length is ~52.8 and the average num_packs is ~2.2. Let this guide your234// decisions surrounding the optimization of this function.235236// @NOTE:237// Since T::Unpacked::LENGTH is always a power of two and known at compile time. Division,238// modulo and multiplication are just trivial operators.239let num_packs = (self.length / T::Unpacked::LENGTH)240+ usize::from(!self.length.is_multiple_of(T::Unpacked::LENGTH));241242// We reserve enough space here for self.length rounded up to the next multiple of243// T::Unpacked::LENGTH so that we can safely just write into that memory. Otherwise, we244// would have to make a special path where we memcopy twice which is less than ideal.245vec.reserve(num_packs * T::Unpacked::LENGTH);246247// IMPORTANT: This pointer calculation has to appear after the reserve since that reserve248// might move the buffer.249let mut unpacked_ptr = vec.as_mut_ptr().wrapping_add(vec.len());250251for _ in 0..num_packs {252// This unwrap should never fail since the packed length is checked on initialized of253// the `Decoder`.254let packed = self.packed.next().unwrap();255256// SAFETY:257// Since we did a `vec::reserve` before with the total length, we know that the memory258// necessary for a `T::Unpacked` is available.259//260// - The elements in this buffer are properly aligned, so elements in a slice will also261// be properly aligned.262// - It is deferencable because it is (i) not null, (ii) in one allocated object, (iii)263// not pointing to deallocated memory, (iv) we do not rely on atomicity and (v) we do264// not read or write beyond the lifetime of `vec`.265// - All data is initialized before reading it. This is not perfect but should not lead266// to any UB.267// - We don't alias the same data from anywhere else at the same time, because we have268// the mutable reference to `vec`.269let unpacked_ref = unsafe { (unpacked_ptr as *mut T::Unpacked).as_mut() }.unwrap();270271decode_pack::<T>(packed, self.num_bits, unpacked_ref);272273unpacked_ptr = unpacked_ptr.wrapping_add(T::Unpacked::LENGTH);274}275276// SAFETY:277// We have written these elements before so we know that these are available now.278//279// - The capacity is larger since we reserved enough spaced with the opening280// `vec::reserve`.281// - All elements are initialized by the `decode_pack` into the `unpacked_ref`.282unsafe { vec.set_len(vec.len() + self.length) }283}284}285286#[cfg(test)]287mod tests {288use super::super::tests::case1;289use super::*;290291impl<T: Unpackable> Decoder<'_, T> {292pub fn collect(self) -> Vec<T> {293let mut vec = Vec::new();294self.collect_into(&mut vec);295vec296}297}298299#[test]300fn test_decode_rle() {301// Test data: 0-7 with bit width 3302// 0: 000303// 1: 001304// 2: 010305// 3: 011306// 4: 100307// 5: 101308// 6: 110309// 7: 111310let num_bits = 3;311let length = 8;312// encoded: 0b10001000u8, 0b11000110, 0b11111010313let data = vec![0b10001000u8, 0b11000110, 0b11111010];314315let decoded = Decoder::<u32>::try_new(&data, num_bits, length)316.unwrap()317.collect();318assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);319}320321#[test]322fn decode_large() {323let (num_bits, expected, data) = case1();324325let decoded = Decoder::<u32>::try_new(&data, num_bits, expected.len())326.unwrap()327.collect();328assert_eq!(decoded, expected);329}330331#[test]332fn test_decode_bool() {333let num_bits = 1;334let length = 8;335let data = vec![0b10101010];336337let decoded = Decoder::<u32>::try_new(&data, num_bits, length)338.unwrap()339.collect();340assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);341}342343#[test]344fn test_decode_u64() {345let num_bits = 1;346let length = 8;347let data = vec![0b10101010];348349let decoded = Decoder::<u64>::try_new(&data, num_bits, length)350.unwrap()351.collect();352assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);353}354355#[test]356fn even_case() {357// [0, 1, 2, 3, 4, 5, 6, 0]x99358let data = &[0b10001000u8, 0b11000110, 0b00011010];359let num_bits = 3;360let copies = 99; // 8 * 99 % 32 != 0361let expected = std::iter::repeat_n(&[0u32, 1, 2, 3, 4, 5, 6, 0], copies)362.flatten()363.copied()364.collect::<Vec<_>>();365let data = std::iter::repeat_n(data, copies)366.flatten()367.copied()368.collect::<Vec<_>>();369let length = expected.len();370371let decoded = Decoder::<u32>::try_new(&data, num_bits, length)372.unwrap()373.collect();374assert_eq!(decoded, expected);375}376377#[test]378fn odd_case() {379// [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2]380let data = &[0b10001000u8, 0b11000110, 0b00011010];381let num_bits = 3;382let copies = 4;383let expected = std::iter::repeat_n(&[0u32, 1, 2, 3, 4, 5, 6, 0], copies)384.flatten()385.copied()386.chain(std::iter::once(2))387.collect::<Vec<_>>();388let data = std::iter::repeat_n(data, copies)389.flatten()390.copied()391.chain(std::iter::once(0b00000010u8))392.collect::<Vec<_>>();393let length = expected.len();394395let decoded = Decoder::<u32>::try_new(&data, num_bits, length)396.unwrap()397.collect();398assert_eq!(decoded, expected);399}400401#[test]402fn test_errors() {403// zero length404assert!(Decoder::<u64>::try_new(&[], 1, 0).is_ok());405// no bytes406assert!(Decoder::<u64>::try_new(&[], 1, 1).is_err());407// too few bytes408assert!(Decoder::<u64>::try_new(&[1], 1, 8).is_ok());409assert!(Decoder::<u64>::try_new(&[1, 1], 2, 8).is_ok());410assert!(Decoder::<u64>::try_new(&[1], 1, 9).is_err());411// zero num_bits412assert!(Decoder::<u64>::try_new(&[1], 0, 1).is_err());413}414}415416417