Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
7887 views
1
use std::io::Write;
2
3
use super::bitpacked_encode;
4
use crate::parquet::encoding::{bitpacked, ceil8, uleb128};
5
6
/// Maximum number of values `encode` buffers for one bit-packed literal run
/// before flushing it. The exact value is arbitrary (it trades memory usage
/// against storage overhead), but it must stay a multiple of 8: a full buffer
/// is flushed as a non-terminal literal run, and non-terminal literal runs
/// must hold a whole number of 8-value groups.
const MAX_VALUES_PER_LITERAL_RUN: usize = 8 * 1024;

/// Per-type primitives that `encode` uses to emit the two kinds of runs of
/// the RLE / bit-packing hybrid encoding.
pub trait Encoder<T: PartialEq + Default + Copy> {
    /// Writes every value yielded by `iterator` as a single bit-packed run
    /// (header included), using `num_bits` bits per value.
    fn bitpacked_encode<W, I>(writer: &mut W, iterator: I, num_bits: usize) -> std::io::Result<()>
    where
        W: Write,
        I: Iterator<Item = T>;

    /// Writes an RLE run (header included) of `run_length` copies of
    /// `value`, where `bit_width` is the bit width of the encoded values.
    fn run_length_encode<W>(
        writer: &mut W,
        run_length: usize,
        value: T,
        bit_width: u32,
    ) -> std::io::Result<()>
    where
        W: Write;
}

/// Number of `u32` values handed to `bitpacked::encode_pack` at once when
/// writing a bit-packed run.
const U32_BLOCK_LEN: usize = 32;

