Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
8480 views
1
pub(crate) mod array_chunks;
2
pub(crate) mod filter;
3
4
use std::fmt;
5
use std::ops::Range;
6
use std::sync::OnceLock;
7
8
use arrow::array::{Array, IntoBoxedArray, Splitable};
9
use arrow::bitmap::{Bitmap, BitmapBuilder};
10
use arrow::datatypes::ArrowDataType;
11
use arrow::pushable::Pushable;
12
use polars_compute::filter::filter_boolean_kernel;
13
use polars_utils::pl_str::PlSmallStr;
14
15
use self::filter::Filter;
16
use super::{BasicDecompressor, InitNested, NestedState, PredicateFilter};
17
use crate::parquet::encoding::hybrid_rle::{self, HybridRleChunk, HybridRleDecoder};
18
use crate::parquet::error::{ParquetError, ParquetResult};
19
use crate::parquet::page::{DataPage, DictPage, split_buffer};
20
use crate::parquet::schema::Repetition;
21
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
22
23
/// Per-page decoding state for a [`Decoder`].
#[derive(Debug)]
pub(crate) struct State<'a, D: Decoder> {
    /// The page's dictionary, if one was read for this column chunk.
    pub(crate) dict: Option<&'a D::Dict>,
    /// Whether the column's repetition is `Optional` (values may be null).
    pub(crate) is_optional: bool,
    /// Validity of the page's values; `None` means "all values are valid".
    pub(crate) page_validity: Option<Bitmap>,
    /// Encoding-specific view over the page's value buffer.
    pub(crate) translation: D::Translation<'a>,
}
30
31
/// Encoding-specific translation of a [`DataPage`]'s value buffer for a
/// specific [`Decoder`].
pub(crate) trait StateTranslation<'a, D: Decoder>: Sized {
    type PlainDecoder;

    /// Create a translation for `page`, using `dict` and `page_validity`
    /// where the page's encoding requires them.
    fn new(
        decoder: &D,
        page: &'a DataPage,
        dict: Option<&'a D::Dict>,
        page_validity: Option<&Bitmap>,
    ) -> ParquetResult<Self>;
    /// The number of rows in the page.
    fn num_rows(&self) -> usize;
}
42
43
impl<'a, D: Decoder> State<'a, D> {
    /// Build a [`State`] for a flat (non-nested) page, decoding the page's
    /// definition levels into a validity bitmap when the page may contain
    /// nulls.
    pub fn new(decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>) -> ParquetResult<Self> {
        let is_optional =
            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

        let mut page_validity = None;

        // Make the page_validity None if there are no nulls in the page.
        // When the null count is unknown (`None`), conservatively decode the
        // definition levels anyway.
        if is_optional && page.null_count().is_none_or(|nc| nc != 0) {
            let pv = page_validity_decoder(page)?;
            page_validity = decode_page_validity(pv, None)?;
        }

        let translation = D::Translation::new(decoder, page, dict, page_validity.as_ref())?;

        Ok(Self {
            dict,
            is_optional,
            page_validity,
            translation,
        })
    }

    /// Build a [`State`] for a nested page. The caller supplies the validity
    /// (derived from nesting information) instead of it being decoded from
    /// the page's own definition levels.
    pub fn new_nested(
        decoder: &D,
        page: &'a DataPage,
        dict: Option<&'a D::Dict>,
        mut page_validity: Option<Bitmap>,
    ) -> ParquetResult<Self> {
        let translation = D::Translation::new(decoder, page, dict, None)?;

        let is_optional =
            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

        // Normalize "no nulls" to `None` so downstream code can take the
        // all-valid fast path.
        if page_validity
            .as_ref()
            .is_some_and(|bm| bm.unset_bits() == 0)
        {
            page_validity = None;
        }

        Ok(Self {
            dict,
            translation,
            is_optional,
            page_validity,
        })
    }

