GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/compression.rs
//! Functionality to compress and decompress data according to the parquet specification
pub use super::parquet_bridge::{
    BrotliLevel, Compression, CompressionOptions, GzipLevel, ZstdLevel,
};
use crate::parquet::error::{ParquetError, ParquetResult};

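/// Shared helper for one-shot codecs that compress into a caller-provided
/// buffer: it grows `output` by the worst-case size reported by `get_length`,
/// runs `compress` into the new tail, then truncates to the bytes actually
/// written, leaving any pre-existing contents of `output` untouched.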
#[cfg(any(feature = "snappy", feature = "lz4"))]
8
fn inner_compress<
9
G: Fn(usize) -> ParquetResult<usize>,
10
F: Fn(&[u8], &mut [u8]) -> ParquetResult<usize>,
11
>(
12
input: &[u8],
13
output: &mut Vec<u8>,
14
get_length: G,
15
compress: F,
16
) -> ParquetResult<()> {
17
let original_length = output.len();
18
let max_required_length = get_length(input.len())?;
19
20
output.resize(original_length + max_required_length, 0);
21
let compressed_size = compress(input, &mut output[original_length..])?;
22
23
output.truncate(original_length + compressed_size);
24
Ok(())
25
}

/// Compresses data stored in slice `input_buf` and writes the compressed result
/// to `output_buf`.
///
/// Note that you'll need to call `clear()` before reusing the same `output_buf`
/// across different `compress` calls.
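///
/// A minimal usage sketch (illustrative, not compiled as a doctest; assumes
/// the `snappy` feature is enabled):
///
/// ```ignore
/// let data = b"some bytes to compress";
/// let mut compressed = Vec::new();
/// compress(CompressionOptions::Snappy, data, &mut compressed)?;
/// // Clear before reusing the buffer for another call:
/// compressed.clear();
/// compress(CompressionOptions::Snappy, data, &mut compressed)?;
/// ```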
#[allow(unused_variables)]
pub fn compress(
    compression: CompressionOptions,
    input_buf: &[u8],
    #[allow(clippy::ptr_arg)] output_buf: &mut Vec<u8>,
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        CompressionOptions::Brotli(level) => {
            use std::io::Write;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22

            let q = level.unwrap_or_default();
            let mut encoder = brotli::CompressorWriter::new(
                output_buf,
                BROTLI_DEFAULT_BUFFER_SIZE,
                q.compression_level(),
                BROTLI_DEFAULT_LG_WINDOW_SIZE,
            );
            encoder.write_all(input_buf)?;
            encoder.flush().map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        CompressionOptions::Brotli(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "compress to brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        CompressionOptions::Gzip(level) => {
            use std::io::Write;
            let level = level.unwrap_or_default();
            let mut encoder = flate2::write::GzEncoder::new(output_buf, level.into());
            encoder.write_all(input_buf)?;
            encoder.try_finish().map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        CompressionOptions::Gzip(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "compress to gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        CompressionOptions::Snappy => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(snap::raw::max_compress_len(len)),
            |input, output| Ok(snap::raw::Encoder::new().compress(input, output)?),
        ),
        #[cfg(not(feature = "snappy"))]
        CompressionOptions::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "compress to snappy".to_string(),
        )),
        #[cfg(feature = "lz4")]
        CompressionOptions::Lz4Raw => inner_compress(
            input_buf,
            output_buf,
            |len| Ok(lz4::block::compress_bound(len)?),
            |input, output| {
                let compressed_size = lz4::block::compress_to_buffer(input, None, false, output)?;
                Ok(compressed_size)
            },
        ),
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        CompressionOptions::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "compress to lz4".to_string(),
        )),
        #[cfg(feature = "zstd")]
        CompressionOptions::Zstd(level) => {
            let level = level.map(|v| v.compression_level()).unwrap_or_default();
            // Make sure the buffer is large enough; compressed data is
            // appended to whatever is already in the output buffer.
            let old_len = output_buf.len();
            output_buf.resize(
                old_len + zstd::zstd_safe::compress_bound(input_buf.len()),
                0,
            );
            match zstd::bulk::compress_to_buffer(input_buf, &mut output_buf[old_len..], level) {
                Ok(written_size) => {
                    output_buf.truncate(old_len + written_size);
                    Ok(())
                },
                Err(e) => Err(e.into()),
            }
        },
        #[cfg(not(feature = "zstd"))]
        CompressionOptions::Zstd(_) => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "compress to zstd".to_string(),
        )),
        CompressionOptions::Uncompressed => Err(ParquetError::InvalidParameter(
            "Compressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {compression:?} is not supported",
        ))),
    }
}

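/// Reusable decompression state passed to [`decompress`]; lets codecs with
/// non-trivial setup cost (currently only zstd) keep their context alive
/// across calls instead of recreating it for every invocation.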
pub enum DecompressionContext {
    Unset,
    #[cfg(feature = "zstd")]
    Zstd(zstd::zstd_safe::DCtx<'static>),
}

/// Decompresses data stored in slice `input_buf` and writes the output to `output_buf`.
///
/// `output_buf` is expected to have exactly the length of the decompressed data;
/// an error is returned if decompression fails.
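///
/// A minimal usage sketch (illustrative, not compiled as a doctest; assumes
/// the `snappy` feature is enabled and that the decompressed size is known up
/// front, as it is for parquet pages):
///
/// ```ignore
/// // `compressed` and `decompressed_size` would come from a page in practice.
/// let mut decompressed = vec![0u8; decompressed_size];
/// let mut ctx = DecompressionContext::Unset;
/// decompress(Compression::Snappy, &compressed, &mut decompressed, &mut ctx)?;
/// ```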
#[allow(unused_variables)]
pub fn decompress(
    compression: Compression,
    input_buf: &[u8],
    output_buf: &mut [u8],
    ctx: &mut DecompressionContext,
) -> ParquetResult<()> {
    match compression {
        #[cfg(feature = "brotli")]
        Compression::Brotli => {
            use std::io::Read;
            const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
            brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
                .read_exact(output_buf)
                .map_err(|e| e.into())
        },
        #[cfg(not(feature = "brotli"))]
        Compression::Brotli => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Brotli,
            "decompress with brotli".to_string(),
        )),
        #[cfg(feature = "gzip")]
        Compression::Gzip => {
            use std::io::Read;
            let mut decoder = flate2::read::GzDecoder::new(input_buf);
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "gzip"))]
        Compression::Gzip => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Gzip,
            "decompress with gzip".to_string(),
        )),
        #[cfg(feature = "snappy")]
        Compression::Snappy => {
            use snap::raw::{Decoder, decompress_len};

            let len = decompress_len(input_buf)?;
            if len > output_buf.len() {
                return Err(ParquetError::oos("snappy header out of spec"));
            }
            Decoder::new()
                .decompress(input_buf, output_buf)
                .map_err(|e| e.into())
                .map(|_| ())
        },
        #[cfg(not(feature = "snappy"))]
        Compression::Snappy => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Snappy,
            "decompress with snappy".to_string(),
        )),
        #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
        Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf)
            .map(|_| {})
            .map_err(|e| e.into()),
        #[cfg(feature = "lz4")]
        Compression::Lz4Raw => {
            lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
                .map_err(|e| e.into())
        },
        #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))]
        Compression::Lz4Raw => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with lz4".to_string(),
        )),

        #[cfg(any(feature = "lz4_flex", feature = "lz4"))]
        Compression::Lz4 => try_decompress_hadoop(input_buf, output_buf).or_else(|_| {
            lz4_decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf)
                .map(|_| {})
        }),

        #[cfg(all(not(feature = "lz4_flex"), not(feature = "lz4")))]
        Compression::Lz4 => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Lz4,
            "decompress with legacy lz4".to_string(),
        )),

        #[cfg(feature = "zstd")]
        Compression::Zstd => {
            use std::io::Read;
            if !matches!(ctx, DecompressionContext::Zstd(_)) {
                *ctx = DecompressionContext::Zstd(zstd::zstd_safe::DCtx::create());
            }
            let DecompressionContext::Zstd(ctx) = ctx else {
                unreachable!();
            };
            let mut decoder = zstd::Decoder::with_context(input_buf, ctx);
            decoder.read_exact(output_buf).map_err(|e| e.into())
        },
        #[cfg(not(feature = "zstd"))]
        Compression::Zstd => Err(ParquetError::FeatureNotActive(
            crate::parquet::error::Feature::Zstd,
            "decompress with zstd".to_string(),
        )),
        Compression::Uncompressed => Err(ParquetError::InvalidParameter(
            "Decompressing uncompressed".to_string(),
        )),
        _ => Err(ParquetError::FeatureNotSupported(format!(
            "Compression {compression:?} is not supported",
        ))),
    }
}

/// Try to decompress the buffer as if it was compressed with the Hadoop Lz4Codec.
/// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474).
/// Returns an error if decompression fails.
#[cfg(any(feature = "lz4", feature = "lz4_flex"))]
fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> ParquetResult<()> {
    // Parquet files written with the Hadoop Lz4Codec use their own framing.
    // The input buffer can contain an arbitrary number of "frames", each
    // with the following structure:
    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
    // - bytes 8...: frame compressed data
    //
    // The Hadoop Lz4Codec source code can be found here:
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
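    //
    // For illustration (hypothetical values, not taken from a real file): a
    // single frame whose payload decompresses from 10 to 16 bytes would be
    // laid out as
    //
    //   00 00 00 10   big-endian decompressed size (16)
    //   00 00 00 0A   big-endian compressed size (10)
    //   <10 bytes of lz4 block data>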

    const SIZE_U32: usize = size_of::<u32>();
    const PREFIX_LEN: usize = SIZE_U32 * 2;
    let mut input_len = input_buf.len();
    let mut input = input_buf;
    let mut output_len = output_buf.len();
    let mut output: &mut [u8] = output_buf;
    while input_len >= PREFIX_LEN {
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[0..4]);
        let expected_decompressed_size = u32::from_be_bytes(bytes);
        let mut bytes = [0; SIZE_U32];
        bytes.copy_from_slice(&input[4..8]);
        let expected_compressed_size = u32::from_be_bytes(bytes);
        input = &input[PREFIX_LEN..];
        input_len -= PREFIX_LEN;

        if input_len < expected_compressed_size as usize {
            return Err(ParquetError::oos("Not enough bytes for Hadoop frame"));
        }

        if output_len < expected_decompressed_size as usize {
            return Err(ParquetError::oos(
                "Not enough bytes to hold advertised output",
            ));
        }
        let decompressed_size = lz4_decompress_to_buffer(
            &input[..expected_compressed_size as usize],
            Some(output_len as i32),
            output,
        )?;
        if decompressed_size != expected_decompressed_size as usize {
            return Err(ParquetError::oos("unexpected decompressed size"));
        }
        input_len -= expected_compressed_size as usize;
        output_len -= expected_decompressed_size as usize;
        if input_len > expected_compressed_size as usize {
            input = &input[expected_compressed_size as usize..];
            output = &mut output[expected_decompressed_size as usize..];
        } else {
            break;
        }
    }
    if input_len == 0 {
        Ok(())
    } else {
        Err(ParquetError::oos("Not all input was consumed"))
    }
}

#[cfg(feature = "lz4")]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    let size = lz4::block::decompress_to_buffer(src, uncompressed_size, buffer)?;
    Ok(size)
}
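
// The `Compression::Lz4` arm above also calls `lz4_decompress_to_buffer` when
// only `lz4_flex` is enabled, so a companion wrapper is needed for that
// configuration. A minimal sketch (the `uncompressed_size` hint is ignored
// because `lz4_flex::block::decompress_into` sizes its output from the
// buffer itself):
#[cfg(all(feature = "lz4_flex", not(feature = "lz4")))]
#[inline]
fn lz4_decompress_to_buffer(
    src: &[u8],
    _uncompressed_size: Option<i32>,
    buffer: &mut [u8],
) -> ParquetResult<usize> {
    let size = lz4_flex::block::decompress_into(src, buffer)?;
    Ok(size)
}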

#[cfg(test)]
mod tests {
    use super::*;

    fn test_roundtrip(c: CompressionOptions, data: &[u8]) {
        let offset = 2048;

        // Compressing into a buffer that already contains data is possible
        let mut compressed = vec![2; offset];
        compress(c, data, &mut compressed).expect("Error when compressing");

        // data is compressed...
        assert!(compressed.len() - offset < data.len());

        let mut decompressed = vec![0; data.len()];
        let mut context = DecompressionContext::Unset;
        decompress(
            c.into(),
            &compressed[offset..],
            &mut decompressed,
            &mut context,
        )
        .expect("Error when decompressing");
        assert_eq!(data, decompressed.as_slice());
    }

    fn test_codec(c: CompressionOptions) {
        let sizes = vec![1000, 10000, 100000];
        for size in sizes {
            let data = (0..size).map(|x| (x % 255) as u8).collect::<Vec<_>>();
            test_roundtrip(c, &data);
        }
    }

    #[test]
    fn test_codec_snappy() {
        test_codec(CompressionOptions::Snappy);
    }

    #[test]
    fn test_codec_gzip_default() {
        test_codec(CompressionOptions::Gzip(None));
    }

    #[test]
    fn test_codec_gzip_low_compression() {
        test_codec(CompressionOptions::Gzip(Some(
            GzipLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_default() {
        test_codec(CompressionOptions::Brotli(None));
    }

    #[test]
    fn test_codec_brotli_low_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(1).unwrap(),
        )));
    }

    #[test]
    fn test_codec_brotli_high_compression() {
        test_codec(CompressionOptions::Brotli(Some(
            BrotliLevel::try_new(11).unwrap(),
        )));
    }

    #[test]
    fn test_codec_lz4_raw() {
        test_codec(CompressionOptions::Lz4Raw);
    }

    #[test]
    fn test_codec_zstd_default() {
        test_codec(CompressionOptions::Zstd(None));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_low_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(1).unwrap(),
        )));
    }

    #[cfg(feature = "zstd")]
    #[test]
    fn test_codec_zstd_high_compression() {
        test_codec(CompressionOptions::Zstd(Some(
            ZstdLevel::try_new(21).unwrap(),
        )));
    }
}