Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-parquet/src/parquet/encoding/delta_bitpacked/encoder.rs
7887 views
1
use super::super::{bitpacked, uleb128, zigzag_leb128};
2
use crate::parquet::encoding::ceil8;
3
4
/// Encodes an iterator of `i64` according to parquet's `DELTA_BINARY_PACKED`.
5
/// # Implementation
6
/// * This function does not allocate on the heap.
7
/// * The number of mini-blocks is always 1. This may change in the future.
8
pub fn encode<I: ExactSizeIterator<Item = i64>>(
9
mut iterator: I,
10
buffer: &mut Vec<u8>,
11
num_miniblocks_per_block: usize,
12
) {
13
const BLOCK_SIZE: usize = 256;
14
assert!([1, 2, 4].contains(&num_miniblocks_per_block));
15
let values_per_miniblock = BLOCK_SIZE / num_miniblocks_per_block;
16
17
let mut container = [0u8; 10];
18
let encoded_len = uleb128::encode(BLOCK_SIZE as u64, &mut container);
19
buffer.extend_from_slice(&container[..encoded_len]);
20
21
let encoded_len = uleb128::encode(num_miniblocks_per_block as u64, &mut container);
22
buffer.extend_from_slice(&container[..encoded_len]);
23
24
let length = iterator.len();
25
let encoded_len = uleb128::encode(length as u64, &mut container);
26
buffer.extend_from_slice(&container[..encoded_len]);
27
28
let mut values = [0i64; BLOCK_SIZE];
29
let mut deltas = [0u64; BLOCK_SIZE];
30
let mut num_bits = [0u8; 4];
31
32
let first_value = iterator.next().unwrap_or_default();
33
let (container, encoded_len) = zigzag_leb128::encode(first_value);
34
buffer.extend_from_slice(&container[..encoded_len]);
35
36
let mut prev = first_value;
37
let mut length = iterator.len();
38
while length != 0 {
39
let mut min_delta = i64::MAX;
40
let mut max_delta = i64::MIN;
41
for (i, integer) in iterator.by_ref().enumerate().take(BLOCK_SIZE) {
42
if i % values_per_miniblock == 0 {
43
min_delta = i64::MAX;
44
max_delta = i64::MIN
45
}
46
47
let delta = integer.wrapping_sub(prev);
48
min_delta = min_delta.min(delta);
49
max_delta = max_delta.max(delta);
50
51
let miniblock_idx = i / values_per_miniblock;
52
num_bits[miniblock_idx] = (64 - max_delta.abs_diff(min_delta).leading_zeros()) as u8;
53
values[i] = delta;
54
prev = integer;
55
}
56
let consumed = std::cmp::min(length - iterator.len(), BLOCK_SIZE);
57
length = iterator.len();
58
let values = &values[..consumed];
59
60
values.iter().zip(deltas.iter_mut()).for_each(|(v, delta)| {
61
*delta = v.wrapping_sub(min_delta) as u64;
62
});
63
64
// <min delta> <list of bitwidths of miniblocks> <miniblocks>
65
let (container, encoded_len) = zigzag_leb128::encode(min_delta);
66
buffer.extend_from_slice(&container[..encoded_len]);
67
68
// one miniblock => 1 byte
69
let mut values_remaining = consumed;
70
buffer.extend_from_slice(&num_bits[..num_miniblocks_per_block]);
71
for i in 0..num_miniblocks_per_block {
72
if values_remaining == 0 {
73
break;
74
}
75
76
values_remaining = values_remaining.saturating_sub(values_per_miniblock);
77
write_miniblock(
78
buffer,
79
num_bits[i],
80
&deltas[i * values_per_miniblock..(i + 1) * values_per_miniblock],
81
);
82
}
83
}
84
}
85
86
fn write_miniblock(buffer: &mut Vec<u8>, num_bits: u8, deltas: &[u64]) {
87
let num_bits = num_bits as usize;
88
if num_bits > 0 {
89
let start = buffer.len();
90
91
// bitpack encode all (deltas.len = 128 which is a multiple of 32)
92
let bytes_needed = start + ceil8(deltas.len() * num_bits);
93
buffer.resize(bytes_needed, 0);
94
bitpacked::encode(deltas, num_bits, &mut buffer[start..]);
95
96
let bytes_needed = start + ceil8(deltas.len() * num_bits);
97
buffer.truncate(bytes_needed);
98
}
99
}
100
101
#[cfg(test)]
mod tests {
    use super::*;

    // Notation used below: `value <=u> bytes` is the ULEB128 encoding and
    // `value <=z> bytes` the zigzag ULEB128 encoding of `value`.

    /// All deltas are equal, so the single miniblock has bit width 0 and no
    /// packed bytes follow the bit width.
    #[test]
    fn constant_delta() {
        // header: [128, 2, 1, 5, 2]
        //  block size:  256 <=u> 128, 2
        //  mini-blocks: 1   <=u> 1
        //  elements:    5   <=u> 5
        //  first_value: 1   <=z> 2
        // block1: [2, 0]
        //  min_delta: 1 <=z> 2
        //  bitwidth:  0
        let input: Vec<i64> = (1..=5).collect();
        let expected = vec![128u8, 2, 1, 5, 2, 2, 0];

        let mut buffer = Vec::new();
        encode(input.into_iter(), &mut buffer, 1);
        assert_eq!(expected, buffer);
    }

    /// The last value drops below its predecessor, so the minimum delta is
    /// negative and the stored deltas need 3 bits each.
    #[test]
    fn negative_min_delta() {
        // deltas: [1, 1, 1, 1, -4]; max - min = 1 - -4 = 5
        // header: [128, 2, 1, 6, 2]
        //  block size:  256 <=u> 128, 2
        //  mini-blocks: 1   <=u> 1
        //  elements:    6   <=u> 6
        //  first_value: 1   <=z> 2
        // block1: [7, 3, 0b01101101, 0b00001011, 0, ...]
        //  min_delta: -4 <=z> 7
        //  bitwidth:  3
        //  values: [5, 5, 5, 5, 0] <=b> [
        //      0b01101101
        //      0b00001011
        //  ]
        // The single miniblock is passed as 256 values of 3 bits each, i.e.
        // 96 bytes, of which the first 2 carry the values above.
        const MINIBLOCK_BYTES: usize = 256 * 3 / 8;
        let input = vec![1, 2, 3, 4, 5, 1];

        let mut expected = vec![128u8, 2, 1, 6, 2, 7, 3, 0b01101101, 0b00001011];
        expected.resize(expected.len() + MINIBLOCK_BYTES - 2, 0);

        let mut buffer = Vec::new();
        encode(input.into_iter(), &mut buffer, 1);
        assert_eq!(expected, buffer);
    }
}
147
148