Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-row/src/variable/binary.rs
6939 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
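//! Row encoding for Binary values
//!
//! - a single null-sentinel byte (`RowEncodingOptions::null_sentinel`) if null
//! - a single `1_u8` (`EMPTY_SENTINEL`) if the value is empty
//! - `2_u8` (`NON_EMPTY_SENTINEL`) if not empty, followed by one or more blocks
//!
//! where a block is encoded as
//!
//! - [`BLOCK_SIZE`] bytes of value data, padded with 0s
//! - `0xFF_u8` (`BLOCK_CONTINUATION_TOKEN`) if this is not the last block for this value
//! - otherwise the number of data bytes used in this final block, as a `u8`
//!
//! When encoding in descending order, every byte written for a non-null value
//! (sentinel, data, padding and markers) is bitwise inverted.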
13
use std::mem::MaybeUninit;
14
15
use arrow::array::{BinaryViewArray, MutableBinaryViewArray};
16
use polars_utils::slice::Slice2Uninit;
17
18
use crate::row::RowEncodingOptions;
19
use crate::utils::decode_opt_nulls;
20
21
/// Number of data bytes stored in each block of the variable-length encoding.
pub(crate) const BLOCK_SIZE: usize = 32;

/// Marker byte at the end of a block signalling that another block follows.
///
/// The final block of a value instead ends with its data length, which is at most
/// [`BLOCK_SIZE`] and therefore can never be mistaken for this token.
pub(crate) const BLOCK_CONTINUATION_TOKEN: u8 = 0xFF;

/// Leading byte of an encoded empty (zero-length) value.
pub(crate) const EMPTY_SENTINEL: u8 = 1;

/// Leading byte of an encoded non-empty value; one or more blocks follow it.
pub(crate) const NON_EMPTY_SENTINEL: u8 = 2;

// The last block stores its length as a single byte (`len as u8`) in the slot that the
// continuation token otherwise occupies. Every possible length must therefore fit in a
// `u8` and differ from the token. Blocks are also split with `chunks_exact`, which
// panics on a zero chunk size. Checking both at compile time prevents a future change
// to `BLOCK_SIZE` from silently corrupting the encoding.
const _: () = assert!(BLOCK_SIZE > 0 && BLOCK_SIZE < BLOCK_CONTINUATION_TOKEN as usize);
32
33
/// Returns the ceil of `value`/`divisor`.
///
/// This is the number of `divisor`-sized chunks needed to hold `value` items.
///
/// # Panics
/// Panics if `divisor` is zero.
#[inline]
pub fn ceil(value: usize, divisor: usize) -> usize {
    // `usize::div_ceil` has been stable since Rust 1.73, so the hand-rolled
    // `value / divisor + remainder_flag` formulation is no longer needed.
    value.div_ceil(divisor)
}
40
41
#[inline]
42
fn padded_length(a: usize) -> usize {
43
1 + ceil(a, BLOCK_SIZE) * (BLOCK_SIZE + 1)
44
}
45
46
#[inline]
47
pub fn encoded_len_from_len(a: Option<usize>, _opt: RowEncodingOptions) -> usize {
48
a.map_or(1, padded_length)
49
}
50
51
/// Encode one strings/bytes object and return the written length.
///
/// Layout written to the front of `out`:
/// - `None`: a single `opt.null_sentinel()` byte.
/// - `Some([])`: a single [`EMPTY_SENTINEL`] byte.
/// - `Some(val)`: [`NON_EMPTY_SENTINEL`] followed by `ceil(val.len(), BLOCK_SIZE)` blocks.
///   Each block is [`BLOCK_SIZE`] data bytes plus one marker byte. Every marker except
///   the last is [`BLOCK_CONTINUATION_TOKEN`]. The last marker holds the number of data
///   bytes used in the final block, whose unused tail is zero-padded.
///
/// With [`RowEncodingOptions::DESCENDING`], every byte written for a non-null value
/// (including its leading sentinel) is bitwise inverted. The null byte is written
/// exactly as returned by `opt.null_sentinel()`.
///
/// Returns the number of bytes written. This is `1` for null or empty values, and
/// otherwise `1 + ceil(val.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1)`, the same value as
/// `padded_length(val.len())`.
///
/// # Safety
/// `out` must have allocated enough room for the returned number of bytes. The bytes of
/// `val` must be initialized, because they are read back with `assume_init` when
/// encoding in descending order.
unsafe fn encode_one(
    out: &mut [MaybeUninit<u8>],
    val: Option<&[MaybeUninit<u8>]>,
    opt: RowEncodingOptions,
) -> usize {
    let descending = opt.contains(RowEncodingOptions::DESCENDING);
    match val {
        Some([]) => {
            // An empty value needs no blocks: the sentinel byte alone encodes it.
            let byte = if descending {
                !EMPTY_SENTINEL
            } else {
                EMPTY_SENTINEL
            };
            // SAFETY: the caller guarantees `out` has room for at least one byte.
            *out.get_unchecked_mut(0) = MaybeUninit::new(byte);
            1
        },
        Some(val) => {
            let block_count = ceil(val.len(), BLOCK_SIZE);
            // 1 sentinel byte + `block_count` blocks of (`BLOCK_SIZE` data bytes + 1 marker byte).
            let end_offset = 1 + block_count * (BLOCK_SIZE + 1);

            // SAFETY: the caller guarantees `out` holds at least `end_offset` bytes.
            let dst = out.get_unchecked_mut(..end_offset);

            // Write NON_EMPTY_SENTINEL (`2_u8`) to mark this as a non-empty, non-null value.
            *dst.get_unchecked_mut(0) = MaybeUninit::new(NON_EMPTY_SENTINEL);

            let src_chunks = val.chunks_exact(BLOCK_SIZE);
            let src_remainder = src_chunks.remainder();

            // Each destination block is `BLOCK_SIZE` data bytes + 1 marker byte.
            let dst_chunks = dst.get_unchecked_mut(1..).chunks_exact_mut(BLOCK_SIZE + 1);

            for (src, dst) in src_chunks.zip(dst_chunks) {
                // Copy the `BLOCK_SIZE` data bytes. This leaves the block's last byte
                // free for the marker.
                std::ptr::copy_nonoverlapping(src.as_ptr(), dst.as_mut_ptr(), src.len());
                // Indicate that there are further blocks to follow.
                *dst.get_unchecked_mut(BLOCK_SIZE) = MaybeUninit::new(BLOCK_CONTINUATION_TOKEN);
            }

            // The value length is an exact multiple of BLOCK_SIZE, so the loop above has
            // already written every block in full. Only the final marker must change.
            if src_remainder.is_empty() {
                // Overwrite the last continuation marker: replace "there is another block"
                // with "we are finished; this is the length of this block".
                *dst.last_mut().unwrap_unchecked() = MaybeUninit::new(BLOCK_SIZE as u8);
            }
            // There are remainder bytes. They form a final, partially filled block that
            // `chunks_exact` did not yield, so the loop above left it unwritten.
            else {
                // Locate the last block.
                let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
                let last_dst = dst.get_unchecked_mut(start_offset..);
                let n_bytes_to_write = src_remainder.len();

                std::ptr::copy_nonoverlapping(
                    src_remainder.as_ptr(),
                    last_dst.as_mut_ptr(),
                    n_bytes_to_write,
                );
                // Zero-pad the unused data bytes of the final block (everything but its marker).
                last_dst
                    .get_unchecked_mut(n_bytes_to_write..last_dst.len() - 1)
                    .fill(MaybeUninit::new(0));
                // The final marker records how many data bytes of this block are real.
                *dst.last_mut().unwrap_unchecked() = MaybeUninit::new(src_remainder.len() as u8);
            }

            if descending {
                // Descending order is encoded by inverting every byte written for this value.
                // All bytes of `dst` were initialized above, so `assume_init` is sound.
                for byte in dst {
                    *byte = MaybeUninit::new(!byte.assume_init());
                }
            }
            end_offset
        },
        None => {
            // SAFETY: the caller guarantees `out` has room for at least one byte.
            *out.get_unchecked_mut(0) = MaybeUninit::new(opt.null_sentinel());
            1
        },
    }
}
136
137
pub(crate) unsafe fn encode_iter<'a, I: Iterator<Item = Option<&'a [u8]>>>(
138
buffer: &mut [MaybeUninit<u8>],
139
input: I,
140
opt: RowEncodingOptions,
141
row_starts: &mut [usize],
142
) {
143
for (offset, opt_value) in row_starts.iter_mut().zip(input) {
144
let dst = buffer.get_unchecked_mut(*offset..);
145
let written_len = encode_one(dst, opt_value.map(|v| v.as_uninit()), opt);
146
*offset += written_len;
147
}
148
}
149
150
pub(crate) unsafe fn encoded_item_len(row: &[u8], opt: RowEncodingOptions) -> usize {
151
let descending = opt.contains(RowEncodingOptions::DESCENDING);
152
let (non_empty_sentinel, continuation_token) = if descending {
153
(!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN)
154
} else {
155
(NON_EMPTY_SENTINEL, BLOCK_CONTINUATION_TOKEN)
156
};
157
158
// empty or null
159
if *row.get_unchecked(0) != non_empty_sentinel {
160
return 1;
161
}
162
163
let mut idx = 1;
164
loop {
165
let sentinel = *row.get_unchecked(idx + BLOCK_SIZE);
166
if sentinel == continuation_token {
167
idx += BLOCK_SIZE + 1;
168
continue;
169
}
170
return idx + BLOCK_SIZE + 1;
171
}
172
}
173
174
unsafe fn decoded_len(
175
row: &[u8],
176
non_empty_sentinel: u8,
177
continuation_token: u8,
178
descending: bool,
179
) -> usize {
180
// empty or null
181
if *row.get_unchecked(0) != non_empty_sentinel {
182
return 0;
183
}
184
185
let mut str_len = 0;
186
let mut idx = 1;
187
loop {
188
let sentinel = *row.get_unchecked(idx + BLOCK_SIZE);
189
if sentinel == continuation_token {
190
idx += BLOCK_SIZE + 1;
191
str_len += BLOCK_SIZE;
192
continue;
193
}
194
// the sentinel of the last block has the length
195
// of that block. The rest is padding.
196
let block_length = if descending {
197
// all bits were inverted on encoding
198
!sentinel
199
} else {
200
sentinel
201
};
202
return str_len + block_length as usize;
203
}
204
}
205
206
pub(crate) unsafe fn decode_binview(
207
rows: &mut [&[u8]],
208
opt: RowEncodingOptions,
209
) -> BinaryViewArray {
210
let descending = opt.contains(RowEncodingOptions::DESCENDING);
211
let (non_empty_sentinel, continuation_token) = if descending {
212
(!NON_EMPTY_SENTINEL, !BLOCK_CONTINUATION_TOKEN)
213
} else {
214
(NON_EMPTY_SENTINEL, BLOCK_CONTINUATION_TOKEN)
215
};
216
217
let null_sentinel = opt.null_sentinel();
218
let validity = decode_opt_nulls(rows, null_sentinel);
219
220
let mut mutable = MutableBinaryViewArray::with_capacity(rows.len());
221
222
let mut scratch = vec![];
223
for row in rows {
224
scratch.set_len(0);
225
let str_len = decoded_len(row, non_empty_sentinel, continuation_token, descending);
226
let mut to_read = str_len;
227
// we start at one, as we skip the validity byte
228
let mut offset = 1;
229
230
while to_read >= BLOCK_SIZE {
231
to_read -= BLOCK_SIZE;
232
scratch.extend_from_slice(row.get_unchecked(offset..offset + BLOCK_SIZE));
233
offset += BLOCK_SIZE + 1;
234
}
235
236
if to_read != 0 {
237
scratch.extend_from_slice(row.get_unchecked(offset..offset + to_read));
238
offset += BLOCK_SIZE + 1;
239
}
240
*row = row.get_unchecked(offset..);
241
242
if descending {
243
scratch.iter_mut().for_each(|o| *o = !*o)
244
}
245
mutable.push_value_ignore_validity(&scratch);
246
}
247
248
let out: BinaryViewArray = mutable.into();
249
out.with_validity(validity)
250
}
251
252