Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/array/binview/mutable.rs
6939 views
1
use std::any::Any;
2
use std::fmt::{Debug, Formatter};
3
use std::ops::Deref;
4
use std::sync::Arc;
5
6
use hashbrown::hash_map::Entry;
7
use polars_error::PolarsResult;
8
use polars_utils::aliases::{InitHashMaps, PlHashMap};
9
10
use crate::array::binview::iterator::MutableBinaryViewValueIter;
11
use crate::array::binview::view::validate_views_utf8_only;
12
use crate::array::binview::{
13
BinaryViewArrayGeneric, DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE, ViewType,
14
};
15
use crate::array::{Array, MutableArray, TryExtend, TryPush, View};
16
use crate::bitmap::MutableBitmap;
17
use crate::buffer::Buffer;
18
use crate::datatypes::ArrowDataType;
19
use crate::legacy::trusted_len::TrustedLenPush;
20
use crate::trusted_len::TrustedLen;
21
22
// Invariants:
//
// - Each view must point to a valid slice of a buffer
// - `total_buffer_len` must be equal to `completed_buffers.iter().map(Vec::len).sum()`
// - `total_bytes_len` must be equal to `views.iter().map(View::len).sum()`
pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
    /// One view per element; either inline (data inside the view) or a
    /// (buffer index, offset, length) reference into a data buffer.
    pub(crate) views: Vec<View>,
    /// Finished, immutable data buffers that non-inline views point into.
    pub(crate) completed_buffers: Vec<Buffer<u8>>,
    /// The buffer currently being appended to; moved into `completed_buffers`
    /// when it runs out of capacity (see `push_value_into_buffer`).
    pub(crate) in_progress_buffer: Vec<u8>,
    /// Null bitmap; `None` until the first null is pushed (all-valid).
    pub(crate) validity: Option<MutableBitmap>,
    /// Marks this container as logically holding `T` without storing one.
    pub(crate) phantom: std::marker::PhantomData<T>,
    /// Total bytes length if we would concatenate them all.
    pub(crate) total_bytes_len: usize,
    /// Total bytes in the buffer (excluding remaining capacity)
    pub(crate) total_buffer_len: usize,
    /// Mapping from `Buffer::deref()` to index in `completed_buffers`.
    /// Used in `push_view()`.
    pub(crate) stolen_buffers: PlHashMap<usize, u32>,
}
41
42
impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
    fn clone(&self) -> Self {
        Self {
            views: self.views.clone(),
            completed_buffers: self.completed_buffers.clone(),
            in_progress_buffer: self.in_progress_buffer.clone(),
            validity: self.validity.clone(),
            phantom: Default::default(),
            total_bytes_len: self.total_bytes_len,
            total_buffer_len: self.total_buffer_len,
            // The buffer-dedupe cache is deliberately not carried over: an
            // empty map is always correct (the `Vacant` path in `push_view`
            // re-imports buffers); at worst the clone deduplicates less.
            stolen_buffers: PlHashMap::new(),
        }
    }
}
56
57
impl<T: ViewType + ?Sized> Debug for MutableBinaryViewArray<T> {
58
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
59
write!(f, "mutable-binview{:?}", T::DATA_TYPE)
60
}
61
}
62
63
impl<T: ViewType + ?Sized> Default for MutableBinaryViewArray<T> {
    /// An empty array with no pre-allocated view capacity.
    fn default() -> Self {
        Self::with_capacity(0)
    }
}
68
69
impl<T: ViewType + ?Sized> From<MutableBinaryViewArray<T>> for BinaryViewArrayGeneric<T> {
    fn from(mut value: MutableBinaryViewArray<T>) -> Self {
        // Flush the in-progress buffer so every non-inline view points into
        // `completed_buffers`.
        value.finish_in_progress();
        // SAFETY: the mutable array maintains the view/length invariants
        // documented at the top of this file, which is what `new_unchecked`
        // requires.
        unsafe {
            Self::new_unchecked(
                T::DATA_TYPE,
                value.views.into(),
                Arc::from(value.completed_buffers),
                value.validity.map(|b| b.into()),
                value.total_bytes_len,
                value.total_buffer_len,
            )
        }
    }
}
84
85
impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
86
/// Create an empty [`MutableBinaryViewArray`].
pub fn new() -> Self {
    Self::default()
}