    /// Decode this page into `decoded`, optionally applying `filter`.
    /// Completed chunks may be appended to `chunks` by chunked decoders.
    pub fn decode(
        self,
        decoder: &mut D,
        decoded: &mut D::DecodedState,
        filter: Option<Filter>,
        chunks: &mut Vec<D::Output>,
    ) -> ParquetResult<()> {
        decoder.extend_filtered_with_state(self, decoded, filter, chunks)
    }
}
102
103
pub fn not_implemented(page: &DataPage) -> ParquetError {
104
let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
105
let required = if is_optional { "optional" } else { "required" };
106
ParquetError::not_supported(format!(
107
"Decoding {:?} \"{:?}\"-encoded {required} parquet pages not yet supported",
108
page.descriptor.primitive_type.physical_type,
109
page.encoding(),
110
))
111
}
112
113
pub(crate) type PageValidity<'a> = HybridRleDecoder<'a>;
114
pub(crate) fn page_validity_decoder(page: &DataPage) -> ParquetResult<PageValidity<'_>> {
115
let validity = split_buffer(page)?.def;
116
let decoder = hybrid_rle::HybridRleDecoder::new(validity, 1, page.num_values());
117
Ok(decoder)
118
}
119
120
/// Generic value-at-a-time decode loop used when no specialized kernel
/// exists.
///
/// `decode_one` yields the next *physical* (non-null) value from the page. It
/// must also be called for values that end up skipped so the underlying
/// reader stays in sync. Null rows are materialized as `T::default()` and
/// recorded in `validity`.
pub(crate) fn unspecialized_decode<T: Default>(
    mut num_rows: usize,

    mut decode_one: impl FnMut() -> ParquetResult<T>,

    mut filter: Option<Filter>,
    mut page_validity: Option<Bitmap>,

    is_optional: bool,

    validity: &mut BitmapBuilder,
    target: &mut impl Pushable<T>,
) -> ParquetResult<()> {
    // Pre-pass: consume the values before the filtered region, slice
    // `page_validity` down to the region itself, and drop the filter once it
    // no longer excludes anything.
    match &mut filter {
        None => {},
        Some(Filter::Range(range)) => {
            match page_validity.as_mut() {
                None => {
                    // Every row has a physical value; skip `range.start` of them.
                    for _ in 0..range.start {
                        decode_one()?;
                    }
                },
                Some(pv) => {
                    // Only *valid* leading rows correspond to physical values.
                    let c;
                    (c, *pv) = pv.split_at(range.start);
                    for _ in 0..c.set_bits() {
                        decode_one()?;
                    }
                    *pv = std::mem::take(pv).sliced(0, range.len());
                },
            }

            num_rows = range.len();
            // The range is fully handled by the skipping above.
            filter = None;
        },
        Some(Filter::Mask(mask)) => {
            // Trim filtered-out rows on both ends of the mask.
            let leading_zeros = mask.take_leading_zeros();
            mask.take_trailing_zeros();

            match page_validity.as_mut() {
                None => {
                    for _ in 0..leading_zeros {
                        decode_one()?;
                    }
                },
                Some(pv) => {
                    let c;
                    (c, *pv) = pv.split_at(leading_zeros);
                    for _ in 0..c.set_bits() {
                        decode_one()?;
                    }
                    *pv = std::mem::take(pv).sliced(0, mask.len());
                },
            }

            num_rows = mask.len();
            // An all-true mask no longer filters anything.
            if mask.unset_bits() == 0 {
                filter = None;
            }
        },
        Some(Filter::Predicate(_)) => todo!(),
    };

    // Normalize "no nulls" to `None` to enable the fast paths below.
    page_validity = page_validity.filter(|pv| pv.unset_bits() > 0);

    match (filter, page_validity) {
        // No filter, no nulls: decode values back-to-back.
        (None, None) => {
            target.reserve(num_rows);
            for _ in 0..num_rows {
                target.push(decode_one()?);
            }

            if is_optional {
                validity.extend_constant(num_rows, true);
            }
        },
        // No filter, with nulls: push a default for every invalid row.
        (None, Some(page_validity)) => {
            target.reserve(page_validity.len());
            for is_valid in page_validity.iter() {
                let v = if is_valid {
                    decode_one()?
                } else {
                    T::default()
                };
                target.push(v);
            }

            validity.extend_from_bitmap(&page_validity);
        },
        // Range filters were converted to `filter = None` in the pre-pass.
        (Some(Filter::Range(_)), _) => unreachable!(),
        // Mask filter, no nulls: alternate between runs of kept and skipped
        // values.
        (Some(Filter::Mask(mut mask)), None) => {
            target.reserve(num_rows);

            while !mask.is_empty() {
                let num_ones = mask.take_leading_ones();
                for _ in 0..num_ones {
                    target.push(decode_one()?);
                }

                let num_zeros = mask.take_leading_zeros();
                for _ in 0..num_zeros {
                    decode_one()?;
                }
            }

            if is_optional {
                validity.extend_constant(num_rows, true);
            }
        },
        // Mask filter with nulls: walk both bitmaps 56 bits at a time.
        (Some(Filter::Mask(mask)), Some(page_validity)) => {
            assert_eq!(mask.len(), page_validity.len());

            let num_rows = mask.set_bits();
            target.reserve(num_rows);

            let mut mask_iter = mask.fast_iter_u56();
            let mut validity_iter = page_validity.fast_iter_u56();

            // `f` = filter word, `v` = validity word. For every kept row,
            // first decode (and drop) the valid-but-filtered-out values in
            // front of it, then emit the row itself.
            let mut iter = |mut f: u64, mut v: u64| {
                while f != 0 {
                    let offset = f.trailing_zeros();

                    // Physical values before the next kept row.
                    let skip = (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;
                    for _ in 0..skip {
                        decode_one()?;
                    }

                    if (v >> offset) & 1 != 0 {
                        target.push(decode_one()?);
                    } else {
                        target.push(T::default());
                    }

                    v >>= offset + 1;
                    f >>= offset + 1;
                }

                // Consume the remaining physical values of this word.
                for _ in 0..v.count_ones() as usize {
                    decode_one()?;
                }

                ParquetResult::Ok(())
            };

            for (f, v) in mask_iter.by_ref().zip(validity_iter.by_ref()) {
                iter(f, v)?;
            }

            let (f, fl) = mask_iter.remainder();
            let (v, vl) = validity_iter.remainder();

            assert_eq!(fl, vl);

            iter(f, v)?;

            validity.extend_from_bitmap(&filter_boolean_kernel(&page_validity, &mask));
        },
        (Some(Filter::Predicate(_)), _) => todo!(),
    }

    Ok(())
}
282
283
/// The state that will be decoded into.
///
/// This is usually an Array and a validity mask as a MutableBitmap.
pub(super) trait Decoded {
    /// The number of items in the container
    fn len(&self) -> usize;
    /// How much capacity is left.
    fn remaining_capacity(&self) -> usize;
    /// Extend the decoded state with `n` nulls.
    fn extend_nulls(&mut self, n: usize);
}
294
295
/// A decoder that knows how to map `State` -> Array
pub(super) trait Decoder: Sized {
    /// The state that this decoder derives from a [`DataPage`]. This is bound to the page.
    type Translation<'a>: StateTranslation<'a, Self>;
    /// The dictionary representation that the decoder uses
    type Dict: Array + Clone;
    /// The target state that this Decoder decodes into.
    type DecodedState: Decoded;
    /// The finalized array type this decoder produces.
    type Output: IntoBoxedArray;

    /// Whether this decoder emits its output in multiple chunks rather than a
    /// single preallocated buffer.
    const CHUNKED: bool = false;

    /// Evaluate `predicate` over the dictionary values.
    fn evaluate_dict_predicate(
        &self,
        dict: &Self::Dict,
        predicate: &PredicateFilter,
    ) -> ParquetResult<Bitmap> {
        Ok(predicate.predicate.evaluate(dict))
    }

    /// Initializes a new [`Self::DecodedState`].
    fn with_capacity(&self, capacity: usize) -> Self::DecodedState;

    /// Deserializes a [`DictPage`] into [`Self::Dict`].
    fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict>;

    /// Try to evaluate `predicate` with a specialized kernel, pushing the
    /// per-row outcome into `pred_true_mask`.
    ///
    /// Returns `false` when no specialized kernel applies; the caller then
    /// falls back to [`Self::unspecialized_predicate_decode`].
    fn evaluate_predicate(
        &mut self,
        state: &State<'_, Self>,
        predicate: Option<&SpecializedParquetColumnExpr>,
        pred_true_mask: &mut BitmapBuilder,
        dict_mask: Option<&Bitmap>,
    ) -> ParquetResult<bool>;

    /// Append the rows of an already-materialized `additional` array to
    /// `decoded`.
    fn extend_decoded(
        &self,
        decoded: &mut Self::DecodedState,
        additional: &dyn Array,
        is_optional: bool,
    ) -> ParquetResult<()>;

    /// Fallback predicate path: fully decode the page into a temporary array,
    /// evaluate `predicate` on it, and append only the matching rows.
    fn unspecialized_predicate_decode(
        &mut self,
        state: State<'_, Self>,
        decoded: &mut Self::DecodedState,
        pred_true_mask: &mut BitmapBuilder,
        predicate: &PredicateFilter,
        dict: Option<Self::Dict>,
        dtype: &ArrowDataType,
    ) -> ParquetResult<()> {
        let is_optional = state.is_optional;

        // Decode the page in full (unfiltered) into an intermediate array.
        let mut intermediate_array = self.with_capacity(if Self::CHUNKED {
            0
        } else {
            state.translation.num_rows()
        });
        if let Some(dict) = dict.as_ref() {
            self.apply_dictionary(&mut intermediate_array, dict)?;
        }
        let mut chunks = Vec::new();
        self.extend_filtered_with_state(state, &mut intermediate_array, None, &mut chunks)?;
        let intermediate_array = if !chunks.is_empty() {
            chunks.pop().unwrap()
        } else {
            self.finalize(dtype.underlying_physical_type(), dict, intermediate_array)?
        }
        .into_boxed();

        // Evaluate the predicate, accounting for nulls: a null row matches
        // only when the predicate says nulls match.
        let mask = if let Some(validity) = intermediate_array.validity() {
            let ignore_validity_array = intermediate_array.with_validity(None);
            let mask = predicate.predicate.evaluate(ignore_validity_array.as_ref());

            if predicate.predicate.evaluate_null() {
                arrow::bitmap::or_not(&mask, validity)
            } else {
                &mask & validity
            }
        } else {
            predicate.predicate.evaluate(intermediate_array.as_ref())
        };

        let filtered =
            polars_compute::filter::filter_with_bitmap(intermediate_array.as_ref(), &mask);

        pred_true_mask.extend_from_bitmap(&mask);
        self.extend_decoded(decoded, filtered.as_ref(), is_optional)?;

        Ok(())
    }

    /// Decode `state` into `decoded`, applying `filter` when given. Chunked
    /// decoders may emit completed chunks into `chunks`.
    fn extend_filtered_with_state(
        &mut self,
        state: State<'_, Self>,
        decoded: &mut Self::DecodedState,
        filter: Option<Filter>,
        chunks: &mut Vec<Self::Output>,
    ) -> ParquetResult<()>;

    /// Extend the decoded state with `length` times the same `value`.
    fn extend_constant(
        &mut self,
        decoded: &mut Self::DecodedState,
        length: usize,
        value: &ParquetScalar,
    ) -> ParquetResult<()>;

    /// Hook that lets a decoder pre-process the dictionary into `decoded`.
    /// No-op by default.
    fn apply_dictionary(
        &mut self,
        _decoded: &mut Self::DecodedState,
        _dict: &Self::Dict,
    ) -> ParquetResult<()> {
        Ok(())
    }

    /// Turn the accumulated `decoded` state into the final output.
    fn finalize(
        &self,
        dtype: ArrowDataType,
        dict: Option<Self::Dict>,
        decoded: Self::DecodedState,
    ) -> ParquetResult<Self::Output>;
}
417
418
/// Which decoding strategy was used for a column; reported in the
/// `POLARS_PARQUET_METRICS` output.
enum DecodeType {
    Plain,
    Range,
    Mask,
    Predicate,
}
424
425
impl fmt::Display for DecodeType {
426
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427
f.write_str(match self {
428
DecodeType::Plain => "plain",
429
DecodeType::Range => "range",
430
DecodeType::Mask => "mask",
431
DecodeType::Predicate => "predicate",
432
})
433
}
434
}
435
436
/// Per-column counters collected when `POLARS_PARQUET_METRICS=1`, printed as
/// a single CSV line after the column is decoded.
struct DecodeMetrics {
    // Name of the column being decoded.
    field_name: PlSmallStr,
    // Total bytes of page buffers before decompression.
    num_compressed_bytes: u64,
    // Total bytes of page buffers after decompression.
    num_uncompressed_bytes: u64,
    // Number of pages that actually went through decompression.
    num_decompressed_pages: u64,
    // Wall-clock time spent decompressing, in microseconds.
    num_micros_spent_decompressing: u128,
    // Wall-clock time spent decoding, in microseconds.
    num_micros_spent_decoding: u128,
    // Strategy used for this column (plain/range/mask/predicate).
    decode_type: DecodeType,
}
445
446
impl DecodeMetrics {
447
fn new(field_name: &str) -> Self {
448
Self {
449
field_name: PlSmallStr::from_str(field_name),
450
num_compressed_bytes: 0,
451
num_uncompressed_bytes: 0,
452
num_decompressed_pages: 0,
453
num_micros_spent_decompressing: 0,
454
num_micros_spent_decoding: 0,
455
decode_type: DecodeType::Plain,
456
}
457
}
458
}
459
460
/// Drives decoding of one column chunk: iterates its pages, decompresses and
/// decodes each one with `decoder`, and assembles the output chunks.
pub struct PageDecoder<D: Decoder> {
    // Source of (compressed) pages for this column chunk.
    pub iter: BasicDecompressor,
    // Arrow type the decoded values are finalized into.
    pub dtype: ArrowDataType,
    // Deserialized dictionary page, if the chunk has one.
    pub dict: Option<D::Dict>,
    pub decoder: D,

