Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/arrow/write/nested/dremel/mod.rs
6940 views
1
//! Implements the Dremel encoding part of Parquet with *repetition-levels* and *definition-levels*
2
3
use arrow::bitmap::Bitmap;
4
use arrow::offset::OffsetsBuffer;
5
use polars_utils::fixedringbuffer::FixedRingBuffer;
6
7
use super::super::pages::Nested;
8
9
#[cfg(test)]
10
mod tests;
11
12
/// One Dremel-encoded entry: a *repetition-level* paired with a *definition-level*.
#[derive(Copy, Clone)]
pub struct DremelValue {
    /// The *repetition-level* of this entry.
    pub rep: u16,
    /// The *definition-level* of this entry.
    pub def: u16,
}
20
21
/// This tries to mirror the Parquet Schema structures, so that is simple to reason about the
22
/// Dremel structures.
23
enum LevelContent<'a> {
24
/// Always 1 instance
25
Required,
26
/// Zero or more instances
27
Repeated,
28
/// Zero or one instance
29
Optional(Option<&'a Bitmap>),
30
}
31
32
struct Level<'a> {
33
content: LevelContent<'a>,
34
/// "Iterator" with number of elements for the next level
35
lengths: LevelLength<'a>,
36
/// Remaining number of elements to process. NOTE: This is **not** equal to `length - offset`.
37
remaining: usize,
38
/// Offset into level elements
39
offset: usize,
40
/// The definition-level associated with this level
41
definition_depth: u16,
42
/// The repetition-level associated with this level
43
repetition_depth: u16,
44
}
45
46
/// This contains the number of elements on the next level for each
47
enum LevelLength<'a> {
48
/// Fixed number of elements based on the validity of this element
49
Optional(usize),
50
/// Fixed number of elements irregardless of the validity of this element
51
Constant(usize),
52
/// Variable number of elements and calculated from the difference between two `i32` offsets
53
OffsetsI32(&'a OffsetsBuffer<i32>),
54
/// Variable number of elements and calculated from the difference between two `i64` offsets
55
OffsetsI64(&'a OffsetsBuffer<i64>),
56
}
57
58
/// A iterator for Dremel *repetition* and *definition-levels* in Parquet
59
///
60
/// This buffers many consequentive repetition and definition-levels as to not have to branch in
61
/// and out of this code constantly.
62
pub struct BufferedDremelIter<'a> {
63
buffer: FixedRingBuffer<DremelValue>,
64
65
levels: Box<[Level<'a>]>,
66
/// Current offset into `levels` that is being explored
67
current_level: usize,
68
69
last_repetition: u16,
70
}
71
72
/// return number values of the nested
73
pub fn num_values(nested: &[Nested]) -> usize {
74
// @TODO: Make this smarter
75
//
76
// This is not that smart because it is really slow, but not doing this would be:
77
// 1. Error prone
78
// 2. Repeat much of the logic that you find below
79
BufferedDremelIter::new(nested).count()
80
}
81
82
impl Level<'_> {
83
/// Fetch the number of elements given on the next level at `offset` on this level
84
fn next_level_length(&self, offset: usize, is_valid: bool) -> usize {
85
match self.lengths {
86
LevelLength::Optional(n) if is_valid => n,
87
LevelLength::Optional(_) => 0,
88
LevelLength::Constant(n) => n,
89
LevelLength::OffsetsI32(n) => n.length_at(offset),
90
LevelLength::OffsetsI64(n) => n.length_at(offset),
91
}
92
}
93
}
94
95
impl<'a> BufferedDremelIter<'a> {
96
// @NOTE: This can maybe just directly be gotten from the Field and array, this double
97
// conversion seems rather wasteful.
98
/// Create a new [`BufferedDremelIter`] from a set of nested structures
99
///
100
/// This creates a structure that resembles (but is not exactly the same) the Parquet schema,
101
/// we can then iterate this quite well.
102
pub fn new(nested: &'a [Nested]) -> Self {
103
let mut levels = Vec::with_capacity(nested.len() * 2 - 1);
104
105
let mut definition_depth = 0u16;
106
let mut repetition_depth = 0u16;
107
for n in nested {
108
match n {
109
Nested::Primitive(n) => {
110
let (content, lengths) = if n.is_optional {
111
definition_depth += 1;
112
(
113
LevelContent::Optional(n.validity.as_ref()),
114
LevelLength::Optional(1),
115
)
116
} else {
117
(LevelContent::Required, LevelLength::Constant(1))
118
};
119
120
levels.push(Level {
121
content,
122
lengths,
123
remaining: n.length,
124
offset: 0,
125
definition_depth,
126
repetition_depth,
127
});
128
},
129
Nested::List(n) => {
130
if n.is_optional {
131
definition_depth += 1;
132
levels.push(Level {
133
content: LevelContent::Optional(n.validity.as_ref()),
134
lengths: LevelLength::Constant(1),
135
remaining: n.offsets.len_proxy(),
136
offset: 0,
137
definition_depth,
138
repetition_depth,
139
});
140
}
141
142
definition_depth += 1;
143
levels.push(Level {
144
content: LevelContent::Repeated,
145
lengths: LevelLength::OffsetsI32(&n.offsets),
146
remaining: n.offsets.len_proxy(),
147
offset: 0,
148
definition_depth,
149
repetition_depth,
150
});
151
repetition_depth += 1;
152
},
153
Nested::LargeList(n) => {
154
if n.is_optional {
155
definition_depth += 1;
156
levels.push(Level {
157
content: LevelContent::Optional(n.validity.as_ref()),
158
lengths: LevelLength::Constant(1),
159
remaining: n.offsets.len_proxy(),
160
offset: 0,
161
definition_depth,
162
repetition_depth,
163
});
164
}
165
166
definition_depth += 1;
167
levels.push(Level {
168
content: LevelContent::Repeated,
169
lengths: LevelLength::OffsetsI64(&n.offsets),
170
remaining: n.offsets.len_proxy(),
171
offset: 0,
172
definition_depth,
173
repetition_depth,
174
});
175
repetition_depth += 1;
176
},
177
Nested::FixedSizeList(n) => {
178
if n.is_optional {
179
definition_depth += 1;
180
levels.push(Level {
181
content: LevelContent::Optional(n.validity.as_ref()),
182
lengths: LevelLength::Constant(1),
183
remaining: n.length,
184
offset: 0,
185
definition_depth,
186
repetition_depth,
187
});
188
}
189
190
definition_depth += 1;
191
levels.push(Level {
192
content: LevelContent::Repeated,
193
lengths: LevelLength::Constant(n.width),
194
remaining: n.length,
195
offset: 0,
196
definition_depth,
197
repetition_depth,
198
});
199
repetition_depth += 1;
200
},
201
Nested::Struct(n) => {
202
let content = if n.is_optional {
203
definition_depth += 1;
204
LevelContent::Optional(n.validity.as_ref())
205
} else {
206
LevelContent::Required
207
};
208
209
levels.push(Level {
210
content,
211
lengths: LevelLength::Constant(1),
212
remaining: n.length,
213
offset: 0,
214
definition_depth,
215
repetition_depth,
216
});
217
},
218
};
219
}
220
221
let levels = levels.into_boxed_slice();
222
223
Self {
224
// This size is rather arbitrary, but it seems good to make it not too, too high as to
225
// reduce memory consumption.
226
buffer: FixedRingBuffer::new(256),
227
228
levels,
229
current_level: 0,
230
last_repetition: 0,
231
}
232
}
233
234
/// Attempt to fill the rest to the buffer with as many values as possible
235
fn fill(&mut self) {
236
// First exit condition:
237
// If the buffer is full stop trying to fetch more values and just pop the first
238
// element in the buffer.
239
//
240
// Second exit condition:
241
// We have exhausted all elements at the final level, there are no elements left.
242
while !(self.buffer.is_full() || (self.current_level == 0 && self.levels[0].remaining == 0))
243
{
244
if self.levels[self.current_level].remaining == 0 {
245
self.last_repetition = u16::min(
246
self.last_repetition,
247
self.levels[self.current_level - 1].repetition_depth,
248
);
249
self.current_level -= 1;
250
continue;
251
}
252
253
let ns = &mut self.levels;
254
let lvl = self.current_level;
255
256
let is_last_nesting = ns.len() == self.current_level + 1;
257
258
macro_rules! push_value {
259
($def:expr) => {
260
self.buffer
261
.push(DremelValue {
262
rep: self.last_repetition,
263
def: $def,
264
})
265
.unwrap();
266
self.last_repetition = ns[lvl].repetition_depth;
267
};
268
}
269
270
let num_done = match (&ns[lvl].content, is_last_nesting) {
271
(LevelContent::Required | LevelContent::Optional(None), true) => {
272
push_value!(ns[lvl].definition_depth);
273
274
1 + self.buffer.fill_repeat(
275
DremelValue {
276
rep: self.last_repetition,
277
def: ns[lvl].definition_depth,
278
},
279
ns[lvl].remaining - 1,
280
)
281
},
282
(LevelContent::Required, false) => {
283
self.current_level += 1;
284
ns[lvl + 1].remaining = ns[lvl].next_level_length(ns[lvl].offset, true);
285
1
286
},
287
288
(LevelContent::Optional(Some(validity)), true) => {
289
let num_possible =
290
usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);
291
292
let validity = (*validity).clone().sliced(ns[lvl].offset, num_possible);
293
294
// @NOTE: maybe, we can do something here with leading zeros
295
for is_valid in validity.iter() {
296
push_value!(ns[lvl].definition_depth - u16::from(!is_valid));
297
}
298
299
num_possible
300
},
301
(LevelContent::Optional(None), false) => {
302
let num_possible =
303
usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);
304
let mut num_done = num_possible;
305
let def = ns[lvl].definition_depth;
306
307
// @NOTE: maybe, we can do something here with leading zeros
308
for i in 0..num_possible {
309
let next_level_length = ns[lvl].next_level_length(ns[lvl].offset + i, true);
310
311
if next_level_length == 0 {
312
// Zero-sized (fixed) lists
313
push_value!(def);
314
} else {
315
self.current_level += 1;
316
ns[lvl + 1].remaining = next_level_length;
317
num_done = i + 1;
318
break;
319
}
320
}
321
322
num_done
323
},
324
(LevelContent::Optional(Some(validity)), false) => {
325
let mut num_done = 0;
326
let num_possible =
327
usize::min(self.buffer.remaining_capacity(), ns[lvl].remaining);
328
let def = ns[lvl].definition_depth;
329
330
let validity = (*validity).clone().sliced(ns[lvl].offset, num_possible);
331
332
// @NOTE: we can do something here with trailing ones and trailing zeros
333
for is_valid in validity.iter() {
334
num_done += 1;
335
let next_level_length =
336
ns[lvl].next_level_length(ns[lvl].offset + num_done - 1, is_valid);
337
338
match (is_valid, next_level_length) {
339
(true, 0) => {
340
// Zero-sized (fixed) lists
341
push_value!(def);
342
},
343
(true, _) => {
344
self.current_level += 1;
345
ns[lvl + 1].remaining = next_level_length;
346
break;
347
},
348
(false, 0) => {
349
push_value!(def - 1);
350
},
351
(false, _) => {
352
ns[lvl + 1].remaining = next_level_length;
353
354
// @NOTE:
355
// This is needed for structs and fixed-size lists. These will have
356
// a non-zero length even if they are invalid. In that case, we
357
// need to skip over all the elements that would have been read if
358
// it was valid.
359
let mut embed_lvl = lvl + 1;
360
'embed: while embed_lvl > lvl {
361
if embed_lvl == ns.len() - 1 {
362
ns[embed_lvl].offset += ns[embed_lvl].remaining;
363
} else {
364
while ns[embed_lvl].remaining > 0 {
365
let length = ns[embed_lvl]
366
.next_level_length(ns[embed_lvl].offset, false);
367
368
ns[embed_lvl].offset += 1;
369
ns[embed_lvl].remaining -= 1;
370
371
if length > 0 {
372
ns[embed_lvl + 1].remaining = length;
373
embed_lvl += 1;
374
continue 'embed;
375
}
376
}
377
}
378
379
embed_lvl -= 1;
380
}
381
382
push_value!(def - 1);
383
},
384
}
385
}
386
387
num_done
388
},
389
(LevelContent::Repeated, _) => {
390
debug_assert!(!is_last_nesting);
391
let length = ns[lvl].next_level_length(ns[lvl].offset, true);
392
393
if length == 0 {
394
push_value!(ns[lvl].definition_depth - 1);
395
} else {
396
self.current_level += 1;
397
ns[lvl + 1].remaining = length;
398
}
399
400
1
401
},
402
};
403
404
ns[lvl].offset += num_done;
405
ns[lvl].remaining -= num_done;
406
}
407
}
408
}
409
410
impl Iterator for BufferedDremelIter<'_> {
411
type Item = DremelValue;
412
413
fn next(&mut self) -> Option<Self::Item> {
414
// Use an item from the buffer if it is available
415
if let Some(item) = self.buffer.pop_front() {
416
return Some(item);
417
}
418
419
self.fill();
420
self.buffer.pop_front()
421
}
422
}
423
424