/// Create an empty [`MutableBinaryViewArray`] with room for `capacity` views.
///
/// Only the `views` vector is pre-allocated; data buffers grow on demand.
pub fn with_capacity(capacity: usize) -> Self {
    Self {
        views: Vec::with_capacity(capacity),
        completed_buffers: vec![],
        in_progress_buffer: vec![],
        validity: None,
        phantom: Default::default(),
        total_buffer_len: 0,
        total_bytes_len: 0,
        stolen_buffers: PlHashMap::new(),
    }
}

/// Get a mutable reference to the [`Vec`] of [`View`]s in this [`MutableBinaryViewArray`].
///
/// # Safety
///
/// This is safe as long as any mutation of the [`Vec`] does not break any invariants of the
/// [`MutableBinaryViewArray`] before it is read again.
#[inline]
pub unsafe fn views_mut(&mut self) -> &mut Vec<View> {
    &mut self.views
}

/// Set the `total_bytes_len` of the [`MutableBinaryViewArray`]
///
/// # Safety
///
/// This should not break invariants of the [`MutableBinaryViewArray`]
#[inline]
pub unsafe fn set_total_bytes_len(&mut self, value: usize) {
    // In debug builds, verify the caller-supplied value against the actual
    // sum of view lengths (the documented invariant).
    #[cfg(debug_assertions)]
    {
        let actual_length: usize = self.views().iter().map(|v| v.length as usize).sum();
        assert_eq!(value, actual_length);
    }

    self.total_bytes_len = value;
}

/// Total length in bytes of all elements, as if concatenated.
pub fn total_bytes_len(&self) -> usize {
    self.total_bytes_len
}

/// Total bytes stored in the data buffers (excluding remaining capacity).
pub fn total_buffer_len(&self) -> usize {
    self.total_buffer_len
}

/// The views of this array.
#[inline]
pub fn views(&self) -> &[View] {
    &self.views
}

/// The completed (immutable) data buffers of this array.
#[inline]
pub fn completed_buffers(&self) -> &[Buffer<u8>] {
    &self.completed_buffers
}

/// Mutable access to the validity bitmap, if one has been created.
pub fn validity(&mut self) -> Option<&mut MutableBitmap> {
    self.validity.as_mut()
}

/// Reserves `additional` view (element) slots.
///
/// Note: this does not reserve data-buffer space.
pub fn reserve(&mut self, additional: usize) {
    self.views.reserve(additional);
}

/// Number of elements in the array.
#[inline]
pub fn len(&self) -> usize {
    self.views.len()
}

/// Number of elements the `views` vector can hold without reallocating.
#[inline]
pub fn capacity(&self) -> usize {
    self.views.capacity()
}

// Lazily create the validity bitmap: all existing elements are marked valid,
// optionally unsetting the last one (used when the trigger was a null push).
fn init_validity(&mut self, unset_last: bool) {
    let mut validity = MutableBitmap::with_capacity(self.views.capacity());
    validity.extend_constant(self.len(), true);
    if unset_last {
        validity.set(self.len() - 1, false);
    }
    self.validity = Some(validity);
}
175
176
/// Push a view whose data lives in `buffers`, copying out-of-line payloads into
/// this array's own buffers.
///
/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
pub(crate) unsafe fn push_view_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    self.total_bytes_len += len as usize;
    if len <= 12 {
        // Inline view: the payload lives inside the 16-byte view itself.
        debug_assert!(self.views.capacity() > self.views.len());
        self.views.push_unchecked(v)
    } else {
        // Out-of-line view: copy the referenced bytes into our own buffers.
        self.total_buffer_len += len as usize;
        let data = buffers.get_unchecked(v.buffer_idx as usize);
        let offset = v.offset as usize;
        let bytes = data.get_unchecked(offset..offset + len as usize);
        let t = T::from_bytes_unchecked(bytes);
        self.push_value_ignore_validity(t)
    }
}

