Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs
7887 views
1
use polars_utils::chunks::Chunks;
2
3
use super::{Packed, Unpackable, Unpacked};
4
use crate::parquet::error::{ParquetError, ParquetResult};
5
6
/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.
7
/// # Implementation
8
/// This iterator unpacks bytes in chunks and does not allocate.
9
#[derive(Debug, Clone)]
10
pub struct Decoder<'a, T: Unpackable> {
11
packed: Chunks<'a, u8>,
12
num_bits: usize,
13
/// number of items
14
pub(crate) length: usize,
15
_pd: std::marker::PhantomData<T>,
16
}
17
18
impl<T: Unpackable> Default for Decoder<'_, T> {
19
fn default() -> Self {
20
Self {
21
packed: Chunks::new(&[], 1),
22
num_bits: 0,
23
length: 0,
24
_pd: std::marker::PhantomData,
25
}
26
}
27
}
28
29
#[inline]
30
fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {
31
if packed.len() < T::Unpacked::LENGTH * num_bits / 8 {
32
let mut buf = T::Packed::zero();
33
buf.as_mut()[..packed.len()].copy_from_slice(packed);
34
T::unpack(buf.as_ref(), num_bits, unpacked)
35
} else {
36
T::unpack(packed, num_bits, unpacked)
37
}
38
}
39
40
impl<'a, T: Unpackable> Decoder<'a, T> {
41
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
42
pub fn new(packed: &'a [u8], num_bits: usize, length: usize) -> Self {
43
Self::try_new(packed, num_bits, length).unwrap()
44
}
45
46
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
47
///
48
/// `num_bits` is allowed to be `0`.
49
pub fn new_allow_zero(packed: &'a [u8], num_bits: usize, length: usize) -> Self {
50
Self::try_new_allow_zero(packed, num_bits, length).unwrap()
51
}
52
53
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
54
///
55
/// `num_bits` is allowed to be `0`.
56
pub fn try_new_allow_zero(
57
packed: &'a [u8],
58
num_bits: usize,
59
length: usize,
60
) -> ParquetResult<Self> {
61
let block_size = size_of::<T>() * num_bits;
62
63
if packed.len() * 8 < length * num_bits {
64
return Err(ParquetError::oos(format!(
65
"Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.",
66
length * num_bits / 8
67
)));
68
}
69
70
debug_assert!(num_bits != 0 || packed.is_empty());
71
let block_size = block_size.max(1);
72
let packed = Chunks::new(packed, block_size);
73
74
Ok(Self {
75
length,
76
packed,
77
num_bits,
78
_pd: Default::default(),
79
})
80
}
81
82
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
83
pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> ParquetResult<Self> {
84
let block_size = size_of::<T>() * num_bits;
85
86
if num_bits == 0 {
87
return Err(ParquetError::oos("Bitpacking requires num_bits > 0"));
88
}
89
90
if packed.len() * 8 < length * num_bits {
91
return Err(ParquetError::oos(format!(
92
"Unpacking {length} items with a number of bits {num_bits} requires at least {} bytes.",
93
length * num_bits / 8
94
)));
95
}
96
97
let packed = Chunks::new(packed, block_size);
98
99
Ok(Self {
100
length,
101
packed,
102
num_bits,
103
_pd: Default::default(),
104
})
105
}
106
107
pub fn num_bits(&self) -> usize {
108
self.num_bits
109
}
110
111
pub fn as_slice(&self) -> &[u8] {
112
self.packed.as_slice()
113
}
114
115
pub fn lower_element<N: Unpackable>(self) -> ParquetResult<Decoder<'a, u16>> {
116
let packed = self.packed.as_slice();
117
Decoder::try_new(packed, self.num_bits, self.length)
118
}
119
}
120
121
/// A iterator over the exact chunks in a [`Decoder`].
122
///
123
/// The remainder can be accessed using `remainder` or `next_inexact`.
124
#[derive(Debug)]
125
pub struct ChunkedDecoder<'a, 'b, T: Unpackable> {
126
pub(crate) decoder: &'b mut Decoder<'a, T>,
127
}
128
129
impl<T: Unpackable> Iterator for ChunkedDecoder<'_, '_, T> {
130
type Item = T::Unpacked;
131
132
#[inline]
133
fn next(&mut self) -> Option<Self::Item> {
134
if self.decoder.len() < T::Unpacked::LENGTH {
135
return None;
136
}
137
138
let mut unpacked = T::Unpacked::zero();
139
self.next_into(&mut unpacked)?;
140
Some(unpacked)
141
}
142
143
fn size_hint(&self) -> (usize, Option<usize>) {
144
let len = self.decoder.len() / T::Unpacked::LENGTH;
145
(len, Some(len))
146
}
147
}
148
149
impl<T: Unpackable> ExactSizeIterator for ChunkedDecoder<'_, '_, T> {}
150
151
impl<T: Unpackable> ChunkedDecoder<'_, '_, T> {
152
/// Get and consume the remainder chunk if it exists.
153
///
154
/// This should only be called after all the chunks full are consumed.
155
pub fn remainder(&mut self) -> Option<(T::Unpacked, usize)> {
156
if self.decoder.len() == 0 {
157
return None;
158
}
159
160
debug_assert!(self.decoder.len() < T::Unpacked::LENGTH);
161
let remainder_len = self.decoder.len() % T::Unpacked::LENGTH;
162
163
let mut unpacked = T::Unpacked::zero();
164
let packed = self.decoder.packed.next()?;
165
decode_pack::<T>(packed, self.decoder.num_bits, &mut unpacked);
166
self.decoder.length -= remainder_len;
167
Some((unpacked, remainder_len))
168
}
169
170
/// Get the next (possibly partial) chunk and its filled length
171
pub fn next_inexact(&mut self) -> Option<(T::Unpacked, usize)> {
172
if self.decoder.len() >= T::Unpacked::LENGTH {
173
Some((self.next().unwrap(), T::Unpacked::LENGTH))
174
} else {
175
self.remainder()
176
}
177
}
178
179
/// Consume the next chunk into `unpacked`.
180
pub fn next_into(&mut self, unpacked: &mut T::Unpacked) -> Option<usize> {
181
if self.decoder.len() == 0 {
182
return None;
183
}
184
185
let unpacked_len = self.decoder.len().min(T::Unpacked::LENGTH);
186
let packed = self.decoder.packed.next()?;
187
decode_pack::<T>(packed, self.decoder.num_bits, unpacked);
188
self.decoder.length -= unpacked_len;
189
190
Some(unpacked_len)
191
}
192
}
193
194
impl<'a, T: Unpackable> Decoder<'a, T> {
195
pub fn chunked<'b>(&'b mut self) -> ChunkedDecoder<'a, 'b, T> {
196
ChunkedDecoder { decoder: self }
197
}
198
199
pub fn len(&self) -> usize {
200
self.length
201
}
202
203
pub fn skip_chunks(&mut self, n: usize) {
204
debug_assert!(n * T::Unpacked::LENGTH <= self.length);
205
206
for _ in (&mut self.packed).take(n) {}
207
self.length -= n * T::Unpacked::LENGTH;
208
}
209
210
pub fn take(&mut self) -> Self {
211
let block_size = self.packed.chunk_size();
212
let packed = std::mem::replace(&mut self.packed, Chunks::new(&[], block_size));
213
let length = self.length;
214
self.length = 0;
215
216
Self {
217
packed,
218
num_bits: self.num_bits,
219
length,
220
_pd: Default::default(),
221
}
222
}
223
224
#[inline]
225
pub fn collect_into(mut self, vec: &mut Vec<T>) {
226
// @NOTE:
227
// When microbenchmarking changing this from a element-wise iterator to a collect into
228
// improves the speed by around 4x.
229
//
230
// The unsafe code here allows us to not have to do a double memcopy. This saves us 20% in
231
// our microbenchmark.
232
//
233
// GB: I did some profiling on this function using the Yellow NYC Taxi dataset. There, the
234
// average self.length is ~52.8 and the average num_packs is ~2.2. Let this guide your
235
// decisions surrounding the optimization of this function.
236
237
// @NOTE:
238
// Since T::Unpacked::LENGTH is always a power of two and known at compile time. Division,
239
// modulo and multiplication are just trivial operators.
240
let num_packs = (self.length / T::Unpacked::LENGTH)
241
+ usize::from(!self.length.is_multiple_of(T::Unpacked::LENGTH));
242
243
// We reserve enough space here for self.length rounded up to the next multiple of
244
// T::Unpacked::LENGTH so that we can safely just write into that memory. Otherwise, we
245
// would have to make a special path where we memcopy twice which is less than ideal.
246
vec.reserve(num_packs * T::Unpacked::LENGTH);
247
248
// IMPORTANT: This pointer calculation has to appear after the reserve since that reserve
249
// might move the buffer.
250
let mut unpacked_ptr = vec.as_mut_ptr().wrapping_add(vec.len());
251
252
for _ in 0..num_packs {
253
// This unwrap should never fail since the packed length is checked on initialized of
254
// the `Decoder`.
255
let packed = self.packed.next().unwrap();
256
257
// SAFETY:
258
// Since we did a `vec::reserve` before with the total length, we know that the memory
259
// necessary for a `T::Unpacked` is available.
260
//
261
// - The elements in this buffer are properly aligned, so elements in a slice will also
262
// be properly aligned.
263
// - It is deferencable because it is (i) not null, (ii) in one allocated object, (iii)
264
// not pointing to deallocated memory, (iv) we do not rely on atomicity and (v) we do
265
// not read or write beyond the lifetime of `vec`.
266
// - All data is initialized before reading it. This is not perfect but should not lead
267
// to any UB.
268
// - We don't alias the same data from anywhere else at the same time, because we have
269
// the mutable reference to `vec`.
270
let unpacked_ref = unsafe { (unpacked_ptr as *mut T::Unpacked).as_mut() }.unwrap();
271
272
decode_pack::<T>(packed, self.num_bits, unpacked_ref);
273
274
unpacked_ptr = unpacked_ptr.wrapping_add(T::Unpacked::LENGTH);
275
}
276
277
// SAFETY:
278
// We have written these elements before so we know that these are available now.
279
//
280
// - The capacity is larger since we reserved enough spaced with the opening
281
// `vec::reserve`.
282
// - All elements are initialized by the `decode_pack` into the `unpacked_ref`.
283
unsafe { vec.set_len(vec.len() + self.length) }
284
}
285
}
286
287
#[cfg(test)]
mod tests {
    use super::super::tests::case1;
    use super::*;

