Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
8475 views
1
use arrow::array::{BooleanArray, Splitable};
2
use arrow::bitmap::bitmask::BitMask;
3
use arrow::bitmap::utils::BitmapIter;
4
use arrow::bitmap::{Bitmap, BitmapBuilder};
5
use arrow::datatypes::ArrowDataType;
6
use polars_compute::filter::filter_boolean_kernel;
7
8
use super::Filter;
9
use super::dictionary_encoded::{append_validity, constrain_page_validity};
10
use super::utils::{
11
self, Decoded, Decoder, decode_hybrid_rle_into_bitmap, filter_from_range, freeze_validity,
12
};
13
use crate::parquet::encoding::Encoding;
14
use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};
15
use crate::parquet::error::ParquetResult;
16
use crate::parquet::page::{DataPage, DictPage, split_buffer};
17
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
18
19
#[allow(clippy::large_enum_variant)]
20
#[derive(Debug)]
21
pub(crate) enum StateTranslation<'a> {
22
Plain(BitMask<'a>),
23
Rle(HybridRleDecoder<'a>),
24
}
25
26
impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {
27
type PlainDecoder = BitmapIter<'a>;
28
29
fn new(
30
_decoder: &BooleanDecoder,
31
page: &'a DataPage,
32
_dict: Option<&'a <BooleanDecoder as Decoder>::Dict>,
33
page_validity: Option<&Bitmap>,
34
) -> ParquetResult<Self> {
35
let values = split_buffer(page)?.values;
36
37
match page.encoding() {
38
Encoding::Plain => {
39
let max_num_values = values.len() * u8::BITS as usize;
40
let num_values = if page_validity.is_some() {
41
// @NOTE: We overestimate the amount of values here, but in the V1
42
// specification we don't really have a way to know the number of valid items.
43
// Without traversing the list.
44
max_num_values
45
} else {
46
// @NOTE: We cannot really trust the value from this as it might relate to the
47
// number of top-level nested values. Therefore, we do a `min` with the maximum
48
// number of possible values.
49
usize::min(page.num_values(), max_num_values)
50
};
51
52
Ok(Self::Plain(BitMask::new(values, 0, num_values)))
53
},
54
Encoding::Rle => {
55
// @NOTE: For a nullable list, we might very well overestimate the amount of
56
// values, but we never collect those items. We don't really have a way to know the
57
// number of valid items in the V1 specification.
58
59
// For RLE boolean values the length in bytes is pre-pended.
60
// https://github.com/apache/parquet-format/blob/e517ac4dbe08d518eb5c2e58576d4c711973db94/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
61
let (_len_in_bytes, values) = values.split_at(4);
62
Ok(Self::Rle(HybridRleDecoder::new(
63
values,
64
1,
65
page.num_values(),
66
)))
67
},
68
_ => Err(utils::not_implemented(page)),
69
}
70
}
71
fn num_rows(&self) -> usize {
72
match self {
73
Self::Plain(m) => m.len(),
74
Self::Rle(m) => m.len(),
75
}
76
}
77
}
78
79
fn decode_required_rle(
80
values: HybridRleDecoder<'_>,
81
limit: Option<usize>,
82
target: &mut BitmapBuilder,
83
) -> ParquetResult<()> {
84
decode_hybrid_rle_into_bitmap(values, limit, target)?;
85
Ok(())
86
}
87
88
fn decode_optional_rle(
89
values: HybridRleDecoder<'_>,
90
target: &mut BitmapBuilder,
91
page_validity: &Bitmap,
92
) -> ParquetResult<()> {
93
debug_assert!(page_validity.set_bits() <= values.len());
94
95
if page_validity.unset_bits() == 0 {
96
return decode_required_rle(values, Some(page_validity.len()), target);
97
}
98
99
target.reserve(page_validity.len());
100
101
let mut validity_mask = BitMask::from_bitmap(page_validity);
102
103
for chunk in values.into_chunk_iter() {
104
let chunk = chunk?;
105
106
match chunk {
107
HybridRleChunk::Rle(value, size) => {
108
let offset = validity_mask
109
.nth_set_bit_idx(size, 0)
110
.unwrap_or(validity_mask.len());
111
112
let t;
113
(t, validity_mask) = validity_mask.split_at(offset);
114
115
target.extend_constant(t.len(), value != 0);
116
},
117
HybridRleChunk::Bitpacked(decoder) => {
118
let decoder_slice = decoder.as_slice();
119
let offset = validity_mask
120
.nth_set_bit_idx(decoder.len(), 0)
121
.unwrap_or(validity_mask.len());
122
123
let decoder_validity;
124
(decoder_validity, validity_mask) = validity_mask.split_at(offset);
125
126
let mut offset = 0;
127
let mut validity_iter = decoder_validity.iter();
128
while validity_iter.num_remaining() > 0 {
129
let num_valid = validity_iter.take_leading_ones();
130
target.extend_from_slice(decoder_slice, offset, num_valid);
131
offset += num_valid;
132
133
let num_invalid = validity_iter.take_leading_zeros();
134
target.extend_constant(num_invalid, false);
135
}
136
},
137
}
138
}
139
140
if cfg!(debug_assertions) {
141
assert_eq!(validity_mask.set_bits(), 0);
142
}
143
target.extend_constant(validity_mask.len(), false);
144
145
Ok(())
146
}
147
148
fn decode_masked_required_rle(
149
values: HybridRleDecoder<'_>,
150
target: &mut BitmapBuilder,
151
mask: &Bitmap,
152
) -> ParquetResult<()> {
153
debug_assert!(mask.len() <= values.len());
154
155
if mask.unset_bits() == 0 {
156
return decode_required_rle(values, Some(mask.len()), target);
157
}
158
159
let mut im_target = BitmapBuilder::new();
160
decode_required_rle(values, Some(mask.len()), &mut im_target)?;
161
162
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));
163
164
Ok(())
165
}
166
167
fn decode_masked_optional_rle(
168
values: HybridRleDecoder<'_>,
169
target: &mut BitmapBuilder,
170
page_validity: &Bitmap,
171
mask: &Bitmap,
172
) -> ParquetResult<()> {
173
debug_assert_eq!(page_validity.len(), mask.len());
174
debug_assert!(mask.len() <= values.len());
175
176
if mask.unset_bits() == 0 {
177
return decode_optional_rle(values, target, page_validity);
178
}
179
180
if page_validity.unset_bits() == 0 {
181
return decode_masked_required_rle(values, target, mask);
182
}
183
184
let mut im_target = BitmapBuilder::new();
185
decode_optional_rle(values, &mut im_target, page_validity)?;
186
187
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));
188
189
Ok(())
190
}
191
192
/// Appends every bit of the plain-encoded `values` mask to `target`.
///
/// This never fails; the `ParquetResult` return type matches the other decode helpers.
fn decode_required_plain(values: BitMask<'_>, target: &mut BitmapBuilder) -> ParquetResult<()> {
    target.extend_from_bitmask(values);

    Ok(())
}
196
197
fn decode_optional_plain(
198
mut values: BitMask<'_>,
199
target: &mut BitmapBuilder,
200
mut page_validity: Bitmap,
201
) -> ParquetResult<()> {
202
debug_assert!(page_validity.set_bits() <= values.len());
203
204
if page_validity.unset_bits() == 0 {
205
return decode_required_plain(values.sliced(0, page_validity.len()), target);
206
}
207
208
target.reserve(page_validity.len());
209
210
while !page_validity.is_empty() {
211
let num_valid = page_validity.take_leading_ones();
212
let iv;
213
(iv, values) = values.split_at(num_valid);
214
target.extend_from_bitmask(iv);
215
216
let num_invalid = page_validity.take_leading_zeros();
217
target.extend_constant(num_invalid, false);
218
}
219
220
Ok(())
221
}
222
223
fn decode_masked_required_plain(
224
mut values: BitMask,
225
target: &mut BitmapBuilder,
226
mut mask: Bitmap,
227
) -> ParquetResult<()> {
228
debug_assert!(mask.len() <= values.len());
229
230
let leading_zeros = mask.take_leading_zeros();
231
mask.take_trailing_zeros();
232
233
values = values.sliced(leading_zeros, mask.len());
234
235
if mask.unset_bits() == 0 {
236
return decode_required_plain(values, target);
237
}
238
239
let mut im_target = BitmapBuilder::new();
240
decode_required_plain(values, &mut im_target)?;
241
242
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));
243
244
Ok(())
245
}
246
247
fn decode_masked_optional_plain(
248
mut values: BitMask<'_>,
249
target: &mut BitmapBuilder,
250
mut page_validity: Bitmap,
251
mut mask: Bitmap,
252
) -> ParquetResult<()> {
253
debug_assert_eq!(page_validity.len(), mask.len());
254
debug_assert!(page_validity.set_bits() <= values.len());
255
256
let leading_zeros = mask.take_leading_zeros();
257
mask.take_trailing_zeros();
258
259
let (skipped, truncated);
260
(skipped, page_validity) = page_validity.split_at(leading_zeros);
261
(page_validity, truncated) = page_validity.split_at(mask.len());
262
263
let skipped_values = skipped.set_bits();
264
let truncated_values = truncated.set_bits();
265
values = values.sliced(
266
skipped_values,
267
values.len() - skipped_values - truncated_values,
268
);
269
270
if mask.unset_bits() == 0 {
271
return decode_optional_plain(values, target, page_validity);
272
}
273
274
if page_validity.unset_bits() == 0 {
275
return decode_masked_required_plain(values, target, mask);
276
}
277
278
let mut im_target = BitmapBuilder::new();
279
decode_optional_plain(values, &mut im_target, page_validity)?;
280
281
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));
282
283
Ok(())
284
}
285
286
/// Decoded state for booleans: `.0` holds the values, `.1` holds the validity.
impl Decoded for (BitmapBuilder, BitmapBuilder) {
    /// Length of the values builder.
    fn len(&self) -> usize {
        let (values, _validity) = self;
        values.len()
    }