/// Push a view, sharing (not copying) the source buffer; buffers are
/// deduplicated by data-pointer identity via `stolen_buffers`.
///
/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
/// - caller must not mix use this function with other push functions.
pub unsafe fn push_view_unchecked_dedupe(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    self.total_bytes_len += len as usize;
    if len <= 12 {
        self.views.push_unchecked(v);
    } else {
        let buffer = buffers.get_unchecked(v.buffer_idx as usize);
        // A given source buffer is imported (and counted in
        // `total_buffer_len`) at most once.
        let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
            Entry::Occupied(entry) => *entry.get(),
            Entry::Vacant(entry) => {
                let idx = self.completed_buffers.len() as u32;
                entry.insert(idx);
                self.completed_buffers.push(buffer.clone());
                self.total_buffer_len += buffer.len();
                idx
            },
        };
        // Re-target the view at the imported buffer.
        v.buffer_idx = idx;
        self.views.push_unchecked(v);
    }
}
222
223
/// Push a view whose data lives in `buffers`.
///
/// Out-of-line source buffers are imported (shared, not copied) into
/// `completed_buffers` and deduplicated by data-pointer identity.
pub fn push_view(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    self.total_bytes_len += len as usize;
    if len <= 12 {
        // Inline view: no buffer involved.
        self.views.push(v);
    } else {
        // Do no mix use of push_view and push_value_ignore_validity -
        // it causes fragmentation.
        self.finish_in_progress();

        let buffer = &buffers[v.buffer_idx as usize];
        let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
            Entry::Occupied(entry) => {
                let idx = *entry.get();
                // Same data pointer should mean the same buffer content.
                let target_buffer = &self.completed_buffers[idx as usize];
                debug_assert_eq!(buffer, target_buffer);
                idx
            },
            Entry::Vacant(entry) => {
                let idx = self.completed_buffers.len() as u32;
                entry.insert(idx);
                self.completed_buffers.push(buffer.clone());
                self.total_buffer_len += buffer.len();
                idx
            },
        };
        // Re-target the view at the imported buffer.
        v.buffer_idx = idx;
        self.views.push(v);
    }
    if let Some(validity) = &mut self.validity {
        validity.push(true)
    }
}
256
257
/// Append a value without touching the validity bitmap.
#[inline]
pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
    let bytes = value.as_ref().to_bytes();
    self.total_bytes_len += bytes.len();
    let view = self.push_value_into_buffer(bytes);
    self.views.push(view);
}

/// Append a completed data buffer (flushing any in-progress buffer first) and
/// return its index for use in views.
#[inline]
pub fn push_buffer(&mut self, buffer: Buffer<u8>) -> u32 {
    self.finish_in_progress();

    let buffer_idx = self.completed_buffers.len();
    self.total_buffer_len += buffer.len();
    self.completed_buffers.push(buffer);
    buffer_idx as u32
}

/// Append a non-null value.
#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
    if let Some(validity) = &mut self.validity {
        validity.push(true)
    }
    self.push_value_ignore_validity(value)
}

/// Append an optional value; `None` is appended as a null.
#[inline]
pub fn push<V: AsRef<T>>(&mut self, value: Option<V>) {
    if let Some(value) = value {
        self.push_value(value)
    } else {
        self.push_null()
    }
}

