Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/array/binview/mutable.rs
8363 views
1
use std::any::Any;
2
use std::fmt::{Debug, Formatter};
3
use std::ops::Deref;
4
5
use hashbrown::hash_map::Entry;
6
use polars_buffer::Buffer;
7
use polars_error::PolarsResult;
8
use polars_utils::aliases::{InitHashMaps, PlHashMap};
9
10
use crate::array::binview::iterator::MutableBinaryViewValueIter;
11
use crate::array::binview::view::validate_views_utf8_only;
12
use crate::array::binview::{
13
BinaryViewArrayGeneric, DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE, ViewType,
14
};
15
use crate::array::{Array, MutableArray, TryExtend, TryPush, View};
16
use crate::bitmap::MutableBitmap;
17
use crate::datatypes::ArrowDataType;
18
use crate::legacy::trusted_len::TrustedLenPush;
19
use crate::trusted_len::TrustedLen;
20
21
// Invariants:
//
// - Each view must point to a valid slice of a buffer
// - `total_buffer_len` must be equal to `completed_buffers.iter().map(Vec::len).sum()`
// - `total_bytes_len` must be equal to `views.iter().map(View::len).sum()`
/// Mutable, growable builder for a binary/utf8 "view" array.
pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
    /// One view per element; payloads up to `View::MAX_INLINE_SIZE` are
    /// stored inline in the view itself, longer ones point into a buffer.
    pub(crate) views: Vec<View>,
    /// Immutable data buffers that non-inline views point into.
    pub(crate) completed_buffers: Vec<Buffer<u8>>,
    /// Buffer currently being appended to; flushed into `completed_buffers`
    /// by `finish_in_progress()` or when it runs out of room.
    pub(crate) in_progress_buffer: Vec<u8>,
    /// Validity bitmap; `None` means no nulls have been pushed yet.
    pub(crate) validity: Option<MutableBitmap>,
    pub(crate) phantom: std::marker::PhantomData<T>,
    /// Total bytes length if we would concatenate them all.
    pub(crate) total_bytes_len: usize,
    /// Total bytes in the buffer (excluding remaining capacity)
    pub(crate) total_buffer_len: usize,
    /// Mapping from `Buffer::deref()` to index in `completed_buffers`.
    /// Used in `push_view()`.
    pub(crate) stolen_buffers: PlHashMap<usize, u32>,
}
40
41
impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
    fn clone(&self) -> Self {
        Self {
            views: self.views.clone(),
            completed_buffers: self.completed_buffers.clone(),
            in_progress_buffer: self.in_progress_buffer.clone(),
            validity: self.validity.clone(),
            phantom: Default::default(),
            total_bytes_len: self.total_bytes_len,
            total_buffer_len: self.total_buffer_len,
            // The dedup cache is keyed on buffer data pointers and is only an
            // optimization for `push_view`, so the clone starts with an empty
            // cache rather than copying it.
            stolen_buffers: PlHashMap::new(),
        }
    }
}
55
56
impl<T: ViewType + ?Sized> Debug for MutableBinaryViewArray<T> {
57
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
58
write!(f, "mutable-binview{:?}", T::DATA_TYPE)
59
}
60
}
61
62
impl<T: ViewType + ?Sized> Default for MutableBinaryViewArray<T> {
    fn default() -> Self {
        // An empty array: no views, no buffers, no validity bitmap.
        Self::with_capacity(0)
    }
}
67
68
impl<T: ViewType + ?Sized> From<MutableBinaryViewArray<T>> for BinaryViewArrayGeneric<T> {
    /// Freezes the mutable builder into an immutable array.
    fn from(mut value: MutableBinaryViewArray<T>) -> Self {
        // Flush pending bytes first so every non-inline view points into
        // `completed_buffers`.
        value.finish_in_progress();
        // SAFETY: the struct invariants (views valid, `total_bytes_len` and
        // `total_buffer_len` accurate) are exactly what `new_unchecked` needs.
        unsafe {
            Self::new_unchecked(
                T::DATA_TYPE,
                value.views.into(),
                Buffer::from(value.completed_buffers),
                value.validity.map(|b| b.into()),
                Some(value.total_bytes_len),
                value.total_buffer_len,
            )
        }
    }
}
83
84
impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
85
/// Creates an empty array.
pub fn new() -> Self {
    Self::default()
}