    // `Some` when the column is nested; routes `collect` to the nested path.
    pub init_nested: Option<Vec<InitNested>>,

    /// Used to track metrics with `POLARS_PARQUET_METRICS=1`.
    metrics: Option<Box<DecodeMetrics>>,
}
471
472
/// Run `f`, optionally measuring how long it takes.
///
/// Returns the closure's result, together with the elapsed wall-clock time in
/// microseconds when `do_time` is set and `None` otherwise.
#[inline(always)]
fn option_time<T>(do_time: bool, f: impl FnOnce() -> T) -> (T, Option<u128>) {
    if do_time {
        // Use the monotonic clock: unlike `SystemTime`, `Instant` cannot go
        // backwards, so `elapsed()` is infallible and the measurement is not
        // affected by wall-clock adjustments (the old `SystemTime` version
        // could panic on `.elapsed().unwrap()`).
        let start = std::time::Instant::now();
        // `black_box` keeps the timed work from being folded away/reordered.
        let result = std::hint::black_box(f());
        let elapsed = start.elapsed().as_micros();
        (result, Some(elapsed))
    } else {
        (f(), None)
    }
}
483
484
/// Cached result of the `POLARS_PARQUET_METRICS` environment variable lookup,
/// read once on the first [`PageDecoder::new`] call.
static POLARS_PARQUET_METRICS: OnceLock<bool> = OnceLock::new();
485
486
impl<D: Decoder> PageDecoder<D> {
    /// Create a new [`PageDecoder`], eagerly reading and deserializing the
    /// dictionary page (if any) and enabling metrics collection when the
    /// `POLARS_PARQUET_METRICS` environment variable equals `1`.
    pub fn new(
        field_name: &str,
        mut iter: BasicDecompressor,
        dtype: ArrowDataType,
        mut decoder: D,

        init_nested: Option<Vec<InitNested>>,
    ) -> ParquetResult<Self> {
        let dict_page = iter.read_dict_page()?;
        let dict = dict_page.map(|d| decoder.deserialize_dict(d)).transpose()?;

        let do_metrics = POLARS_PARQUET_METRICS
            .get_or_init(|| std::env::var("POLARS_PARQUET_METRICS").as_deref() == Ok("1"));

        Ok(Self {
            iter,
            dtype,
            dict,
            decoder,

            init_nested,
            metrics: do_metrics.then(|| Box::new(DecodeMetrics::new(field_name))),
        })
    }

