Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/decoder.rs
7887 views
1
//! This module implements the `DELTA_BINARY_PACKED` encoding.
2
//!
3
//! For performance reasons this is done without iterators. Instead, we have `gather_n` functions
4
//! and a `DeltaGatherer` trait. These allow efficient decoding and mapping of the decoded values.
5
//!
6
//! Full information on the delta encoding can be found on the Apache Parquet Format repository.
7
//!
8
//! <https://github.com/apache/parquet-format/blob/e517ac4dbe08d518eb5c2e58576d4c711973db94/Encodings.md#delta-encoding-delta_binary_packed--5>
9
//!
10
//! Delta encoding compresses sequential integer values by encoding the first value and the
11
//! differences between consecutive values. This variant encodes the data into `Block`s and
12
//! `MiniBlock`s.
13
//!
14
//! - A `Block` contains a minimum delta, bitwidths and one or more miniblocks.
15
//! - A `MiniBlock` contains many deltas that are encoded in [`bitpacked`] encoding.
16
//!
17
//! The decoder keeps track of the last value and calculates a new value with the following
18
//! function.
19
//!
20
//! ```text
21
//! NextValue(Delta) = {
22
//! Value = Decoder.LastValue + Delta + Block.MinDelta
23
//! Decoder.LastValue = Value
24
//! return Value
25
//! }
26
//! ```
27
//!
28
//! Note that all these additions need to be wrapping.
29
30
use super::super::{bitpacked, uleb128, zigzag_leb128};
31
use super::lin_natural_sum;
32
use crate::parquet::encoding::bitpacked::{Unpackable, Unpacked};
33
use crate::parquet::error::{ParquetError, ParquetResult};
34
35
/// Largest bit width a miniblock may declare; `Decoder::try_new` rejects anything above it.
const MAX_BITWIDTH: u8 = 64;
36
37
/// Decoder of Parquet's `DELTA_BINARY_PACKED` encoding.
#[derive(Debug)]
pub struct Decoder<'a> {
    /// Number of miniblocks in every block, as read from the header.
    num_miniblocks_per_block: usize,
    /// Number of values in a full block, as read from the header.
    values_per_block: usize,

    /// Number of values that live in blocks which have not been consumed from `values` yet.
    values_remaining: usize,

    /// Most recently produced value; the next delta is applied on top of it.
    last_value: i64,

    /// Encoded bytes that have not been consumed yet.
    values: &'a [u8],

    /// State of the block that is currently being decoded.
    block: Block<'a>,
}
51
52
/// State of the block that is currently being decoded.
#[derive(Debug)]
struct Block<'a> {
    /// Minimum delta of the block; it is added to every unpacked delta.
    min_delta: i64,

    /// Bytes that give the `num_bits` for the [`bitpacked::Decoder`].
    ///
    /// Invariant: `bitwidth[i] <= MAX_BITWIDTH` for all `i`
    bitwidths: &'a [u8],
    /// Number of values of this block that have not been gathered yet.
    values_remaining: usize,
    /// State of the miniblock that is currently being decoded.
    miniblock: MiniBlock<'a>,
}
63
64
/// State of the miniblock that is currently being decoded.
#[derive(Debug)]
struct MiniBlock<'a> {
    /// Bitpacked decoder over the deltas that have not been unpacked yet.
    decoder: bitpacked::Decoder<'a, u64>,
    /// The most recently unpacked pack of deltas.
    buffered: <u64 as Unpackable>::Unpacked,
    /// Index into `buffered` of the first item that has not been gathered yet.
    unpacked_start: usize,
    /// Index one past the last valid item in `buffered`.
    unpacked_end: usize,
}
71
72
/// [`DeltaGatherer`] that adds the decoded values up into a `usize` target.
///
/// The wrapped `usize` is the element counter that `target_len` reports.
pub(crate) struct SumGatherer(pub(crate) usize);
73
74
pub trait DeltaGatherer {
75
type Target: std::fmt::Debug;
76
77
fn target_len(&self, target: &Self::Target) -> usize;
78
fn target_reserve(&self, target: &mut Self::Target, n: usize);
79
80
/// Gather one element with value `v` into `target`.
81
fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()>;
82
83
/// Gather `num_repeats` elements into `target`.
84
///
85
/// The first value is `v` and the `n`-th value is `v + (n-1)*delta`.
86
fn gather_constant(
87
&mut self,
88
target: &mut Self::Target,
89
v: i64,
90
delta: i64,
91
num_repeats: usize,
92
) -> ParquetResult<()> {
93
for i in 0..num_repeats {
94
self.gather_one(target, v + (i as i64) * delta)?;
95
}
96
Ok(())
97
}
98
/// Gather a `slice` of elements into `target`.
99
fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> {
100
for &v in slice {
101
self.gather_one(target, v)?;
102
}
103
Ok(())
104
}
105
/// Gather a `chunk` of elements into `target`.
106
fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> {
107
self.gather_slice(target, chunk)
108
}
109
}
110
111
impl DeltaGatherer for SumGatherer {
112
type Target = usize;
113
114
fn target_len(&self, _target: &Self::Target) -> usize {
115
self.0
116
}
117
fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {}
118
119
fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> {
120
if v < 0 {
121
return Err(ParquetError::oos(format!(
122
"Invalid delta encoding length {v}"
123
)));
124
}
125
126
*target += v as usize;
127
self.0 += 1;
128
Ok(())
129
}
130
fn gather_constant(
131
&mut self,
132
target: &mut Self::Target,
133
v: i64,
134
delta: i64,
135
num_repeats: usize,
136
) -> ParquetResult<()> {
137
if v < 0 || (delta < 0 && num_repeats > 0 && (num_repeats - 1) as i64 * delta + v < 0) {
138
return Err(ParquetError::oos("Invalid delta encoding length"));
139
}
140
141
*target += lin_natural_sum(v, delta, num_repeats) as usize;
142
143
Ok(())
144
}
145
fn gather_slice(&mut self, target: &mut Self::Target, slice: &[i64]) -> ParquetResult<()> {
146
let min = slice.iter().copied().min().unwrap_or_default();
147
if min < 0 {
148
return Err(ParquetError::oos(format!(
149
"Invalid delta encoding length {min}"
150
)));
151
}
152
153
*target += slice.iter().copied().map(|v| v as usize).sum::<usize>();
154
self.0 += slice.len();
155
Ok(())
156
}
157
fn gather_chunk(&mut self, target: &mut Self::Target, chunk: &[i64; 64]) -> ParquetResult<()> {
158
let min = chunk.iter().copied().min().unwrap_or_default();
159
if min < 0 {
160
return Err(ParquetError::oos(format!(
161
"Invalid delta encoding length {min}"
162
)));
163
}
164
*target += chunk.iter().copied().map(|v| v as usize).sum::<usize>();
165
self.0 += chunk.len();
166
Ok(())
167
}
168
}
169
170
/// Gather the rest of the [`bitpacked::Decoder`] into `target`
171
fn gather_bitpacked<G: DeltaGatherer>(
172
target: &mut G::Target,
173
min_delta: i64,
174
last_value: &mut i64,
175
mut decoder: bitpacked::Decoder<u64>,
176
gatherer: &mut G,
177
) -> ParquetResult<()> {
178
let mut chunked = decoder.chunked();
179
for mut chunk in chunked.by_ref() {
180
for value in &mut chunk {
181
*last_value = last_value
182
.wrapping_add(*value as i64)
183
.wrapping_add(min_delta);
184
*value = *last_value as u64;
185
}
186
187
let chunk = bytemuck::cast_ref(&chunk);
188
gatherer.gather_chunk(target, chunk)?;
189
}
190
191
if let Some((mut chunk, length)) = chunked.remainder() {
192
let slice = &mut chunk[..length];
193
194
for value in slice.iter_mut() {
195
*last_value = last_value
196
.wrapping_add(*value as i64)
197
.wrapping_add(min_delta);
198
*value = *last_value as u64;
199
}
200
201
let slice = bytemuck::cast_slice(slice);
202
gatherer.gather_slice(target, slice)?;
203
}
204
205
Ok(())
206
}
207
208
/// Gather an entire [`MiniBlock`] into `target`
209
fn gather_miniblock<G: DeltaGatherer>(
210
target: &mut G::Target,
211
min_delta: i64,
212
bitwidth: u8,
213
values: &[u8],
214
values_per_miniblock: usize,
215
last_value: &mut i64,
216
gatherer: &mut G,
217
) -> ParquetResult<()> {
218
let bitwidth = bitwidth as usize;
219
220
if bitwidth == 0 {
221
let v = last_value.wrapping_add(min_delta);
222
gatherer.gather_constant(target, v, min_delta, values_per_miniblock)?;
223
*last_value = last_value.wrapping_add(min_delta * values_per_miniblock as i64);
224
return Ok(());
225
}
226
227
debug_assert!(bitwidth <= 64);
228
debug_assert_eq!((bitwidth * values_per_miniblock).div_ceil(8), values.len());
229
230
let start_length = gatherer.target_len(target);
231
gather_bitpacked(
232
target,
233
min_delta,
234
last_value,
235
bitpacked::Decoder::new(values, bitwidth, values_per_miniblock),
236
gatherer,
237
)?;
238
let target_length = gatherer.target_len(target);
239
240
debug_assert_eq!(target_length - start_length, values_per_miniblock);
241
242
Ok(())
243
}
244
245
/// Gather an entire [`Block`] into `target`
246
fn gather_block<'a, G: DeltaGatherer>(
247
target: &mut G::Target,
248
num_miniblocks: usize,
249
values_per_miniblock: usize,
250
mut values: &'a [u8],
251
last_value: &mut i64,
252
gatherer: &mut G,
253
) -> ParquetResult<&'a [u8]> {
254
let (min_delta, consumed) = zigzag_leb128::decode(values);
255
values = &values[consumed..];
256
let bitwidths;
257
(bitwidths, values) = values
258
.split_at_checked(num_miniblocks)
259
.ok_or_else(|| ParquetError::oos("Not enough bitwidths available in delta encoding"))?;
260
261
gatherer.target_reserve(target, num_miniblocks * values_per_miniblock);
262
for &bitwidth in bitwidths {
263
let miniblock;
264
(miniblock, values) = values
265
.split_at_checked((bitwidth as usize * values_per_miniblock).div_ceil(8))
266
.ok_or_else(|| ParquetError::oos("Not enough bytes for miniblock in delta encoding"))?;
267
gather_miniblock(
268
target,
269
min_delta,
270
bitwidth,
271
miniblock,
272
values_per_miniblock,
273
last_value,
274
gatherer,
275
)?;
276
}
277
278
Ok(values)
279
}
280
281
impl<'a> Decoder<'a> {
282
pub fn try_new(mut values: &'a [u8]) -> ParquetResult<(Self, &'a [u8])> {
283
let header_err = || ParquetError::oos("Insufficient bytes for Delta encoding header");
284
285
// header:
286
// <block size in values> <number of miniblocks in a block> <total value count> <first value>
287
288
let (values_per_block, consumed) = uleb128::decode(values);
289
let values_per_block = values_per_block as usize;
290
values = values.get(consumed..).ok_or_else(header_err)?;
291
292
assert_eq!(values_per_block % 128, 0);
293
294
let (num_miniblocks_per_block, consumed) = uleb128::decode(values);
295
let num_miniblocks_per_block = num_miniblocks_per_block as usize;
296
values = values.get(consumed..).ok_or_else(header_err)?;
297
298
let (total_count, consumed) = uleb128::decode(values);
299
let total_count = total_count as usize;
300
values = values.get(consumed..).ok_or_else(header_err)?;
301
302
let (first_value, consumed) = zigzag_leb128::decode(values);
303
values = values.get(consumed..).ok_or_else(header_err)?;
304
305
assert_eq!(values_per_block % num_miniblocks_per_block, 0);
306
assert_eq!((values_per_block / num_miniblocks_per_block) % 32, 0);
307
308
let values_per_miniblock = values_per_block / num_miniblocks_per_block;
309
assert_eq!(values_per_miniblock % 8, 0);
310
311
// We skip over all the values to determine where the slice stops.
312
//
313
// This also has the added benefit of error checking in advance, thus we can unwrap in
314
// other places.
315
316
let mut rem = values;
317
if total_count > 1 {
318
let mut num_values_left = total_count - 1;
319
while num_values_left > 0 {
320
// If the number of values is does not need all the miniblocks anymore, we need to
321
// ignore the later miniblocks and regard them as having bitwidth = 0.
322
//
323
// Quoted from the specification:
324
//
325
// > If, in the last block, less than <number of miniblocks in a block> miniblocks
326
// > are needed to store the values, the bytes storing the bit widths of the
327
// > unneeded miniblocks are still present, their value should be zero, but readers
328
// > must accept arbitrary values as well. There are no additional padding bytes for
329
// > the miniblock bodies though, as if their bit widths were 0 (regardless of the
330
// > actual byte values). The reader knows when to stop reading by keeping track of
331
// > the number of values read.
332
let num_remaining_mini_blocks = usize::min(
333
num_miniblocks_per_block,
334
num_values_left.div_ceil(values_per_miniblock),
335
);
336
337
// block:
338
// <min delta> <list of bitwidths of miniblocks> <miniblocks>
339
340
let (_, consumed) = zigzag_leb128::decode(rem);
341
rem = rem.get(consumed..).ok_or_else(|| {
342
ParquetError::oos("No min-delta value in delta encoding miniblock")
343
})?;
344
345
if rem.len() < num_miniblocks_per_block {
346
return Err(ParquetError::oos(
347
"Not enough bitwidths available in delta encoding",
348
));
349
}
350
if let Some(err_bitwidth) = rem
351
.get(..num_remaining_mini_blocks)
352
.expect("num_remaining_mini_blocks <= num_miniblocks_per_block")
353
.iter()
354
.copied()
355
.find(|&bitwidth| bitwidth > MAX_BITWIDTH)
356
{
357
return Err(ParquetError::oos(format!(
358
"Delta encoding miniblock with bitwidth {err_bitwidth} higher than maximum {MAX_BITWIDTH} bits",
359
)));
360
}
361
362
let num_bitpacking_bytes = rem[..num_remaining_mini_blocks]
363
.iter()
364
.copied()
365
.map(|bitwidth| (bitwidth as usize * values_per_miniblock).div_ceil(8))
366
.sum::<usize>();
367
368
rem = rem
369
.get(num_miniblocks_per_block + num_bitpacking_bytes..)
370
.ok_or_else(|| {
371
ParquetError::oos(
372
"Not enough bytes for all bitpacked values in delta encoding",
373
)
374
})?;
375
376
num_values_left = num_values_left.saturating_sub(values_per_block);
377
}
378
}
379
380
let values = &values[..values.len() - rem.len()];
381
382
let decoder = Self {
383
num_miniblocks_per_block,
384
values_per_block,
385
values_remaining: total_count.saturating_sub(1),
386
last_value: first_value,
387
values,
388
389
block: Block {
390
// @NOTE:
391
// We add one delta=0 into the buffered block which allows us to
392
// prepend like the `first_value` is just any normal value.
393
//
394
// This is a bit of a hack, but makes the rest of the logic
395
// **A LOT** simpler.
396
values_remaining: usize::from(total_count > 0),
397
min_delta: 0,
398
bitwidths: &[],
399
miniblock: MiniBlock {
400
decoder: bitpacked::Decoder::try_new_allow_zero(&[], 0, 1)?,
401
buffered: <u64 as Unpackable>::Unpacked::zero(),
402
unpacked_start: 0,
403
unpacked_end: 0,
404
},
405
},
406
};
407
408
Ok((decoder, rem))
409
}
410
411
/// Consume a new [`Block`] from `self.values`.
412
fn consume_block(&mut self) {
413
// @NOTE: All the panics here should be prevented in the `Decoder::try_new`.
414
415
debug_assert!(!self.values.is_empty());
416
417
let values_per_miniblock = self.values_per_miniblock();
418
419
let length = usize::min(self.values_remaining, self.values_per_block);
420
let actual_num_miniblocks = usize::min(
421
self.num_miniblocks_per_block,
422
length.div_ceil(values_per_miniblock),
423
);
424
425
debug_assert!(actual_num_miniblocks > 0);
426
427
// <min delta> <list of bitwidths of miniblocks> <miniblocks>
428
429
let (min_delta, consumed) = zigzag_leb128::decode(self.values);
430
431
self.values = &self.values[consumed..];
432
let (bitwidths, remainder) = self.values.split_at(self.num_miniblocks_per_block);
433
434
let first_bitwidth = bitwidths[0];
435
let bitwidths = &bitwidths[1..actual_num_miniblocks];
436
debug_assert!(first_bitwidth <= MAX_BITWIDTH);
437
let first_bitwidth = first_bitwidth as usize;
438
439
let values_in_first_miniblock = usize::min(length, values_per_miniblock);
440
let num_allocated_bytes = (first_bitwidth * values_per_miniblock).div_ceil(8);
441
let num_actual_bytes = (first_bitwidth * values_in_first_miniblock).div_ceil(8);
442
let (bytes, remainder) = remainder.split_at(num_allocated_bytes);
443
let bytes = &bytes[..num_actual_bytes];
444
445
let decoder =
446
bitpacked::Decoder::new_allow_zero(bytes, first_bitwidth, values_in_first_miniblock);
447
448
self.block = Block {
449
min_delta,
450
bitwidths,
451
values_remaining: length,
452
miniblock: MiniBlock {
453
decoder,
454
// We can leave this as it should not be read before it is updated
455
buffered: self.block.miniblock.buffered,
456
unpacked_start: 0,
457
unpacked_end: 0,
458
},
459
};
460
461
self.values_remaining -= length;
462
self.values = remainder;
463
}
464
465
/// Gather `n` elements from the current [`MiniBlock`] to `target`
466
fn gather_miniblock_n_into<G: DeltaGatherer>(
467
&mut self,
468
target: &mut G::Target,
469
mut n: usize,
470
gatherer: &mut G,
471
) -> ParquetResult<()> {
472
debug_assert!(n > 0);
473
debug_assert!(self.miniblock_len() >= n);
474
475
// If the `num_bits == 0`, the delta is constant and equal to `min_delta`. The
476
// `bitpacked::Decoder` basically only keeps track of the length.
477
if self.block.miniblock.decoder.num_bits() == 0 {
478
let num_repeats = usize::min(self.miniblock_len(), n);
479
let v = self.last_value.wrapping_add(self.block.min_delta);
480
gatherer.gather_constant(target, v, self.block.min_delta, num_repeats)?;
481
self.last_value = self
482
.last_value
483
.wrapping_add(self.block.min_delta * num_repeats as i64);
484
self.block.miniblock.decoder.length -= num_repeats;
485
return Ok(());
486
}
487
488
if self.block.miniblock.unpacked_start < self.block.miniblock.unpacked_end {
489
let length = usize::min(
490
n,
491
self.block.miniblock.unpacked_end - self.block.miniblock.unpacked_start,
492
);
493
self.block.miniblock.buffered
494
[self.block.miniblock.unpacked_start..self.block.miniblock.unpacked_start + length]
495
.iter_mut()
496
.for_each(|v| {
497
self.last_value = self
498
.last_value
499
.wrapping_add(*v as i64)
500
.wrapping_add(self.block.min_delta);
501
*v = self.last_value as u64;
502
});
503
gatherer.gather_slice(
504
target,
505
bytemuck::cast_slice(
506
&self.block.miniblock.buffered[self.block.miniblock.unpacked_start
507
..self.block.miniblock.unpacked_start + length],
508
),
509
)?;
510
n -= length;
511
self.block.miniblock.unpacked_start += length;
512
}
513
514
if n == 0 {
515
return Ok(());
516
}
517
518
const ITEMS_PER_PACK: usize = <<u64 as Unpackable>::Unpacked as Unpacked<u64>>::LENGTH;
519
for _ in 0..n / ITEMS_PER_PACK {
520
let mut chunk = self.block.miniblock.decoder.chunked().next().unwrap();
521
chunk.iter_mut().for_each(|v| {
522
self.last_value = self
523
.last_value
524
.wrapping_add(*v as i64)
525
.wrapping_add(self.block.min_delta);
526
*v = self.last_value as u64;
527
});
528
gatherer.gather_chunk(target, bytemuck::cast_ref(&chunk))?;
529
n -= ITEMS_PER_PACK;
530
}
531
532
if n == 0 {
533
return Ok(());
534
}
535
536
let Some((chunk, len)) = self.block.miniblock.decoder.chunked().next_inexact() else {
537
debug_assert_eq!(n, 0);
538
self.block.miniblock.buffered = <u64 as Unpackable>::Unpacked::zero();
539
self.block.miniblock.unpacked_start = 0;
540
self.block.miniblock.unpacked_end = 0;
541
return Ok(());
542
};
543
544
self.block.miniblock.buffered = chunk;
545
self.block.miniblock.unpacked_start = 0;
546
self.block.miniblock.unpacked_end = len;
547
548
if n > 0 {
549
let length = usize::min(n, self.block.miniblock.unpacked_end);
550
self.block.miniblock.buffered[..length]
551
.iter_mut()
552
.for_each(|v| {
553
self.last_value = self
554
.last_value
555
.wrapping_add(*v as i64)
556
.wrapping_add(self.block.min_delta);
557
*v = self.last_value as u64;
558
});
559
gatherer.gather_slice(
560
target,
561
bytemuck::cast_slice(&self.block.miniblock.buffered[..length]),
562
)?;
563
self.block.miniblock.unpacked_start = length;
564
}
565
566
Ok(())
567
}
568
569
/// Gather `n` elements from the current [`Block`] to `target`
570
fn gather_block_n_into<G: DeltaGatherer>(
571
&mut self,
572
target: &mut G::Target,
573
n: usize,
574
gatherer: &mut G,
575
) -> ParquetResult<()> {
576
let values_per_miniblock = self.values_per_miniblock();
577
578
debug_assert!(n <= self.values_per_block);
579
debug_assert!(self.values_per_block >= values_per_miniblock);
580
debug_assert_eq!(self.values_per_block % values_per_miniblock, 0);
581
582
let mut n = usize::min(self.block.values_remaining, n);
583
584
if n == 0 {
585
return Ok(());
586
}
587
588
let miniblock_len = self.miniblock_len();
589
if n < miniblock_len {
590
self.gather_miniblock_n_into(target, n, gatherer)?;
591
debug_assert_eq!(self.miniblock_len(), miniblock_len - n);
592
self.block.values_remaining -= n;
593
return Ok(());
594
}
595
596
if miniblock_len > 0 {
597
self.gather_miniblock_n_into(target, miniblock_len, gatherer)?;
598
n -= miniblock_len;
599
self.block.values_remaining -= miniblock_len;
600
}
601
602
while n >= values_per_miniblock {
603
let bitwidth = self.block.bitwidths[0];
604
self.block.bitwidths = &self.block.bitwidths[1..];
605
606
let miniblock;
607
(miniblock, self.values) = self
608
.values
609
.split_at((bitwidth as usize * values_per_miniblock).div_ceil(8));
610
gather_miniblock(
611
target,
612
self.block.min_delta,
613
bitwidth,
614
miniblock,
615
values_per_miniblock,
616
&mut self.last_value,
617
gatherer,
618
)?;
619
n -= values_per_miniblock;
620
self.block.values_remaining -= values_per_miniblock;
621
}
622
623
if n == 0 {
624
return Ok(());
625
}
626
627
if !self.block.bitwidths.is_empty() {
628
let bitwidth = self.block.bitwidths[0];
629
self.block.bitwidths = &self.block.bitwidths[1..];
630
631
if bitwidth > MAX_BITWIDTH {
632
return Err(ParquetError::oos(format!(
633
"Delta encoding bitwidth '{bitwidth}' is larger than maximum {MAX_BITWIDTH})"
634
)));
635
}
636
637
let length = usize::min(values_per_miniblock, self.block.values_remaining);
638
639
let num_allocated_bytes = (bitwidth as usize * values_per_miniblock).div_ceil(8);
640
let num_actual_bytes = (bitwidth as usize * length).div_ceil(8);
641
642
let miniblock;
643
(miniblock, self.values) =
644
self.values
645
.split_at_checked(num_allocated_bytes)
646
.ok_or(ParquetError::oos(
647
"Not enough space for delta encoded miniblock",
648
))?;
649
650
let miniblock = &miniblock[..num_actual_bytes];
651
652
let decoder =
653
bitpacked::Decoder::try_new_allow_zero(miniblock, bitwidth as usize, length)?;
654
self.block.miniblock = MiniBlock {
655
decoder,
656
buffered: self.block.miniblock.buffered,
657
unpacked_start: 0,
658
unpacked_end: 0,
659
};
660
661
if n > 0 {
662
self.gather_miniblock_n_into(target, n, gatherer)?;
663
self.block.values_remaining -= n;
664
}
665
}
666
667
Ok(())
668
}
669
670
/// Gather `n` elements to `target`
671
pub fn gather_n_into<G: DeltaGatherer>(
672
&mut self,
673
target: &mut G::Target,
674
mut n: usize,
675
gatherer: &mut G,
676
) -> ParquetResult<()> {
677
n = usize::min(n, self.len());
678
679
if n == 0 {
680
return Ok(());
681
}
682
683
let values_per_miniblock = self.values_per_block / self.num_miniblocks_per_block;
684
685
let start_num_values_remaining = self.block.values_remaining;
686
if n <= self.block.values_remaining {
687
self.gather_block_n_into(target, n, gatherer)?;
688
debug_assert_eq!(self.block.values_remaining, start_num_values_remaining - n);
689
return Ok(());
690
}
691
692
n -= self.block.values_remaining;
693
self.gather_block_n_into(target, self.block.values_remaining, gatherer)?;
694
debug_assert_eq!(self.block.values_remaining, 0);
695
696
while usize::min(n, self.values_remaining) >= self.values_per_block {
697
self.values = gather_block(
698
target,
699
self.num_miniblocks_per_block,
700
values_per_miniblock,
701
self.values,
702
&mut self.last_value,
703
gatherer,
704
)?;
705
n -= self.values_per_block;
706
self.values_remaining -= self.values_per_block;
707
}
708
709
if n == 0 {
710
return Ok(());
711
}
712
713
self.consume_block();
714
self.gather_block_n_into(target, n, gatherer)?;
715
716
Ok(())
717
}
718
719
pub(crate) fn collect_n<E: std::fmt::Debug + Extend<i64>>(
720
&mut self,
721
e: &mut E,
722
n: usize,
723
) -> ParquetResult<()> {
724
struct ExtendGatherer<'a, E: std::fmt::Debug + Extend<i64>>(
725
std::marker::PhantomData<&'a E>,
726
);
727
728
impl<'a, E: std::fmt::Debug + Extend<i64>> DeltaGatherer for ExtendGatherer<'a, E> {
729
type Target = (usize, &'a mut E);
730
731
fn target_len(&self, target: &Self::Target) -> usize {
732
target.0
733
}
734
735
fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {}
736
737
fn gather_one(&mut self, target: &mut Self::Target, v: i64) -> ParquetResult<()> {
738
target.1.extend(Some(v));
739
target.0 += 1;
740
Ok(())
741
}
742
}
743
744
let mut gatherer = ExtendGatherer(std::marker::PhantomData);
745
let mut target = (0, e);
746
747
self.gather_n_into(&mut target, n, &mut gatherer)
748
}
749
750
pub(crate) fn collect<E: std::fmt::Debug + Extend<i64> + Default>(
751
mut self,
752
) -> ParquetResult<E> {
753
let mut e = E::default();
754
self.collect_n(&mut e, self.len())?;
755
Ok(e)
756
}
757
758
pub fn len(&self) -> usize {
759
self.values_remaining + self.block.values_remaining
760
}
761
762
fn values_per_miniblock(&self) -> usize {
763
debug_assert_eq!(self.values_per_block % self.num_miniblocks_per_block, 0);
764
self.values_per_block / self.num_miniblocks_per_block
765
}
766
767
fn miniblock_len(&self) -> usize {
768
self.block.miniblock.unpacked_end - self.block.miniblock.unpacked_start
769
+ self.block.miniblock.decoder.len()
770
}
771
}
772
773
#[cfg(test)]
mod tests {
    use super::*;

