Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/primitive/float.rs
8480 views
1
use arrow::array::PrimitiveArray;
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::datatypes::ArrowDataType;
4
use arrow::types::NativeType;
5
6
use super::super::utils;
7
use super::{ClosureDecoderFunction, DecoderFunction, PrimitiveDecoder, UnitDecoderFunction};
8
use crate::parquet::encoding::{Encoding, byte_stream_split, hybrid_rle};
9
use crate::parquet::error::ParquetResult;
10
use crate::parquet::page::{DataPage, DictPage, split_buffer};
11
use crate::parquet::types::{NativeType as ParquetNativeType, decode};
12
use crate::read::Filter;
13
use crate::read::deserialize::dictionary_encoded;
14
use crate::read::deserialize::utils::{
15
dict_indices_decoder, freeze_validity, unspecialized_decode,
16
};
17
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
18
19
#[allow(clippy::large_enum_variant)]
20
#[derive(Debug)]
21
pub(crate) enum StateTranslation<'a> {
22
Plain(&'a [u8]),
23
Dictionary(hybrid_rle::HybridRleDecoder<'a>),
24
ByteStreamSplit(byte_stream_split::Decoder<'a>),
25
}
26
27
impl<'a, P, T, D> utils::StateTranslation<'a, FloatDecoder<P, T, D>> for StateTranslation<'a>
28
where
29
T: NativeType,
30
P: ParquetNativeType + for<'b> TryFrom<&'b ParquetScalar>,
31
D: DecoderFunction<P, T>,
32
{
33
type PlainDecoder = &'a [u8];
34
35
fn new(
36
_decoder: &FloatDecoder<P, T, D>,
37
page: &'a DataPage,
38
dict: Option<&'a <FloatDecoder<P, T, D> as utils::Decoder>::Dict>,
39
page_validity: Option<&Bitmap>,
40
) -> ParquetResult<Self> {
41
match (page.encoding(), dict) {
42
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {
43
let values =
44
dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;
45
Ok(Self::Dictionary(values))
46
},
47
(Encoding::Plain, _) => {
48
let values = split_buffer(page)?.values;
49
Ok(Self::Plain(values))
50
},
51
(Encoding::ByteStreamSplit, _) => {
52
let values = split_buffer(page)?.values;
53
Ok(Self::ByteStreamSplit(byte_stream_split::Decoder::try_new(
54
values,
55
size_of::<P>(),
56
)?))
57
},
58
_ => Err(utils::not_implemented(page)),
59
}
60
}
61
fn num_rows(&self) -> usize {
62
match self {
63
Self::Plain(v) => v.len() / size_of::<P>(),
64
Self::Dictionary(i) => i.len(),
65
Self::ByteStreamSplit(i) => i.len(),
66
}
67
}
68
}
69
70
#[derive(Debug)]
71
pub(crate) struct FloatDecoder<P, T, D>(PrimitiveDecoder<P, T, D>)
72
where
73
P: ParquetNativeType,
74
T: NativeType,
75
D: DecoderFunction<P, T>;
76
77
impl<P, T, D> FloatDecoder<P, T, D>
78
where
79
P: ParquetNativeType,
80
T: NativeType,
81
D: DecoderFunction<P, T>,
82
{
83
#[inline]
84
fn new(decoder: D) -> Self {
85
Self(PrimitiveDecoder::new(decoder))
86
}
87
}
88
89
impl<T> FloatDecoder<T, T, UnitDecoderFunction<T>>
90
where
91
T: NativeType + ParquetNativeType,
92
UnitDecoderFunction<T>: Default + DecoderFunction<T, T>,
93
{
94
pub(crate) fn unit() -> Self {
95
Self::new(UnitDecoderFunction::<T>::default())
96
}
97
}
98
99
impl<P, T, F> FloatDecoder<P, T, ClosureDecoderFunction<P, T, F>>
100
where
101
P: ParquetNativeType,
102
T: NativeType,
103
F: Copy + Fn(P) -> T,
104
{
105
pub(crate) fn closure(f: F) -> Self {
106
Self::new(ClosureDecoderFunction(f, std::marker::PhantomData))
107
}
108
}
109
110
/// Decoded state made of a value vector paired with a validity builder.
impl<T: NativeType> utils::Decoded for (Vec<T>, BitmapBuilder) {
    /// Number of values decoded so far (the length of the value vector).
    fn len(&self) -> usize {
        let (values, _) = self;
        values.len()
    }

    /// Appends `n` null slots: default-valued entries with their validity
    /// bits unset.
    fn extend_nulls(&mut self, n: usize) {
        let (values, validity) = self;
        values.resize(values.len() + n, T::default());
        validity.extend_constant(n, false);
    }

    /// Unused capacity, limited by whichever of the two buffers has less
    /// room left.
    fn remaining_capacity(&self) -> usize {
        let (values, validity) = self;
        let values_free = values.capacity() - values.len();
        let validity_free = validity.capacity() - validity.len();
        values_free.min(validity_free)
    }
}
124
125
impl<P, T, D> utils::Decoder for FloatDecoder<P, T, D>
126
where
127
T: NativeType,
128
P: ParquetNativeType,
129
D: DecoderFunction<P, T>,
130
{
131
type Translation<'a> = StateTranslation<'a>;
132
type Dict = PrimitiveArray<T>;
133
type DecodedState = (Vec<T>, BitmapBuilder);
134
type Output = PrimitiveArray<T>;
135
136
fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
137
(
138
Vec::<T>::with_capacity(capacity),
139
BitmapBuilder::with_capacity(capacity),
140
)
141
}
142
143
fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {
144
let values = page.buffer.as_ref();
145
146
let mut target = Vec::with_capacity(page.num_values);
147
super::plain::decode(
148
values,
149
false,
150
None,
151
None,
152
&mut BitmapBuilder::new(),
153
&mut self.0.intermediate,
154
&mut target,
155
self.0.decoder,
156
)?;
157
Ok(PrimitiveArray::new(
158
T::PRIMITIVE.into(),
159
target.into(),
160
None,
161
))
162
}
163
164
fn evaluate_predicate(
165
&mut self,
166
state: &utils::State<'_, Self>,
167
_predicate: Option<&SpecializedParquetColumnExpr>,
168
pred_true_mask: &mut BitmapBuilder,
169
dict_mask: Option<&Bitmap>,
170
) -> ParquetResult<bool> {
171
if state.page_validity.is_some() {
172
// @Performance: implement validity aware
173
return Ok(false);
174
}
175
176
if let StateTranslation::Dictionary(values) = &state.translation {
177
let dict_mask = dict_mask.unwrap();
178
super::super::dictionary_encoded::predicate::decode(
179
values.clone(),
180
dict_mask,
181
pred_true_mask,
182
)?;
183
return Ok(true);
184
}
185
186
Ok(false)
187
}
188
189
fn extend_decoded(
190
&self,
191
decoded: &mut Self::DecodedState,
192
additional: &dyn arrow::array::Array,
193
is_optional: bool,
194
) -> ParquetResult<()> {
195
let additional = additional
196
.as_any()
197
.downcast_ref::<PrimitiveArray<T>>()
198
.unwrap();
199
decoded.0.extend(additional.values().iter().copied());
200
match additional.validity() {
201
Some(v) => decoded.1.extend_from_bitmap(v),
202
None if is_optional => decoded.1.extend_constant(additional.len(), true),
203
None => {},
204
}
205
206
Ok(())
207
}
208
209
fn extend_filtered_with_state(
210
&mut self,
211
mut state: utils::State<'_, Self>,
212
decoded: &mut Self::DecodedState,
213
filter: Option<Filter>,
214
_chunks: &mut Vec<Self::Output>,
215
) -> ParquetResult<()> {
216
match state.translation {
217
StateTranslation::Plain(ref mut values) => super::plain::decode(
218
values,
219
state.is_optional,
220
state.page_validity.as_ref(),
221
filter,
222
&mut decoded.1,
223
&mut self.0.intermediate,
224
&mut decoded.0,
225
self.0.decoder,
226
),
227
StateTranslation::Dictionary(ref mut indexes) => dictionary_encoded::decode_dict(
228
indexes.clone(),
229
state.dict.unwrap().values().as_slice(),
230
state.is_optional,
231
state.page_validity.as_ref(),
232
filter,
233
&mut decoded.1,
234
&mut decoded.0,
235
),
236
StateTranslation::ByteStreamSplit(mut decoder) => {
237
let num_rows = decoder.len();
238
let mut iter = decoder.iter_converted(|v| self.0.decoder.decode(decode(v)));
239
240
unspecialized_decode(
241
num_rows,
242
|| Ok(iter.next().unwrap()),
243
filter,
244
state.page_validity,
245
state.is_optional,
246
&mut decoded.1,
247
&mut decoded.0,
248
)
249
},
250
}
251
}
252
253
fn extend_constant(
254
&mut self,
255
decoded: &mut Self::DecodedState,
256
length: usize,
257
value: &ParquetScalar,
258
) -> ParquetResult<()> {
259
self.0.extend_constant(decoded, length, value)
260
}
261
262
fn finalize(
263
&self,
264
dtype: ArrowDataType,
265
_dict: Option<Self::Dict>,
266
(values, validity): Self::DecodedState,
267
) -> ParquetResult<Self::Output> {
268
let validity = freeze_validity(validity);
269
Ok(PrimitiveArray::try_new(dtype, values.into(), validity).unwrap())
270
}
271
}
272
273