/// Creates an empty array with room for `capacity` views.
///
/// Only the `views` vector is pre-allocated; data buffers are allocated
/// lazily on the first non-inlinable push.
pub fn with_capacity(capacity: usize) -> Self {
    Self {
        views: Vec::with_capacity(capacity),
        completed_buffers: vec![],
        in_progress_buffer: vec![],
        validity: None,
        phantom: Default::default(),
        total_buffer_len: 0,
        total_bytes_len: 0,
        stolen_buffers: PlHashMap::new(),
    }
}
101
102
/// Get a mutable reference to the [`Vec`] of [`View`]s in this [`MutableBinaryViewArray`].
///
/// # Safety
///
/// This is safe as long as any mutation of the [`Vec`] does not break any invariants of the
/// [`MutableBinaryViewArray`] before it is read again.
#[inline]
pub unsafe fn views_mut(&mut self) -> &mut Vec<View> {
    &mut self.views
}

/// Set the `total_bytes_len` of the [`MutableBinaryViewArray`]
///
/// # Safety
///
/// This should not break invariants of the [`MutableBinaryViewArray`]
#[inline]
pub unsafe fn set_total_bytes_len(&mut self, value: usize) {
    // In debug builds, verify the caller-supplied total against the views.
    #[cfg(debug_assertions)]
    {
        let actual_length: usize = self.views().iter().map(|v| v.length as usize).sum();
        assert_eq!(value, actual_length);
    }

    self.total_bytes_len = value;
}

/// Sum of the lengths of all elements, as if they were concatenated.
pub fn total_bytes_len(&self) -> usize {
    self.total_bytes_len
}

/// Total bytes held in the data buffers (excluding unused capacity).
pub fn total_buffer_len(&self) -> usize {
    self.total_buffer_len
}

/// The views of this array, one per element.
#[inline]
pub fn views(&self) -> &[View] {
    &self.views
}

/// The completed (flushed) data buffers.
#[inline]
pub fn completed_buffers(&self) -> &[Buffer<u8>] {
    &self.completed_buffers
}

/// Mutable access to the validity bitmap, if any nulls have been pushed.
pub fn validity(&mut self) -> Option<&mut MutableBitmap> {
    self.validity.as_mut()
}

/// Reserves space for `additional` extra views.
pub fn reserve(&mut self, additional: usize) {
    self.views.reserve(additional);
}

/// Number of elements in the array.
#[inline]
pub fn len(&self) -> usize {
    self.views.len()
}

/// Number of views the array can hold without reallocating.
#[inline]
pub fn capacity(&self) -> usize {
    self.views.capacity()
}

/// Lazily materializes the validity bitmap, marking all current elements
/// valid; `unset_last` clears the last bit (used right after pushing the
/// first null).
fn init_validity(&mut self, unset_last: bool) {
    let mut validity = MutableBitmap::with_capacity(self.views.capacity());
    validity.extend_constant(self.len(), true);
    if unset_last {
        validity.set(self.len() - 1, false);
    }
    self.validity = Some(validity);
}
174
175
/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
pub(crate) unsafe fn push_view_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    if len <= View::MAX_INLINE_SIZE {
        // Inline view: carries its own bytes, no buffer bookkeeping needed.
        debug_assert!(self.views.capacity() > self.views.len());
        self.views.push_unchecked(v);
        self.total_bytes_len += len as usize;
    } else {
        // Non-inline: copy the referenced bytes into our own buffers instead
        // of referencing the foreign buffer.
        let data = buffers.get_unchecked(v.buffer_idx as usize);
        let offset = v.offset as usize;
        let bytes = data.get_unchecked(offset..offset + len as usize);
        let t = T::from_bytes_unchecked(bytes);
        self.push_value_ignore_validity(t)
    }
}

