Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/compression.rs
8424 views
1
use std::cmp;
2
use std::io::{BufRead, Cursor, Read, Write};
3
4
use polars_buffer::Buffer;
5
use polars_core::prelude::*;
6
use polars_error::{feature_gated, to_compute_err};
7
8
use crate::utils::file::{Writeable, WriteableTrait};
9
#[cfg(feature = "async")]
10
use crate::utils::stream_buf_reader::ReaderSource;
11
use crate::utils::sync_on_close::SyncOnCloseType;
12
13
/// Represents the compression algorithms that we have decoders for
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
    /// that family, for unsupported/uncompressed slices, return None.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        // We need up to 4 bytes to recognize the zstd frame magic; shorter
        // inputs cannot be classified, so treat them as uncompressed.
        let [b0, b1, b2, b3, ..] = bytes else {
            return None;
        };

        match (*b0, *b1) {
            // gzip member header (also covers concatenated multi-member streams).
            (0x1f, 0x8b) => Some(Self::GZIP),
            // zlib: CMF 0x78 followed by any of the FLG bytes that pass the
            // header checksum without a preset dictionary (the different
            // compression levels).
            (0x78, 0x01 | 0x5e | 0x9c | 0xda) => Some(Self::ZLIB),
            // zstd frame magic number 0xFD2FB528 (little endian).
            (0x28, 0xb5) if (*b2, *b3) == (0x2f, 0xfd) => Some(Self::ZSTD),
            _ => None,
        }
    }
}
42
43
/// Decompress `bytes` if compression is detected, otherwise simply return it.
44
/// An `out` vec must be given for ownership of the decompressed data.
45
#[allow(clippy::ptr_arg)]
46
#[deprecated(note = "may cause OOM, use CompressedReader instead")]
47
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
48
assert!(out.is_empty());
49
50
let Some(algo) = SupportedCompression::check(bytes) else {
51
return Ok(bytes);
52
};
53
54
feature_gated!("decompress", {
55
match algo {
56
SupportedCompression::GZIP => {
57
flate2::read::MultiGzDecoder::new(bytes)
58
.read_to_end(out)
59
.map_err(to_compute_err)?;
60
},
61
SupportedCompression::ZLIB => {
62
flate2::read::ZlibDecoder::new(bytes)
63
.read_to_end(out)
64
.map_err(to_compute_err)?;
65
},
66
SupportedCompression::ZSTD => {
67
zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
68
},
69
}
70
71
Ok(out)
72
})
73
}
74
75
/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    /// Plain in-memory bytes; reads are served zero-copy by slicing `slice`.
    Uncompressed {
        // The full uncompressed input.
        slice: Buffer<u8>,
        // Number of bytes already handed out to the consumer.
        offset: usize,
    },
    /// Gzip stream (`MultiGzDecoder` also handles concatenated gzip members).
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    /// Zlib (deflate with zlib header/trailer) stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    /// Zstandard stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}
91
92
impl CompressedReader {
    /// Wraps `slice` in a reader, auto-detecting the compression format from the
    /// slice's leading magic bytes (see [`SupportedCompression::check`]).
    ///
    /// # Errors
    /// Returns an error if the zstd decoder cannot be initialized.
    ///
    /// # Panics
    /// Panics if a compressed input is detected but the 'decompress' feature is
    /// disabled.
    pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Whether the underlying input was detected as compressed.
    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    /// Recommended `read_size` for the first call to [`Self::read_next_slice`].
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` (32 KiB * 16 == 512 KiB).
        32 * 1024
    }

    /// Recommended steady-state `read_size` for [`Self::read_next_slice`].
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// If possible returns the total number of bytes that will be produced by reading from the
    /// start to finish.
    ///
    /// Only exact for the uncompressed case; for compressed inputs it is the
    /// compressed length scaled by an assumed compression ratio.
    pub fn total_len_estimate(&self) -> usize {
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            // `get_ref().get_ref()` unwraps decoder -> Cursor -> Buffer to reach
            // the compressed input length.
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
            },
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        // Only the compressed paths need an owned buffer; the uncompressed path
        // returns a zero-copy sub-slice instead, leaving `buf` untouched.
        let mut buf = Vec::new();
        if self.is_compressed() {
            // Reserve for leftover + new bytes, but cap at twice the estimated
            // total output to guard against a huge `read_size`.
            let reserve_size = cmp::min(
                prev_len.saturating_add(read_size),
                self.total_len_estimate().saturating_mul(2),
            );
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        // Shared tail for the compressed branches: trim `buf` to the bytes
        // actually present and hand ownership to a new `Buffer`.
        let new_slice_from_read =
            |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
                buf.truncate(prev_len + bytes_read);
                Ok((Buffer::from_vec(buf), bytes_read))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the doc comment, `prev_leftover` is the tail of the last
                // returned slice, i.e. exactly the bytes directly before
                // `offset`; re-slicing from `offset - prev_len` re-includes it
                // without any copying.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                Ok((new_slice, bytes_read))
            },
            // `take` limits each decoder to at most `read_size` new bytes per call.
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}
223
224
/// This implementation is meant for compatibility. Use [`Self::read_next_slice`] for best
225
/// performance.
226
impl Read for CompressedReader {
227
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
228
match self {
229
CompressedReader::Uncompressed { slice, offset, .. } => {
230
let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
231
buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
232
*offset += bytes_read;
233
Ok(bytes_read)
234
},
235
#[cfg(feature = "decompress")]
236
CompressedReader::Gzip(decoder) => decoder.read(buf),
237
#[cfg(feature = "decompress")]
238
CompressedReader::Zlib(decoder) => decoder.read(buf),
239
#[cfg(feature = "decompress")]
240
CompressedReader::Zstd(decoder) => decoder.read(buf),
241
}
242
}
243
}
244
245
/// A byte source that abstracts over in-memory buffers and streaming
/// readers, with optional transparent decompression and buffering.
///
/// Implements `BufRead`, allowing uniform access regardless of whether
/// the underlying data is an in-memory slice, a raw stream, or a
/// compressed stream (gzip/zlib/zstd).
///
/// This is the generic successor to [`CompressedReader`], which only
/// supports in-memory (`Buffer<u8>`) sources.
#[cfg(feature = "async")]
pub enum ByteSourceReader<R: BufRead> {
    /// In-memory uncompressed bytes; reads are served zero-copy by slicing.
    UncompressedMemory {
        // The full uncompressed input.
        slice: Buffer<u8>,
        // Number of bytes already handed out to the consumer.
        offset: usize,
    },
    /// Uncompressed streaming source; bytes are copied out on demand.
    UncompressedStream(R),
    /// Gzip stream (`MultiGzDecoder` also handles concatenated gzip members).
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<R>),
    /// Zlib (deflate with zlib header/trailer) stream.
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<R>),
    /// Zstandard stream.
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, R>),
}
268
269
#[cfg(feature = "async")]
impl<R: BufRead> ByteSourceReader<R> {
    /// Wraps `reader`, decoding with the given compression algorithm if one is
    /// provided. Unlike [`CompressedReader::try_new`], the compression must be
    /// detected by the caller (the stream cannot be peeked here).
    ///
    /// # Errors
    /// Returns an error if the zstd decoder cannot be initialized.
    ///
    /// # Panics
    /// Panics if `compression` is `Some` but the 'decompress' feature is
    /// disabled.
    pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
        Ok(match compression {
            None => Self::UncompressedStream(reader),
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => Self::Zstd(zstd::Decoder::with_buffer(reader)?),
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Whether reads pass through a decompressor.
    pub fn is_compressed(&self) -> bool {
        !matches!(
            &self,
            Self::UncompressedMemory { .. } | Self::UncompressedStream(_)
        )
    }

    /// Recommended `read_size` for the first call to [`Self::read_next_slice`].
    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` (32 KiB * 16 == 512 KiB).
        32 * 1024
    }

    /// Recommended steady-state `read_size` for [`Self::read_next_slice`].
    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
        uncompressed_size_hint: Option<usize>,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        let reader: &mut dyn Read = match self {
            // Zero-copy fast-path — no allocation required
            Self::UncompressedMemory { slice, offset } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the doc comment, `prev_leftover` is exactly the bytes
                // directly before `offset`, so re-slicing from
                // `offset - prev_len` re-includes it without copying.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                return Ok((new_slice, bytes_read));
            },
            // All remaining variants are streaming readers; unify them behind
            // `dyn Read` so the copy path below is written only once.
            Self::UncompressedStream(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Gzip(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zlib(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zstd(reader) => reader,
        };

        let mut buf = Vec::new();

        // Cap the reserve_size, for the scenario where read_size == usize::MAX
        let max_reserve_size = uncompressed_size_hint.unwrap_or(4 * 1024 * 1024);
        let reserve_size = cmp::min(prev_len.saturating_add(read_size), max_reserve_size);
        buf.reserve_exact(reserve_size);
        buf.extend_from_slice(prev_leftover);

        // `take` limits the reader to at most `read_size` new bytes this call.
        let bytes_read = reader.take(read_size as u64).read_to_end(&mut buf)?;
        buf.truncate(prev_len + bytes_read);
        Ok((Buffer::from_vec(buf), bytes_read))
    }
}
370
371
#[cfg(feature = "async")]
impl ByteSourceReader<ReaderSource> {
    /// Builds a reader over an in-memory buffer.
    ///
    /// Uncompressed buffers take the zero-copy `UncompressedMemory` path;
    /// compressed ones are routed through [`Self::try_new`] with the buffer
    /// wrapped as a memory-backed stream.
    pub fn from_memory(
        slice: Buffer<u8>,
        compression: Option<SupportedCompression>,
    ) -> PolarsResult<Self> {
        if compression.is_none() {
            return Ok(Self::UncompressedMemory { slice, offset: 0 });
        }

        Self::try_new(ReaderSource::Memory(Cursor::new(slice)), compression)
    }
}
383
384
/// Constructor for `WriteableTrait` compressed encoders.
pub enum CompressedWriter {
    /// Gzip encoder. The `Option` is taken on `close()` so the encoder can be
    /// finished by value; it is `None` only after the writer has been closed.
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    /// Zstandard encoder; same `Option`-taken-on-close scheme as `Gzip`.
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}
391
392
impl CompressedWriter {
393
pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
394
feature_gated!("decompress", {
395
Self::Gzip(Some(flate2::write::GzEncoder::new(
396
writer,
397
level.map(flate2::Compression::new).unwrap_or_default(),
398
)))
399
})
400
}
401
402
pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
403
feature_gated!("decompress", {
404
zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
405
.map(Some)
406
.map(Self::Zstd)
407
})
408
}
409
}
410
411
impl Write for CompressedWriter {
412
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
413
feature_gated!("decompress", {
414
match self {
415
Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
416
Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
417
}
418
})
419
}
420
421
fn flush(&mut self) -> std::io::Result<()> {
422
feature_gated!("decompress", {
423
match self {
424
Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
425
Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
426
}
427
})
428
}
429
}
430
431
impl WriteableTrait for CompressedWriter {
432
fn close(&mut self) -> std::io::Result<()> {
433
feature_gated!("decompress", {
434
let writer = match self {
435
Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
436
Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
437
};
438
439
writer.close(SyncOnCloseType::All)
440
})
441
}
442
443
fn sync_all(&self) -> std::io::Result<()> {
444
feature_gated!("decompress", {
445
match self {
446
Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
447
Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
448
}
449
})
450
}
451
452
fn sync_data(&self) -> std::io::Result<()> {
453
feature_gated!("decompress", {
454
match self {
455
Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
456
Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
457
}
458
})
459
}
460
}
461
462