impl Encoder<u32> for u32 {
27
fn bitpacked_encode<W: Write, I: Iterator<Item = u32>>(
28
writer: &mut W,
29
mut iterator: I,
30
num_bits: usize,
31
) -> std::io::Result<()> {
32
// the length of the iterator.
33
let length = iterator.size_hint().1.unwrap();
34
35
let mut header = ceil8(length) as u64;
36
header <<= 1;
37
header |= 1; // it is bitpacked => first bit is set
38
let mut container = [0; 10];
39
let used = uleb128::encode(header, &mut container);
40
writer.write_all(&container[..used])?;
41
42
let chunks = length / U32_BLOCK_LEN;
43
let remainder = length - chunks * U32_BLOCK_LEN;
44
let mut buffer = [0u32; U32_BLOCK_LEN];
45
46
// simplified from ceil8(U32_BLOCK_LEN * num_bits) since U32_BLOCK_LEN = 32
47
let compressed_chunk_size = 4 * num_bits;
48
49
for _ in 0..chunks {
50
iterator
51
.by_ref()
52
.take(U32_BLOCK_LEN)
53
.zip(buffer.iter_mut())
54
.for_each(|(item, buf)| *buf = item);
55
56
let mut packed = [0u8; 4 * U32_BLOCK_LEN];
57
bitpacked::encode_pack::<u32>(&buffer, num_bits, packed.as_mut());
58
writer.write_all(&packed[..compressed_chunk_size])?;
59
}
60
61
if remainder != 0 {
62
// Must be careful here to ensure we write a multiple of `num_bits`
63
// (the bit width) to align with the spec. Some readers also rely on
64
// this - see https://github.com/pola-rs/polars/pull/13883.
65
66
// this is ceil8(remainder * num_bits), but we ensure the output is a
67
// multiple of num_bits by rewriting it as ceil8(remainder) * num_bits
68
let compressed_remainder_size = ceil8(remainder) * num_bits;
69
iterator
70
.by_ref()
71
.take(remainder)
72
.zip(buffer.iter_mut())
73
.for_each(|(item, buf)| *buf = item);
74
75
let mut packed = [0u8; 4 * U32_BLOCK_LEN];
76
// No need to zero rest of buffer because remainder is either:
77
// * Multiple of 8: We pad non-terminal literal runs to have a
78
// multiple of 8 values. Once compressed, the data will end on
79
// clean byte boundaries and packed[..compressed_remainder_size]
80
// will include only the remainder values and nothing extra.
81
// * Final run: Extra values from buffer will be included in
82
// packed[..compressed_remainder_size] but ignored when decoding
83
// because they extend beyond known column length
84
bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());
85
writer.write_all(&packed[..compressed_remainder_size])?;
86
};
87
Ok(())
88
}
89
90
fn run_length_encode<W: Write>(
91
writer: &mut W,
92
run_length: usize,
93
value: u32,
94
bit_width: u32,
95
) -> std::io::Result<()> {
96
// write the length + indicator
97
let mut header = run_length as u64;
98
header <<= 1;
99
let mut container = [0; 10];
100
let used = uleb128::encode(header, &mut container);
101
writer.write_all(&container[..used])?;
102
103
let num_bytes = ceil8(bit_width as usize);
104
let bytes = value.to_le_bytes();
105
writer.write_all(&bytes[..num_bytes])?;
106
Ok(())
107
}
108
}
109
110
impl Encoder<bool> for bool {
111
fn bitpacked_encode<W: Write, I: Iterator<Item = bool>>(
112
writer: &mut W,
113
iterator: I,
114
_num_bits: usize,
115
) -> std::io::Result<()> {
116
// the length of the iterator.
117
let length = iterator.size_hint().1.unwrap();
118
119
let mut header = ceil8(length) as u64;
120
header <<= 1;
121
header |= 1; // it is bitpacked => first bit is set
122
let mut container = [0; 10];
123
let used = uleb128::encode(header, &mut container);
124
writer.write_all(&container[..used])?;
125
bitpacked_encode(writer, iterator)?;
126
Ok(())
127
}
128
129
fn run_length_encode<W: Write>(
130
writer: &mut W,
131
run_length: usize,
132
value: bool,
133
_bit_width: u32,
134
) -> std::io::Result<()> {
135
// write the length + indicator
136
let mut header = run_length as u64;
137
header <<= 1;
138
let mut container = [0; 10];
139
let used = uleb128::encode(header, &mut container);
140
writer.write_all(&container[..used])?;
141
writer.write_all(&(value as u8).to_le_bytes())?;
142
Ok(())
143
}
144
}
145
146
#[allow(clippy::comparison_chain)]
147
pub fn encode<T: PartialEq + Default + Copy + Encoder<T>, W: Write, I: Iterator<Item = T>>(
148
writer: &mut W,
149
iterator: I,
150
num_bits: u32,
151
) -> std::io::Result<()> {
152
let mut consecutive_repeats: usize = 0;
153
let mut previous_val = T::default();
154
let mut buffered_bits = [previous_val; MAX_VALUES_PER_LITERAL_RUN];
155
let mut buffer_idx = 0;
156
let mut literal_run_idx = 0;
157
for val in iterator {
158
if val == previous_val {
159
consecutive_repeats += 1;
160
if consecutive_repeats >= 8 {
161
// Run is long enough to RLE, no need to buffer values
162
if consecutive_repeats > 8 {
163
continue;
164
} else {
165
// When we encounter a run long enough to potentially RLE,
166
// we must first ensure that the buffered literal run has
167
// a multiple of 8 values for bit-packing. If not, we pad
168
// up by taking some of the consecutive repeats
169
let literal_padding = (8 - (literal_run_idx % 8)) % 8;
170
consecutive_repeats -= literal_padding;
171
literal_run_idx += literal_padding;
172
}
173
}
174
// Too short to RLE, continue to buffer values
175
} else if consecutive_repeats > 8 {
176
// Value changed so start a new run but the current run is long
177
// enough to RLE. First, bit-pack any buffered literal run. Then,
178
// RLE current run and reset consecutive repeat counter and buffer.
179
if literal_run_idx > 0 {
180
debug_assert!(literal_run_idx % 8 == 0);
181
T::bitpacked_encode(
182
writer,
183
buffered_bits.iter().take(literal_run_idx).copied(),
184
num_bits as usize,
185
)?;
186
literal_run_idx = 0;
187
}
188
T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
189
consecutive_repeats = 1;
190
buffer_idx = 0;
191
} else {
192
// Value changed so start a new run but the current run is not long
193
// enough to RLE. Consolidate all consecutive repeats into buffered
194
// literal run.
195
literal_run_idx = buffer_idx;
196
consecutive_repeats = 1;
197
}
198
// If buffer is full, bit-pack as literal run and reset
199
if buffer_idx == MAX_VALUES_PER_LITERAL_RUN {
200
T::bitpacked_encode(writer, buffered_bits.iter().copied(), num_bits as usize)?;
201
// If buffer fills up in the middle of a run, all but the last
202
// repeat is consolidated into the literal run.
203
debug_assert!(
204
(consecutive_repeats < 8)
205
&& (buffer_idx - literal_run_idx == consecutive_repeats - 1)
206
);
207
consecutive_repeats = 1;
208
buffer_idx = 0;
209
literal_run_idx = 0;
210
}
211
buffered_bits[buffer_idx] = val;
212
previous_val = val;
213
buffer_idx += 1;
214
}
215
// Final run not long enough to RLE, extend literal run.
216
if consecutive_repeats <= 8 {
217
literal_run_idx = buffer_idx;
218
}
219
// Bit-pack final buffered literal run, if any
220
if literal_run_idx > 0 {
221
T::bitpacked_encode(
222
writer,
223
buffered_bits.iter().take(literal_run_idx).copied(),
224
num_bits as usize,
225
)?;
226
}
227
// RLE final consecutive run if long enough
228
if consecutive_repeats > 8 {
229
T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
230
}
231
Ok(())
232
}
233
234
#[cfg(test)]
mod tests {
    use super::super::bitmap::BitmapIter;
    use super::*;