/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
/// - The array must not have validity.
/// - caller must not mix use of this function with other push functions.
pub unsafe fn push_view_unchecked_dedupe(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    self.total_bytes_len += len as usize;
    if len <= View::MAX_INLINE_SIZE {
        self.views.push_unchecked(v);
    } else {
        // Share (not copy) the source buffer; dedup by the buffer's data
        // pointer so a buffer referenced by many views is appended only once.
        let buffer = buffers.get_unchecked(v.buffer_idx as usize);
        let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
            Entry::Occupied(entry) => *entry.get(),
            Entry::Vacant(entry) => {
                let idx = self.completed_buffers.len() as u32;
                entry.insert(idx);
                self.completed_buffers.push(buffer.clone());
                self.total_buffer_len += buffer.len();
                idx
            },
        };
        // Re-point the view at our own (shared) copy of the buffer.
        v.buffer_idx = idx;
        self.views.push_unchecked(v);
    }
}

/// Pushes a view whose non-inline data lives in `buffers`, sharing those
/// buffers (with pointer-based dedup) rather than copying the bytes.
pub fn push_view(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    let len = v.length;
    self.total_bytes_len += len as usize;
    if len <= View::MAX_INLINE_SIZE {
        self.views.push(v);
    } else {
        // Do not mix use of push_view and push_value_ignore_validity -
        // it causes fragmentation.
        self.finish_in_progress();

        let buffer = &buffers[v.buffer_idx as usize];
        let idx = match self.stolen_buffers.entry(buffer.deref().as_ptr() as usize) {
            Entry::Occupied(entry) => {
                let idx = *entry.get();
                // Same pointer should mean same buffer contents.
                let target_buffer = &self.completed_buffers[idx as usize];
                debug_assert_eq!(buffer, target_buffer);
                idx
            },
            Entry::Vacant(entry) => {
                let idx = self.completed_buffers.len() as u32;
                entry.insert(idx);
                self.completed_buffers.push(buffer.clone());
                self.total_buffer_len += buffer.len();
                idx
            },
        };
        // Re-point the view at our own (shared) copy of the buffer.
        v.buffer_idx = idx;
        self.views.push(v);
    }
    if let Some(validity) = &mut self.validity {
        validity.push(true)
    }
}
254
255
/// Appends `value` without touching the validity bitmap.
#[inline]
pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
    let bytes = value.as_ref().to_bytes();
    self.total_bytes_len += bytes.len();
    let view = self.push_value_into_buffer(bytes);
    self.views.push(view);
}

/// Appends a completed data buffer and returns its buffer index, so the
/// caller can construct views pointing into it.
#[inline]
pub fn push_buffer(&mut self, buffer: Buffer<u8>) -> u32 {
    // Flush pending bytes first so buffer indices stay consecutive.
    self.finish_in_progress();

    let buffer_idx = self.completed_buffers.len();
    self.total_buffer_len += buffer.len();
    self.completed_buffers.push(buffer);
    buffer_idx as u32
}

