Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/delta_byte_array/decoder.rs
7887 views
1
use super::super::delta_bitpacked;
2
use crate::parquet::encoding::delta_bitpacked::SumGatherer;
3
use crate::parquet::error::ParquetResult;
4
5
/// Decodes according to [Delta strings](https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7),
6
/// prefixes, lengths and values
7
/// # Implementation
8
/// This struct does not allocate on the heap.
9
#[derive(Debug)]
10
pub struct Decoder<'a> {
11
pub(crate) prefix_lengths: delta_bitpacked::Decoder<'a>,
12
pub(crate) suffix_lengths: delta_bitpacked::Decoder<'a>,
13
pub(crate) values: &'a [u8],
14
15
pub(crate) offset: usize,
16
pub(crate) last: Vec<u8>,
17
}
18
19
impl<'a> Decoder<'a> {
20
pub fn try_new(values: &'a [u8]) -> ParquetResult<Self> {
21
let (prefix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?;
22
let (suffix_lengths, values) = delta_bitpacked::Decoder::try_new(values)?;
23
24
Ok(Self {
25
prefix_lengths,
26
suffix_lengths,
27
values,
28
29
offset: 0,
30
last: Vec::with_capacity(32),
31
})
32
}
33
34
pub fn values(&self) -> &'a [u8] {
35
self.values
36
}
37
38
pub fn len(&self) -> usize {
39
debug_assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
40
self.prefix_lengths.len()
41
}
42
43
pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
44
let mut prefix_sum = 0usize;
45
self.prefix_lengths
46
.gather_n_into(&mut prefix_sum, n, &mut SumGatherer(0))?;
47
let mut suffix_sum = 0usize;
48
self.suffix_lengths
49
.gather_n_into(&mut suffix_sum, n, &mut SumGatherer(0))?;
50
self.offset += prefix_sum + suffix_sum;
51
Ok(())
52
}
53
}
54
55
impl Iterator for Decoder<'_> {
56
type Item = ParquetResult<Vec<u8>>;
57
58
fn next(&mut self) -> Option<Self::Item> {
59
if self.len() == 0 {
60
return None;
61
}
62
63
let mut prefix_length = vec![];
64
let mut suffix_length = vec![];
65
if let Err(e) = self.prefix_lengths.collect_n(&mut prefix_length, 1) {
66
return Some(Err(e));
67
}
68
if let Err(e) = self.suffix_lengths.collect_n(&mut suffix_length, 1) {
69
return Some(Err(e));
70
}
71
let prefix_length = prefix_length[0];
72
let suffix_length = suffix_length[0];
73
74
let prefix_length = prefix_length as usize;
75
let suffix_length = suffix_length as usize;
76
77
let mut value = Vec::with_capacity(prefix_length + suffix_length);
78
79
value.extend_from_slice(&self.last[..prefix_length]);
80
value.extend_from_slice(&self.values[self.offset..self.offset + suffix_length]);
81
82
self.last.clear();
83
self.last.extend_from_slice(&value);
84
85
self.offset += suffix_length;
86
87
Some(Ok(value))
88
}
89
90
fn size_hint(&self) -> (usize, Option<usize>) {
91
(self.prefix_lengths.len(), Some(self.prefix_lengths.len()))
92
}
93
}
94
95
// `size_hint` reports the same number for its lower and upper bound, which is
// the exact-length contract `ExactSizeIterator` relies on.
impl ExactSizeIterator for Decoder<'_> {}
96
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Two values with no shared prefix decode to exactly their suffix bytes.
    #[test]
    fn test_bla() -> ParquetResult<()> {
        // VALIDATED from spark==3.1.1
        // Layout: prefix-length stream, suffix-length stream, suffix bytes.
        let encoded: &[u8] = &[
            128, 1, 4, 2, 0, 0, 0, 0, 0, 0, 128, 1, 4, 2, 10, 0, 0, 0, 0, 0, 72, 101, 108, 108,
            111, 87, 111, 114, 108, 100,
            // extra bytes are not from spark, but they should be ignored by the decoder
            // because they are beyond the sum of all lengths.
            1, 2, 3,
        ];

        let decoded = Decoder::try_new(encoded)?.collect::<Result<Vec<_>, _>>()?;
        let expected = vec![b"Hello".to_vec(), b"World".to_vec()];
        assert_eq!(decoded, expected);

        Ok(())
    }

    /// The second value reuses the leading bytes of the first one.
    #[test]
    fn test_with_prefix() -> ParquetResult<()> {
        // VALIDATED from spark==3.1.1
        // Layout: prefix-length stream, suffix-length stream, suffix bytes.
        let encoded: &[u8] = &[
            128, 1, 4, 2, 0, 6, 0, 0, 0, 0, 128, 1, 4, 2, 10, 4, 0, 0, 0, 0, 72, 101, 108, 108,
            111, 105, 99, 111, 112, 116, 101, 114,
            // extra bytes are not from spark, but they should be ignored by the decoder
            // because they are beyond the sum of all lengths.
            1, 2, 3,
        ];

        let decoded = Decoder::try_new(encoded)?.collect::<Result<Vec<_>, _>>()?;
        let expected = vec![b"Hello".to_vec(), b"Helicopter".to_vec()];
        assert_eq!(decoded, expected);

        Ok(())
    }
}
137
138