Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/compression.rs
6939 views
1
use polars_error::PolarsResult;
2
#[cfg(feature = "io_ipc_compression")]
3
use polars_error::to_compute_err;
4
5
/// Decompresses LZ4-compressed `input_buf` into `output_buf`.
///
/// The decoder is read until `output_buf` is filled exactly, so the caller
/// must size `output_buf` to the expected decompressed length.
///
/// # Errors
///
/// Returns an error if the LZ4 decoder cannot be created from `input_buf`
/// or if `output_buf` cannot be filled completely from the decoded stream.
#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {
    use std::io::Read;

    let mut reader = lz4::Decoder::new(input_buf)?;
    reader.read_exact(output_buf)?;
    Ok(())
}
12
13
/// Decompresses ZSTD-compressed `input_buf` into `output_buf`.
///
/// The decoder is read until `output_buf` is filled exactly, so the caller
/// must size `output_buf` to the expected decompressed length.
///
/// # Errors
///
/// Returns an error if the ZSTD decoder cannot be created over `input_buf`
/// or if `output_buf` cannot be filled completely from the decoded stream.
#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> PolarsResult<()> {
    use std::io::Read;

    let mut reader = zstd::Decoder::with_buffer(input_buf)?;
    reader.read_exact(output_buf)?;
    Ok(())
}
20
21
#[cfg(not(feature = "io_ipc_compression"))]
22
pub fn decompress_lz4(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {
23
panic!(
24
"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."
25
);
26
}
27
28
#[cfg(not(feature = "io_ipc_compression"))]
29
pub fn decompress_zstd(_input_buf: &[u8], _output_buf: &mut [u8]) -> PolarsResult<()> {
30
panic!(
31
"The crate was compiled without IPC compression. Use `io_ipc_compression` to read compressed IPC."
32
);
33
}
34
35
/// Compresses `input_buf` with LZ4 and writes the compressed bytes into
/// `output_buf`.
///
/// # Errors
///
/// Returns an error if the encoder cannot be built, if writing the input
/// fails, or if finalising the encoder reports a failure.
#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {
    use std::io::Write;

    let builder = lz4::EncoderBuilder::new();
    let mut writer = builder.build(output_buf).map_err(to_compute_err)?;
    writer.write_all(input_buf)?;

    // `finish` hands back the wrapped writer together with the final status;
    // only the status is of interest here.
    let (_, status) = writer.finish();
    status?;
    Ok(())
}
46
47
/// Compresses `input_buf` with ZSTD and writes the compressed bytes into
/// `output_buf`.
///
/// # Errors
///
/// Returns an error if the ZSTD encoder reports a failure.
#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> PolarsResult<()> {
    // Per the zstd crate documentation, a level of 0 selects the library's
    // default compression level.
    zstd::stream::copy_encode(input_buf, output_buf, 0)?;
    Ok(())
}
52
53
#[cfg(not(feature = "io_ipc_compression"))]
54
pub fn compress_lz4(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {
55
panic!(
56
"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."
57
)
58
}
59
60
#[cfg(not(feature = "io_ipc_compression"))]
61
pub fn compress_zstd(_input_buf: &[u8], _output_buf: &[u8]) -> PolarsResult<()> {
62
panic!(
63
"The crate was compiled without IPC compression. Use `io_ipc_compression` to write compressed IPC."
64
)
65
}
66
67
/// Round-trip tests: compress a small repeating payload, decompress it into a
/// buffer of the same length, and check the bytes match.
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "io_ipc_compression")]
    #[test]
    #[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support
    fn round_trip_zstd() {
        // 200 bytes cycling through 0..=9.
        let original: Vec<u8> = (0..200u8).map(|x| x % 10).collect();

        let mut compressed = Vec::new();
        compress_zstd(&original, &mut compressed).unwrap();

        // The decompressor fills the output exactly, so size it to the input.
        let mut restored = vec![0u8; original.len()];
        decompress_zstd(&compressed, &mut restored).unwrap();

        assert_eq!(original, restored);
    }

    #[cfg(feature = "io_ipc_compression")]
    #[test]
    #[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support
    fn round_trip_lz4() {
        // 200 bytes cycling through 0..=9.
        let original: Vec<u8> = (0..200u8).map(|x| x % 10).collect();

        let mut compressed = Vec::new();
        compress_lz4(&original, &mut compressed).unwrap();

        // The decompressor fills the output exactly, so size it to the input.
        let mut restored = vec![0u8; original.len()];
        decompress_lz4(&compressed, &mut restored).unwrap();

        assert_eq!(original, restored);
    }
}
97
98