Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
6940 views
1
use arrow::array::{BooleanArray, Splitable};
2
use arrow::bitmap::bitmask::BitMask;
3
use arrow::bitmap::utils::BitmapIter;
4
use arrow::bitmap::{Bitmap, BitmapBuilder};
5
use arrow::datatypes::ArrowDataType;
6
use polars_compute::filter::filter_boolean_kernel;
7
8
use super::dictionary_encoded::{append_validity, constrain_page_validity};
9
use super::utils::{
10
self, Decoded, Decoder, decode_hybrid_rle_into_bitmap, filter_from_range, freeze_validity,
11
};
12
use super::{Filter, PredicateFilter};
13
use crate::parquet::encoding::Encoding;
14
use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};
15
use crate::parquet::error::ParquetResult;
16
use crate::parquet::page::{DataPage, DictPage, split_buffer};
17
18
#[allow(clippy::large_enum_variant)]
19
#[derive(Debug)]
20
pub(crate) enum StateTranslation<'a> {
21
Plain(BitMask<'a>),
22
Rle(HybridRleDecoder<'a>),
23
}
24
25
impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {
26
type PlainDecoder = BitmapIter<'a>;
27
28
fn new(
29
_decoder: &BooleanDecoder,
30
page: &'a DataPage,
31
_dict: Option<&'a <BooleanDecoder as Decoder>::Dict>,
32
page_validity: Option<&Bitmap>,
33
) -> ParquetResult<Self> {
34
let values = split_buffer(page)?.values;
35
36
match page.encoding() {
37
Encoding::Plain => {
38
let max_num_values = values.len() * u8::BITS as usize;
39
let num_values = if page_validity.is_some() {
40
// @NOTE: We overestimate the amount of values here, but in the V1
41
// specification we don't really have a way to know the number of valid items.
42
// Without traversing the list.
43
max_num_values
44
} else {
45
// @NOTE: We cannot really trust the value from this as it might relate to the
46
// number of top-level nested values. Therefore, we do a `min` with the maximum
47
// number of possible values.
48
usize::min(page.num_values(), max_num_values)
49
};
50
51
Ok(Self::Plain(BitMask::new(values, 0, num_values)))
52
},
53
Encoding::Rle => {
54
// @NOTE: For a nullable list, we might very well overestimate the amount of
55
// values, but we never collect those items. We don't really have a way to know the
56
// number of valid items in the V1 specification.
57
58
// For RLE boolean values the length in bytes is pre-pended.
59
// https://github.com/apache/parquet-format/blob/e517ac4dbe08d518eb5c2e58576d4c711973db94/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3
60
let (_len_in_bytes, values) = values.split_at(4);
61
Ok(Self::Rle(HybridRleDecoder::new(
62
values,
63
1,
64
page.num_values(),
65
)))
66
},
67
_ => Err(utils::not_implemented(page)),
68
}
69
}
70
fn num_rows(&self) -> usize {
71
match self {
72
Self::Plain(m) => m.len(),
73
Self::Rle(m) => m.len(),
74
}
75
}
76
}
77
78
fn decode_required_rle(
79
values: HybridRleDecoder<'_>,
80
limit: Option<usize>,
81
target: &mut BitmapBuilder,
82
) -> ParquetResult<()> {
83
decode_hybrid_rle_into_bitmap(values, limit, target)?;
84
Ok(())
85
}
86
87
fn decode_optional_rle(
88
values: HybridRleDecoder<'_>,
89
target: &mut BitmapBuilder,
90
page_validity: &Bitmap,
91
) -> ParquetResult<()> {
92
debug_assert!(page_validity.set_bits() <= values.len());
93
94
if page_validity.unset_bits() == 0 {
95
return decode_required_rle(values, Some(page_validity.len()), target);
96
}
97
98
target.reserve(page_validity.len());
99
100
let mut validity_mask = BitMask::from_bitmap(page_validity);
101
102
for chunk in values.into_chunk_iter() {
103
let chunk = chunk?;
104
105
match chunk {
106
HybridRleChunk::Rle(value, size) => {
107
let offset = validity_mask
108
.nth_set_bit_idx(size, 0)
109
.unwrap_or(validity_mask.len());
110
111
let t;
112
(t, validity_mask) = validity_mask.split_at(offset);
113
114
target.extend_constant(t.len(), value != 0);
115
},
116
HybridRleChunk::Bitpacked(decoder) => {
117
let decoder_slice = decoder.as_slice();
118
let offset = validity_mask
119
.nth_set_bit_idx(decoder.len(), 0)
120
.unwrap_or(validity_mask.len());
121
122
let decoder_validity;
123
(decoder_validity, validity_mask) = validity_mask.split_at(offset);
124
125
let mut offset = 0;
126
let mut validity_iter = decoder_validity.iter();
127
while validity_iter.num_remaining() > 0 {
128
let num_valid = validity_iter.take_leading_ones();
129
target.extend_from_slice(decoder_slice, offset, num_valid);
130
offset += num_valid;
131
132
let num_invalid = validity_iter.take_leading_zeros();
133
target.extend_constant(num_invalid, false);
134
}
135
},
136
}
137
}
138
139
if cfg!(debug_assertions) {
140
assert_eq!(validity_mask.set_bits(), 0);
141
}
142
target.extend_constant(validity_mask.len(), false);
143
144
Ok(())
145
}
146
147
fn decode_masked_required_rle(
148
values: HybridRleDecoder<'_>,
149
target: &mut BitmapBuilder,
150
mask: &Bitmap,
151
) -> ParquetResult<()> {
152
debug_assert!(mask.len() <= values.len());
153
154
if mask.unset_bits() == 0 {
155
return decode_required_rle(values, Some(mask.len()), target);
156
}
157
158
let mut im_target = BitmapBuilder::new();
159
decode_required_rle(values, Some(mask.len()), &mut im_target)?;
160
161
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));
162
163
Ok(())
164
}
165
166
fn decode_masked_optional_rle(
167
values: HybridRleDecoder<'_>,
168
target: &mut BitmapBuilder,
169
page_validity: &Bitmap,
170
mask: &Bitmap,
171
) -> ParquetResult<()> {
172
debug_assert_eq!(page_validity.len(), mask.len());
173
debug_assert!(mask.len() <= values.len());
174
175
if mask.unset_bits() == 0 {
176
return decode_optional_rle(values, target, page_validity);
177
}
178
179
if page_validity.unset_bits() == 0 {
180
return decode_masked_required_rle(values, target, mask);
181
}
182
183
let mut im_target = BitmapBuilder::new();
184
decode_optional_rle(values, &mut im_target, page_validity)?;
185
186
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), mask));
187
188
Ok(())
189
}
190
191
/// Appends all bits of a plain-encoded page without nulls to `target`.
///
/// Always returns `Ok(())`.
fn decode_required_plain(values: BitMask<'_>, target: &mut BitmapBuilder) -> ParquetResult<()> {
    target.extend_from_bitmask(values);

    Ok(())
}
195
196
fn decode_optional_plain(
197
mut values: BitMask<'_>,
198
target: &mut BitmapBuilder,
199
mut page_validity: Bitmap,
200
) -> ParquetResult<()> {
201
debug_assert!(page_validity.set_bits() <= values.len());
202
203
if page_validity.unset_bits() == 0 {
204
return decode_required_plain(values.sliced(0, page_validity.len()), target);
205
}
206
207
target.reserve(page_validity.len());
208
209
while !page_validity.is_empty() {
210
let num_valid = page_validity.take_leading_ones();
211
let iv;
212
(iv, values) = values.split_at(num_valid);
213
target.extend_from_bitmask(iv);
214
215
let num_invalid = page_validity.take_leading_zeros();
216
target.extend_constant(num_invalid, false);
217
}
218
219
Ok(())
220
}
221
222
fn decode_masked_required_plain(
223
mut values: BitMask,
224
target: &mut BitmapBuilder,
225
mut mask: Bitmap,
226
) -> ParquetResult<()> {
227
debug_assert!(mask.len() <= values.len());
228
229
let leading_zeros = mask.take_leading_zeros();
230
mask.take_trailing_zeros();
231
232
values = values.sliced(leading_zeros, mask.len());
233
234
if mask.unset_bits() == 0 {
235
return decode_required_plain(values, target);
236
}
237
238
let mut im_target = BitmapBuilder::new();
239
decode_required_plain(values, &mut im_target)?;
240
241
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));
242
243
Ok(())
244
}
245
246
fn decode_masked_optional_plain(
247
mut values: BitMask<'_>,
248
target: &mut BitmapBuilder,
249
mut page_validity: Bitmap,
250
mut mask: Bitmap,
251
) -> ParquetResult<()> {
252
debug_assert_eq!(page_validity.len(), mask.len());
253
debug_assert!(page_validity.set_bits() <= values.len());
254
255
let leading_zeros = mask.take_leading_zeros();
256
mask.take_trailing_zeros();
257
258
let (skipped, truncated);
259
(skipped, page_validity) = page_validity.split_at(leading_zeros);
260
(page_validity, truncated) = page_validity.split_at(mask.len());
261
262
let skipped_values = skipped.set_bits();
263
let truncated_values = truncated.set_bits();
264
values = values.sliced(
265
skipped_values,
266
values.len() - skipped_values - truncated_values,
267
);
268
269
if mask.unset_bits() == 0 {
270
return decode_optional_plain(values, target, page_validity);
271
}
272
273
if page_validity.unset_bits() == 0 {
274
return decode_masked_required_plain(values, target, mask);
275
}
276
277
let mut im_target = BitmapBuilder::new();
278
decode_optional_plain(values, &mut im_target, page_validity)?;
279
280
target.extend_from_bitmap(&filter_boolean_kernel(&im_target.freeze(), &mask));
281
282
Ok(())
283
}
284
285
/// Decoded state made of two bitmap builders: element `0` holds the values and element `1` the
/// validity.
impl Decoded for (BitmapBuilder, BitmapBuilder) {
    /// Length of the values builder.
    fn len(&self) -> usize {
        let (values, _) = self;
        values.len()
    }

