Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/read/deserialize/binary/mod.rs
8475 views
1
use arrow::array::{Array, BinaryArray};
2
use arrow::bitmap::{Bitmap, BitmapBuilder};
3
use arrow::datatypes::ArrowDataType;
4
use arrow::offset::OffsetsBuffer;
5
use polars_buffer::Buffer;
6
use polars_compute::filter::filter_with_bitmap;
7
8
use super::utils::dict_indices_decoder;
9
use super::{Filter, PredicateFilter};
10
use crate::parquet::encoding::{Encoding, hybrid_rle};
11
use crate::parquet::error::ParquetResult;
12
use crate::parquet::page::{DataPage, DictPage, split_buffer};
13
use crate::read::deserialize::utils::{self};
14
use crate::read::expr::{ParquetScalar, SpecializedParquetColumnExpr};
15
16
mod dictionary;
17
mod plain;
18
19
/// Intermediate decoded state: `.0` holds the concatenated value bytes and `.1` the `i64`
/// offsets into them (one more offset than there are values once non-empty).
type DecodedStateTuple = (Vec<u8>, Vec<i64>);
20
21
impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> {
22
type PlainDecoder = BinaryIter<'a>;
23
24
fn new(
25
_decoder: &BinaryDecoder,
26
page: &'a DataPage,
27
dict: Option<&'a <BinaryDecoder as utils::Decoder>::Dict>,
28
page_validity: Option<&Bitmap>,
29
) -> ParquetResult<Self> {
30
match (page.encoding(), dict) {
31
(Encoding::Plain, _) => {
32
let values = split_buffer(page)?.values;
33
let values = BinaryIter::new(values, page.num_values());
34
35
Ok(Self::Plain(values))
36
},
37
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => {
38
let values =
39
dict_indices_decoder(page, page_validity.map_or(0, |bm| bm.unset_bits()))?;
40
Ok(Self::Dictionary(values))
41
},
42
_ => Err(utils::not_implemented(page)),
43
}
44
}
45
46
fn num_rows(&self) -> usize {
47
match self {
48
StateTranslation::Plain(i) => i.max_num_values,
49
StateTranslation::Dictionary(i) => i.len(),
50
}
51
}
52
}
53
54
/// Parquet column decoder whose output is `BinaryArray<i64>` (`LargeBinary`).
///
/// It operates in chunked mode: each decoded page becomes its own output array.
pub struct BinaryDecoder;
55
56
/// Per-page decoding strategy used by [`BinaryDecoder`].
// NOTE(review): the variants presumably differ a lot in size; boxing was apparently not
// considered worthwhile here — confirm before changing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub(crate) enum StateTranslation<'a> {
    /// `PLAIN`-encoded page: values stored as 4-byte little-endian length prefixes followed by
    /// the value bytes.
    Plain(BinaryIter<'a>),
    /// Dictionary-encoded page: a hybrid-RLE stream of indices into the dictionary page.
    Dictionary(hybrid_rle::HybridRleDecoder<'a>),
}
62
63
impl utils::Decoded for DecodedStateTuple {
    /// Number of values held: one less than the number of offsets (0 while no offsets exist).
    fn len(&self) -> usize {
        self.1.len().saturating_sub(1)
    }

    /// Appends `n` null (zero-length) values by repeating the last offset `n` times.
    fn extend_nulls(&mut self, n: usize) {
        let offsets = &mut self.1;
        // `BinaryDecoder::with_capacity` hands out a state with no offsets at all. Seed the
        // leading 0 offset instead of panicking on an empty vector.
        if offsets.is_empty() {
            offsets.push(0);
        }
        let last = offsets.last().copied().unwrap_or_default();
        offsets.extend(std::iter::repeat_n(last, n));
    }

    /// Number of offsets that can still be pushed without reallocating the offsets vector.
    fn remaining_capacity(&self) -> usize {
        self.1.capacity() - self.1.len()
    }
}
77
78
impl utils::Decoder for BinaryDecoder {
79
type Translation<'a> = StateTranslation<'a>;
80
type Dict = BinaryArray<i64>;
81
type DecodedState = DecodedStateTuple;
82
type Output = BinaryArray<i64>;
83
84
const CHUNKED: bool = true;
85
86
fn with_capacity(&self, _capacity: usize) -> Self::DecodedState {
87
// Handled in extend
88
(Vec::new(), Vec::new())
89
}
90
91
fn evaluate_dict_predicate(
92
&self,
93
dict: &Self::Dict,
94
predicate: &PredicateFilter,
95
) -> ParquetResult<Bitmap> {
96
Ok(predicate.predicate.evaluate(dict as &dyn Array))
97
}
98
99
fn evaluate_predicate(
100
&mut self,
101
_state: &utils::State<'_, Self>,
102
_predicate: Option<&SpecializedParquetColumnExpr>,
103
_pred_true_mask: &mut BitmapBuilder,
104
_dict_mask: Option<&Bitmap>,
105
) -> ParquetResult<bool> {
106
Ok(false)
107
}
108
109
fn apply_dictionary(
110
&mut self,
111
_decoded: &mut Self::DecodedState,
112
_dict: &Self::Dict,
113
) -> ParquetResult<()> {
114
Ok(())
115
}
116
117
fn deserialize_dict(&mut self, page: DictPage) -> ParquetResult<Self::Dict> {
118
let values = &page.buffer;
119
let num_values = page.num_values;
120
let mut target = Vec::<u8>::new();
121
let mut offsets = Vec::<i64>::with_capacity(page.num_values + 1);
122
123
plain::decode_plain(values, num_values, &mut target, &mut offsets)?;
124
125
let values = target.into();
126
let offsets = offsets.into();
127
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };
128
let arr = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);
129
130
Ok(arr)
131
}
132
133
fn extend_decoded(
134
&self,
135
decoded: &mut Self::DecodedState,
136
additional: &dyn Array,
137
is_optional: bool,
138
) -> ParquetResult<()> {
139
assert!(!is_optional);
140
141
let array = additional
142
.as_any()
143
.downcast_ref::<BinaryArray<i64>>()
144
.unwrap();
145
146
let offsets = array.offsets();
147
let fst = *offsets.first();
148
let lst = *offsets.last();
149
let current_lst = *decoded.1.last().unwrap();
150
decoded
151
.0
152
.extend_from_slice(&array.values()[fst as usize..lst as usize]);
153
decoded
154
.1
155
.extend(offsets.iter().map(|o| o + current_lst - fst));
156
157
Ok(())
158
}
159
160
fn extend_filtered_with_state(
161
&mut self,
162
mut state: utils::State<'_, Self>,
163
_decoded: &mut Self::DecodedState,
164
filter: Option<super::Filter>,
165
chunks: &mut Vec<Self::Output>,
166
) -> ParquetResult<()> {
167
if state.page_validity.is_some() || matches!(filter, Some(Filter::Predicate(_))) {
168
// Currently we only use BinaryArray for internal (de)serialization, so this is a
169
// limited implementation to save effort.
170
unimplemented!()
171
}
172
173
let mut target = Vec::new();
174
let mut offsets =
175
Vec::with_capacity(utils::StateTranslation::num_rows(&state.translation) + 1);
176
177
match state.translation {
178
StateTranslation::Plain(iter) => {
179
plain::decode_plain(iter.values, iter.max_num_values, &mut target, &mut offsets)?
180
},
181
StateTranslation::Dictionary(ref mut indexes) => {
182
let dict = state.dict.unwrap();
183
dictionary::decode_dictionary(indexes.clone(), &mut target, &mut offsets, dict)?;
184
},
185
}
186
187
if let Some(Filter::Range(slice)) = &filter
188
&& (slice.start > 0 || slice.end < offsets.len() - 1)
189
{
190
let mut new_target = Vec::new();
191
let mut new_offsets = Vec::new();
192
193
let buffer_start = offsets[slice.start] as usize;
194
let buffer_end = offsets[slice.end.min(offsets.len() - 1)] as usize;
195
196
new_target.extend(target.drain(buffer_start..buffer_end));
197
new_offsets.extend(offsets.drain(slice.start..slice.end.min(offsets.len() - 1)));
198
199
target = new_target;
200
offsets = new_offsets;
201
}
202
203
let values = target.into();
204
let offsets = offsets.into();
205
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };
206
207
let mut array = BinaryArray::new(ArrowDataType::LargeBinary, offsets, values, None);
208
209
if let Some(Filter::Mask(mask)) = &filter {
210
array = filter_with_bitmap(&array, mask)
211
.as_any()
212
.downcast_ref::<BinaryArray<i64>>()
213
.unwrap()
214
.clone();
215
}
216
217
chunks.push(array);
218
219
Ok(())
220
}
221
222
fn finalize(
223
&self,
224
dtype: ArrowDataType,
225
_dict: Option<Self::Dict>,
226
(values, mut offsets): Self::DecodedState,
227
) -> ParquetResult<Self::Output> {
228
assert!(values.is_empty());
229
assert!(offsets.is_empty());
230
offsets.push(0);
231
let values: Buffer<u8> = values.into();
232
let offsets: OffsetsBuffer<i64> = unsafe { OffsetsBuffer::new_unchecked(offsets.into()) };
233
Ok(BinaryArray::<i64>::new(dtype, offsets, values, None))
234
}
235
236
fn extend_constant(
237
&mut self,
238
_decoded: &mut Self::DecodedState,
239
_length: usize,
240
_value: &ParquetScalar,
241
) -> ParquetResult<()> {
242
todo!()
243
}
244
}
245
246
/// Iterator over `PLAIN`-encoded binary values.
///
/// Each value is stored as a 4-byte little-endian length followed by that many bytes.
#[derive(Debug)]
pub struct BinaryIter<'a> {
    values: &'a [u8],

    /// A maximum number of items that this [`BinaryIter`] may produce.
    ///
    /// This equals the length of the iterator if and only if the data encoded by the
    /// [`BinaryIter`] is not nullable.
    max_num_values: usize,
}

impl<'a> BinaryIter<'a> {
    /// Creates an iterator over `values` that yields at most `max_num_values` items.
    pub fn new(values: &'a [u8], max_num_values: usize) -> Self {
        Self {
            values,
            max_num_values,
        }
    }
}

impl<'a> Iterator for BinaryIter<'a> {
    type Item = &'a [u8];

    /// Yields the next length-prefixed value.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// * fewer than 4 bytes remain for a length prefix;
    /// * fewer value bytes remain than the prefix announces;
    /// * bytes are left over once `max_num_values` items have been produced.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.max_num_values == 0 {
            assert!(self.values.is_empty());
            return None;
        }

        // `split_first_chunk` yields a `&[u8; 4]` directly, so no unchecked conversion is needed.
        let (length, remaining) = self
            .values
            .split_first_chunk::<4>()
            .expect("truncated PLAIN binary page: missing 4-byte length prefix");
        let length = u32::from_le_bytes(*length) as usize;
        assert!(
            length <= remaining.len(),
            "truncated PLAIN binary page: value shorter than its length prefix"
        );
        let (result, remaining) = remaining.split_at(length);
        self.max_num_values -= 1;
        self.values = remaining;
        Some(result)
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, Some(self.max_num_values))
    }
}
290
291