Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/plain/mod.rs
8506 views
1
use arrow::array::{PrimitiveArray, Splitable};
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::types::{AlignedBytes, NativeType, PrimitiveType};
4
use polars_utils::vec::with_cast_mut_vec;
5
6
use super::DecoderFunction;
7
use crate::parquet::error::ParquetResult;
8
use crate::parquet::types::NativeType as ParquetNativeType;
9
use crate::read::deserialize::dictionary_encoded::{append_validity, constrain_page_validity};
10
use crate::read::deserialize::utils::array_chunks::ArrayChunks;
11
use crate::read::deserialize::utils::freeze_validity;
12
use crate::read::expr::SpecializedParquetColumnExpr;
13
use crate::read::{Filter, ParquetError};
14
15
pub mod predicate;
16
mod required;
17
18
/// Decode a PLAIN-encoded primitive Parquet page into `target`, applying an
/// optional `filter` and `page_validity`.
///
/// Predicate filters that cannot be evaluated on the raw page bytes are
/// handled here: the page is fully decoded into a temporary buffer, the
/// predicate is evaluated on the materialized array, and the survivors are
/// appended to `target`/`validity`. All other cases are forwarded to
/// [`decode_no_incompact_predicates`], which can apply the filter while
/// decoding.
#[allow(clippy::too_many_arguments)]
pub fn decode<P: ParquetNativeType, T: NativeType, D: DecoderFunction<P, T>>(
    values: &[u8],
    is_optional: bool,
    page_validity: Option<&Bitmap>,
    filter: Option<Filter>,
    validity: &mut BitmapBuilder,
    intermediate: &mut Vec<P>,
    target: &mut Vec<T>,
    dfn: D,
) -> ParquetResult<()> {
    // Predicates can only be pushed down onto the raw page bytes when the
    // in-memory representation is bytewise identical to the Parquet
    // representation (transmutable, no decode step) and bytewise equality
    // coincides with value equality.
    let can_filter_on_raw_data =
        // Floats have different equality than just byte-wise comparison.
        // @TODO: Maybe be smarter about this, because most predicates should not hit this problem.
        !matches!(T::PRIMITIVE, PrimitiveType::Float16 | PrimitiveType::Float32 | PrimitiveType::Float64) &&
        D::CAN_TRANSMUTE && !D::NEED_TO_DECODE;

    match filter {
        // Slow path: decode everything first, then evaluate the predicate on
        // the materialized array. NOTE(review): `Equal` predicates also take
        // this path even when raw filtering would be possible — presumably
        // deliberate, but the reason is not visible here.
        Some(Filter::Predicate(p))
            if !can_filter_on_raw_data
                || matches!(
                    p.predicate.as_specialized(),
                    Some(SpecializedParquetColumnExpr::Equal(_))
                ) =>
        {
            let num_values = values.len() / size_of::<P::AlignedBytes>();

            // @TODO: Do something smarter with the validity
            let mut unfiltered_target = Vec::with_capacity(num_values);
            let mut unfiltered_validity = if page_validity.is_some() {
                BitmapBuilder::with_capacity(num_values)
            } else {
                Default::default()
            };

            // Decode the whole page without any filter.
            decode_no_incompact_predicates(
                values,
                is_optional,
                page_validity,
                None,
                &mut unfiltered_validity,
                intermediate,
                &mut unfiltered_target,
                dfn,
            )?;

            let unfiltered_validity = freeze_validity(unfiltered_validity);

            let array = PrimitiveArray::new(
                T::PRIMITIVE.into(),
                unfiltered_target.into(),
                unfiltered_validity,
            );
            // Evaluate the predicate on the fully decoded values.
            let intermediate_pred_true_mask = p.predicate.evaluate(&array);

            let array =
                polars_compute::filter::filter_with_bitmap(&array, &intermediate_pred_true_mask);
            let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();

            // Append only the rows that survived the predicate.
            target.extend(array.values().iter().copied());
            if is_optional {
                match array.validity() {
                    None => validity.extend_constant(array.len(), true),
                    Some(v) => validity.extend_from_bitmap(v),
                }
            }
        },
        // All remaining filters (including raw-data-compatible predicates)
        // can be applied during decoding.
        f => {
            decode_no_incompact_predicates(
                values,
                is_optional,
                page_validity,
                f,
                validity,
                intermediate,
                target,
                dfn,
            )?;
        },
    }

    Ok(())
}
101
102
/// Decode a PLAIN-encoded primitive page into `target` for all filter kinds
/// except the predicates that require a fully materialized array (those are
/// handled in [`decode`]).
///
/// When the physical Parquet type and the in-memory type are bytewise
/// compatible (`D::CAN_TRANSMUTE`), the page bytes are decoded straight into
/// `target` — reinterpreted as raw aligned bytes — and, if `D::NEED_TO_DECODE`,
/// fixed up in place afterwards. Otherwise the physical values are first
/// gathered into `intermediate` and then decoded one-by-one into `target`.
#[allow(clippy::too_many_arguments)]
pub fn decode_no_incompact_predicates<
    P: ParquetNativeType,
    T: NativeType,
    D: DecoderFunction<P, T>,