/// Append a null, creating the validity bitmap on first use.
#[inline]
pub fn push_null(&mut self) {
    // A null still occupies a (default, all-zero) view slot.
    self.views.push(View::default());
    match &mut self.validity {
        Some(validity) => validity.push(false),
        None => self.init_validity(true),
    }
}
300
301
/// Get a [`View`] for a specific set of bytes.
///
/// Short payloads (up to `View::MAX_INLINE_SIZE`) are inlined in the view;
/// longer payloads are appended to the in-progress buffer, which is flushed
/// and replaced when it cannot hold the payload.
pub fn push_value_into_buffer(&mut self, bytes: &[u8]) -> View {
    // View lengths are stored as u32.
    assert!(bytes.len() <= u32::MAX as usize);

    if bytes.len() <= View::MAX_INLINE_SIZE as usize {
        View::new_inline(bytes)
    } else {
        self.total_buffer_len += bytes.len();

        // We want to make sure that we never have to memcopy between buffers. So if the
        // current buffer is not large enough, create a new buffer that is large enough and try
        // to anticipate the larger size.
        let required_capacity = self.in_progress_buffer.len() + bytes.len();
        let does_not_fit_in_buffer = self.in_progress_buffer.capacity() < required_capacity;

        // We can only save offsets that are below u32::MAX
        let offset_will_not_fit = self.in_progress_buffer.len() > u32::MAX as usize;

        if does_not_fit_in_buffer || offset_will_not_fit {
            // Allocate a new buffer and flush the old buffer.
            // Grow exponentially, clamped to [DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE],
            // but never smaller than the payload itself.
            let new_capacity = (self.in_progress_buffer.capacity() * 2)
                .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
                .max(bytes.len());
            let in_progress = Vec::with_capacity(new_capacity);
            let flushed = std::mem::replace(&mut self.in_progress_buffer, in_progress);
            if !flushed.is_empty() {
                self.completed_buffers.push(flushed.into())
            }
        }

        let offset = self.in_progress_buffer.len() as u32;
        self.in_progress_buffer.extend_from_slice(bytes);

        // Views created here point at the in-progress buffer, whose index is
        // one past the completed buffers (see `value_from_view_unchecked`).
        let buffer_idx = u32::try_from(self.completed_buffers.len()).unwrap();

        View::new_from_bytes(bytes, buffer_idx, offset)
    }
}
339
340
/// Append `additional` nulls.
pub fn extend_null(&mut self, additional: usize) {
    if self.validity.is_none() && additional > 0 {
        self.init_validity(false);
    }
    // Nulls occupy default (zero-length) view slots, so `total_bytes_len`
    // is unaffected.
    self.views
        .extend(std::iter::repeat_n(View::default(), additional));
    if let Some(validity) = &mut self.validity {
        validity.extend_constant(additional, false);
    }
}
350
351
pub fn extend_constant<V: AsRef<T>>(&mut self, additional: usize, value: Option<V>) {
352
if value.is_none() && self.validity.is_none() {
353
self.init_validity(false);
354
}
355
356
if let Some(validity) = &mut self.validity {
357
validity.extend_constant(additional, value.is_some())
358
}
359
360
// Push and pop to get the properly encoded value.
361
// For long string this leads to a dictionary encoding,
362
// as we push the string only once in the buffers
363
let view_value = value
364
.map(|v| {
365
self.push_value_ignore_validity(v);
366
self.views.pop().unwrap()
367
})
368
.unwrap_or_default();
369
self.views
370
.extend(std::iter::repeat_n(view_value, additional));
371
}
372
373
impl_mutable_array_mut_validity!();
374
375
/// Extend with non-null values from `iterator`.
#[inline]
pub fn extend_values<I, P>(&mut self, iterator: I)
where
    I: Iterator<Item = P>,
    P: AsRef<T>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_value(v)
    }
}

/// Same as [`Self::extend_values`]; with `TrustedLen` the up-front
/// reservation based on the size hint is exact.
#[inline]
pub fn extend_trusted_len_values<I, P>(&mut self, iterator: I)
where
    I: TrustedLen<Item = P>,
    P: AsRef<T>,
{
    self.extend_values(iterator)
}

/// Extend with optional values from `iterator` (`None` becomes a null).
#[inline]
pub fn extend<I, P>(&mut self, iterator: I)
where
    I: Iterator<Item = Option<P>>,
    P: AsRef<T>,
{
    self.reserve(iterator.size_hint().0);
    for p in iterator {
        self.push(p)
    }
}

/// Same as [`Self::extend`] for `TrustedLen` iterators.
#[inline]
pub fn extend_trusted_len<I, P>(&mut self, iterator: I)
where
    I: TrustedLen<Item = Option<P>>,
    P: AsRef<T>,
{
    self.extend(iterator)
}

/// Extend with optional views resolved against `buffers`
/// (see [`Self::push_view`]).
#[inline]
pub fn extend_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = Option<View>>,
{
    self.reserve(iterator.size_hint().0);
    for p in iterator {
        match p {
            Some(v) => self.push_view(v, buffers),
            None => self.push_null(),
        }
    }
}

/// Same as [`Self::extend_views`] for `TrustedLen` iterators.
#[inline]
pub fn extend_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: TrustedLen<Item = Option<View>>,
{
    self.extend_views(iterator, buffers);
}