/// Appends a non-null value, updating the validity bitmap if one exists.
#[inline]
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
    if let Some(validity) = &mut self.validity {
        validity.push(true)
    }
    self.push_value_ignore_validity(value)
}
280
281
#[inline]
282
pub fn push<V: AsRef<T>>(&mut self, value: Option<V>) {
283
if let Some(value) = value {
284
self.push_value(value)
285
} else {
286
self.push_null()
287
}
288
}
289
290
/// Appends a null element (an all-zero view marked invalid).
#[inline]
pub fn push_null(&mut self) {
    self.views.push(View::default());
    match &mut self.validity {
        Some(validity) => validity.push(false),
        // First null: materialize the bitmap with the last bit unset.
        None => self.init_validity(true),
    }
}
298
299
/// Get a [`View`] for a specific set of bytes.
///
/// Short payloads are inlined in the returned view; longer ones are appended
/// to the in-progress buffer. The view is NOT pushed onto `self.views`, and
/// `total_bytes_len` is not updated — callers account for the element length.
///
/// # Panics
/// Panics if `bytes` is longer than `u32::MAX`.
pub fn push_value_into_buffer(&mut self, bytes: &[u8]) -> View {
    assert!(bytes.len() <= u32::MAX as usize);

    if bytes.len() <= View::MAX_INLINE_SIZE as usize {
        View::new_inline(bytes)
    } else {
        self.total_buffer_len += bytes.len();

        // We want to make sure that we never have to memcopy between buffers. So if the
        // current buffer is not large enough, create a new buffer that is large enough and try
        // to anticipate the larger size.
        let required_capacity = self.in_progress_buffer.len() + bytes.len();
        let does_not_fit_in_buffer = self.in_progress_buffer.capacity() < required_capacity;

        // We can only save offsets that are below u32::MAX
        let offset_will_not_fit = self.in_progress_buffer.len() > u32::MAX as usize;

        if does_not_fit_in_buffer || offset_will_not_fit {
            // Allocate a new buffer (exponential growth, clamped to the block
            // size bounds) and flush the old buffer.
            let new_capacity = (self.in_progress_buffer.capacity() * 2)
                .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
                .max(bytes.len());
            let in_progress = Vec::with_capacity(new_capacity);
            let flushed = std::mem::replace(&mut self.in_progress_buffer, in_progress);
            if !flushed.is_empty() {
                self.completed_buffers.push(flushed.into())
            }
        }

        let offset = self.in_progress_buffer.len() as u32;
        self.in_progress_buffer.extend_from_slice(bytes);

        // The in-progress buffer is addressed with the one-past-the-end
        // buffer index (see `value_from_view_unchecked`).
        let buffer_idx = u32::try_from(self.completed_buffers.len()).unwrap();

        View::new_from_bytes(bytes, buffer_idx, offset)
    }
}
337
338
/// Appends `additional` null elements.
pub fn extend_null(&mut self, additional: usize) {
    if self.validity.is_none() && additional > 0 {
        self.init_validity(false);
    }
    self.views
        .extend(std::iter::repeat_n(View::default(), additional));
    if let Some(validity) = &mut self.validity {
        validity.extend_constant(additional, false);
    }
}

/// Appends `additional` copies of `value` (nulls when `value` is `None`).
pub fn extend_constant<V: AsRef<T>>(&mut self, additional: usize, value: Option<V>) {
    if value.is_none() && self.validity.is_none() {
        self.init_validity(false);
    }

    if let Some(validity) = &mut self.validity {
        validity.extend_constant(additional, value.is_some())
    }

    // Encode the value into the buffer once and repeat the resulting view.
    // For a long string this effectively gives a dictionary encoding, as the
    // bytes are stored only once in the buffers.
    if let Some(bytes) = value {
        let view = self.push_value_into_buffer(bytes.as_ref().to_bytes());
        self.views.extend(std::iter::repeat_n(view, additional));
        self.total_bytes_len += view.length as usize * additional;
    }
}
367
368
// NOTE(review): presumably expands to the shared validity-mutation helper
// methods used by the mutable array types — confirm against the macro's
// definition elsewhere in the crate.
impl_mutable_array_mut_validity!();
369
370
#[inline]
371
pub fn extend_values<I, P>(&mut self, iterator: I)
372
where
373
I: Iterator<Item = P>,
374
P: AsRef<T>,
375
{
376
self.reserve(iterator.size_hint().0);
377
for v in iterator {
378
self.push_value(v)
379
}
380
}
381
382
/// Same as [`extend_values`](Self::extend_values), for `TrustedLen` iterators.
#[inline]
pub fn extend_trusted_len_values<I, P>(&mut self, iterator: I)
where
    I: TrustedLen<Item = P>,
    P: AsRef<T>,
{
    self.extend_values(iterator)
}

/// Appends optional values; `None` becomes a null element.
#[inline]
pub fn extend<I, P>(&mut self, iterator: I)
where
    I: Iterator<Item = Option<P>>,
    P: AsRef<T>,
{
    self.reserve(iterator.size_hint().0);
    for p in iterator {
        self.push(p)
    }
}

/// Same as [`extend`](Self::extend), for `TrustedLen` iterators.
#[inline]
pub fn extend_trusted_len<I, P>(&mut self, iterator: I)
where
    I: TrustedLen<Item = Option<P>>,
    P: AsRef<T>,
{
    self.extend(iterator)
}

/// Appends optional views whose non-inline data lives in `buffers`;
/// see [`push_view`](Self::push_view).
#[inline]
pub fn extend_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = Option<View>>,
{
    self.reserve(iterator.size_hint().0);
    for p in iterator {
        match p {
            Some(v) => self.push_view(v, buffers),
            None => self.push_null(),
        }
    }
}