    impl<T: Unpackable> Decoder<'_, T> {
        /// Test helper: decodes every remaining item into a fresh `Vec`.
        pub fn collect(self) -> Vec<T> {
            let mut out = Vec::new();
            self.collect_into(&mut out);
            out
        }
    }

    #[test]
    fn test_decode_rle() {
        // The values 0..=7 at a bit width of 3, packed starting from the least significant bit:
        //   0: 000, 1: 001, 2: 010, 3: 011, 4: 100, 5: 101, 6: 110, 7: 111
        let packed = [0b10001000u8, 0b11000110, 0b11111010];

        let decoder = Decoder::<u32>::try_new(&packed, 3, 8).unwrap();
        assert_eq!(decoder.collect(), vec![0, 1, 2, 3, 4, 5, 6, 7]);
    }

    #[test]
    fn decode_large() {
        let (num_bits, expected, data) = case1();

        let decoder = Decoder::<u32>::try_new(&data, num_bits, expected.len()).unwrap();
        assert_eq!(decoder.collect(), expected);
    }

    #[test]
    fn test_decode_bool() {
        // A bit width of 1: every bit of the byte is one item, lowest bit first.
        let packed = [0b10101010u8];

        let decoder = Decoder::<u32>::try_new(&packed, 1, 8).unwrap();
        assert_eq!(decoder.collect(), vec![0, 1, 0, 1, 0, 1, 0, 1]);
    }

    #[test]
    fn test_decode_u64() {
        // Same input as `test_decode_bool`, decoded into 64-bit items.
        let packed = [0b10101010u8];

        let decoder = Decoder::<u64>::try_new(&packed, 1, 8).unwrap();
        assert_eq!(decoder.collect(), vec![0, 1, 0, 1, 0, 1, 0, 1]);
    }

    #[test]
    fn even_case() {
        // [0, 1, 2, 3, 4, 5, 6, 0]x99
        let copies = 99; // 8 * 99 % 32 != 0
        let expected = [0u32, 1, 2, 3, 4, 5, 6, 0].repeat(copies);
        let data = [0b10001000u8, 0b11000110, 0b00011010].repeat(copies);

        let decoder = Decoder::<u32>::try_new(&data, 3, expected.len()).unwrap();
        assert_eq!(decoder.collect(), expected);
    }

    #[test]
    fn odd_case() {
        // [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2]
        let copies = 4;
        let mut expected = [0u32, 1, 2, 3, 4, 5, 6, 0].repeat(copies);
        expected.push(2);
        let mut data = [0b10001000u8, 0b11000110, 0b00011010].repeat(copies);
        data.push(0b00000010u8);

        let decoder = Decoder::<u32>::try_new(&data, 3, expected.len()).unwrap();
        assert_eq!(decoder.collect(), expected);
    }

    #[test]
    fn test_errors() {
        // zero length
        assert!(Decoder::<u64>::try_new(&[], 1, 0).is_ok());
        // no bytes
        assert!(Decoder::<u64>::try_new(&[], 1, 1).is_err());
        // exactly enough bytes
        assert!(Decoder::<u64>::try_new(&[1], 1, 8).is_ok());
        assert!(Decoder::<u64>::try_new(&[1, 1], 2, 8).is_ok());
        // too few bytes
        assert!(Decoder::<u64>::try_new(&[1], 1, 9).is_err());
        // zero num_bits
        assert!(Decoder::<u64>::try_new(&[1], 0, 1).is_err());
    }
}
416
417