/// Extend with non-null views resolved against `buffers`.
#[inline]
pub fn extend_non_null_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view(v, buffers);
    }
}

/// Same as [`Self::extend_non_null_views`] for `TrustedLen` iterators.
#[inline]
pub fn extend_non_null_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: TrustedLen<Item = View>,
{
    self.extend_non_null_views(iterator, buffers);
}

/// Extend with non-null views without checking capacity or validity.
///
/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view_unchecked(v, buffers);
    }
}

/// Like [`Self::extend_non_null_views_unchecked`], but shares and
/// deduplicates source buffers instead of copying their data.
///
/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked_dedupe<I>(
    &mut self,
    iterator: I,
    buffers: &[Buffer<u8>],
) where
    I: Iterator<Item = View>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view_unchecked_dedupe(v, buffers);
    }
}
486
487
#[inline]
488
pub fn from_iterator<I, P>(iterator: I) -> Self
489
where
490
I: Iterator<Item = Option<P>>,
491
P: AsRef<T>,
492
{
493
let mut mutable = Self::with_capacity(iterator.size_hint().0);
494
mutable.extend(iterator);
495
mutable
496
}
497
498
pub fn from_values_iter<I, P>(iterator: I) -> Self
499
where
500
I: Iterator<Item = P>,
501
P: AsRef<T>,
502
{
503
let mut mutable = Self::with_capacity(iterator.size_hint().0);
504
mutable.extend_values(iterator);
505
mutable
506
}
507
508
pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
509
Self::from_iterator(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
510
}
511
512
/// Flush the in-progress buffer into `completed_buffers`.
///
/// Returns `true` if a (non-empty) buffer was flushed.
pub fn finish_in_progress(&mut self) -> bool {
    if !self.in_progress_buffer.is_empty() {
        self.completed_buffers
            .push(std::mem::take(&mut self.in_progress_buffer).into());
        true
    } else {
        false
    }
}

/// Convert into an immutable [`BinaryViewArrayGeneric`].
#[inline]
pub fn freeze(self) -> BinaryViewArrayGeneric<T> {
    self.into()
}

/// Convert into an immutable [`BinaryViewArrayGeneric`] with the given
/// logical dtype.
#[inline]
pub fn freeze_with_dtype(self, dtype: ArrowDataType) -> BinaryViewArrayGeneric<T> {
    let mut arr: BinaryViewArrayGeneric<T> = self.into();
    arr.dtype = dtype;
    arr
}

/// Decompose into the raw views and completed buffers.
///
/// Note: the in-progress buffer and the validity bitmap are dropped.
pub fn take(self) -> (Vec<View>, Vec<Buffer<u8>>) {
    (self.views, self.completed_buffers)
}

/// Returns the element at index `i`, ignoring validity.
///
/// # Panics
/// Panics if `i >= self.len()`.
#[inline]
pub fn value(&self, i: usize) -> &T {
    assert!(i < self.len());
    unsafe { self.value_unchecked(i) }
}

/// Returns the element at index `i`
///
/// # Safety
/// Assumes that the `i < self.len`.
#[inline]
pub unsafe fn value_unchecked(&self, i: usize) -> &T {
    self.value_from_view_unchecked(self.views.get_unchecked(i))
}
552
553
/// Returns the element indicated by the given view.
///
/// # Safety
/// Assumes the View belongs to this MutableBinaryViewArray.
pub unsafe fn value_from_view_unchecked<'a>(&'a self, view: &'a View) -> &'a T {
    // View layout:
    // length: 4 bytes
    // prefix: 4 bytes
    // buffer_index: 4 bytes
    // offset: 4 bytes

    // Inlined layout:
    // length: 4 bytes
    // data: 12 bytes
    let len = view.length;
    let bytes = if len <= 12 {
        // Inline: read directly out of the view, skipping the 4-byte length.
        let ptr = view as *const View as *const u8;
        std::slice::from_raw_parts(ptr.add(4), len as usize)
    } else {
        let buffer_idx = view.buffer_idx as usize;
        let offset = view.offset;

        // A buffer index equal to `completed_buffers.len()` refers to the
        // still-mutable in-progress buffer (see `push_value_into_buffer`).
        let data = if buffer_idx == self.completed_buffers.len() {
            self.in_progress_buffer.as_slice()
        } else {
            self.completed_buffers.get_unchecked(buffer_idx)
        };

        let offset = offset as usize;
        data.get_unchecked(offset..offset + len as usize)
    };
    T::from_bytes_unchecked(bytes)
}
586
587
/// Returns an iterator of `&[u8]` over every element of this array, ignoring the validity
pub fn values_iter(&self) -> MutableBinaryViewValueIter<'_, T> {
    MutableBinaryViewValueIter::new(self)
}

