Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/compression.rs
8421 views
1
use polars_error::PolarsResult;
2
#[cfg(feature = "io_ipc_compression")]
3
use polars_error::to_compute_err;
4
use polars_utils::compression::ZstdLevel;
5
6
#[cfg(feature = "io_ipc_compression")]
7
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
8
pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {
9
use std::io::Read;
10
let mut decoder = lz4::Decoder::new(input_buf)?;
11
decoder.read_exact(output_buf).map_err(|e| e.into())
12
}
13
14
#[cfg(feature = "io_ipc_compression")]
15
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
16
pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {
17
use std::io::Read;
18
let mut decoder = zstd::Decoder::with_buffer(input_buf)?;
19
decoder.read_exact(output_buf).map_err(|e| e.into())
20
}
21
22
#[cfg(not(feature = "io_ipc_compression"))]
23
pub fn decompress_lz4(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {
24
panic!(
25
"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."
26
);
27
}
28
29
#[cfg(not(feature = "io_ipc_compression"))]
30
pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {
31
panic!(
32
"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."
33
);
34
}
35
36
#[cfg(feature = "io_ipc_compression")]
37
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
38
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {
39
use std::io::Write;
40
41
let mut encoder = lz4::EncoderBuilder::new()
42
.build(output_buf)
43
.map_err(to_compute_err)?;
44
encoder.write_all(input_buf)?;
45
encoder.finish().1.map_err(|e| e.into())
46
}
47
48
#[cfg(feature = "io_ipc_compression")]
49
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
50
pub fn compress_zstd(
51
input_buf: &[u8],
52
output_buf: &mut Vec<u8>,
53
level: ZstdLevel,
54
) -> PolarsResult<()> {
55
zstd::stream::copy_encode(input_buf, output_buf, level.compression_level())
56
.map_err(|e| e.into())
57
}
58
59
#[cfg(not(feature = "io_ipc_compression"))]
60
pub fn compress_lz4(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {
61
panic!(
62
"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."
63
)
64
}
65
66
#[cfg(not(feature = "io_ipc_compression"))]
67
pub fn compress_zstd(_input_buf: &[u8], _output_buf: &[u8], _level: ZstdLevel) -> PolarsResult<()> {
68
panic!(
69
"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."
70
)
71
}
72
73
#[cfg(test)]
74
mod tests {
75
use super::*;
76
77
#[cfg(feature = "io_ipc_compression")]
78
#[test]
79
#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support
80
fn round_trip_zstd() {
81
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
82
let mut buffer = vec![];
83
compress_zstd(&data, &mut buffer, Default::default()).unwrap();
84
85
let mut result = vec![0; 200];
86
decompress_zstd(&buffer, &mut result).unwrap();
87
assert_eq!(data, result);
88
}
89
90
#[cfg(feature = "io_ipc_compression")]
91
#[test]
92
#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support
93
fn round_trip_lz4() {
94
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
95
let mut buffer = vec![];
96
compress_lz4(&data, &mut buffer).unwrap();
97
98
let mut result = vec![0; 200];
99
decompress_lz4(&buffer, &mut result).unwrap();
100
assert_eq!(data, result);
101
}
102
}
103
104