// Path: blob/main/crates/polars-parquet/src/parquet/compression.rs
//! Functionality to compress and decompress data according to the parquet specification1pub use super::parquet_bridge::{2BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,3};4use crate::parquet::error::{ParquetError, ParquetResult};56#[cfg(any(feature = "snappy", feature = "lz4"))]7fn inner_compress<8G: Fn(usize) -> ParquetResult<usize>,9F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,10>(11input: &[u8],12output: &mut Vec<u8>,13get_length: G,14compress: F,15) -> ParquetResult<()> {16let original_length = output.len();17let max_required_length = get_length(input.len())?;1819output.resize(original_length + max_required_length, 0);20let compressed_size = compress(input, &mut output[original_length..])?;2122output.truncate(original_length + compressed_size);23Ok(())24}2526/// Compresses data stored in slice `input_buf` and writes the compressed result27/// to `output_buf`.28///29/// Note that you'll need to call `clear()` before reusing the same `output_buf`30/// across different `compress` calls.31#[allow(unused_variables)]32pub fn compress(33compression: CompressionOptions,34input_buf: &[u8],35#[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,36) -> ParquetResult<()> {37match compression {38#[cfg(feature = "brotli")]39CompressionOptions::Brotli(level) => {40use std::io::Write;41const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;42const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-224344let q = level.unwrap_or_default();45let mut encoder = brotli::CompressorWriter::new(46output_buf,47BROTLI_DEFAULT_BUFFER_SIZE,48q.compression_level(),49BROTLI_DEFAULT_LG_WINDOW_SIZE,50);51encoder.write_all(input_buf)?;52encoder.flush().map_err(|e| e.into())53},54#[cfg(not(feature = "brotli"))]55CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(56crate::parquet::error::Feature::Brotli,57"compress to brotli".to_string(),58)),59#[cfg(feature = "gzip")]60CompressionOptions::Gzip(level) => {61use std::io::Write;62let level = 
level.unwrap_or_default();63let mut encoder = flate2::write::GzEncoder::new(64output_buf,65flate2::Compression::new(level.compression_level() as _),66);67encoder.write_all(input_buf)?;68encoder.try_finish().map_err(|e| e.into())69},70#[cfg(not(feature = "gzip"))]71CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(72crate::parquet::error::Feature::Gzip,73"compress to gzip".to_string(),74)),75#[cfg(feature = "snappy")]76CompressionOptions::Snappy => inner_compress(77input_buf,78output_buf,79|len| Ok(snap::raw::max_compress_len(len)),80|input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),81),82#[cfg(not(feature = "snappy"))]83CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(84crate::parquet::error::Feature::Snappy,85"compress to snappy".to_string(),86)),87#[cfg(feature = "lz4")]88CompressionOptions::Lz4Raw => inner_compress(89input_buf,90output_buf,91|len| Ok(lz4::block::compress_bound(len)?),92|input, output| {93let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;94Ok(compressed_size)95},96),97#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]98CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(99crate::parquet::error::Feature::Lz4,100"compress to lz4".to_string(),101)),102#[cfg(feature = "zstd")]103CompressionOptions::Zstd(level) => {104let level = level.map(|v| v.compression_level()).unwrap_or_default();105// Make sure the buffer is large enough; the interface assumption is106// that decompressed data is appended to the output buffer.107let old_len = output_buf.len();108output_buf.resize(109old_len + zstd::zstd_safe::compress_bound(input_buf.len()),1100,111);112match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {113Ok(written_size) => {114output_buf.truncate(old_len + written_size);115Ok(())116},117Err(e) => Err(e.into()),118}119},120#[cfg(not(feature = "zstd"))]121CompressionOptions::Zstd(_) => 
Err(ParquetError::FeatureNotActive(122crate::parquet::error::Feature::Zstd,123"compress to zstd".to_string(),124)),125CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(126"Compressing uncompressed".to_string(),127)),128_ => Err(ParquetError::FeatureNotSupported(format!(129"Compression {compression:?} is not supported",130))),131}132}133134pub enum DecompressionContext {135Unset,136#[cfg(feature = "zstd")]137Zstd(zstd::zstd_safe::DCtx<'static>),138}139140/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.141/// Returns the total number of bytes written.142#[allow(unused_variables)]143pub fn decompress(144compression: Compression,145input_buf: &[u8],146output_buf: &mut [u8],147ctx: &mut DecompressionContext,148) -> ParquetResult<()> {149match compression {150#[cfg(feature = "brotli")]151Compression::Brotli => {152use std::io::Read;153const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;154brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)155.read_exact(output_buf)156.map_err(|e| e.into())157},158#[cfg(not(feature = "brotli"))]159Compression::Brotli => Err(ParquetError::FeatureNotActive(160crate::parquet::error::Feature::Brotli,161"decompress with brotli".to_string(),162)),163#[cfg(feature = "gzip")]164Compression::Gzip => {165use std::io::Read;166let mut decoder = flate2::read::GzDecoder::new(input_buf);167decoder.read_exact(output_buf).map_err(|e| e.into())168},169#[cfg(not(feature = "gzip"))]170Compression::Gzip => Err(ParquetError::FeatureNotActive(171crate::parquet::error::Feature::Gzip,172"decompress with gzip".to_string(),173)),174#[cfg(feature = "snappy")]175Compression::Snappy => {176use snap::raw::{Decoder, decompress_len};177178let len = decompress_len(input_buf)?;179if len > output_buf.len() {180return Err(ParquetError::oos("snappy header out of spec"));181}182Decoder::new()183.decompress(input_buf, output_buf)184.map_err(|e| e.into())185.map(|_| ())186},187#[cfg(not(feature = 
"snappy"))]188Compression::Snappy => Err(ParquetError::FeatureNotActive(189crate::parquet::error::Feature::Snappy,190"decompress with snappy".to_string(),191)),192#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]193Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)194.map(|_| {})195.map_err(|e| e.into()),196#[cfg(feature = "lz4")]197Compression::Lz4Raw => {198lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)199.map(|_| {})200.map_err(|e| e.into())201},202#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]203Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(204crate::parquet::error::Feature::Lz4,205"decompress with lz4".to_string(),206)),207208#[cfg(any(feature = "lz4_flex", feature = "lz4"))]209Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {210lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)211.map(|_| {})212}),213214#[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]215Compression::Lz4 => Err(ParquetError::FeatureNotActive(216crate::parquet::error::Feature::Lz4,217"decompress with legacy lz4".to_string(),218)),219220#[cfg(feature = "zstd")]221Compression::Zstd => {222use std::io::Read;223if !matches!(ctx, DecompressionContext::Zstd(_)) {224*ctx = DecompressionContext::Zstd(zstd::zstd_safe::DCtx::create());225}226let DecompressionContext::Zstd(ctx) = ctx else {227unreachable!();228};229let mut decoder = zstd::Decoder::with_context(input_buf, ctx);230decoder.read_exact(output_buf).map_err(|e| e.into())231},232#[cfg(not(feature = "zstd"))]233Compression::Zstd => Err(ParquetError::FeatureNotActive(234crate::parquet::error::Feature::Zstd,235"decompress with zstd".to_string(),236)),237Compression::Uncompressed => Err(ParquetError::InvalidParameter(238"Compressing uncompressed".to_string(),239)),240_ => Err(ParquetError::FeatureNotSupported(format!(241"Compression {compression:?} is not 
supported",242))),243}244}245246/// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.247/// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).248/// Returns error if decompression failed.249#[cfg(any(feature = "lz4", feature = "lz4_flex"))]250fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {251// Parquet files written with the Hadoop Lz4Codec use their own framing.252// The input buffer can contain an arbitrary number of "frames", each253// with the following structure:254// - bytes 0..3: big-endian uint32_t representing the frame decompressed size255// - bytes 4..7: big-endian uint32_t representing the frame compressed size256// - bytes 8...: frame compressed data257//258// The Hadoop Lz4Codec source code can be found here:259// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc260261const SIZE_U32: usize = size_of::<u32>();262const PREFIX_LEN: usize = SIZE_U32 * 2;263let mut input_len = input_buf.len();264let mut input = input_buf;265let mut output_len = output_buf.len();266let mut output: &mut [u8] = output_buf;267while input_len >= PREFIX_LEN {268let mut bytes = [0; SIZE_U32];269bytes.copy_from_slice(&input[0..4]);270let expected_decompressed_size = u32::from_be_bytes(bytes);271let mut bytes = [0; SIZE_U32];272bytes.copy_from_slice(&input[4..8]);273let expected_compressed_size = u32::from_be_bytes(bytes);274input = &input[PREFIX_LEN..];275input_len -= PREFIX_LEN;276277if input_len < expected_compressed_size as usize {278return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));279}280281if output_len < expected_decompressed_size as usize {282return Err(ParquetError::oos(283"Not enough bytes to hold advertised output",284));285}286let 
decompressed_size = lz4_decompress_to_buffer(287&input[..expected_compressed_size as usize],288Some(output_len as i32),289output,290)?;291if decompressed_size != expected_decompressed_size as usize {292return Err(ParquetError::oos("unexpected decompressed size"));293}294input_len -= expected_compressed_size as usize;295output_len -= expected_decompressed_size as usize;296if input_len > expected_compressed_size as usize {297input = &input[expected_compressed_size as usize..];298output = &mut output[expected_decompressed_size as usize..];299} else {300break;301}302}303if input_len == 0 {304Ok(())305} else {306Err(ParquetError::oos("Not all input are consumed"))307}308}309310#[cfg(feature = "lz4")]311#[inline]312fn lz4_decompress_to_buffer(313src: &[u8],314uncompressed_size: Option<i32>,315buffer: &mut [u8],316) -> ParquetResult<usize> {317let size = lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?;318Ok(size)319}320321#[cfg(test)]322mod tests {323use super::*;324325fn test_roundtrip(c: CompressionOptions, data: &[u8]) {326let offset = 2048;327328// Compress to a buffer that already has data is possible329let mut compressed = vec![2; offset];330compress(c, data, &mut compressed).expect("Error when compressing");331332// data is compressed...333assert!(compressed.len() - offset < data.len());334335let mut decompressed = vec![0; data.len()];336let mut context = DecompressionContext::Unset;337decompress(338c.into(),339&compressed[offset..],340&mut decompressed,341&mut context,342)343.expect("Error when decompressing");344assert_eq!(data, decompressed.as_slice());345}346347fn test_codec(c: CompressionOptions) {348let sizes = vec![1000, 10000, 100000];349for size in sizes {350let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();351test_roundtrip(c, &data);352}353}354355#[test]356fn test_codec_snappy() {357test_codec(CompressionOptions::Snappy);358}359360#[test]361fn test_codec_gzip_default() 
{362test_codec(CompressionOptions::Gzip(None));363}364365#[test]366fn test_codec_gzip_low_compression() {367test_codec(CompressionOptions::Gzip(Some(368GzipLevel::try_new(1).unwrap(),369)));370}371372#[test]373fn test_codec_brotli_default() {374test_codec(CompressionOptions::Brotli(None));375}376377#[test]378fn test_codec_brotli_low_compression() {379test_codec(CompressionOptions::Brotli(Some(380BrotliLevel::try_new(1).unwrap(),381)));382}383384#[test]385fn test_codec_brotli_high_compression() {386test_codec(CompressionOptions::Brotli(Some(387BrotliLevel::try_new(11).unwrap(),388)));389}390391#[test]392fn test_codec_lz4_raw() {393test_codec(CompressionOptions::Lz4Raw);394}395396#[test]397fn test_codec_zstd_default() {398test_codec(CompressionOptions::Zstd(None));399}400401#[cfg(feature = "zstd")]402#[test]403fn test_codec_zstd_low_compression() {404test_codec(CompressionOptions::Zstd(Some(405ZstdLevel::try_new(1).unwrap(),406)));407}408409#[cfg(feature = "zstd")]410#[test]411fn test_codec_zstd_high_compression() {412test_codec(CompressionOptions::Zstd(Some(413ZstdLevel::try_new(21).unwrap(),414)));415}416}417418419