Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/read/compression.rs
6940 views
1
use polars_parquet_format::DataPageHeaderV2;
2
3
use super::PageReader;
4
use crate::parquet::CowBuffer;
5
use crate::parquet::compression::{self, Compression, DecompressionContext};
6
use crate::parquet::error::{ParquetError, ParquetResult};
7
use crate::parquet::page::{
8
CompressedDataPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page,
9
};
10
11
/// Decompresses a V1-style page payload.
///
/// V1 data pages (and dictionary pages) are compressed end-to-end, so the
/// whole `compressed` slice is handed to the codec and the result is written
/// into `buffer`, which the caller has already sized to the uncompressed
/// length reported by the page header.
fn decompress_v1(
    compressed: &[u8],
    compression: Compression,
    buffer: &mut [u8],
    context: &mut DecompressionContext,
) -> ParquetResult<()> {
    compression::decompress(compression, compressed, buffer, context)
}
19
20
/// Decompresses a V2 data page payload.
///
/// In the V2 page format the repetition and definition levels are stored
/// *uncompressed* at the start of the page; only the bytes after that level
/// section go through the codec. The caller has already sized `buffer` to the
/// uncompressed length reported by the page header.
///
/// # Errors
/// Returns an out-of-spec error when the header-reported level-byte length
/// exceeds either the input or the output, or — for pages flagged as not
/// compressed — when input and output sizes disagree.
fn decompress_v2(
    compressed: &[u8],
    page_header: &DataPageHeaderV2,
    compression: Compression,
    buffer: &mut [u8],
    context: &mut DecompressionContext,
) -> ParquetResult<()> {
    // When processing data page v2, depending on enabled compression for the
    // page, we should account for uncompressed data ('offset') of
    // repetition and definition levels.
    //
    // We always use 0 offset for other pages other than v2, `true` flag means
    // that compression will be applied if decompressor is defined
    let offset = (page_header.definition_levels_byte_length
        + page_header.repetition_levels_byte_length) as usize;
    // When is_compressed flag is missing the page is considered compressed
    let can_decompress = page_header.is_compressed.unwrap_or(true);

    if can_decompress {
        // Guard against a corrupt header claiming more level bytes than exist
        // in either the input or the output buffer.
        if offset > buffer.len() || offset > compressed.len() {
            return Err(ParquetError::oos(
                "V2 Page Header reported incorrect offset to compressed data",
            ));
        }

        // The level bytes are never compressed: copy them through verbatim.
        (buffer[..offset]).copy_from_slice(&compressed[..offset]);

        // Skip the codec entirely when the compressed section is empty —
        // see the linked issue for the case this guards against.
        // https://github.com/pola-rs/polars/issues/22170
        if compressed.len() > offset {
            compression::decompress(
                compression,
                &compressed[offset..],
                &mut buffer[offset..],
                context,
            )?;
        }
    } else {
        // Page stored without compression: sizes must match exactly.
        if buffer.len() != compressed.len() {
            return Err(ParquetError::oos(
                "V2 Page Header reported incorrect decompressed size",
            ));
        }
        buffer.copy_from_slice(compressed);
    }
    Ok(())
}
66
67
/// Decompresses the page, using `buffer` for decompression.
/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved.
/// Else, decompression took place.
///
/// When decompression does take place, the returned page takes ownership of
/// `buffer`'s storage (via `std::mem::take`), leaving `buffer` empty for the
/// caller to refill or reuse.
pub fn decompress(
    compressed_page: CompressedPage,
    buffer: &mut Vec<u8>,
    context: &mut DecompressionContext,
) -> ParquetResult<Page> {
    Ok(match (compressed_page.compression(), compressed_page) {
        // Uncompressed data page: move the page's buffer through untouched.
        (Compression::Uncompressed, CompressedPage::Data(page)) => Page::Data(DataPage::new_read(
            page.header,
            page.buffer,
            page.descriptor,
        )),
        (_, CompressedPage::Data(page)) => {
            // prepare the compression buffer
            let read_size = page.uncompressed_size();
            if read_size > buffer.capacity() {
                // Clear before resizing to avoid memcpy'ing junk we don't care about anymore rather
                // than just memset'ing zeroes.
                buffer.clear();
            }
            buffer.resize(read_size, 0);

            // V1 and V2 pages lay out their payloads differently (V2 keeps
            // rep/def levels uncompressed), so dispatch on the header kind.
            match page.header() {
                DataPageHeader::V1(_) => {
                    decompress_v1(&page.buffer, page.compression, buffer, context)?
                },
                DataPageHeader::V2(header) => {
                    decompress_v2(&page.buffer, header, page.compression, buffer, context)?
                },
            }
            let buffer = CowBuffer::Owned(std::mem::take(buffer));

            Page::Data(DataPage::new_read(page.header, buffer, page.descriptor))
        },
        // Uncompressed dictionary page: move the page's buffer through untouched.
        (Compression::Uncompressed, CompressedPage::Dict(page)) => Page::Dict(DictPage {
            buffer: page.buffer,
            num_values: page.num_values,
            is_sorted: page.is_sorted,
        }),
        (_, CompressedPage::Dict(page)) => {
            // prepare the compression buffer
            let read_size = page.uncompressed_page_size;
            if read_size > buffer.capacity() {
                // Clear before resizing to avoid memcpy'ing junk we don't care about anymore rather
                // than just memset'ing zeroes.
                buffer.clear();
            }
            buffer.resize(read_size, 0);

            // Dictionary pages are compressed end-to-end, i.e. V1-style.
            decompress_v1(&page.buffer, page.compression(), buffer, context)?;
            let buffer = CowBuffer::Owned(std::mem::take(buffer));

            Page::Dict(DictPage {
                buffer,
                num_values: page.num_values,
                is_sorted: page.is_sorted,
            })
        },
    })
}
129
130
// Type alias wiring [`CompressedPage`]/[`Page`] into the generic streaming
// decompressor. NOTE(review): underscore-prefixed and not referenced anywhere
// in this file — presumably kept for reference; confirm before removing.
type _Decompressor<I> = streaming_decompression::Decompressor<
    CompressedPage,
    Page,
    fn(CompressedPage, &mut Vec<u8>) -> ParquetResult<Page>,
    ParquetError,
    I,