/// Same as [`extend_views`](Self::extend_views), for `TrustedLen` iterators.
#[inline]
pub fn extend_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: TrustedLen<Item = Option<View>>,
{
    self.extend_views(iterator, buffers);
}

/// Appends non-null views whose non-inline data lives in `buffers`.
#[inline]
pub fn extend_non_null_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view(v, buffers);
    }
}

/// Same as [`extend_non_null_views`](Self::extend_non_null_views), for
/// `TrustedLen` iterators.
#[inline]
pub fn extend_non_null_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: TrustedLen<Item = View>,
{
    self.extend_non_null_views(iterator, buffers);
}

/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    // `reserve` only uses the size hint; `push_view_unchecked` still requires
    // the caller to guarantee sufficient capacity.
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view_unchecked(v, buffers);
    }
}

/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked_dedupe<I>(
    &mut self,
    iterator: I,
    buffers: &[Buffer<u8>],
) where
    I: Iterator<Item = View>,
{
    self.reserve(iterator.size_hint().0);
    for v in iterator {
        self.push_view_unchecked_dedupe(v, buffers);
    }
}
481
482
/// Builds an array from an iterator of optional values.
#[inline]
pub fn from_iterator<I, P>(iterator: I) -> Self
where
    I: Iterator<Item = Option<P>>,
    P: AsRef<T>,
{
    let mut mutable = Self::with_capacity(iterator.size_hint().0);
    mutable.extend(iterator);
    mutable
}

/// Builds an array from an iterator of non-null values.
pub fn from_values_iter<I, P>(iterator: I) -> Self
where
    I: Iterator<Item = P>,
    P: AsRef<T>,
{
    let mut mutable = Self::with_capacity(iterator.size_hint().0);
    mutable.extend_values(iterator);
    mutable
}
502
503
pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
504
Self::from_iterator(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
505
}
506
507
/// Flushes the in-progress buffer into `completed_buffers`.
///
/// Returns `true` if a (non-empty) buffer was actually flushed.
pub fn finish_in_progress(&mut self) -> bool {
    if !self.in_progress_buffer.is_empty() {
        self.completed_buffers
            .push(std::mem::take(&mut self.in_progress_buffer).into());
        true
    } else {
        false
    }
}

/// Converts into the immutable [`BinaryViewArrayGeneric`].
#[inline]
pub fn freeze(self) -> BinaryViewArrayGeneric<T> {
    self.into()
}

/// Converts into the immutable array, overriding the logical dtype.
#[inline]
pub fn freeze_with_dtype(self, dtype: ArrowDataType) -> BinaryViewArrayGeneric<T> {
    let mut arr: BinaryViewArrayGeneric<T> = self.into();
    arr.dtype = dtype;
    arr
}

/// Consumes `self`, returning the raw views and the completed buffers.
///
/// NOTE(review): the in-progress buffer is dropped here, not flushed — views
/// pointing into it would dangle; confirm callers flush before calling this.
pub fn take(self) -> (Vec<View>, Vec<Buffer<u8>>) {
    (self.views, self.completed_buffers)
}
532
533
/// Returns the element at index `i`.
///
/// # Panics
/// Panics if `i >= self.len()`.
#[inline]
pub fn value(&self, i: usize) -> &T {
    assert!(i < self.len());
    unsafe { self.value_unchecked(i) }
}

/// Returns the element at index `i`
///
/// # Safety
/// Assumes that the `i < self.len`.
#[inline]
pub unsafe fn value_unchecked(&self, i: usize) -> &T {
    self.value_from_view_unchecked(self.views.get_unchecked(i))
}

/// Returns the element indicated by the given view.
///
/// # Safety
/// Assumes the View belongs to this MutableBinaryViewArray.
pub unsafe fn value_from_view_unchecked<'a>(&'a self, view: &'a View) -> &'a T {
    // View layout:
    // length: 4 bytes
    // prefix: 4 bytes
    // buffer_index: 4 bytes
    // offset: 4 bytes

    // Inlined layout:
    // length: 4 bytes
    // data: 12 bytes
    let len = view.length;
    let bytes = if len <= View::MAX_INLINE_SIZE {
        // Inline: the payload lives directly after the 4-byte length field of
        // the view itself.
        let ptr = view as *const View as *const u8;
        std::slice::from_raw_parts(ptr.add(4), len as usize)
    } else {
        let buffer_idx = view.buffer_idx as usize;
        let offset = view.offset;

        // A buffer index one-past-the-end of `completed_buffers` refers to
        // the in-progress buffer (see `push_value_into_buffer`).
        let data = if buffer_idx == self.completed_buffers.len() {
            self.in_progress_buffer.as_slice()
        } else {
            self.completed_buffers.get_unchecked(buffer_idx)
        };

        let offset = offset as usize;
        data.get_unchecked(offset..offset + len as usize)
    };
    T::from_bytes_unchecked(bytes)
}

