Path: blob/main/crates/polars-arrow/src/io/ipc/compression.rs
6939 views
use polars_error::PolarsResult;1#[cfg(feature = "io_ipc_compression")]2use polars_error::to_compute_err;34#[cfg(feature = "io_ipc_compression")]5#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]6pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {7use std::io::Read;8let mut decoder = lz4::Decoder::new(input_buf)?;9decoder.read_exact(output_buf).map_err(|e| e.into())10}1112#[cfg(feature = "io_ipc_compression")]13#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]14pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {15use std::io::Read;16let mut decoder = zstd::Decoder::with_buffer(input_buf)?;17decoder.read_exact(output_buf).map_err(|e| e.into())18}1920#[cfg(not(feature = "io_ipc_compression"))]21pub fn decompress_lz4(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {22panic!(23"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."24);25}2627#[cfg(not(feature = "io_ipc_compression"))]28pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {29panic!(30"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."31);32}3334#[cfg(feature = "io_ipc_compression")]35#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]36pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {37use std::io::Write;3839let mut encoder = lz4::EncoderBuilder::new()40.build(output_buf)41.map_err(to_compute_err)?;42encoder.write_all(input_buf)?;43encoder.finish().1.map_err(|e| e.into())44}4546#[cfg(feature = "io_ipc_compression")]47#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]48pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {49zstd::stream::copy_encode(input_buf, output_buf, 0).map_err(|e| e.into())50}5152#[cfg(not(feature = "io_ipc_compression"))]53pub fn compress_lz4(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {54panic!(55"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."56)57}5859#[cfg(not(feature = "io_ipc_compression"))]60pub fn compress_zstd(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {61panic!(62"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."63)64}6566#[cfg(test)]67mod tests {68use super::*;6970#[cfg(feature = "io_ipc_compression")]71#[test]72#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support73fn round_trip_zstd() {74let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();75let mut buffer = vec![];76compress_zstd(&data, &mut buffer).unwrap();7778let mut result = vec![0; 200];79decompress_zstd(&buffer, &mut result).unwrap();80assert_eq!(data, result);81}8283#[cfg(feature = "io_ipc_compression")]84#[test]85#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support86fn round_trip_lz4() {87let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();88let mut buffer = vec![];89compress_lz4(&data, &mut buffer).unwrap();9091let mut result = vec![0; 200];92decompress_lz4(&buffer, &mut result).unwrap();93assert_eq!(data, result);94}95}969798