Path: blob/main/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
7887 views
use std::io::Write;12use super::bitpacked_encode;3use crate::parquet::encoding::{bitpacked, ceil8, uleb128};45// Arbitrary value that balances memory usage and storage overhead6const MAX_VALUES_PER_LITERAL_RUN: usize = (1 << 10) * 8;78pub trait Encoder<T: PartialEq + Default + Copy> {9fn bitpacked_encode<W: Write, I: Iterator<Item = T>>(10writer: &mut W,11iterator: I,12num_bits: usize,13) -> std::io::Result<()>;1415fn run_length_encode<W: Write>(16writer: &mut W,17run_length: usize,18value: T,19bit_width: u32,20) -> std::io::Result<()>;21}2223const U32_BLOCK_LEN: usize = 32;2425impl Encoder<u32> for u32 {26fn bitpacked_encode<W: Write, I: Iterator<Item = u32>>(27writer: &mut W,28mut iterator: I,29num_bits: usize,30) -> std::io::Result<()> {31// the length of the iterator.32let length = iterator.size_hint().1.unwrap();3334let mut header = ceil8(length) as u64;35header <<= 1;36header |= 1; // it is bitpacked => first bit is set37let mut container = [0; 10];38let used = uleb128::encode(header, &mut container);39writer.write_all(&container[..used])?;4041let chunks = length / U32_BLOCK_LEN;42let remainder = length - chunks * U32_BLOCK_LEN;43let mut buffer = [0u32; U32_BLOCK_LEN];4445// simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 3246let compressed_chunk_size = 4 * num_bits;4748for _ in 0..chunks {49iterator50.by_ref()51.take(U32_BLOCK_LEN)52.zip(buffer.iter_mut())53.for_each(|(item, buf)| *buf = item);5455let mut packed = [0u8; 4 * U32_BLOCK_LEN];56bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());57writer.write_all(&packed[..compressed_chunk_size])?;58}5960if remainder != 0 {61// Must be careful here to ensure we write a multiple of `num_bits`62// (the bit width) to align with the spec. Some readers also rely on63// this - see https://github.com/pola-rs/polars/pull/13883.6465// this is ceil8(remainder * num_bits), but we ensure the output is a66// multiple of num_bits by rewriting it as ceil8(remainder) * num_bits67let compressed_remainder_size = ceil8(remainder) * num_bits;68iterator69.by_ref()70.take(remainder)71.zip(buffer.iter_mut())72.for_each(|(item, buf)| *buf = item);7374let mut packed = [0u8; 4 * U32_BLOCK_LEN];75// No need to zero rest of buffer because remainder is either:76// * Multiple of 8: We pad non-terminal literal runs to have a77// multiple of 8 values. Once compressed, the data will end on78// clean byte boundaries and packed[..compressed_remainder_size]79// will include only the remainder values and nothing extra.80// * Final run: Extra values from buffer will be included in81// packed[..compressed_remainder_size] but ignored when decoding82// because they extend beyond known column length83bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());84writer.write_all(&packed[..compressed_remainder_size])?;85};86Ok(())87}8889fn run_length_encode<W: Write>(90writer: &mut W,91run_length: usize,92value: u32,93bit_width: u32,94) -> std::io::Result<()> {95// write the length + indicator96let mut header = run_length as u64;97header <<= 1;98let mut container = [0; 10];99let used = uleb128::encode(header, &mut container);100writer.write_all(&container[..used])?;101102let num_bytes = ceil8(bit_width as usize);103let bytes = value.to_le_bytes();104writer.write_all(&bytes[..num_bytes])?;105Ok(())106}107}108109impl Encoder<bool> for bool {110fn bitpacked_encode<W: Write, I: Iterator<Item = bool>>(111writer: &mut W,112iterator: I,113_num_bits: usize,114) -> std::io::Result<()> {115// the length of the iterator.116let length = iterator.size_hint().1.unwrap();117118let mut header = ceil8(length) as u64;119header <<= 1;120header |= 1; // it is bitpacked => first bit is set121let mut container = [0; 10];122let used = uleb128::encode(header, &mut container);123writer.write_all(&container[..used])?;124bitpacked_encode(writer, iterator)?;125Ok(())126}127128fn run_length_encode<W: Write>(129writer: &mut W,130run_length: usize,131value: bool,132_bit_width: u32,133) -> std::io::Result<()> {134// write the length + indicator135let mut header = run_length as u64;136header <<= 1;137let mut container = [0; 10];138let used = uleb128::encode(header, &mut container);139writer.write_all(&container[..used])?;140writer.write_all(&(value as u8).to_le_bytes())?;141Ok(())142}143}144145#[allow(clippy::comparison_chain)]146pub fn encode<T: PartialEq + Default + Copy + Encoder<T>, W: Write, I: Iterator<Item = T>>(147writer: &mut W,148iterator: I,149num_bits: u32,150) -> std::io::Result<()> {151let mut consecutive_repeats: usize = 0;152let mut previous_val = T::default();153let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN];154let mut buffer_idx = 0;155let mut literal_run_idx = 0;156for val in iterator {157if val == previous_val {158consecutive_repeats += 1;159if consecutive_repeats >= 8 {160// Run is long enough to RLE, no need to buffer values161if consecutive_repeats > 8 {162continue;163} else {164// When we encounter a run long enough to potentially RLE,165// we must first ensure that the buffered literal run has166// a multiple of 8 values for bit-packing. If not, we pad167// up by taking some of the consecutive repeats168let literal_padding = (8 - (literal_run_idx % 8)) % 8;169consecutive_repeats -= literal_padding;170literal_run_idx += literal_padding;171}172}173// Too short to RLE, continue to buffer values174} else if consecutive_repeats > 8 {175// Value changed so start a new run but the current run is long176// enough to RLE. First, bit-pack any buffered literal run. Then,177// RLE current run and reset consecutive repeat counter and buffer.178if literal_run_idx > 0 {179debug_assert!(literal_run_idx % 8 == 0);180T::bitpacked_encode(181writer,182buffered_bits.iter().take(literal_run_idx).copied(),183num_bits as usize,184)?;185literal_run_idx = 0;186}187T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;188consecutive_repeats = 1;189buffer_idx = 0;190} else {191// Value changed so start a new run but the current run is not long192// enough to RLE. Consolidate all consecutive repeats into buffered193// literal run.194literal_run_idx = buffer_idx;195consecutive_repeats = 1;196}197// If buffer is full, bit-pack as literal run and reset198if buffer_idx == MAX_VALUES_PER_LITERAL_RUN {199T::bitpacked_encode(writer, buffered_bits.iter().copied(), num_bits as usize)?;200// If buffer fills up in the middle of a run, all but the last201// repeat is consolidated into the literal run.202debug_assert!(203(consecutive_repeats < 8)204&& (buffer_idx - literal_run_idx == consecutive_repeats - 1)205);206consecutive_repeats = 1;207buffer_idx = 0;208literal_run_idx = 0;209}210buffered_bits[buffer_idx] = val;211previous_val = val;212buffer_idx += 1;213}214// Final run not long enough to RLE, extend literal run.215if consecutive_repeats <= 8 {216literal_run_idx = buffer_idx;217}218// Bit-pack final buffered literal run, if any219if literal_run_idx > 0 {220T::bitpacked_encode(221writer,222buffered_bits.iter().take(literal_run_idx).copied(),223num_bits as usize,224)?;225}226// RLE final consecutive run if long enough227if consecutive_repeats > 8 {228T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;229}230Ok(())231}232233#[cfg(test)]234mod tests {235use super::super::bitmap::BitmapIter;236use super::*;237238#[test]239fn bool_basics_1() -> std::io::Result<()> {240let iter = BitmapIter::new(&[0b10011101u8, 0b10011101], 0, 14);241242let mut vec = vec![];243244encode::<bool, _, _>(&mut vec, iter, 1)?;245246assert_eq!(vec, vec![((2 << 1) | 1), 0b10011101u8, 0b00011101]);247248Ok(())249}250251#[test]252fn bool_from_iter() -> std::io::Result<()> {253let mut vec = vec![];254255encode::<bool, _, _>(256&mut vec,257vec![true, true, true, true, true, true, true, true].into_iter(),2581,259)?;260261assert_eq!(vec, vec![((1 << 1) | 1), 0b11111111]);262Ok(())263}264265#[test]266fn test_encode_u32() -> std::io::Result<()> {267let mut vec = vec![];268269encode::<u32, _, _>(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;270271assert_eq!(272vec,273vec![274((2 << 1) | 1),2750b01_10_01_00,2760b00_01_01_10,2770b_00_00_00_11,2780b0279]280);281Ok(())282}283284#[test]285fn test_encode_u32_large() -> std::io::Result<()> {286let mut vec = vec![];287288let values = (0..128).map(|x| x % 4);289290encode::<u32, _, _>(&mut vec, values, 2)?;291292let length = 128;293let expected = 0b11_10_01_00u8;294295let mut expected = vec![expected; length / 4];296expected.insert(0, (((length / 8) as u8) << 1) | 1);297298assert_eq!(vec, expected);299Ok(())300}301302#[test]303fn test_u32_other() -> std::io::Result<()> {304let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter();305306let mut vec = vec![];307encode::<u32, _, _>(&mut vec, values, 2)?;308309let expected = vec![5, 207, 254, 247, 51];310assert_eq!(expected, vec);311Ok(())312}313}314315316