>;
137
138
impl streaming_decompression::Compressed for CompressedPage {
139
#[inline]
140
fn is_compressed(&self) -> bool {
141
self.compression() != Compression::Uncompressed
142
}
143
}
144
145
impl streaming_decompression::Decompressed for Page {
    #[inline]
    fn buffer_mut(&mut self) -> &mut Vec<u8> {
        // Delegates to the inherent `Page::buffer_mut` method.
        self.buffer_mut()
    }
}
151
152
/// A [`FallibleStreamingIterator`] that decompresses [`CompressedPage`] into [`DataPage`].
/// # Implementation
/// This decompressor uses an internal [`Vec<u8>`] to perform decompressions which
/// is reused across pages, so that a single allocation is required.
/// If the pages are not compressed, the internal buffer is not used.
pub struct BasicDecompressor {
    // Source of compressed pages for one column chunk.
    reader: PageReader,
    // Scratch buffer reused across pages for decompressed bytes.
    buffer: Vec<u8>,
    // Codec state reused across pages.
    context: DecompressionContext,
}
162
163
impl BasicDecompressor {
164
/// Create a new [`BasicDecompressor`]
165
pub fn new(reader: PageReader, buffer: Vec<u8>) -> Self {
166
Self {
167
reader,
168
buffer,
169
context: DecompressionContext::Unset,
170
}
171
}
172
173
/// The total number of values is given from the `ColumnChunk` metadata.
174
///
175
/// - Nested column: equal to the number of non-null values at the lowest nesting level.
176
/// - Unnested column: equal to the number of non-null rows.
177
pub fn total_num_values(&self) -> usize {
178
self.reader.total_num_values()
179
}
180
181
/// Returns its internal buffer, consuming itself.
182
pub fn into_inner(self) -> Vec<u8> {
183
self.buffer
184
}
185
186
pub fn read_dict_page(&mut self) -> ParquetResult<Option<DictPage>> {
187
match self.reader.read_dict()? {
188
None => Ok(None),
189
Some(p) => {
190
let num_values = p.num_values;
191
let page = decompress(
192
CompressedPage::Dict(p),
193
&mut Vec::with_capacity(num_values),
194
&mut self.context,
195
)?;
196
197
match page {
198
Page::Dict(d) => Ok(Some(d)),
199
Page::Data(_) => unreachable!(),
200
}
201
},
202
}
203
}
204
205
pub fn reuse_page_buffer(&mut self, page: DataPage) {
206
let buffer = match page.buffer {
207
CowBuffer::Borrowed(_) => return,
208
CowBuffer::Owned(vec) => vec,
209
};
210
211
if self.buffer.capacity() > buffer.capacity() {
212
return;
213
};
214
215
self.buffer = buffer;
216
}
217
}
218
219
/// A still-compressed data page yielded by [`BasicDecompressor`]'s iterator,
/// to be decompressed on demand via [`DataPageItem::decompress`].
pub struct DataPageItem {
    page: CompressedDataPage,
}
222
223
impl DataPageItem {
224
pub fn num_values(&self) -> usize {
225
self.page.num_values()
226
}
227
228
pub fn page(&self) -> &CompressedDataPage {
229
&self.page
230
}
231
232
pub fn decompress(self, decompressor: &mut BasicDecompressor) -> ParquetResult<DataPage> {
233
let p = decompress(
234
CompressedPage::Data(self.page),
235
&mut decompressor.buffer,
236
&mut decompressor.context,
237
)?;
238
let Page::Data(p) = p else {
239
panic!("Decompressing a data page should result in a data page");
240
};
241
242
Ok(p)
243
}
244
}
245
246
impl Iterator for BasicDecompressor {
247
type Item = ParquetResult<DataPageItem>;
248
249
fn next(&mut self) -> Option<Self::Item> {
250
let page = match self.reader.next() {
251
None => return None,
252
Some(Err(e)) => return Some(Err(e)),
253
Some(Ok(p)) => p,
254
};
255
256
let CompressedPage::Data(page) = page else {
257
return Some(Err(ParquetError::oos(
258
"Found dictionary page beyond the first page of a column chunk",
259
)));
260
};
261
262
Some(Ok(DataPageItem { page }))
263
}
264
265
fn size_hint(&self) -> (usize, Option<usize>) {
266
self.reader.size_hint()
267
}
268
}
269
270
#[cfg(test)]
mod tests {
    use polars_parquet_format::Encoding;

