GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/array/binview/builder.rs
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use hashbrown::hash_map::Entry;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

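/// Builder for [`BinaryViewArrayGeneric`] (Arrow binary/string view arrays).
///
/// Values short enough to fit in a [`View`] are stored inline; longer values
/// are appended to an active byte buffer (or shared from another array's
/// buffers) and referenced through a buffer index and offset.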
pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    stolen_buffers: PlHashMap<usize, u32>,

    // With these we can amortize buffer set translation costs if repeatedly
    // stealing from the same set of buffers.
    last_buffer_set_stolen_from: Option<Arc<[Buffer<u8>]>>,
    buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    /// Total bytes length if we would concatenate them all.
    total_bytes_len: usize,
    /// Total bytes in the buffer set (excluding remaining capacity).
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
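    /// Creates an empty builder for arrays with the given view dtype.
    ///
    /// A minimal usage sketch (illustrative only, not part of the original
    /// source; it assumes `str` implements [`ViewType`], that
    /// `ArrowDataType::Utf8View` is the matching dtype, and that
    /// [`StaticArrayBuilder`] is in scope for `freeze`):
    ///
    /// ```ignore
    /// let mut builder = BinaryViewArrayGenericBuilder::<str>::new(ArrowDataType::Utf8View);
    /// builder.push_value_ignore_validity("short"); // <= 12 bytes, stored inline in the view
    /// builder.push_value_ignore_validity("a value longer than twelve bytes"); // spilled to a buffer
    /// let array = builder.freeze();
    /// assert_eq!(array.len(), 2);
    /// ```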
    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

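    /// Ensures the active buffer can take `additional` more bytes without the
    /// resulting end offset overflowing a `u32` view offset; falls back to the
    /// cold path, which flushes the current buffer into the buffer set and
    /// starts a fresh one.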
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= (u32::MAX - 1) as usize {
            self.reserve_active_buffer_slow(additional);
        }
    }

    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= (u32::MAX - 1) as usize,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Allocate a new buffer and flush the old buffer.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            // Replace dummy with real buffer.
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid.
    }

    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

    /// # Safety
    /// The view must be inline.
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Arc<[Buffer<u8>]>) {
        // Fat pointer equality, checks both start and length.
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| std::ptr::eq(Arc::as_ptr(stolen_bs), Arc::as_ptr(buffer_set)))
        {
            return; // Already active.
        }

        // Remember which buffer set the translation cache now refers to, so
        // repeated extends from the same source can reuse it.
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        // Switch to new generation (invalidating all old translation indices),
        // and resizing the buffer with invalid indices if necessary.
        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

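    /// Remaps a non-inline view's buffer index from `other_bufferset` into this
    /// builder's buffer set, deduplicating stolen buffers by their start address.
    ///
    /// # Safety
    /// `view.buffer_idx` must be in bounds for `other_bufferset`, and
    /// `switch_active_stealing_bufferset_to` must have been called with that
    /// buffer set first (so the translation index cache is large enough).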
    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) -> View {
        // Translate from old array-local buffer idx to global stolen buffer idx.
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // This buffer index wasn't seen before for this array, do a dedup lookup.
            // Since we map by starting pointer and different subslices may have different lengths, we expand
            // the buffer to the maximum it could be.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache result for future lookups.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

    fn freeze(mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Arc::from(self.buffer_set),
                self.validity.into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        }
    }

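    /// Like `freeze`, but takes the finished array out of the builder and
    /// leaves it empty and ready for reuse instead of consuming it.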
    fn freeze_reset(&mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Arc::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}