/// Append all elements (values and validity) of an immutable array.
///
/// Data buffers are shared (cloned handles), not copied.
pub fn extend_from_array(&mut self, other: &BinaryViewArrayGeneric<T>) {
    let slf_len = self.len();
    // Merge the validity bitmaps, materializing ours lazily if only
    // `other` has one.
    match (&mut self.validity, other.validity()) {
        (None, None) => {},
        (Some(v), None) => v.extend_constant(other.len(), true),
        (v @ None, Some(other)) => {
            let mut bm = MutableBitmap::with_capacity(slf_len + other.len());
            bm.extend_constant(slf_len, true);
            bm.extend_from_bitmap(other);
            *v = Some(bm);
        },
        (Some(slf), Some(other)) => slf.extend_from_bitmap(other),
    }

    if other.total_buffer_len() == 0 {
        // Every view in `other` is inline; no buffers need importing.
        self.views.extend(other.views().iter().copied());
    } else {
        self.finish_in_progress();

        // Import `other`'s buffers and shift the buffer indices of its
        // non-inline views accordingly.
        let buffer_offset = self.completed_buffers().len() as u32;
        self.completed_buffers
            .extend(other.data_buffers().iter().cloned());

        self.views.extend(other.views().iter().map(|view| {
            let mut view = *view;
            if view.length > View::MAX_INLINE_SIZE {
                view.buffer_idx += buffer_offset;
            }
            view
        }));

        let new_total_buffer_len = self.total_buffer_len() + other.total_buffer_len();
        self.total_buffer_len = new_total_buffer_len;
    }

    self.total_bytes_len = self.total_bytes_len() + other.total_bytes_len();
}
629
}
630
631
impl MutableBinaryViewArray<[u8]> {
632
/// Validate that the views from `views_offset` onwards contain valid UTF-8,
/// checking buffer data starting at `buffer_offset`.
pub fn validate_utf8(&mut self, buffer_offset: usize, views_offset: usize) -> PolarsResult<()> {
    // Finish the in progress as it might be required for validation.
    let pushed = self.finish_in_progress();
    // views are correct
    unsafe {
        validate_views_utf8_only(
            &self.views[views_offset..],
            &self.completed_buffers,
            buffer_offset,
        )?
    }
    // Restore in-progress buffer as we don't want to get too small buffers
    if pushed {
        if let Some(last) = self.completed_buffers.pop() {
            // The popped buffer was created from our own Vec just above, so
            // it is uniquely owned and converts back into a mutable Vec.
            self.in_progress_buffer = last.into_mut().right().unwrap();
        }
    }
    Ok(())
}
651
652
/// Extend from a `buffer` and `length` of items given some statistics about the lengths.
///
/// This will attempt to dispatch to several optimized implementations.
///
/// # Safety
///
/// This is safe if the statistics are correct.
pub unsafe fn extend_from_lengths_with_stats(
    &mut self,
    buffer: &[u8],
    lengths_iterator: impl Clone + ExactSizeIterator<Item = usize>,
    min_length: usize,
    max_length: usize,
    sum_length: usize,
) {
    let num_items = lengths_iterator.len();

    if num_items == 0 {
        return;
    }

    // In debug builds, recompute the statistics and verify the caller's
    // claims, since all the fast paths below rely on them.
    #[cfg(debug_assertions)]
    {
        let (min, max, sum) = lengths_iterator.clone().map(|v| (v, v, v)).fold(
            (usize::MAX, usize::MIN, 0usize),
            |(cmin, cmax, csum), (emin, emax, esum)| {
                (cmin.min(emin), cmax.max(emax), csum + esum)
            },
        );

        assert_eq!(min, min_length);
        assert_eq!(max, max_length);
        assert_eq!(sum, sum_length);
    }

    assert!(sum_length <= buffer.len());

    let mut buffer_offset = 0;
    // Fast path 1: every item is non-inlinable and everything fits in one
    // buffer without overflowing the u32 view offsets — bulk-copy the data
    // once and emit non-inline views.
    if min_length > View::MAX_INLINE_SIZE as usize
        && (num_items == 1 || sum_length + self.in_progress_buffer.len() <= u32::MAX as usize)
    {
        let buffer_idx = self.completed_buffers().len() as u32;
        let in_progress_buffer_offset = self.in_progress_buffer.len();

        self.total_bytes_len += sum_length;
        self.total_buffer_len += sum_length;

        self.in_progress_buffer
            .extend_from_slice(&buffer[..sum_length]);
        self.views.extend(lengths_iterator.map(|length| {
            // SAFETY: We asserted before that the sum of all lengths is smaller or equal to
            // the buffer length.
            let view_buffer =
                unsafe { buffer.get_unchecked(buffer_offset..buffer_offset + length) };

            // SAFETY: We know that the minimum length > View::MAX_INLINE_SIZE. Therefore, this
            // length is > View::MAX_INLINE_SIZE.
            let view = unsafe {
                View::new_noninline_unchecked(
                    view_buffer,
                    buffer_idx,
                    (buffer_offset + in_progress_buffer_offset) as u32,
                )
            };
            buffer_offset += length;
            view
        }));
    } else if max_length <= View::MAX_INLINE_SIZE as usize {
        // Fast path 2: every item is inlinable — no buffer writes at all.
        self.total_bytes_len += sum_length;

        // If the min and max are the same, we can dispatch to the optimized SIMD
        // implementation.
        if min_length == max_length {
            let length = min_length;
            if length == 0 {
                self.views
                    .resize(self.views.len() + num_items, View::new_inline(&[]));
            } else {
                View::extend_with_inlinable_strided(
                    &mut self.views,
                    &buffer[..length * num_items],
                    length as u8,
                );
            }
        } else {
            self.views.extend(lengths_iterator.map(|length| {
                // SAFETY: We asserted before that the sum of all lengths is smaller or equal
                // to the buffer length.
                let view_buffer =
                    unsafe { buffer.get_unchecked(buffer_offset..buffer_offset + length) };

                // SAFETY: We know that each view has a length <= View::MAX_INLINE_SIZE because
                // the maximum length is <= View::MAX_INLINE_SIZE
                let view = unsafe { View::new_inline_unchecked(view_buffer) };

                buffer_offset += length;

                view
            }));
        }
    } else {
        // If all fails, just fall back to a base implementation.
        self.reserve(num_items);
        for length in lengths_iterator {
            let value = &buffer[buffer_offset..buffer_offset + length];
            buffer_offset += length;
            self.push_value(value);
        }
    }
}
762
763
/// Extend from a `buffer` and `length` of items.
///
/// This will attempt to dispatch to several optimized implementations.
#[inline]
pub fn extend_from_lengths(
    &mut self,
    buffer: &[u8],
    lengths_iterator: impl Clone + ExactSizeIterator<Item = usize>,
) {
    // Compute min / max / sum of the lengths in a single pass.
    let (min, max, sum) = lengths_iterator.clone().map(|v| (v, v, v)).fold(
        (usize::MAX, usize::MIN, 0usize),
        |(cmin, cmax, csum), (emin, emax, esum)| (cmin.min(emin), cmax.max(emax), csum + esum),
    );

    // SAFETY: We just collected the right stats.
    unsafe { self.extend_from_lengths_with_stats(buffer, lengths_iterator, min, max, sum) }
}
780
}
781
782
impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<Option<P>> for MutableBinaryViewArray<T> {
    /// Delegates to the inherent `extend` (optional values; `None` is null).
    #[inline]
    fn extend<I: IntoIterator<Item = Option<P>>>(&mut self, iter: I) {
        Self::extend(self, iter.into_iter())
    }
}

