Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/required.rs
8509 views
1
use arrow::types::AlignedBytes;
2
3
use super::{IndexMapping, oob_dict_idx, required_skip_whole_chunks, verify_dict_indices};
4
use crate::parquet::encoding::hybrid_rle::{HybridRleChunk, HybridRleDecoder};
5
use crate::parquet::error::ParquetResult;
6
7
/// Decoding kernel for required dictionary encoded.
8
#[inline(never)]
9
pub fn decode<B: AlignedBytes, D: IndexMapping<Output = B>>(
10
mut values: HybridRleDecoder<'_>,
11
dict: D,
12
target: &mut Vec<B>,
13
mut num_rows_to_skip: usize,
14
) -> ParquetResult<()> {
15
debug_assert!(num_rows_to_skip <= values.len());
16
17
let num_rows = values.len() - num_rows_to_skip;
18
let end_length = target.len() + num_rows;
19
20
if num_rows == 0 {
21
return Ok(());
22
}
23
24
target.reserve(num_rows);
25
26
if dict.is_empty() {
27
return Err(oob_dict_idx());
28
}
29
30
// Skip over whole HybridRleChunks
31
required_skip_whole_chunks(&mut values, &mut num_rows_to_skip)?;
32
33
while let Some(chunk) = values.next_chunk()? {
34
debug_assert!(num_rows_to_skip < chunk.len() || chunk.len() == 0);
35
36
match chunk {
37
HybridRleChunk::Rle(value, size) => {
38
if size == 0 {
39
continue;
40
}
41
42
let Some(value) = dict.get(value) else {
43
return Err(oob_dict_idx());
44
};
45
46
target.resize(target.len() + size - num_rows_to_skip, value);
47
},
48
HybridRleChunk::Bitpacked(mut decoder) => {
49
if num_rows_to_skip > 0 {
50
decoder.skip_chunks(num_rows_to_skip / 32);
51
num_rows_to_skip %= 32;
52
53
if let Some((chunk, chunk_size)) = decoder.chunked().next_inexact() {
54
let chunk = &chunk[num_rows_to_skip..chunk_size];
55
verify_dict_indices(chunk, dict.len())?;
56
target.extend(chunk.iter().map(|&idx| {
57
// SAFETY: The dict indices were verified before.
58
unsafe { dict.get_unchecked(idx) }
59
}));
60
}
61
}
62
63
let mut chunked = decoder.chunked();
64
for chunk in chunked.by_ref() {
65
verify_dict_indices(&chunk, dict.len())?;
66
target.extend(chunk.iter().map(|&idx| {
67
// SAFETY: The dict indices were verified before.
68
unsafe { dict.get_unchecked(idx) }
69
}));
70
}
71
72
if let Some((chunk, chunk_size)) = chunked.remainder() {
73
verify_dict_indices(&chunk[..chunk_size], dict.len())?;
74
target.extend(chunk[..chunk_size].iter().map(|&idx| {
75
// SAFETY: The dict indices were verified before.
76
unsafe { dict.get_unchecked(idx) }
77
}));
78
}
79
},
80
}
81
82
num_rows_to_skip = 0;
83
}
84
85
debug_assert_eq!(target.len(), end_length);
86
87
Ok(())
88
}
89
90