    /// Appends `n` rows with both the value bit and the validity bit unset.
    fn extend_nulls(&mut self, n: usize) {
        let (values, validity) = self;
        values.extend_constant(n, false);
        validity.extend_constant(n, false);
    }
}
294
295
/// Field-less marker type on which the boolean page decoding logic is implemented.
pub(crate) struct BooleanDecoder;
296
297
impl Decoder for BooleanDecoder {
298
type Translation<'a> = StateTranslation<'a>;
299
type Dict = BooleanArray;
300
type DecodedState = (BitmapBuilder, BitmapBuilder);
301
type Output = BooleanArray;
302
303
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
304
(
305
BitmapBuilder::with_capacity(capacity),
306
BitmapBuilder::with_capacity(capacity),
307
)
308
}
309
310
fn deserialize_dict(&mut self, _: DictPage) -> ParquetResult<Self::Dict> {
311
Ok(BooleanArray::new_empty(ArrowDataType::Boolean))
312
}
313
314
fn finalize(
315
&self,
316
dtype: ArrowDataType,
317
_dict: Option<Self::Dict>,
318
(values, validity): Self::DecodedState,
319
) -> ParquetResult<Self::Output> {
320
let validity = freeze_validity(validity);
321
Ok(BooleanArray::new(dtype, values.freeze(), validity))
322
}
323
324
fn has_predicate_specialization(
325
&self,
326
_state: &utils::State<'_, Self>,
327
_predicate: &PredicateFilter,
328
) -> ParquetResult<bool> {
329
// @TODO: This can be enabled for the fast paths
330
Ok(false)
331
}
332
333
fn extend_decoded(
334
&self,
335
decoded: &mut Self::DecodedState,
336
additional: &dyn arrow::array::Array,
337
is_optional: bool,
338
) -> ParquetResult<()> {
339
let additional = additional.as_any().downcast_ref::<BooleanArray>().unwrap();
340
decoded.0.extend_from_bitmap(additional.values());
341
match additional.validity() {
342
Some(v) => decoded.1.extend_from_bitmap(v),
343
None if is_optional => decoded.1.extend_constant(additional.len(), true),
344
None => {},
345
}
346
347
Ok(())
348
}
349
350
fn extend_filtered_with_state(
351
&mut self,
352
state: utils::State<'_, Self>,
353
(target, validity): &mut Self::DecodedState,
354
_pred_true_mask: &mut BitmapBuilder,
355
filter: Option<super::Filter>,
356
) -> ParquetResult<()> {
357
match state.translation {
358
StateTranslation::Plain(values) => {
359
if state.is_optional {
360
append_validity(
361
state.page_validity.as_ref(),
362
filter.as_ref(),
363
validity,
364
values.len(),
365
);
366
}
367
368
let page_validity = constrain_page_validity(
369
values.len(),
370
state.page_validity.as_ref(),
371
filter.as_ref(),
372
);
373
374
match (filter, page_validity) {
375
(None, None) => decode_required_plain(values, target),
376
(Some(Filter::Range(rng)), None) => {
377
decode_required_plain(values.sliced(rng.start, rng.len()), target)
378
},
379
(None, Some(page_validity)) => {
380
decode_optional_plain(values, target, page_validity)
381
},
382
(Some(Filter::Range(rng)), Some(mut page_validity)) => {
383
let (skipped, truncated);
384
(skipped, page_validity) = page_validity.split_at(rng.start);
385
(page_validity, truncated) = page_validity.split_at(rng.len());
386
387
let skipped_values = skipped.set_bits();
388
let truncated_values = truncated.set_bits();
389
let values = values.sliced(
390
skipped_values,
391
values.len() - skipped_values - truncated_values,
392
);
393
394
decode_optional_plain(values, target, page_validity)
395
},
396
(Some(Filter::Mask(mask)), None) => {
397
decode_masked_required_plain(values, target, mask)
398
},
399
(Some(Filter::Mask(mask)), Some(page_validity)) => {
400
decode_masked_optional_plain(values, target, page_validity, mask)
401
},
402
(Some(Filter::Predicate(_)), _) => todo!(),
403
}?;
404
405
Ok(())
406
},
407
StateTranslation::Rle(values) => {
408
if state.is_optional {
409
append_validity(
410
state.page_validity.as_ref(),
411
filter.as_ref(),
412
validity,
413
values.len(),
414
);
415
}
416
417
let page_validity = constrain_page_validity(
418
values.len(),
419
state.page_validity.as_ref(),
420
filter.as_ref(),
421
);
422
423
match (filter, page_validity) {
424
(None, None) => decode_required_rle(values, None, target),
425
(Some(Filter::Range(rng)), None) if rng.start == 0 => {
426
decode_required_rle(values, Some(rng.end), target)
427
},
428
(None, Some(page_validity)) => {
429
decode_optional_rle(values, target, &page_validity)
430
},
431
(Some(Filter::Range(rng)), Some(page_validity)) if rng.start == 0 => {
432
decode_optional_rle(values, target, &page_validity)
433
},
434
(Some(Filter::Mask(filter)), None) => {
435
decode_masked_required_rle(values, target, &filter)
436
},
437
(Some(Filter::Mask(filter)), Some(page_validity)) => {
438
decode_masked_optional_rle(values, target, &page_validity, &filter)
439
},
440
(Some(Filter::Range(rng)), None) => {
441
decode_masked_required_rle(values, target, &filter_from_range(rng))
442
},
443
(Some(Filter::Range(rng)), Some(page_validity)) => decode_masked_optional_rle(
444
values,
445
target,
446
&page_validity,
447
&filter_from_range(rng),
448
),
449
(Some(Filter::Predicate(_)), _) => todo!(),
450
}?;
451
452
Ok(())
453
},
454
}
455
}
456
}
457
458