Path: blob/main/crates/polars-parquet/src/parquet/compression.rs
6940 views
//! Functionality to compress and decompress data according to the parquet specification1pub use super::parquet_bridge::{2BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,3};4use crate::parquet::error::{ParquetError, ParquetResult};56#[cfg(any(feature = "snappy", feature = "lz4"))]7fn inner_compress<8G: Fn(usize) -> ParquetResult<usize>,9F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,10>(11input: &[u8],12output: &mut Vec<u8>,13get_length: G,14compress: F,15) -> ParquetResult<()> {16let original_length = output.len();17let max_required_length = get_length(input.len())?;1819output.resize(original_length + max_required_length, 0);20let compressed_size = compress(input, &mut output[original_length..])?;2122output.truncate(original_length + compressed_size);23Ok(())24}2526/// Compresses data stored in slice `input_buf` and writes the compressed result27/// to `output_buf`.28///29/// Note that you'll need to call `clear()` before reusing the same `output_buf`30/// across different `compress` calls.31#[allow(unused_variables)]32pub fn compress(33compression: CompressionOptions,34input_buf: &[u8],35#[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,36) -> ParquetResult<()> {37match compression {38#[cfg(feature = "brotli")]39CompressionOptions::Brotli(level) => {40use std::io::Write;41const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;42const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-224344let q = level.unwrap_or_default();45let mut encoder = brotli::CompressorWriter::new(46output_buf,47BROTLI_DEFAULT_BUFFER_SIZE,48q.compression_level(),49BROTLI_DEFAULT_LG_WINDOW_SIZE,50);51encoder.write_all(input_buf)?;52encoder.flush().map_err(|e| e.into())53},54#[cfg(not(feature = "brotli"))]55CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(56crate::parquet::error::Feature::Brotli,57"compress to brotli".to_string(),58)),59#[cfg(feature = "gzip")]60CompressionOptions::Gzip(level) => {61use std::io::Write;62let level = level.unwrap_or_default();63let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());64encoder.write_all(input_buf)?;65encoder.try_finish().map_err(|e| e.into())66},67#[cfg(not(feature = "gzip"))]68CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(69crate::parquet::error::Feature::Gzip,70"compress to gzip".to_string(),71)),72#[cfg(feature = "snappy")]73CompressionOptions::Snappy => inner_compress(74input_buf,75output_buf,76|len| Ok(snap::raw::max_compress_len(len)),77|input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),78),79#[cfg(not(feature = "snappy"))]80CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(81crate::parquet::error::Feature::Snappy,82"compress to snappy".to_string(),83)),84#[cfg(feature = "lz4")]85CompressionOptions::Lz4Raw => inner_compress(86input_buf,87output_buf,88|len| Ok(lz4::block::compress_bound(len)?),89|input, output| {90let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;91Ok(compressed_size)92},93),94#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]95CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(96crate::parquet::error::Feature::Lz4,97"compress to lz4".to_string(),98)),99#[cfg(feature = "zstd")]100CompressionOptions::Zstd(level) => {101let level = level.map(|v| v.compression_level()).unwrap_or_default();102// Make sure the buffer is large enough; the interface assumption is103// that decompressed data is appended to the output buffer.104let old_len = output_buf.len();105output_buf.resize(106old_len + zstd::zstd_safe::compress_bound(input_buf.len()),1070,108);109match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {110Ok(written_size) => {111output_buf.truncate(old_len + written_size);112Ok(())113},114Err(e) => Err(e.into()),115}116},117#[cfg(not(feature = "zstd"))]118CompressionOptions::Zstd(_) => Err(ParquetError::FeatureNotActive(119crate::parquet::error::Feature::Zstd,120"compress to zstd".to_string(),121)),122CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(123"Compressing uncompressed".to_string(),124)),125_ => Err(ParquetError::FeatureNotSupported(format!(126"Compression {compression:?} is not supported",127))),128}129}130131pub enum DecompressionContext {132Unset,133#[cfg(feature = "zstd")]134Zstd(zstd::zstd_safe::DCtx<'static>),135}136137/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.138/// Returns the total number of bytes written.139#[allow(unused_variables)]140pub fn decompress(141compression: Compression,142input_buf: &[u8],143output_buf: &mut [u8],144ctx: &mut DecompressionContext,145) -> ParquetResult<()> {146match compression {147#[cfg(feature = "brotli")]148Compression::Brotli => {149use std::io::Read;150const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;151brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)152.read_exact(output_buf)153.map_err(|e| e.into())154},155#[cfg(not(feature = "brotli"))]156Compression::Brotli => Err(ParquetError::FeatureNotActive(157crate::parquet::error::Feature::Brotli,158"decompress with brotli".to_string(),159)),160#[cfg(feature = "gzip")]161Compression::Gzip => {162use std::io::Read;163let mut decoder = flate2::read::GzDecoder::new(input_buf);164decoder.read_exact(output_buf).map_err(|e| e.into())165},166#[cfg(not(feature = "gzip"))]167Compression::Gzip => Err(ParquetError::FeatureNotActive(168crate::parquet::error::Feature::Gzip,169"decompress with gzip".to_string(),170)),171#[cfg(feature = "snappy")]172Compression::Snappy => {173use snap::raw::{Decoder, decompress_len};174175let len = decompress_len(input_buf)?;176if len > output_buf.len() {177return Err(ParquetError::oos("snappy header out of spec"));178}179Decoder::new()180.decompress(input_buf, output_buf)181.map_err(|e| e.into())182.map(|_| ())183},184#[cfg(not(feature = "snappy"))]185Compression::Snappy => Err(ParquetError::FeatureNotActive(186crate::parquet::error::Feature::Snappy,187"decompress with snappy".to_string(),188)),189#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]190Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)191.map(|_| {})192.map_err(|e| e.into()),193#[cfg(feature = "lz4")]194Compression::Lz4Raw => {195lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)196.map(|_| {})197.map_err(|e| e.into())198},199#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]200Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(201crate::parquet::error::Feature::Lz4,202"decompress with lz4".to_string(),203)),204205#[cfg(any(feature = "lz4_flex", feature = "lz4"))]206Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {207lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)208.map(|_| {})209}),210211#[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]212Compression::Lz4 => Err(ParquetError::FeatureNotActive(213crate::parquet::error::Feature::Lz4,214"decompress with legacy lz4".to_string(),215)),216217#[cfg(feature = "zstd")]218Compression::Zstd => {219use std::io::Read;220if !matches!(ctx, DecompressionContext::Zstd(_)) {221*ctx = DecompressionContext::Zstd(zstd::zstd_safe::DCtx::create());222}223let DecompressionContext::Zstd(ctx) = ctx else {224unreachable!();225};226let mut decoder = zstd::Decoder::with_context(input_buf, ctx);227decoder.read_exact(output_buf).map_err(|e| e.into())228},229#[cfg(not(feature = "zstd"))]230Compression::Zstd => Err(ParquetError::FeatureNotActive(231crate::parquet::error::Feature::Zstd,232"decompress with zstd".to_string(),233)),234Compression::Uncompressed => Err(ParquetError::InvalidParameter(235"Compressing uncompressed".to_string(),236)),237_ => Err(ParquetError::FeatureNotSupported(format!(238"Compression {compression:?} is not supported",239))),240}241}242243/// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.244/// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).245/// Returns error if decompression failed.246#[cfg(any(feature = "lz4", feature = "lz4_flex"))]247fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {248// Parquet files written with the Hadoop Lz4Codec use their own framing.249// The input buffer can contain an arbitrary number of "frames", each250// with the following structure:251// - bytes 0..3: big-endian uint32_t representing the frame decompressed size252// - bytes 4..7: big-endian uint32_t representing the frame compressed size253// - bytes 8...: frame compressed data254//255// The Hadoop Lz4Codec source code can be found here:256// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc257258const SIZE_U32: usize = size_of::<u32>();259const PREFIX_LEN: usize = SIZE_U32 * 2;260let mut input_len = input_buf.len();261let mut input = input_buf;262let mut output_len = output_buf.len();263let mut output: &mut [u8] = output_buf;264while input_len >= PREFIX_LEN {265let mut bytes = [0; SIZE_U32];266bytes.copy_from_slice(&input[0..4]);267let expected_decompressed_size = u32::from_be_bytes(bytes);268let mut bytes = [0; SIZE_U32];269bytes.copy_from_slice(&input[4..8]);270let expected_compressed_size = u32::from_be_bytes(bytes);271input = &input[PREFIX_LEN..];272input_len -= PREFIX_LEN;273274if input_len < expected_compressed_size as usize {275return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));276}277278if output_len < expected_decompressed_size as usize {279return Err(ParquetError::oos(280"Not enough bytes to hold advertised output",281));282}283let decompressed_size = lz4_decompress_to_buffer(284&input[..expected_compressed_size as usize],285Some(output_len as i32),286output,287)?;288if decompressed_size != expected_decompressed_size as usize {289return Err(ParquetError::oos("unexpected decompressed size"));290}291input_len -= expected_compressed_size as usize;292output_len -= expected_decompressed_size as usize;293if input_len > expected_compressed_size as usize {294input = &input[expected_compressed_size as usize..];295output = &mut output[expected_decompressed_size as usize..];296} else {297break;298}299}300if input_len == 0 {301Ok(())302} else {303Err(ParquetError::oos("Not all input are consumed"))304}305}306307#[cfg(feature = "lz4")]308#[inline]309fn lz4_decompress_to_buffer(310src: &[u8],311uncompressed_size: Option<i32>,312buffer: &mut [u8],313) -> ParquetResult<usize> {314let size = lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?;315Ok(size)316}317318#[cfg(test)]319mod tests {320use super::*;321322fn test_roundtrip(c: CompressionOptions, data: &[u8]) {323let offset = 2048;324325// Compress to a buffer that already has data is possible326let mut compressed = vec![2; offset];327compress(c, data, &mut compressed).expect("Error when compressing");328329// data is compressed...330assert!(compressed.len() - offset < data.len());331332let mut decompressed = vec![0; data.len()];333let mut context = DecompressionContext::Unset;334decompress(335c.into(),336&compressed[offset..],337&mut decompressed,338&mut context,339)340.expect("Error when decompressing");341assert_eq!(data, decompressed.as_slice());342}343344fn test_codec(c: CompressionOptions) {345let sizes = vec![1000, 10000, 100000];346for size in sizes {347let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();348test_roundtrip(c, &data);349}350}351352#[test]353fn test_codec_snappy() {354test_codec(CompressionOptions::Snappy);355}356357#[test]358fn test_codec_gzip_default() {359test_codec(CompressionOptions::Gzip(None));360}361362#[test]363fn test_codec_gzip_low_compression() {364test_codec(CompressionOptions::Gzip(Some(365GzipLevel::try_new(1).unwrap(),366)));367}368369#[test]370fn test_codec_brotli_default() {371test_codec(CompressionOptions::Brotli(None));372}373374#[test]375fn test_codec_brotli_low_compression() {376test_codec(CompressionOptions::Brotli(Some(377BrotliLevel::try_new(1).unwrap(),378)));379}380381#[test]382fn test_codec_brotli_high_compression() {383test_codec(CompressionOptions::Brotli(Some(384BrotliLevel::try_new(11).unwrap(),385)));386}387388#[test]389fn test_codec_lz4_raw() {390test_codec(CompressionOptions::Lz4Raw);391}392393#[test]394fn test_codec_zstd_default() {395test_codec(CompressionOptions::Zstd(None));396}397398#[cfg(feature = "zstd")]399#[test]400fn test_codec_zstd_low_compression() {401test_codec(CompressionOptions::Zstd(Some(402ZstdLevel::try_new(1).unwrap(),403)));404}405406#[cfg(feature = "zstd")]407#[test]408fn test_codec_zstd_high_compression() {409test_codec(CompressionOptions::Zstd(Some(410ZstdLevel::try_new(21).unwrap(),411)));412}413}414415416