Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/page/mod.rs
8512 views
1
use super::CowBuffer;
2
use crate::parquet::compression::Compression;
3
use crate::parquet::encoding::{Encoding, get_length};
4
use crate::parquet::error::{ParquetError, ParquetResult};
5
use crate::parquet::metadata::Descriptor;
6
pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType};
7
use crate::parquet::statistics::Statistics;
8
pub use crate::parquet::thrift_format::{
9
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding,
10
PageHeader as ParquetPageHeader,
11
};
12
13
/// The result of reading a single physical page: either one page, or a
/// dictionary page together with the data page that follows it.
pub enum PageResult {
    /// A single [`Page`].
    Single(Page),
    /// A dictionary page and the data page read alongside it.
    Two { dict: DictPage, data: DataPage },
}
17
18
/// A [`CompressedDataPage`] is a compressed, encoded representation of a Parquet data page.
/// It holds actual data and thus cloning it is expensive.
#[derive(Debug)]
pub struct CompressedDataPage {
    /// The (v1 or v2) header of this page.
    pub(crate) header: DataPageHeader,
    /// The compressed bytes of the page.
    pub(crate) buffer: CowBuffer,
    /// The compression applied to `buffer`.
    pub(crate) compression: Compression,
    /// The size of the page, in bytes, once decompressed.
    uncompressed_page_size: usize,
    /// Descriptor of the column this page belongs to.
    pub(crate) descriptor: Descriptor,
    /// The number of rows, when known at construction time (`None` when built via `new_read`).
    pub num_rows: Option<usize>,
}
29
30
impl CompressedDataPage {
31
/// Returns a new [`CompressedDataPage`].
32
pub fn new(
33
header: DataPageHeader,
34
buffer: CowBuffer,
35
compression: Compression,
36
uncompressed_page_size: usize,
37
descriptor: Descriptor,
38
num_rows: usize,
39
) -> Self {
40
Self {
41
header,
42
buffer,
43
compression,
44
uncompressed_page_size,
45
descriptor,
46
num_rows: Some(num_rows),
47
}
48
}
49
50
/// Returns a new [`CompressedDataPage`].
51
pub(crate) fn new_read(
52
header: DataPageHeader,
53
buffer: CowBuffer,
54
compression: Compression,
55
uncompressed_page_size: usize,
56
descriptor: Descriptor,
57
) -> Self {
58
Self {
59
header,
60
buffer,
61
compression,
62
uncompressed_page_size,
63
descriptor,
64
num_rows: None,
65
}
66
}
67
68
pub fn header(&self) -> &DataPageHeader {
69
&self.header
70
}
71
72
pub fn uncompressed_size(&self) -> usize {
73
self.uncompressed_page_size
74
}
75
76
pub fn compressed_size(&self) -> usize {
77
self.buffer.len()
78
}
79
80
/// The compression of the data in this page.
81
/// Note that what is compressed in a page depends on its version:
82
/// in V1, the whole data (`[repetition levels][definition levels][values]`) is compressed; in V2 only the values are compressed.
83
pub fn compression(&self) -> Compression {
84
self.compression
85
}
86
87
pub fn num_values(&self) -> usize {
88
self.header.num_values()
89
}
90
91
pub fn num_rows(&self) -> Option<usize> {
92
self.num_rows
93
}
94
95
pub fn null_count(&self) -> Option<usize> {
96
self.header().null_count()
97
}
98
99
/// Decodes the raw statistics into a statistics
100
pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
101
match &self.header {
102
DataPageHeader::V1(d) => d
103
.statistics
104
.as_ref()
105
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
106
DataPageHeader::V2(d) => d
107
.statistics
108
.as_ref()
109
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
110
}
111
}
112
113
pub fn slice_mut(&mut self) -> &mut CowBuffer {
114
&mut self.buffer
115
}
116
}
117
118
/// The header of a Parquet data page, in either of the two format versions.
#[derive(Debug, Clone)]
pub enum DataPageHeader {
    /// A version-1 data page header.
    V1(DataPageHeaderV1),
    /// A version-2 data page header.
    V2(DataPageHeaderV2),
}
123
124
impl DataPageHeader {
125
pub fn num_values(&self) -> usize {
126
match &self {
127
DataPageHeader::V1(d) => d.num_values as usize,
128
DataPageHeader::V2(d) => d.num_values as usize,
129
}
130
}
131
132
pub fn null_count(&self) -> Option<usize> {
133
match &self {
134
DataPageHeader::V1(_) => None,
135
DataPageHeader::V2(d) => Some(d.num_nulls as usize),
136
}
137
}
138
139
pub fn encoding(&self) -> FormatEncoding {
140
match self {
141
DataPageHeader::V1(d) => d.encoding,
142
DataPageHeader::V2(d) => d.encoding,
143
}
144
}
145
146
pub fn is_dictionary_encoded(&self) -> bool {
147
matches!(self.encoding(), FormatEncoding::RLE_DICTIONARY)
148
}
149
}
150
151
/// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. It holds actual data
/// and thus cloning it is expensive.
#[derive(Debug, Clone)]
pub struct DataPage {
    /// The (v1 or v2) header of this page.
    pub(super) header: DataPageHeader,
    /// The uncompressed, encoded bytes of the page.
    pub(super) buffer: CowBuffer,
    /// Descriptor of the column this page belongs to.
    pub descriptor: Descriptor,
    /// The number of rows, when known at construction time (`None` when built via `new_read`).
    pub num_rows: Option<usize>,
}
160
161
impl DataPage {
162
pub fn new(
163
header: DataPageHeader,
164
buffer: CowBuffer,
165
descriptor: Descriptor,
166
num_rows: usize,
167
) -> Self {
168
Self {
169
header,
170
buffer,
171
descriptor,
172
num_rows: Some(num_rows),
173
}
174
}
175
176
pub(crate) fn new_read(
177
header: DataPageHeader,
178
buffer: CowBuffer,
179
descriptor: Descriptor,
180
) -> Self {
181
Self {
182
header,
183
buffer,
184
descriptor,
185
num_rows: None,
186
}
187
}
188
189
pub fn header(&self) -> &DataPageHeader {
190
&self.header
191
}
192
193
pub fn buffer(&self) -> &[u8] {
194
&self.buffer
195
}
196
197
/// Returns a mutable reference to the internal buffer.
198
/// Useful to recover the buffer after the page has been decoded.
199
pub fn buffer_mut(&mut self) -> &mut Vec<u8> {
200
self.buffer.to_mut()
201
}
202
203
pub fn num_values(&self) -> usize {
204
self.header.num_values()
205
}
206
207
pub fn null_count(&self) -> Option<usize> {
208
self.header.null_count()
209
}
210
211
pub fn num_rows(&self) -> Option<usize> {
212
self.num_rows
213
}
214
215
pub fn encoding(&self) -> Encoding {
216
match &self.header {
217
DataPageHeader::V1(d) => d.encoding(),
218
DataPageHeader::V2(d) => d.encoding(),
219
}
220
}
221
222
pub fn definition_level_encoding(&self) -> Encoding {
223
match &self.header {
224
DataPageHeader::V1(d) => d.definition_level_encoding(),
225
DataPageHeader::V2(_) => Encoding::Rle,
226
}
227
}
228
229
pub fn repetition_level_encoding(&self) -> Encoding {
230
match &self.header {
231
DataPageHeader::V1(d) => d.repetition_level_encoding(),
232
DataPageHeader::V2(_) => Encoding::Rle,
233
}
234
}
235
236
/// Decodes the raw statistics into a statistics
237
pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
238
match &self.header {
239
DataPageHeader::V1(d) => d
240
.statistics
241
.as_ref()
242
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
243
DataPageHeader::V2(d) => d
244
.statistics
245
.as_ref()
246
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
247
}
248
}
249
}
250
251
/// A [`Page`] is an uncompressed, encoded representation of a Parquet page. It may hold actual data
/// and thus cloning it may be expensive.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Page {
    /// A [`DataPage`] holding encoded levels and values.
    Data(DataPage),
    /// A [`DictPage`] holding an encoded dictionary.
    Dict(DictPage),
}
261
262
impl Page {
263
pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
264
match self {
265
Self::Data(page) => page.buffer.to_mut(),
266
Self::Dict(page) => page.buffer.to_mut(),
267
}
268
}
269
}
270
271
/// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data
/// and thus cloning it is expensive.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CompressedPage {
    /// A compressed [`CompressedDataPage`].
    Data(CompressedDataPage),
    /// A compressed [`CompressedDictPage`].
    Dict(CompressedDictPage),
}
279
280
impl CompressedPage {
281
pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
282
match self {
283
CompressedPage::Data(page) => page.buffer.to_mut(),
284
CompressedPage::Dict(page) => page.buffer.to_mut(),
285
}
286
}
287
288
pub(crate) fn compression(&self) -> Compression {
289
match self {
290
CompressedPage::Data(page) => page.compression(),
291
CompressedPage::Dict(page) => page.compression(),
292
}
293
}
294
295
pub(crate) fn num_values(&self) -> usize {
296
match self {
297
CompressedPage::Data(page) => page.num_values(),
298
CompressedPage::Dict(_) => 0,
299
}
300
}
301
302
pub(crate) fn num_rows(&self) -> Option<usize> {
303
match self {
304
CompressedPage::Data(page) => page.num_rows(),
305
CompressedPage::Dict(_) => Some(0),
306
}
307
}
308
}
309
310
/// An uncompressed, encoded dictionary page.
#[derive(Debug, Clone)]
pub struct DictPage {
    /// The uncompressed, encoded dictionary values.
    pub buffer: CowBuffer,
    /// The number of entries in the dictionary.
    pub num_values: usize,
    /// Whether the dictionary entries are sorted.
    pub is_sorted: bool,
}
317
318
impl DictPage {
319
pub fn new(buffer: CowBuffer, num_values: usize, is_sorted: bool) -> Self {
320
Self {
321
buffer,
322
num_values,
323
is_sorted,
324
}
325
}
326
}
327
328
/// A compressed, encoded dictionary page.
#[derive(Debug)]
pub struct CompressedDictPage {
    /// The compressed bytes of the dictionary.
    pub(crate) buffer: CowBuffer,
    /// The compression applied to `buffer`.
    compression: Compression,
    /// The number of entries in the dictionary.
    pub(crate) num_values: usize,
    /// The size of the page, in bytes, once decompressed.
    pub(crate) uncompressed_page_size: usize,
    /// Whether the dictionary entries are sorted.
    pub is_sorted: bool,
}
337
338
impl CompressedDictPage {
339
pub fn new(
340
buffer: CowBuffer,
341
compression: Compression,
342
uncompressed_page_size: usize,
343
num_values: usize,
344
is_sorted: bool,
345
) -> Self {
346
Self {
347
buffer,
348
compression,
349
uncompressed_page_size,
350
num_values,
351
is_sorted,
352
}
353
}
354
355
/// The compression of the data in this page.
356
pub fn compression(&self) -> Compression {
357
self.compression
358
}
359
}
360
361
/// The three encoded sections of a data page's buffer, as produced by
/// [`split_buffer`], [`split_buffer_v1`] or [`split_buffer_v2`].
pub struct EncodedSplitBuffer<'a> {
    /// Encoded Repetition Levels
    pub rep: &'a [u8],
    /// Encoded Definition Levels
    pub def: &'a [u8],
    /// Encoded Values
    pub values: &'a [u8],
}
369
370
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages.
371
#[inline]
372
pub fn split_buffer_v1(
373
buffer: &[u8],
374
has_rep: bool,
375
has_def: bool,
376
) -> ParquetResult<EncodedSplitBuffer<'_>> {
377
let (rep, buffer) = if has_rep {
378
let level_buffer_length = get_length(buffer).ok_or_else(|| {
379
ParquetError::oos(
380
"The number of bytes declared in v1 rep levels is higher than the page size",
381
)
382
})?;
383
384
if buffer.len() < level_buffer_length + 4 {
385
return Err(ParquetError::oos(
386
"The number of bytes declared in v1 rep levels is higher than the page size",
387
));
388
}
389
390
buffer[4..].split_at(level_buffer_length)
391
} else {
392
(&[] as &[u8], buffer)
393
};
394
395
let (def, buffer) = if has_def {
396
let level_buffer_length = get_length(buffer).ok_or_else(|| {
397
ParquetError::oos(
398
"The number of bytes declared in v1 def levels is higher than the page size",
399
)
400
})?;
401
402
if buffer.len() < level_buffer_length + 4 {
403
return Err(ParquetError::oos(
404
"The number of bytes declared in v1 def levels is higher than the page size",
405
));
406
}
407
408
buffer[4..].split_at(level_buffer_length)
409
} else {
410
(&[] as &[u8], buffer)
411
};
412
413
Ok(EncodedSplitBuffer {
414
rep,
415
def,
416
values: buffer,
417
})
418
}
419
420
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages.
421
pub fn split_buffer_v2(
422
buffer: &[u8],
423
rep_level_buffer_length: usize,
424
def_level_buffer_length: usize,
425
) -> ParquetResult<EncodedSplitBuffer<'_>> {
426
let (rep, buffer) = buffer.split_at(rep_level_buffer_length);
427
let (def, values) = buffer.split_at(def_level_buffer_length);
428
429
Ok(EncodedSplitBuffer { rep, def, values })
430
}
431
432
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values).
433
pub fn split_buffer(page: &DataPage) -> ParquetResult<EncodedSplitBuffer<'_>> {
434
match page.header() {
435
DataPageHeader::V1(_) => split_buffer_v1(
436
page.buffer(),
437
page.descriptor.max_rep_level > 0,
438
page.descriptor.max_def_level > 0,
439
),
440
DataPageHeader::V2(header) => {
441
let def_level_buffer_length: usize = header.definition_levels_byte_length.try_into()?;
442
let rep_level_buffer_length: usize = header.repetition_levels_byte_length.try_into()?;
443
split_buffer_v2(
444
page.buffer(),
445
rep_level_buffer_length,
446
def_level_buffer_length,
447
)
448
},
449
}
450
}
451
452