>(
    values: &[u8],
    is_optional: bool,
    page_validity: Option<&Bitmap>,
    filter: Option<Filter>,
    validity: &mut BitmapBuilder,
    intermediate: &mut Vec<P>,
    target: &mut Vec<T>,
    dfn: D,
) -> ParquetResult<()> {
    // Invariant: for optional columns, `target` and `validity` grow in
    // lockstep.
    if cfg!(debug_assertions) && is_optional {
        assert_eq!(target.len(), validity.len());
    }

    if D::CAN_TRANSMUTE {
        let values = ArrayChunks::<'_, T::AlignedBytes>::new(values).ok_or_else(|| {
            ParquetError::oos("Page content does not align with expected element size")
        })?;

        let start_length = target.len();
        // Decode directly into `target`, temporarily viewing it as a vector
        // of the target type's raw aligned bytes.
        with_cast_mut_vec::<T, T::AlignedBytes, _, _>(target, |aligned_bytes_vec| {
            decode_aligned_bytes_dispatch(
                values,
                is_optional,
                page_validity,
                filter,
                validity,
                aligned_bytes_vec,
            )
        })?;

        if D::NEED_TO_DECODE {
            // The bytes were copied verbatim but still need the decode step;
            // apply it in place to the newly appended section only.
            let to_decode: &mut [P] = bytemuck::cast_slice_mut(&mut target[start_length..]);

            for v in to_decode {
                *v = bytemuck::cast(dfn.decode(*v));
            }
        }
    } else {
        let values = ArrayChunks::<'_, P::AlignedBytes>::new(values).ok_or_else(|| {
            ParquetError::oos("Page content does not align with expected element size")
        })?;

        // Gather the (possibly filtered) physical values into `intermediate`
        // first, then decode each of them into the target type.
        intermediate.clear();
        with_cast_mut_vec::<P, P::AlignedBytes, _, _>(intermediate, |aligned_bytes_vec| {
            decode_aligned_bytes_dispatch(
                values,
                is_optional,
                page_validity,
                filter,
                validity,
                aligned_bytes_vec,
            )
        })?;

        target.extend(intermediate.iter().copied().map(|v| dfn.decode(v)));
    }

    if cfg!(debug_assertions) && is_optional {
        assert_eq!(target.len(), validity.len());
    }

    Ok(())
}
171
172
#[inline(never)]
173
pub fn decode_aligned_bytes_dispatch<B: AlignedBytes>(
174
values: ArrayChunks<'_, B>,
175
is_optional: bool,
176
page_validity: Option<&Bitmap>,
177
filter: Option<Filter>,
178
validity: &mut BitmapBuilder,
179
target: &mut Vec<B>,
180
) -> ParquetResult<()> {
181
if is_optional {
182
append_validity(page_validity, filter.as_ref(), validity, values.len());
183
}
184
185
let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());
186
187
match (filter, page_validity) {
188
(None, None) => required::decode(values, target),
189
(None, Some(page_validity)) => decode_optional(values, page_validity, target),
190
191
(Some(Filter::Range(rng)), None) => {
192
required::decode(values.slice(rng.start, rng.len()), target)
193
},
194
(Some(Filter::Range(rng)), Some(mut page_validity)) => {
195
let mut values = values;
196
if rng.start > 0 {
197
let prevalidity;
198
(prevalidity, page_validity) = page_validity.split_at(rng.start);
199
page_validity.slice(0, rng.len());
200
let values_start = prevalidity.set_bits();
201
values = values.slice(values_start, values.len() - values_start);
202
}
203
204
decode_optional(values, page_validity, target)
205
},
206
207
(Some(Filter::Mask(filter)), None) => decode_masked_required(values, filter, target),
208
(Some(Filter::Mask(filter)), Some(page_validity)) => {
209
decode_masked_optional(values, page_validity, filter, target)
210
},
211
(Some(Filter::Predicate(_)), _) => unreachable!(),
212
}?;
213
214
Ok(())
215
}
216
217
/// Decode values for a page that has a validity mask but no filter.
///
/// Null positions are filled with zeroed values, so `target` grows by exactly
/// `validity.len()` elements; `values` holds one entry per *valid* row.
#[inline(never)]
fn decode_optional<B: AlignedBytes>(
    values: ArrayChunks<'_, B>,
    mut validity: Bitmap,
    target: &mut Vec<B>,
) -> ParquetResult<()> {
    target.reserve(validity.len());

    // Handle the leading and trailing zeros. This may allow dispatch to a faster kernel or
    // possibly removes iterations from the lower kernel.
    let num_leading_nulls = validity.take_leading_zeros();
    target.resize(target.len() + num_leading_nulls, B::zeroed());
    let num_trailing_nulls = validity.take_trailing_zeros();

    // Dispatch to a faster kernel if possible.
    let num_values = validity.set_bits();
    if num_values == validity.len() {
        // Everything in between is valid: straight copy-style decode.
        required::decode(values.truncate(validity.len()), target)?;
        target.resize(target.len() + num_trailing_nulls, B::zeroed());
        return Ok(());
    }

    assert!(num_values <= values.len());

    let start_length = target.len();
    let end_length = target.len() + validity.len();
    // Raw write cursor into the reserved (still uninitialized) tail of
    // `target`; the `set_len` at the end makes the writes visible.
    let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };

    let mut validity_iter = validity.fast_iter_u56();
    let mut num_values_remaining = num_values;
    let mut value_offset = 0;

    // Process one 56-bit word of the validity mask; `len` is the number of
    // rows covered by this word.
    let mut iter = |mut v: u64, len: usize| {
        debug_assert!(len < 64);

        let num_chunk_values = v.count_ones() as usize;

        if num_values_remaining == num_chunk_values {
            // The remaining values end inside this word, so speculatively
            // reading a value for an invalid row could run past the end of
            // `values`: branch on each validity bit and write zeros for nulls.
            for i in 0..len {
                let is_valid = v & 1 != 0;
                let value = if is_valid {
                    unsafe { values.get_unchecked(value_offset) }
                } else {
                    B::zeroed()
                };
                unsafe { target_ptr.add(i).write(value) };

                value_offset += (v & 1) as usize;
                v >>= 1;
            }
        } else {
            // More values follow after this word, so an in-bounds value can be
            // read for every row — null rows just receive the next value's
            // bytes (they are masked out by the validity anyway) and the read
            // cursor only advances for valid rows. This avoids a branch per
            // row.
            for i in 0..len {
                let value = unsafe { values.get_unchecked(value_offset) };
                unsafe { target_ptr.add(i).write(value) };

                value_offset += (v & 1) as usize;
                v >>= 1;
            }
        }

        num_values_remaining -= num_chunk_values;
        unsafe {
            target_ptr = target_ptr.add(len);
        }
    };

    // Full 56-bit words first, then the remainder.
    let mut num_remaining = validity.len();
    for v in validity_iter.by_ref() {
        if num_remaining < 56 {
            iter(v, num_remaining);
        } else {
            iter(v, 56);
        }
        num_remaining -= 56;
    }

    let (v, vl) = validity_iter.remainder();

    iter(v, vl.min(num_remaining));

    // SAFETY: the closure above wrote exactly `validity.len()` elements into
    // `start_length..end_length`, which lies within the reserved capacity.
    unsafe { target.set_len(end_length) };
    target.resize(target.len() + num_trailing_nulls, B::zeroed());

    Ok(())
}
302
303
/// Decode values for a page without a validity mask, keeping only the rows
/// selected by `mask`.
///
/// `target` grows by exactly `mask.set_bits()` elements.
#[inline(never)]
fn decode_masked_required<B: AlignedBytes>(
    values: ArrayChunks<'_, B>,
    mut mask: Bitmap,
    target: &mut Vec<B>,
) -> ParquetResult<()> {
    // Remove leading or trailing filtered values. This may allow dispatch to a faster kernel or
    // may remove iterations from the slower kernel below.
    let num_leading_filtered = mask.take_leading_zeros();
    mask.take_trailing_zeros();
    let values = values.slice(num_leading_filtered, mask.len());

    // Dispatch to a faster kernel if possible.
    let num_rows = mask.set_bits();
    if num_rows == mask.len() {
        // Every remaining row is selected: straight copy-style decode.
        return required::decode(values.truncate(num_rows), target);
    }

    assert!(mask.len() <= values.len());

    let start_length = target.len();
    target.reserve(num_rows);
    // Raw write cursor into the reserved (still uninitialized) tail of
    // `target`; the `set_len` at the end makes the writes visible.
    let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };

    let mut mask_iter = mask.fast_iter_u56();
    let mut num_rows_left = num_rows;
    let mut value_offset = 0;

    // Process one 56-bit word of the mask; returns `false` once all selected
    // rows have been written so the outer loop can stop early.
    let mut iter = |mut f: u64, len: usize| {
        if num_rows_left == 0 {
            return false;
        }

        let mut num_read = 0;
        let mut num_written = 0;

        // Jump from set bit to set bit instead of scanning every row.
        while f != 0 {
            let offset = f.trailing_zeros() as usize;

            num_read += offset;

            // SAFETY:
            // 1. `value_offset + num_read` indexes a selected row within the
            //    mask, and `mask.len() <= values.len()` was asserted above.
            // 2. `target_ptr.add(num_written)` stays within the `num_rows`
            //    elements reserved on `target`.
            let value = unsafe { values.get_unchecked(value_offset + num_read) };
            unsafe { target_ptr.add(num_written).write(value) };

            num_written += 1;
            num_read += 1;

            f >>= offset + 1; // Shift past the lowest set bit.
        }

        unsafe {
            target_ptr = target_ptr.add(num_written);
        }
        value_offset += len;
        num_rows_left -= num_written;

        true
    };

    // Full 56-bit words first, then the remainder.
    for f in mask_iter.by_ref() {
        if !iter(f, 56) {
            break;
        }
    }
    let (f, fl) = mask_iter.remainder();
    iter(f, fl);

    // SAFETY: exactly `num_rows` selected values were written above, all
    // within the reserved capacity.
    unsafe { target.set_len(start_length + num_rows) };

    Ok(())
}
379
380
/// Decode values for a page with a validity mask, keeping only the rows
/// selected by `mask`.
///
/// `validity` and `mask` are per-row bitmaps of the same length; `values`
/// holds one entry per *valid* row. `target` grows by exactly
/// `mask.set_bits()` elements, with zeroes (or arbitrary bytes, for rows
/// masked by validity) at selected null positions.
#[inline(never)]
fn decode_masked_optional<B: AlignedBytes>(
    values: ArrayChunks<'_, B>,
    mut validity: Bitmap,
    mut mask: Bitmap,
    target: &mut Vec<B>,
) -> ParquetResult<()> {
    assert_eq!(validity.len(), mask.len());

    // Strip leading/trailing filtered-out rows; this may allow dispatch to a
    // faster kernel below.
    let num_leading_filtered = mask.take_leading_zeros();
    mask.take_trailing_zeros();
    let leading_validity;
    (leading_validity, validity) = validity.split_at(num_leading_filtered);
    validity.slice(0, mask.len());

    let num_rows = mask.set_bits();
    let num_values = validity.set_bits();

    // Skip the physical values belonging to the stripped leading rows.
    let values = values.slice(leading_validity.set_bits(), num_values);

    // Dispatch to a faster kernel if possible.
    if num_rows == mask.len() {
        // Every remaining row is selected: plain optional decode.
        return decode_optional(values, validity, target);
    }
    if num_values == validity.len() {
        // Every remaining row is valid: plain masked decode.
        return decode_masked_required(values, mask, target);
    }

    assert!(num_values <= values.len());

    let start_length = target.len();
    target.reserve(num_rows);
    // Raw write cursor into the reserved (still uninitialized) tail of
    // `target`; the `set_len` at the end makes the writes visible.
    let mut target_ptr = unsafe { target.as_mut_ptr().add(start_length) };

    let mut validity_iter = validity.fast_iter_u56();
    let mut mask_iter = mask.fast_iter_u56();
    let mut num_values_left = num_values;
    let mut num_rows_left = num_rows;
    let mut value_offset = 0;

    // Process one 56-bit word of mask (`f`) and validity (`v`) in lockstep;
    // returns `false` once all selected rows have been written.
    let mut iter = |mut f: u64, mut v: u64| {
        if num_rows_left == 0 {
            return false;
        }

        let num_chunk_values = v.count_ones() as usize;

        let mut num_read = 0;
        let mut num_written = 0;

        if num_chunk_values == num_values_left {
            // The remaining values end inside this word, so speculatively
            // reading a value for an invalid row could run past the end of
            // `values`: branch on the validity bit and substitute zeros for
            // nulls.
            while f != 0 {
                let offset = f.trailing_zeros() as usize;

                // Count the valid bits among the rows skipped to reach the
                // next selected row; that is how far the value cursor moves.
                num_read += (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;
                v >>= offset;

                let is_valid = v & 1 != 0;
                let value = if is_valid {
                    unsafe { values.get_unchecked(value_offset + num_read) }
                } else {
                    B::zeroed()
                };
                unsafe { target_ptr.add(num_written).write(value) };

                num_written += 1;
                num_read += (v & 1) as usize;

                f >>= offset + 1; // Shift past the lowest set bit.
                v >>= 1;
            }
        } else {
            // More values follow after this word, so an in-bounds value can
            // be read even for selected-but-invalid rows (their content is
            // masked out by the validity anyway); this avoids a branch per
            // selected row.
            while f != 0 {
                let offset = f.trailing_zeros() as usize;

                num_read += (v & (1u64 << offset).wrapping_sub(1)).count_ones() as usize;
                v >>= offset;

                let value = unsafe { values.get_unchecked(value_offset + num_read) };
                unsafe { target_ptr.add(num_written).write(value) };

                num_written += 1;
                num_read += (v & 1) as usize;

                f >>= offset + 1; // Shift past the lowest set bit.
                v >>= 1;
            }
        }

        unsafe {
            target_ptr = target_ptr.add(num_written);
        }
        value_offset += num_chunk_values;
        num_rows_left -= num_written;
        num_values_left -= num_chunk_values;

        true
    };

    // Full 56-bit words first, then the (equal-length) remainders.
    for (f, v) in mask_iter.by_ref().zip(validity_iter.by_ref()) {
        if !iter(f, v) {
            break;
        }
    }

    let (f, fl) = mask_iter.remainder();
    let (v, vl) = validity_iter.remainder();
    assert_eq!(fl, vl);
    iter(f, v);

    // SAFETY: exactly `num_rows` elements were written above, all within the
    // reserved capacity.
    unsafe { target.set_len(start_length + num_rows) };

    Ok(())
}
494
495
#[cfg(test)]
mod tests {
    use arrow::bitmap::proptest::bitmap;
    use proptest::collection::size_range;
    use proptest::prelude::*;

    use super::*;

    /// Strategy producing a value buffer and a same-length selection mask.
    fn values_and_mask() -> impl Strategy<Value = (Vec<u32>, Bitmap)> {
        any_with::<Vec<u32>>(size_range(0..100).lift()).prop_flat_map(|vec| {
            let len = vec.len();
            (Just(vec), bitmap(len))
        })
    }

    /// Strategy producing a validity bitmap, a value buffer with one entry
    /// per *valid* row, and a same-length selection mask.
    fn validity_values_and_mask() -> impl Strategy<Value = (Bitmap, Vec<u32>, Bitmap)> {
        bitmap(0..100).prop_flat_map(|validity| {
            let len = validity.len();
            let values_length = validity.set_bits();

            (
                Just(validity),
                any_with::<Vec<u32>>(size_range(values_length).lift()),
                bitmap(len),
            )
        })
    }

    /// Check `decode_masked_required` against a straightforward filter loop.
    fn _test_decode_masked_required(values: &Vec<u32>, mask: &Bitmap) {
        // Reference: keep exactly the values at selected positions.
        let mut reference_result = Vec::with_capacity(mask.set_bits());
        for (value, is_selected) in values.iter().zip(mask.iter()) {
            if is_selected {
                reference_result.push(*value);
            }
        }

        let mut result = Vec::<arrow::types::Bytes4Alignment4>::with_capacity(mask.set_bits());
        decode_masked_required(
            ArrayChunks::new(bytemuck::cast_slice(values.as_slice())).unwrap(),
            mask.clone(),
            &mut result,
        )
        .unwrap();

        let result = bytemuck::cast_vec::<_, u32>(result);
        assert_eq!(reference_result, result);
    }

    /// Check `decode_masked_optional`: every selected-and-valid row must
    /// carry the corresponding input value (selected null rows may hold
    /// arbitrary data, so they are not compared).
    fn _test_decode_masked_optional(validity: &Bitmap, values: &Vec<u32>, mask: &Bitmap) {
        let mut result = Vec::<arrow::types::Bytes4Alignment4>::with_capacity(mask.set_bits());
        decode_masked_optional(
            ArrayChunks::new(bytemuck::cast_slice(values.as_slice())).unwrap(),
            validity.clone(),
            mask.clone(),
            &mut result,
        )
        .unwrap();

        let result = bytemuck::cast_vec::<_, u32>(result);

        // Walk all rows; `result_i` advances per selected row, `values_i`
        // per valid row.
        let mut result_i = 0;
        let mut values_i = 0;
        for (is_valid, is_selected) in validity.iter().zip(mask.iter()) {
            if is_selected {
                if is_valid {
                    assert_eq!(result[result_i], values[values_i]);
                }
                result_i += 1;
            }

            if is_valid {
                values_i += 1;
            }
        }
    }

    proptest! {
        #[test]
        fn test_decode_masked_required(
            (ref values, ref mask) in values_and_mask()
        ) {
            _test_decode_masked_required(values, mask)
        }
    }

    proptest! {
        #[test]
        fn test_decode_masked_optional(
            (ref validity, ref values, ref mask) in validity_values_and_mask()
        ) {
            _test_decode_masked_optional(validity, values, mask)
        }
    }
}
589
590