GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};

use super::utils::PageDecoder;
use super::{Filter, utils};
use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{DataPage, split_buffer};
use crate::parquet::read::levels::get_bit_width;

pub struct Nested {
    validity: Option<BitmapBuilder>,
    length: usize,
    content: NestedContent,

    // We batch the collection of valids and invalids to amortize the costs. This only really
    // works when valids and invalids are grouped or there is an imbalance between the number of
    // valids and invalids. That, however, is a very common situation. See the worked example
    // following this struct.
    num_valids: usize,
    num_invalids: usize,
}

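// Worked example of the batching above (illustrative, not from the source): pushing the validity
// sequence `valid, valid, invalid, invalid, valid` first accumulates `num_valids = 2`, then
// `num_invalids = 2`. The fifth (valid) push sees pending invalids, flushes `[1, 1, 0, 0]` into
// `validity` via two bulk `extend_constant` calls, and restarts the counters with
// `num_valids = 1`; `take()` flushes the remainder, so the final bitmap is `[1, 1, 0, 0, 1]`.
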
#[derive(Debug)]
pub enum NestedContent {
    Primitive,
    List { offsets: Vec<i64> },
    FixedSizeList { width: usize },
    Struct,
}

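// For orientation (illustrative): each nesting level of an Arrow type maps to one variant,
// outermost to innermost. E.g. a `List<Struct<{a: i64}>>` column decodes through
// `List { offsets }`, then `Struct`, then `Primitive` for the leaf values.
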
impl Nested {
    fn primitive(is_nullable: bool) -> Self {
        // @NOTE: We allocate with `0` capacity here since we will not be pushing to this bitmap.
        // A primitive does not keep track of its validity here; the decoder does. We still want
        // to store something so that we can check for nullability by looking at the option.
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(0));

        Self {
            validity,
            length: 0,
            content: NestedContent::Primitive,

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn list_with_capacity(is_nullable: bool, capacity: usize) -> Self {
        let offsets = Vec::with_capacity(capacity);
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::List { offsets },

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn fixedlist_with_capacity(is_nullable: bool, width: usize, capacity: usize) -> Self {
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::FixedSizeList { width },

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn struct_with_capacity(is_nullable: bool, capacity: usize) -> Self {
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::Struct,

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn take(mut self) -> (usize, Vec<i64>, Option<BitmapBuilder>) {
        if !matches!(self.content, NestedContent::Primitive) {
            if let Some(validity) = self.validity.as_mut() {
                validity.extend_constant(self.num_valids, true);
                validity.extend_constant(self.num_invalids, false);
            }

            debug_assert!(
                self.validity
                    .as_ref()
                    .is_none_or(|v| v.len() == self.length)
            );
        }

        self.num_valids = 0;
        self.num_invalids = 0;

        match self.content {
            NestedContent::Primitive => {
                debug_assert!(self.validity.is_none_or(|validity| validity.is_empty()));
                (self.length, Vec::new(), None)
            },
            NestedContent::List { offsets } => (self.length, offsets, self.validity),
            NestedContent::FixedSizeList { .. } => (self.length, Vec::new(), self.validity),
            NestedContent::Struct => (self.length, Vec::new(), self.validity),
        }
    }

    fn is_nullable(&self) -> bool {
        self.validity.is_some()
    }

    fn is_repeated(&self) -> bool {
        match self.content {
            NestedContent::Primitive => false,
            NestedContent::List { .. } => true,
            NestedContent::FixedSizeList { .. } => true,
            NestedContent::Struct => false,
        }
    }

    fn is_required(&self) -> bool {
        match self.content {
            NestedContent::Primitive => false,
            NestedContent::List { .. } => false,
            NestedContent::FixedSizeList { .. } => false,
            NestedContent::Struct => true,
        }
    }

    /// The number of rows.
    fn len(&self) -> usize {
        self.length
    }

    fn invalid_num_values(&self) -> usize {
        match &self.content {
            NestedContent::Primitive => 1,
            NestedContent::List { .. } => 0,
            NestedContent::FixedSizeList { width } => *width,
            NestedContent::Struct => 1,
        }
    }

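    // To illustrate `invalid_num_values` (comment not from the source): when a value is null at
    // some depth, this is how many placeholder slots that level still occupies one layer down. A
    // null `List` occupies zero child slots (its offsets simply do not advance), a null
    // `FixedSizeList { width: 3 }` must still reserve 3 child slots, and a null `Struct` or
    // `Primitive` occupies exactly one. `decode_nested` multiplies these factors across the
    // remaining depths to compute how many invalid leaf values to emit.
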
    fn push(&mut self, value: i64, is_valid: bool) {
        let is_primitive = matches!(self.content, NestedContent::Primitive);

        if is_valid && self.num_invalids != 0 {
            debug_assert!(!is_primitive);

            // @NOTE: Having invalid items might not necessarily mean that we have a validity
            // mask.
            //
            // For instance, if we have an optional struct with a required list in it, that struct
            // will have a validity mask and the list will not. In the arrow representation of
            // this array, however, the list will still have invalid items where the struct is
            // null.
            //
            // Array:
            // [
            //     { 'x': [1] },
            //     None,
            //     { 'x': [1, 2] },
            // ]
            //
            // Arrow:
            // struct = [ list[0] None list[2] ]
            // list = {
            //     values = [ 1, 1, 2 ],
            //     offsets = [ 0, 1, 1, 3 ],
            // }
            //
            // Parquet:
            // [ 1, 1, 2 ] + definition + repetition levels
            //
            // As you can see, we need to insert an invalid item into the list even though it
            // does not have a validity mask.
            if let Some(validity) = self.validity.as_mut() {
                validity.extend_constant(self.num_valids, true);
                validity.extend_constant(self.num_invalids, false);
            }

            self.num_valids = 0;
            self.num_invalids = 0;
        }

        self.num_valids += usize::from(!is_primitive & is_valid);
        self.num_invalids += usize::from(!is_primitive & !is_valid);

        self.length += 1;
        if let NestedContent::List { offsets } = &mut self.content {
            offsets.push(value);
        }
    }

    fn push_default(&mut self, length: i64) {
        let is_primitive = matches!(self.content, NestedContent::Primitive);
        self.num_invalids += usize::from(!is_primitive);

        self.length += 1;
        if let NestedContent::List { offsets } = &mut self.content {
            offsets.push(length);
        }
    }
}

/// Utility structure to create a `Filter` and validity mask for the leaf values.
///
/// This batches the extending to amortize its cost.
pub struct BatchedNestedDecoder<'a> {
    pub(crate) num_waiting_valids: usize,
    pub(crate) num_waiting_invalids: usize,

    filter: &'a mut MutableBitmap,
    validity: &'a mut MutableBitmap,
}

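// Illustrative example (not from the source) of how the two masks combine: the sequence
// `push_n_valids(2)`, `push_n_invalids(1)`, `skip_in_place(3)`, `finalize()` yields
//
//     filter   = 1 1 1 0 0 0    (skipped leaves are still decoded, then filtered out)
//     validity = 1 1 0 1 1 1    (only the explicitly invalid leaf is marked null)
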
impl BatchedNestedDecoder<'_> {
    fn push_valid(&mut self) -> ParquetResult<()> {
        self.push_n_valids(1)
    }

    fn push_invalid(&mut self) -> ParquetResult<()> {
        self.push_n_invalids(1)
    }

    fn push_n_valids(&mut self, n: usize) -> ParquetResult<()> {
        if self.num_waiting_invalids == 0 {
            self.num_waiting_valids += n;
            return Ok(());
        }

        self.filter.extend_constant(self.num_waiting_valids, true);
        self.validity.extend_constant(self.num_waiting_valids, true);

        self.filter.extend_constant(self.num_waiting_invalids, true);
        self.validity
            .extend_constant(self.num_waiting_invalids, false);

        self.num_waiting_valids = n;
        self.num_waiting_invalids = 0;

        Ok(())
    }

    fn push_n_invalids(&mut self, n: usize) -> ParquetResult<()> {
        self.num_waiting_invalids += n;
        Ok(())
    }

    fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
        if self.num_waiting_valids > 0 {
            self.filter.extend_constant(self.num_waiting_valids, true);
            self.validity.extend_constant(self.num_waiting_valids, true);
            self.num_waiting_valids = 0;
        }
        if self.num_waiting_invalids > 0 {
            self.filter.extend_constant(self.num_waiting_invalids, true);
            self.validity
                .extend_constant(self.num_waiting_invalids, false);
            self.num_waiting_invalids = 0;
        }

        self.filter.extend_constant(n, false);
        self.validity.extend_constant(n, true);

        Ok(())
    }

    fn finalize(self) -> ParquetResult<()> {
        self.filter.extend_constant(self.num_waiting_valids, true);
        self.validity.extend_constant(self.num_waiting_valids, true);

        self.filter.extend_constant(self.num_waiting_invalids, true);
        self.validity
            .extend_constant(self.num_waiting_invalids, false);

        Ok(())
    }
}

/// The initial info of nested data types.
/// The `bool` indicates if the type is nullable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitNested {
    /// Primitive data types
    Primitive(bool),
    /// List data types
    List(bool),
    /// Fixed-Size List data types
    FixedSizeList(bool, usize),
    /// Struct data types
    Struct(bool),
}

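// Illustrative mapping (not from the source): a nullable struct containing a required list of
// nullable `i64` values would be described, outermost first, as
// `&[InitNested::Struct(true), InitNested::List(false), InitNested::Primitive(true)]`.
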
/// Initialize a [`NestedState`] from `&[InitNested]`.
pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState {
    use {InitNested as IN, Nested as N};

    let container = init
        .iter()
        .map(|init| match init {
            IN::Primitive(is_nullable) => N::primitive(*is_nullable),
            IN::List(is_nullable) => N::list_with_capacity(*is_nullable, capacity),
            IN::FixedSizeList(is_nullable, width) => {
                N::fixedlist_with_capacity(*is_nullable, *width, capacity)
            },
            IN::Struct(is_nullable) => N::struct_with_capacity(*is_nullable, capacity),
        })
        .collect();

    NestedState::new(container)
}

/// The state of nested data types.
#[derive(Default)]
pub struct NestedState {
    /// The `Nested` levels composing this `NestedState`, outermost first.
    nested: Vec<Nested>,
}

impl NestedState {
    /// Creates a new [`NestedState`].
    fn new(nested: Vec<Nested>) -> Self {
        Self { nested }
    }

    pub fn pop(&mut self) -> Option<(usize, Vec<i64>, Option<BitmapBuilder>)> {
        Some(self.nested.pop()?.take())
    }

    pub fn last(&self) -> Option<&NestedContent> {
        self.nested.last().map(|v| &v.content)
    }

    /// The number of rows in this state.
    pub fn len(&self) -> usize {
        // The outermost level determines the number of rows.
        self.nested[0].len()
    }

    /// Returns the definition and repetition levels for each nesting level.
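    ///
    /// Illustrative example (not from the source): for the levels `Struct(nullable)`,
    /// `List(nullable)`, `Primitive(nullable)`, the per-level deltas are `+1` (nullable),
    /// `+2` (nullable and repeated) and `+1` (nullable), so this returns
    /// `def_levels = [0, 1, 3, 4]` and `rep_levels = [0, 0, 1, 1]`.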
    fn levels(&self) -> (Vec<u16>, Vec<u16>) {
        let depth = self.nested.len();

        let mut def_levels = Vec::with_capacity(depth + 1);
        let mut rep_levels = Vec::with_capacity(depth + 1);

        def_levels.push(0);
        rep_levels.push(0);

        for i in 0..depth {
            let nest = &self.nested[i];

            let def_delta = nest.is_nullable() as u16 + nest.is_repeated() as u16;
            let rep_delta = nest.is_repeated() as u16;

            def_levels.push(def_levels[i] + def_delta);
            rep_levels.push(rep_levels[i] + rep_delta);
        }

        (def_levels, rep_levels)
    }
}

fn collect_level_values(
    target: &mut Vec<u16>,
    hybrid_rle: HybridRleDecoder<'_>,
) -> ParquetResult<()> {
    target.reserve(hybrid_rle.len());

    for chunk in hybrid_rle.into_chunk_iter() {
        let chunk = chunk?;

        match chunk {
            HybridRleChunk::Rle(value, size) => {
                target.resize(target.len() + size, value as u16);
            },
            HybridRleChunk::Bitpacked(decoder) => {
                decoder.lower_element::<u16>()?.collect_into(target);
            },
        }
    }

    Ok(())
}

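// For intuition (illustrative): a page whose definition levels are `[1, 1, 1, 1, 0, 1, 0]` could
// arrive as the chunks `Rle(1, 4)` followed by `Bitpacked([0, 1, 0])`; both arms above append the
// same `u16` values to `target`.
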
/// State to keep track of how many top-level values (i.e. rows) still need to be skipped and
/// collected.
///
/// This state should be kept between pages because a top-level value / row value may span
/// several pages.
///
/// - `num_skips = Some(n)` means that it will skip until the `n + 1`-th occurrence of the
///   repetition level of `0` (i.e. the start of a top-level value / row value).
/// - `num_collects = Some(n)` means that it will collect values until the `n + 1`-th occurrence
///   of the repetition level of `0` (i.e. the start of a top-level value / row value).
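///
/// Illustrative example (not from the source): a top-level filter mask of `0 0 1 1 0` produces
/// `num_skips = Some(2)` followed by `num_collects = Some(2)` at the bottom of the decode loop
/// below.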
struct DecodingState {
    num_skips: Option<usize>,
    num_collects: Option<usize>,
}

#[allow(clippy::too_many_arguments)]
fn decode_nested(
    mut current_def_levels: &[u16],
    mut current_rep_levels: &[u16],

    batched_collector: &mut BatchedNestedDecoder<'_>,
    nested: &mut [Nested],

    state: &mut DecodingState,
    top_level_filter: &mut BitmapIter<'_>,

    // Amortized allocations
    def_levels: &[u16],
    rep_levels: &[u16],
) -> ParquetResult<()> {
    let max_depth = nested.len();
    let leaf_def_level = *def_levels.last().unwrap();

    while !current_def_levels.is_empty() {
        debug_assert_eq!(current_def_levels.len(), current_rep_levels.len());

        // Handle skips
        if let Some(ref mut num_skips) = state.num_skips {
            let mut i = 0;
            let mut num_skipped_values = 0;
            while i < current_def_levels.len() && (*num_skips > 0 || current_rep_levels[i] != 0) {
                let def = current_def_levels[i];
                let rep = current_rep_levels[i];

                *num_skips -= usize::from(rep == 0);
                i += 1;

                // @NOTE:
                // We don't need to account for higher def-levels that imply extra values, since
                // we don't have those higher levels either.
                num_skipped_values += usize::from(def == leaf_def_level);
            }
            batched_collector.skip_in_place(num_skipped_values)?;

            current_def_levels = &current_def_levels[i..];
            current_rep_levels = &current_rep_levels[i..];

            if current_def_levels.is_empty() {
                break;
            } else {
                state.num_skips = None;
            }
        }

        // Handle collects
        if let Some(ref mut num_collects) = state.num_collects {
            let mut i = 0;
            while i < current_def_levels.len() && (*num_collects > 0 || current_rep_levels[i] != 0)
            {
                let def = current_def_levels[i];
                let rep = current_rep_levels[i];

                *num_collects -= usize::from(rep == 0);
                i += 1;

                let mut is_required = false;

                for depth in 0..max_depth {
                    // Defines whether this element is defined at `depth`.
                    //
                    // e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and
                    // [ [ [ 1 ] ] ].
                    let is_defined_at_this_depth =
                        rep <= rep_levels[depth] && def >= def_levels[depth];

                    let length = nested
                        .get(depth + 1)
                        .map(|x| x.len() as i64)
                        // the last depth is the leaf, which is always increased by 1
                        .unwrap_or(1);

                    let nest = &mut nested[depth];

                    let is_valid = !nest.is_nullable() || def > def_levels[depth];

                    if is_defined_at_this_depth && !is_valid {
                        let mut num_elements = 1;

                        nest.push(length, is_valid);

                        for embed_depth in depth..max_depth {
                            let embed_length = nested
                                .get(embed_depth + 1)
                                .map(|x| x.len() as i64)
                                // the last depth is the leaf, which is always increased by 1
                                .unwrap_or(1);

                            let embed_nest = &mut nested[embed_depth];

                            if embed_depth > depth {
                                for _ in 0..num_elements {
                                    embed_nest.push_default(embed_length);
                                }
                            }

                            let embed_num_values = embed_nest.invalid_num_values();
                            num_elements *= embed_num_values;

                            if embed_num_values == 0 {
                                break;
                            }
                        }

                        batched_collector.push_n_invalids(num_elements)?;

                        break;
                    }

                    if is_required || is_defined_at_this_depth {
                        nest.push(length, is_valid);

                        if depth == max_depth - 1 {
                            // the leaf / primitive
                            let is_valid = (def != def_levels[depth]) || !nest.is_nullable();

                            if is_valid {
                                batched_collector.push_valid()?;
                            } else {
                                batched_collector.push_invalid()?;
                            }
                        }
                    }

                    is_required = (is_required || is_defined_at_this_depth)
                        && nest.is_required()
                        && !is_valid;
                }
            }

            current_def_levels = &current_def_levels[i..];
            current_rep_levels = &current_rep_levels[i..];

            if current_def_levels.is_empty() {
                break;
            } else {
                state.num_collects = None;
            }
        }

        if top_level_filter.num_remaining() == 0 {
            break;
        }

        state.num_skips = Some(top_level_filter.take_leading_zeros()).filter(|v| *v != 0);
        state.num_collects = Some(top_level_filter.take_leading_ones()).filter(|v| *v != 0);
    }

    Ok(())
}

/// Return the definition and repetition level iterators for this page.
fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder<'_>, HybridRleDecoder<'_>)> {
    let split = split_buffer(page)?;
    let def = split.def;
    let rep = split.rep;

    let max_def_level = page.descriptor.max_def_level;
    let max_rep_level = page.descriptor.max_rep_level;

    let def_iter = HybridRleDecoder::new(def, get_bit_width(max_def_level), page.num_values());
    let rep_iter = HybridRleDecoder::new(rep, get_bit_width(max_rep_level), page.num_values());

    Ok((def_iter, rep_iter))
}

impl<D: utils::Decoder> PageDecoder<D> {
    pub fn collect_nested(
        mut self,
        filter: Option<Filter>,
    ) -> ParquetResult<(NestedState, D::Output, Bitmap)> {
        let init = self.init_nested.as_mut().unwrap();

        // @TODO: We should probably count the filter so that we don't overallocate
        let mut target = self.decoder.with_capacity(self.iter.total_num_values());
        // @TODO: Self capacity
        let mut nested_state = init_nested(init, 0);

        if let Some(dict) = self.dict.as_ref() {
            self.decoder.apply_dictionary(&mut target, dict)?;
        }

        // Amortize the allocations.
        let (def_levels, rep_levels) = nested_state.levels();

        let mut current_def_levels = Vec::<u16>::new();
        let mut current_rep_levels = Vec::<u16>::new();

        let (mut decode_state, top_level_filter) = match filter {
            None => (
                DecodingState {
                    num_skips: None,
                    num_collects: Some(usize::MAX),
                },
                Bitmap::new(),
            ),
            Some(Filter::Range(range)) => (
                DecodingState {
                    num_skips: Some(range.start),
                    num_collects: Some(range.len()),
                },
                Bitmap::new(),
            ),
            Some(Filter::Mask(mask)) => (
                DecodingState {
                    num_skips: None,
                    num_collects: None,
                },
                mask,
            ),
            Some(Filter::Predicate(_)) => todo!(),
        };

        let mut top_level_filter = top_level_filter.iter();

        loop {
            let Some(page) = self.iter.next() else {
                break;
            };
            let page = page?;
            let page = page.decompress(&mut self.iter)?;

            let (mut def_iter, mut rep_iter) = level_iters(&page)?;

            let num_levels = def_iter.len().min(rep_iter.len());
            def_iter.limit_to(num_levels);
            rep_iter.limit_to(num_levels);

            current_def_levels.clear();
            current_rep_levels.clear();

            collect_level_values(&mut current_def_levels, def_iter)?;
            collect_level_values(&mut current_rep_levels, rep_iter)?;

            let mut leaf_filter = MutableBitmap::new();
            let mut leaf_validity = MutableBitmap::new();

            // @TODO: move this to outside the loop.
            let mut batched_collector = BatchedNestedDecoder {
                num_waiting_valids: 0,
                num_waiting_invalids: 0,

                filter: &mut leaf_filter,
                validity: &mut leaf_validity,
            };

            decode_nested(
                &current_def_levels,
                &current_rep_levels,
                &mut batched_collector,
                &mut nested_state.nested,
                &mut decode_state,
                &mut top_level_filter,
                &def_levels,
                &rep_levels,
            )?;

            batched_collector.finalize()?;

            let leaf_validity = leaf_validity.freeze();
            let leaf_filter = leaf_filter.freeze();

            let state = utils::State::new_nested(
                &self.decoder,
                &page,
                self.dict.as_ref(),
                Some(leaf_validity),
            )?;
            state.decode(
                &mut self.decoder,
                &mut target,
                &mut BitmapBuilder::new(), // This will not get used or filled
                Some(Filter::Mask(leaf_filter)),
            )?;

            self.iter.reuse_page_buffer(page);
        }

        // We pop the primitive level off here.
        debug_assert!(matches!(
            nested_state.nested.last().unwrap().content,
            NestedContent::Primitive
        ));
        _ = nested_state.pop().unwrap();

        let array = self.decoder.finalize(self.dtype, self.dict, target)?;

        Ok((nested_state, array, Bitmap::new()))
    }
}