    /// Decode all pages, dispatching to the nested, predicate, or flat
    /// collection strategy depending on the column's nesting and the filter.
    pub fn collect(
        self,
        filter: Option<Filter>,
    ) -> ParquetResult<(Option<NestedState>, Vec<D::Output>, Bitmap)> {
        if self.init_nested.is_some() {
            self.collect_nested(filter)
                .map(|(nested, arr, ptm)| (Some(nested), arr, ptm))
        } else {
            match filter {
                Some(Filter::Predicate(p)) => self
                    .collect_predicate_flat(&p)
                    .map(|(arr, ptm)| (None, arr, ptm)),
                filter => self
                    .collect_flat(filter)
                    .map(|arrays| (None, arrays, Bitmap::new())),
            }
        }
    }

    /// Flat (non-nested) collection with a predicate filter.
    ///
    /// Pages that provably contain no matching row are skipped wholesale;
    /// otherwise the predicate is evaluated per page — with a specialized
    /// kernel where available — and only matching rows are materialized.
    /// Returns the output chunks plus the per-row predicate-true mask.
    pub fn collect_predicate_flat(
        mut self,
        p: &PredicateFilter,
    ) -> ParquetResult<(Vec<D::Output>, Bitmap)> {
        let mut target = self.decoder.with_capacity(0);
        let mut pred_true_mask = BitmapBuilder::with_capacity(self.iter.total_num_values());

        let specialized_pred = p.predicate.as_specialized();
        // Whether the predicate is exactly `col == NULL`.
        let pred_is_eq_null = matches!(
            specialized_pred,
            Some(SpecializedParquetColumnExpr::Equal(ParquetScalar::Null)),
        );
        let pred_tracks_nulls = p.predicate.evaluate_null();

        let mut dict_mask = None;
        if let Some(dict) = self.dict.as_ref() {
            // @Performance. If we have a predicate, we can prune stuff out of the dictionary and
            // reduce memory consumption.
            self.decoder.apply_dictionary(&mut target, dict)?;
            dict_mask = Some(self.decoder.evaluate_dict_predicate(dict, p)?);
        }

        if let Some(metrics) = self.metrics.as_deref_mut() {
            metrics.decode_type = DecodeType::Predicate;
        }

        const MINIMUM_CHUNK_SIZE: usize = 256;
        let mut chunks = Vec::new();
        while let Some(page) = self.iter.next() {
            let page = page?;

            let mut can_skip_page = false;

            // Skip a dictionary encoded page if none of the dictionary values match the predicate.
            // This is essentially a slower version of statistics skipping.
            can_skip_page |= dict_mask.as_ref().is_some_and(|dm| dm.set_bits() == 0)
                && page.page().header().is_dictionary_encoded()
                && (!pred_tracks_nulls
                    || page.page().null_count() == Some(0)
                    || page.page().descriptor.primitive_type.field_info.repetition
                        != Repetition::Optional);

            // If we are looking for nulls, but this page does not contain any nulls.
            can_skip_page |= pred_is_eq_null
                && (page.page().descriptor.primitive_type.field_info.repetition
                    == Repetition::Required
                    || page.page().null_count() == Some(0));

            if can_skip_page {
                // Nothing in this page can match; record all-false and move on
                // without even decompressing.
                pred_true_mask.extend_constant(page.num_values(), false);
                continue;
            }

            if let Some(metrics) = self.metrics.as_deref_mut() {
                metrics.num_compressed_bytes += page.page().buffer.len() as u64;
                metrics.num_uncompressed_bytes += page.page().uncompressed_size() as u64;
            }

            let iter = &mut self.iter;
            let (page, time) = option_time(self.metrics.is_some(), move || page.decompress(iter));
            let page = page?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decompressing += time;
                metrics.num_decompressed_pages += 1;
            }

            let state = State::new(&self.decoder, &page, self.dict.as_ref())?;

            let (result, time) = option_time(self.metrics.is_some(), || {
                // Handle the case where column is held equal to Null. This can be the same for all
                // non-nested columns.
                if matches!(
                    p.predicate.as_specialized(),
                    Some(SpecializedParquetColumnExpr::Equal(ParquetScalar::Null)),
                ) {
                    if state.is_optional
                        && let Some(v) = &state.page_validity
                    {
                        // Rows match exactly where the validity bit is unset.
                        let start_set_bits = pred_true_mask.set_bits();
                        pred_true_mask.extend_from_bitmap(&!v);
                        if p.include_values {
                            target.extend_nulls(pred_true_mask.set_bits() - start_set_bits);
                        }
                    } else {
                        // No validity bitmap means the page has no nulls.
                        pred_true_mask.extend_constant(page.num_values(), false)
                    };

                    return Ok(());
                }

                // For now, we have a function that indicates whether the predicate can actually be
                // handled in the kernels. If it cannot be handled in the kernels, catch it here
                // and load it as if it weren't filtered.
                let mut page_ptm = BitmapBuilder::new();
                if self.decoder.evaluate_predicate(
                    &state,
                    specialized_pred,
                    &mut page_ptm,
                    dict_mask.as_ref(),
                )? {
                    if page_ptm.set_bits() == 0 {
                        pred_true_mask.extend_constant(page.num_values(), false);
                        return Ok(());
                    }

                    let page_ptm = page_ptm.freeze().sliced(0, page.num_values());
                    pred_true_mask.extend_from_bitmap(&page_ptm);
                    let num_filtered_values = page_ptm.set_bits();

                    // If we would need to move data, just create a new chunk.
                    if p.include_values && num_filtered_values > target.remaining_capacity() {
                        let previous_target = std::mem::replace(
                            &mut target,
                            self.decoder
                                .with_capacity(usize::max(num_filtered_values, MINIMUM_CHUNK_SIZE)),
                        );
                        if previous_target.len() > 0 {
                            let chunk = self.decoder.finalize(
                                self.dtype.clone(),
                                self.dict.clone(),
                                previous_target,
                            )?;
                            chunks.push(chunk);
                        }

                        if let Some(dict) = self.dict.as_ref() {
                            self.decoder.apply_dictionary(&mut target, dict)?;
                        }
                    }

                    if p.include_values {
                        // For `col == constant`, every surviving row equals the
                        // needle, so extend with the constant instead of decoding.
                        if let Some(SpecializedParquetColumnExpr::Equal(needle)) = specialized_pred
                        {
                            self.decoder.extend_constant(
                                &mut target,
                                num_filtered_values,
                                needle,
                            )?;
                        } else {
                            state.decode(
                                &mut self.decoder,
                                &mut target,
                                Some(Filter::Mask(page_ptm)),
                                &mut chunks,
                            )?;
                        }
                    }
                } else {
                    // No specialized kernel available: decode fully, then filter.
                    self.decoder.unspecialized_predicate_decode(
                        state,
                        &mut target,
                        &mut pred_true_mask,
                        p,
                        self.dict.clone(),
                        &self.dtype,
                    )?;
                }

                ParquetResult::Ok(())
            });
            result?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decoding += time;
            }