/// Returns an iterator of `&[u8]` over every element of this array, ignoring the validity
pub fn values_iter(&self) -> MutableBinaryViewValueIter<'_, T> {
    MutableBinaryViewValueIter::new(self)
}
586
587
/// Appends all elements (values and validity) of `other`, sharing its data
/// buffers instead of copying the bytes.
pub fn extend_from_array(&mut self, other: &BinaryViewArrayGeneric<T>) {
    let slf_len = self.len();
    // Merge validity bitmaps; ours is materialized lazily if only `other`
    // has nulls.
    match (&mut self.validity, other.validity()) {
        (None, None) => {},
        (Some(v), None) => v.extend_constant(other.len(), true),
        (v @ None, Some(other)) => {
            let mut bm = MutableBitmap::with_capacity(slf_len + other.len());
            bm.extend_constant(slf_len, true);
            bm.extend_from_bitmap(other);
            *v = Some(bm);
        },
        (Some(slf), Some(other)) => slf.extend_from_bitmap(other),
    }

    if other.total_buffer_len() == 0 {
        // All of `other`'s views are inline: no buffers to import, views can
        // be copied as-is.
        self.views.extend(other.views().iter().copied());
    } else {
        self.finish_in_progress();

        // Import `other`'s buffers and shift the buffer indices of its
        // non-inline views accordingly.
        let buffer_offset = self.completed_buffers().len() as u32;
        self.completed_buffers
            .extend(other.data_buffers().iter().cloned());

        self.views.extend(other.views().iter().map(|view| {
            let mut view = *view;
            if view.length > View::MAX_INLINE_SIZE {
                view.buffer_idx += buffer_offset;
            }
            view
        }));

        let new_total_buffer_len = self.total_buffer_len() + other.total_buffer_len();
        self.total_buffer_len = new_total_buffer_len;
    }

    self.total_bytes_len = self.total_bytes_len() + other.total_bytes_len();
}
624
}
625
626
impl MutableBinaryViewArray<[u8]> {
627
/// Validates that the views from `views_offset` onward (with data in buffers
/// from `buffer_offset` onward) reference valid UTF-8.
pub fn validate_utf8(&mut self, buffer_offset: usize, views_offset: usize) -> PolarsResult<()> {
    // Finish the in progress as it might be required for validation.
    let pushed = self.finish_in_progress();
    // views are correct
    unsafe {
        validate_views_utf8_only(
            &self.views[views_offset..],
            &self.completed_buffers,
            buffer_offset,
        )?
    }
    // Restore in-progress buffer as we don't want to get too small buffers
    if pushed {
        if let Some(last) = self.completed_buffers.pop() {
            self.in_progress_buffer = last.into_mut().right().unwrap();
        }
    }
    Ok(())
}
646
647
/// Extend from a `buffer` and `length` of items given some statistics about the lengths.
///
/// This will attempt to dispatch to several optimized implementations.
///
/// # Safety
///
/// This is safe if the statistics are correct.
pub unsafe fn extend_from_lengths_with_stats(
    &mut self,
    buffer: &[u8],
    lengths_iterator: impl Clone + ExactSizeIterator<Item = usize>,
    min_length: usize,
    max_length: usize,
    sum_length: usize,
) {
    let num_items = lengths_iterator.len();

    if num_items == 0 {
        return;
    }

    // In debug builds, recompute the statistics and verify the caller's claim.
    #[cfg(debug_assertions)]
    {
        let (min, max, sum) = lengths_iterator.clone().map(|v| (v, v, v)).fold(
            (usize::MAX, usize::MIN, 0usize),
            |(cmin, cmax, csum), (emin, emax, esum)| {
                (cmin.min(emin), cmax.max(emax), csum + esum)
            },
        );

        assert_eq!(min, min_length);
        assert_eq!(max, max_length);
        assert_eq!(sum, sum_length);
    }

    assert!(sum_length <= buffer.len());

    let mut buffer_offset = 0;
    // Fast path 1: no item is inlinable and every offset fits in u32 — copy
    // the whole byte range once and emit non-inline views pointing into it.
    if min_length > View::MAX_INLINE_SIZE as usize
        && (num_items == 1 || sum_length + self.in_progress_buffer.len() <= u32::MAX as usize)
    {
        let buffer_idx = self.completed_buffers().len() as u32;
        let in_progress_buffer_offset = self.in_progress_buffer.len();

        self.total_bytes_len += sum_length;
        self.total_buffer_len += sum_length;

        self.in_progress_buffer
            .extend_from_slice(&buffer[..sum_length]);
        self.views.extend(lengths_iterator.map(|length| {
            // SAFETY: We asserted before that the sum of all lengths is smaller or equal to
            // the buffer length.
            let view_buffer =
                unsafe { buffer.get_unchecked(buffer_offset..buffer_offset + length) };

            // SAFETY: We know that the minimum length > View::MAX_INLINE_SIZE. Therefore, this
            // length is > View::MAX_INLINE_SIZE.
            let view = unsafe {
                View::new_noninline_unchecked(
                    view_buffer,
                    buffer_idx,
                    (buffer_offset + in_progress_buffer_offset) as u32,
                )
            };
            buffer_offset += length;
            view
        }));
    } else if max_length <= View::MAX_INLINE_SIZE as usize {
        // Fast path 2: every item is inlinable — no buffer bytes are written.
        self.total_bytes_len += sum_length;

        // If the min and max are the same, we can dispatch to the optimized SIMD
        // implementation.
        if min_length == max_length {
            let length = min_length;
            if length == 0 {
                self.views
                    .resize(self.views.len() + num_items, View::new_inline(&[]));
            } else {
                View::extend_with_inlinable_strided(
                    &mut self.views,
                    &buffer[..length * num_items],
                    length as u8,
                );
            }
        } else {
            self.views.extend(lengths_iterator.map(|length| {
                // SAFETY: We asserted before that the sum of all lengths is smaller or equal
                // to the buffer length.
                let view_buffer =
                    unsafe { buffer.get_unchecked(buffer_offset..buffer_offset + length) };

                // SAFETY: We know that each view has a length <= View::MAX_INLINE_SIZE because
                // the maximum length is <= View::MAX_INLINE_SIZE
                let view = unsafe { View::new_inline_unchecked(view_buffer) };

                buffer_offset += length;

                view
            }));
        }
    } else {
        // If all fails, just fall back to a base implementation.
        self.reserve(num_items);
        for length in lengths_iterator {
            let value = &buffer[buffer_offset..buffer_offset + length];
            buffer_offset += length;
            self.push_value(value);
        }
    }
}
757
758
/// Extend from a `buffer` and `length` of items.
///
/// This will attempt to dispatch to several optimized implementations.
#[inline]
pub fn extend_from_lengths(
    &mut self,
    buffer: &[u8],
    lengths_iterator: impl Clone + ExactSizeIterator<Item = usize>,
) {
    // Compute (min, max, sum) of the lengths in a single pass.
    let (min, max, sum) = lengths_iterator.clone().map(|v| (v, v, v)).fold(
        (usize::MAX, usize::MIN, 0usize),
        |(cmin, cmax, csum), (emin, emax, esum)| (cmin.min(emin), cmax.max(emax), csum + esum),
    );

    // SAFETY: We just collected the right stats.
    unsafe { self.extend_from_lengths_with_stats(buffer, lengths_iterator, min, max, sum) }
}
775
}
776
777
impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<Option<P>> for MutableBinaryViewArray<T> {
    #[inline]
    fn extend<I: IntoIterator<Item = Option<P>>>(&mut self, iter: I) {
        // Delegate to the inherent `extend`, which reserves and handles nulls.
        Self::extend(self, iter.into_iter())
    }
}
783
784
impl<T: ViewType + ?Sized, P: AsRef<T>> FromIterator<Option<P>> for MutableBinaryViewArray<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = Option<P>>>(iter: I) -> Self {
        // Delegate to the inherent constructor, which pre-reserves capacity.
        Self::from_iterator(iter.into_iter())
    }
}
790
791
impl<T: ViewType + ?Sized> MutableArray for MutableBinaryViewArray<T> {
    fn dtype(&self) -> &ArrowDataType {
        T::dtype()
    }