impl<T: ViewType + ?Sized, P: AsRef<T>> FromIterator<Option<P>> for MutableBinaryViewArray<T> {
    /// Delegates to the inherent `from_iterator`.
    #[inline]
    fn from_iter<I: IntoIterator<Item = Option<P>>>(iter: I) -> Self {
        Self::from_iterator(iter.into_iter())
    }
}
795
796
impl<T: ViewType + ?Sized> MutableArray for MutableBinaryViewArray<T> {
    fn dtype(&self) -> &ArrowDataType {
        T::dtype()
    }

    fn len(&self) -> usize {
        MutableBinaryViewArray::len(self)
    }

    fn validity(&self) -> Option<&MutableBitmap> {
        self.validity.as_ref()
    }

    fn as_box(&mut self) -> Box<dyn Array> {
        // Take ownership of the contents (leaving `self` empty via Default),
        // then freeze into an immutable array.
        let mutable = std::mem::take(self);
        let arr: BinaryViewArrayGeneric<T> = mutable.into();
        arr.boxed()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }

    fn push_null(&mut self) {
        MutableBinaryViewArray::push_null(self)
    }

    fn reserve(&mut self, additional: usize) {
        MutableBinaryViewArray::reserve(self, additional)
    }

    fn shrink_to_fit(&mut self) {
        // Only the views vector is shrunk; data buffers are left untouched.
        self.views.shrink_to_fit()
    }
}
835
836
impl<T: ViewType + ?Sized, P: AsRef<T>> TryExtend<Option<P>> for MutableBinaryViewArray<T> {
    /// This is infallible and is implemented for consistency with all other types
    #[inline]
    fn try_extend<I: IntoIterator<Item = Option<P>>>(&mut self, iter: I) -> PolarsResult<()> {
        self.extend(iter.into_iter());
        Ok(())
    }
}

