Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/dictionary_encoded/mod.rs
8506 views
1
use arrow::bitmap::bitmask::BitMask;
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::types::{
4
AlignedBytes, Bytes1Alignment1, Bytes2Alignment2, Bytes4Alignment4, NativeType,
5
};
6
use polars_compute::filter::filter_boolean_kernel;
7
use polars_utils::vec::with_cast_mut_vec;
8
9
use super::ParquetError;
10
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
11
use crate::parquet::error::ParquetResult;
12
use crate::read::Filter;
13
14
mod optional;
15
mod optional_masked_dense;
16
pub mod predicate;
17
mod required;
18
mod required_masked_dense;
19
20
/// A mapping from a `u32` to a value. This is used in to map dictionary encoding to a value.
21
pub trait IndexMapping {
22
type Output: Copy + AlignedBytes;
23
24
fn is_empty(&self) -> bool {
25
self.len() == 0
26
}
27
fn len(&self) -> usize;
28
fn get(&self, idx: u32) -> Option<Self::Output> {
29
((idx as usize) < self.len()).then(|| unsafe { self.get_unchecked(idx) })
30
}
31
unsafe fn get_unchecked(&self, idx: u32) -> Self::Output;
32
}
33
34
// Base mapping used for everything except the CategoricalDecoder.
35
impl<T: Copy + AlignedBytes> IndexMapping for &[T] {
36
type Output = T;
37
38
#[inline(always)]
39
fn len(&self) -> usize {
40
<[T]>::len(self)
41
}
42
43
#[inline(always)]
44
unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {
45
*unsafe { <[T]>::get_unchecked(self, idx as usize) }
46
}
47
}
48
49
// Unit mapping used in the CategoricalDecoder.
50
impl IndexMapping for u8 {
51
type Output = Bytes1Alignment1;
52
53
#[inline(always)]
54
fn len(&self) -> usize {
55
*self as usize
56
}
57
58
#[inline(always)]
59
unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {
60
bytemuck::must_cast(idx as u8)
61
}
62
}
63
64
impl IndexMapping for u16 {
65
type Output = Bytes2Alignment2;
66
67
#[inline(always)]
68
fn len(&self) -> usize {
69
*self as usize
70
}
71
72
#[inline(always)]
73
unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {
74
bytemuck::must_cast(idx as u16)
75
}
76
}
77
78
impl IndexMapping for u32 {
79
type Output = Bytes4Alignment4;
80
81
#[inline(always)]
82
fn len(&self) -> usize {
83
*self as usize
84
}
85
86
#[inline(always)]
87
unsafe fn get_unchecked(&self, idx: u32) -> Self::Output {
88
bytemuck::must_cast(idx)
89
}
90
}
91
92
#[allow(clippy::too_many_arguments)]
93
pub fn decode_dict<T: NativeType>(
94
values: HybridRleDecoder<'_>,
95
dict: &[T],
96
is_optional: bool,
97
page_validity: Option<&Bitmap>,
98
filter: Option<Filter>,
99
validity: &mut BitmapBuilder,
100
target: &mut Vec<T>,
101
) -> ParquetResult<()> {
102
with_cast_mut_vec::<T, T::AlignedBytes, _, _>(target, |aligned_bytes_vec| {
103
decode_dict_dispatch(
104
values,
105
bytemuck::cast_slice(dict),
106
is_optional,
107
page_validity,
108
filter,
109
validity,
110
aligned_bytes_vec,
111
)
112
})
113
}
114
115
#[inline(never)]
116
#[allow(clippy::too_many_arguments)]
117
pub fn decode_dict_dispatch<B: AlignedBytes, D: IndexMapping<Output = B>>(
118
mut values: HybridRleDecoder<'_>,
119
dict: D,
120
is_optional: bool,
121
page_validity: Option<&Bitmap>,
122
filter: Option<Filter>,
123
validity: &mut BitmapBuilder,
124
target: &mut Vec<B>,
125
) -> ParquetResult<()> {
126
if is_optional {
127
append_validity(page_validity, filter.as_ref(), validity, values.len());
128
}
129
130
let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());
131
132
match (filter, page_validity) {
133
(None, None) => required::decode(values, dict, target, 0),
134
(Some(Filter::Range(rng)), None) => {
135
values.limit_to(rng.end);
136
required::decode(values, dict, target, rng.start)
137
},
138
(None, Some(page_validity)) => optional::decode(values, dict, page_validity, target, 0),
139
(Some(Filter::Range(rng)), Some(page_validity)) => {
140
optional::decode(values, dict, page_validity, target, rng.start)
141
},
142
(Some(Filter::Mask(filter)), None) => {
143
required_masked_dense::decode(values, dict, filter, target)
144
},
145
(Some(Filter::Mask(filter)), Some(page_validity)) => {
146
optional_masked_dense::decode(values, dict, filter, page_validity, target)
147
},
148
(Some(Filter::Predicate(_)), _) => unreachable!(),
149
}?;
150
151
Ok(())
152
}
153
154
pub(crate) fn append_validity(
155
page_validity: Option<&Bitmap>,
156
filter: Option<&Filter>,
157
validity: &mut BitmapBuilder,
158
values_len: usize,
159
) {
160
match (page_validity, filter) {
161
(None, None) => validity.extend_constant(values_len, true),
162
(None, Some(Filter::Range(range))) => validity.extend_constant(range.len(), true),
163
(None, Some(Filter::Mask(mask))) => validity.extend_constant(mask.set_bits(), true),
164
(None, Some(Filter::Predicate(_))) => {
165
// Done later.
166
},
167
(Some(page_validity), None) => validity.extend_from_bitmap(page_validity),
168
(Some(page_validity), Some(Filter::Range(rng))) => {
169
let page_validity = page_validity.clone();
170
validity.extend_from_bitmap(&page_validity.sliced(rng.start, rng.len()))
171
},
172
(Some(page_validity), Some(Filter::Mask(mask))) => {
173
validity.extend_from_bitmap(&filter_boolean_kernel(page_validity, mask))
174
},
175
(_, Some(Filter::Predicate(_))) => todo!(),
176
}
177
}
178
179
pub(crate) fn constrain_page_validity(
180
values_len: usize,
181
page_validity: Option<&Bitmap>,
182
filter: Option<&Filter>,
183
) -> Option<Bitmap> {
184
let num_unfiltered_rows = match (filter.as_ref(), page_validity) {
185
(None, None) => values_len,
186
(None, Some(pv)) => pv.len(),
187
(Some(f), Some(pv)) => {
188
debug_assert!(pv.len() >= f.max_offset(pv.len()));
189
f.max_offset(pv.len())
190
},
191
(Some(f), None) => f.max_offset(values_len),
192
};
193
194
page_validity.map(|pv| {
195
if pv.len() > num_unfiltered_rows {
196
pv.clone().sliced(0, num_unfiltered_rows)
197
} else {
198
pv.clone()
199
}
200
})
201
}
202
203
#[cold]
204
fn oob_dict_idx() -> ParquetError {
205
ParquetError::oos("Dictionary Index is out-of-bounds")
206
}
207
208
#[cold]
209
fn no_more_bitpacked_values() -> ParquetError {
210
ParquetError::oos("Bitpacked Hybrid-RLE ran out before all values were served")
211
}
212
213
#[inline(always)]
214
pub(super) fn verify_dict_indices(indices: &[u32], dict_size: usize) -> ParquetResult<()> {
215
debug_assert!(dict_size <= u32::MAX as usize);
216
let dict_size = dict_size as u32;
217
218
let mut is_valid = true;
219
for &idx in indices {
220
is_valid &= idx < dict_size;
221
}
222
223
if is_valid {
224
Ok(())
225
} else {
226
Err(oob_dict_idx())
227
}
228
}
229
230
/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include
231
/// more than `num_values_to_skip` values.
232
#[inline(always)]
233
fn required_skip_whole_chunks(
234
values: &mut HybridRleDecoder<'_>,
235
num_values_to_skip: &mut usize,
236
) -> ParquetResult<()> {
237
if *num_values_to_skip == 0 {
238
return Ok(());
239
}
240
241
loop {
242
let mut values_clone = values.clone();
243
let Some(chunk_len) = values_clone.next_chunk_length()? else {
244
break;
245
};
246
if *num_values_to_skip < chunk_len {
247
break;
248
}
249
*values = values_clone;
250
*num_values_to_skip -= chunk_len;
251
}
252
253
Ok(())
254
}
255
256
/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include
257
/// more than `num_values_to_skip` values.
258
#[inline(always)]
259
fn optional_skip_whole_chunks(
260
values: &mut HybridRleDecoder<'_>,
261
validity: &mut BitMask<'_>,
262
num_rows_to_skip: &mut usize,
263
num_values_to_skip: &mut usize,
264
) -> ParquetResult<()> {
265
if *num_values_to_skip == 0 {
266
return Ok(());
267
}
268
269
let mut total_num_skipped_values = 0;
270
271
loop {
272
let mut values_clone = values.clone();
273
let Some(chunk_len) = values_clone.next_chunk_length()? else {
274
break;
275
};
276
if *num_values_to_skip < chunk_len {
277
break;
278
}
279
*values = values_clone;
280
*num_values_to_skip -= chunk_len;
281
total_num_skipped_values += chunk_len;
282
}
283
284
if total_num_skipped_values > 0 {
285
let offset = validity
286
.nth_set_bit_idx(total_num_skipped_values - 1, 0)
287
.map_or(validity.len(), |v| v + 1);
288
*num_rows_to_skip -= offset;
289
validity.advance_by(offset);
290
}
291
292
Ok(())
293
}
294
295