    fn len(&self) -> usize {
        MutableBinaryViewArray::len(self)
    }

    fn validity(&self) -> Option<&MutableBitmap> {
        self.validity.as_ref()
    }

    /// Freezes the current contents into an immutable array, leaving `self`
    /// empty (via `std::mem::take`).
    fn as_box(&mut self) -> Box<dyn Array> {
        let mutable = std::mem::take(self);
        let arr: BinaryViewArrayGeneric<T> = mutable.into();
        arr.boxed()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }

    fn push_null(&mut self) {
        MutableBinaryViewArray::push_null(self)
    }

    fn reserve(&mut self, additional: usize) {
        MutableBinaryViewArray::reserve(self, additional)
    }

    fn shrink_to_fit(&mut self) {
        // Only the views vector is shrunk; data buffers keep their capacity.
        self.views.shrink_to_fit()
    }
}
830
831
impl<T: ViewType + ?Sized, P: AsRef<T>> TryExtend<Option<P>> for MutableBinaryViewArray<T> {
    /// This is infallible and is implemented for consistency with all other types
    #[inline]
    fn try_extend<I: IntoIterator<Item = Option<P>>>(&mut self, iter: I) -> PolarsResult<()> {
        self.extend(iter.into_iter());
        Ok(())
    }
}
839
840
impl<T: ViewType + ?Sized, P: AsRef<T>> TryPush<Option<P>> for MutableBinaryViewArray<T> {
    /// This is infallible and is implemented for consistency with all other types
    #[inline(always)]
    fn try_push(&mut self, item: Option<P>) -> PolarsResult<()> {
        self.push(item.as_ref().map(|p| p.as_ref()));
        Ok(())
    }
}
848
849
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an array via `extend_from_lengths` from `values` and checks
    /// that iterating the array yields the original values back.
    fn roundtrip(values: &[&[u8]]) -> bool {
        let buffer = values
            .iter()
            .flat_map(|v| v.iter().copied())
            .collect::<Vec<u8>>();
        let lengths = values.iter().map(|v| v.len()).collect::<Vec<usize>>();
        let mut bv = MutableBinaryViewArray::<[u8]>::with_capacity(values.len());

        bv.extend_from_lengths(&buffer[..], lengths.into_iter());

        &bv.values_iter().collect::<Vec<&[u8]>>()[..] == values
    }

    #[test]
    fn extend_with_lengths_basic() {
        // Empty input, a single inlinable value, and non-inlinable values.
        assert!(roundtrip(&[]));
        assert!(roundtrip(&[b"abc"]));
        assert!(roundtrip(&[
            b"a_very_very_long_string_that_is_not_inlinable"
        ]));
        assert!(roundtrip(&[
            b"abc",
            b"a_very_very_long_string_that_is_not_inlinable"
        ]));
    }

    // Mixed short lengths: exercises the varying-length inline fast path.
    #[test]
    fn extend_with_inlinable_fastpath() {
        assert!(roundtrip(&[b"abc", b"defg", b"hix"]));
        assert!(roundtrip(&[b"abc", b"defg", b"hix", b"xyza1234abcd"]));
    }

    // Equal short lengths: exercises the strided (SIMD) inline fast path.
    #[test]
    fn extend_with_inlinable_eq_len_fastpath() {
        assert!(roundtrip(&[b"abc", b"def", b"hix"]));
        assert!(roundtrip(&[b"abc", b"def", b"hix", b"xyz"]));
    }

    // All lengths above the inline limit: exercises the non-inline fast path.
    #[test]
    fn extend_with_not_inlinable_fastpath() {
        assert!(roundtrip(&[
            b"a_very_long_string123",
            b"a_longer_string_than_the_previous"
        ]));
    }
}
899
900