impl<T: ViewType + ?Sized, P: AsRef<T>> TryPush<Option<P>> for MutableBinaryViewArray<T> {
    /// This is infallible and is implemented for consistency with all other types
    #[inline(always)]
    fn try_push(&mut self, item: Option<P>) -> PolarsResult<()> {
        self.push(item.as_ref().map(|p| p.as_ref()));
        Ok(())
    }
}
853
854
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip `values` through `extend_from_lengths` and report whether
    /// reading the array back yields the same elements.
    fn roundtrip(values: &[&[u8]]) -> bool {
        let buffer = values
            .iter()
            .flat_map(|v| v.iter().copied())
            .collect::<Vec<u8>>();
        let lengths = values.iter().map(|v| v.len()).collect::<Vec<usize>>();
        let mut bv = MutableBinaryViewArray::<[u8]>::with_capacity(values.len());

        bv.extend_from_lengths(&buffer[..], lengths.into_iter());

        &bv.values_iter().collect::<Vec<&[u8]>>()[..] == values
    }

    #[test]
    fn extend_with_lengths_basic() {
        // Empty input, inlinable, and non-inlinable single/mixed values.
        assert!(roundtrip(&[]));
        assert!(roundtrip(&[b"abc"]));
        assert!(roundtrip(&[
            b"a_very_very_long_string_that_is_not_inlinable"
        ]));
        assert!(roundtrip(&[
            b"abc",
            b"a_very_very_long_string_that_is_not_inlinable"
        ]));
    }

    #[test]
    fn extend_with_inlinable_fastpath() {
        // All-inlinable values with differing lengths.
        assert!(roundtrip(&[b"abc", b"defg", b"hix"]));
        assert!(roundtrip(&[b"abc", b"defg", b"hix", b"xyza1234abcd"]));
    }

    #[test]
    fn extend_with_inlinable_eq_len_fastpath() {
        // All-inlinable values of equal length (strided fast path).
        assert!(roundtrip(&[b"abc", b"def", b"hix"]));
        assert!(roundtrip(&[b"abc", b"def", b"hix", b"xyz"]));
    }

    #[test]
    fn extend_with_not_inlinable_fastpath() {
        // All values longer than the inline limit.
        assert!(roundtrip(&[
            b"a_very_long_string123",
            b"a_longer_string_than_the_previous"
        ]));
    }
}
904
905