    /// A buffer that holds only the header: the single value is the header's `first_value`.
    #[test]
    fn single_value() {
        // Generated by parquet-rs
        //
        // header: [128, 1, 4, 1, 2]
        // block size: 128, 1
        // mini-blocks: 4
        // elements: 1
        // first_value: 2 <=z> 1
        let data = &[128, 1, 4, 1, 2];

        let (decoder, rem) = Decoder::try_new(data).unwrap();
        let r = decoder.collect::<Vec<_>>().unwrap();

        assert_eq!(&r[..], &[1]);
        assert_eq!(data.len() - rem.len(), 5);
    }

    /// One block whose miniblocks all have bitwidth 0, so every delta equals `min_delta`.
    #[test]
    fn test_from_spec() {
        let expected = (1..=5).collect::<Vec<_>>();
        // VALIDATED FROM SPARK==3.1.1
        // header: [128, 1, 4, 5, 2]
        // block size: 128, 1
        // mini-blocks: 4
        // elements: 5
        // first_value: 2 <=z> 1
        // block1: [2, 0, 0, 0, 0]
        // min_delta: 2 <=z> 1
        // bit_width: 0
        let data = &[128, 1, 4, 5, 2, 2, 0, 0, 0, 0];

        let (decoder, rem) = Decoder::try_new(data).unwrap();
        let r = decoder.collect::<Vec<_>>().unwrap();

        assert_eq!(expected, r);

        assert_eq!(data.len() - rem.len(), 10);
    }

