Path: blob/main/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs
7887 views
use super::super::{bitpacked, uleb128, zigzag_leb128};1use crate::parquet::encoding::ceil8;23/// Encodes an iterator of `i64` according to parquet's `DELTA_BINARY_PACKED`.4/// # Implementation5/// * This function does not allocate on the heap.6/// * The number of mini-blocks is always 1. This may change in the future.7pub fn encode<I: ExactSizeIterator<Item = i64>>(8mut iterator: I,9buffer: &mut Vec<u8>,10num_miniblocks_per_block: usize,11) {12const BLOCK_SIZE: usize = 256;13assert!([1, 2, 4].contains(&num_miniblocks_per_block));14let values_per_miniblock = BLOCK_SIZE / num_miniblocks_per_block;1516let mut container = [0u8; 10];17let encoded_len = uleb128::encode(BLOCK_SIZE as u64, &mut container);18buffer.extend_from_slice(&container[..encoded_len]);1920let encoded_len = uleb128::encode(num_miniblocks_per_block as u64, &mut container);21buffer.extend_from_slice(&container[..encoded_len]);2223let length = iterator.len();24let encoded_len = uleb128::encode(length as u64, &mut container);25buffer.extend_from_slice(&container[..encoded_len]);2627let mut values = [0i64; BLOCK_SIZE];28let mut deltas = [0u64; BLOCK_SIZE];29let mut num_bits = [0u8; 4];3031let first_value = iterator.next().unwrap_or_default();32let (container, encoded_len) = zigzag_leb128::encode(first_value);33buffer.extend_from_slice(&container[..encoded_len]);3435let mut prev = first_value;36let mut length = iterator.len();37while length != 0 {38let mut min_delta = i64::MAX;39let mut max_delta = i64::MIN;40for (i, integer) in iterator.by_ref().enumerate().take(BLOCK_SIZE) {41if i % values_per_miniblock == 0 {42min_delta = i64::MAX;43max_delta = i64::MIN44}4546let delta = integer.wrapping_sub(prev);47min_delta = min_delta.min(delta);48max_delta = max_delta.max(delta);4950let miniblock_idx = i / values_per_miniblock;51num_bits[miniblock_idx] = (64 - max_delta.abs_diff(min_delta).leading_zeros()) as u8;52values[i] = delta;53prev = integer;54}55let consumed = std::cmp::min(length - iterator.len(), BLOCK_SIZE);56length = iterator.len();57let values = &values[..consumed];5859values.iter().zip(deltas.iter_mut()).for_each(|(v, delta)| {60*delta = v.wrapping_sub(min_delta) as u64;61});6263// <min delta> <list of bitwidths of miniblocks> <miniblocks>64let (container, encoded_len) = zigzag_leb128::encode(min_delta);65buffer.extend_from_slice(&container[..encoded_len]);6667// one miniblock => 1 byte68let mut values_remaining = consumed;69buffer.extend_from_slice(&num_bits[..num_miniblocks_per_block]);70for i in 0..num_miniblocks_per_block {71if values_remaining == 0 {72break;73}7475values_remaining = values_remaining.saturating_sub(values_per_miniblock);76write_miniblock(77buffer,78num_bits[i],79&deltas[i * values_per_miniblock..(i + 1) * values_per_miniblock],80);81}82}83}8485fn write_miniblock(buffer: &mut Vec<u8>, num_bits: u8, deltas: &[u64]) {86let num_bits = num_bits as usize;87if num_bits > 0 {88let start = buffer.len();8990// bitpack encode all (deltas.len = 128 which is a multiple of 32)91let bytes_needed = start + ceil8(deltas.len() * num_bits);92buffer.resize(bytes_needed, 0);93bitpacked::encode(deltas, num_bits, &mut buffer[start..]);9495let bytes_needed = start + ceil8(deltas.len() * num_bits);96buffer.truncate(bytes_needed);97}98}99100#[cfg(test)]101mod tests {102use super::*;103104#[test]105fn constant_delta() {106// header: [128, 2, 1, 5, 2]:107// block size: 256 <=u> 128, 2108// mini-blocks: 1 <=u> 1109// elements: 5 <=u> 5110// first_value: 2 <=z> 1111// block1: [2, 0, 0, 0, 0]112// min_delta: 1 <=z> 2113// bitwidth: 0114let data = 1..=5;115let expected = vec![128u8, 2, 1, 5, 2, 2, 0];116117let mut buffer = vec![];118encode(data.collect::<Vec<_>>().into_iter(), &mut buffer, 1);119assert_eq!(expected, buffer);120}121122#[test]123fn negative_min_delta() {124// max - min = 1 - -4 = 5125let data = vec![1, 2, 3, 4, 5, 1];126// header: [128, 2, 4, 6, 2]127// block size: 256 <=u> 128, 2128// mini-blocks: 1 <=u> 1129// elements: 6 <=u> 5130// first_value: 2 <=z> 1131// block1: [7, 3, 253, 255]132// min_delta: -4 <=z> 7133// bitwidth: 3134// values: [5, 5, 5, 5, 0] <=b> [135// 0b01101101136// 0b00001011137// ]138let mut expected = vec![128u8, 2, 1, 6, 2, 7, 3, 0b01101101, 0b00001011];139expected.extend(std::iter::repeat_n(0, 256 * 3 / 8 - 2)); // 128 values, 3 bits, 2 already used140141let mut buffer = vec![];142encode(data.into_iter(), &mut buffer, 1);143assert_eq!(expected, buffer);144}145}146147148