GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/array/binview/builder.rs
use std::marker::PhantomData;
use std::sync::LazyLock;

use hashbrown::hash_map::Entry;
use polars_buffer::Buffer;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

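/// Builder for [`BinaryViewArrayGeneric`] arrays.
///
/// Values of at most `View::MAX_INLINE_SIZE` bytes are stored inline in their
/// [`View`]; longer values are appended to an exponentially growing "active"
/// byte buffer. Buffers shared from other arrays are deduplicated into
/// `buffer_set` so the same buffer is never stored twice.
///
/// A minimal usage sketch (not from this file; assumes the `str` view type
/// with a `Utf8View` dtype):
///
/// ```ignore
/// let mut builder = BinaryViewArrayGenericBuilder::<str>::new(ArrowDataType::Utf8View);
/// builder.push_value_ignore_validity("short"); // Stored inline in the view.
/// builder.push_value_ignore_validity("a string too long to fit in an inline view");
/// let array = builder.freeze();
/// ```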
pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    stolen_buffers: PlHashMap<usize, u32>,

    // With these we can amortize buffer set translation costs if repeatedly
    // stealing from the same set of buffers.
    last_buffer_set_stolen_from: Option<Buffer<Buffer<u8>>>,
    buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    /// Total length in bytes if we would concatenate all values.
    total_bytes_len: usize,
    /// Total bytes in the buffer set (excluding remaining capacity).
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
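    /// Maximum supported length in bytes of a single value; view offsets are
    /// u32, so rows longer than 2^32 - 2 bytes cannot be addressed.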
    pub const MAX_ROW_BYTE_LEN: usize = (u32::MAX - 1) as _;

    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

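    /// Ensures the active buffer can hold `additional` more bytes without the
    /// resulting offset exceeding `MAX_ROW_BYTE_LEN`. The in-capacity fast
    /// path stays inline; everything else goes to the #[cold] slow path below.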
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= Self::MAX_ROW_BYTE_LEN {
            self.reserve_active_buffer_slow(additional);
        }
    }

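    /// Slow path: flush the current active buffer into the buffer set and
    /// start a fresh one. Capacity doubles per flush, clamped to
    /// [DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE] but never below `additional`.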
    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= Self::MAX_ROW_BYTE_LEN,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Allocate a new buffer and flush the old buffer.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            // Replace dummy with real buffer.
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone()); // Push placeholder so active_buffer_idx stays valid.
    }

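    /// Pushes a value without updating the validity bitmap. Values of at most
    /// `View::MAX_INLINE_SIZE` bytes are inlined in the view itself; longer
    /// values are copied into the active buffer and referenced by buffer
    /// index and offset.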
    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

    /// # Safety
    /// The view must be inline.
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

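    /// Makes `buffer_set` the set we are currently stealing buffers from.
    /// Bumping the generation counter lazily invalidates every cached
    /// local-to-global index translation without clearing the cache.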
    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Buffer<Buffer<u8>>) {
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| {
                stolen_bs.as_ptr() == buffer_set.as_ptr() && stolen_bs.len() >= buffer_set.len()
            })
        {
            return; // Already active.
        }
        // Remember the new set so repeated steals from it hit the early return above.
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        // Switch to new generation (invalidating all old translation indices),
        // and resize the translation buffer (filling with stale, hence invalid,
        // entries) if necessary.
        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

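    /// Translates a non-inline view's buffer index from the source array's
    /// local numbering to an index into our own `buffer_set`, deduplicating
    /// stolen buffers by their starting address.
    ///
    /// # Safety
    /// `view.buffer_idx` must be in bounds for `other_bufferset`, and
    /// `switch_active_stealing_bufferset_to(other_bufferset)` must have been
    /// called beforehand.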
    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) -> View {
        // Translate from old array-local buffer idx to global stolen buffer idx.
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // This buffer index wasn't seen before for this array, do a dedup lookup.
            // Since we map by starting pointer and different subslices may have
            // different lengths, we expand the buffer to the maximum it could be.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache result for future lookups.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

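    /// Extends `self.views` with views into `other_bufferset`, translating
    /// non-inline views into our own buffer set. The validity bitmap is not
    /// updated.
    ///
    /// # Safety
    /// Every view must be valid for `other_bufferset`.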
    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

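    /// As `extend_views_dedup_ignore_validity`, but pushes each (translated)
    /// view `repeats` times.
    ///
    /// # Safety
    /// Every view must be valid for `other_bufferset`.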
    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

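    /// Freezing flushes the active buffer into the buffer set (or pops the
    /// trailing placeholder if the active buffer was never written to).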
    fn freeze(mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Buffer::from(self.buffer_set),
                self.validity.into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        }
    }

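    /// Like `freeze`, but leaves the builder in an empty, reusable state.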
    fn freeze_reset(&mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Buffer::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

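    /// With `ShareStrategy::Never` the values themselves are copied into our
    /// own buffers; with `ShareStrategy::Always` only the views are copied and
    /// the backing buffers are shared (and deduplicated) instead.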
    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

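    /// As `subslice_extend`, but each element of the subslice is repeated
    /// `repeats` times.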
    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

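    /// # Safety
    /// Every index in `idxs` must be in bounds for `other`.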
    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

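    /// Like `gather_extend`, but out-of-bounds indices become null instead of
    /// being undefined behavior.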
    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}