    use super::*;

    /// Regression test: a V2 page whose compressed section is empty (all
    /// bytes are uncompressed definition levels) must decompress without
    /// invoking the codec, for every compression variant.
    #[test]
    fn test_decompress_v2_empty_datapage() {
        let compressions = [
            Compression::Snappy,
            Compression::Gzip,
            Compression::Lzo,
            Compression::Brotli,
            Compression::Lz4,
            Compression::Zstd,
            Compression::Lz4Raw,
        ];

        // this datapage has an empty compressed section after the first two bytes (uncompressed definition levels)
        let compressed: &mut Vec<u8> = &mut vec![0x03, 0x00];
        // Header reports 2 bytes of definition levels, 0 bytes of repetition levels.
        let page_header = DataPageHeaderV2::new(1, 1, 1, Encoding::PLAIN, 2, 0, true, None);
        // NOTE(review): `vec![0, 2]` was possibly intended as `vec![0; 2]`;
        // only the length (2) matters, since `decompress_v2` overwrites the
        // whole buffer — confirm intent.
        let buffer: &mut Vec<u8> = &mut vec![0, 2];

        compressions.iter().for_each(|compression| {
            test_decompress_v2_datapage(compressed, &page_header, *compression, buffer, compressed)
        });
    }

    /// Runs `decompress_v2` with a fresh codec context and asserts the output
    /// buffer equals `expected`.
    fn test_decompress_v2_datapage(
        compressed: &[u8],
        page_header: &DataPageHeaderV2,
        compression: Compression,
        buffer: &mut [u8],
        expected: &[u8],
    ) {
        decompress_v2(
            compressed,
            page_header,
            compression,
            buffer,
            &mut DecompressionContext::Unset,
        )
        .unwrap();
        assert_eq!(buffer, expected);
    }
}
316
317