Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/page/mod.rs
6940 views
1
use super::CowBuffer;
2
use crate::parquet::compression::Compression;
3
use crate::parquet::encoding::{Encoding, get_length};
4
use crate::parquet::error::{ParquetError, ParquetResult};
5
use crate::parquet::metadata::Descriptor;
6
pub use crate::parquet::parquet_bridge::{DataPageHeaderExt, PageType};
7
use crate::parquet::statistics::Statistics;
8
pub use crate::parquet::thrift_format::{
9
DataPageHeader as DataPageHeaderV1, DataPageHeaderV2, Encoding as FormatEncoding,
10
PageHeader as ParquetPageHeader,
11
};
12
13
/// The outcome of reading a page: either a single [`Page`], or a dictionary page
/// together with the data page that followed it.
pub enum PageResult {
    /// A single page.
    Single(Page),
    /// A dictionary page and a data page.
    Two { dict: DictPage, data: DataPage },
}
17
18
/// A [`CompressedDataPage`] is compressed, encoded representation of a Parquet data page.
/// It holds actual data and thus cloning it is expensive.
#[derive(Debug)]
pub struct CompressedDataPage {
    /// The page header (v1 or v2).
    pub(crate) header: DataPageHeader,
    /// The compressed, encoded page bytes.
    pub(crate) buffer: CowBuffer,
    /// Compression codec applied to `buffer`.
    pub(crate) compression: Compression,
    /// Size in bytes of the page once decompressed.
    uncompressed_page_size: usize,
    /// Descriptor of the column this page belongs to.
    pub(crate) descriptor: Descriptor,
    /// Number of rows in this page, when known: `Some` when built via
    /// [`CompressedDataPage::new`], `None` when built via the read path (`new_read`).
    pub num_rows: Option<usize>,
}
29
30
impl CompressedDataPage {
31
/// Returns a new [`CompressedDataPage`].
32
pub fn new(
33
header: DataPageHeader,
34
buffer: CowBuffer,
35
compression: Compression,
36
uncompressed_page_size: usize,
37
descriptor: Descriptor,
38
num_rows: usize,
39
) -> Self {
40
Self {
41
header,
42
buffer,
43
compression,
44
uncompressed_page_size,
45
descriptor,
46
num_rows: Some(num_rows),
47
}
48
}
49
50
/// Returns a new [`CompressedDataPage`].
51
pub(crate) fn new_read(
52
header: DataPageHeader,
53
buffer: CowBuffer,
54
compression: Compression,
55
uncompressed_page_size: usize,
56
descriptor: Descriptor,
57
) -> Self {
58
Self {
59
header,
60
buffer,
61
compression,
62
uncompressed_page_size,
63
descriptor,
64
num_rows: None,
65
}
66
}
67
68
pub fn header(&self) -> &DataPageHeader {
69
&self.header
70
}
71
72
pub fn uncompressed_size(&self) -> usize {
73
self.uncompressed_page_size
74
}
75
76
pub fn compressed_size(&self) -> usize {
77
self.buffer.len()
78
}
79
80
/// The compression of the data in this page.
81
/// Note that what is compressed in a page depends on its version:
82
/// in V1, the whole data (`[repetition levels][definition levels][values]`) is compressed; in V2 only the values are compressed.
83
pub fn compression(&self) -> Compression {
84
self.compression
85
}
86
87
pub fn num_values(&self) -> usize {
88
self.header.num_values()
89
}
90
91
pub fn num_rows(&self) -> Option<usize> {
92
self.num_rows
93
}
94
95
/// Decodes the raw statistics into a statistics
96
pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
97
match &self.header {
98
DataPageHeader::V1(d) => d
99
.statistics
100
.as_ref()
101
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
102
DataPageHeader::V2(d) => d
103
.statistics
104
.as_ref()
105
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
106
}
107
}
108
109
pub fn slice_mut(&mut self) -> &mut CowBuffer {
110
&mut self.buffer
111
}
112
}
113
114
/// The header of a Parquet data page, in either of the two format versions.
#[derive(Debug, Clone)]
pub enum DataPageHeader {
    /// A version-1 data page header.
    V1(DataPageHeaderV1),
    /// A version-2 data page header.
    V2(DataPageHeaderV2),
}
119
120
impl DataPageHeader {
121
pub fn num_values(&self) -> usize {
122
match &self {
123
DataPageHeader::V1(d) => d.num_values as usize,
124
DataPageHeader::V2(d) => d.num_values as usize,
125
}
126
}
127
128
pub fn null_count(&self) -> Option<usize> {
129
match &self {
130
DataPageHeader::V1(_) => None,
131
DataPageHeader::V2(d) => Some(d.num_nulls as usize),
132
}
133
}
134
135
pub fn encoding(&self) -> FormatEncoding {
136
match self {
137
DataPageHeader::V1(d) => d.encoding,
138
DataPageHeader::V2(d) => d.encoding,
139
}
140
}
141
142
pub fn is_dictionary_encoded(&self) -> bool {
143
matches!(self.encoding(), FormatEncoding::RLE_DICTIONARY)
144
}
145
}
146
147
/// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. It holds actual data
/// and thus cloning it is expensive.
#[derive(Debug, Clone)]
pub struct DataPage {
    /// The page header (v1 or v2).
    pub(super) header: DataPageHeader,
    /// The uncompressed, encoded page bytes.
    pub(super) buffer: CowBuffer,
    /// Descriptor of the column this page belongs to.
    pub descriptor: Descriptor,
    /// Number of rows in this page, when known: `Some` when built via
    /// [`DataPage::new`], `None` when built via the read path (`new_read`).
    pub num_rows: Option<usize>,
}
156
157
impl DataPage {
158
pub fn new(
159
header: DataPageHeader,
160
buffer: CowBuffer,
161
descriptor: Descriptor,
162
num_rows: usize,
163
) -> Self {
164
Self {
165
header,
166
buffer,
167
descriptor,
168
num_rows: Some(num_rows),
169
}
170
}
171
172
pub(crate) fn new_read(
173
header: DataPageHeader,
174
buffer: CowBuffer,
175
descriptor: Descriptor,
176
) -> Self {
177
Self {
178
header,
179
buffer,
180
descriptor,
181
num_rows: None,
182
}
183
}
184
185
pub fn header(&self) -> &DataPageHeader {
186
&self.header
187
}
188
189
pub fn buffer(&self) -> &[u8] {
190
&self.buffer
191
}
192
193
/// Returns a mutable reference to the internal buffer.
194
/// Useful to recover the buffer after the page has been decoded.
195
pub fn buffer_mut(&mut self) -> &mut Vec<u8> {
196
self.buffer.to_mut()
197
}
198
199
pub fn num_values(&self) -> usize {
200
self.header.num_values()
201
}
202
203
pub fn null_count(&self) -> Option<usize> {
204
self.header.null_count()
205
}
206
207
pub fn num_rows(&self) -> Option<usize> {
208
self.num_rows
209
}
210
211
pub fn encoding(&self) -> Encoding {
212
match &self.header {
213
DataPageHeader::V1(d) => d.encoding(),
214
DataPageHeader::V2(d) => d.encoding(),
215
}
216
}
217
218
pub fn definition_level_encoding(&self) -> Encoding {
219
match &self.header {
220
DataPageHeader::V1(d) => d.definition_level_encoding(),
221
DataPageHeader::V2(_) => Encoding::Rle,
222
}
223
}
224
225
pub fn repetition_level_encoding(&self) -> Encoding {
226
match &self.header {
227
DataPageHeader::V1(d) => d.repetition_level_encoding(),
228
DataPageHeader::V2(_) => Encoding::Rle,
229
}
230
}
231
232
/// Decodes the raw statistics into a statistics
233
pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {
234
match &self.header {
235
DataPageHeader::V1(d) => d
236
.statistics
237
.as_ref()
238
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
239
DataPageHeader::V2(d) => d
240
.statistics
241
.as_ref()
242
.map(|x| Statistics::deserialize(x, self.descriptor.primitive_type.clone())),
243
}
244
}
245
}
246
247
/// A [`Page`] is an uncompressed, encoded representation of a Parquet page. It may hold actual data
/// and thus cloning it may be expensive.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Page {
    /// A [`DataPage`] (uncompressed, encoded values and levels).
    Data(DataPage),
    /// A [`DictPage`] (uncompressed, encoded dictionary).
    Dict(DictPage),
}
257
258
impl Page {
259
pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
260
match self {
261
Self::Data(page) => page.buffer.to_mut(),
262
Self::Dict(page) => page.buffer.to_mut(),
263
}
264
}
265
266
pub(crate) fn unwrap_data(self) -> DataPage {
267
match self {
268
Self::Data(page) => page,
269
_ => panic!(),
270
}
271
}
272
}
273
274
/// A [`CompressedPage`] is a compressed, encoded representation of a Parquet page. It holds actual data
/// and thus cloning it is expensive.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CompressedPage {
    /// A compressed data page.
    Data(CompressedDataPage),
    /// A compressed dictionary page.
    Dict(CompressedDictPage),
}
282
283
impl CompressedPage {
284
pub(crate) fn buffer_mut(&mut self) -> &mut Vec<u8> {
285
match self {
286
CompressedPage::Data(page) => page.buffer.to_mut(),
287
CompressedPage::Dict(page) => page.buffer.to_mut(),
288
}
289
}
290
291
pub(crate) fn compression(&self) -> Compression {
292
match self {
293
CompressedPage::Data(page) => page.compression(),
294
CompressedPage::Dict(page) => page.compression(),
295
}
296
}
297
298
pub(crate) fn num_values(&self) -> usize {
299
match self {
300
CompressedPage::Data(page) => page.num_values(),
301
CompressedPage::Dict(_) => 0,
302
}
303
}
304
305
pub(crate) fn num_rows(&self) -> Option<usize> {
306
match self {
307
CompressedPage::Data(page) => page.num_rows(),
308
CompressedPage::Dict(_) => Some(0),
309
}
310
}
311
}
312
313
/// An uncompressed, encoded dictionary page.
#[derive(Debug, Clone)]
pub struct DictPage {
    /// The uncompressed, encoded dictionary bytes.
    pub buffer: CowBuffer,
    /// The number of entries in the dictionary.
    pub num_values: usize,
    /// Whether the dictionary is sorted (as declared by the page header).
    pub is_sorted: bool,
}
320
321
impl DictPage {
322
pub fn new(buffer: CowBuffer, num_values: usize, is_sorted: bool) -> Self {
323
Self {
324
buffer,
325
num_values,
326
is_sorted,
327
}
328
}
329
}
330
331
/// A compressed, encoded dictionary page.
#[derive(Debug)]
pub struct CompressedDictPage {
    /// The compressed, encoded dictionary bytes.
    pub(crate) buffer: CowBuffer,
    /// Compression codec applied to `buffer`.
    compression: Compression,
    /// The number of entries in the dictionary.
    pub(crate) num_values: usize,
    /// Size in bytes of the page once decompressed.
    pub(crate) uncompressed_page_size: usize,
    /// Whether the dictionary is sorted (as declared by the page header).
    pub is_sorted: bool,
}
340
341
impl CompressedDictPage {
342
pub fn new(
343
buffer: CowBuffer,
344
compression: Compression,
345
uncompressed_page_size: usize,
346
num_values: usize,
347
is_sorted: bool,
348
) -> Self {
349
Self {
350
buffer,
351
compression,
352
uncompressed_page_size,
353
num_values,
354
is_sorted,
355
}
356
}
357
358
/// The compression of the data in this page.
359
pub fn compression(&self) -> Compression {
360
self.compression
361
}
362
}
363
364
/// The three encoded sections of a data page's buffer: repetition levels,
/// definition levels, and values.
pub struct EncodedSplitBuffer<'a> {
    /// Encoded Repetition Levels
    pub rep: &'a [u8],
    /// Encoded Definition Levels
    pub def: &'a [u8],
    /// Encoded Values
    pub values: &'a [u8],
}
372
373
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages.
374
#[inline]
375
pub fn split_buffer_v1(
376
buffer: &[u8],
377
has_rep: bool,
378
has_def: bool,
379
) -> ParquetResult<EncodedSplitBuffer<'_>> {
380
let (rep, buffer) = if has_rep {
381
let level_buffer_length = get_length(buffer).ok_or_else(|| {
382
ParquetError::oos(
383
"The number of bytes declared in v1 rep levels is higher than the page size",
384
)
385
})?;
386
387
if buffer.len() < level_buffer_length + 4 {
388
return Err(ParquetError::oos(
389
"The number of bytes declared in v1 rep levels is higher than the page size",
390
));
391
}
392
393
buffer[4..].split_at(level_buffer_length)
394
} else {
395
(&[] as &[u8], buffer)
396
};
397
398
let (def, buffer) = if has_def {
399
let level_buffer_length = get_length(buffer).ok_or_else(|| {
400
ParquetError::oos(
401
"The number of bytes declared in v1 def levels is higher than the page size",
402
)
403
})?;
404
405
if buffer.len() < level_buffer_length + 4 {
406
return Err(ParquetError::oos(
407
"The number of bytes declared in v1 def levels is higher than the page size",
408
));
409
}
410
411
buffer[4..].split_at(level_buffer_length)
412
} else {
413
(&[] as &[u8], buffer)
414
};
415
416
Ok(EncodedSplitBuffer {
417
rep,
418
def,
419
values: buffer,
420
})
421
}
422
423
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages.
424
pub fn split_buffer_v2(
425
buffer: &[u8],
426
rep_level_buffer_length: usize,
427
def_level_buffer_length: usize,
428
) -> ParquetResult<EncodedSplitBuffer<'_>> {
429
let (rep, buffer) = buffer.split_at(rep_level_buffer_length);
430
let (def, values) = buffer.split_at(def_level_buffer_length);
431
432
Ok(EncodedSplitBuffer { rep, def, values })
433
}
434
435
/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values).
436
pub fn split_buffer(page: &DataPage) -> ParquetResult<EncodedSplitBuffer<'_>> {
437
match page.header() {
438
DataPageHeader::V1(_) => split_buffer_v1(
439
page.buffer(),
440
page.descriptor.max_rep_level > 0,
441
page.descriptor.max_def_level > 0,
442
),
443
DataPageHeader::V2(header) => {
444
let def_level_buffer_length: usize = header.definition_levels_byte_length.try_into()?;
445
let rep_level_buffer_length: usize = header.repetition_levels_byte_length.try_into()?;
446
split_buffer_v2(
447
page.buffer(),
448
rep_level_buffer_length,
449
def_level_buffer_length,
450
)
451
},
452
}
453
}
454
455