Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs
8450 views
1
use arrow::array::PrimitiveArray;
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::datatypes::ArrowDataType;
4
use arrow::types::{AlignedBytes, NativeType};
5
use bytemuck::Zeroable;
6
7
use super::super::utils;
8
use super::{
9
AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, IntoDecoderFunction,
10
PrimitiveDecoder, UnitDecoderFunction,
11
};
12
use crate::parquet::encoding::{Encoding, byte_stream_split, delta_bitpacked, hybrid_rle};
13
use crate::parquet::error::ParquetResult;
14
use crate::parquet::page::{DataPage, DictPage, split_buffer};
15
use crate::parquet::types::{NativeType as ParquetNativeType, decode};
16
use crate::read::Filter;
17
use crate::read::deserialize::dictionary_encoded;
18
use crate::read::deserialize::utils::array_chunks::ArrayChunks;
19
use crate::read::deserialize::utils::{
20
dict_indices_decoder, freeze_validity, unspecialized_decode,
21
};
22
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
23
24
#[allow(clippy::large_enum_variant)]
25
#[derive(Debug)]
26
pub(crate) enum StateTranslation<'a> {
27
Plain(&'a [u8]),
28
Dictionary(hybrid_rle::HybridRleDecoder<'a>),
29
ByteStreamSplit(byte_stream_split::Decoder<'a>),
30
DeltaBinaryPacked(delta_bitpacked::Decoder<'a>),
31
}
32
33
impl<'a, P, T, D> utils::StateTranslation<'a, IntDecoder<P, T, D>> for StateTranslation<'a>
34
where
35
T: NativeType,
36
P: ParquetNativeType,
37
i64: num_traits::AsPrimitive<P>,
38
D: DecoderFunction<P, T>,
39
{
40
type PlainDecoder = &'a [u8];
41
42
fn new(
43
_decoder: &IntDecoder<P, T, D>,
44
page: &'a DataPage,
45
dict: Option<&'a <IntDecoder<P, T, D> as utils::Decoder>::Dict>,
46
page_validity: Option<&Bitmap>,
47
) -> ParquetResult<Self> {
48
match (page.encoding(), dict) {
49
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {
50
let values =
51
dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;
52
Ok(Self::Dictionary(values))
53
},
54
(Encoding::Plain, _) => {
55
let values = split_buffer(page)?.values;
56
Ok(Self::Plain(values))
57
},
58
(Encoding::ByteStreamSplit, _) => {
59
let values = split_buffer(page)?.values;
60
Ok(Self::ByteStreamSplit(byte_stream_split::Decoder::try_new(
61
values,
62
size_of::<P>(),
63
)?))
64
},
65
(Encoding::DeltaBinaryPacked, _) => {
66
let values = split_buffer(page)?.values;
67
Ok(Self::DeltaBinaryPacked(
68
delta_bitpacked::Decoder::try_new(values)?.0,
69
))
70
},
71
_ => Err(utils::not_implemented(page)),
72
}
73
}
74
fn num_rows(&self) -> usize {
75
match self {
76
Self::Plain(v) => v.len() / size_of::<P>(),
77
Self::Dictionary(i) => i.len(),
78
Self::ByteStreamSplit(i) => i.len(),
79
Self::DeltaBinaryPacked(i) => i.len(),
80
}
81
}
82
}
83
84
/// Decoder of integer parquet type
85
#[derive(Debug)]
86
pub(crate) struct IntDecoder<P, T, D>(PrimitiveDecoder<P, T, D>)
87
where
88
T: NativeType,
89
P: ParquetNativeType,
90
i64: num_traits::AsPrimitive<P>,
91
D: DecoderFunction<P, T>;
92
93
impl<P, T, D> IntDecoder<P, T, D>
94
where
95
P: ParquetNativeType,
96
T: NativeType,
97
i64: num_traits::AsPrimitive<P>,
98
D: DecoderFunction<P, T>,
99
{
100
#[inline]
101
fn new(decoder: D) -> Self {
102
Self(PrimitiveDecoder::new(decoder))
103
}
104
}
105
106
impl<T> IntDecoder<T, T, UnitDecoderFunction<T>>
107
where
108
T: NativeType + ParquetNativeType,
109
i64: num_traits::AsPrimitive<T>,
110
UnitDecoderFunction<T>: Default + DecoderFunction<T, T>,
111
{
112
pub(crate) fn unit() -> Self {
113
Self::new(UnitDecoderFunction::<T>::default())
114
}
115
}
116
117
impl<P, T> IntDecoder<P, T, AsDecoderFunction<P, T>>
118
where
119
P: ParquetNativeType,
120
T: NativeType,
121
i64: num_traits::AsPrimitive<P>,
122
AsDecoderFunction<P, T>: Default + DecoderFunction<P, T>,
123
{
124
pub(crate) fn cast_as() -> Self {
125
Self::new(AsDecoderFunction::<P, T>::default())
126
}
127
}
128
129
impl<P, T> IntDecoder<P, T, IntoDecoderFunction<P, T>>
130
where
131
P: ParquetNativeType,
132
T: NativeType,
133
i64: num_traits::AsPrimitive<P>,
134
IntoDecoderFunction<P, T>: Default + DecoderFunction<P, T>,
135
{
136
pub(crate) fn cast_into() -> Self {
137
Self::new(IntoDecoderFunction::<P, T>::default())
138
}
139
}
140
141
impl<P, T, F> IntDecoder<P, T, ClosureDecoderFunction<P, T, F>>
142
where
143
P: ParquetNativeType,
144
T: NativeType,
145
i64: num_traits::AsPrimitive<P>,
146
F: Copy + Fn(P) -> T,
147
{
148
pub(crate) fn closure(f: F) -> Self {
149
Self::new(ClosureDecoderFunction(f, std::marker::PhantomData))
150
}
151
}
152
153
impl<P, T, D> utils::Decoder for IntDecoder<P, T, D>
154
where
155
T: NativeType,
156
P: ParquetNativeType,
157
i64: num_traits::AsPrimitive<P>,
158
D: DecoderFunction<P, T>,
159
{
160
type Translation<'a> = StateTranslation<'a>;
161
type Dict = PrimitiveArray<T>;
162
type DecodedState = (Vec<T>, BitmapBuilder);
163
type Output = PrimitiveArray<T>;
164
165
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
166
(
167
Vec::<T>::with_capacity(capacity),
168
BitmapBuilder::with_capacity(capacity),
169
)
170
}
171
172
fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {
173
let values = page.buffer.as_ref();
174
175
let mut target = Vec::with_capacity(page.num_values);
176
super::plain::decode(
177
values,
178
false,
179
None,
180
None,
181
&mut BitmapBuilder::new(),
182
&mut self.0.intermediate,
183
&mut target,
184
self.0.decoder,
185
)?;
186
Ok(PrimitiveArray::new(
187
T::PRIMITIVE.into(),
188
target.into(),
189
None,
190
))
191
}
192
193
fn evaluate_predicate(
194
&mut self,
195
state: &utils::State<'_, Self>,
196
predicate: Option<&SpecializedParquetColumnExpr>,
197
pred_true_mask: &mut BitmapBuilder,
198
dict_mask: Option<&Bitmap>,
199
) -> ParquetResult<bool> {
200
// @Performance: This should be added
201
if state.page_validity.is_some() {
202
return Ok(false);
203
}
204
205
if let StateTranslation::Dictionary(values) = &state.translation {
206
let dict_mask = dict_mask.unwrap();
207
super::super::dictionary_encoded::predicate::decode(
208
values.clone(),
209
dict_mask,
210
pred_true_mask,
211
)?;
212
return Ok(true);
213
}
214
215
if !D::CAN_TRANSMUTE || D::NEED_TO_DECODE {
216
return Ok(false);
217
}
218
219
let Some(predicate) = predicate else {
220
return Ok(false);
221
};
222
223
use SpecializedParquetColumnExpr as S;
224
match (&state.translation, predicate) {
225
(StateTranslation::Plain(values), S::Equal(needle)) => {
226
let values = ArrayChunks::new(values).unwrap();
227
let needle = needle.to_aligned_bytes::<T::AlignedBytes>().unwrap();
228
super::plain::predicate::decode_equals(values, needle, pred_true_mask);
229
},
230
(StateTranslation::Plain(values), S::Between(low, high)) => {
231
let values = ArrayChunks::new(values).unwrap();
232
use arrow::types::PrimitiveType as PT;
233
let is_signed = match T::PRIMITIVE {
234
PT::Int8 | PT::Int16 | PT::Int32 | PT::Int64 => true,
235
PT::UInt8 | PT::UInt16 | PT::UInt32 | PT::UInt64 => false,
236
PT::Int128
237
| PT::Int256
238
| PT::UInt128
239
| PT::Float16
240
| PT::Float32
241
| PT::Float64
242
| PT::DaysMs
243
| PT::MonthDayNano
244
| PT::MonthDayMillis => return Ok(false),
245
};
246
247
let Some(low) = low.to_aligned_bytes::<T::AlignedBytes>() else {
248
return Ok(false);
249
};
250
let Some(high) = high.to_aligned_bytes::<T::AlignedBytes>() else {
251
return Ok(false);
252
};
253
254
let mut low1 = low;
255
let mut high1 = high;
256
let mut low2 = low;
257
let mut high2 = high;
258
259
if is_signed && !low.unsigned_leq(high) {
260
low1 = low;
261
high1 = T::AlignedBytes::ones();
262
263
low2 = T::AlignedBytes::zeros();
264
high2 = high;
265
}
266
267
super::plain::predicate::decode_between(
268
values,
269
low1,
270
high1,
271
low2,
272
high2,
273
pred_true_mask,
274
);
275
},
276
(StateTranslation::Plain(values), S::EqualOneOf(needles))
277
if (1..=8).contains(&needles.len()) =>
278
{
279
let values = ArrayChunks::new(values).unwrap();
280
let mut needles_array = [<T::AlignedBytes>::zeroed(); 8];
281
for i in 0..8 {
282
needles_array[i] = needles[i.min(needles.len() - 1)]
283
.to_aligned_bytes::<T::AlignedBytes>()
284
.unwrap();
285
}
286
super::plain::predicate::decode_is_in(values, &needles_array, pred_true_mask);
287
},
288
_ => return Ok(false),
289
}
290
291
Ok(true)
292
}
293
294
fn finalize(
295
&self,
296
dtype: ArrowDataType,
297
_dict: Option<Self::Dict>,
298
(values, validity): Self::DecodedState,
299
) -> ParquetResult<Self::Output> {
300
let validity = freeze_validity(validity);
301
Ok(PrimitiveArray::try_new(dtype, values.into(), validity).unwrap())
302
}
303
304
fn extend_decoded(
305
&self,
306
decoded: &mut Self::DecodedState,
307
additional: &dyn arrow::array::Array,
308
is_optional: bool,
309
) -> ParquetResult<()> {
310
let additional = additional
311
.as_any()
312
.downcast_ref::<PrimitiveArray<T>>()
313
.unwrap();
314
decoded.0.extend(additional.values().iter().copied());
315
match additional.validity() {
316
Some(v) => decoded.1.extend_from_bitmap(v),
317
None if is_optional => decoded.1.extend_constant(additional.len(), true),
318
None => {},
319
}
320
321
Ok(())
322
}
323
324
fn extend_constant(
325
&mut self,
326
decoded: &mut Self::DecodedState,
327
length: usize,
328
value: &ParquetScalar,
329
) -> ParquetResult<()> {
330
self.0.extend_constant(decoded, length, value)
331
}
332
333
fn extend_filtered_with_state(
334
&mut self,
335
mut state: utils::State<'_, Self>,
336
decoded: &mut Self::DecodedState,
337
filter: Option<Filter>,
338
_chunks: &mut Vec<Self::Output>,
339
) -> ParquetResult<()> {
340
match state.translation {
341
StateTranslation::Plain(ref mut values) => super::plain::decode(
342
values,
343
state.is_optional,
344
state.page_validity.as_ref(),
345
filter,
346
&mut decoded.1,
347
&mut self.0.intermediate,
348
&mut decoded.0,
349
self.0.decoder,
350
),
351
StateTranslation::Dictionary(ref mut indexes) => dictionary_encoded::decode_dict(
352
indexes.clone(),
353
state.dict.unwrap().values().as_slice(),
354
state.is_optional,
355
state.page_validity.as_ref(),
356
filter,
357
&mut decoded.1,
358
&mut decoded.0,
359
),
360
StateTranslation::ByteStreamSplit(mut decoder) => {
361
let num_rows = decoder.len();
362
let mut iter = decoder.iter_converted(|v| self.0.decoder.decode(decode(v)));
363
364
unspecialized_decode(
365
num_rows,
366
|| Ok(iter.next().unwrap()),
367
filter,
368
state.page_validity,
369
state.is_optional,
370
&mut decoded.1,
371
&mut decoded.0,
372
)
373
},
374
StateTranslation::DeltaBinaryPacked(decoder) => {
375
let num_rows = decoder.len();
376
let values = decoder.collect::<Vec<i64>>()?;
377
378
let mut i = 0;
379
unspecialized_decode(
380
num_rows,
381
|| {
382
use num_traits::AsPrimitive;
383
let value = values[i];
384
i += 1;
385
Ok(self.0.decoder.decode(value.as_()))
386
},
387
filter,
388
state.page_validity,
389
state.is_optional,
390
&mut decoded.1,
391
&mut decoded.0,
392
)
393
},
394
}
395
}
396
}
397
398