    /// One bitpacked miniblock followed by trailing bytes that must be returned untouched.
    #[test]
    fn case2() {
        let expected = vec![1, 2, 3, 4, 5, 1];
        // VALIDATED FROM SPARK==3.1.1
        // header: [128, 1, 4, 6, 2]
        // block size: 128, 1 <=u> 128
        // mini-blocks: 4     <=u> 4
        // elements: 6        <=u> 6
        // first_value: 2     <=z> 1
        // block1: [7, 3, 0, 0, 0]
        // min_delta: 7       <=z> -4
        // bit_widths: [3, 0, 0, 0]
        // values: [
        //      0b01101101
        //      0b00001011
        //      ...
        // ]                  <=b> [3, 3, 3, 3, 0]
        let data = &[
            128, 1, 4, 6, 2, 7, 3, 0, 0, 0, 0b01101101, 0b00001011, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            // these should not be consumed
            1, 2, 3,
        ];

        let (decoder, rem) = Decoder::try_new(data).unwrap();
        let r = decoder.collect::<Vec<_>>().unwrap();

        assert_eq!(expected, r);
        assert_eq!(rem, &[1, 2, 3]);
    }

    /// Two bitpacked miniblocks with different bitwidths; the bitwidth bytes of the unneeded
    /// miniblocks (including an out-of-range 255) must be ignored.
    #[test]
    fn multiple_miniblocks() {
        #[rustfmt::skip]
        let data = &[
            // Header: [128, 1, 4, 65, 100]
            128, 1, // block size <=u> 128
            4,      // number of mini-blocks <=u> 4
            65,     // number of elements <=u> 65
            100,    // first_value <=z> 50

            // Block 1 header: [7, 3, 4, 255, 0]
            7,            // min_delta <=z> -4
            3, 4, 255, 0, // bit_widths (255 should not be used as only two miniblocks are needed)

            // 32 3-bit values of 0 for mini-block 1 (12 bytes)
            0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

            // 32 4-bit values of 8 for mini-block 2 (16 bytes)
            0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88, 0x88,
            0x88, 0x88,

            // these should not be consumed
            1, 2, 3,
        ];

        #[rustfmt::skip]
        let expected = [
            // First value
            50,

            // Mini-block 1: 32 deltas of -4
            46, 42, 38, 34, 30, 26, 22, 18, 14, 10, 6, 2, -2, -6, -10, -14, -18, -22, -26, -30, -34,
            -38, -42, -46, -50, -54, -58, -62, -66, -70, -74, -78,

            // Mini-block 2: 32 deltas of 4
            -74, -70, -66, -62, -58, -54, -50, -46, -42, -38, -34, -30, -26, -22, -18, -14, -10, -6,
            -2, 2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42, 46, 50,
        ];

        let (decoder, rem) = Decoder::try_new(data).unwrap();
        let r = decoder.collect::<Vec<_>>().unwrap();

        assert_eq!(&expected[..], &r[..]);
        assert_eq!(data.len() - rem.len(), data.len() - 3);
        assert_eq!(rem.len(), 3);
    }
}
894
895