    /// Appends `n` null slots: `false` in both the values and the validity builder.
    fn extend_nulls(&mut self, n: usize) {
        let (values, validity) = self;
        values.extend_constant(n, false);
        validity.extend_constant(n, false);
    }

    /// Smaller of the two builders' `capacity()` values.
    ///
    /// NOTE(review): this forwards `capacity()` as-is and does not subtract the current
    /// length; confirm that is what callers of `remaining_capacity` expect.
    fn remaining_capacity(&self) -> usize {
        let (values, validity) = self;
        usize::min(values.capacity(), validity.capacity())
    }
}
300
301
/// Stateless decoder for boolean columns; all behavior lives in its `Decoder` implementation.
pub(crate) struct BooleanDecoder;
302
303
impl Decoder for BooleanDecoder {
304
type Translation<'a> = StateTranslation<'a>;
305
type Dict = BooleanArray;
306
type DecodedState = (BitmapBuilder, BitmapBuilder);
307
type Output = BooleanArray;
308
309
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
310
(
311
BitmapBuilder::with_capacity(capacity),
312
BitmapBuilder::with_capacity(capacity),
313
)
314
}
315
316
fn deserialize_dict(&mut self, _: DictPage) -> ParquetResult<Self::Dict> {
317
Ok(BooleanArray::new_empty(ArrowDataType::Boolean))
318
}
319
320
fn finalize(
321
&self,
322
dtype: ArrowDataType,
323
_dict: Option<Self::Dict>,
324
(values, validity): Self::DecodedState,
325
) -> ParquetResult<Self::Output> {
326
let validity = freeze_validity(validity);
327
Ok(BooleanArray::new(dtype, values.freeze(), validity))
328
}
329
330
fn evaluate_predicate(
331
&mut self,
332
_state: &utils::State<'_, Self>,
333
_predicate: Option<&SpecializedParquetColumnExpr>,
334
_pred_true_mask: &mut BitmapBuilder,
335
_dict_mask: Option<&Bitmap>,
336
) -> ParquetResult<bool> {
337
// @TODO: This can be enabled for the fast paths
338
Ok(false)
339
}
340
341
fn extend_decoded(
342
&self,
343
decoded: &mut Self::DecodedState,
344
additional: &dyn arrow::array::Array,
345
is_optional: bool,
346
) -> ParquetResult<()> {
347
let additional = additional.as_any().downcast_ref::<BooleanArray>().unwrap();
348
decoded.0.extend_from_bitmap(additional.values());
349
match additional.validity() {
350
Some(v) => decoded.1.extend_from_bitmap(v),
351
None if is_optional => decoded.1.extend_constant(additional.len(), true),
352
None => {},
353
}
354
355
Ok(())
356
}
357
358
fn extend_filtered_with_state(
359
&mut self,
360
state: utils::State<'_, Self>,
361
(target, validity): &mut Self::DecodedState,
362
filter: Option<super::Filter>,
363
_chunks: &mut Vec<Self::Output>,
364
) -> ParquetResult<()> {
365
match state.translation {
366
StateTranslation::Plain(values) => {
367
if state.is_optional {
368
append_validity(
369
state.page_validity.as_ref(),
370
filter.as_ref(),
371
validity,
372
values.len(),
373
);
374
}
375
376
let page_validity = constrain_page_validity(
377
values.len(),
378
state.page_validity.as_ref(),
379
filter.as_ref(),
380
);
381
382
match (filter, page_validity) {
383
(None, None) => decode_required_plain(values, target),
384
(Some(Filter::Range(rng)), None) => {
385
decode_required_plain(values.sliced(rng.start, rng.len()), target)
386
},
387
(None, Some(page_validity)) => {
388
decode_optional_plain(values, target, page_validity)
389
},
390
(Some(Filter::Range(rng)), Some(mut page_validity)) => {
391
let (skipped, truncated);
392
(skipped, page_validity) = page_validity.split_at(rng.start);
393
(page_validity, truncated) = page_validity.split_at(rng.len());
394
395
let skipped_values = skipped.set_bits();
396
let truncated_values = truncated.set_bits();
397
let values = values.sliced(
398
skipped_values,
399
values.len() - skipped_values - truncated_values,
400
);
401
402
decode_optional_plain(values, target, page_validity)
403
},
404
(Some(Filter::Mask(mask)), None) => {
405
decode_masked_required_plain(values, target, mask)
406
},
407
(Some(Filter::Mask(mask)), Some(page_validity)) => {
408
decode_masked_optional_plain(values, target, page_validity, mask)
409
},
410
(Some(Filter::Predicate(_)), _) => todo!(),
411
}?;
412
413
Ok(())
414
},
415
StateTranslation::Rle(values) => {
416
if state.is_optional {
417
append_validity(
418
state.page_validity.as_ref(),
419
filter.as_ref(),
420
validity,
421
values.len(),
422
);
423
}
424
425
let page_validity = constrain_page_validity(
426
values.len(),
427
state.page_validity.as_ref(),
428
filter.as_ref(),
429
);
430
431
match (filter, page_validity) {
432
(None, None) => decode_required_rle(values, None, target),
433
(Some(Filter::Range(rng)), None) if rng.start == 0 => {
434
decode_required_rle(values, Some(rng.end), target)
435
},
436
(None, Some(page_validity)) => {
437
decode_optional_rle(values, target, &page_validity)
438
},
439
(Some(Filter::Range(rng)), Some(page_validity)) if rng.start == 0 => {
440
decode_optional_rle(values, target, &page_validity)
441
},
442
(Some(Filter::Mask(filter)), None) => {
443
decode_masked_required_rle(values, target, &filter)
444
},
445
(Some(Filter::Mask(filter)), Some(page_validity)) => {
446
decode_masked_optional_rle(values, target, &page_validity, &filter)
447
},
448
(Some(Filter::Range(rng)), None) => {
449
decode_masked_required_rle(values, target, &filter_from_range(rng))
450
},
451
(Some(Filter::Range(rng)), Some(page_validity)) => decode_masked_optional_rle(
452
values,
453
target,
454
&page_validity,
455
&filter_from_range(rng),
456
),
457
(Some(Filter::Predicate(_)), _) => todo!(),
458
}?;
459
460
Ok(())
461
},
462
}
463
}
464
465
fn extend_constant(
466
&mut self,
467
decoded: &mut Self::DecodedState,
468
length: usize,
469
value: &ParquetScalar,
470
) -> ParquetResult<()> {
471
if value.is_null() {
472
decoded.extend_nulls(length);
473
return Ok(());
474
}
475
476
let value = value.as_bool().unwrap();
477
478
decoded.0.extend_constant(length, value);
479
decoded.1.extend_constant(length, true);
480
481
Ok(())
482
}
483
}
484
485