Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/binview/mod.rs
8500 views
1
use arrow::array::{Array, BinaryViewArray, MutableBinaryViewArray, Utf8ViewArray, View};
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::datatypes::{ArrowDataType, PhysicalType};
4
use polars_utils::aliases::PlIndexSet;
5
6
use super::dictionary_encoded::{append_validity, constrain_page_validity};
7
use super::utils::{
8
dict_indices_decoder, filter_from_range, freeze_validity, unspecialized_decode,
9
};
10
use super::{Filter, PredicateFilter, dictionary_encoded};
11
use crate::parquet::encoding::{Encoding, delta_byte_array, delta_length_byte_array, hybrid_rle};
12
use crate::parquet::error::{ParquetError, ParquetResult};
13
use crate::parquet::page::{DataPage, DictPage, split_buffer};
14
use crate::read::deserialize::utils::{self, Decoded};
15
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
16
17
mod optional;
18
mod optional_masked;
19
mod predicate;
20
mod required;
21
mod required_masked;
22
23
pub struct DecodedState {
24
binview: MutableBinaryViewArray<[u8]>,
25
validity: BitmapBuilder,
26
27
// Used to store the needles for EqualsOneOf::Set that were inserted
28
// into the buffers (but not the views).
29
needle_views: Vec<View>,
30
}
31
32
impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> {
33
type PlainDecoder = BinaryIter<'a>;
34
35
fn new(
36
_decoder: &BinViewDecoder,
37
page: &'a DataPage,
38
dict: Option<&'a <BinViewDecoder as utils::Decoder>::Dict>,
39
page_validity: Option<&Bitmap>,
40
) -> ParquetResult<Self> {
41
match (page.encoding(), dict) {
42
(Encoding::Plain, _) => {
43
let values = split_buffer(page)?.values;
44
let values = BinaryIter::new(values, page.num_values());
45
46
Ok(Self::Plain(values))
47
},
48
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {
49
let values =
50
dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;
51
Ok(Self::Dictionary(values))
52
},
53
(Encoding::DeltaLengthByteArray, _) => {
54
let values = split_buffer(page)?.values;
55
Ok(Self::DeltaLengthByteArray(
56
delta_length_byte_array::Decoder::try_new(values)?,
57
Vec::new(),
58
))
59
},
60
(Encoding::DeltaByteArray, _) => {
61
let values = split_buffer(page)?.values;
62
Ok(Self::DeltaBytes(delta_byte_array::Decoder::try_new(
63
values,
64
)?))
65
},
66
_ => Err(utils::not_implemented(page)),
67
}
68
}
69
70
fn num_rows(&self) -> usize {
71
match self {
72
StateTranslation::Plain(i) => i.max_num_values,
73
StateTranslation::Dictionary(i) => i.len(),
74
StateTranslation::DeltaLengthByteArray(i, _) => i.len(),
75
StateTranslation::DeltaBytes(i) => i.len(),
76
}
77
}
78
}
79
80
enum EqualsOneOf {
81
Empty,
82
Inlinable([View; 4]),
83
Set(PlIndexSet<Box<[u8]>>),
84
}
85
86
pub(crate) struct BinViewDecoder {
87
is_string: bool,
88
equals_one_of: Option<Box<EqualsOneOf>>,
89
}
90
91
impl BinViewDecoder {
92
pub fn new(is_string: bool) -> Self {
93
Self {
94
is_string,
95
equals_one_of: None,
96
}
97
}
98
99
pub fn new_string() -> Self {
100
Self::new(true)
101
}
102
103
fn initialize_predicate_equals_one_of(&mut self, needles: &[ParquetScalar]) -> &EqualsOneOf {
104
self.equals_one_of.get_or_insert_with(|| {
105
if needles.is_empty() {
106
return Box::new(EqualsOneOf::Empty);
107
}
108
109
let is_inlinable = needles.len() <= 4
110
&& needles.iter().all(|needle| {
111
let needle = if self.is_string {
112
needle.as_str().unwrap().as_bytes()
113
} else {
114
needle.as_binary().unwrap()
115
};
116
needle.len() < View::MAX_INLINE_SIZE as usize
117
});
118
119
Box::new(if is_inlinable {
120
let mut views = [View::default(); 4];
121
for (i, needle) in needles.iter().enumerate() {
122
let needle = if self.is_string {
123
needle.as_str().unwrap().as_bytes()
124
} else {
125
needle.as_binary().unwrap()
126
};
127
views[i] = View::new_inline(needle);
128
}
129
for i in needles.len()..4 {
130
views[i] = views[0];
131
}
132
EqualsOneOf::Inlinable(views)
133
} else {
134
let mut needle_set = PlIndexSet::<Box<_>>::default();
135
needle_set.extend(needles.iter().map(|needle| {
136
assert!(!needle.is_null());
137
let needle = if self.is_string {
138
needle.as_str().unwrap().as_bytes()
139
} else {
140
needle.as_binary().unwrap()
141
};
142
needle.into()
143
}));
144
EqualsOneOf::Set(needle_set)
145
})
146
})
147
}
148
149
fn initialize_decode_equals_one_of_state(
150
&mut self,
151
target: &mut DecodedState,
152
) -> Option<&EqualsOneOf> {
153
if let Some(EqualsOneOf::Set(needles)) = self.equals_one_of.as_deref_mut() {
154
if target.needle_views.is_empty() {
155
target.needle_views.extend(
156
needles
157
.iter()
158
.map(|needle| target.binview.push_value_into_buffer(needle)),
159
);
160
}
161
}
162
self.equals_one_of.as_deref()
163
}
164
}
165
166
#[allow(clippy::large_enum_variant)]
167
#[derive(Debug)]
168
pub(crate) enum StateTranslation<'a> {
169
Plain(BinaryIter<'a>),
170
Dictionary(hybrid_rle::HybridRleDecoder<'a>),
171
DeltaLengthByteArray(delta_length_byte_array::Decoder<'a>, Vec<u32>),
172
DeltaBytes(delta_byte_array::Decoder<'a>),
173
}
174
175
impl utils::Decoded for DecodedState {
    /// Number of decoded values so far.
    fn len(&self) -> usize {
        self.binview.len()
    }

    /// Appends `n` nulls: empty values whose validity bits are unset.
    fn extend_nulls(&mut self, n: usize) {
        self.binview.extend_constant(n, Some(&[]));
        self.validity.extend_constant(n, false);
    }

    /// The smaller of the spare capacities of the values and of the validity.
    fn remaining_capacity(&self) -> usize {
        let spare_values = self.binview.capacity() - self.binview.len();
        let spare_validity = self.validity.capacity() - self.validity.len();
        spare_values.min(spare_validity)
    }
}
190
191
#[allow(clippy::too_many_arguments)]
192
fn decode_plain(
193
values: &[u8],
194
max_num_values: usize,
195
state: &mut DecodedState,
196
is_optional: bool,
197
198
page_validity: Option<&Bitmap>,
199
filter: Option<Filter>,
200
201
equals_one_of_state: Option<&EqualsOneOf>,
202
verify_utf8: bool,
203
) -> ParquetResult<()> {
204
if is_optional {
205
append_validity(
206
page_validity,
207
filter.as_ref(),
208
&mut state.validity,
209
max_num_values,
210
);
211
}
212
213
if let Some(equals_one_of_state) = equals_one_of_state
214
&& page_validity.is_none()
215
{
216
let mut total_bytes_len = 0;
217
match equals_one_of_state {
218
EqualsOneOf::Empty => {},
219
EqualsOneOf::Inlinable(views) => {
220
predicate::decode_is_in_inlinable(
221
max_num_values,
222
values,
223
views,
224
unsafe { state.binview.views_mut() },
225
&mut total_bytes_len,
226
)?;
227
},
228
EqualsOneOf::Set(needles) => {
229
predicate::decode_is_in_non_inlinable(
230
max_num_values,
231
values,
232
needles,
233
&state.needle_views,
234
unsafe { state.binview.views_mut() },
235
&mut total_bytes_len,
236
)?;
237
},
238
}
239
240
let new_total_bytes_len = state.binview.total_bytes_len() + total_bytes_len;
241
242
// SAFETY: We know that the view is valid since we added it safely and we
243
// update the total_bytes_len afterwards. The total_buffer_len is not affected.
244
unsafe {
245
state.binview.set_total_bytes_len(new_total_bytes_len);
246
}
247
248
return Ok(());
249
}
250
251
let page_validity = constrain_page_validity(max_num_values, page_validity, filter.as_ref());
252
253
match (filter, page_validity) {
254
(None, None) => required::decode(
255
max_num_values,
256
values,
257
None,
258
&mut state.binview,
259
verify_utf8,
260
),
261
(Some(Filter::Range(rng)), None) if rng.start == 0 => required::decode(
262
max_num_values,
263
values,
264
Some(rng.end),
265
&mut state.binview,
266
verify_utf8,
267
),
268
(None, Some(page_validity)) => optional::decode(
269
page_validity.set_bits(),
270
values,
271
&mut state.binview,
272
&page_validity,
273
verify_utf8,
274
),
275
(Some(Filter::Range(rng)), Some(page_validity)) if rng.start == 0 => optional::decode(
276
page_validity.set_bits(),
277
values,
278
&mut state.binview,
279
&page_validity,
280
verify_utf8,
281
),
282
(Some(Filter::Mask(mask)), None) => required_masked::decode(
283
max_num_values,
284
values,
285
&mut state.binview,
286
&mask,
287
verify_utf8,
288
),
289
(Some(Filter::Mask(mask)), Some(page_validity)) => optional_masked::decode(
290
page_validity.set_bits(),
291
values,
292
&mut state.binview,
293
&page_validity,
294
&mask,
295
verify_utf8,
296
),
297
(Some(Filter::Range(rng)), None) => required_masked::decode(
298
max_num_values,
299
values,
300
&mut state.binview,
301
&filter_from_range(rng),
302
verify_utf8,
303
),
304
(Some(Filter::Range(rng)), Some(page_validity)) => optional_masked::decode(
305
page_validity.set_bits(),
306
values,
307
&mut state.binview,
308
&page_validity,
309
&filter_from_range(rng),
310
verify_utf8,
311
),
312
(Some(Filter::Predicate(_)), _) => unreachable!(),
313
}?;
314
315
Ok(())
316
}
317
318
#[cold]
319
fn invalid_input_err() -> ParquetError {
320
ParquetError::oos("String data does not match given length")
321
}
322
323
#[cold]
324
fn invalid_utf8_err() -> ParquetError {
325
ParquetError::oos("String data contained invalid UTF-8")
326
}
327
328
pub fn decode_plain_generic(
329
values: &[u8],
330
target: &mut MutableBinaryViewArray<[u8]>,
331
332
num_rows: usize,
333
mut next: impl FnMut() -> Option<(bool, bool)>,
334
335
verify_utf8: bool,
336
) -> ParquetResult<()> {
337
// Since the offset in the buffer is decided by the interleaved lengths, every value has to be
338
// walked no matter what. This makes decoding rather inefficient in general.
339
//
340
// There are three cases:
341
// 1. All inlinable values
342
// - Most time is spend in decoding
343
// - No additional buffer has to be formed
344
// - Possible UTF-8 verification is fast because the len_below_128 trick
345
// 2. All non-inlinable values
346
// - Little time is spend in decoding
347
// - Most time is spend in buffer memcopying (we remove the interleaved lengths)
348
// - Possible UTF-8 verification is fast because the continuation byte trick
349
// 3. Mixed inlinable and non-inlinable values
350
// - Time shared between decoding and buffer forming
351
// - UTF-8 verification might still use len_below_128 trick, but might need to fall back to
352
// slow path.
353
354
target.finish_in_progress();
355
unsafe { target.views_mut() }.reserve(num_rows);
356
357
let start_target_length = target.len();
358
359
let buffer_idx = target.completed_buffers().len() as u32;
360
let mut buffer = Vec::with_capacity(values.len() + 1);
361
let mut none_starting_with_continuation_byte = true; // Whether the transition from between strings is valid
362
// UTF-8
363
let mut all_len_below_128 = true; // Whether all the lengths of the values are below 128, this
364
// allows us to make UTF-8 verification a lot faster.
365
366
let mut total_bytes_len = 0;
367
let mut num_inlined = 0;
368
369
let mut mvalues = values;
370
while let Some((is_valid, is_selected)) = next() {
371
if !is_valid {
372
if is_selected {
373
unsafe { target.views_mut() }.push(unsafe { View::new_inline_unchecked(&[]) });
374
}
375
continue;
376
}
377
378
if mvalues.len() < 4 {
379
return Err(invalid_input_err());
380
}
381
382
let length;
383
(length, mvalues) = mvalues.split_at(4);
384
let length: &[u8; 4] = unsafe { length.try_into().unwrap_unchecked() };
385
let length = u32::from_le_bytes(*length);
386
387
if mvalues.len() < length as usize {
388
return Err(invalid_input_err());
389
}
390
391
let value;
392
(value, mvalues) = mvalues.split_at(length as usize);
393
394
all_len_below_128 &= value.len() < 128;
395
// Everything starting with 10.. .... is a continuation byte.
396
none_starting_with_continuation_byte &=
397
value.is_empty() || value[0] & 0b1100_0000 != 0b1000_0000;
398
399
if !is_selected {
400
continue;
401
}
402
403
let offset = buffer.len() as u32;
404
405
if value.len() <= View::MAX_INLINE_SIZE as usize {
406
unsafe { target.views_mut() }.push(unsafe { View::new_inline_unchecked(value) });
407
num_inlined += 1;
408
} else {
409
buffer.extend_from_slice(value);
410
unsafe { target.views_mut() }
411
.push(unsafe { View::new_noninline_unchecked(value, buffer_idx, offset) });
412
}
413
414
total_bytes_len += value.len();
415
}
416
417
unsafe {
418
target.set_total_bytes_len(target.total_bytes_len() + total_bytes_len);
419
}
420
421
if verify_utf8 {
422
// This is a trick that allows us to check the resulting buffer which allows to batch the
423
// UTF-8 verification.
424
//
425
// This is allowed if none of the strings start with a UTF-8 continuation byte, so we keep
426
// track of that during the decoding.
427
if num_inlined == 0 {
428
if !none_starting_with_continuation_byte || simdutf8::basic::from_utf8(&buffer).is_err()
429
{
430
return Err(invalid_utf8_err());
431
}
432
433
// This is a small trick that allows us to check the Parquet buffer instead of the view
434
// buffer. Batching the UTF-8 verification is more performant. For this to be allowed,
435
// all the interleaved lengths need to be valid UTF-8.
436
//
437
// Every strings prepended by 4 bytes (L, 0, 0, 0), since we check here L < 128. L is
438
// only a valid first byte of a UTF-8 code-point and (L, 0, 0, 0) is valid UTF-8.
439
// Consequently, it is valid to just check the whole buffer.
440
} else if all_len_below_128 {
441
if simdutf8::basic::from_utf8(&values[..values.len() - mvalues.len()]).is_err() {
442
return Err(invalid_utf8_err());
443
}
444
} else {
445
// We check all the non-inlined values here.
446
if !none_starting_with_continuation_byte || simdutf8::basic::from_utf8(&buffer).is_err()
447
{
448
return Err(invalid_utf8_err());
449
}
450
451
let mut all_inlined_are_ascii = true;
452
453
// @NOTE: This is only valid because we initialize our inline View's to be zeroes on
454
// non-included bytes.
455
for view in &target.views()[start_target_length..] {
456
all_inlined_are_ascii &= (view.length > View::MAX_INLINE_SIZE)
457
| (view.as_u128() & 0x0000_0000_8080_8080_8080_8080_8080_8080 == 0);
458
}
459
460
// This is the very slow path.
461
if !all_inlined_are_ascii {
462
let mut is_valid = true;
463
for view in &target.views()[start_target_length..] {
464
if view.length <= View::MAX_INLINE_SIZE {
465
is_valid &=
466
std::str::from_utf8(unsafe { view.get_inlined_slice_unchecked() })
467
.is_ok();
468
}
469
}
470
471
if !is_valid {
472
return Err(invalid_utf8_err());
473
}
474
}
475
}
476
}
477
478
target.push_buffer(buffer.into());
479
480
Ok(())
481
}
482
483
impl utils::Decoder for BinViewDecoder {
484
type Translation<'a> = StateTranslation<'a>;
485
type Dict = BinaryViewArray;
486
type DecodedState = DecodedState;
487
type Output = Box<dyn Array>;
488
489
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
490
DecodedState {
491
binview: MutableBinaryViewArray::with_capacity(capacity),
492
validity: BitmapBuilder::with_capacity(capacity),
493
needle_views: Vec::new(),
494
}
495
}
496
497
fn evaluate_dict_predicate(
498
&self,
499
dict: &Self::Dict,
500
predicate: &PredicateFilter,
501
) -> ParquetResult<Bitmap> {
502
let utf8_array;
503
let mut dict_arr = dict as &dyn Array;
504
505
if self.is_string {
506
utf8_array = unsafe { dict.to_utf8view_unchecked() };
507
dict_arr = &utf8_array
508
}
509
510
Ok(predicate.predicate.evaluate(dict_arr))
511
}
512
513
fn evaluate_predicate(
514
&mut self,
515
state: &utils::State<'_, Self>,
516
predicate: Option<&SpecializedParquetColumnExpr>,
517
pred_true_mask: &mut BitmapBuilder,
518
dict_mask: Option<&Bitmap>,
519
) -> ParquetResult<bool> {
520
if state.page_validity.is_some() {
521
// @Performance. This should be implemented.
522
return Ok(false);
523
}
524
525
if let StateTranslation::Dictionary(values) = &state.translation {
526
let dict_mask = dict_mask.unwrap();
527
super::dictionary_encoded::predicate::decode(
528
values.clone(),
529
dict_mask,
530
pred_true_mask,
531
)?;
532
return Ok(true);
533
}
534
535
let Some(predicate) = predicate else {
536
return Ok(false);
537
};
538
539
use {SpecializedParquetColumnExpr as Spce, StateTranslation as St};
540
match (&state.translation, predicate) {
541
(St::Plain(iter), Spce::Equal(needle)) => {
542
assert!(!needle.is_null());
543
544
let needle = if self.is_string {
545
needle.as_str().unwrap().as_bytes()
546
} else {
547
needle.as_binary().unwrap()
548
};
549
predicate::decode_equals(iter.max_num_values, iter.values, needle, pred_true_mask)?;
550
},
551
(St::Plain(iter), Spce::EqualOneOf(needles)) => {
552
let e = self.initialize_predicate_equals_one_of(needles);
553
554
match e {
555
EqualsOneOf::Empty => {
556
pred_true_mask.extend_constant(iter.max_num_values, false)
557
},
558
EqualsOneOf::Inlinable(views) => {
559
predicate::decode_is_in_no_values_inlinable(
560
iter.max_num_values,
561
iter.values,
562
views,
563
pred_true_mask,
564
)?;
565
},
566
EqualsOneOf::Set(needle_set) => {
567
predicate::decode_is_in_no_values_non_inlinable(
568
iter.max_num_values,
569
iter.values,
570
needle_set,
571
pred_true_mask,
572
)?;
573
},
574
}
575
},
576
(St::Plain(iter), Spce::StartsWith(pattern)) => predicate::decode_matches(
577
iter.max_num_values,
578
iter.values,
579
|v| v.starts_with(pattern),
580
pred_true_mask,
581
)?,
582
(St::Plain(iter), Spce::EndsWith(pattern)) => predicate::decode_matches(
583
iter.max_num_values,
584
iter.values,
585
|v| v.ends_with(pattern),
586
pred_true_mask,
587
)?,
588
(St::Plain(iter), Spce::RegexMatch(regex)) => predicate::decode_matches(
589
iter.max_num_values,
590
iter.values,
591
|v| regex.is_match(v),
592
pred_true_mask,
593
)?,
594
_ => return Ok(false),
595
}
596
597
Ok(true)
598
}
599
600
fn apply_dictionary(
601
&mut self,
602
state: &mut Self::DecodedState,
603
dict: &Self::Dict,
604
) -> ParquetResult<()> {
605
if state.binview.completed_buffers().len() < dict.data_buffers().len() {
606
for buffer in dict.data_buffers().as_ref() {
607
state.binview.push_buffer(buffer.clone());
608
}
609
}
610
611
assert!(state.binview.completed_buffers().len() == dict.data_buffers().len());
612
613
Ok(())
614
}
615
616
fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {
617
let values = &page.buffer;
618
let num_values = page.num_values;
619
620
let mut arr = MutableBinaryViewArray::new();
621
required::decode(num_values, values, None, &mut arr, self.is_string)?;
622
623
Ok(arr.freeze())
624
}
625
626
fn extend_decoded(
627
&self,
628
decoded: &mut Self::DecodedState,
629
additional: &dyn Array,
630
is_optional: bool,
631
) -> ParquetResult<()> {
632
let is_utf8 = self.is_string;
633
if is_utf8 {
634
let array = additional.as_any().downcast_ref::<Utf8ViewArray>().unwrap();
635
let mut array = array.to_binview();
636
637
if let Some(validity) = array.take_validity() {
638
decoded.binview.extend_from_array(&array);
639
decoded.validity.extend_from_bitmap(&validity);
640
} else {
641
decoded.binview.extend_from_array(&array);
642
if is_optional {
643
decoded.validity.extend_constant(array.len(), true);
644
}
645
}
646
} else {
647
let array = additional
648
.as_any()
649
.downcast_ref::<BinaryViewArray>()
650
.unwrap();
651
let mut array = array.clone();
652
653
if let Some(validity) = array.take_validity() {
654
decoded.binview.extend_from_array(&array);
655
decoded.validity.extend_from_bitmap(&validity);
656
} else {
657
decoded.binview.extend_from_array(&array);
658
if is_optional {
659
decoded.validity.extend_constant(array.len(), true);
660
}
661
}
662
}
663
664
Ok(())
665
}
666
667
fn extend_filtered_with_state(
668
&mut self,
669
mut state: utils::State<'_, Self>,
670
decoded: &mut Self::DecodedState,
671
filter: Option<super::Filter>,
672
_chunks: &mut Vec<Self::Output>,
673
) -> ParquetResult<()> {
674
let is_string = self.is_string;
675
let equals_one_of_state = self.initialize_decode_equals_one_of_state(decoded);
676
match state.translation {
677
StateTranslation::Plain(iter) => decode_plain(
678
iter.values,
679
iter.max_num_values,
680
decoded,
681
state.is_optional,
682
state.page_validity.as_ref(),
683
filter,
684
equals_one_of_state,
685
is_string,
686
),
687
StateTranslation::Dictionary(ref mut indexes) => {
688
let dict = state.dict.unwrap();
689
690
let start_length = decoded.binview.views().len();
691
692
dictionary_encoded::decode_dict(
693
indexes.clone(),
694
dict.views().as_slice(),
695
state.is_optional,
696
state.page_validity.as_ref(),
697
filter,
698
&mut decoded.validity,
699
unsafe { decoded.binview.views_mut() },
700
)?;
701
702
let total_length: usize = decoded
703
.binview
704
.views()
705
.iter()
706
.skip(start_length)
707
.map(|view| view.length as usize)
708
.sum();
709
unsafe {
710
decoded
711
.binview
712
.set_total_bytes_len(decoded.binview.total_bytes_len() + total_length);
713
}
714
715
Ok(())
716
},
717
StateTranslation::DeltaLengthByteArray(decoder, _vec) => {
718
let values = decoder.values;
719
let lengths = decoder.lengths.collect::<Vec<i64>>()?;
720
721
if self.is_string {
722
let mut none_starting_with_continuation_byte = true;
723
let mut offset = 0;
724
for length in &lengths {
725
none_starting_with_continuation_byte &=
726
*length == 0 || values[offset] & 0xC0 != 0x80;
727
offset += *length as usize;
728
}
729
730
if !none_starting_with_continuation_byte {
731
return Err(invalid_utf8_err());
732
}
733
734
if simdutf8::basic::from_utf8(&values[..offset]).is_err() {
735
return Err(invalid_utf8_err());
736
}
737
}
738
739
let mut i = 0;
740
let mut offset = 0;
741
unspecialized_decode(
742
lengths.len(),
743
|| {
744
let length = lengths[i] as usize;
745
746
let value = &values[offset..offset + length];
747
748
i += 1;
749
offset += length;
750
751
Ok(value)
752
},
753
filter,
754
state.page_validity,
755
state.is_optional,
756
&mut decoded.validity,
757
&mut decoded.binview,
758
)
759
},
760
StateTranslation::DeltaBytes(mut decoder) => {
761
let check_utf8 = self.is_string;
762
763
unspecialized_decode(
764
decoder.len(),
765
|| {
766
let value = decoder.next().unwrap()?;
767
768
if check_utf8 && simdutf8::basic::from_utf8(&value[..]).is_err() {
769
return Err(invalid_utf8_err());
770
}
771
772
Ok(value)
773
},
774
filter,
775
state.page_validity,
776
state.is_optional,
777
&mut decoded.validity,
778
&mut decoded.binview,
779
)
780
},
781
}
782
}
783
784
fn extend_constant(
785
&mut self,
786
decoded: &mut Self::DecodedState,
787
length: usize,
788
value: &ParquetScalar,
789
) -> ParquetResult<()> {
790
if value.is_null() {
791
decoded.extend_nulls(length);
792
return Ok(());
793
}
794
795
let value = match value {
796
ParquetScalar::String(v) => v.as_bytes(),
797
ParquetScalar::Binary(v) => v.as_ref(),
798
_ => unreachable!(),
799
};
800
801
decoded.binview.extend_constant(length, Some(value));
802
decoded.validity.extend_constant(length, true);
803
804
Ok(())
805
}
806
807
fn finalize(
808
&self,
809
dtype: ArrowDataType,
810
_dict: Option<Self::Dict>,
811
state: Self::DecodedState,
812
) -> ParquetResult<Box<dyn Array>> {
813
let mut array: BinaryViewArray = state.binview.freeze();
814
815
let validity = freeze_validity(state.validity);
816
array = array.with_validity(validity);
817
818
match dtype.to_physical_type() {
819
PhysicalType::BinaryView => Ok(array.boxed()),
820
PhysicalType::Utf8View => {
821
// SAFETY: we already checked utf8
822
unsafe {
823
Ok(Utf8ViewArray::new_unchecked(
824
dtype,
825
array.views().clone(),
826
array.data_buffers().clone(),
827
array.validity().cloned(),
828
array.try_total_bytes_len(),
829
array.total_buffer_len(),
830
)
831
.boxed())
832
}
833
},
834
_ => unreachable!(),
835
}
836
}
837
}
838
839
/// Iterator over PLAIN-encoded byte-array data, where each value is a 4-byte little-endian
/// length followed by that many bytes.
#[derive(Debug)]
pub struct BinaryIter<'a> {
    /// The encoded bytes that have not been consumed yet.
    values: &'a [u8],

    /// Upper bound on the number of items this [`BinaryIter`] may still produce.
    ///
    /// This equals the number of remaining items if and only if the data encoded by the
    /// [`BinaryIter`] is not nullable.
    max_num_values: usize,
}
850
impl<'a> BinaryIter<'a> {
851
pub fn new(values: &'a [u8], max_num_values: usize) -> Self {
852
Self {
853
values,
854
max_num_values,
855
}
856
}
857
}
858
859
impl<'a> Iterator for BinaryIter<'a> {
860
type Item = &'a [u8];
861
862
#[inline]
863
fn next(&mut self) -> Option<Self::Item> {
864
if self.max_num_values == 0 {
865
assert!(self.values.is_empty());
866
return None;
867
}
868
869
let (length, remaining) = self.values.split_at(4);
870
let length: [u8; 4] = unsafe { length.try_into().unwrap_unchecked() };
871
let length = u32::from_le_bytes(length) as usize;
872
let (result, remaining) = remaining.split_at(length);
873
self.max_num_values -= 1;
874
self.values = remaining;
875
Some(result)
876
}
877
878
#[inline]
879
fn size_hint(&self) -> (usize, Option<usize>) {
880
(0, Some(self.max_num_values))
881
}
882
}
883
884