Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/byte_stream_split/decoder.rs
7887 views
1
use crate::parquet::error::ParquetError;
2
3
/// Widest element, in bytes, that the decoder can reassemble.
///
/// It sizes the inline `buffer` of [`Decoder`], and [`Decoder::try_new`] rejects any larger
/// `element_size`.
const MAX_ELEMENT_SIZE: usize = 8;
4
5
/// Decodes using the [Byte Stream Split](https://github.com/apache/parquet-format/blob/master/Encodings.md#byte-stream-split-byte_stream_split--9) encoding.
///
/// The encoded `values` are read as `element_size` consecutive streams of `num_elements` bytes
/// each: byte `n` of element `i` is stored at `values[n * num_elements + i]`.
/// [`Decoder::move_next`] gathers those scattered bytes back into one contiguous element.
///
/// # Implementation
///
/// A fixed size buffer is stored inline to support reading types of up to 8 bytes in size.
#[derive(Debug)]
pub struct Decoder<'a> {
    /// Byte-stream-split encoded input; its length is a multiple of `element_size`.
    values: &'a [u8],
    /// Bytes of the most recently decoded element; only the first `element_size` are meaningful.
    buffer: [u8; MAX_ELEMENT_SIZE],
    /// Total number of elements encoded in `values` (`values.len() / element_size`).
    num_elements: usize,
    /// Number of elements decoded so far, i.e. the index of the next element to decode.
    position: usize,
    /// Width of each element in bytes; at most `MAX_ELEMENT_SIZE`.
    element_size: usize,
}
16
17
impl<'a> Decoder<'a> {
18
pub fn try_new(values: &'a [u8], element_size: usize) -> Result<Self, ParquetError> {
19
if element_size > MAX_ELEMENT_SIZE {
20
// Since Parquet format version 2.11 it's valid to use byte stream split for fixed-length byte array data,
21
// which could be larger than 8 bytes, but Polars doesn't yet support reading byte stream split encoded FLBA data.
22
return Err(ParquetError::oos(format!(
23
"Byte stream split decoding only supports up to {MAX_ELEMENT_SIZE} byte element sizes"
24
)));
25
}
26
27
let values_size = values.len();
28
if !values_size.is_multiple_of(element_size) {
29
return Err(ParquetError::oos(format!(
30
"Values array length ({values_size}) is not a multiple of the element size ({element_size})"
31
)));
32
}
33
let num_elements = values.len() / element_size;
34
35
Ok(Self {
36
values,
37
buffer: [0; MAX_ELEMENT_SIZE],
38
num_elements,
39
position: 0,
40
element_size,
41
})
42
}
43
44
pub fn move_next(&mut self) -> bool {
45
if self.position >= self.num_elements {
46
return false;
47
}
48
49
debug_assert!(self.element_size <= MAX_ELEMENT_SIZE);
50
debug_assert!(self.values.len() >= self.num_elements * self.element_size);
51
for n in 0..self.element_size {
52
unsafe {
53
// SAFETY:
54
// We have the invariants that element_size <= MAX_ELEMENT_SIZE,
55
// buffer.len() == MAX_ELEMENT_SIZE,
56
// position < num_elements and
57
// values.len() >= num_elements * element_size.
58
*self.buffer.get_unchecked_mut(n) = *self
59
.values
60
.get_unchecked((self.num_elements * n) + self.position)
61
}
62
}
63
64
self.position += 1;
65
true
66
}
67
68
/// The number of remaining values
69
pub fn len(&self) -> usize {
70
self.num_elements - self.position
71
}
72
73
pub fn current_value(&self) -> &[u8] {
74
&self.buffer[0..self.element_size]
75
}
76
77
pub fn iter_converted<'b, T, F>(&'b mut self, converter: F) -> DecoderIterator<'a, 'b, T, F>
78
where
79
F: Copy + Fn(&[u8]) -> T,
80
{
81
DecoderIterator {
82
decoder: self,
83
converter,
84
}
85
}
86
}
87
88
/// Iterator returned by [`Decoder::iter_converted`].
///
/// Each step advances the borrowed decoder by one element and maps that element's bytes
/// through `converter`.
#[derive(Debug)]
pub struct DecoderIterator<'a, 'b, T, F>
where
    F: Copy + Fn(&[u8]) -> T,
{
    /// Decoder being drained; it stays mutably borrowed for as long as the iterator lives.
    decoder: &'b mut Decoder<'a>,
    /// Conversion applied to the raw bytes of each decoded element.
    converter: F,
}
96
97
impl<T, F> Iterator for DecoderIterator<'_, '_, T, F>
98
where
99
F: Copy + Fn(&[u8]) -> T,
100
{
101
type Item = T;
102
103
#[inline]
104
fn next(&mut self) -> Option<Self::Item> {
105
if self.decoder.move_next() {
106
Some((self.converter)(self.decoder.current_value()))
107
} else {
108
None
109
}
110
}
111
112
#[inline]
113
fn size_hint(&self) -> (usize, Option<usize>) {
114
(self.decoder.len(), Some(self.decoder.len()))
115
}
116
}
117
118