            self.iter.reuse_page_buffer(page);
        }

        if let Some(metrics) = self.metrics.as_ref() {
            eprintln!(
                "PQ-Metrics: {},{},{},{},{},{},{}",
                metrics.field_name,
                metrics.num_micros_spent_decompressing,
                metrics.num_micros_spent_decoding,
                metrics.num_compressed_bytes,
                metrics.num_uncompressed_bytes,
                metrics.num_decompressed_pages,
                metrics.decode_type,
            );
        }

        // Flush the last partial chunk; always emit at least one chunk.
        if target.len() > 0 || chunks.is_empty() {
            chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);
        }
        Ok((chunks, pred_true_mask.freeze()))
    }

    /// Flat (non-nested) collection with an optional range/mask filter.
    pub fn collect_flat(mut self, mut filter: Option<Filter>) -> ParquetResult<Vec<D::Output>> {
        let mut num_rows_remaining = Filter::opt_num_rows(&filter, self.iter.total_num_values());

        let mut target =
            self.decoder
                .with_capacity(if D::CHUNKED { 0 } else { num_rows_remaining });

        if let Some(dict) = self.dict.as_ref() {
            // @Performance. If we have a predicate, we can prune stuff out of the dictionary and
            // reduce memory consumption.
            self.decoder.apply_dictionary(&mut target, dict)?;
        }

        if let Some(metrics) = self.metrics.as_deref_mut() {
            metrics.decode_type = match &filter {
                None => DecodeType::Plain,
                Some(Filter::Range(_)) => DecodeType::Range,
                Some(Filter::Mask(_)) => DecodeType::Mask,
                // Predicate filters are routed to `collect_predicate_flat`.
                Some(Filter::Predicate(_)) => unreachable!(),
            };
        }

        let mut chunks = Vec::new();
        while num_rows_remaining > 0 {
            let Some(page) = self.iter.next() else {
                break;
            };
            let page = page?;

            let page_num_values = page.num_values();

            // Split off the portion of the filter that applies to this page.
            let mut state_filter;
            (state_filter, filter) = Filter::opt_split_at(&filter, page_num_values);

            state_filter = state_filter.or(Some(Filter::Range(0..page_num_values)));

            // Skip the whole page if we don't need any rows from it
            if state_filter
                .as_ref()
                .is_some_and(|f| f.num_rows(page_num_values) == 0)
            {
                continue;
            }

            if let Some(metrics) = self.metrics.as_deref_mut() {
                metrics.num_compressed_bytes += page.page().buffer.len() as u64;
                metrics.num_uncompressed_bytes += page.page().uncompressed_size() as u64;
            }

            let iter = &mut self.iter;
            let (page, time) = option_time(self.metrics.is_some(), move || page.decompress(iter));
            let page = page?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decompressing += time;
                metrics.num_decompressed_pages += 1;
            }

            let state = State::new(&self.decoder, &page, self.dict.as_ref())?;

            let start_length = target.len();
            let (result, time) = option_time(self.metrics.is_some(), || {
                state.decode(&mut self.decoder, &mut target, state_filter, &mut chunks)?;

                ParquetResult::Ok(())
            });
            result?;

            if let Some(time) = time {
                let metrics = self.metrics.as_deref_mut().unwrap();
                metrics.num_micros_spent_decoding += time;
            }

            let end_length = target.len();

            // Track how many of the requested rows this page produced.
            num_rows_remaining -= end_length - start_length;

            self.iter.reuse_page_buffer(page);
        }

        if let Some(metrics) = self.metrics.as_ref() {
            eprintln!(
                "PQ-Metrics: {},{},{},{},{},{},{}",
                metrics.field_name,
                metrics.num_micros_spent_decompressing,
                metrics.num_micros_spent_decoding,
                metrics.num_compressed_bytes,
                metrics.num_uncompressed_bytes,
                metrics.num_decompressed_pages,
                metrics.decode_type,
            );
        }

        // Flush the last partial chunk; always emit at least one chunk.
        if target.len() > 0 || chunks.is_empty() {
            chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);
        }

        Ok(chunks)
    }

    /// Like [`Self::collect`], but with the output arrays boxed as `dyn Array`.
    pub fn collect_boxed(
        self,
        filter: Option<Filter>,
    ) -> ParquetResult<(Option<NestedState>, Vec<Box<dyn Array>>, Bitmap)> {
        use arrow::array::IntoBoxedArray;
        let (nested, array, ptm) = self.collect(filter)?;
        let array = array.into_iter().map(|arr| arr.into_boxed()).collect();
        Ok((nested, array, ptm))
    }
}
833
834
#[inline]
835
pub(super) fn dict_indices_decoder(
836
page: &DataPage,
837
null_count: usize,
838
) -> ParquetResult<hybrid_rle::HybridRleDecoder<'_>> {
839
let indices_buffer = split_buffer(page)?.values;
840
841
// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
842
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
843
let bit_width = indices_buffer[0];
844
let indices_buffer = &indices_buffer[1..];
845
846
Ok(hybrid_rle::HybridRleDecoder::new(
847
indices_buffer,
848
bit_width as u32,
849
page.num_values() - null_count,
850
))
851
}
852
853
/// Freeze a [`MutableBitmap`] into a `Option<Bitmap>`.
854
///
855
/// This will turn the several instances where `None` (representing "all valid") suffices.
856
pub fn freeze_validity(validity: BitmapBuilder) -> Option<Bitmap> {
857
if validity.is_empty() || validity.unset_bits() == 0 {
858
return None;
859
}
860
861
let validity = validity.freeze();
862
Some(validity)
863
}
864
865
/// Build a mask of length `rng.end` for `rng`: `rng.start` unset bits
/// followed by `rng.len()` set bits.
pub(crate) fn filter_from_range(rng: Range<usize>) -> Bitmap {
    let mut builder = BitmapBuilder::with_capacity(rng.end);
    builder.extend_constant(rng.start, false);
    builder.extend_constant(rng.len(), true);
    builder.freeze()
}
873
874
/// Decode a hybrid-RLE stream of 0/1 values into `bitmap`, stopping after
/// `limit` values (or the decoder's full length when `limit` is `None`).
pub(crate) fn decode_hybrid_rle_into_bitmap(
    mut page_validity: HybridRleDecoder<'_>,
    limit: Option<usize>,
    bitmap: &mut BitmapBuilder,
) -> ParquetResult<()> {
    // Only 0/1-valued streams can be collected into a bitmap.
    assert!(page_validity.num_bits() <= 1);

    let mut limit = limit.unwrap_or(page_validity.len());
    bitmap.reserve(limit);

    while let Some(chunk) = page_validity.next_chunk()? {
        if limit == 0 {
            break;
        }

        match chunk {
            HybridRleChunk::Rle(value, size) => {
                // A run of `size` identical values; clamp to the remaining limit.
                let size = size.min(limit);
                bitmap.extend_constant(size, value != 0);
                limit -= size;
            },
            HybridRleChunk::Bitpacked(decoder) => {
                // Bit-packed values can be copied over bit-for-bit.
                let len = decoder.len().min(limit);
                bitmap.extend_from_slice(decoder.as_slice(), 0, len);
                limit -= len;
            },
        }
    }

    Ok(())
}
905
906
/// Decode a page's definition levels into a validity bitmap.
///
/// Returns `Ok(None)` when every value turns out to be valid, so callers can
/// take the "no nulls" fast path without allocating a bitmap.
pub(crate) fn decode_page_validity(
    mut page_validity: HybridRleDecoder<'_>,
    limit: Option<usize>,
) -> ParquetResult<Option<Bitmap>> {
    // Definition levels for flat columns are 0/1.
    assert!(page_validity.num_bits() <= 1);

    // Number of all-valid leading values seen before the first chunk that
    // contains a zero.
    let mut num_ones = 0;

    let mut bm = BitmapBuilder::new();
    let limit = limit.unwrap_or(page_validity.len());
    page_validity.limit_to(limit);
    let num_values = page_validity.len();

    // If all values are valid anyway, we will return a None so don't allocate until we disprove
    // that that is the case.
    while let Some(chunk) = page_validity.next_chunk()? {
        match chunk {
            // All-ones run: just count, still hoping for the `None` outcome.
            HybridRleChunk::Rle(value, size) if value != 0 => num_ones += size,
            HybridRleChunk::Rle(value, size) => {
                // First run containing zeros: materialize the bitmap.
                bm.reserve(num_values);
                bm.extend_constant(num_ones, true);
                bm.extend_constant(size, value != 0);
                break;
            },
            HybridRleChunk::Bitpacked(decoder) => {
                // Bit-packed chunk may contain zeros: materialize the bitmap.
                let len = decoder.len();
                bm.reserve(num_values);
                bm.extend_constant(num_ones, true);
                bm.extend_from_slice(decoder.as_slice(), 0, len);
                break;
            },
        }
    }

    // Decoder exhausted without materializing anything: all values valid.
    if page_validity.len() == 0 && bm.is_empty() {
        return Ok(None);
    }

    // Decode whatever follows the chunk that triggered materialization.
    decode_hybrid_rle_into_bitmap(page_validity, None, &mut bm)?;
    Ok(Some(bm.freeze()))
}
947
948