    #[test]
    fn bool_basics_1() -> std::io::Result<()> {
        let iter = BitmapIter::new(&[0b10011101u8, 0b10011101], 0, 14);

        let mut vec = vec![];

        encode::<bool, _, _>(&mut vec, iter, 1)?;

        assert_eq!(vec, vec![((2 << 1) | 1), 0b10011101u8, 0b00011101]);

        Ok(())
    }

    #[test]
    fn bool_from_iter() -> std::io::Result<()> {
        let mut vec = vec![];

        encode::<bool, _, _>(
            &mut vec,
            vec![true, true, true, true, true, true, true, true].into_iter(),
            1,
        )?;

        assert_eq!(vec, vec![((1 << 1) | 1), 0b11111111]);
        Ok(())
    }

    #[test]
    fn test_encode_u32() -> std::io::Result<()> {
        let mut vec = vec![];

        encode::<u32, _, _>(&mut vec, vec![0, 1, 2, 1, 2, 1, 1, 0, 3].into_iter(), 2)?;

        assert_eq!(
            vec,
            vec![
                ((2 << 1) | 1),
                0b01_10_01_00,
                0b00_01_01_10,
                0b00_00_00_11,
                0b0
            ]
        );
        Ok(())
    }

    #[test]
    fn test_encode_u32_large() -> std::io::Result<()> {
        let mut vec = vec![];

        let values = (0..128).map(|x| x % 4);

        encode::<u32, _, _>(&mut vec, values, 2)?;

        let length = 128;
        let expected = 0b11_10_01_00u8;

        let mut expected = vec![expected; length / 4];
        expected.insert(0, (((length / 8) as u8) << 1) | 1);

        assert_eq!(vec, expected);
        Ok(())
    }

    #[test]
    fn test_u32_other() -> std::io::Result<()> {
        let values = vec![3, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3].into_iter();

        let mut vec = vec![];
        encode::<u32, _, _>(&mut vec, values, 2)?;

        let expected = vec![5, 207, 254, 247, 51];
        assert_eq!(expected, vec);
        Ok(())
    }

    #[test]
    fn empty_input_writes_nothing() -> std::io::Result<()> {
        let mut vec = vec![];
        encode::<u32, _, _>(&mut vec, std::iter::empty(), 2)?;
        assert!(vec.is_empty());
        Ok(())
    }

    #[test]
    fn u32_long_run_is_rle() -> std::io::Result<()> {
        let mut vec = vec![];
        encode::<u32, _, _>(&mut vec, vec![0u32; 16].into_iter(), 2)?;
        // RLE header (16 << 1), then the value in ceil8(2) = 1 byte.
        assert_eq!(vec, vec![16 << 1, 0]);
        Ok(())
    }

    #[test]
    fn u32_rle_then_literal() -> std::io::Result<()> {
        let mut vec = vec![];
        let values = std::iter::repeat(1u32).take(10).chain(std::iter::once(2));
        encode::<u32, _, _>(&mut vec, values, 2)?;
        // RLE run of ten 1s, then a final bit-packed run with one group that
        // holds `2`; the unused slots of that group are zero.
        assert_eq!(vec, vec![10 << 1, 1, (1 << 1) | 1, 0b00_00_00_10, 0]);
        Ok(())
    }

    #[test]
    fn u32_literal_padded_before_rle() -> std::io::Result<()> {
        let mut vec = vec![];
        let values = vec![1u32, 2, 3]
            .into_iter()
            .chain(std::iter::repeat(0).take(20));
        encode::<u32, _, _>(&mut vec, values, 2)?;
        // The literal run [1, 2, 3] is padded with five of the zeros up to a
        // full group of 8; the remaining 15 zeros form an RLE run.
        assert_eq!(
            vec,
            vec![(1 << 1) | 1, 0b00_11_10_01, 0b00_00_00_00, 15 << 1, 0]
        );
        Ok(())
    }

    #[test]
    fn bool_long_run_is_rle() -> std::io::Result<()> {
        let mut vec = vec![];
        encode::<bool, _, _>(&mut vec, std::iter::repeat(true).take(16), 1)?;
        assert_eq!(vec, vec![16 << 1, 1]);
        Ok(())
    }
}