// pola-rs/polars: crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::{Bitmap, BitmapBuilder, MutableBitmap};

use super::utils::PageDecoder;
use super::{Filter, utils};
use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{DataPage, split_buffer};
use crate::parquet::read::levels::get_bit_width;
use crate::read::deserialize::utils::Decoded;

pub struct Nested {
    validity: Option<BitmapBuilder>,
    length: usize,
    content: NestedContent,

    // We batch the collection of valids and invalids to amortize the costs. This only really
    // works when valids and invalids are grouped or there is an imbalance between the number of
    // valids and invalids. That, however, is a very common situation.
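    //
    // For example: pushing the validity sequence [T, T, T, F, F] only bumps `num_valids` to 3
    // and `num_invalids` to 2; the builder itself is extended once per run when the batch is
    // flushed.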
    num_valids: usize,
    num_invalids: usize,
}

#[derive(Debug)]
pub enum NestedContent {
    Primitive,
    List { offsets: Vec<i64> },
    FixedSizeList { width: usize },
    Struct,
}

impl Nested {
    fn primitive(is_nullable: bool) -> Self {
        // @NOTE: We allocate with `0` capacity here since we will not be pushing to this bitmap:
        // a primitive does not track its validity here, the decoder does. We still store a
        // builder so that nullability can be checked by looking at the `Option`.
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(0));

        Self {
            validity,
            length: 0,
            content: NestedContent::Primitive,

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn list_with_capacity(is_nullable: bool, capacity: usize) -> Self {
        let offsets = Vec::with_capacity(capacity);
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::List { offsets },

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn fixedlist_with_capacity(is_nullable: bool, width: usize, capacity: usize) -> Self {
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::FixedSizeList { width },

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn struct_with_capacity(is_nullable: bool, capacity: usize) -> Self {
        let validity = is_nullable.then(|| BitmapBuilder::with_capacity(capacity));
        Self {
            validity,
            length: 0,
            content: NestedContent::Struct,

            num_valids: 0,
            num_invalids: 0,
        }
    }

    fn take(mut self) -> (usize, Vec<i64>, Option<BitmapBuilder>) {
        if !matches!(self.content, NestedContent::Primitive) {
            if let Some(validity) = self.validity.as_mut() {
                validity.extend_constant(self.num_valids, true);
                validity.extend_constant(self.num_invalids, false);
            }

            debug_assert!(
                self.validity
                    .as_ref()
                    .is_none_or(|v| v.len() == self.length)
            );
        }

        self.num_valids = 0;
        self.num_invalids = 0;

        match self.content {
            NestedContent::Primitive => {
                debug_assert!(self.validity.is_none_or(|validity| validity.is_empty()));
                (self.length, Vec::new(), None)
            },
            NestedContent::List { offsets } => (self.length, offsets, self.validity),
            NestedContent::FixedSizeList { .. } => (self.length, Vec::new(), self.validity),
            NestedContent::Struct => (self.length, Vec::new(), self.validity),
        }
    }

    fn is_nullable(&self) -> bool {
        self.validity.is_some()
    }

    fn is_repeated(&self) -> bool {
        match self.content {
            NestedContent::Primitive => false,
            NestedContent::List { .. } => true,
            NestedContent::FixedSizeList { .. } => true,
            NestedContent::Struct => false,
        }
    }

    fn is_required(&self) -> bool {
        match self.content {
            NestedContent::Primitive => false,
            NestedContent::List { .. } => false,
            NestedContent::FixedSizeList { .. } => false,
            NestedContent::Struct => true,
        }
    }

    /// number of rows
    fn len(&self) -> usize {
        self.length
    }

    fn invalid_num_values(&self) -> usize {
        match &self.content {
            NestedContent::Primitive => 1,
            NestedContent::List { .. } => 0,
            NestedContent::FixedSizeList { width } => *width,
            NestedContent::Struct => 1,
        }
    }

    fn push(&mut self, value: i64, is_valid: bool) {
        let is_primitive = matches!(self.content, NestedContent::Primitive);

        if is_valid && self.num_invalids != 0 {
            debug_assert!(!is_primitive);

            // @NOTE: Having invalid items might not necessarily mean that we have a validity mask.
            //
            // For instance, if we have an optional struct with a required list in it, that struct
            // will have a validity mask and the list will not. In the Arrow representation of this
            // array, however, the list will still have invalid items where the struct is null.
            //
            // Array:
            // [
            //     { 'x': [1] },
            //     None,
            //     { 'x': [1, 2] },
            // ]
            //
            // Arrow:
            // struct = [ list[0] None list[2] ]
            // list = {
            //     values  = [ 1, 1, 2 ],
            //     offsets = [ 0, 1, 1, 3 ],
            // }
            //
            // Parquet:
            // [ 1, 1, 2 ] + definition + repetition levels
            //
            // As you can see, we need to insert an invalid item into the list even though it does
            // not have a validity mask.
            if let Some(validity) = self.validity.as_mut() {
                validity.extend_constant(self.num_valids, true);
                validity.extend_constant(self.num_invalids, false);
            }

            self.num_valids = 0;
            self.num_invalids = 0;
        }

        self.num_valids += usize::from(!is_primitive & is_valid);
        self.num_invalids += usize::from(!is_primitive & !is_valid);

        self.length += 1;
        if let NestedContent::List { offsets } = &mut self.content {
            offsets.push(value);
        }
    }

    fn push_default(&mut self, length: i64) {
        let is_primitive = matches!(self.content, NestedContent::Primitive);
        self.num_invalids += usize::from(!is_primitive);

        self.length += 1;
        if let NestedContent::List { offsets } = &mut self.content {
            offsets.push(length);
        }
    }
}

/// Utility structure to create a `Filter` and `Validity` mask for the leaf values.
///
/// This batches the extending.
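///
/// As an illustration of the two masks (derived from the methods below): a value skipped via
/// `skip_in_place` is appended as `filter = false` with a placeholder `validity = true` bit,
/// while a collected value is appended as `filter = true` together with its real validity.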
pub struct BatchedNestedDecoder<'a> {
    pub(crate) num_waiting_valids: usize,
    pub(crate) num_waiting_invalids: usize,

    filter: &'a mut MutableBitmap,
    validity: &'a mut MutableBitmap,
}

impl BatchedNestedDecoder<'_> {
    fn push_valid(&mut self) -> ParquetResult<()> {
        self.push_n_valids(1)
    }

    fn push_invalid(&mut self) -> ParquetResult<()> {
        self.push_n_invalids(1)
    }

    fn push_n_valids(&mut self, n: usize) -> ParquetResult<()> {
        if self.num_waiting_invalids == 0 {
            self.num_waiting_valids += n;
            return Ok(());
        }

        self.filter.extend_constant(self.num_waiting_valids, true);
        self.validity.extend_constant(self.num_waiting_valids, true);

        self.filter.extend_constant(self.num_waiting_invalids, true);
        self.validity
            .extend_constant(self.num_waiting_invalids, false);

        self.num_waiting_valids = n;
        self.num_waiting_invalids = 0;

        Ok(())
    }

    fn push_n_invalids(&mut self, n: usize) -> ParquetResult<()> {
        self.num_waiting_invalids += n;
        Ok(())
    }

    fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
        if self.num_waiting_valids > 0 {
            self.filter.extend_constant(self.num_waiting_valids, true);
            self.validity.extend_constant(self.num_waiting_valids, true);
            self.num_waiting_valids = 0;
        }
        if self.num_waiting_invalids > 0 {
            self.filter.extend_constant(self.num_waiting_invalids, true);
            self.validity
                .extend_constant(self.num_waiting_invalids, false);
            self.num_waiting_invalids = 0;
        }

        self.filter.extend_constant(n, false);
        self.validity.extend_constant(n, true);

        Ok(())
    }

    fn finalize(self) -> ParquetResult<()> {
        self.filter.extend_constant(self.num_waiting_valids, true);
        self.validity.extend_constant(self.num_waiting_valids, true);

        self.filter.extend_constant(self.num_waiting_invalids, true);
        self.validity
            .extend_constant(self.num_waiting_invalids, false);

        Ok(())
    }
}

/// The initial info of nested data types.
/// The `bool` indicates if the type is nullable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitNested {
    /// Primitive data types
    Primitive(bool),
    /// List data types
    List(bool),
    /// Fixed-Size List data types
    FixedSizeList(bool, usize),
    /// Struct data types
    Struct(bool),
}

/// Initialize [`NestedState`] from `&[InitNested]`.
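///
/// For example, a nullable list of non-nullable structs with a nullable primitive leaf is
/// described by `&[InitNested::List(true), InitNested::Struct(false), InitNested::Primitive(true)]`.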
pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState {
    use {InitNested as IN, Nested as N};

    let container = init
        .iter()
        .map(|init| match init {
            IN::Primitive(is_nullable) => N::primitive(*is_nullable),
            IN::List(is_nullable) => N::list_with_capacity(*is_nullable, capacity),
            IN::FixedSizeList(is_nullable, width) => {
                N::fixedlist_with_capacity(*is_nullable, *width, capacity)
            },
            IN::Struct(is_nullable) => N::struct_with_capacity(*is_nullable, capacity),
        })
        .collect();

    NestedState::new(container)
}

/// The state of nested data types.
#[derive(Default)]
pub struct NestedState {
    /// The `Nested` levels composing this `NestedState`.
    nested: Vec<Nested>,
}

impl NestedState {
    /// Creates a new [`NestedState`].
    fn new(nested: Vec<Nested>) -> Self {
        Self { nested }
    }

    pub fn pop(&mut self) -> Option<(usize, Vec<i64>, Option<BitmapBuilder>)> {
        Some(self.nested.pop()?.take())
    }

    pub fn last(&self) -> Option<&NestedContent> {
        self.nested.last().map(|v| &v.content)
    }

    /// The number of rows in this state
    pub fn len(&self) -> usize {
        // outermost is the number of rows
        self.nested[0].len()
    }

    /// Returns the definition and repetition levels for each nesting level
    fn levels(&self) -> (Vec<u16>, Vec<u16>) {
        let depth = self.nested.len();

        let mut def_levels = Vec::with_capacity(depth + 1);
        let mut rep_levels = Vec::with_capacity(depth + 1);

        def_levels.push(0);
        rep_levels.push(0);

        for i in 0..depth {
            let nest = &self.nested[i];

            let def_delta = nest.is_nullable() as u16 + nest.is_repeated() as u16;
            let rep_delta = nest.is_repeated() as u16;

            def_levels.push(def_levels[i] + def_delta);
            rep_levels.push(rep_levels[i] + rep_delta);
        }

        (def_levels, rep_levels)
    }
}

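/// Collect the raw (definition or repetition) level values of a page into `target`.
///
/// For example, an RLE chunk `(value = 1, size = 4)` appends `[1, 1, 1, 1]`, while a bit-packed
/// chunk is lowered to `u16` and collected element by element.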
fn collect_level_values(
    target: &mut Vec<u16>,
    hybrid_rle: HybridRleDecoder<'_>,
) -> ParquetResult<()> {
    target.reserve(hybrid_rle.len());

    for chunk in hybrid_rle.into_chunk_iter() {
        let chunk = chunk?;

        match chunk {
            HybridRleChunk::Rle(value, size) => {
                target.resize(target.len() + size, value as u16);
            },
            HybridRleChunk::Bitpacked(decoder) => {
                decoder.lower_element::<u16>()?.collect_into(target);
            },
        }
    }

    Ok(())
}

/// State to keep track of how many top-level values (i.e. rows) still need to be skipped and
/// collected.
///
/// This state should be kept between pages because a top-level value / row value may span several
/// pages.
///
/// - `num_skips = Some(n)` means that it will skip until the `n + 1`-th occurrence of the
///   repetition level of `0` (i.e. the start of a top-level value / row value).
/// - `num_collects = Some(n)` means that it will collect values until the `n + 1`-th occurrence
///   of the repetition level of `0` (i.e. the start of a top-level value / row value).
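///
/// For example, `Filter::Range(2..5)` starts out with `num_skips = Some(2)` and
/// `num_collects = Some(3)`: skip two rows, then collect three.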
struct DecodingState {
    num_skips: Option<usize>,
    num_collects: Option<usize>,
}

#[allow(clippy::too_many_arguments)]
fn decode_nested(
    mut current_def_levels: &[u16],
    mut current_rep_levels: &[u16],

    batched_collector: &mut BatchedNestedDecoder<'_>,
    nested: &mut [Nested],

    state: &mut DecodingState,
    top_level_filter: &mut BitmapIter<'_>,

    // Amortized allocations
    def_levels: &[u16],
    rep_levels: &[u16],
) -> ParquetResult<()> {
    let max_depth = nested.len();
    let leaf_def_level = *def_levels.last().unwrap();

    while !current_def_levels.is_empty() {
        debug_assert_eq!(current_def_levels.len(), current_rep_levels.len());

        // Handle skips
        if let Some(ref mut num_skips) = state.num_skips {
            let mut i = 0;
            let mut num_skipped_values = 0;
            while i < current_def_levels.len() && (*num_skips > 0 || current_rep_levels[i] != 0) {
                let def = current_def_levels[i];
                let rep = current_rep_levels[i];

                *num_skips -= usize::from(rep == 0);
                i += 1;

                // @NOTE:
                // We don't need to account for higher def-levels that imply extra values, since we
                // don't have those higher levels either.
                num_skipped_values += usize::from(def == leaf_def_level);
            }
            batched_collector.skip_in_place(num_skipped_values)?;

            current_def_levels = &current_def_levels[i..];
            current_rep_levels = &current_rep_levels[i..];

            if current_def_levels.is_empty() {
                break;
            } else {
                state.num_skips = None;
            }
        }

        // Handle collects
        if let Some(ref mut num_collects) = state.num_collects {
            let mut i = 0;
            while i < current_def_levels.len() && (*num_collects > 0 || current_rep_levels[i] != 0)
            {
                let def = current_def_levels[i];
                let rep = current_rep_levels[i];

                *num_collects -= usize::from(rep == 0);
                i += 1;

                let mut is_required = false;

                for depth in 0..max_depth {
                    // Defines whether this element is defined at `depth`
                    //
                    // e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and
                    // [ [ [ 1 ] ] ].
                    let is_defined_at_this_depth =
                        rep <= rep_levels[depth] && def >= def_levels[depth];

                    let length = nested
                        .get(depth + 1)
                        .map(|x| x.len() as i64)
                        // the last depth is the leaf, which is always increased by 1
                        .unwrap_or(1);

                    let nest = &mut nested[depth];

                    let is_valid = !nest.is_nullable() || def > def_levels[depth];

                    if is_defined_at_this_depth && !is_valid {
                        let mut num_elements = 1;

                        nest.push(length, is_valid);

                        for embed_depth in depth..max_depth {
                            let embed_length = nested
                                .get(embed_depth + 1)
                                .map(|x| x.len() as i64)
                                // the last depth is the leaf, which is always increased by 1
                                .unwrap_or(1);

                            let embed_nest = &mut nested[embed_depth];

                            if embed_depth > depth {
                                for _ in 0..num_elements {
                                    embed_nest.push_default(embed_length);
                                }
                            }

                            let embed_num_values = embed_nest.invalid_num_values();
                            num_elements *= embed_num_values;

                            if embed_num_values == 0 {
                                break;
                            }
                        }

                        batched_collector.push_n_invalids(num_elements)?;

                        break;
                    }

                    if is_required || is_defined_at_this_depth {
                        nest.push(length, is_valid);

                        if depth == max_depth - 1 {
                            // the leaf / primitive
                            let is_valid = (def != def_levels[depth]) || !nest.is_nullable();

                            if is_valid {
                                batched_collector.push_valid()?;
                            } else {
                                batched_collector.push_invalid()?;
                            }
                        }
                    }

                    is_required = (is_required || is_defined_at_this_depth)
                        && nest.is_required()
                        && !is_valid;
                }
            }

            current_def_levels = &current_def_levels[i..];
            current_rep_levels = &current_rep_levels[i..];

            if current_def_levels.is_empty() {
                break;
            } else {
                state.num_collects = None;
            }
        }

        if top_level_filter.num_remaining() == 0 {
            break;
        }

        state.num_skips = Some(top_level_filter.take_leading_zeros()).filter(|v| *v != 0);
        state.num_collects = Some(top_level_filter.take_leading_ones()).filter(|v| *v != 0);
    }

    Ok(())
}

/// Return the definition and repetition level iterators for this page.
fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder<'_>, HybridRleDecoder<'_>)> {
    let split = split_buffer(page)?;
    let def = split.def;
    let rep = split.rep;

    let max_def_level = page.descriptor.max_def_level;
    let max_rep_level = page.descriptor.max_rep_level;

    let def_iter = HybridRleDecoder::new(def, get_bit_width(max_def_level), page.num_values());
    let rep_iter = HybridRleDecoder::new(rep, get_bit_width(max_rep_level), page.num_values());

    Ok((def_iter, rep_iter))
}

impl<D: utils::Decoder> PageDecoder<D> {
    pub fn collect_nested(
        mut self,
        filter: Option<Filter>,
    ) -> ParquetResult<(NestedState, Vec<D::Output>, Bitmap)> {
        let init = self.init_nested.as_mut().unwrap();

        // @TODO: We should probably count the filter so that we don't overallocate
        let mut target = self.decoder.with_capacity(self.iter.total_num_values());
        // @TODO: Self capacity
        let mut nested_state = init_nested(init, 0);

        if let Some(dict) = self.dict.as_ref() {
            self.decoder.apply_dictionary(&mut target, dict)?;
        }

        // Amortize the allocations.
        let (def_levels, rep_levels) = nested_state.levels();

        let mut current_def_levels = Vec::<u16>::new();
        let mut current_rep_levels = Vec::<u16>::new();

        let (mut decode_state, top_level_filter) = match filter {
            None => (
                DecodingState {
                    num_skips: None,
                    num_collects: Some(usize::MAX),
                },
                Bitmap::new(),
            ),
            Some(Filter::Range(range)) => (
                DecodingState {
                    num_skips: Some(range.start),
                    num_collects: Some(range.len()),
                },
                Bitmap::new(),
            ),
            Some(Filter::Mask(mask)) => (
                DecodingState {
                    num_skips: None,
                    num_collects: None,
                },
                mask,
            ),
            Some(Filter::Predicate(_)) => todo!(),
        };

        let mut top_level_filter = top_level_filter.iter();

        let mut chunks = Vec::new();
        while let Some(page) = self.iter.next() {
            let page = page?;
            let page = page.decompress(&mut self.iter)?;

            let (mut def_iter, mut rep_iter) = level_iters(&page)?;

            let num_levels = def_iter.len().min(rep_iter.len());
            def_iter.limit_to(num_levels);
            rep_iter.limit_to(num_levels);

            current_def_levels.clear();
            current_rep_levels.clear();

            collect_level_values(&mut current_def_levels, def_iter)?;
            collect_level_values(&mut current_rep_levels, rep_iter)?;

            let mut leaf_filter = MutableBitmap::new();
            let mut leaf_validity = MutableBitmap::new();

            // @TODO: move this to outside the loop.
            let mut batched_collector = BatchedNestedDecoder {
                num_waiting_valids: 0,
                num_waiting_invalids: 0,

                filter: &mut leaf_filter,
                validity: &mut leaf_validity,
            };

            decode_nested(
                &current_def_levels,
                &current_rep_levels,
                &mut batched_collector,
                &mut nested_state.nested,
                &mut decode_state,
                &mut top_level_filter,
                &def_levels,
                &rep_levels,
            )?;

            batched_collector.finalize()?;

            let leaf_validity = leaf_validity.freeze();
            let leaf_filter = leaf_filter.freeze();

            let state = utils::State::new_nested(
                &self.decoder,
                &page,
                self.dict.as_ref(),
                Some(leaf_validity),
            )?;
            state.decode(
                &mut self.decoder,
                &mut target,
                Some(Filter::Mask(leaf_filter)),
                &mut chunks,
            )?;

            self.iter.reuse_page_buffer(page);
        }

        // we pop the primitive off here.
        debug_assert!(matches!(
            nested_state.nested.last().unwrap().content,
            NestedContent::Primitive
        ));
        _ = nested_state.pop().unwrap();

        if target.len() > 0 || chunks.is_empty() {
            chunks.push(self.decoder.finalize(self.dtype, self.dict, target)?);
        }

        Ok((nested_state, chunks, Bitmap::new()))
    }
}
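
#[cfg(test)]
mod tests {
    use super::*;

    // Illustrative sketches of the helpers above, not part of the original file. They assume a
    // test build of this crate; `Bitmap::set_bits` is assumed to count set bits as elsewhere in
    // polars-arrow.

    #[test]
    fn levels_accumulate_per_nesting_level() {
        // A nullable list of non-nullable structs with a nullable primitive leaf.
        let state = init_nested(
            &[
                InitNested::List(true),
                InitNested::Struct(false),
                InitNested::Primitive(true),
            ],
            0,
        );

        let (def_levels, rep_levels) = state.levels();

        // `List(true)` adds 2 to the def level (nullable + repeated) and 1 to the rep level;
        // `Struct(false)` adds nothing; `Primitive(true)` adds one more def level.
        assert_eq!(def_levels, vec![0, 2, 2, 3]);
        assert_eq!(rep_levels, vec![0, 1, 1, 1]);
    }

    #[test]
    fn batched_decoder_flushes_runs_in_order() {
        let mut filter = MutableBitmap::new();
        let mut validity = MutableBitmap::new();

        let mut collector = BatchedNestedDecoder {
            num_waiting_valids: 0,
            num_waiting_invalids: 0,
            filter: &mut filter,
            validity: &mut validity,
        };

        collector.push_n_valids(3).unwrap();
        collector.push_n_invalids(2).unwrap();
        collector.skip_in_place(4).unwrap();
        collector.finalize().unwrap();

        let filter = filter.freeze();
        let validity = validity.freeze();

        // The 3 valids and 2 invalids are kept; the 4 skipped values are filtered out.
        assert_eq!(filter.len(), 9);
        assert_eq!(filter.set_bits(), 5);
        // Only the invalid run clears validity bits; skipped values get placeholder `true` bits.
        assert_eq!(validity.set_bits(), 7);
    }
}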