Path: blob/main/crates/polars-arrow/src/io/ipc/compression.rs
8421 views
use polars_error::PolarsResult;1#[cfg(feature = "io_ipc_compression")]2use polars_error::to_compute_err;3use polars_utils::compression::ZstdLevel;45#[cfg(feature = "io_ipc_compression")]6#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]7pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {8use std::io::Read;9let mut decoder = lz4::Decoder::new(input_buf)?;10decoder.read_exact(output_buf).map_err(|e| e.into())11}1213#[cfg(feature = "io_ipc_compression")]14#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]15pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {16use std::io::Read;17let mut decoder = zstd::Decoder::with_buffer(input_buf)?;18decoder.read_exact(output_buf).map_err(|e| e.into())19}2021#[cfg(not(feature = "io_ipc_compression"))]22pub fn decompress_lz4(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {23panic!(24"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."25);26}2728#[cfg(not(feature = "io_ipc_compression"))]29pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {30panic!(31"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."32);33}3435#[cfg(feature = "io_ipc_compression")]36#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]37pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {38use std::io::Write;3940let mut encoder = lz4::EncoderBuilder::new()41.build(output_buf)42.map_err(to_compute_err)?;43encoder.write_all(input_buf)?;44encoder.finish().1.map_err(|e| e.into())45}4647#[cfg(feature = "io_ipc_compression")]48#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]49pub fn compress_zstd(50input_buf: &[u8],51output_buf: &mut Vec<u8>,52level: ZstdLevel,53) -> PolarsResult<()> {54zstd::stream::copy_encode(input_buf, output_buf, level.compression_level())55.map_err(|e| e.into())56}5758#[cfg(not(feature = "io_ipc_compression"))]59pub fn compress_lz4(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {60panic!(61"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."62)63}6465#[cfg(not(feature = "io_ipc_compression"))]66pub fn compress_zstd(_input_buf: &[u8], _output_buf: &[u8], _level: ZstdLevel) -> PolarsResult<()> {67panic!(68"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."69)70}7172#[cfg(test)]73mod tests {74use super::*;7576#[cfg(feature = "io_ipc_compression")]77#[test]78#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support79fn round_trip_zstd() {80let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();81let mut buffer = vec![];82compress_zstd(&data, &mut buffer, Default::default()).unwrap();8384let mut result = vec![0; 200];85decompress_zstd(&buffer, &mut result).unwrap();86assert_eq!(data, result);87}8889#[cfg(feature = "io_ipc_compression")]90#[test]91#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support92fn round_trip_lz4() {93let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();94let mut buffer = vec![];95compress_lz4(&data, &mut buffer).unwrap();9697let mut result = vec![0; 200];98decompress_lz4(&buffer, &mut result).unwrap();99assert_eq!(data, result);100}101}102103104