Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/compression.rs
8509 views
1
//! Functionality to compress and decompress data according to the parquet specification
2
pub use super::parquet_bridge::{
3
BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,
4
};
5
use crate::parquet::error::{ParquetError, ParquetResult};
6
7
#[cfg(any(feature = "snappy", feature = "lz4"))]
fn inner_compress<
    G: Fn(usize) -> ParquetResult<usize>,
    F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,
>(
    input: &[u8],
    output: &mut Vec<u8>,
    get_length: G,
    compress: F,
) -> ParquetResult<()> {
    // Remember where the pre-existing bytes end: compressed output is appended,
    // never overwrites what the caller already put in `output`.
    let start = output.len();

    // Grow the buffer by the codec's worst-case output size for this input.
    let worst_case = get_length(input.len())?;
    output.resize(start + worst_case, 0);

    // Compress into the freshly reserved tail, then shrink the buffer down to
    // the bytes actually written.
    let written = compress(input, &mut output[start..])?;
    output.truncate(start + written);
    Ok(())
}
26
27
/// Compresses data stored in slice `input_buf` and writes the compressed result
/// to `output_buf`.
///
/// Compressed bytes are *appended* to `output_buf`, so you'll need to call
/// `clear()` before reusing the same `output_buf` across different `compress`
/// calls.
///
/// Returns [`ParquetError::FeatureNotActive`] when the requested codec's cargo
/// feature is not enabled, and [`ParquetError::InvalidParameter`] for
/// [`CompressionOptions::Uncompressed`].
#[allow(unused_variables)]
pub fn compress(
    compression: CompressionOptions,
    input_buf: &[u8],
    #[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        CompressionOptions::Brotli(level) => {
            use std::io::Write;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

            let q = level.unwrap_or_default();
            // The writer streams compressed output directly into `output_buf`.
            let mut encoder = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                q.compression_level(),
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            encoder.write_all(input_buf)?;
            encoder.flush().map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "compress to brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        CompressionOptions::Gzip(level) => {
            use std::io::Write;
            let level = level.unwrap_or_default();
            let mut encoder = flate2::write::GzEncoder::new(
                output_buf,
                flate2::Compression::new(level.compression_level() as _),
            );
            encoder.write_all(input_buf)?;
            // `try_finish` writes the gzip trailer without consuming the encoder.
            encoder.try_finish().map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "compress to gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        CompressionOptions::Snappy => inner_compress(
            input_buf,
            output_buf,
            // Worst-case output size per the snappy format.
            |len| Ok(snap::raw::max_compress_len(len)),
            |input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),
        ),
        #[cfg(not(feature = "snappy"))]
        CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "compress to snappy".to_string(),
        )),
        #[cfg(feature = "lz4")]
        CompressionOptions::Lz4Raw => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(lz4::block::compress_bound(len)?),
            |input, output| {
                // `prepend_size = false`: raw LZ4 block, no size header, as
                // required by the parquet LZ4_RAW codec.
                let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;
                Ok(compressed_size)
            },
        ),
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "compress to lz4".to_string(),
        )),
        #[cfg(feature = "zstd")]
        CompressionOptions::Zstd(level) => {
            let level = level.map(|v| v.compression_level()).unwrap_or_default();
            // Make sure the buffer is large enough; the interface assumption is
            // that compressed data is appended to the output buffer, so resize
            // past the existing contents by the zstd worst-case bound.
            let old_len = output_buf.len();
            output_buf.resize(
                old_len + zstd::zstd_safe::compress_bound(input_buf.len()),
                0,
            );
            match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {
                Ok(written_size) => {
                    // Drop the unused tail of the worst-case allocation.
                    output_buf.truncate(old_len + written_size);
                    Ok(())
                },
                Err(e) => Err(e.into()),
            }
        },
        #[cfg(not(feature = "zstd"))]
        CompressionOptions::Zstd(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "compress to zstd".to_string(),
        )),
        CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(
            "Compressing uncompressed".to_string(),
        )),
        // Catch-all for codecs that exist in the enum but have no
        // implementation here (e.g. Lzo).
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {compression:?} is not supported",
        ))),
    }
}
134
135
/// Reusable codec state for [`decompress`].
///
/// Passing the same context across multiple `decompress` calls lets codecs
/// that support it (currently zstd) reuse their internal allocations instead
/// of recreating them per page.
pub enum DecompressionContext {
    /// No codec-specific state has been initialized yet; `decompress` will
    /// lazily fill this in on first use of a codec that needs it.
    Unset,
    /// A reusable zstd decompression context.
    #[cfg(feature = "zstd")]
    Zstd(zstd::zstd_safe::DCtx<'static>),
}
140
141
/// Decompresses data stored in slice `input_buf` and writes output to `output_buf`.
142
/// Returns the total number of bytes written.
143
#[allow(unused_variables)]
144
pub fn decompress(
145
compression: Compression,
146
input_buf: &[u8],
147
output_buf: &mut [u8],
148
ctx: &mut DecompressionContext,
149
) -> ParquetResult<()> {
150
match compression {
151
#[cfg(feature = "brotli")]
152
Compression::Brotli => {
153
use std::io::Read;
154
const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
155
brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
156
.read_exact(output_buf)
157
.map_err(|e| e.into())
158
},
159
#[cfg(not(feature = "brotli"))]
160
Compression::Brotli => Err(ParquetError::FeatureNotActive(
161
crate::parquet::error::Feature::Brotli,
162
"decompress with brotli".to_string(),
163
)),
164
#[cfg(feature = "gzip")]
165
Compression::Gzip => {
166
use std::io::Read;
167
let mut decoder = flate2::read::GzDecoder::new(input_buf);
168
decoder.read_exact(output_buf).map_err(|e| e.into())
169
},
170
#[cfg(not(feature = "gzip"))]
171
Compression::Gzip => Err(ParquetError::FeatureNotActive(
172
crate::parquet::error::Feature::Gzip,
173
"decompress with gzip".to_string(),
174
)),
175
#[cfg(feature = "snappy")]
176
Compression::Snappy => {
177
use snap::raw::{Decoder, decompress_len};
178
179
let len = decompress_len(input_buf)?;
180
if len > output_buf.len() {
181
return Err(ParquetError::oos("snappy header out of spec"));
182
}
183
Decoder::new()
184
.decompress(input_buf, output_buf)
185
.map_err(|e| e.into())
186
.map(|_| ())
187
},
188
#[cfg(not(feature = "snappy"))]
189
Compression::Snappy => Err(ParquetError::FeatureNotActive(
190
crate::parquet::error::Feature::Snappy,
191
"decompress with snappy".to_string(),
192
)),
193
#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
194
Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)
195
.map(|_| {})
196
.map_err(|e| e.into()),
197
#[cfg(feature = "lz4")]
198
Compression::Lz4Raw => {
199
lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
200
.map(|_| {})
201
.map_err(|e| e.into())
202
},
203
#[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
204
Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
205
crate::parquet::error::Feature::Lz4,
206
"decompress with lz4".to_string(),
207
)),
208
209
#[cfg(any(feature = "lz4_flex", feature = "lz4"))]
210
Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {
211
lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
212
.map(|_| {})
213
}),
214
215
#[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
216
Compression::Lz4 => Err(ParquetError::FeatureNotActive(
217
crate::parquet::error::Feature::Lz4,
218
"decompress with legacy lz4".to_string(),
219
)),
220
221
#[cfg(feature = "zstd")]
222
Compression::Zstd => {
223
use std::io::Read;
224
if !matches!(ctx, DecompressionContext::Zstd(_)) {
225
*ctx = DecompressionContext::Zstd(zstd::zstd_safe::DCtx::create());
226
}
227
let DecompressionContext::Zstd(ctx) = ctx else {
228
unreachable!();
229
};
230
let mut decoder = zstd::Decoder::with_context(input_buf, ctx);
231
decoder.read_exact(output_buf).map_err(|e| e.into())
232
},
233
#[cfg(not(feature = "zstd"))]
234
Compression::Zstd => Err(ParquetError::FeatureNotActive(
235
crate::parquet::error::Feature::Zstd,
236
"decompress with zstd".to_string(),
237
)),
238
Compression::Uncompressed => Err(ParquetError::InvalidParameter(
239
"Compressing uncompressed".to_string(),
240
)),
241
_ => Err(ParquetError::FeatureNotSupported(format!(
242
"Compression {compression:?} is not supported",
243
))),
244
}
245
}
246
247
/// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
/// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
/// Returns error if decompression failed.
#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc

    const SIZE_U32: usize = size_of::<u32>();
    const PREFIX_LEN: usize = SIZE_U32 * 2;
    // Remaining unread input / unwritten output, advanced frame by frame.
    let mut input_len = input_buf.len();
    let mut input = input_buf;
    let mut output_len = output_buf.len();
    let mut output: &mut [u8] = output_buf;
    while input_len >= PREFIX_LEN {
        // Read the two big-endian u32 frame sizes from the 8-byte prefix.
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[0..4]);
        let expected_decompressed_size = u32::from_be_bytes(bytes);
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[4..8]);
        let expected_compressed_size = u32::from_be_bytes(bytes);
        input = &input[PREFIX_LEN..];
        input_len -= PREFIX_LEN;

        // Both advertised sizes must fit in what we actually have left;
        // otherwise the header is lying (or this isn't Hadoop framing at all).
        if input_len < expected_compressed_size as usize {
            return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));
        }

        if output_len < expected_decompressed_size as usize {
            return Err(ParquetError::oos(
                "Not enough bytes to hold advertised output",
            ));
        }
        // Decompress exactly one frame's worth of compressed bytes into the
        // current output window.
        let decompressed_size = lz4_decompress_to_buffer(
            &input[..expected_compressed_size as usize],
            Some(output_len as i32),
            output,
        )?;
        if decompressed_size != expected_decompressed_size as usize {
            return Err(ParquetError::oos("unexpected decompressed size"));
        }
        input_len -= expected_compressed_size as usize;
        output_len -= expected_decompressed_size as usize;
        // NOTE(review): this continuation test compares the *remaining* input
        // against the frame just consumed rather than against zero, mirroring
        // the Arrow C++ translation source — if a later frame is smaller than
        // the previous frame's compressed size this breaks early and the
        // leftover-input check below reports an error. Confirm against Arrow
        // if this ever needs to change.
        if input_len > expected_compressed_size as usize {
            input = &input[expected_compressed_size as usize..];
            output = &mut output[expected_decompressed_size as usize..];
        } else {
            break;
        }
    }
    // Success only if every input byte was consumed by some frame.
    if input_len == 0 {
        Ok(())
    } else {
        Err(ParquetError::oos("Not all input are consumed"))
    }
}
310
311
/// Thin wrapper around [`lz4::block::decompress_to_buffer`] that converts the
/// lz4 error into a [`ParquetError`] and returns the decompressed byte count.
#[cfg(feature = "lz4")]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    Ok(lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?)
}
321
322
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trips `data` through compress/decompress with codec `c`, starting
    // from an output buffer that already contains `offset` bytes to verify the
    // append semantics of `compress`.
    fn test_roundtrip(c: CompressionOptions, data: &[u8]) {
        let offset = 2048;

        // Compress to a buffer that already has data is possible
        let mut compressed = vec![2; offset];
        compress(c, data, &mut compressed).expect("Error when compressing");

        // data is compressed...
        assert!(compressed.len() - offset < data.len());

        let mut decompressed = vec![0; data.len()];
        let mut context = DecompressionContext::Unset;
        decompress(
            c.into(),
            &compressed[offset..],
            &mut decompressed,
            &mut context,
        )
        .expect("Error when decompressing");
        assert_eq!(data, decompressed.as_slice());
    }

    // Exercises one codec across several input sizes of repetitive
    // (hence compressible) data.
    fn test_codec(c: CompressionOptions) {
        let sizes = vec![1000, 10000, 100000];
        for size in sizes {
            let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();
            test_roundtrip(c, &data);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec(CompressionOptions::Snappy);
    }

    #[test]
    fn test_codec_gzip_default() {
        test_codec(CompressionOptions::Gzip(None));
    }

    #[test]
    fn test_codec_gzip_low_compression() {
        test_codec(CompressionOptions::Gzip(Some(
            GzipLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_default() {
        test_codec(CompressionOptions::Brotli(None));
    }

    #[test]
    fn test_codec_brotli_low_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_high_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(11).unwrap(),
        )));
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec(CompressionOptions::Lz4Raw);
    }

    #[test]
    fn test_codec_zstd_default() {
        test_codec(CompressionOptions::Zstd(None));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_low_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(1).unwrap(),
        )));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_high_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(21).unwrap(),
        )));
    }
}
418
419